Unity 机器学习代理工具包 (ML-Agents) 是一个开源项目,它使游戏和模拟能够作为训练智能代理的环境。
您最多选择25个主题 主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 
 

453 行
16 KiB

import logging
import numpy as np
from typing import Any, Dict # , Optional
import tensorflow as tf
import tensorflow_probability as tfp
from mlagents.envs.timers import timed
from mlagents.envs.brain import BrainInfo, BrainParameters
from mlagents.envs.action_info import ActionInfo
from mlagents.trainers.models import EncoderType # , LearningRateSchedule
from mlagents.trainers.components.reward_signals.reward_signal_factory import (
create_reward_signal,
)
# Module-level logger, registered under the "mlagents.trainers" name.
logger = logging.getLogger("mlagents.trainers")
class VectorEncoder(tf.keras.layers.Layer):
    """
    Stack of equally sized dense layers applied one after another.
    :param hidden_size: Number of units in every dense layer.
    :param num_layers: Number of dense layers in the stack.
    :param kwargs: Forwarded unchanged to ``tf.keras.layers.Layer``.
    """

    def __init__(self, hidden_size, num_layers, **kwargs):
        super().__init__(**kwargs)
        # NOTE(review): no ``activation`` is passed to Dense; with the Keras
        # default that makes the whole stack linear -- confirm this is intended.
        self.layers = [tf.keras.layers.Dense(hidden_size) for _ in range(num_layers)]

    def call(self, inputs):
        """
        Feeds ``inputs`` through every dense layer in creation order.
        :param inputs: Tensor handed to the first dense layer.
        :return: Output of the last dense layer (``inputs`` itself when the
            stack is empty).
        """
        hidden = inputs
        for dense in self.layers:
            hidden = dense(hidden)
        return hidden
class Critic(tf.keras.layers.Layer):
    """
    Value network: one encoder followed by a single-unit dense head per stream.
    :param stream_names: Names of the value streams; one head is created for each.
    :param encoder: Layer applied to the inputs before the value heads.
    :param kwargs: Forwarded unchanged to ``tf.keras.layers.Layer``.
    """

    def __init__(self, stream_names, encoder, **kwargs):
        super().__init__(**kwargs)
        self.stream_names = stream_names
        self.encoder = encoder
        # Each head is a Dense(1) layer named "<stream>_value".
        self.value_heads = {
            name: tf.keras.layers.Dense(1, name="{}_value".format(name))
            for name in stream_names
        }

    def call(self, inputs):
        """
        Encodes ``inputs`` once and evaluates every value head on the encoding.
        :param inputs: Tensor handed to the encoder.
        :return: Dict mapping each stream name to the output of its value head.
        """
        encoded = self.encoder(inputs)
        return {name: head(encoded) for name, head in self.value_heads.items()}
class GaussianDistribution(tf.keras.layers.Layer):
    """
    Maps a hidden tensor to a diagonal multivariate normal distribution.
    The mean comes from a dense layer; the log-variance is a free variable that
    does not depend on the layer input.
    :param num_outputs: Dimensionality of the distribution.
    :param kwargs: Forwarded unchanged to ``tf.keras.layers.Layer``.
    """

    def __init__(self, num_outputs, **kwargs):
        super().__init__(**kwargs)
        mean_initializer = tf.keras.initializers.VarianceScaling(scale=0.01)
        self.mu = tf.keras.layers.Dense(
            num_outputs, kernel_initializer=mean_initializer
        )
        # Log of the squared standard deviation, initialised to zeros.
        self.log_sigma_sq = tf.Variable(
            initial_value=tf.zeros([num_outputs]), dtype=tf.float32, name="log_sig_sq"
        )

    def call(self, inputs):
        """
        Builds the action distribution for ``inputs``.
        :param inputs: Tensor handed to the mean layer.
        :return: A ``tfp.distributions.MultivariateNormalDiag`` whose location is
            the mean layer output and whose diagonal scale is
            ``sqrt(exp(log_sigma_sq))``.
        """
        mean = self.mu(inputs)
        std = tf.sqrt(tf.exp(self.log_sigma_sq))
        return tfp.distributions.MultivariateNormalDiag(loc=mean, scale_diag=std)
