import atexit
from distutils.version import StrictVersion
import glob
import uuid
from typing import Dict , List , Optional , Any , Tuple
from typing import Dict , List , Optional , Tuple , Mapping as MappingType
from mlagents_envs.side_channel.side_channel import SideChannel , IncomingMessage
from mlagents_envs.side_channel.side_channel import SideChannel
from mlagents_envs.side_channel.side_channel_manager import SideChannelManager
from mlagents_envs import env_utils
from mlagents_envs.base_env import (
BaseEnv ,
BehaviorName ,
AgentId ,
BehaviorMapping ,
)
from mlagents_envs.timers import timed , hierarchical_timer
from mlagents_envs.exception import (
from mlagents_envs.communicator_objects.unity_input_pb2 import UnityInputProto
from .rpc_communicator import RpcCommunicator
from sys import platform
import struct
SCALAR_ACTION_TYPES = ( int , np . int32 , np . int64 , float , np . float32 , np . float64 )
SINGLE_BRAIN_ACTION_TYPES = SCALAR_ACTION_TYPES + ( list , np . ndarray )
# Communication protocol version.
# When connecting to C#, this must be compatible with Academy.k_ApiVersion.
# We follow semantic versioning on the communication version, so existing
BASE_ENVIRONMENT_PORT = 5005
# Command line argument used to pass the port to the executable environment.
PORT_COMMAND_LINE_ARG = " --mlagents-port "
_ PORT_COMMAND_LINE_ARG = " --mlagents-port "
@staticmethod
def _raise_version_exception ( unity_com_ver : str ) - > None :
)
@staticmethod
def check_communication_compatibility (
def _ check_communication_compatibility(
unity_com_ver : str , python_api_version : str , unity_package_version : str
) - > bool :
unity_communicator_version = StrictVersion ( unity_com_ver )
return True
@staticmethod
def get_capabilities_proto ( ) - > UnityRLCapabilitiesProto :
def _ get_capabilities_proto( ) - > UnityRLCapabilitiesProto :
def warn_csharp_base_capabit lities (
def _ warn_csharp_base_capabilities(
caps : UnityRLCapabilitiesProto , unity_package_ver : str , python_package_ver : str
) - > None :
if not caps . baseRLCapabilities :
: str log_folder : Optional folder to write the Unity Player log file into . Requires absolute path .
"""
atexit . register ( self . _close )
self . additional_args = additional_args or [ ]
self . no_graphics = no_graphics
self . _ additional_args = additional_args or [ ]
self . _ no_graphics = no_graphics
# If base port is not specified, use BASE_ENVIRONMENT_PORT if we have
# an environment, otherwise DEFAULT_EDITOR_PORT
if base_port is None :
self . port = base_port + worker_id
self . _port = base_port + worker_id
self . proc1 = None
self . timeout_wait : int = timeout_wait
self . communicator = self . get_communicator ( worker_id , base_port , timeout_wait )
self . worker_id = worker_id
self . side_channels : Dict [ uuid . UUID , SideChannel ] = { }
if side_channels is not None :
for _sc in side_channels :
if _sc . channel_id in self . side_channels :
raise UnityEnvironmentException (
" There cannot be two side channels with the same channel id {0}. " . format (
_sc . channel_id
)
)
self . side_channels [ _sc . channel_id ] = _sc
self . log_folder = log_folder
self . _proc1 = None
self . _timeout_wait : int = timeout_wait
self . _communicator = self . _get_communicator ( worker_id , base_port , timeout_wait )
self . _worker_id = worker_id
self . _side_channel_manager = SideChannelManager ( side_channels )
self . _log_folder = log_folder
# If the environment name is None, a new environment will not be launched
# and the communicator will directly try to connect to an existing unity environment.
" the worker-id must be 0 in order to connect with the Editor. "
)
if file_name is not None :
self . executable_launcher ( file_name , no_graphics , additional_args )
try :
self . _proc1 = env_utils . launch_executable (
file_name , self . _executable_args ( )
)
except UnityEnvironmentException :
self . _close ( 0 )
raise
f " Listening on port {self.port}. "
f " Listening on port {self._port}. "
f " Start training by pressing the Play button in the Unity Editor. "
)
self . _loaded = True
communication_version = self . API_VERSION ,
package_version = mlagents_envs . __version__ ,
capabilities = UnityEnvironment . get_capabilities_proto ( ) ,
capabilities = UnityEnvironment . _ get_capabilities_proto( ) ,
aca_output = self . send_academy_parameters ( rl_init_parameters_in )
aca_output = self . _ send_academy_parameters( rl_init_parameters_in )
if not UnityEnvironment . check_communication_compatibility (
if not UnityEnvironment . _ check_communication_compatibility(
aca_params . communication_version ,
UnityEnvironment . API_VERSION ,
aca_params . package_version ,
UnityEnvironment . warn_csharp_base_capabit lities (
UnityEnvironment . _ warn_csharp_base_capabilities(
aca_params . capabilities ,
aca_params . package_version ,
UnityEnvironment . API_VERSION ,
self . _update_behavior_specs ( aca_output )
@staticmethod
def get_communicator ( worker_id , base_port , timeout_wait ) :
def _get_communicator ( worker_id , base_port , timeout_wait ) :
@staticmethod
def validate_environment_path ( env_path : str ) - > Optional [ str ] :
# Strip out executable extensions if passed
env_path = (
env_path . strip ( )
. replace ( " .app " , " " )
. replace ( " .exe " , " " )
. replace ( " .x86_64 " , " " )
. replace ( " .x86 " , " " )
)
true_filename = os . path . basename ( os . path . normpath ( env_path ) )
logger . debug ( " The true file name is {} " . format ( true_filename ) )
if not ( glob . glob ( env_path ) or glob . glob ( env_path + " .* " ) ) :
return None
cwd = os . getcwd ( )
launch_string = None
true_filename = os . path . basename ( os . path . normpath ( env_path ) )
if platform == " linux " or platform == " linux2 " :
candidates = glob . glob ( os . path . join ( cwd , env_path ) + " .x86_64 " )
if len ( candidates ) == 0 :
candidates = glob . glob ( os . path . join ( cwd , env_path ) + " .x86 " )
if len ( candidates ) == 0 :
candidates = glob . glob ( env_path + " .x86_64 " )
if len ( candidates ) == 0 :
candidates = glob . glob ( env_path + " .x86 " )
if len ( candidates ) > 0 :
launch_string = candidates [ 0 ]
elif platform == " darwin " :
candidates = glob . glob (
os . path . join ( cwd , env_path + " .app " , " Contents " , " MacOS " , true_filename )
)
if len ( candidates ) == 0 :
candidates = glob . glob (
os . path . join ( env_path + " .app " , " Contents " , " MacOS " , true_filename )
)
if len ( candidates ) == 0 :
candidates = glob . glob (
os . path . join ( cwd , env_path + " .app " , " Contents " , " MacOS " , " * " )
)
if len ( candidates ) == 0 :
candidates = glob . glob (
os . path . join ( env_path + " .app " , " Contents " , " MacOS " , " * " )
)
if len ( candidates ) > 0 :
launch_string = candidates [ 0 ]
elif platform == " win32 " :
candidates = glob . glob ( os . path . join ( cwd , env_path + " .exe " ) )
if len ( candidates ) == 0 :
candidates = glob . glob ( env_path + " .exe " )
if len ( candidates ) > 0 :
launch_string = candidates [ 0 ]
return launch_string
def executable_args ( self ) - > List [ str ] :
def _executable_args ( self ) - > List [ str ] :
if self . no_graphics :
if self . _no_graphics :
args + = [ UnityEnvironment . PORT_COMMAND_LINE_ARG , str ( self . port ) ]
if self . log_folder :
args + = [ UnityEnvironment . _PORT_COMMAND_LINE_ARG , str ( self . _port ) ]
if self . _log_folder :
self . log_folder , f " Player-{self.worker_id}.log "
self . _log_folder , f " Player-{self._worker_id}.log "
args + = self . additional_args
args + = self . _additional_args
def executable_launcher ( self , file_name , no_graphics , args ) :
launch_string = self . validate_environment_path ( file_name )
if launch_string is None :
self . _close ( 0 )
raise UnityEnvironmentException (
f " Couldn ' t launch the {file_name} environment. Provided filename does not match any environments. "
)
else :
logger . debug ( " This is the launch string {} " . format ( launch_string ) )
# Launch Unity environment
subprocess_args = [ launch_string ] + self . executable_args ( )
try :
self . proc1 = subprocess . Popen (
subprocess_args ,
# start_new_session=True means that signals to the parent python process
# (e.g. SIGINT from keyboard interrupt) will not be sent to the new process on POSIX platforms.
# This is generally good since we want the environment to have a chance to shutdown,
# but may be undesirable in come cases; if so, we'll add a command-line toggle.
# Note that on Windows, the CTRL_C signal will still be sent.
start_new_session = True ,
)
except PermissionError as perm :
# This is likely due to missing read or execute permissions on file.
raise UnityEnvironmentException (
f " Error when trying to launch environment - make sure "
f " permissions are set correctly. For example "
f ' " chmod -R 755 {launch_string} " '
) from perm
def _update_behavior_specs ( self , output : UnityOutputProto ) - > None :
init_output = output . rl_initialization_output
for brain_param in init_output . brain_parameters :
DecisionSteps . empty ( self . _env_specs [ brain_name ] ) ,
TerminalSteps . empty ( self . _env_specs [ brain_name ] ) ,
)
self . _parse_side_channel_message ( self . side_channels , output . side_channel )
self . _side_channel_manager . process_side_channel_message ( output . side_channel )
outputs = self . communicator . exchange ( self . _generate_reset_input ( ) )
outputs = self . _ communicator. exchange ( self . _generate_reset_input ( ) )
if outputs is None :
raise UnityCommunicatorStoppedException ( " Communicator has exited. " )
self . _update_behavior_specs ( outputs )
] . create_empty_action ( n_agents )
step_input = self . _generate_step_input ( self . _env_actions )
with hierarchical_timer ( " communicator.exchange " ) :
outputs = self . communicator . exchange ( step_input )
outputs = self . _ communicator. exchange ( step_input )
if outputs is None :
raise UnityCommunicatorStoppedException ( " Communicator has exited. " )
self . _update_behavior_specs ( outputs )
def get_behavior_names ( self ) :
return list ( self . _env_specs . keys ( ) )
@property
def behavior_specs ( self ) - > MappingType [ str , BehaviorSpec ] :
return BehaviorMapping ( self . _env_specs )
def _assert_behavior_exists ( self , behavior_name : str ) - > None :
if behavior_name not in self . _env_specs :
expected_shape = ( len ( self . _env_state [ behavior_name ] [ 0 ] ) , spec . action_size )
if action . shape != expected_shape :
raise UnityActionException (
" The behavior {0} needs an input of dimension {1} but received input of dimension {2} " . format (
behavior_name , expected_shape , action . shape
)
" The behavior {0} needs an input of dimension {1} for "
" (<number of agents>, <action size>) but received input of <