import atexit
import glob
import io
import json
import logging
import os
import socket
import struct
import subprocess

import numpy as np
from PIL import Image

from communicator_objects import UnityRLInput, UnityRLOutput, AgentActionProto, \
    EnvironmentParametersProto, UnityRLInitializationInput, UnityRLInitializationOutput, \
    UnityInput, UnityOutput
from .brain import AllBrainInfo, BrainInfo, BrainParameters
from .curriculum import Curriculum
from .exception import UnityEnvironmentException, UnityActionException, UnityTimeOutException
from .rpc_communicator import RpcCommunicator
from .socket_communicator import SocketCommunicator
# Package-level logging configuration, applied once at import time.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("unityagents")
def __init__(self, file_name=None, worker_id=0,
             base_port=5005, curriculum=None,
             seed=0, docker_training=False):
    """
    Starts a new unity environment and establishes a connection with it.

    :param file_name: Name of the Unity environment binary; None connects to the Editor.
    :param worker_id: Offset added to base_port, so several environments can run in parallel.
    :param base_port: Baseline port used to communicate with the environment.
    :param curriculum: Optional path to a curriculum configuration.
    :param seed: Random seed forwarded to the Unity environment.
    :param docker_training: Informs this class whether the process is being run within a container.
    """
    atexit.register(self._close)
    # NOTE(review): the legacy socket helpers below read self.port and
    # self._buffer_size; values reconstructed from convention — confirm.
    self.port = base_port + worker_id
    self._buffer_size = 12000
    self._version_ = "API-4"
    self._loaded = False  # If true, this means the environment was successfully loaded
    self.proc1 = None  # The process that is started. If None, no process was started
    self.communicator = self.get_communicator(worker_id, base_port)

    # If the environment name is None, a new environment will not be launched
    # and the communicator will directly try to connect to an existing unity environment.
    if file_name is not None:
        self.executable_launcher(file_name, docker_training)
    else:
        logger.info("Ready to connect with the Editor.")
    self._loaded = True

    rl_init_parameters_in = UnityRLInitializationInput(
        seed=seed
    )
    try:
        aca_params = self.send_academy_parameters(rl_init_parameters_in)
    except UnityTimeOutException:
        self._close()
        raise
    # TODO: think of a better way to expose the academyParameters
    self._unity_version = aca_params.version
    if self._unity_version != self._version_:
        raise UnityEnvironmentException(
            "The API number is not compatible between Unity and python. Python API : {0}, Unity API : "
            "{1}.\nPlease go to https://github.com/Unity-Technologies/ml-agents to download the latest version "
            "of ML-Agents.".format(self._version_, self._unity_version))
    self._n_agents = {}
    self._global_done = None
    self._academy_name = aca_params.name
    self._log_path = aca_params.log_path
    self._brains = {}
    self._brain_names = []
    self._external_brain_names = []
    for brain_param in aca_params.brain_parameters:
        self._brain_names += [brain_param.brain_name]
        resolution = [{
            "height": x.height,
            "width": x.width,
            "blackAndWhite": x.gray_scale
        } for x in brain_param.camera_resolutions]
        self._brains[brain_param.brain_name] = \
            BrainParameters(brain_param.brain_name, {
                "vectorObservationSize": brain_param.vector_observation_size,
                "numStackedVectorObservations": brain_param.num_stacked_vector_observations,
                "cameraResolutions": resolution,
                "vectorActionSize": brain_param.vector_action_size,
                "vectorActionDescriptions": brain_param.vector_action_descriptions,
                "vectorActionSpaceType": brain_param.vector_action_space_type,
                "vectorObservationSpaceType": brain_param.vector_observation_space_type
            })
        # brain_type == 2 marks an External brain, i.e. one controllable from python.
        if brain_param.brain_type == 2:
            self._external_brain_names += [brain_param.brain_name]
    self._num_brains = len(self._brain_names)
    self._num_external_brains = len(self._external_brain_names)
    self._resetParameters = dict(aca_params.environment_parameters.float_parameters)  # TODO
    self._curriculum = Curriculum(curriculum, self._resetParameters)
    logger.info("\n'{0}' started successfully!\n{1}".format(self._academy_name, str(self)))
    if self._num_external_brains == 0:
        logger.warning("No External Brains found in the Unity Environment. "
                       "You will not be able to pass actions to your agent(s).")
@property
def curriculum(self):
    """The Curriculum object driving lesson-based reset parameters."""
    return self._curriculum