class Normalizer(tf.keras.layers.Layer):
    """
    Normalizes vector observations with running statistics.
    Keeps three non-trainable variables: a step counter (starting at 1), a
    running mean (starting at zeros) and a running variance accumulator
    (starting at ones).
    :param vec_obs_size: Length of the running mean / variance vectors.
    :param kwargs: Forwarded unchanged to ``tf.keras.layers.Layer``.
    """

    def __init__(self, vec_obs_size, **kwargs):
        super(Normalizer, self).__init__(**kwargs)
        self.normalization_steps = tf.Variable(
            name="normalization_steps", trainable=False, dtype=tf.int32, initial_value=1
        )
        self.running_mean = tf.Variable(
            name="running_mean",
            shape=[vec_obs_size],
            trainable=False,
            dtype=tf.float32,
            initial_value=tf.zeros([vec_obs_size]),
        )
        self.running_variance = tf.Variable(
            name="running_variance",
            shape=[vec_obs_size],
            trainable=False,
            dtype=tf.float32,
            initial_value=tf.ones([vec_obs_size]),
        )

    def call(self, inputs):
        """
        Normalizes ``inputs`` with the current running statistics.
        :param inputs: Tensor to normalize.
        :return: ``(inputs - running_mean) / sqrt(running_variance / (steps + 1))``
            clipped to the range [-5, 5].
        """
        # The variance accumulator is divided by (steps + 1) before the sqrt.
        scale = tf.sqrt(
            self.running_variance
            / (tf.cast(self.normalization_steps, tf.float32) + 1)
        )
        normalized_state = tf.clip_by_value(
            (inputs - self.running_mean) / scale, -5, 5, name="normalized_state"
        )
        return normalized_state

    def update(self, vector_input):
        """
        Updates the running statistics with a new batch of observations.
        The batch is reduced to its mean over axis 0, and the step counter
        advances by one per call.
        :param vector_input: Batch of observations.
        """
        mean_current_observation = tf.cast(
            tf.reduce_mean(vector_input, axis=0), tf.float32
        )
        new_mean = self.running_mean + (
            mean_current_observation - self.running_mean
        ) / tf.cast(tf.add(self.normalization_steps, 1), tf.float32)
        # The variance update needs both the old and the new mean, so it is
        # computed before any variable is overwritten.
        new_variance = self.running_variance + (mean_current_observation - new_mean) * (
            mean_current_observation - self.running_mean
        )
        self.running_mean.assign(new_mean)
        self.running_variance.assign(new_variance)
        self.normalization_steps.assign(self.normalization_steps + 1)
