浏览代码

Check that worker port is available in RpcCommunicator (#1415)

* Check that worker port is available in RpcCommunicator

Previously the RpcCommunicator did not check the port or create the
RPC server until `initialize()` was called.  Since "initialize"
requires the environment to be available, this means we might create
a new environment which connects to an existing RPC server running
in another process.  This causes both training runs to fail.

As a remedy to this issue, this commit moves the server creation into
the RpcCommunicator constructor and adds an explicit socket binding
check to the requested port.

* Fixes suggested by Codacy

* Update rpc_communicator.py

* Addressing feedback: formatting & consistency
/develop-generalizationTraining-TrainerController
GitHub 6 年前
当前提交
2d68b835
共有 3 个文件被更改,包括 64 次插入16 次删除
  1. 16
      ml-agents/mlagents/envs/exception.py
  2. 42
      ml-agents/mlagents/envs/rpc_communicator.py
  3. 22
      ml-agents/tests/envs/test_rpc_communicator.py

16
ml-agents/mlagents/envs/exception.py


"You can check the logfile for more information at {}".format(log_file_path))
except:
logger.error("An error might have occured in the environment. "
"No UnitySDK.log file could be found.")
"No UnitySDK.log file could be found.")
class UnityWorkerInUseException(UnityException):
"""
This error occurs when the port for a certain worker ID is already reserved.
"""
MESSAGE_TEMPLATE = (
"Couldn't start socket communication because worker number {} is still in use. "
"You may need to manually close a previously opened environment "
"or use a different worker number.")
def __init__(self, worker_id):
message = self.MESSAGE_TEMPLATE.format(str(worker_id))
super(UnityWorkerInUseException, self).__init__(message)

42
ml-agents/mlagents/envs/rpc_communicator.py


import logging
import grpc
import socket
from multiprocessing import Pipe
from concurrent.futures import ThreadPoolExecutor

from .exception import UnityTimeOutException
from .exception import UnityTimeOutException, UnityWorkerInUseException
logger = logging.getLogger("mlagents.envs")

class RpcCommunicator(Communicator):
def __init__(self, worker_id=0,
base_port=5005):
def __init__(self, worker_id=0, base_port=5005):
"""
Python side of the grpc communication. Python is the server and Unity the client

self.server = None
self.unity_to_external = None
self.is_open = False
self.create_server()
def initialize(self, inputs: UnityInput) -> UnityOutput:
def create_server(self):
"""
Creates the GRPC server.
"""
self.check_port(self.port)
self.server.add_insecure_port('[::]:'+str(self.port))
self.server.add_insecure_port('localhost:' + str(self.port))
except :
raise UnityTimeOutException(
"Couldn't start socket communication because worker number {} is still in use. "
"You may need to manually close a previously opened environment "
"or use a different worker number.".format(str(self.worker_id)))
except:
raise UnityWorkerInUseException(self.worker_id)
def check_port(self, port):
"""
Attempts to bind to the requested communicator port, checking if it is already in use.
"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.bind(("localhost", port))
except socket.error:
raise UnityWorkerInUseException(self.worker_id)
finally:
s.close()
def initialize(self, inputs: UnityInput) -> UnityOutput:
if not self.unity_to_external.parent_conn.poll(30):
raise UnityTimeOutException(
"The Unity environment took too long to respond. Make sure that :\n"

self.unity_to_external.parent_conn.close()
self.server.stop(False)
self.is_open = False

22
ml-agents/tests/envs/test_rpc_communicator.py


import pytest
from mlagents.envs import RpcCommunicator
from mlagents.envs import UnityWorkerInUseException
def test_rpc_communicator_checks_port_on_create():
first_comm = RpcCommunicator()
with pytest.raises(UnityWorkerInUseException):
second_comm = RpcCommunicator()
second_comm.close()
first_comm.close()
def test_rpc_communicator_create_multiple_workers():
# Ensures multiple RPC communicators can be created with
# different worker_ids without causing an error.
first_comm = RpcCommunicator()
second_comm = RpcCommunicator(worker_id=1)
first_comm.close()
second_comm.close()
正在加载...
取消
保存