@property
def logfile_path(self):
    """Path to the Unity environment's log file, as reported by the Academy."""
    return self._log_path

@property
def brains(self):
    """Mapping of brain name -> BrainParameters for every brain in the Academy."""
    return self._brains

@property
def global_done(self):
    """Whether the whole episode is done; None before the first reset."""
    return self._global_done

@property
def academy_name(self):
    """Name of the Academy object in the connected Unity scene."""
    return self._academy_name

@property
def number_brains(self):
    """Total number of brains (internal and external)."""
    return self._num_brains

@property
def number_external_brains(self):
    """Number of External brains (controllable from python)."""
    return self._num_external_brains

@property
def brain_names(self):
    """Names of all brains in the Academy."""
    return self._brain_names

@property
def external_brain_names(self):
    """Names of the External brains only."""
    return self._external_brain_names
def executable_launcher(self, file_name, docker_training):
    """
    Launches the Unity executable named by `file_name` and stores the process
    handle in self.proc1 so _close() can kill it.

    :param file_name: Path/name of the environment binary (platform extension optional).
    :param docker_training: When True, wrap the launch in xvfb-run so the player
        can render inside a headless container.
    :raises UnityEnvironmentException: If no matching executable can be found.
    """
    cwd = os.getcwd()
    # Strip any platform-specific extension the caller may have included.
    file_name = (file_name.strip()
                 .replace('.app', '').replace('.exe', '').replace('.x86_64', '').replace('.x86', ''))
    true_filename = os.path.basename(os.path.normpath(file_name))
    launch_string = None
    # NOTE(review): the platform-specific candidate discovery was lost in a bad
    # merge; this glob search next to the working directory approximates it —
    # confirm against the original per-platform (.x86_64/.app/.exe) lookup.
    candidates = glob.glob(os.path.join(cwd, file_name) + '*')
    if len(candidates) > 0:
        launch_string = candidates[0]
    if launch_string is None:
        self._close()
        raise UnityEnvironmentException("Couldn't launch the {0} environment. "
                                        "Provided filename does not match any environments."
                                        .format(true_filename))
    if not docker_training:
        # Launch the environment directly; it connects back on self.port.
        self.proc1 = subprocess.Popen(
            [launch_string,
             '--port', str(self.port)])
    else:
        # Comments for future maintenance:
        #   xvfb-run provides a virtual display so the Unity player can run
        #   headless inside a container; `exec` replaces the shell so the
        #   player receives signals (and .kill()) directly.
        docker_ls = ("exec xvfb-run --auto-servernum "
                     "--server-args='-screen 0 640x480x24' "
                     "{0} --port {1}").format(launch_string,
                                              str(self.port))
        self.proc1 = subprocess.Popen(docker_ls,
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE,
                                      shell=True)
@staticmethod
def _process_pixels(image_bytes=None, bw=False):
    """
    Converts byte array observation image into numpy array, re-sizes it,
    and optionally converts it to grey scale.

    :param image_bytes: input byte array corresponding to image
    :param bw: when True, average the colour channels into one grayscale plane
    :return: processed numpy array of observation from environment
    """
    decoded = Image.open(io.BytesIO(bytearray(image_bytes)))
    pixels = np.array(decoded) / 255.0
    if bw:
        pixels = np.mean(pixels, axis=2)
        pixels = np.reshape(pixels, [pixels.shape[0], pixels.shape[1], 1])
    return pixels
def get_communicator(self, worker_id, base_port):
    """Factory for the backend used to talk to Unity (gRPC by default)."""
    return RpcCommunicator(worker_id, base_port)
    # return SocketCommunicator(worker_id, base_port)
def __str__(self):
    """Human-readable summary of the academy, its reset parameters and brains."""
    _new_reset_param = self._curriculum.get_config()
    # NOTE(review): the header of this summary was lost in a bad merge; the
    # academy/brain counts below are reconstructed — confirm original wording.
    return ("Unity Academy name: {0}\n"
            "Number of Brains: {1}\n"
            "Number of External Brains: {2}\n"
            "Reset Parameters:\n\t\t{3}").format(
        self._academy_name, str(self._num_brains), str(self._num_external_brains),
        "\n\t\t".join(["{0} -> {1}".format(
            k, _new_reset_param[k] if k in _new_reset_param else self._resetParameters[k])
            for k in self._resetParameters])) + '\n' + \
        '\n'.join([str(self._brains[b]) for b in self._brains])
