import numpy as np
import os
import subprocess
from typing import Dict , List , Optional , Any , Tuple , Mapping as MappingType
from typing import Dict , List , Optional , Tuple , Mapping as MappingType
import mlagents_envs
class UnityEnvironment ( BaseEnv ) :
SCALAR_ACTION_TYPES = ( int , np . int32 , np . int64 , float , np . float32 , np . float64 )
SINGLE_BRAIN_ACTION_TYPES = SCALAR_ACTION_TYPES + ( list , np . ndarray )
# Communication protocol version.
# When connecting to C#, this must be compatible with Academy.k_ApiVersion.
# We follow semantic versioning on the communication version, so existing
BASE_ENVIRONMENT_PORT = 5005
# Command line argument used to pass the port to the executable environment.
PORT_COMMAND_LINE_ARG = " --mlagents-port "
_ PORT_COMMAND_LINE_ARG = " --mlagents-port "
@staticmethod
def _raise_version_exception ( unity_com_ver : str ) - > None :
)
@staticmethod
def check_communication_compatibility (
def _ check_communication_compatibility(
unity_com_ver : str , python_api_version : str , unity_package_version : str
) - > bool :
unity_communicator_version = StrictVersion ( unity_com_ver )
return True
@staticmethod
def get_capabilities_proto ( ) - > UnityRLCapabilitiesProto :
def _ get_capabilities_proto( ) - > UnityRLCapabilitiesProto :
def warn_csharp_base_capabilities (
def _ warn_csharp_base_capabilities(
caps : UnityRLCapabilitiesProto , unity_package_ver : str , python_package_ver : str
) - > None :
if not caps . baseRLCapabilities :
: str log_folder : Optional folder to write the Unity Player log file into . Requires absolute path .
"""
atexit . register ( self . _close )
self . additional_args = additional_args or [ ]
self . no_graphics = no_graphics
self . _ additional_args = additional_args or [ ]
self . _ no_graphics = no_graphics
# If base port is not specified, use BASE_ENVIRONMENT_PORT if we have
# an environment, otherwise DEFAULT_EDITOR_PORT
if base_port is None :
self . port = base_port + worker_id
self . _port = base_port + worker_id
self . proc1 = None
self . timeout_wait : int = timeout_wait
self . communicator = self . get_communicator ( worker_id , base_port , timeout_wait )
self . worker_id = worker_id
self . side_channel_manager = SideChannelManager ( side_channels )
self . log_folder = log_folder
self . _proc1 = None
self . _timeout_wait : int = timeout_wait
self . _communicator = self . _get_communicator ( worker_id , base_port , timeout_wait )
self . _worker_id = worker_id
self . _side_channel_manager = SideChannelManager ( side_channels )
self . _log_folder = log_folder
# If the environment name is None, a new environment will not be launched
# and the communicator will directly try to connect to an existing Unity environment.
)
if file_name is not None :
try :
self . proc1 = env_utils . launch_executable (
file_name , self . executable_args ( )
self . _ proc1 = env_utils . launch_executable (
file_name , self . _ executable_args( )
)
except UnityEnvironmentException :
self . _close ( 0 )
f " Listening on port {self.port}. "
f " Listening on port {self._port}. "
f " Start training by pressing the Play button in the Unity Editor. "
)
self . _loaded = True
communication_version = self . API_VERSION ,
package_version = mlagents_envs . __version__ ,
capabilities = UnityEnvironment . get_capabilities_proto ( ) ,
capabilities = UnityEnvironment . _ get_capabilities_proto( ) ,
aca_output = self . send_academy_parameters ( rl_init_parameters_in )
aca_output = self . _ send_academy_parameters( rl_init_parameters_in )
if not UnityEnvironment . check_communication_compatibility (
if not UnityEnvironment . _ check_communication_compatibility(
aca_params . communication_version ,
UnityEnvironment . API_VERSION ,
aca_params . package_version ,
UnityEnvironment . warn_csharp_base_capabilities (
UnityEnvironment . _ warn_csharp_base_capabilities(
aca_params . capabilities ,
aca_params . package_version ,
UnityEnvironment . API_VERSION ,
self . _update_behavior_specs ( aca_output )
@staticmethod
def get_communicator ( worker_id , base_port , timeout_wait ) :
def _ get_communicator( worker_id , base_port , timeout_wait ) :
def executable_args ( self ) - > List [ str ] :
def _ executable_args( self ) - > List [ str ] :
if self . no_graphics :
if self . _ no_graphics:
args + = [ UnityEnvironment . PORT_COMMAND_LINE_ARG , str ( self . port ) ]
if self . log_folder :
args + = [ UnityEnvironment . _ PORT_COMMAND_LINE_ARG, str ( self . _ port) ]
if self . _ log_folder:
self . log_folder , f " Player-{self.worker_id}.log "
self . _ log_folder, f " Player-{self._ worker_id}.log "
args + = self . additional_args
args + = self . _ additional_args
return args
def _update_behavior_specs ( self , output : UnityOutputProto ) - > None :
DecisionSteps . empty ( self . _env_specs [ brain_name ] ) ,
TerminalSteps . empty ( self . _env_specs [ brain_name ] ) ,
)
self . side_channel_manager . process_side_channel_message ( output . side_channel )
self . _ side_channel_manager. process_side_channel_message ( output . side_channel )
outputs = self . communicator . exchange ( self . _generate_reset_input ( ) )
outputs = self . _ communicator. exchange ( self . _generate_reset_input ( ) )
if outputs is None :
raise UnityCommunicatorStoppedException ( " Communicator has exited. " )
self . _update_behavior_specs ( outputs )
] . create_empty_action ( n_agents )
step_input = self . _generate_step_input ( self . _env_actions )
with hierarchical_timer ( " communicator.exchange " ) :
outputs = self . communicator . exchange ( step_input )
outputs = self . _ communicator. exchange ( step_input )
if outputs is None :
raise UnityCommunicatorStoppedException ( " Communicator has exited. " )
self . _update_behavior_specs ( outputs )
force - killing it . Defaults to `self.timeout_wait` .
"""
if timeout is None :
timeout = self . timeout_wait
timeout = self . _timeout_wait
self . communicator . close ( )
if self . proc1 is not None :
self . _communicator . close ( )
if self . _proc1 is not None :
self . proc1 . wait ( timeout = timeout )
signal_name = self . returncode_to_signal_name ( self . proc1 . returncode )
self . _proc1 . wait ( timeout = timeout )
signal_name = self . _returncode_to_signal_name ( self . _proc1 . returncode )
return_info = f " Environment shut down with return code {self.proc1.returncode}{signal_name}. "
return_info = f " Environment shut down with return code {self._proc1.returncode}{signal_name}. "
self . proc1 . kill ( )
self . _proc1 . kill ( )
self . proc1 = None
@classmethod
def _flatten(cls, arr: Any) -> List[float]:
    """
    Converts a scalar, a numpy array, or an arbitrarily nested sequence of
    lists / tuples / numpy arrays into a flat list of Python floats.

    Elements are emitted in row-major (C) order, which is the order the
    previous one-or-two-level flattening produced.

    :param arr: a scalar of one of ``SCALAR_ACTION_TYPES`` (or any numpy
        scalar), a numpy array of any rank (including 0-d), or an iterable
        whose items are scalars, lists, tuples or numpy arrays.
    :return: flattened list of floats; always a new ``list`` (empty input
        yields ``[]``).
    :raises TypeError: if ``arr`` is neither a scalar nor iterable, or a
        leaf value cannot be converted by ``float()``.
    :raises ValueError: if a leaf value is a string that ``float()`` rejects.
    """

    def _leaves(node):
        # Yield the scalar leaves of ``node`` depth-first, left to right.
        if isinstance(node, np.ndarray):
            # ravel() copes with every rank, including 0-d arrays for which
            # tolist() alone would return a bare scalar instead of a list.
            yield from node.ravel().tolist()
        elif isinstance(node, (list, tuple)):
            for child in node:
                yield from _leaves(child)
        else:
            yield node

    # Scalars: the configured action types plus any other numpy scalar
    # (e.g. np.float16), which has no len() and is not iterable.
    if isinstance(arr, cls.SCALAR_ACTION_TYPES) or isinstance(arr, np.generic):
        return [float(arr)]

    if isinstance(arr, np.ndarray):
        return [float(value) for value in arr.ravel().tolist()]

    # Generic iterable: inspect every item (not just the first) so mixed and
    # deeply nested inputs flatten instead of failing in float().
    flat: List[float] = []
    for element in arr:
        flat.extend(float(value) for value in _leaves(element))
    return flat
self . _proc1 = None
@timed
def _generate_step_input (
rl_in . agent_actions [ b ] . value . extend ( [ action ] )
rl_in . command = STEP
rl_in . side_channel = bytes (
self . side_channel_manager . generate_side_channel_messages ( )
self . _ side_channel_manager. generate_side_channel_messages ( )
return self . wrap_unity_input ( rl_in )
return self . _ wrap_unity_input( rl_in )
self . side_channel_manager . generate_side_channel_messages ( )
self . _ side_channel_manager. generate_side_channel_messages ( )
return self . wrap_unity_input ( rl_in )
return self . _ wrap_unity_input( rl_in )
def send_academy_parameters (
def _ send_academy_parameters(
return self . communicator . initialize ( inputs )
return self . _ communicator. initialize ( inputs )
def wrap_unity_input ( rl_input : UnityRLInputProto ) - > UnityInputProto :
def _ wrap_unity_input( rl_input : UnityRLInputProto ) - > UnityInputProto :
def returncode_to_signal_name ( returncode : int ) - > Optional [ str ] :
def _ returncode_to_signal_name( returncode : int ) - > Optional [ str ] :
"""
Try to convert return codes into their corresponding signal name .
E . g . returncode_to_signal_name ( - 2 ) - > " SIGINT "