import logging from typing import Any, Dict, Optional, List from mlagents.tf_utils import tf from mlagents_envs.timers import timed from mlagents_envs.base_env import BatchedStepResult from mlagents.trainers.brain import BrainParameters from mlagents.trainers.models import EncoderType from mlagents.trainers.models import ModelUtils from mlagents.trainers.policy.tf_policy import TFPolicy from mlagents.trainers.distributions import ( GaussianDistribution, MultiCategoricalDistribution, ) logger = logging.getLogger("mlagents.trainers") EPSILON = 1e-6 # Small value to avoid divide by zero class NNPolicy(TFPolicy): def __init__( self, seed: int, brain: BrainParameters, trainer_params: Dict[str, Any], is_training: bool, load: bool, tanh_squash: bool = False, reparameterize: bool = False, condition_sigma_on_obs: bool = True, create_tf_graph: bool = True, ): """ Policy that uses a multilayer perceptron to map the observations to actions. Could also use a CNN to encode visual input prior to the MLP. Supports discrete and continuous action spaces, as well as recurrent networks. :param seed: Random seed. :param brain: Assigned BrainParameters object. :param trainer_params: Defined training parameters. :param is_training: Whether the model should be trained. :param load: Whether a pre-trained model will be loaded or a new one created. :param tanh_squash: Whether to use a tanh function on the continuous output, or a clipped output. :param reparameterize: Whether we are using the resampling trick to update the policy in continuous output. """ super().__init__(seed, brain, trainer_params, load) self.grads = None self.update_batch: Optional[tf.Operation] = None num_layers = trainer_params["num_layers"] self.h_size = trainer_params["hidden_units"] if num_layers < 1: num_layers = 1 self.num_layers = num_layers self.vis_encode_type = EncoderType( trainer_params.get("vis_encode_type", "simple") ) self.tanh_squash = tanh_squash self.reparameterize = reparameterize self.condition_sigma_on_obs = condition_sigma_on_obs self.trainable_variables: List[tf.Variable] = [] # Non-exposed parameters; these aren't exposed because they don't have a # good explanation and usually shouldn't be touched. self.log_std_min = -20 self.log_std_max = 2 if create_tf_graph: self.create_tf_graph() def get_trainable_variables(self) -> List[tf.Variable]: """ Returns a List of the trainable variables in this policy. if create_tf_graph hasn't been called, returns empty list. """ return self.trainable_variables def create_tf_graph(self) -> None: """ Builds the tensorflow graph needed for this policy. """ with self.graph.as_default(): tf.set_random_seed(self.seed) _vars = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES) if len(_vars) > 0: # We assume the first thing created in the graph is the Policy. If # already populated, don't create more tensors. return self.create_input_placeholders() encoded = self._create_encoder( self.visual_in, self.processed_vector_in, self.h_size, self.num_layers, self.vis_encode_type, ) if self.use_continuous_act: self._create_cc_actor( encoded, self.tanh_squash, self.reparameterize, self.condition_sigma_on_obs, ) else: self._create_dc_actor(encoded) self.trainable_variables = tf.get_collection( tf.GraphKeys.TRAINABLE_VARIABLES, scope="policy" ) self.trainable_variables += tf.get_collection( tf.GraphKeys.TRAINABLE_VARIABLES, scope="lstm" ) # LSTMs need to be root scope for Barracuda export self.inference_dict: Dict[str, tf.Tensor] = { "action": self.output, "log_probs": self.all_log_probs, "entropy": self.entropy, } if self.use_continuous_act: self.inference_dict["pre_action"] = self.output_pre if self.use_recurrent: self.inference_dict["memory_out"] = self.memory_out # We do an initialize to make the Policy usable out of the box. If an optimizer is needed, # it will re-load the full graph self._initialize_graph() @timed def evaluate( self, batched_step_result: BatchedStepResult, global_agent_ids: List[str] ) -> Dict[str, Any]: """ Evaluates policy for the agent experiences provided. :param batched_step_result: BatchedStepResult object containing inputs. :param global_agent_ids: The global (with worker ID) agent ids of the data in the batched_step_result. :return: Outputs from network as defined by self.inference_dict. """ feed_dict = { self.batch_size_ph: batched_step_result.n_agents(), self.sequence_length_ph: 1, } if self.use_recurrent: if not self.use_continuous_act: feed_dict[self.prev_action] = self.retrieve_previous_action( global_agent_ids ) feed_dict[self.memory_in] = self.retrieve_memories(global_agent_ids) feed_dict = self.fill_eval_dict(feed_dict, batched_step_result) run_out = self._execute_model(feed_dict, self.inference_dict) return run_out def _create_encoder( self, visual_in: List[tf.Tensor], vector_in: tf.Tensor, h_size: int, num_layers: int, vis_encode_type: EncoderType, ) -> tf.Tensor: """ Creates an encoder for visual and vector observations. :param h_size: Size of hidden linear layers. :param num_layers: Number of hidden linear layers. :param vis_encode_type: Type of visual encoder to use if visual input. :return: The hidden layer (tf.Tensor) after the encoder. """ with tf.variable_scope("policy"): encoded = ModelUtils.create_observation_streams( self.visual_in, self.processed_vector_in, 1, h_size, num_layers, vis_encode_type, )[0] return encoded def _create_cc_actor( self, encoded: tf.Tensor, tanh_squash: bool = False, reparameterize: bool = False, condition_sigma_on_obs: bool = True, ) -> None: """ Creates Continuous control actor-critic model. :param h_size: Size of hidden linear layers. :param num_layers: Number of hidden linear layers. :param vis_encode_type: Type of visual encoder to use if visual input. :param tanh_squash: Whether to use a tanh function, or a clipped output. :param reparameterize: Whether we are using the resampling trick to update the policy. """ if self.use_recurrent: self.memory_in = tf.placeholder( shape=[None, self.m_size], dtype=tf.float32, name="recurrent_in" ) hidden_policy, memory_policy_out = ModelUtils.create_recurrent_encoder( encoded, self.memory_in, self.sequence_length_ph, name="lstm_policy" ) self.memory_out = tf.identity(memory_policy_out, name="recurrent_out") else: hidden_policy = encoded with tf.variable_scope("policy"): distribution = GaussianDistribution( hidden_policy, self.act_size, reparameterize=reparameterize, tanh_squash=tanh_squash, ) if tanh_squash: self.output_pre = distribution.sample self.output = tf.identity(self.output_pre, name="action") else: self.output_pre = distribution.sample # Clip and scale output to ensure actions are always within [-1, 1] range. output_post = tf.clip_by_value(self.output_pre, -3, 3) / 3 self.output = tf.identity(output_post, name="action") self.selected_actions = tf.stop_gradient(self.output) self.all_log_probs = tf.identity(distribution.log_probs, name="action_probs") self.entropy = distribution.entropy # We keep these tensors the same name, but use new nodes to keep code parallelism with discrete control. self.total_log_probs = distribution.total_log_probs def _create_dc_actor(self, encoded: tf.Tensor) -> None: """ Creates Discrete control actor-critic model. :param h_size: Size of hidden linear layers. :param num_layers: Number of hidden linear layers. :param vis_encode_type: Type of visual encoder to use if visual input. """ if self.use_recurrent: self.prev_action = tf.placeholder( shape=[None, len(self.act_size)], dtype=tf.int32, name="prev_action" ) prev_action_oh = tf.concat( [ tf.one_hot(self.prev_action[:, i], self.act_size[i]) for i in range(len(self.act_size)) ], axis=1, ) hidden_policy = tf.concat([encoded, prev_action_oh], axis=1) self.memory_in = tf.placeholder( shape=[None, self.m_size], dtype=tf.float32, name="recurrent_in" ) hidden_policy, memory_policy_out = ModelUtils.create_recurrent_encoder( hidden_policy, self.memory_in, self.sequence_length_ph, name="lstm_policy", ) self.memory_out = tf.identity(memory_policy_out, "recurrent_out") else: hidden_policy = encoded self.action_masks = tf.placeholder( shape=[None, sum(self.act_size)], dtype=tf.float32, name="action_masks" ) with tf.variable_scope("policy"): distribution = MultiCategoricalDistribution( hidden_policy, self.act_size, self.action_masks ) # It's important that we are able to feed_dict a value into this tensor to get the # right one-hot encoding, so we can't do identity on it. self.output = distribution.sample self.all_log_probs = tf.identity(distribution.log_probs, name="action") self.selected_actions = tf.stop_gradient( distribution.sample_onehot ) # In discrete, these are onehot self.entropy = distribution.entropy self.total_log_probs = distribution.total_log_probs