def _recv_bytes(self):
    """
    Legacy socket receive: reads a 4-byte struct-packed unsigned-int length
    prefix, then keeps reading until the full payload has arrived.

    :return: the raw payload bytes (length prefix stripped).
    :raises UnityTimeOutException: if the socket times out while reading.
    """
    try:
        s = self._conn.recv(self._buffer_size)
        # First four bytes carry the payload length ("I" = native unsigned int).
        message_length = struct.unpack("I", bytearray(s[:4]))[0]
        s = s[4:]
        # NOTE(review): if recv ever delivers more than message_length bytes this
        # loop never terminates (`!=` instead of `<`) — confirm sender framing
        # guarantees exact totals.
        while len(s) != message_length:
            s += self._conn.recv(self._buffer_size)
    except socket.timeout as e:
        raise UnityTimeOutException("The environment took too long to respond.", self._log_path)
    return s
def _get_state_image(self, bw):
    """
    Receives observation from socket, and confirms.
    :param bw: whether to convert the received image to grayscale
    :return: processed observation array
    """
    raw = self._recv_bytes()
    processed = self._process_pixels(image_bytes=raw, bw=bw)
    self._conn.send(b"RECEIVED")
    return processed
def _get_state_dict(self):
    """
    Receives dictionary of state information from socket, and confirms.
    :return: (state dict, None) normally, or ({}, global-done flag) when the
        END_OF_MESSAGE marker is received.
    """
    payload = self._recv_bytes().decode('utf-8')
    if payload[:14] == "END_OF_MESSAGE":
        # Terminal marker: the remainder encodes whether the episode is done.
        # Note: no RECEIVED acknowledgement is sent in this case.
        return {}, payload[15:] == 'True'
    self._conn.send(b"RECEIVED")
    return json.loads(payload), None
def reset(self, train_mode=True, config=None, lesson=None) -> AllBrainInfo:
    """
    Sends a signal to reset the unity environment.

    :param train_mode: Whether to run the environment in training or inference mode.
    :param config: Optional override of reset parameters; defaults to the curriculum config.
    :param lesson: Optional lesson to resume the curriculum from.
    :return: AllBrainInfo: a dictionary of BrainInfo objects, keyed by brain name.
    :raises UnityEnvironmentException: if no environment is loaded or config is invalid.
    """
    if config is None:
        config = self._curriculum.get_config(lesson)
    # NOTE(review): the config-validation body was lost in a bad merge; only the
    # invalid-parameter message survived — confirm the numeric-type checks.
    for k in config:
        if (k in self._resetParameters) and isinstance(config[k], (int, float)):
            self._resetParameters[k] = config[k]
        elif not isinstance(config[k], (int, float)):
            raise UnityEnvironmentException(
                "The value for parameter '{0}' must be an Integer or a Float.".format(k))
        else:
            raise UnityEnvironmentException("The parameter '{0}' is not a valid parameter.".format(k))
    if self._loaded:
        outputs = self.communicator.exchange(
            self._generate_reset_input(train_mode, config)
        )
        # A None exchange means the communicator was interrupted.
        if outputs is None:
            raise KeyboardInterrupt
        rl_output = outputs.rl_output
        s = self._get_state(rl_output)
        self._global_done = s[1]
        for _b in self._external_brain_names:
            self._n_agents[_b] = len(s[0][_b].agents)
        return s[0]
    else:
        raise UnityEnvironmentException("No Unity environment is loaded.")
