import logging
from collections import defaultdict
from typing import List, Any, Dict

import numpy as np

from mlagents.envs.brain import AllBrainInfo, BrainInfo
from mlagents.envs.action_info import ActionInfoOutputs
from mlagents.trainers.buffer import Buffer
from mlagents.trainers.ppo.policy import PPOPolicy
from mlagents.trainers.ppo.multi_gpu_policy import MultiGpuPPOPolicy, get_devices
from mlagents.trainers.trainer import UnityTrainerException
from mlagents.trainers.rl_trainer import RLTrainer
from mlagents.trainers.components.reward_signals import RewardSignalResult

logger = logging.getLogger("mlagents.trainers")


class PPOTrainer(RLTrainer):
    """The PPOTrainer is an implementation of the PPO algorithm."""

    def __init__(
        self,
        brain,
        reward_buff_cap,
        trainer_parameters,
        training,
        load,
        seed,
        run_id,
        multi_gpu,
    ):
        """
        Responsible for collecting experiences and training the PPO model.
        :param seed: The seed the model will be initialized with
        :param run_id: The identifier of the current run
        """
        super().__init__(brain, trainer_parameters, training, run_id, reward_buff_cap)
        self.param_keys = [
            "batch_size",
            "beta",
            "max_steps",
            "reward_signals",
        ]
        self.check_param_keys()

        # Make sure we have at least one reward_signal
        if not self.trainer_parameters["reward_signals"]:
            raise UnityTrainerException(
                "No reward signals were defined. At least one must be used with {}.".format(
                    self.__class__.__name__
                )
            )

        self.step = 0

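        # Choose the policy implementation based on the available devices: when
        # multi_gpu is requested and more than one device is visible, use the
        # data-parallel MultiGpuPPOPolicy; otherwise a single-device PPOPolicy.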
        if multi_gpu and len(get_devices()) > 1:
            self.policy = MultiGpuPPOPolicy(
                seed, brain, trainer_parameters, self.is_training, load
            )
        else:
            self.policy = PPOPolicy(
                seed, brain, trainer_parameters, self.is_training, load
            )

        stats = defaultdict(list)
        # collected_rewards is a dictionary from name of reward signal to a dictionary of agent_id to cumulative reward
        # used for reporting only. We always want to report the environment reward to Tensorboard, regardless
        # of what reward signals are actually present.
        self.collected_rewards = {"environment": {}}
        for _reward_signal in self.policy.reward_signals.keys():
            self.collected_rewards[_reward_signal] = {}
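        # Illustrative shape (hypothetical agent ids and signal names):
        # {"environment": {"agent_0": 1.5}, "curiosity": {"agent_0": 0.2}}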

        self.stats = stats

        self.training_buffer = Buffer()
        self.episode_steps = {}

    def __str__(self):
        return """Hyperparameters for the {0} of brain {1}: \n{2}""".format(
            self.__class__.__name__,
            self.brain_name,
            self.dict_to_str(self.trainer_parameters, 0),
        )

    @property
    def parameters(self):
        """
        Returns the trainer parameters of the trainer.
        """
        return self.trainer_parameters

    @property
    def get_max_steps(self):
        """
        Returns the maximum number of steps. Is used to know when the trainer should be stopped.
        :return: The maximum number of steps of the trainer
        """
        return float(self.trainer_parameters["max_steps"])

    @property
    def get_step(self):
        """
        Returns the number of steps the trainer has performed
        :return: the step count of the trainer
        """
        return self.step
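
    # The policy owns the canonical step counter; increment_step below mirrors
    # the policy's updated count back into the trainer.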
    def increment_step(self, n_steps: int) -> None:
        """
        Increment the step count of the trainer
        :param n_steps: number of steps to increment the step count by
        """
        self.step = self.policy.increment_step(n_steps)

    def construct_curr_info(self, next_info: BrainInfo) -> BrainInfo:
        """
        Constructs a BrainInfo which contains the most recent previous experiences for all agents
        which correspond to the agents in a provided next_info.
        :BrainInfo next_info: A t+1 BrainInfo.
        :return: curr_info: Reconstructed BrainInfo to match agents of next_info.
        """
        # One list per visual observation stream; a single nested list would
        # break for agents with more than one camera.
        visual_observations: List[List[Any]] = [
            [] for _ in range(len(next_info.visual_observations))
        ]  # TODO add types to brain.py methods
        vector_observations = []
        text_observations = []
        memories = []
        rewards = []
        local_dones = []
        max_reacheds = []
        agents = []
        prev_vector_actions = []
        prev_text_actions = []
        action_masks = []
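        # For each agent in next_info, look up the most recent BrainInfo stored
        # for that agent and copy its entries, so the reconstructed BrainInfo
        # lines up index-for-index with next_info.agents.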
        for agent_id in next_info.agents:
            agent_brain_info = self.training_buffer[agent_id].last_brain_info
            if agent_brain_info is None:
                agent_brain_info = next_info
            agent_index = agent_brain_info.agents.index(agent_id)
            for i in range(len(next_info.visual_observations)):
                visual_observations[i].append(
                    agent_brain_info.visual_observations[i][agent_index]
                )
            vector_observations.append(
                agent_brain_info.vector_observations[agent_index]
            )
            text_observations.append(agent_brain_info.text_observations[agent_index])
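            # Recurrent policies also need the stored memory vector; agents
            # without one yet (e.g. on their first step) get an empty memory so
            # np.vstack below sees consistent shapes.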
            if self.policy.use_recurrent:
                if len(agent_brain_info.memories) > 0:
                    memories.append(agent_brain_info.memories[agent_index])
                else:
                    memories.append(self.policy.make_empty_memory(1))
            rewards.append(agent_brain_info.rewards[agent_index])
            local_dones.append(agent_brain_info.local_done[agent_index])
            max_reacheds.append(agent_brain_info.max_reached[agent_index])
            agents.append(agent_brain_info.agents[agent_index])
            prev_vector_actions.append(
                agent_brain_info.previous_vector_actions[agent_index]
            )
            prev_text_actions.append(
                agent_brain_info.previous_text_actions[agent_index]
            )
            action_masks.append(agent_brain_info.action_masks[agent_index])
        if self.policy.use_recurrent:
            memories = np.vstack(memories)
        curr_info = BrainInfo(
            visual_observations,
            vector_observations,
            text_observations,
            memories,
            rewards,
            agents,
            local_dones,
            prev_vector_actions,
            prev_text_actions,
            max_reacheds,
            action_masks,
        )
        return curr_info

    def add_experiences(
        self,
        curr_all_info: AllBrainInfo,
        next_all_info: AllBrainInfo,
        take_action_outputs: ActionInfoOutputs,
    ) -> None:
        """
        Adds experiences to each agent's experience history.
        :param curr_all_info: Dictionary of all current brains and corresponding BrainInfo.
        :param next_all_info: Dictionary of all next brains and corresponding BrainInfo.
        :param take_action_outputs: The outputs of the Policy's get_action method.
        """
        self.trainer_metrics.start_experience_collection_timer()
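        # Record policy diagnostics (entropy, learning rate, and each reward
        # signal's value estimate) for TensorBoard reporting.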
        if take_action_outputs:
            self.stats["Policy/Entropy"].append(take_action_outputs["entropy"].mean())
            self.stats["Policy/Learning Rate"].append(
                take_action_outputs["learning_rate"]
            )
            for name, signal in self.policy.reward_signals.items():
                self.stats[signal.value_name].append(
                    np.mean(take_action_outputs["value"][name])
                )

        curr_info = curr_all_info[self.brain_name]
        next_info = next_all_info[self.brain_name]

        for agent_id in curr_info.agents:
            self.training_buffer[agent_id].last_brain_info = curr_info
            self.training_buffer[
                agent_id
            ].last_take_action_outputs = take_action_outputs
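
        # Agents may be removed or respawned between steps, so curr_info's
        # agent list may not line up with next_info's; rebuild a matching
        # view when they differ.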
        if curr_info.agents != next_info.agents:
            curr_to_use = self.construct_curr_info(next_info)
        else:
            curr_to_use = curr_info
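
        # Evaluate every configured reward signal on this transition; each
        # RewardSignalResult carries the scaled reward actually used for
        # training alongside the unscaled reward.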
        tmp_rewards_dict = {}
        for name, signal in self.policy.reward_signals.items():
            tmp_rewards_dict[name] = signal.evaluate(curr_to_use, next_info)

        for agent_id in next_info.agents:
            stored_info = self.training_buffer[agent_id].last_brain_info
            stored_take_action_outputs = self.training_buffer[
                agent_id
            ].last_take_action_outputs
            if stored_info is not None:
                idx = stored_info.agents.index(agent_id)
                next_idx = next_info.agents.index(agent_id)
                if not stored_info.local_done[idx]:
                    for i, _ in enumerate(stored_info.visual_observations):
                        self.training_buffer[agent_id]["visual_obs%d" % i].append(
                            stored_info.visual_observations[i][idx]
                        )
                        self.training_buffer[agent_id]["next_visual_obs%d" % i].append(
                            next_info.visual_observations[i][next_idx]
                        )
                    if self.policy.use_vec_obs:
                        self.training_buffer[agent_id]["vector_obs"].append(
                            stored_info.vector_observations[idx]
                        )
                        self.training_buffer[agent_id]["next_vector_in"].append(
                            next_info.vector_observations[next_idx]
                        )
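                    # Both the current and the next observation are stored;
                    # reward signals such as curiosity evaluate full
                    # (state, next_state) transitions.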
                    if self.policy.use_recurrent:
                        if stored_info.memories.shape[1] == 0:
                            stored_info.memories = np.zeros(
                                (len(stored_info.agents), self.policy.m_size)
                            )
                        self.training_buffer[agent_id]["memory"].append(
                            stored_info.memories[idx]
                        )
                    actions = stored_take_action_outputs["action"]
                    if self.policy.use_continuous_act:
                        actions_pre = stored_take_action_outputs["pre_action"]
                        self.training_buffer[agent_id]["actions_pre"].append(
                            actions_pre[idx]
                        )
                        epsilons = stored_take_action_outputs["random_normal_epsilon"]
                        self.training_buffer[agent_id]["random_normal_epsilon"].append(
                            epsilons[idx]
                        )
                    else:
                        # Pad with 1 (action available) so that padded sequence
                        # entries do not mask out any actions.
                        self.training_buffer[agent_id]["action_mask"].append(
                            stored_info.action_masks[idx], padding_value=1
                        )
                    a_dist = stored_take_action_outputs["log_probs"]
                    # value is a dictionary from name of reward to value estimate of the value head
                    value = stored_take_action_outputs["value"]
                    self.training_buffer[agent_id]["actions"].append(actions[idx])
                    self.training_buffer[agent_id]["prev_action"].append(
                        stored_info.previous_vector_actions[idx]
                    )
                    # 1.0 marks a real (non-padding) step for sequence losses.
                    self.training_buffer[agent_id]["masks"].append(1.0)
                    self.training_buffer[agent_id]["done"].append(
                        next_info.local_done[next_idx]
                    )
                    for name, reward_result in tmp_rewards_dict.items():
                        # 0 because we use the scaled reward to train the agent
                        self.training_buffer[agent_id][
                            "{}_rewards".format(name)
                        ].append(reward_result.scaled_reward[next_idx])
                        self.training_buffer[agent_id][
                            "{}_value_estimates".format(name)
                        ].append(value[name][idx][0])
                    self.training_buffer[agent_id]["action_probs"].append(a_dist[idx])
                for name, rewards in self.collected_rewards.items():
                    if agent_id not in rewards:
                        rewards[agent_id] = 0
                    if name == "environment":
                        # Report the reward from the environment
                        rewards[agent_id] += np.array(next_info.rewards)[next_idx]
                    else:
                        # Report the reward signals
                        rewards[agent_id] += tmp_rewards_dict[name].scaled_reward[
                            next_idx
                        ]
                if not next_info.local_done[next_idx]:
                    if agent_id not in self.episode_steps:
                        self.episode_steps[agent_id] = 0
                    self.episode_steps[agent_id] += 1
        self.trainer_metrics.end_experience_collection_timer()

    def process_experiences(
        self, current_info: AllBrainInfo, new_info: AllBrainInfo
    ) -> None:
        """
        Checks agent histories for processing condition: at episode end,
        reports each reward signal's cumulative reward and resets the totals.
        :param current_info: Dictionary of all current brains and corresponding BrainInfo.
        :param new_info: Dictionary of all next brains and corresponding BrainInfo.
        """
        info = new_info[self.brain_name]
        for agent_idx, agent_id in enumerate(info.agents):
            if not info.local_done[agent_idx]:
                continue
            for name, rewards in self.collected_rewards.items():
                if name == "environment":
                    # We always report the environment reward to Tensorboard.
                    self.stats["Environment/Cumulative Reward"].append(
                        rewards.get(agent_id, 0)
                    )
                    self.reward_buffer.appendleft(rewards.get(agent_id, 0))
                else:
                    self.stats[
                        self.policy.reward_signals[name].stat_name
                    ].append(rewards.get(agent_id, 0))
                rewards[agent_id] = 0

    def add_policy_outputs(
        self, take_action_outputs: ActionInfoOutputs, agent_id: str, agent_idx: int
    ) -> None:
        """
        Takes the output of the last action and stores it into the training buffer.
        """
        actions = take_action_outputs["action"]
        if self.policy.use_continuous_act:
            actions_pre = take_action_outputs["pre_action"]
            self.training_buffer[agent_id]["actions_pre"].append(
                actions_pre[agent_idx]
            )
            epsilons = take_action_outputs["random_normal_epsilon"]
            self.training_buffer[agent_id]["random_normal_epsilon"].append(
                epsilons[agent_idx]
            )
        a_dist = take_action_outputs["log_probs"]
        self.training_buffer[agent_id]["actions"].append(actions[agent_idx])
        self.training_buffer[agent_id]["action_probs"].append(a_dist[agent_idx])

    def add_rewards_outputs(
        self,
        value: Dict[str, Any],
        rewards_dict: Dict[str, RewardSignalResult],
        agent_id: str,
        agent_idx: int,
        agent_next_idx: int,
    ) -> None:
        """
        Takes the value output of the last action and stores it into the training buffer.
        """
        for name, reward_result in rewards_dict.items():
            # 0 because we use the scaled reward to train the agent
            self.training_buffer[agent_id]["{}_rewards".format(name)].append(
                reward_result.scaled_reward[agent_next_idx]
            )
            self.training_buffer[agent_id]["{}_value_estimates".format(name)].append(
                value[name][agent_idx][0]
            )

    def end_episode(self):
        """
        A signal that the episode has ended. The buffer must be reset.
        Gets called only when the academy resets.
        """
        self.training_buffer.reset_local_buffers()
        for agent_id in self.episode_steps:
            self.episode_steps[agent_id] = 0
        for rewards in self.collected_rewards.values():
            for agent_id in rewards:
                rewards[agent_id] = 0