
Add timeout for communicator exchange

When we initially connect to the environment using RPCCommunicator,
the connection is polled so we don't hang forever on `.recv()` when
the environment wasn't launched or failed.  However, we don't currently
have any similar check for the exchanges that happen mid-training-run.

This change applies the same timeout from initialization to each exchange,
and extends the default `timeout_wait` to 60 seconds to generally improve
the chances we won't have a mismatch between environment launch time and
the trainer timeout.

Tested on: single-env and multi-env cases.  Killed 1 environment process
manually and saw that the model was saved appropriately and all processes
closed.
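
The poll-before-recv pattern described above can be sketched in isolation. This is a minimal illustration using a plain `multiprocessing.Pipe`, not the RPCCommunicator code itself; `ExchangeTimeout` and `recv_with_timeout` are hypothetical names standing in for `UnityTimeOutException` and the polling helper.

```python
from multiprocessing import Pipe


class ExchangeTimeout(Exception):
    """Hypothetical stand-in for UnityTimeOutException."""


def recv_with_timeout(conn, timeout_wait):
    """Receive one message, raising instead of blocking forever."""
    # Connection.poll() returns False if no data arrives within
    # timeout_wait seconds, so recv() is only called when it cannot hang.
    if not conn.poll(timeout_wait):
        raise ExchangeTimeout(f"No response within {timeout_wait} seconds.")
    return conn.recv()


def demo():
    parent_conn, child_conn = Pipe()

    # First exchange: the peer has replied, so the receive succeeds.
    child_conn.send("hello")
    first = recv_with_timeout(parent_conn, timeout_wait=1.0)

    # Second exchange: the peer is silent (as if the process had died),
    # so the helper raises rather than blocking on recv().
    try:
        recv_with_timeout(parent_conn, timeout_wait=0.1)
        timed_out = False
    except ExchangeTimeout:
        timed_out = True
    return first, timed_out
```

Applying the same helper to every receive, rather than only the first, is what lets a mid-run environment failure surface as an exception instead of a hang.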
/develop-newnormalization
Jonathan Harper 5 years ago
Commit
bae94a76
3 changed files with 13 additions and 4 deletions
  1. ml-agents-envs/mlagents/envs/environment.py (2 changes)
  2. ml-agents-envs/mlagents/envs/rpc_communicator.py (11 changes)
  3. ml-agents-envs/mlagents/envs/subprocess_env_manager.py (4 changes)

ml-agents-envs/mlagents/envs/environment.py (2 changes)

         seed: int = 0,
         docker_training: bool = False,
         no_graphics: bool = False,
-        timeout_wait: int = 30,
+        timeout_wait: int = 60,
         args: Optional[List[str]] = None,
     ):
         """

ml-agents-envs/mlagents/envs/rpc_communicator.py (11 changes)

         finally:
             s.close()
-    def initialize(self, inputs: UnityInputProto) -> UnityOutputProto:
+    def poll_for_timeout(self):
+        """
+        Polls the GRPC parent connection for data, to be used before calling recv. This prevents
+        us from hanging indefinitely in the case where the environment process has died or was not
+        launched.
+        """
         if not self.unity_to_external.parent_conn.poll(self.timeout_wait):
             raise UnityTimeOutException(
                 "The Unity environment took too long to respond. Make sure that :\n"

             )
+    def initialize(self, inputs: UnityInputProto) -> UnityOutputProto:
+        self.poll_for_timeout()
         aca_param = self.unity_to_external.parent_conn.recv().unity_output
         message = UnityMessageProto()
         message.header.status = 200

         message.header.status = 200
         message.unity_input.CopyFrom(inputs)
         self.unity_to_external.parent_conn.send(message)
+        self.poll_for_timeout()
         output = self.unity_to_external.parent_conn.recv()
         if output.header.status != 200:
             return None

ml-agents-envs/mlagents/envs/subprocess_env_manager.py (4 changes)

 import cloudpickle
 from mlagents.envs.environment import UnityEnvironment
-from mlagents.envs.exception import UnityCommunicationException
+from mlagents.envs.exception import UnityCommunicationException, UnityTimeOutException
 from multiprocessing import Process, Pipe, Queue
 from multiprocessing.connection import Connection
 from queue import Empty as EmptyQueueException

                 _send_response("reset", all_brain_info)
             elif cmd.name == "close":
                 break
-    except (KeyboardInterrupt, UnityCommunicationException):
+    except (KeyboardInterrupt, UnityCommunicationException, UnityTimeOutException):
         logger.info(f"UnityEnvironment worker {worker_id}: environment stopping.")
         step_queue.put(EnvironmentResponse("env_close", worker_id, None))
     finally:
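
As a rough illustration of why the worker's `except` clause needs to include the timeout exception, the sketch below shows a worker loop that turns a timeout into an `env_close` message on a queue. It is a simplified, single-process stand-in, not the subprocess_env_manager code: `StepTimeout`, `Response`, `worker`, and `flaky_step_factory` are all hypothetical names, and a plain `queue.Queue` replaces the multiprocessing queue.

```python
from queue import Queue
from typing import Any, Callable, NamedTuple


class StepTimeout(Exception):
    """Hypothetical stand-in for UnityTimeOutException."""


class Response(NamedTuple):
    """Hypothetical stand-in for EnvironmentResponse."""
    name: str
    worker_id: int
    payload: Any


def worker(step: Callable[[], Any], step_queue: Queue, worker_id: int, max_steps: int) -> None:
    try:
        for _ in range(max_steps):
            step_queue.put(Response("step", worker_id, step()))
    except (KeyboardInterrupt, StepTimeout):
        # Without StepTimeout in this tuple the exception would escape the
        # worker and the consumer would never be told the environment closed.
        step_queue.put(Response("env_close", worker_id, None))


def flaky_step_factory(fail_on: int) -> Callable[[], int]:
    """Return a step function that times out on its fail_on-th call."""
    calls = {"n": 0}

    def step() -> int:
        calls["n"] += 1
        if calls["n"] == fail_on:
            raise StepTimeout("environment stopped responding")
        return calls["n"]

    return step


def demo():
    q: Queue = Queue()
    worker(flaky_step_factory(3), q, worker_id=0, max_steps=5)
    return [q.get_nowait() for _ in range(q.qsize())]
```

The consumer of the queue can then react to `env_close` (for example by saving the model and shutting down the remaining workers) instead of waiting on a worker that will never respond.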