def _get_state(self) -> AllBrainInfo:
    """
    Legacy socket path: collects experience information from all external brains
    in environment at current step.

    NOTE(review): this is shadowed by the communicator-based _get_state(output)
    defined later in this file — it looks like a leftover from a merge.

    :return: a dictionary of BrainInfo objects.
    """
    self._data = {}
    while True:
        state_dict, end_of_message = self._get_state_dict()
        if end_of_message is not None:
            # Terminal marker: record the global done flag and fill in an empty
            # BrainInfo for any brain that sent no data this step.
            self._global_done = end_of_message
            for _b in self._brain_names:
                if _b not in self._data:
                    self._data[_b] = BrainInfo([], np.array([]), [], np.array([]),
                                               [], [], [], np.array([]), [], max_reached=[])
            return self._data
        b = state_dict["brain_name"]
        n_agent = len(state_dict["agents"])
        try:
            # Continuous observations are flattened per agent across the stack;
            # discrete ones carry one value per stacked observation.
            if self._brains[b].vector_observation_space_type == "continuous":
                vector_obs = np.array(state_dict["vectorObservations"]).reshape(
                    (n_agent, self._brains[b].vector_observation_space_size
                     * self._brains[b].num_stacked_vector_observations))
            else:
                vector_obs = np.array(state_dict["vectorObservations"]).reshape(
                    (n_agent, self._brains[b].num_stacked_vector_observations))
        except UnityActionException:
            # NOTE(review): np.reshape raises ValueError, not UnityActionException,
            # so this handler looks unreachable — confirm intent.
            raise UnityActionException("Brain {0} has an invalid vector observation. "
                                       "Expecting {1} {2} vector observations but received {3}."
                                       .format(b, n_agent if self._brains[b].vector_observation_space_type == "discrete"
                                               else str(self._brains[b].vector_observation_space_size * n_agent
                                                        * self._brains[b].num_stacked_vector_observations),
                                               self._brains[b].vector_observation_space_type,
                                               len(state_dict["vectorObservations"])))
        memories = np.array(state_dict["memories"]).reshape((n_agent, -1))
        text_obs = state_dict["textObservations"]
        rewards = state_dict["rewards"]
        dones = state_dict["dones"]
        agents = state_dict["agents"]
        maxes = state_dict["maxes"]
        if n_agent > 0:
            vector_actions = np.array(state_dict["previousVectorActions"]).reshape((n_agent, -1))
            text_actions = state_dict["previousTextActions"]
        else:
            vector_actions = np.array([])
            text_actions = []
        # One stacked image array per camera, gathered agent-by-agent over the socket.
        observations = []
        for o in range(self._brains[b].number_visual_observations):
            obs_n = []
            for a in range(n_agent):
                obs_n.append(self._get_state_image(self._brains[b].camera_resolutions[o]['blackAndWhite']))
            observations.append(np.array(obs_n))
        self._data[b] = BrainInfo(observations, vector_obs, text_obs, memories, rewards,
                                  agents, dones, vector_actions, text_actions, max_reached=maxes)
def _send_action(self, vector_action, memory, text_action):
    """
    Send dictionary of actions, memories, and value estimates over socket.
    :param vector_action: a dictionary of lists of vector actions.
    :param memory: a dictionary of lists of of memories.
    :param text_action: a dictionary of lists of text actions.
    :raises UnityTimeOutException: if the environment's ready signal never arrives.
    """
    try:
        # Wait for the environment's ready signal before sending the payload.
        self._conn.recv(self._buffer_size)
    except socket.timeout as e:
        raise UnityTimeOutException("The environment took too long to respond.", self._log_path)
    payload = {"vector_action": vector_action, "memory": memory, "text_action": text_action}
    encoded = json.dumps(payload).encode('utf-8')
    self._conn.send(self._append_length(encoded))
@staticmethod
def _append_length(message):
    """Prefix `message` with its length packed as a native unsigned int."""
    header = struct.pack("I", len(message))
    return header + message
@staticmethod
def _flatten(arr):
    """
    Converts dictionary of arrays to list for transmission over socket.
    :param arr: numpy vector (or scalar / nested list).
    :return: flattened list of floats.
    """
    if isinstance(arr, (int, np.int_, float, np.float_)):
        # A lone scalar becomes a one-element list.
        return [float(arr)]
    if isinstance(arr, np.ndarray):
        arr = arr.tolist()
    if not arr:
        return arr
    if isinstance(arr[0], np.ndarray):
        arr = [element for row in arr for element in row.tolist()]
    if isinstance(arr[0], list):
        arr = [element for row in arr for element in row]
    return [float(value) for value in arr]
