import logging
import os
import numpy as np
from typing import Dict, List, Optional, Any

from mlagents.envs.side_channel.side_channel import SideChannel
from mlagents.envs.base_env import (
    BaseEnv,
    BatchedStepResult,
    AgentGroupSpec,
    AgentGroup,
    AgentId,
)
from mlagents.envs.timers import hierarchical_timer
from mlagents.envs.exception import (
    UnityActionException,
    UnityCommunicationException,
    UnityEnvironmentException,
)
from mlagents.envs.rpc_utils import (
    agent_group_spec_from_proto,
    batched_step_result_from_proto,
)
from mlagents.envs.communicator_objects.agent_action_pb2 import AgentActionProto
from mlagents.envs.communicator_objects.unity_rl_input_pb2 import UnityRLInputProto
from mlagents.envs.communicator_objects.unity_rl_output_pb2 import UnityRLOutputProto
from mlagents.envs.communicator_objects.unity_input_pb2 import UnityInputProto
from mlagents.envs.communicator_objects.unity_output_pb2 import UnityOutputProto
logger = logging.getLogger("mlagents.envs")
class UnityEnvironment(BaseEnv):
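    """
    Python-side interface to a Unity ML-Agents environment. Talks to the Unity
    process through the gRPC communicator and exposes agent groups, their
    specs, and batched step results to the trainers.
    """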
    SCALAR_ACTION_TYPES = (int, np.int32, np.int64, float, np.float32, np.float64)
    SINGLE_BRAIN_ACTION_TYPES = SCALAR_ACTION_TYPES + (list, np.ndarray)
    API_VERSION = "API-12"
" {1}. \n Please go to https://github.com/Unity-Technologies/ml-agents to download the latest version "
" of ML-Agents. " . format ( self . _version_ , self . _unity_version )
)
        self._env_state: Dict[str, BatchedStepResult] = {}
        self._env_specs: Dict[str, AgentGroupSpec] = {}
        self._env_actions: Dict[str, np.ndarray] = {}
        self._academy_name = aca_params.name
        self._log_path = aca_params.log_path
        self._update_group_specs(aca_output)
        logger.info(
            "\n'{0}' started successfully!\n{1}".format(self._academy_name, str(self))
        )
    @property
    def logfile_path(self):
        return self._log_path

    @property
    def academy_name(self):
        return self._academy_name
    def executable_launcher(self, file_name, docker_training, no_graphics, args):
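        """
        Launches the Unity executable given by file_name in a subprocess.
        """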
        cwd = os.getcwd()
    def __str__(self):
        return """Unity Academy name: {0}""".format(self._academy_name)
    def _update_group_specs(self, output: UnityOutputProto) -> None:
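        """
        Updates the stored AgentGroupSpec of every group reported in the
        initialization output, using one AgentInfo per group to infer the
        observation shapes.
        """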
        init_output = output.rl_initialization_output
        for brain_param in init_output.brain_parameters:
            # Each BrainParameter in the rl_initialization_output should have at least one AgentInfo
            # Get that agent, because we need some of its observations.
            agent_infos = output.rl_output.agentInfos[brain_param.brain_name]
            if agent_infos.value:
                agent = agent_infos.value[0]
                new_spec = agent_group_spec_from_proto(brain_param, agent)
                self._env_specs[brain_param.brain_name] = new_spec
                logger.info(f"Connected new brain:\n{brain_param.brain_name}")
    def _update_state(self, output: UnityRLOutputProto) -> None:
        """
        Collects experience information from all external brains in environment at current step.
        """
        for brain_name in self._env_specs.keys():
            if brain_name in output.agentInfos:
                agent_info_list = output.agentInfos[brain_name].value
                self._env_state[brain_name] = batched_step_result_from_proto(
                    agent_info_list, self._env_specs[brain_name]
                )
            else:
                self._env_state[brain_name] = BatchedStepResult.empty(
                    self._env_specs[brain_name]
                )
        self._parse_side_channel_message(self.side_channels, output.side_channel)
    def reset(self) -> None:
        """
        Sends a signal to reset the unity environment.
        """
        outputs = self.communicator.exchange(self._generate_reset_input())
        if outputs is None:
            raise UnityCommunicationException("Communicator has stopped.")
        self._update_group_specs(outputs)
        rl_output = outputs.rl_output
        self._update_state(rl_output)
        self._env_actions.clear()
    def step(self) -> None:
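        """
        Signals the Unity environment to move the simulation forward by one
        step. Actions must be queued beforehand with set_actions() or
        set_action_for_agent(); any group without a queued action receives an
        empty one.
        """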
        # fill the blanks for missing actions
        for group_name in self._env_specs:
            if group_name not in self._env_actions:
                n_agents = 0
                if group_name in self._env_state:
                    n_agents = self._env_state[group_name].n_agents()
                self._env_actions[group_name] = self._env_specs[
                    group_name
                ].create_empty_action(n_agents)
        step_input = self._generate_step_input(self._env_actions)
        with hierarchical_timer("communicator.exchange"):
            outputs = self.communicator.exchange(step_input)
        if outputs is None:
            raise UnityCommunicationException("Communicator has stopped.")
        self._update_group_specs(outputs)
        rl_output = outputs.rl_output
        self._update_state(rl_output)
        self._env_actions.clear()
    def get_agent_groups(self) -> List[AgentGroup]:
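        """
        Returns the names of all agent groups currently in the environment.
        """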
        return list(self._env_specs.keys())
    def _assert_group_exists(self, agent_group: str) -> None:
        if agent_group not in self._env_specs:
            raise UnityActionException(
                "The group {0} does not correspond to an existing agent group "
                "in the environment".format(agent_group)
            )
    def set_actions(self, agent_group: AgentGroup, action: np.ndarray) -> None:
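        """
        Queues the action for every agent in the given group for the next
        step. Expects an array of shape (n_agents, action_size): integers for
        discrete action specs, floats for continuous ones.
        """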
        self._assert_group_exists(agent_group)
        if agent_group not in self._env_state:
            return
        spec = self._env_specs[agent_group]
        expected_type = np.float32 if spec.is_action_continuous() else np.int32
        expected_shape = (self._env_state[agent_group].n_agents(), spec.action_size)
        if action.shape != expected_shape:
            raise UnityActionException(
                "The group {0} needs an input of dimension {1} but received input of dimension {2}".format(
                    agent_group, expected_shape, action.shape
                )
            )
        if action.dtype != expected_type:
            action = action.astype(expected_type)
        self._env_actions[agent_group] = action
    def set_action_for_agent(
        self, agent_group: AgentGroup, agent_id: AgentId, action: np.ndarray
    ) -> None:
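        """
        Queues the action of a single agent, identified by its AgentId, in the
        given group. Expects a one-dimensional array of length action_size.
        """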
        self._assert_group_exists(agent_group)
        if agent_group not in self._env_state:
            return
        spec = self._env_specs[agent_group]
        expected_shape = (spec.action_size,)
        if action.shape != expected_shape:
            raise UnityActionException(
                "The Agent {0} in group {1} needs an input of dimension {2} but received input of dimension {3}".format(
                    agent_id, agent_group, expected_shape, action.shape
                )
            )
        expected_type = np.float32 if spec.is_action_continuous() else np.int32
        if action.dtype != expected_type:
            action = action.astype(expected_type)
        if agent_group not in self._env_actions:
            self._env_actions[agent_group] = self._empty_action(
                spec, self._env_state[agent_group].n_agents()
            )
        try:
            index = np.where(self._env_state[agent_group].agent_id == agent_id)[0][0]
        except IndexError as ie:
            raise IndexError(
                "agent_id {} did not request a decision at the previous step".format(
                    agent_id
                )
            ) from ie
        self._env_actions[agent_group][index] = action
    def get_step_result(self, agent_group: AgentGroup) -> BatchedStepResult:
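        """
        Returns the most recent BatchedStepResult for the given agent group.
        """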
        self._assert_group_exists(agent_group)
        return self._env_state[agent_group]
    def get_agent_group_spec(self, agent_group: AgentGroup) -> AgentGroupSpec:
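        """
        Returns the AgentGroupSpec (observation shapes, action type and size)
        for the given agent group.
        """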
        self._assert_group_exists(agent_group)
        return self._env_specs[agent_group]
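    # A minimal usage sketch of the API above, assuming a built Unity
    # executable named "3DBall" (the file name and constructor arguments are
    # illustrative, not part of this module):
    #
    #   env = UnityEnvironment(file_name="3DBall")
    #   env.reset()
    #   group = env.get_agent_groups()[0]
    #   spec = env.get_agent_group_spec(group)
    #   step_result = env.get_step_result(group)
    #   env.set_actions(group, spec.create_empty_action(step_result.n_agents()))
    #   env.step()
    #   env.close()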
    def close(self):
        """
        Sends a shutdown signal to the unity environment, and closes the socket connection.
        """
    @staticmethod
    def _parse_side_channel_message(
        side_channels: Dict[int, SideChannel], data: bytearray
    ) -> None:
        ...

    @staticmethod
    def _generate_side_channel_data(side_channels: Dict[int, SideChannel]) -> bytearray:
        result = bytearray()
        for channel_id, channel in side_channels.items():
            ...
            channel.message_queue = []
        return result
    def _generate_step_input(
        self, vector_action: Dict[str, np.ndarray]
    ) -> UnityInputProto:
        rl_in = UnityRLInputProto()
        for b in vector_action:
            n_agents = self._env_state[b].n_agents()
            for i in range(n_agents):
                action = AgentActionProto(vector_actions=vector_action[b][i])
                rl_in.agent_actions[b].value.extend([action])
        rl_in.command = 0  # 0 corresponds to the STEP command
        rl_in.side_channel = bytes(self._generate_side_channel_data(self.side_channels))
        return self.wrap_unity_input(rl_in)