您最多选择25个主题
主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
97 行
3.7 KiB
97 行
3.7 KiB
import logging
|
|
import grpc
|
|
|
|
from multiprocessing import Pipe
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
from .communicator import Communicator
|
|
from .communicator_objects import UnityToExternalServicer, add_UnityToExternalServicer_to_server
|
|
from .communicator_objects import UnityMessage, UnityInput, UnityOutput
|
|
from .exception import UnityTimeOutException
|
|
|
|
|
|
logger = logging.getLogger("mlagents.envs")
|
|
|
|
|
|
class UnityToExternalServicerImplementation(UnityToExternalServicer):
|
|
def __init__(self):
|
|
self.parent_conn, self.child_conn = Pipe()
|
|
|
|
def Initialize(self, request, context):
|
|
self.child_conn.send(request)
|
|
return self.child_conn.recv()
|
|
|
|
def Exchange(self, request, context):
|
|
self.child_conn.send(request)
|
|
return self.child_conn.recv()
|
|
|
|
|
|
class RpcCommunicator(Communicator):
|
|
def __init__(self, worker_id=0,
|
|
base_port=5005):
|
|
"""
|
|
Python side of the grpc communication. Python is the server and Unity the client
|
|
|
|
|
|
:int base_port: Baseline port number to connect to Unity environment over. worker_id increments over this.
|
|
:int worker_id: Number to add to communication port (5005) [0]. Used for asynchronous agent scenarios.
|
|
"""
|
|
self.port = base_port + worker_id
|
|
self.worker_id = worker_id
|
|
self.server = None
|
|
self.unity_to_external = None
|
|
self.is_open = False
|
|
|
|
def initialize(self, inputs: UnityInput) -> UnityOutput:
|
|
try:
|
|
# Establish communication grpc
|
|
self.server = grpc.server(ThreadPoolExecutor(max_workers=10))
|
|
self.unity_to_external = UnityToExternalServicerImplementation()
|
|
add_UnityToExternalServicer_to_server(self.unity_to_external, self.server)
|
|
self.server.add_insecure_port('[::]:'+str(self.port))
|
|
self.server.start()
|
|
except :
|
|
raise UnityTimeOutException(
|
|
"Couldn't start socket communication because worker number {} is still in use. "
|
|
"You may need to manually close a previously opened environment "
|
|
"or use a different worker number.".format(str(self.worker_id)))
|
|
if not self.unity_to_external.parent_conn.poll(30):
|
|
raise UnityTimeOutException(
|
|
"The Unity environment took too long to respond. Make sure that :\n"
|
|
"\t The environment does not need user interaction to launch\n"
|
|
"\t The Academy and the External Brain(s) are attached to objects in the Scene\n"
|
|
"\t The environment and the Python interface have compatible versions.")
|
|
aca_param = self.unity_to_external.parent_conn.recv().unity_output
|
|
self.is_open = True
|
|
message = UnityMessage()
|
|
message.header.status = 200
|
|
message.unity_input.CopyFrom(inputs)
|
|
self.unity_to_external.parent_conn.send(message)
|
|
self.unity_to_external.parent_conn.recv()
|
|
return aca_param
|
|
|
|
def exchange(self, inputs: UnityInput) -> UnityOutput:
|
|
message = UnityMessage()
|
|
message.header.status = 200
|
|
message.unity_input.CopyFrom(inputs)
|
|
self.unity_to_external.parent_conn.send(message)
|
|
output = self.unity_to_external.parent_conn.recv()
|
|
if output.header.status != 200:
|
|
return None
|
|
return output.unity_output
|
|
|
|
def close(self):
|
|
"""
|
|
Sends a shutdown signal to the unity environment, and closes the grpc connection.
|
|
"""
|
|
if self.is_open:
|
|
message_input = UnityMessage()
|
|
message_input.header.status = 400
|
|
self.unity_to_external.parent_conn.send(message_input)
|
|
self.unity_to_external.parent_conn.close()
|
|
self.server.stop(False)
|
|
self.is_open = False
|
|
|
|
|
|
|
|
|