def step(self, vector_action=None, memory=None, text_action=None) -> AllBrainInfo:
    """
    Provides the environment with an action, moves the environment dynamics forward
    accordingly, and returns observation, state, and reward information to the agent.

    :param vector_action: Agent's vector action, keyed by brain name.
    :param memory: Agent's memory to save, keyed by brain name.
    :param text_action: Text action to send, keyed by brain name.
    :return: AllBrainInfo: a dictionary of BrainInfo objects, keyed by brain name.
    :raises UnityEnvironmentException: if no environment is loaded.
    :raises UnityActionException: if an action does not match a brain's expectation.
    """
    vector_action = {} if vector_action is None else vector_action
    memory = {} if memory is None else memory
    text_action = {} if text_action is None else text_action
    if not self._loaded:
        raise UnityEnvironmentException("No Unity environment is loaded.")
    if self._global_done:
        raise UnityActionException(
            "The episode is completed. Reset the environment with 'reset()'")
    # Promote bare (non-dict) arguments when there is exactly one external brain.
    if not isinstance(vector_action, dict) and self._num_external_brains == 1:
        vector_action = {self._external_brain_names[0]: vector_action}
    if not isinstance(memory, dict) and self._num_external_brains == 1:
        memory = {self._external_brain_names[0]: memory}
    if not isinstance(text_action, dict) and self._num_external_brains == 1:
        text_action = {self._external_brain_names[0]: text_action}
    # Reject inputs addressed to brains that are not external.
    for brain_name in list(vector_action.keys()) + list(memory.keys()) + list(text_action.keys()):
        if brain_name not in self._external_brain_names:
            raise UnityActionException("The name {0} does not correspond to an external brain "
                                       "in the environment".format(brain_name))
    for b in self._external_brain_names:
        n_agent = self._n_agents[b]
        if b not in vector_action:
            # raise UnityActionException("You need to input an action for the brain {0}".format(b))
            # Default to a zero action of the expected size instead of raising.
            if self._brains[b].vector_action_space_type == "discrete":
                vector_action[b] = [0.0] * n_agent
            else:
                vector_action[b] = [0.0] * n_agent * self._brains[b].vector_action_space_size
        else:
            vector_action[b] = self._flatten(vector_action[b])
        if b not in memory or memory[b] is None:
            memory[b] = []
        else:
            memory[b] = self._flatten(memory[b])
        if b not in text_action or text_action[b] is None:
            text_action[b] = [""] * n_agent
        if isinstance(text_action[b], str):
            text_action[b] = [text_action[b]] * n_agent
        # NOTE(review): the size-validation messages below were lost in a bad
        # merge and are reconstructed — confirm the original wording.
        if not ((len(text_action[b]) == n_agent) or len(text_action[b]) == 0):
            raise UnityActionException(
                "There was a mismatch between the provided text_action and environment's "
                "expectation: The brain {0} expected {1} text_action(s), but was provided: {2}"
                .format(b, n_agent, str(text_action[b])))
        expected_size = (n_agent if self._brains[b].vector_action_space_type == "discrete"
                         else n_agent * self._brains[b].vector_action_space_size)
        if len(vector_action[b]) != expected_size:
            raise UnityActionException(
                "There was a mismatch between the provided action and environment's "
                "expectation: The brain {0} expected {1} {2} action(s), but was provided: {3}"
                .format(b, expected_size,
                        self._brains[b].vector_action_space_type,
                        str(vector_action[b])))
    outputs = self.communicator.exchange(
        self._generate_step_input(vector_action, memory, text_action)
    )
    # A None exchange means the communicator was interrupted.
    if outputs is None:
        raise KeyboardInterrupt
    rl_output = outputs.rl_output
    s = self._get_state(rl_output)
    self._global_done = s[1]
    for _b in self._external_brain_names:
        self._n_agents[_b] = len(s[0][_b].agents)
    return s[0]
"""
Sends a shutdown signal to the unity environment , and closes the socket connection .
"""
if self . _loaded & self . _open_socket :
self . _conn . send ( b " EXIT " )
self . _conn . close ( )
if self . _open_socket :
self . _socket . close ( )
self . _loaded = False
if self . _loaded :
self . _close ( )
def _close(self):
    """Tear down communication and kill the child Unity process, if one was started."""
    self._loaded = False
    self.communicator.close()
    process = self.proc1
    if process is not None:
        process.kill()
@staticmethod
def _flatten(arr):
    """
    Converts arrays to list.
    :param arr: numpy vector.
    :return: flattened list.
    """
    if isinstance(arr, (int, np.int_, float, np.float_)):
        # Scalars short-circuit to a single-element list.
        return [float(arr)]
    if isinstance(arr, np.ndarray):
        arr = arr.tolist()
    if len(arr) == 0:
        return arr
    first = arr[0]
    if isinstance(first, np.ndarray):
        arr = [x for row in arr for x in row.tolist()]
    if isinstance(arr[0], list):
        arr = [x for row in arr for x in row]
    return [float(x) for x in arr]
