import logging |
# import numpy as np |
import numpy as np |
from mlagents.tf_utils import tf |
import tensorflow as tf |
from mlagents.envs.action_info import ActionInfo |
from mlagents.trainers.models import EncoderType # , LearningRateSchedule |
# from mlagents.trainers.ppo.models import PPOModel |
self.mu = tf.keras.layers.Dense(num_outputs) |
self.log_sigma_sq = tf.keras.layers.Dense(num_outputs) |
def call(self, inputs, epsilon): |
def call(self, inputs): |
return tfp.distrbutions.Normal(loc=mu, scale=tf.sqrt(tf.exp(log_sig))) |
return tfp.distributions.Normal(loc=mu, scale=tf.sqrt(tf.exp(log_sig))) |
# action = mu + tf.sqrt(tf.exp(log_sig)) + epsilon |
# def log_probs(self, inputs) # Compute probability of model output. |
self.create_model( |
brain, trainer_params, reward_signal_configs, is_training, load, seed |
) |
self.brain = brain |
self.sequence_length = 1 if not self.trainer_params["use_recurrent"] else self.trainer_params["sequence_length"] |
self.global_step = tf.Variable(0) |
self.create_reward_signals(reward_signal_configs) |
# with self.graph.as_default(): |
:param seed: Random seed. |
""" |
self.model = ActorCriticPolicy( |
brain=brain, |
act_size=sum(brain.vector_action_space_size), |
m_size=self.m_size, |
m_size=trainer_params["memory_size"], |
stream_names=list(reward_signal_configs.keys()), |
vis_encode_type=EncoderType( |
trainer_params.get("vis_encode_type", "simple") |
entropy, |
beta, |
epsilon, |
lr, |
max_step, |
): |
""" |
Creates training-specific Tensorflow ops for PPO models. |
# ) |
decay_epsilon = self.trainer_params["epsilon"] |
decay_beta = self.trainer_params["beta"] |
# max_step = self.trainer_params["max_step"] |
value_losses = [] |
for name, head in values.items(): |
decay_epsilon, |
) |
v_opt_a = tf.squared_difference(returns[name], tf.reduce_sum(head, axis=1)) |
v_opt_b = tf.squared_difference(returns[name], clipped_value_estimate) |
v_opt_a = tf.math.squared_difference(returns[name], tf.reduce_sum(head, axis=1)) |
v_opt_b = tf.math.squared_difference(returns[name], clipped_value_estimate) |
value_loss = tf.reduce_mean( |
tf.dynamic_partition(tf.maximum(v_opt_a, v_opt_b), masks, 2)[1] |
) |
tf.clip_by_value(r_theta, 1.0 - decay_epsilon, 1.0 + decay_epsilon) |
* advantage |
) |
policy_loss = -tf.reduce_mean( |
tf.dynamic_partition(tf.minimum(p_opt_a, p_opt_b), masks, 2)[1] |
) |
policy_loss = -tf.reduce_mean(tf.minimum(p_opt_a, p_opt_b)) |
# For cleaner stats reporting |
# abs_policy_loss = tf.abs(policy_loss) |
- decay_beta * tf.reduce_mean(tf.dynamic_partition(entropy, masks, 2)[1]) |
- decay_beta * tf.reduce_mean(entropy) |
) |
return loss |
:param reward_signal_configs: Reward signal config. |
""" |
self.reward_signals = {} |
with self.graph.as_default(): |
# Create reward signals |
for reward_signal, config in reward_signal_configs.items(): |
self.reward_signals[reward_signal] = create_reward_signal( |
self, self.model, reward_signal, config |
) |
self.update_dict.update(self.reward_signals[reward_signal].update_dict) |
# with self.graph.as_default(): |
# Create reward signals |
for reward_signal, config in reward_signal_configs.items(): |
self.reward_signals[reward_signal] = create_reward_signal( |
self, self.model, reward_signal, config |
) |
self.update_dict.update(self.reward_signals[reward_signal].update_dict) |
@timed |
def evaluate(self, brain_info): |
""" |
run_out = {} |
run_out["action"], run_out["log_probs"], run_out["entropy"] = self.model.act( |
brain_info.vector_observations |
) |
run_out["value_heads"] = self.model.get_values(brain_info.vector_observations) |
run_out["value"] = tf.reduce_mean(list(self.value_heads.values()), 0) |
action, log_probs, entropy = self.model.act(brain_info.vector_observations) |
run_out["action"] = action.numpy() |
run_out["log_probs"] = log_probs.numpy() |
run_out["entropy"] = entropy.numpy() |
run_out["value_heads"] = { |
name: t.numpy() |
for name, t in self.model.get_values(brain_info.vector_observations).items() |
} |
run_out["value"] = np.mean(list(run_out["value_heads"].values()), 0) |
print(run_out["value_heads"]) |
def get_action(self, brain_info: BrainInfo) -> ActionInfo:
    """
    Decide actions for the agents described by ``brain_info``.

    :param brain_info: BrainInfo from the environment. Only ``agents`` is
        read here; the object is otherwise passed to ``self.evaluate`` as-is.
    :return: an ActionInfo built from the ``"action"`` and ``"value"`` entries
        of the evaluation output, with the full output dict attached as
        ``outputs``. When there are no agents, ``ActionInfo([], [], None)``.
    """
    # Empty batch: nothing to decide, so do not call the model at all.
    if len(brain_info.agents) == 0:
        return ActionInfo([], [], None)

    run_out = self.evaluate(brain_info)  # pylint: disable=assignment-from-no-return
    # .get() is used so a missing key yields None rather than raising.
    return ActionInfo(
        action=run_out.get("action"), value=run_out.get("value"), outputs=run_out
    )