class ActorCriticPolicy(tf.keras.Model):
    """
    Actor-critic model: an encoder plus Gaussian head for actions, and a
    separate ``Critic`` (with its own encoder) for value estimates.
    :param h_size: Units per dense layer in both encoders.
    :param act_size: Dimensionality of the action distribution.
    :param normalize: Whether inputs are passed through the normalizer.
    :param num_layers: Number of dense layers in both encoders.
    :param m_size: Accepted but not used by this class.
    :param stream_names: Value stream names handed to the critic.
    :param vis_encode_type: Accepted but not used by this class.
    """

    def __init__(
        self,
        h_size,
        act_size,
        normalize,
        num_layers,
        m_size,
        stream_names,
        vis_encode_type,
    ):
        super().__init__()
        self.encoder = VectorEncoder(h_size, num_layers)
        self.distribution = GaussianDistribution(act_size)
        critic_encoder = VectorEncoder(h_size, num_layers)
        self.critic = Critic(stream_names, critic_encoder)
        self.act_size = act_size
        self.normalize = normalize
        # Created in build(), where the observation size is available.
        self.normalizer = None

    def build(self, input_size):
        """
        Creates the normalizer, sized from the second entry of ``input_size``.
        :param input_size: Input shape passed to ``build``.
        """
        self.normalizer = Normalizer(input_size[1])

    def call(self, inputs):
        """
        Computes the action distribution for ``inputs``.
        :param inputs: Observations, normalized first when ``normalize`` is set.
        :return: The distribution produced by the Gaussian head.
        """
        encoder_input = self.normalizer(inputs) if self.normalize else inputs
        return self.distribution(self.encoder(encoder_input))

    def update_normalization(self, inputs):
        """
        Feeds ``inputs`` to the normalizer statistics when normalization is on.
        :param inputs: Batch of observations.
        """
        # NOTE(review): ``normalizer`` stays None until build() has run --
        # confirm callers always invoke the model first.
        if not self.normalize:
            return
        self.normalizer.update(inputs)

    def get_values(self, inputs):
        """
        Computes the critic outputs for ``inputs``.
        :param inputs: Observations, normalized first when ``normalize`` is set.
        :return: Whatever the critic returns for the (possibly normalized) inputs.
        """
        critic_input = self.normalizer(inputs) if self.normalize else inputs
        return self.critic(critic_input)
