import logging
from enum import Enum
from typing import Callable, Dict, List, NamedTuple, Optional, Tuple

from mlagents.tf_utils import tf

from mlagents.trainers.exception import UnityTrainerException
from mlagents.trainers.brain import CameraResolution

logger = logging.getLogger("mlagents.trainers")
ActivationFunction = Callable[[tf.Tensor], tf.Tensor]
EncoderFunction = Callable[
    [tf.Tensor, int, ActivationFunction, int, str, bool], tf.Tensor
]


class EncoderType(Enum):
    SIMPLE = "simple"
    NATURE_CNN = "nature_cnn"
    RESNET = "resnet"


class LearningRateSchedule(Enum):
    CONSTANT = "constant"
    LINEAR = "linear"
class NormalizerTensors(NamedTuple):
    update_op: tf.Operation
    steps: tf.Tensor
    running_mean: tf.Tensor
    running_variance: tf.Tensor


class LearningModel:
    _version_number_ = 2
    def __init__(
        self, m_size, normalize, use_recurrent, brain, seed, stream_names=None
    ):
        tf.set_random_seed(seed)
        self.brain = brain
        self.vector_in = None
        self.global_step, self.increment_step, self.steps_to_increment = (
            ModelUtils.create_global_steps()
        )
        self.visual_in = []
        self.batch_size = tf.placeholder(shape=None, dtype=tf.int32, name="batch_size")
        self.sequence_length = tf.placeholder(
            shape=None, dtype=tf.int32, name="sequence_length"
        )
        self.mask_input = tf.placeholder(shape=[None], dtype=tf.float32, name="masks")
        self.mask = tf.cast(self.mask_input, tf.int32)
        self.stream_names = stream_names or []
        self.use_recurrent = use_recurrent
        if self.use_recurrent:
            self.m_size = m_size
        else:
            self.m_size = 0
        self.normalize = normalize
        self.act_size = brain.vector_action_space_size
        self.vec_obs_size = brain.vector_observation_space_size
        self.vis_obs_size = brain.number_visual_observations
        tf.Variable(
            int(brain.vector_action_space_type == "continuous"),
            name="is_continuous_control",
            trainable=False,
            dtype=tf.int32,
        )
        tf.Variable(
            self._version_number_,
            name="version_number",
            trainable=False,
            dtype=tf.int32,
        )
        tf.Variable(self.m_size, name="memory_size", trainable=False, dtype=tf.int32)
        if brain.vector_action_space_type == "continuous":
            tf.Variable(
                self.act_size[0],
                name="action_output_shape",
                trainable=False,
                dtype=tf.int32,
            )
        else:
            tf.Variable(
                sum(self.act_size),
                name="action_output_shape",
                trainable=False,
                dtype=tf.int32,
            )
        self.value_heads: Dict[str, tf.Tensor] = {}
        self.normalization_steps: Optional[tf.Variable] = None
        self.running_mean: Optional[tf.Variable] = None
        self.running_variance: Optional[tf.Variable] = None
        self.update_normalization: Optional[tf.Operation] = None
        self.value: Optional[tf.Tensor] = None
        self.all_log_probs: Optional[tf.Tensor] = None
        self.output: Optional[tf.Tensor] = None
        self.selected_actions: Optional[tf.Tensor] = None
        self.action_holder: Optional[tf.Tensor] = None

class ModelUtils:
    # Minimum supported side for each encoder type. If refactoring an encoder, please
    # adjust these also.
    MIN_RESOLUTION_FOR_ENCODER = {
        EncoderType.SIMPLE: 20,
        EncoderType.NATURE_CNN: 36,
        EncoderType.RESNET: 15,
    }

    @staticmethod
    def create_global_steps():
        """Creates TF ops to track and increment global training step."""
        global_step = tf.Variable(
            0, name="global_step", trainable=False, dtype=tf.int32
        )
        steps_to_increment = tf.placeholder(
            shape=[], dtype=tf.int32, name="steps_to_increment"
        )
        increment_step = tf.assign(
            global_step, tf.add(global_step, steps_to_increment)
        )
        return global_step, increment_step, steps_to_increment
    @staticmethod
    def create_learning_rate(
        lr_schedule: LearningRateSchedule,
        lr: float,
        global_step: tf.Tensor,
        max_step: int,
    ) -> tf.Tensor:
        """
        Create a learning rate tensor.
        :param lr_schedule: Type of learning rate schedule.
        :param lr: Base learning rate.
        :param global_step: A TF Tensor representing the total global step.
        :param max_step: The maximum number of steps in the training run.
        :return: A Tensor containing the learning rate.
        """
        if lr_schedule == LearningRateSchedule.CONSTANT:
            learning_rate = tf.Variable(lr)
        elif lr_schedule == LearningRateSchedule.LINEAR:
            learning_rate = tf.train.polynomial_decay(
                lr, global_step, max_step, 1e-10, power=1.0
            )
        else:
            raise UnityTrainerException(
                "The learning rate schedule {} is invalid.".format(lr_schedule)
            )
        return learning_rate
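    # Illustrative note (the numbers here are assumptions for the example, not
    # module defaults): with the LINEAR schedule, the rate decays linearly from
    # `lr` toward ~0 at max_step. For instance:
    #
    #   lr_tensor = ModelUtils.create_learning_rate(
    #       LearningRateSchedule.LINEAR, 3e-4, global_step, max_step=1000000
    #   )
    #   # evaluates to ~1.5e-4 once global_step reaches 500000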
    @staticmethod
    def create_visual_input(
        camera_parameters: CameraResolution, name: str
    ) -> tf.Tensor:
        """
        Creates an image input placeholder.
        :param camera_parameters: Parameters for the visual observation.
        :param name: Desired name of the input op.
        :return: Input placeholder for the visual observation.
        """
        visual_in = tf.placeholder(
            shape=[
                None,
                camera_parameters.height,
                camera_parameters.width,
                camera_parameters.num_channels,
            ],
            dtype=tf.float32,
            name=name,
        )
        return visual_in
    @staticmethod
    def create_visual_input_placeholders(
        camera_resolutions: List[CameraResolution]
    ) -> List[tf.Tensor]:
        """
        Creates input placeholders for visual inputs.
        :param camera_resolutions: A List of CameraResolutions that specify the resolutions
        of the input visual observations.
        :returns: A List of Tensorflow placeholders where the input images should be fed.
        """
        visual_in: List[tf.Tensor] = []
        for i, camera_resolution in enumerate(camera_resolutions):
            visual_input = ModelUtils.create_visual_input(
                camera_resolution, name="visual_observation_" + str(i)
            )
            visual_in.append(visual_input)
        return visual_in
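    # Hypothetical usage sketch (an 84x84 RGB camera is an assumed example
    # value, not something defined in this module):
    #
    #   resolutions = [CameraResolution(height=84, width=84, num_channels=3)]
    #   visual_in = ModelUtils.create_visual_input_placeholders(resolutions)
    #   # visual_in[0]: shape [None, 84, 84, 3], name "visual_observation_0"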
    @staticmethod
    def create_vector_input(
        vec_obs_size: int, name: str = "vector_observation"
    ) -> tf.Tensor:
        """
        Creates a placeholder for vector observation input.
        :param vec_obs_size: Size of the (stacked) vector observation.
        :param name: Name of the placeholder op.
        :return: Placeholder for vector observations.
        """
        vector_in = tf.placeholder(
            shape=[None, vec_obs_size], dtype=tf.float32, name=name
        )
        return vector_in
    @staticmethod
    def normalize_vector_obs(
        vector_obs: tf.Tensor,
        running_mean: tf.Tensor,
        running_variance: tf.Tensor,
        normalization_steps: tf.Tensor,
    ) -> tf.Tensor:
        """
        Create a normalized version of an input tensor.
        :param vector_obs: Input vector observation tensor.
        :param running_mean: Tensorflow tensor representing the current running mean.
        :param running_variance: Tensorflow tensor representing the current running variance.
        :param normalization_steps: Tensorflow tensor representing the current number of normalization_steps.
        :return: A normalized version of vector_obs.
        """
        normalized_state = tf.clip_by_value(
            (vector_obs - running_mean)
            / tf.sqrt(
                running_variance / (tf.cast(normalization_steps, tf.float32) + 1)
            ),
            -5,
            5,
            name="normalized_state",
        )
        return normalized_state
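    # Put differently, the op above computes
    #   clip((obs - mean) / sqrt(variance / (steps + 1)), -5, 5)
    # e.g. with mean=0, variance=4, steps=3, an observation of 4 is scaled by
    # 1 / sqrt(4 / 4) = 1 and stays 4, comfortably inside the clip range.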
    @staticmethod
    def create_normalizer(vector_obs: tf.Tensor) -> NormalizerTensors:
        """
        Creates the normalizer and the variables required to store its state.
        :param vector_obs: A Tensor representing the next value to normalize. When the
        update operation is called, it will use vector_obs to update the running mean
        and variance.
        :return: A NormalizerTensors tuple that holds running mean, running variance, number of steps,
        and the update operation.
        """
        vec_obs_size = vector_obs.shape[1]
        steps = tf.get_variable(
            "normalization_steps",
            [],
            trainable=False,
            dtype=tf.int32,
            initializer=tf.zeros_initializer(),
        )
        running_mean = tf.get_variable(
            "running_mean",
            [vec_obs_size],
            trainable=False,
            dtype=tf.float32,
            initializer=tf.zeros_initializer(),
        )
        running_variance = tf.get_variable(
            "running_variance",
            [vec_obs_size],
            trainable=False,
            dtype=tf.float32,
            initializer=tf.ones_initializer(),
        )
        update_normalization = ModelUtils.create_normalizer_update(
            vector_obs, steps, running_mean, running_variance
        )
        return NormalizerTensors(
            update_normalization, steps, running_mean, running_variance
        )
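    # Hypothetical wiring sketch (`obs_batch` and the size 8 are placeholders
    # for illustration, not part of this module):
    #
    #   vector_in = ModelUtils.create_vector_input(vec_obs_size=8)
    #   norm = ModelUtils.create_normalizer(vector_in)
    #   normalized = ModelUtils.normalize_vector_obs(
    #       vector_in, norm.running_mean, norm.running_variance, norm.steps
    #   )
    #   with tf.Session() as sess:
    #       sess.run(tf.global_variables_initializer())
    #       sess.run(norm.update_op, feed_dict={vector_in: obs_batch})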
    @staticmethod
    def create_normalizer_update(
        vector_input: tf.Tensor,
        steps: tf.Tensor,
        running_mean: tf.Tensor,
        running_variance: tf.Tensor,
    ) -> tf.Operation:
        """
        Creates the update operation for the normalizer.
        :param vector_input: Vector observation to use for updating the running mean and variance.
        :param steps: Tensorflow tensor representing the current number of steps that have been normalized.
        :param running_mean: Tensorflow tensor representing the current running mean.
        :param running_variance: Tensorflow tensor representing the current running variance.
        :return: A TF operation that updates the normalization based on vector_input.
        """
        # Based on Welford's algorithm for running mean and variance, adapted
        # for batch updates: each batch shifts the mean by the summed deviation
        # and accumulates the variance numerator (M2).
        steps_increment = tf.shape(vector_input)[0]
        total_new_steps = tf.add(steps, steps_increment)
        input_to_old_mean = tf.subtract(vector_input, running_mean)
        new_mean = running_mean + tf.reduce_sum(
            input_to_old_mean / tf.cast(total_new_steps, dtype=tf.float32), axis=0
        )
        input_to_new_mean = tf.subtract(vector_input, new_mean)
        new_variance = running_variance + tf.reduce_sum(
            input_to_new_mean * input_to_old_mean, axis=0
        )
        update_mean = tf.assign(running_mean, new_mean)
        update_variance = tf.assign(running_variance, new_variance)
        update_norm_step = tf.assign(steps, total_new_steps)
        return tf.group([update_mean, update_variance, update_norm_step])
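    # Worked example, assuming a fresh normalizer state (mean=0, variance=0,
    # steps=0) and a single new observation x=2: steps_increment=1,
    # total_new_steps=1, input_to_old_mean = 2 - 0 = 2, new_mean = 0 + 2 / 1 = 2,
    # and new_variance = 0 + (2 - 2) * (2 - 0) = 0; a single sample contributes
    # no spread, matching Welford's M2 accumulator.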
    @staticmethod
    def create_vector_observation_encoder(
        observation_input: tf.Tensor,
        h_size: int,
        activation: ActivationFunction,
        num_layers: int,
        scope: str,
        reuse: bool,
    ) -> tf.Tensor:
        """Builds a stack of hidden linear layers encoding a vector input."""
        with tf.variable_scope(scope):
            hidden = observation_input
            for i in range(num_layers):
                hidden = tf.layers.dense(
                    hidden,
                    h_size,
                    activation=activation,
                    reuse=reuse,
                    name="hidden_{}".format(i),
                )
        return hidden

    @staticmethod
    def create_visual_observation_encoder(
        image_input: tf.Tensor,
        h_size: int,
        activation: ActivationFunction,
        num_layers: int,
        scope: str,
        reuse: bool,
    ) -> tf.Tensor:
        """Builds the simple two-layer CNN visual encoder."""
        with tf.variable_scope(scope):
            conv1 = tf.layers.conv2d(
                image_input,
                16,
                kernel_size=[8, 8],
                strides=[4, 4],
                activation=tf.nn.elu,
                reuse=reuse,
                name="conv_1",
            )
            conv2 = tf.layers.conv2d(
                conv1,
                32,
                kernel_size=[4, 4],
                strides=[2, 2],
                activation=tf.nn.elu,
                reuse=reuse,
                name="conv_2",
            )
            hidden = tf.layers.flatten(conv2)
        with tf.variable_scope(scope + "/" + "flat_encoding"):
            hidden_flat = ModelUtils.create_vector_observation_encoder(
                hidden, h_size, activation, num_layers, scope, reuse
            )
        return hidden_flat

    @staticmethod
    def create_nature_cnn_visual_observation_encoder(
        image_input: tf.Tensor,
        h_size: int,
        activation: ActivationFunction,
        num_layers: int,
        scope: str,
        reuse: bool,
    ) -> tf.Tensor:
        """Builds the three-layer Nature CNN visual encoder (Mnih et al., 2015)."""
        with tf.variable_scope(scope):
            conv1 = tf.layers.conv2d(
                image_input,
                32,
                kernel_size=[8, 8],
                strides=[4, 4],
                activation=tf.nn.elu,
                reuse=reuse,
                name="conv_1",
            )
            conv2 = tf.layers.conv2d(
                conv1,
                64,
                kernel_size=[4, 4],
                strides=[2, 2],
                activation=tf.nn.elu,
                reuse=reuse,
                name="conv_2",
            )
            conv3 = tf.layers.conv2d(
                conv2,
                64,
                kernel_size=[3, 3],
                strides=[1, 1],
                activation=tf.nn.elu,
                reuse=reuse,
                name="conv_3",
            )
            hidden = tf.layers.flatten(conv3)
        with tf.variable_scope(scope + "/" + "flat_encoding"):
            hidden_flat = ModelUtils.create_vector_observation_encoder(
                hidden, h_size, activation, num_layers, scope, reuse
            )
        return hidden_flat

    @staticmethod
    def create_resnet_visual_observation_encoder(
        image_input: tf.Tensor,
        h_size: int,
        activation: ActivationFunction,
        num_layers: int,
        scope: str,
        reuse: bool,
    ) -> tf.Tensor:
        """Builds the IMPALA-style ResNet visual encoder (Espeholt et al., 2018)."""
        n_channels = [16, 32, 32]  # channels for each stack
        n_blocks = 2  # number of residual blocks per stack
        with tf.variable_scope(scope):
            hidden = image_input
            for i, ch in enumerate(n_channels):
                hidden = tf.layers.conv2d(
                    hidden,
                    ch,
                    kernel_size=[3, 3],
                    strides=[1, 1],
                    reuse=reuse,
                    name="layer%dconv_1" % i,
                )
                hidden = tf.layers.max_pooling2d(
                    hidden, pool_size=[3, 3], strides=[2, 2], padding="same"
                )
                # Residual blocks: two convolutions with a skip connection.
                for j in range(n_blocks):
                    block_input = hidden
                    hidden = tf.nn.relu(hidden)
                    hidden = tf.layers.conv2d(
                        hidden,
                        ch,
                        kernel_size=[3, 3],
                        strides=[1, 1],
                        padding="same",
                        reuse=reuse,
                        name="layer%d_%d_conv1" % (i, j),
                    )
                    hidden = tf.nn.relu(hidden)
                    hidden = tf.layers.conv2d(
                        hidden,
                        ch,
                        kernel_size=[3, 3],
                        strides=[1, 1],
                        padding="same",
                        reuse=reuse,
                        name="layer%d_%d_conv2" % (i, j),
                    )
                    hidden = tf.add(block_input, hidden)
            hidden = tf.nn.relu(hidden)
            hidden = tf.layers.flatten(hidden)
        with tf.variable_scope(scope + "/" + "flat_encoding"):
            hidden_flat = ModelUtils.create_vector_observation_encoder(
                hidden, h_size, activation, num_layers, scope, reuse
            )
        return hidden_flat
    @staticmethod
    def get_encoder_for_type(encoder_type: EncoderType) -> EncoderFunction:
        ENCODER_FUNCTION_BY_TYPE = {
            EncoderType.SIMPLE: ModelUtils.create_visual_observation_encoder,
            EncoderType.NATURE_CNN: ModelUtils.create_nature_cnn_visual_observation_encoder,
            EncoderType.RESNET: ModelUtils.create_resnet_visual_observation_encoder,
        }
        return ENCODER_FUNCTION_BY_TYPE.get(
            encoder_type, ModelUtils.create_visual_observation_encoder
        )
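    # Note that unknown encoder types fall back to the SIMPLE encoder rather
    # than raising, e.g. get_encoder_for_type(EncoderType.NATURE_CNN) returns
    # create_nature_cnn_visual_observation_encoder, while any unmapped value
    # would return create_visual_observation_encoder.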
    @staticmethod
    def _check_resolution_for_encoder(
        vis_in: tf.Tensor, vis_encoder_type: EncoderType
    ) -> None:
        min_res = ModelUtils.MIN_RESOLUTION_FOR_ENCODER[vis_encoder_type]
        height = vis_in.shape[1]
        width = vis_in.shape[2]
        if height < min_res or width < min_res:
            raise UnityTrainerException(
                f"Visual observation resolution ({width}x{height}) is too small for "
                f"the provided EncoderType ({vis_encoder_type.value}). "
                f"The minimum supported dimension is {min_res}."
            )
    @staticmethod
    def swish(input_activation: tf.Tensor) -> tf.Tensor:
        """Swish activation function. For more info: https://arxiv.org/abs/1710.05941"""
        return tf.multiply(input_activation, tf.nn.sigmoid(input_activation))

    @staticmethod
    def create_observation_streams(
        visual_in: List[tf.Tensor],
        vector_in: tf.Tensor,
        num_streams: int,
        h_size: int,
        num_layers: int,
        vis_encode_type: EncoderType = EncoderType.SIMPLE,
        stream_scopes: List[str] = None,
    ) -> List[tf.Tensor]:
        """
        Creates encoding streams for observations.
        :param visual_in: List of visual observation input tensors.
        :param vector_in: Vector observation input tensor.
        :param num_streams: Number of streams that need encoding.
        :param h_size: Size of hidden linear layers in the streams.
        :param num_layers: Number of hidden linear layers in the streams.
        :param vis_encode_type: Type of visual encoder to use.
        :param stream_scopes: List of strings (length == num_streams) that contains
        the scopes for each of the streams. None if all under the same TF scope.
        :return: List of encoded streams.
        """
        activation_fn = ModelUtils.swish
        vector_observation_input = vector_in

        final_hiddens = []
        for i in range(num_streams):
            # Pick the encoder function based on the requested EncoderType.
            create_encoder_func = ModelUtils.get_encoder_for_type(vis_encode_type)

            visual_encoders = []
            hidden_state, hidden_visual = None, None
            _scope_add = stream_scopes[i] if stream_scopes else ""
            if len(visual_in) > 0:
                for j, vis_in in enumerate(visual_in):
                    ModelUtils._check_resolution_for_encoder(vis_in, vis_encode_type)
                    encoded_visual = create_encoder_func(
                        vis_in,
                        h_size,
                        activation_fn,
                        num_layers,
                        "{}main_graph_{}_encoder{}".format(_scope_add, i, j),
                        False,
                    )
                    visual_encoders.append(encoded_visual)
                hidden_visual = tf.concat(visual_encoders, axis=1)
            if vector_in.get_shape()[-1] > 0:  # Don't encode 0-shape inputs
                hidden_state = ModelUtils.create_vector_observation_encoder(
                    vector_observation_input,
                    h_size,
                    activation_fn,
                    num_layers,
                    "{}main_graph_{}".format(_scope_add, i),
                    False,
                )
            if hidden_state is not None and hidden_visual is not None:
                final_hidden = tf.concat([hidden_visual, hidden_state], axis=1)
            elif hidden_state is None and hidden_visual is not None:
                final_hidden = hidden_visual
            elif hidden_state is not None and hidden_visual is None:
                final_hidden = hidden_state
            else:
                raise UnityTrainerException(
                    "No valid network configuration possible: "
                    "there are no vector or visual observations in this brain."
                )
            final_hiddens.append(final_hidden)
        return final_hiddens
    @staticmethod
    def create_recurrent_encoder(
        input_state, memory_in, sequence_length, name="lstm"
    ):
        """
        Builds a recurrent encoder (LSTM) for either state or observations.
        :param input_state: The input tensor to the LSTM cell.
        :param memory_in: The input memory to the LSTM cell.
        :param sequence_length: Length of sequence to unroll.
        :param name: The scope of the LSTM cell.
        """
        s_size = input_state.get_shape().as_list()[1]
        m_size = memory_in.get_shape().as_list()[1]
        lstm_input_state = tf.reshape(input_state, shape=[-1, sequence_length, s_size])
        memory_in = tf.reshape(memory_in[:, :], [-1, m_size])
        half_point = int(m_size / 2)
        with tf.variable_scope(name):
            rnn_cell = tf.nn.rnn_cell.BasicLSTMCell(half_point)
            lstm_vector_in = tf.nn.rnn_cell.LSTMStateTuple(
                memory_in[:, :half_point], memory_in[:, half_point:]
            )
            recurrent_output, lstm_state_out = tf.nn.dynamic_rnn(
                rnn_cell, lstm_input_state, initial_state=lstm_vector_in
            )
        recurrent_output = tf.reshape(recurrent_output, shape=[-1, half_point])
        return recurrent_output, tf.concat([lstm_state_out.c, lstm_state_out.h], axis=1)
    @staticmethod
    def create_value_heads(
        stream_names: List[str], hidden_input: tf.Tensor
    ) -> Tuple[Dict[str, tf.Tensor], tf.Tensor]:
        """
        Creates one value estimator head for each reward signal in stream_names.
        Also creates the node corresponding to the mean of all the value heads.
        :param stream_names: The list of reward signal names.
        :param hidden_input: The last layer of the Critic. The heads will consist of one dense hidden layer on top
        of the hidden input.
        :return: A Tuple of the dictionary of value heads by stream name, and the mean value node.
        """
        value_heads = {}
        for name in stream_names:
            value = tf.layers.dense(hidden_input, 1, name="{}_value".format(name))
            value_heads[name] = value
        value = tf.reduce_mean(list(value_heads.values()), 0)
        return value_heads, value
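    # Hypothetical usage sketch (the stream names and `hidden_layer` are
    # illustrative, not part of this module):
    #
    #   heads, mean_value = ModelUtils.create_value_heads(
    #       ["extrinsic", "curiosity"], hidden_layer
    #   )
    #   # heads["extrinsic"] and heads["curiosity"] are [batch, 1] estimates;
    #   # mean_value averages the heads into a single baseline value.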