@timed |
def update(self, mini_batch, num_sequences): |
""" |
returns[name] = mini_batch["{}_returns".format(name)] |
old_values[name] = mini_batch["{}_value_estimates".format(name)] |
values = self.model.get_values(mini_batch["vector_obs"]) |
action, probs, entropy = self.model.act(mini_batch["vector_obs"]) |
obs = np.array(mini_batch["vector_obs"]) |
values = self.model.get_values(obs) |
action, probs, entropy = self.model.act(obs) |
loss = self.ppo_loss( |
mini_batch["advantages"], |
probs, |
returns, |
mini_batch["masks"], |
np.array(mini_batch["masks"], dtype=np.uint32), |
entropy, |
1e-3, |
1000, |
print(grads) |
self.optimizer.apply_gradients(zip(grads, self.model.trainable_weights)) |
update_stats = {} |
# feed_dict[self.model.prev_action] = [ |
# brain_info.previous_vector_actions[idx] |
# ] |
value_estimates = self.model.get_values(brain_info.vector_observations[idx]) |
value_estimates = self.model.get_values(np.expand_dims(brain_info.vector_observations[idx],0)) |
value_estimates = {k: float(v) for k, v in value_estimates.items()} |
value_estimates[k] = 0.0 |
return value_estimates |
@property
def vis_obs_size(self):
    """Return ``self.brain.number_visual_observations``."""
    return self.brain.number_visual_observations
@property
def vec_obs_size(self):
    """Return ``self.brain.vector_observation_space_size``."""
    return self.brain.vector_observation_space_size
@property
def use_vis_obs(self):
    """Return True when ``self.vis_obs_size`` is greater than zero."""
    return self.vis_obs_size > 0
@property
def use_vec_obs(self):
    """Return True when ``self.vec_obs_size`` is greater than zero."""
    return self.vec_obs_size > 0
@property
def use_recurrent(self):
    """Return False unconditionally; the value is hard-coded here."""
    # NOTE(review): trainer_params["use_recurrent"] is consulted elsewhere in
    # this file but is not reflected by this property — confirm that is intended.
    return False
@property
def use_continuous_act(self):
    """Return True unconditionally; the value is hard-coded here."""
    return True
def get_current_step(self):
    """
    Gets current model step.
    :return: current model step, i.e. the result of ``self.global_step.numpy()``.
    """
    return self.global_step.numpy()
def increment_step(self, n_steps):
    """
    Increments model step.
    :param n_steps: amount to add to ``self.global_step``.
    :return: the model step after the increment, as reported by
        ``self.get_current_step()``.
    """
    # assign_add updates the variable in place in a single op, instead of
    # building ``global_step + n_steps`` first and assigning it back.
    self.global_step.assign_add(n_steps)
    return self.get_current_step()