# # Unity ML-Agents Toolkit
# ## ML-Agent Learning (PPO)
# Contains an implementation of PPO as described in: https://arxiv.org/abs/1707.06347
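# For reference, the paper's clipped surrogate objective is
#     L_CLIP(theta) = E_t[ min(r_t(theta) * A_t, clip(r_t(theta), 1 - epsilon, 1 + epsilon) * A_t) ]
# where r_t(theta) is the probability ratio between the new and old policies, A_t is the
# advantage estimate (computed here via GAE), and epsilon is the "epsilon" hyperparameter below.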
import logging
from collections import deque, defaultdict
from typing import Any, List
import numpy as np
from mlagents.trainers.buffer import Buffer
from mlagents.trainers.ppo.policy import PPOPolicy
from mlagents.trainers.trainer import Trainer, UnityTrainerException
from mlagents.trainers.action_info import ActionInfoOutputs
logger = logging.getLogger("mlagents.trainers")
"beta",
"buffer_size",
"epsilon",
"gamma",
"hidden_units",
"lambd",
"learning_rate",
"use_recurrent",
"summary_path",
"memory_size",
"use_curiosity",
"curiosity_strength",
"curiosity_enc_size",
"reward_signals",
self.check_param_keys()
self.use_curiosity = bool(trainer_parameters["use_curiosity"])
# Make sure we have at least one reward_signal
if not self.trainer_parameters["reward_signals"]:
    raise UnityTrainerException(
        "No reward signals were defined. At least one must be used with {}.".format(
            self.__class__.__name__
        )
    )
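# Illustrative sketch only (an assumption, not taken from this file): "reward_signals"
# is expected to be a non-empty mapping from reward signal name to its settings, e.g.
#     {"extrinsic": {"strength": 1.0, "gamma": 0.99},
#      "curiosity": {"strength": 0.01, "gamma": 0.99}}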
stats = defaultdict(list)
if self.use_curiosity:
    self.intrinsic_rewards = {}
# collected_rewards is a dictionary from name of reward signal to a dictionary of agent_id to cumulative reward
# used for reporting only. We always want to report the environment reward to Tensorboard, regardless
# of what reward signals are actually present.
self.collected_rewards = {"environment": {}}
for _reward_signal in self.policy.reward_signals.keys():
    self.collected_rewards[_reward_signal] = {}
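# Illustrative sketch (assumption): after a few steps collected_rewards might look like
#     {"environment": {agent_id: 1.5, ...}, "extrinsic": {agent_id: 1.5, ...}, "curiosity": {agent_id: 0.2, ...}}
# i.e. one running per-agent total for every reward stream that gets reported.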
self.cumulative_rewards = {}
return """Hyperparameters for the {0} of brain {1}: \n{2}""".format(
    self.__class__.__name__,
    self.brain_name,
    self.dict_to_str(self.trainer_parameters, 0),
)
@property
"""
Increment the step count of the trainer and update the last reward.
"""
if self.stats["Environment/Cumulative Reward"]:
    mean_reward = np.mean(self.stats["Environment/Cumulative Reward"])
    self.policy.update_reward(mean_reward)
self.policy.increment_step()
"""
Constructs a BrainInfo which contains the most recent previous experiences for all agents
which correspond to the agents in a provided next_info.
:BrainInfo next_info: A t+1 BrainInfo.
:return: curr_info: Reconstructed BrainInfo to match agents of next_info.
"""
self.trainer_metrics.start_experience_collection_timer()
if take_action_outputs:
    for name, signal in self.policy.reward_signals.items():
        self.stats[signal.value_name].append(
            np.mean(take_action_outputs["value"][name])
        )
curr_info = curr_all_info[self.brain_name]
next_info = next_all_info[self.brain_name]
else:
    curr_to_use = curr_info
intrinsic_rewards = self.policy.get_intrinsic_rewards(curr_to_use, next_info)
tmp_rewards_dict = {}
for name, signal in self.policy.reward_signals.items():
    tmp_rewards_dict[name] = signal.evaluate(curr_to_use, next_info)
for agent_id in next_info.agents:
    stored_info = self.training_buffer[agent_id].last_brain_info
stored_info.action_masks[idx], padding_value=1
)
a_dist = stored_take_action_outputs["log_probs"]
# value is a dictionary from reward signal name to the estimate of that signal's value head
value = stored_take_action_outputs["value"]
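# Illustrative sketch (assumption): with two reward signals this is a dict such as
#     {"extrinsic": [[0.7], [0.3], ...], "curiosity": [[0.1], [0.0], ...]}
# holding one estimate per agent for each reward signal's value head.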
self.training_buffer[agent_id]["actions"].append(actions[idx])
self.training_buffer[agent_id]["prev_action"].append(
if self.use_curiosity:
    self.training_buffer[agent_id]["rewards"].append(
        next_info.rewards[next_idx] + intrinsic_rewards[next_idx]
    )
else:
    self.training_buffer[agent_id]["rewards"].append(
        next_info.rewards[next_idx]
    )
self.training_buffer[agent_id]["action_probs"].append(a_dist[idx])
self.training_buffer[agent_id]["done"].append(
    next_info.local_done[next_idx]
)
if agent_id not in self.cumulative_rewards:
    self.cumulative_rewards[agent_id] = 0
self.cumulative_rewards[agent_id] += next_info.rewards[next_idx]
if self.use_curiosity:
    if agent_id not in self.intrinsic_rewards:
        self.intrinsic_rewards[agent_id] = 0
    self.intrinsic_rewards[agent_id] += intrinsic_rewards[next_idx]
for name, reward_result in tmp_rewards_dict.items():
    # 0 because we use the scaled reward to train the agent
    self.training_buffer[agent_id]["{}_rewards".format(name)].append(
        reward_result.scaled_reward[next_idx]
    )
    self.training_buffer[agent_id]["{}_value_estimates".format(name)].append(
        value[name][idx][0]
    )
for name, rewards in self.collected_rewards.items():
    if agent_id not in rewards:
        rewards[agent_id] = 0
    if name == "environment":
        # Report the reward from the environment
        rewards[agent_id] += np.array(next_info.rewards)[next_idx]
    else:
        # Report the reward signals
        rewards[agent_id] += tmp_rewards_dict[name].scaled_reward[next_idx]
if not next_info.local_done[next_idx]:
    if agent_id not in self.episode_steps:
        self.episode_steps[agent_id] = 0
:param current_info: Dictionary of all current brains and corresponding BrainInfo.
:param new_info: Dictionary of all next brains and corresponding BrainInfo.
"""
self.trainer_metrics.start_experience_collection_timer()
info = new_info[self.brain_name]
for l in range(len(info.agents)):
    agent_actions = self.training_buffer[info.agents[l]]["actions"]
    ) and len(agent_actions) > 0:
agent_id = info.agents[l]
if info.max_reached[l]:
    bootstrapping_info = self.training_buffer[agent_id].last_brain_info
    idx = bootstrapping_info.agents.index(agent_id)
else:
    bootstrapping_info = info
    idx = l
value_next = self.policy.get_value_estimates(bootstrapping_info, idx)
if info.local_done[l] and not info.max_reached[l]:
    value_next["extrinsic"] = 0.0
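# When an episode ends because the agent hit its max step count, the return is
# bootstrapped from the value estimate of the last stored observation; when it ends
# naturally, the extrinsic bootstrap value is zeroed instead.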
tmp_advantages = []
tmp_returns = []
for name in self.policy.reward_signals:
    bootstrap_value = value_next[name]
    local_rewards = self.training_buffer[agent_id][
        "{}_rewards".format(name)
    ].get_batch()
    local_value_estimates = self.training_buffer[agent_id][
        "{}_value_estimates".format(name)
    ].get_batch()
    local_advantage = get_gae(
        rewards=local_rewards,
        value_estimates=local_value_estimates,
        value_next=bootstrap_value,
        gamma=self.policy.reward_signals[name].gamma,
    )
    local_return = local_advantage + local_value_estimates
    # This is later used as the target for the different value estimates
    self.training_buffer[agent_id]["{}_returns".format(name)].set(
        local_return
    )
    self.training_buffer[agent_id]["{}_advantage".format(name)].set(
        local_advantage
    )
    tmp_advantages.append(local_advantage)
    tmp_returns.append(local_return)
global_advantages = list(np.mean(np.array(tmp_advantages), axis=0))
global_returns = list(np.mean(np.array(tmp_returns), axis=0))
self.training_buffer[agent_id]["advantages"].set(global_advantages)
self.training_buffer[agent_id]["discounted_returns"].set(global_returns)
self.training_buffer.append_update_buffer(
    agent_id,
)
self.training_buffer[agent_id].reset_agent()
if info.local_done[l]:
    self.cumulative_returns_since_policy_update.append(
        self.cumulative_rewards.get(agent_id, 0)
    )
    self.stats["Environment/Cumulative Reward"].append(
        self.cumulative_rewards.get(agent_id, 0)
    )
    self.reward_buffer.appendleft(
        self.cumulative_rewards.get(agent_id, 0)
    )
    self.cumulative_rewards[agent_id] = 0
    if self.use_curiosity:
        self.stats["Policy/Curiosity Reward"].append(
            self.intrinsic_rewards.get(agent_id, 0)
        )
        self.intrinsic_rewards[agent_id] = 0
    for name, rewards in self.collected_rewards.items():
        if name == "environment":
            self.cumulative_returns_since_policy_update.append(
                rewards.get(agent_id, 0)
            )
            self.stats["Environment/Cumulative Reward"].append(
                rewards.get(agent_id, 0)
            )
            self.reward_buffer.appendleft(rewards.get(agent_id, 0))
            rewards[agent_id] = 0
        else:
            self.stats[
                self.policy.reward_signals[name].stat_name
            ].append(rewards.get(agent_id, 0))
            rewards[agent_id] = 0
self.trainer_metrics.end_experience_collection_timer()
def end_episode(self):
"""
self.training_buffer.reset_local_buffers()
for agent_id in self.cumulative_rewards:
    self.cumulative_rewards[agent_id] = 0
if self.use_curiosity:
    for agent_id in self.intrinsic_rewards:
        self.intrinsic_rewards[agent_id] = 0
for rewards in self.collected_rewards.values():
    for agent_id in rewards:
        rewards[agent_id] = 0
def is_ready_update(self):
"""
def update_policy(self):
    """
    Uses the experiences in the update buffer to update the policy.
    The reward signal generators must be updated in this method at their own pace.
    """
self.trainer_metrics.start_policy_update_timer(
    number_experiences=len(self.training_buffer.update_buffer["actions"]),
)
value_total.append(run_out["value_loss"])
policy_total.append(np.abs(run_out["policy_loss"]))
if self.use_curiosity:
    inverse_total.append(run_out["inverse_loss"])
    forward_total.append(run_out["forward_loss"])
if self.use_curiosity:
    self.stats["Losses/Forward Loss"].append(np.mean(forward_total))
    self.stats["Losses/Inverse Loss"].append(np.mean(inverse_total))
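# n_sequences (used below) is assumed to denote the number of training sequences per
# update batch, on the order of batch_size / sequence_length for a recurrent policy;
# its definition is elided above.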
for _, reward_signal in self.policy.reward_signals.items():
    update_stats = reward_signal.update(
        self.training_buffer.update_buffer, n_sequences
    )
    for stat, val in update_stats.items():
        self.stats[stat].append(val)
self.training_buffer.reset_update_buffer()
self.trainer_metrics.end_policy_update()
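

# The helpers below are a minimal sketch of the GAE computation that get_gae() above is
# expected to perform (standard generalized advantage estimation); the signatures and the
# default gamma/lambd values are assumptions and may not match the library's own helpers.
def discount_rewards(r, gamma=0.99, value_next=0.0):
    """Discounted sum of future rewards, bootstrapping from value_next at the end of the sequence."""
    discounted_r = np.zeros_like(r)
    running_add = value_next
    for t in reversed(range(0, r.size)):
        running_add = running_add * gamma + r[t]
        discounted_r[t] = running_add
    return discounted_r


def get_gae(rewards, value_estimates, value_next=0.0, gamma=0.99, lambd=0.95):
    """Generalized advantage estimate: discount the TD residuals with factor gamma * lambd."""
    rewards = np.asarray(rewards, dtype=np.float64)
    value_estimates = np.asarray(list(value_estimates) + [value_next], dtype=np.float64)
    delta_t = rewards + gamma * value_estimates[1:] - value_estimates[:-1]
    return discount_rewards(r=delta_t, gamma=gamma * lambd)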