from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np

from gym import error, spaces
from mlagents_envs.environment import UnityEnvironment
from mlagents_envs.base_env import BatchedStepResult
from mlagents_envs import logging_util

logger = logging_util.get_logger(__name__)
logging_util.set_log_level(logging_util.INFO)


class UnityGymException(error.Error):
    """Any error related to the gym wrapper of ml-agents."""


GymSingleStepResult = Tuple[np.ndarray, float, bool, Dict]
GymMultiStepResult = Tuple[List[np.ndarray], List[float], List[bool], Dict]
GymStepResult = Union[GymSingleStepResult, GymMultiStepResult]
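# Illustration of the two return shapes encoded above: single-agent mode yields
# (obs, reward, done, info), while multi-agent mode yields one entry per agent:
# ([obs_0, obs_1, ...], [reward_0, reward_1, ...], [done_0, done_1, ...], info).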
    Multi-agent environments use lists for object types, as done here:
    https://github.com/openai/multiagent-particle-envs
    """
    def __init__(
        self,
        use_visual: bool = False,
        uint8_visual: bool = False,
        multiagent: bool = False,
        flatten_branched: bool = False,
        no_graphics: bool = False,
        allow_multiple_visual_obs: bool = False,
    ):
        """
        :param worker_id: Worker number for environment.
        :param use_visual: Whether to use visual observation or vector observation.
        :param uint8_visual: Return visual observations as uint8 (0-255) matrices instead of float (0.0-1.0).
        :param multiagent: Whether to run in multi-agent mode (lists of obs, reward, done).
        :param flatten_branched: If True, turn branched discrete action spaces into a Discrete space rather than
            MultiDiscrete.
        :param no_graphics: Whether to run the Unity simulator in no-graphics mode
        """
        # Take a single step so that the brain information will be sent over
        if not self._env.get_agent_groups():
            self._env.step()

        self.agent_mapper = AgentIdIndexMapper()
        self._previous_step_result: BatchedStepResult = None
        self._multiagent = multiagent
        self._flattener = None
        # Hidden flag used by Atari environments to determine if the game is over
        self.game_over = False

        if len(self._env.get_agent_groups()) != 1:
            raise UnityGymException(
                "There can only be one brain in a UnityEnvironment "
            )
        self.brain_name = self._env.get_agent_groups()[0]
        self.name = self.brain_name
        self.group_spec = self._env.get_agent_group_spec(self.brain_name)
        if use_visual and self._get_n_vis_obs() == 0:
            raise UnityGymException(

        # Check for number of agents in scene.
        self._env.reset()
        step_result = self._env.get_step_result(self.brain_name)
        self._check_agents(step_result.n_agents())
        self._previous_step_result = step_result
        self.agent_mapper.set_initial_agents(list(self._previous_step_result.agent_id))

        # Set observation and action spaces
        if self.group_spec.is_action_discrete():
    def reset(self) -> Union[List[np.ndarray], np.ndarray]:
        """Resets the state of the environment and returns an initial observation.
        In the case of multi-agent environments, this is a list.
        """
        step_result = self._step(True)
        n_agents = step_result.n_agents()
        self._check_agents(n_agents)
        self.game_over = False

        if not self._multiagent:
            res: GymStepResult = self._single_step(step_result)
        else:
            res = self._multi_step(step_result)
        return res[0]
    def step(self, action: List[Any]) -> GymStepResult:
        """
        Accepts an action and returns a tuple (observation, reward, done, info).
        In the case of multi-agent environments, these are lists.
        Args:
            action (object/list): an action provided by the environment
        Returns:
            info (dict): contains auxiliary diagnostic information, including BatchedStepResult.
        """
        # Use random actions for all other agents in environment.
        if self._multiagent:
            if not isinstance(action, list):
                raise UnityGymException(
                    "The environment was expecting `action` to be a list."
                )
            if len(action) != self._n_agents:
                raise UnityGymException(
                    "The environment was expecting a list of {} actions.".format(
                        self._n_agents
                    )
                )
            else:
                if self._flattener is not None:
                    # Action space is discrete and flattened - we expect a list of scalars
                    action = [self._flattener.lookup_action(_act) for _act in action]
                action = np.array(action)
        else:
            if self._flattener is not None:
                # Translate action into list
                action = self._flattener.lookup_action(action)

        action = np.array(action).reshape((self._n_agents, self.group_spec.action_size))
        action = self._sanitize_action(action)
        self._env.set_actions(self.brain_name, action)
        step_result = self._step()

        n_agents = step_result.n_agents()
        self._check_agents(n_agents)

        if not self._multiagent:
            single_res = self._single_step(step_result)
            self.game_over = single_res[2]
            return single_res
        else:
            multi_res = self._multi_step(step_result)
            self.game_over = all(multi_res[2])
            return multi_res
    def _single_step(self, info: BatchedStepResult) -> GymSingleStepResult:
        if self.use_visual:
            visual_obs = self._get_vis_obs_list(info)
                "The Agent does not have vector observations and the environment was not setup "
                + "to use visual observations."
            )
        return (
            default_observation,
            info.reward[0],
            info.done[0],
            {"batched_step_result": info},
        )
    def _preprocess_single(self, single_visual_obs: np.ndarray) -> np.ndarray:
        if self.uint8_visual:
            return (255.0 * single_visual_obs).astype(np.uint8)
        return single_visual_obs
    def _multi_step(self, info: BatchedStepResult) -> GymMultiStepResult:
        if self.use_visual:
            self.visual_obs = self._preprocess_multi(self._get_vis_obs_list(info))
            default_observation = self.visual_obs
        else:
            default_observation = self._get_vector_obs(info)
        return (
            list(default_observation),
            list(info.reward),
            list(info.done),
            {"batched_step_result": info},
        )
    def _get_n_vis_obs(self) -> int:
        result = 0
        for shape in self.group_spec.observation_shapes:
            if len(shape) == 3:
                result += 1
        return result

    def _get_vis_obs_shape(self) -> Optional[Tuple]:
        for shape in self.group_spec.observation_shapes:
            if len(shape) == 3:
                return shape
        return None
    def _get_vis_obs_list(self, step_result: BatchedStepResult) -> List[np.ndarray]:
        result: List[np.ndarray] = []
        for obs in step_result.obs:
            if len(obs.shape) == 4:
                result.append(obs)
        return result
    def _get_vector_obs(self, step_result: BatchedStepResult) -> np.ndarray:
        result: List[np.ndarray] = []
        for obs in step_result.obs:
            if len(obs.shape) == 2:
                result.append(obs)
        return np.concatenate(result, axis=1)
    def _preprocess_multi(
        self, multiple_visual_obs: List[np.ndarray]
    ) -> List[np.ndarray]:
        if self.uint8_visual:
            return [
                (255.0 * _visual_obs).astype(np.uint8)
                for _visual_obs in multiple_visual_obs
            ]
        else:
            return multiple_visual_obs
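    # For example, a float frame with values in [0.0, 1.0] becomes a uint8 frame in
    # [0, 255] when `uint8_visual` is enabled; otherwise it is passed through unchanged.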
    def render(self, mode="rgb_array"):
        return self.visual_obs
    def _check_agents(self, n_agents: int) -> None:
        if not self._multiagent and n_agents > 1:
            raise UnityGymException(
                "The environment was launched as a single-agent environment, however "
                "there is more than one agent in the scene."
            )
        elif self._multiagent and n_agents <= 1:
            raise UnityGymException(
                "The environment was launched as a multi-agent environment, however "
                "there is only one agent in the scene."
            )
        if self._n_agents == -1:
            self._n_agents = n_agents
            logger.info("{} agents within environment.".format(n_agents))
        elif self._n_agents != n_agents:
            raise UnityGymException(
                "The number of agents in the environment has changed since "
                "initialization. This is not supported."
            )
    def _sanitize_info(self, step_result: BatchedStepResult) -> BatchedStepResult:
        n_extra_agents = step_result.n_agents() - self._n_agents
        if n_extra_agents < 0:
            # In this case, some Agents did not request a decision when expected
            raise UnityGymException(
                "The number of agents in the scene does not match the expected number."
            )

        if step_result.n_agents() - sum(step_result.done) != self._n_agents:
            raise UnityGymException(
                "The number of agents in the scene does not match the expected number."
            )

        for index, agent_id in enumerate(step_result.agent_id):
            if step_result.done[index]:
                self.agent_mapper.mark_agent_done(agent_id, step_result.reward[index])

        # Set the new AgentDone flags to True
        # Note that the corresponding agent_id that gets marked done will be different
        # than the original agent that was done, but this is OK since the gym interface
        # only cares about the ordering.
        for index, agent_id in enumerate(step_result.agent_id):
            if not self._previous_step_result.contains_agent(agent_id):
                if step_result.done[index]:
                    # If the Agent is already done (e.g. it ended its episode twice in one step)
                    # don't try to register it here.
                    continue
                # Register this agent, and get the reward of the previous agent that
                # was in its index, so that we can return it to the gym.
                last_reward = self.agent_mapper.register_new_agent_id(agent_id)
                step_result.done[index] = True
                step_result.reward[index] = last_reward

        self._previous_step_result = step_result  # store the new original

        # Get a permutation of the agent IDs so that a given ID stays in the same
        # index as where it was first seen.
        new_id_order = self.agent_mapper.get_id_permutation(list(step_result.agent_id))

        _mask: Optional[List[np.array]] = None
        if step_result.action_mask is not None:
            _mask = []
            for mask_index in range(len(step_result.action_mask)):
                _mask.append(step_result.action_mask[mask_index][new_id_order])
        new_obs: List[np.array] = []
        for obs_index in range(len(step_result.obs)):
            new_obs.append(step_result.obs[obs_index][new_id_order])
        return BatchedStepResult(
            obs=new_obs,
            reward=step_result.reward[new_id_order],
            done=step_result.done[new_id_order],
            max_step=step_result.max_step[new_id_order],
            agent_id=step_result.agent_id[new_id_order],
            action_mask=_mask,
        )
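    # Illustrative scenario (hypothetical agent ids): suppose the previous step had
    # agents [10, 11] at gym indices [0, 1], agent 10 then finished its episode, and
    # Unity spawned a new agent 12. After sanitization, gym index 0 holds the entry
    # for agent 12, reported as done and carrying agent 10's final reward, while
    # index 1 still holds agent 11, so the ordering seen by the gym user never shifts.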
    def _sanitize_action(self, action: np.array) -> np.array:
        sanitized_action = np.zeros(
            (self._previous_step_result.n_agents(), self.group_spec.action_size)
        )
        for index, agent_id in enumerate(self._previous_step_result.agent_id):
            if not self._previous_step_result.done[index]:
                array_index = self.agent_mapper.get_gym_index(agent_id)
                sanitized_action[index, :] = action[array_index, :]
        return sanitized_action
    def _step(self, needs_reset: bool = False) -> BatchedStepResult:
        if needs_reset:
            self._env.reset()
        else:
            self._env.step()
        info = self._env.get_step_result(self.brain_name)
        # Two possible cases here:
        # 1) all agents requested decisions (some of which might be done)
        # 2) some Agents were marked Done in between steps.
        # In case 2, we re-request decisions until all agents request a real decision.
        while info.n_agents() - sum(info.done) < self._n_agents:
            if not info.done.all():
                raise UnityGymException(
                    "The environment does not have the expected amount of agents. "
                    + "Some agents did not request decisions at the same time."
                )
            for agent_id, reward in zip(info.agent_id, info.reward):
                self.agent_mapper.mark_agent_done(agent_id, reward)
            self._env.step()
            info = self._env.get_step_result(self.brain_name)
        return self._sanitize_info(info)
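    # For example, if three agents are expected but a step returns only agents whose
    # episodes just ended, their final rewards are recorded in the mapper and the
    # simulation is stepped again until all expected agents request a real decision.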
    @property
    def metadata(self):
        return {"render.modes": ["rgb_array"]}
        :return: The List containing the branched actions.
        """
        return self.action_lookup[action]
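# A sketch of the flattened-action lookup table that `lookup_action` reads from
# (assumed construction shown for illustration; the real table is built elsewhere
# in this module when `flatten_branched` is enabled):
#
#   branched_sizes = [2, 3]  # MultiDiscrete branch sizes
#   action_lookup = dict(
#       enumerate(itertools.product(*(range(size) for size in branched_sizes)))
#   )
#   # Discrete action 4 then maps back to the branched action (1, 1).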
class AgentIdIndexMapper:
    def __init__(self) -> None:
        self._agent_id_to_gym_index: Dict[int, int] = {}
        self._done_agents_index_to_last_reward: Dict[int, float] = {}

    def set_initial_agents(self, agent_ids: List[int]) -> None:
        """
        Provide the initial list of agent ids for the mapper
        """
        for idx, agent_id in enumerate(agent_ids):
            self._agent_id_to_gym_index[agent_id] = idx

    def mark_agent_done(self, agent_id: int, reward: float) -> None:
        """
        Declare the agent done with the corresponding final reward.
        """
        if agent_id in self._agent_id_to_gym_index:
            gym_index = self._agent_id_to_gym_index.pop(agent_id)
            self._done_agents_index_to_last_reward[gym_index] = reward
        else:
            # Agent was never registered in the first place (e.g. EndEpisode called multiple times)
            pass

    def register_new_agent_id(self, agent_id: int) -> float:
        """
        Adds the new agent ID and returns the reward to use for the previous agent in this index
        """
        # Any free index is OK here.
        free_index, last_reward = self._done_agents_index_to_last_reward.popitem()
        self._agent_id_to_gym_index[agent_id] = free_index
        return last_reward

    def get_id_permutation(self, agent_ids: List[int]) -> List[int]:
        """
        Get the permutation from new agent ids to the order that preserves the positions of previous agents.
        The result is a list with each integer from 0 to len(_agent_id_to_gym_index) - 1
        appearing exactly once.
        """
        # Map the new agent ids to their index
        new_agent_ids_to_index = {
            agent_id: idx for idx, agent_id in enumerate(agent_ids)
        }
        # Make the output list. We don't write to it sequentially, so start with dummy values.
        new_permutation = [-1] * len(self._agent_id_to_gym_index)
        # For each agent ID, find the new index of the agent, and write it in the original index.
        for agent_id, original_index in self._agent_id_to_gym_index.items():
            new_permutation[original_index] = new_agent_ids_to_index[agent_id]
        return new_permutation

    def get_gym_index(self, agent_id: int) -> int:
        """
        Get the gym index for the current agent.
        """
        return self._agent_id_to_gym_index[agent_id]
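    # Illustrative walk-through (hypothetical agent ids):
    #   mapper = AgentIdIndexMapper()
    #   mapper.set_initial_agents([10, 11])   # 10 -> gym index 0, 11 -> gym index 1
    #   mapper.mark_agent_done(10, 1.0)       # frees index 0, remembers final reward 1.0
    #   mapper.register_new_agent_id(12)      # returns 1.0; agent 12 now occupies index 0
    #   mapper.get_id_permutation([12, 11])   # [0, 1]: previously seen positions are preserved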
class AgentIdIndexMapperSlow:
    """
    Reference implementation of AgentIdIndexMapper.
    The operations are O(N^2) so it shouldn't be used for large numbers of agents.
    See AgentIdIndexMapper for method descriptions.
    """

    def __init__(self) -> None:
        self._gym_id_order: List[int] = []
        self._done_agents_index_to_last_reward: Dict[int, float] = {}

    def set_initial_agents(self, agent_ids: List[int]) -> None:
        self._gym_id_order = list(agent_ids)

    def mark_agent_done(self, agent_id: int, reward: float) -> None:
        try:
            gym_index = self._gym_id_order.index(agent_id)
            self._done_agents_index_to_last_reward[gym_index] = reward
            self._gym_id_order[gym_index] = -1
        except ValueError:
            # Agent was never registered in the first place (e.g. EndEpisode called multiple times)
            pass

    def register_new_agent_id(self, agent_id: int) -> float:
        original_index = self._gym_id_order.index(-1)
        self._gym_id_order[original_index] = agent_id
        reward = self._done_agents_index_to_last_reward.pop(original_index)
        return reward

    def get_id_permutation(self, agent_ids):
        new_id_order = []
        for agent_id in self._gym_id_order:
            new_id_order.append(agent_ids.index(agent_id))
        return new_id_order

    def get_gym_index(self, agent_id: int) -> int:
        return self._gym_id_order.index(agent_id)
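
# A minimal sanity-check sketch (not part of the original module): both mapper
# implementations should agree for the same sequence of events. The agent ids
# used below are hypothetical.
if __name__ == "__main__":
    for mapper_cls in (AgentIdIndexMapper, AgentIdIndexMapperSlow):
        mapper = mapper_cls()
        mapper.set_initial_agents([10, 11])
        mapper.mark_agent_done(10, reward=1.0)
        assert mapper.register_new_agent_id(12) == 1.0
        assert mapper.get_id_permutation([12, 11]) == [0, 1]
        assert mapper.get_gym_index(11) == 1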