# # Unity ML-Agents Toolkit
import logging
from typing import Dict, List, Deque, Any
import time
import abc

from collections import deque

from mlagents.tf_utils import tf
from mlagents_envs.exception import UnityException
from mlagents.trainers.tf_policy import TFPolicy
from mlagents.trainers.stats import StatsReporter
from mlagents.trainers.trajectory import Trajectory
from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.brain import BrainParameters
from mlagents.trainers.policy import Policy
from mlagents_envs.timers import hierarchical_timer, set_gauge
LOGGER = logging.getLogger("mlagents.trainers")


class UnityTrainerException(UnityException):
    """
    Related to errors with the Trainer.
    """

    pass


class Trainer(abc.ABC):
    """This class is the base class for the mlagents_envs.trainers"""
    def __init__(
        self,
        brain_name: str,
        trainer_parameters: dict,
        training: bool,
        run_id: str,
        reward_buff_cap: int = 1,
    ):
        """
        Responsible for collecting experiences and training a neural network model.
        :param brain_name: Name of the brain to be trained.
        :param trainer_parameters: The parameters for the trainer (dictionary).
        :param training: Whether the trainer is set for training.
        :param run_id: The identifier of the current run.
        :param reward_buff_cap: Maximum size of the reward buffer.
        """
        self.param_keys: List[str] = []
        self.brain_name = brain_name
        self.run_id = run_id
        self.trainer_parameters = trainer_parameters
        self.summary_path = trainer_parameters["summary_path"]
        self.stats_reporter = StatsReporter(self.summary_path)
        self.cumulative_returns_since_policy_update: List[float] = []
        self.is_training = training
        self._reward_buffer: Deque[float] = deque(maxlen=reward_buff_cap)
        self.policy_queues: List[AgentManagerQueue[Policy]] = []
        self.trajectory_queues: List[AgentManagerQueue[Trajectory]] = []
        self.step: int = 0
        self.training_start_time = time.time()
        self.summary_freq = self.trainer_parameters["summary_freq"]
        self.next_update_step = self.summary_freq

    def _check_param_keys(self):
        for k in self.param_keys:
            if k not in self.trainer_parameters:
                raise UnityTrainerException(
                    "The hyper-parameter {0} could not be found for the {1} trainer of "
                    "brain {2}.".format(k, self.__class__, self.brain_name)
                )

    def write_tensorboard_text(self, key: str, input_dict: Dict[str, Any]) -> None:
        """
        Saves text to Tensorboard.
        :param key: The name of the text.
        :param input_dict: A dictionary that will be displayed in a table on Tensorboard.
        """
        try:
            with tf.Session() as sess:
                s_op = tf.summary.text(
                    key,
                    tf.convert_to_tensor(
                        [[str(x), str(input_dict[x])] for x in input_dict]
                    ),
                )
                s = sess.run(s_op)
                self.stats_reporter.write_text(s, self.get_step)
        except Exception:
            LOGGER.info("Could not write text summary for Tensorboard.")
            pass

    def _dict_to_str(self, param_dict: Dict[str, Any], num_tabs: int) -> str:
        """
        Takes a parameter dictionary and converts it to a human-readable string.
        Recurses if there are multiple levels of dict. Used to print out hyperparameters.
        :param param_dict: A dictionary of key, value parameters.
        :return: A string version of this dictionary.
        """
        if not isinstance(param_dict, dict):
            return str(param_dict)
        else:
            append_newline = "\n" if num_tabs > 0 else ""
            return append_newline + "\n".join(
                [
                    "\t"
                    + "  " * num_tabs
                    + "{0}:\t{1}".format(
                        x, self._dict_to_str(param_dict[x], num_tabs + 1)
                    )
                    for x in param_dict
                ]
            )

return """ Hyperparameters for the {0} of brain {1}: \n {2} """ . format (
self . __class__ . __name__ ,
self . brain_name ,
self . dict_to_str ( self . trainer_parameters , 0 ) ,
self . _ dict_to_str( self . trainer_parameters , 0 ) ,
)
    @property
    def parameters(self) -> Dict[str, Any]:
        """
        Returns the trainer parameters of the trainer.
        """
        return self.trainer_parameters

    @property
    def get_max_steps(self) -> int:
        """
        Returns the maximum number of steps. Is used to know when the trainer should be stopped.
        :return: The maximum number of steps of the trainer.
        """
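        # max_steps may be given in the trainer config as a float or a
        # scientific-notation string (e.g. "5.0e5" - an illustrative value, not
        # taken from this file), so it is parsed with float() before truncating
        # to an int.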
        return int(float(self.trainer_parameters["max_steps"]))

    @property
    def get_step(self) -> int:
        """
        Returns the number of steps the trainer has performed.
        :return: The step count of the trainer.
        """
        return self.step

    @property
    def should_still_train(self) -> bool:
        """
        Returns whether or not the trainer should train. A Trainer could
        stop training if it wasn't training to begin with, or if max_steps
        is reached.
        """
        return self.is_training and self.get_step <= self.get_max_steps

    @property
    def reward_buffer(self) -> Deque[float]:
        """
        Returns the reward buffer. The reward buffer contains the cumulative
        rewards of the most recent episodes completed by agents using this
        trainer.
        :return: the reward buffer.
        """
        return self._reward_buffer

    def _increment_step(self, n_steps: int, name_behavior_id: str) -> None:
        """
        Increment the step count of the trainer.
        :param n_steps: Number of steps to increment the step count by.
        """
        self.step += n_steps
        self.next_update_step = self.step + (
            self.summary_freq - self.step % self.summary_freq
        )
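        # Illustrative arithmetic (example numbers, not from the source): with
        # summary_freq = 1000 and a step count of 2517, next_update_step becomes
        # 2517 + (1000 - 2517 % 1000) = 3000, i.e. the next multiple of summary_freq.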
        p = self.get_policy(name_behavior_id)
        if p:
            p.increment_step(n_steps)

    def save_model(self, name_behavior_id: str) -> None:
        """
        Saves the model.
        """
        self.get_policy(name_behavior_id).save_model(self.get_step)

    def export_model(self, name_behavior_id: str) -> None:
        """
        Exports the model.
        """
        self.get_policy(name_behavior_id).export_model()

    def _write_summary(self, step: int) -> None:
        """
        Saves training statistics to Tensorboard.
        :param step: The number of steps the simulation has been going for.
        """
        is_training = "Training." if self.should_still_train else "Not Training."
        stats_summary = self.stats_reporter.get_stats_summaries(
            "Environment/Cumulative Reward"
        )
        if stats_summary.num > 0:
            LOGGER.info(
                "{}: {}: Step: {}. "
                "Time Elapsed: {:0.3f} s "
                "Mean "
                "Reward: {:0.3f}"
                ". Std of Reward: {:0.3f}. {}".format(
                    self.run_id,
                    self.brain_name,
                    step,
                    time.time() - self.training_start_time,
                    stats_summary.mean,
                    stats_summary.std,
                    is_training,
                )
            )
            set_gauge(f"{self.brain_name}.mean_reward", stats_summary.mean)
        else:
            LOGGER.info(
                "{}: {}: Step: {}. No episode was completed since last summary. {}".format(
                    self.run_id, self.brain_name, step, is_training
                )
            )
        self.stats_reporter.write_stats(int(step))

    @abc.abstractmethod
    def _process_trajectory(self, trajectory: Trajectory) -> None:
        """
        Takes a trajectory and processes it, putting it into the update buffer.
        Processing involves calculating value and advantage targets for model updating step.
        :param trajectory: The Trajectory tuple containing the steps to be processed.
        """
        self._maybe_write_summary(self.get_step + len(trajectory.steps))
        self._increment_step(len(trajectory.steps), trajectory.behavior_id)

    def _maybe_write_summary(self, step_after_process: int) -> None:
        """
        If processing the trajectory will make the step exceed the next summary write,
        write the summary. This logic ensures summaries are written on the update step and not in between.
        :param step_after_process: the step count after processing the next trajectory.
        """
        if step_after_process >= self.next_update_step and self.get_step != 0:
            self._write_summary(self.next_update_step)

    @abc.abstractmethod
    def end_episode(self) -> None:
        """
        A signal that the Episode has ended. The buffer must be reset.
        Gets only called when the academy resets.
        """
        pass

    @abc.abstractmethod
    def create_policy(self, brain_parameters: BrainParameters) -> TFPolicy:
        """
        Creates policy
        """
        pass

    @abc.abstractmethod
    def add_policy(self, name_behavior_id: str, policy: TFPolicy) -> None:
        """
        Adds policy to trainer
        """
        pass

    @abc.abstractmethod
    def get_policy(self, name_behavior_id: str) -> TFPolicy:
        """
        Gets policy from trainer
        """
        pass

    @abc.abstractmethod
    def _is_ready_update(self):
        """
        Returns whether or not the trainer has enough elements to run an update.
        :return: A boolean corresponding to whether or not _update_policy() can be run.
        """
        return False

    @abc.abstractmethod
    def _update_policy(self):
        """
        Updates the policy. Must be implemented by the concrete trainer.
        """
        pass

    def advance(self) -> None:
        """
        Steps the trainer, taking in trajectories and updates if ready.
        """
        with hierarchical_timer("process_trajectory"):
            for traj_queue in self.trajectory_queues:
                try:
                    t = traj_queue.get_nowait()
                    self._process_trajectory(t)
                except AgentManagerQueue.Empty:
                    pass
        if self.should_still_train:
            if self._is_ready_update():
                with hierarchical_timer("_update_policy"):
                    self._update_policy()
                    for q in self.policy_queues:
                        # Get policies that correspond to the policy queue in question
                        q.put(self.get_policy(q.behavior_id))

    def publish_policy_queue(self, policy_queue: AgentManagerQueue[Policy]) -> None:
        """
        Adds a policy queue to the list of queues to publish to when this Trainer
        makes a policy update.
        :param policy_queue: Policy queue to publish to.
        """
        self.policy_queues.append(policy_queue)

    def subscribe_trajectory_queue(
        self, trajectory_queue: AgentManagerQueue[Trajectory]
    ) -> None:
        """
        Adds a trajectory queue to the list of queues for the trainer to ingest Trajectories from.
        :param trajectory_queue: Trajectory queue to read from.
        """
        self.trajectory_queues.append(trajectory_queue)
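
# A minimal sketch of how a concrete trainer plugs into this base class. The class
# name, the _experience_buffer list, and the "batch_size" lookup below are
# illustrative assumptions for this sketch, not APIs defined in this file; the real
# trainers implement the same abstract methods with their own buffers and update
# logic.
#
# class ExampleTrainer(Trainer):
#     def _process_trajectory(self, trajectory: Trajectory) -> None:
#         super()._process_trajectory(trajectory)  # base class updates step/summary bookkeeping
#         self._experience_buffer.extend(trajectory.steps)  # hypothetical list of experiences
#
#     def _is_ready_update(self) -> bool:
#         return len(self._experience_buffer) >= self.trainer_parameters["batch_size"]
#
#     def _update_policy(self) -> None:
#         ...  # run gradient updates on the policy, then clear the buffer
#
#     # create_policy, add_policy, get_policy, and end_episode must also be
#     # implemented; they are omitted from this sketch.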