class PPOPolicy(object):
    """
    PPO policy built on an ``ActorCriticPolicy`` model, trained with
    ``tf.GradientTape`` and an Adam optimizer.
    """

    def __init__(
        self,
        seed: int,
        brain: BrainParameters,
        trainer_params: Dict[str, Any],
        is_training: bool,
        load: bool,
    ):
        """
        Policy for Proximal Policy Optimization Networks.
        :param seed: Random seed.
        :param brain: Assigned Brain object.
        :param trainer_params: Defined training parameters.
        :param is_training: Whether the model should be trained.
        :param load: Whether a pre-trained model will be loaded or a new one created.
        """
        reward_signal_configs = trainer_params["reward_signals"]
        self.inference_dict: Dict[str, tf.Tensor] = {}
        self.update_dict: Dict[str, tf.Tensor] = {}
        self.stats_name_to_update_name = {
            "Losses/Value Loss": "value_loss",
            "Losses/Policy Loss": "policy_loss",
        }
        self.create_model(
            brain, trainer_params, reward_signal_configs, is_training, load, seed
        )
        self.brain = brain
        self.trainer_params = trainer_params
        # ``learning_rate`` is the documented argument name; ``lr`` is only a
        # legacy alias.
        self.optimizer = tf.keras.optimizers.Adam(
            learning_rate=self.trainer_params["learning_rate"]
        )
        self.sequence_length = (
            1
            if not self.trainer_params["use_recurrent"]
            else self.trainer_params["sequence_length"]
        )
        self.global_step = tf.Variable(0)
        self.create_reward_signals(reward_signal_configs)

    def create_model(
        self, brain, trainer_params, reward_signal_configs, is_training, load, seed
    ):
        """
        Create PPO model and store it in ``self.model``.
        :param brain: Assigned Brain object; its ``vector_action_space_size`` is
            summed to obtain the action size.
        :param trainer_params: Defined training parameters.
        :param reward_signal_configs: Reward signal config; its keys become the
            value stream names.
        :param is_training: Not used by this method.
        :param load: Not used by this method.
        :param seed: Random seed. Not used by this method.
        """
        self.model = ActorCriticPolicy(
            h_size=int(trainer_params["hidden_units"]),
            act_size=sum(brain.vector_action_space_size),
            normalize=trainer_params["normalize"],
            num_layers=int(trainer_params["num_layers"]),
            m_size=trainer_params["memory_size"],
            stream_names=list(reward_signal_configs.keys()),
            vis_encode_type=EncoderType(
                trainer_params.get("vis_encode_type", "simple")
            ),
        )

    def ppo_value_loss(self, values, old_values, returns):
        """
        Computes the clipped PPO value loss, averaged over all value streams.
        :param values: Dict of current value head outputs, keyed by stream name.
        :param old_values: Dict of earlier value estimates, keyed by stream name.
        :param returns: Dict of returns, keyed by stream name.
        :return: Mean over streams of the per-stream clipped value loss.
        """
        decay_epsilon = self.trainer_params["epsilon"]
        value_losses = []
        for name, head in values.items():
            # Sum over axis 1 of the head output before comparing.
            estimate = tf.reduce_sum(head, axis=1)
            clipped_value_estimate = old_values[name] + tf.clip_by_value(
                estimate - old_values[name], -decay_epsilon, decay_epsilon
            )
            v_opt_a = tf.math.squared_difference(returns[name], estimate)
            v_opt_b = tf.math.squared_difference(returns[name], clipped_value_estimate)
            value_losses.append(tf.reduce_mean(tf.maximum(v_opt_a, v_opt_b)))
        return tf.reduce_mean(value_losses)

    def ppo_policy_loss(self, advantages, probs, old_probs, masks, epsilon):
        """
        Computes the clipped PPO surrogate policy loss.
        :param advantages: Advantage estimates; a trailing axis is added here.
        :param probs: Log-probabilities of the actions under the current policy.
        :param old_probs: Log-probabilities of the same actions when collected.
        :param masks: Accepted for interface compatibility; currently ignored.
        :param epsilon: Accepted for interface compatibility; currently ignored.
            The clip range is read from ``trainer_params["epsilon"]`` instead.
        :return: Negative mean of the element-wise minimum of the unclipped and
            clipped surrogate terms.
        """
        advantage = tf.expand_dims(advantages, -1)
        decay_epsilon = self.trainer_params["epsilon"]
        # Both inputs are log-probabilities, so the ratio is exp(difference).
        r_theta = tf.exp(probs - old_probs)
        # NOTE(review): ``advantage`` has a trailing axis of size 1. If ``probs``
        # is rank-1 (one log-probability per sample) the products below
        # broadcast to (batch, batch) -- confirm the shapes fed in by the trainer.
        p_opt_a = r_theta * advantage
        p_opt_b = (
            tf.clip_by_value(r_theta, 1.0 - decay_epsilon, 1.0 + decay_epsilon)
            * advantage
        )
        return -tf.reduce_mean(tf.minimum(p_opt_a, p_opt_b))

    def create_reward_signals(self, reward_signal_configs):
        """
        Create reward signals
        :param reward_signal_configs: Reward signal config.
        """
        self.reward_signals = {}
        for reward_signal, config in reward_signal_configs.items():
            signal = create_reward_signal(self, self.model, reward_signal, config)
            self.reward_signals[reward_signal] = signal
            self.update_dict.update(signal.update_dict)

    @timed
    def evaluate(self, brain_info):
        """
        Evaluates policy for the agent experiences provided.
        :param brain_info: BrainInfo object containing inputs.
        :return: Dict with the sampled "action", its "log_probs", the
            distribution "entropy", the per-stream "value_heads", their mean
            "value" and a constant "learning_rate" of 0.0.
        """
        observations = brain_info.vector_observations
        # The model is called before get_values, as in the original ordering.
        action_dist = self.model(observations)
        action = action_dist.sample()
        run_out = {
            "action": action.numpy(),
            "log_probs": action_dist.log_prob(action).numpy(),
            "entropy": action_dist.entropy().numpy(),
        }
        run_out["value_heads"] = {
            name: t.numpy() for name, t in self.model.get_values(observations).items()
        }
        run_out["value"] = np.mean(list(run_out["value_heads"].values()), 0)
        run_out["learning_rate"] = 0.0
        # Statistics are updated only after this batch has been evaluated.
        self.model.update_normalization(observations)
        return run_out

    def get_action(self, brain_info: BrainInfo) -> ActionInfo:
        """
        Decides actions given observations information, and takes them in environment.
        :param brain_info: A dictionary of brain names and BrainInfo from environment.
        :return: an ActionInfo containing action, memories, values and an object
        to be passed to add experiences
        """
        if len(brain_info.agents) == 0:
            return ActionInfo([], [], None)
        run_out = self.evaluate(brain_info)  # pylint: disable=assignment-from-no-return
        return ActionInfo(
            action=run_out.get("action"), value=run_out.get("value"), outputs=run_out
        )

    @timed
    def update(self, mini_batch, num_sequences):
        """
        Performs update on model.
        :param mini_batch: Batch of experiences.
        :param num_sequences: Number of sequences to process. Currently ignored.
        :return: Dict with "Losses/Policy Loss" and "Losses/Value Loss".
        """
        with tf.GradientTape() as tape:
            returns = {}
            old_values = {}
            for name in self.reward_signals:
                returns[name] = mini_batch["{}_returns".format(name)]
                old_values[name] = mini_batch["{}_value_estimates".format(name)]
            obs = np.array(mini_batch["vector_obs"])
            values = self.model.get_values(obs)
            dist = self.model(obs)
            probs = dist.log_prob(np.array(mini_batch["actions"]))
            entropy = dist.entropy()
            value_loss = self.ppo_value_loss(values, old_values, returns)
            policy_loss = self.ppo_policy_loss(
                np.array(mini_batch["advantages"]),
                probs,
                np.array(mini_batch["action_probs"]),
                np.array(mini_batch["masks"], dtype=np.uint32),
                1e-3,  # ignored by ppo_policy_loss; kept for interface parity
            )
            # Total loss: policy + 0.5 * value - beta * mean entropy.
            loss = (
                policy_loss
                + 0.5 * value_loss
                - self.trainer_params["beta"] * tf.reduce_mean(entropy)
            )
        grads = tape.gradient(loss, self.model.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_weights))
        return {
            "Losses/Policy Loss": policy_loss,
            "Losses/Value Loss": value_loss,
        }

    def get_value_estimates(
        self, brain_info: BrainInfo, idx: int, done: bool
    ) -> Dict[str, float]:
        """
        Generates value estimates for bootstrapping.
        :param brain_info: BrainInfo to be used for bootstrapping.
        :param idx: Index in BrainInfo of agent.
        :param done: Whether or not this is the last element of the episode, in which case the value estimate will be 0.
        :return: The value estimate dictionary with key being the name of the reward signal and the value the
        corresponding value estimate.
        """
        # Add a leading axis so the single observation is passed as a batch of one.
        value_estimates = self.model.get_values(
            np.expand_dims(brain_info.vector_observations[idx], 0)
        )
        value_estimates = {k: float(v) for k, v in value_estimates.items()}
        # If we're done, reassign all of the value estimates that need terminal states.
        if done:
            for k in value_estimates:
                if self.reward_signals[k].use_terminal_states:
                    value_estimates[k] = 0.0
        return value_estimates

    @property
    def vis_obs_size(self):
        """Number of visual observations reported by the brain."""
        return self.brain.number_visual_observations

    @property
    def vec_obs_size(self):
        """Vector observation space size reported by the brain."""
        return self.brain.vector_observation_space_size

    @property
    def use_vis_obs(self):
        """True when ``vis_obs_size`` is greater than zero."""
        return self.vis_obs_size > 0

    @property
    def use_vec_obs(self):
        """True when ``vec_obs_size`` is greater than zero."""
        return self.vec_obs_size > 0

    @property
    def use_recurrent(self):
        """Always False."""
        return False

    @property
    def use_continuous_act(self):
        """Always True."""
        return True

    def get_current_step(self):
        """
        Gets current model step.
        :return: current model step.
        """
        return self.global_step.numpy()

    def increment_step(self, n_steps):
        """
        Increments model step.
        :param n_steps: Amount added to the global step.
        :return: The model step after the increment.
        """
        self.global_step.assign(self.global_step + n_steps)
        return self.get_current_step()