@staticmethod
def _process_pixels(image_bytes, gray_scale):
    """
    Converts byte array observation image into numpy array, re-sizes it,
    and optionally converts it to grey scale.
    :param image_bytes: input byte array corresponding to image
    :param gray_scale: when True, average the colour channels into one plane
    :return: processed numpy array of observation from environment
    """
    image = Image.open(io.BytesIO(bytearray(image_bytes)))
    obs = np.array(image) / 255.0
    if gray_scale:
        obs = np.mean(obs, axis=2)
        obs = np.reshape(obs, [obs.shape[0], obs.shape[1], 1])
    return obs
def _get_state(self, output: UnityRLOutput) -> (AllBrainInfo, bool):
    """
    Collects experience information from all external brains in environment at current step.

    :param output: UnityRLOutput protobuf received from the environment.
    :return: (a dictionary of BrainInfo objects keyed by brain name, global done flag).
    """
    _data = {}
    global_done = output.global_done
    for b in output.agentInfos:
        agent_info_list = output.agentInfos[b].value
        # Stack each camera's observations across agents into one array per camera.
        vis_obs = []
        for i in range(self.brains[b].number_visual_observations):
            obs = [
                self._process_pixels(x.visual_observations[i], self.brains[b].camera_resolutions[i]['blackAndWhite'])
                for x in agent_info_list]
            vis_obs += [np.array(obs)]
        # Pad every agent's memory with zeros up to the longest memory this step.
        # NOTE(review): max() raises on an empty agent_info_list — assumes each
        # reported brain has at least one agent; confirm.
        memory_size = max([len(x.memories) for x in agent_info_list])
        if memory_size == 0:
            memory = np.zeros((0, 0))
        else:
            [x.memories.extend([0] * (memory_size - len(x.memories))) for x in agent_info_list]
            memory = np.array([x.memories for x in agent_info_list])
        _data[b] = BrainInfo(
            visual_observation=vis_obs,
            vector_observation=np.array([x.stacked_vector_observation for x in agent_info_list]),
            text_observations=[x.text_observation for x in agent_info_list],
            memory=memory,
            reward=[x.reward for x in agent_info_list],
            agents=[x.id for x in agent_info_list],
            local_done=[x.done for x in agent_info_list],
            vector_action=np.array([x.stored_vector_actions for x in agent_info_list]),
            text_action=[x.stored_text_actions for x in agent_info_list],
            max_reached=[x.max_step_reached for x in agent_info_list]
        )
    return _data, global_done
def _generate_step_input(self, vector_action, memory, text_action) -> UnityRLInput:
    """
    Packs per-brain flattened actions, memories and text actions into a
    UnityRLInput STEP message, one AgentActionProto per agent.
    """
    rl_in = UnityRLInput()
    for b in vector_action:
        n_agents = self._n_agents[b]
        if n_agents == 0:
            continue
        # Per-agent slice widths of the flattened action and memory vectors.
        action_width = len(vector_action[b]) // n_agents
        memory_width = len(memory[b]) // n_agents
        for i in range(n_agents):
            proto = AgentActionProto(
                vector_actions=vector_action[b][i * action_width:(i + 1) * action_width],
                memories=memory[b][i * memory_width:(i + 1) * memory_width],
                text_actions=text_action[b][i]
            )
            rl_in.agent_actions[b].value.extend([proto])
    rl_in.command = 0  # 0 == STEP
    return self.wrap_unity_input(rl_in)
def _generate_reset_input(self, training, config) -> UnityRLInput:
    """
    Builds a UnityRLInput RESET message carrying the training flag and the
    reset parameters from `config`.
    """
    rl_in = UnityRLInput()
    rl_in.is_training = training
    rl_in.environment_parameters.CopyFrom(EnvironmentParametersProto())
    for key, value in config.items():
        rl_in.environment_parameters.float_parameters[key] = value
    rl_in.command = 1  # 1 == RESET
    return self.wrap_unity_input(rl_in)
def send_academy_parameters(self, init_parameters: UnityRLInitializationInput) -> UnityRLInitializationOutput:
    """Initialize the communicator with the given parameters and return Unity's reply."""
    wrapped = UnityInput()
    wrapped.rl_initialization_input.CopyFrom(init_parameters)
    return self.communicator.initialize(wrapped).rl_initialization_output
def wrap_unity_input(self, rl_input: UnityRLInput) -> UnityInput:
    """
    Wraps a UnityRLInput in a top-level UnityInput message.
    (The previous annotation said UnityOutput, but a UnityInput is constructed
    and returned — annotation corrected.)
    """
    result = UnityInput()
    result.rl_input.CopyFrom(rl_input)
    return result