
Fix SubprocessEnvManager hanging on unexpected exceptions. (#4699)

* Add shutdown sentinel value to subprocess_env_manager.
* Add Sanity Check for Zombie Workers
/MLA-1734-demo-provider
GitHub, 4 years ago
Current commit: a4c9f58e
3 files changed, 80 insertions(+), 12 deletions(-)
  1. ml-agents/mlagents/trainers/subprocess_env_manager.py (52 lines changed)
  2. ml-agents/mlagents/trainers/tests/simple_test_envs.py (9 lines changed)
  3. ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py (31 lines changed)
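To make the two bullet points in the commit message concrete: each worker now puts a CLOSED sentinel on the shared step queue as the last thing it does, and the manager's close() drains that queue until every worker has reported in or a deadline expires, force-terminating anything that never does. Below is a minimal, self-contained sketch of that pattern, not ml-agents code; Cmd, run_worker, and close_all are illustrative names only.

import enum
import time
from multiprocessing import Process, Queue
from queue import Empty


class Cmd(enum.Enum):
    STEP = 1
    CLOSED = 2


def run_worker(worker_id: int, out_queue) -> None:
    try:
        out_queue.put((Cmd.STEP, worker_id))  # stand-in for normal step traffic
    finally:
        # Shutdown sentinel: always tell the parent this worker is done, even on errors.
        out_queue.put((Cmd.CLOSED, worker_id))
        out_queue.close()


def close_all(workers, out_queue, timeout_s: float = 10.0) -> None:
    alive = len(workers)
    deadline = time.time() + timeout_s
    # Drain the queue until every worker has sent CLOSED or the deadline passes.
    while alive > 0 and time.time() < deadline:
        try:
            cmd, _worker_id = out_queue.get_nowait()
            if cmd == Cmd.CLOSED:
                alive -= 1
        except Empty:
            pass
    # Zombie check: anything that never reported in gets terminated.
    for proc in workers:
        if proc.is_alive():
            proc.terminate()
        proc.join()


if __name__ == "__main__":
    queue = Queue()
    procs = [Process(target=run_worker, args=(i, queue)) for i in range(2)]
    for p in procs:
        p.start()
    close_all(procs, queue)

The actual change below follows the same shape but uses the existing EnvironmentCommand enum and EnvironmentResponse tuples instead of bare tuples, with a 10 second WORKER_SHUTDOWN_TIMEOUT_S.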

ml-agents/mlagents/trainers/subprocess_env_manager.py (52 lines changed)


from typing import Dict, NamedTuple, List, Any, Optional, Callable, Set
import cloudpickle
import enum
import time
from mlagents_envs.environment import UnityEnvironment
from mlagents_envs.exception import (

logger = logging_util.get_logger(__name__)

WORKER_SHUTDOWN_TIMEOUT_S = 10


class EnvironmentCommand(enum.Enum):
    ...
    RESET = 4
    CLOSE = 5
    ENV_EXITED = 6
    CLOSED = 7
class EnvironmentRequest(NamedTuple):
    ...


class UnityEnvWorker:  # enclosing context, abridged in the diff view
    def __init__(self, process: Process, worker_id: int, conn: Connection):
        ...
        self.previous_step: EnvironmentStep = EnvironmentStep.empty(worker_id)
        self.previous_all_action_info: Dict[str, ActionInfo] = {}
        self.waiting = False
        self.closed = False
    def send(self, cmd: EnvironmentCommand, payload: Any = None) -> None:
        ...

    def recv(self) -> EnvironmentResponse:
        try:
            ...
        except (BrokenPipeError, EOFError):
            raise UnityCommunicationException("UnityEnvironment worker: recv failed.")

    def request_close(self):
        try:
            self.conn.send(EnvironmentRequest(EnvironmentCommand.CLOSE))
        except (BrokenPipeError, EOFError):
            pass

    def close(self):
        logger.debug(f"UnityEnvWorker {self.worker_id} joining process.")
        self.process.join()
def worker(
    ...
):
    ...
    # tail of the handler for expected Unity exceptions:
        step_queue.put(
            EnvironmentResponse(EnvironmentCommand.ENV_EXITED, worker_id, ex)
        )
        _send_response(EnvironmentCommand.ENV_EXITED, ex)
    except Exception as ex:
        logger.error(
            f"UnityEnvironment worker {worker_id}: environment raised an unexpected exception."
        )
        step_queue.put(
            EnvironmentResponse(EnvironmentCommand.ENV_EXITED, worker_id, ex)
        )
        _send_response(EnvironmentCommand.ENV_EXITED, ex)
    finally:
        # Old cleanup path (removed by this change), which relied on cancel_join_thread():
        # If this worker has put an item in the step queue that hasn't been processed by the
        # EnvManager, the process will hang until the item is processed. We avoid this behavior
        # by using Queue.cancel_join_thread(). See
        # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue.cancel_join_thread
        # for more info.
        step_queue.cancel_join_thread()
        step_queue.close()
        # New cleanup path (added by this change): close the pipe and put a CLOSED sentinel
        # on the step queue so the parent SubprocessEnvManager knows this worker has shut down.
        parent_conn.close()
        step_queue.put(EnvironmentResponse(EnvironmentCommand.CLOSED, worker_id, None))
        step_queue.close()
class SubprocessEnvManager(EnvManager):
    def __init__(self, env_factory, engine_configuration, n_env: int = 1):  # annotations elided
        super().__init__()
        self.env_workers: List[UnityEnvWorker] = []
        self.step_queue: Queue = Queue()
        self.workers_alive = 0
        for worker_idx in range(n_env):
            self.env_workers.append(
                self.create_worker(
                    ...  # worker arguments elided in the diff
                )
            )
            self.workers_alive += 1

    @staticmethod
    def create_worker(
        ...
    def close(self) -> None:
        logger.debug("SubprocessEnvManager closing.")
        for env_worker in self.env_workers:
            env_worker.request_close()
        # Pull messages out of the queue until every worker has CLOSED or we time out.
        deadline = time.time() + WORKER_SHUTDOWN_TIMEOUT_S
        while self.workers_alive > 0 and time.time() < deadline:
            try:
                step: EnvironmentResponse = self.step_queue.get_nowait()
                env_worker = self.env_workers[step.worker_id]
                if step.cmd == EnvironmentCommand.CLOSED and not env_worker.closed:
                    env_worker.closed = True
                    self.workers_alive -= 1
                # Discard all other messages.
            except EmptyQueueException:
                pass
        # Sanity check to kill zombie workers and report an issue if they occur.
        if self.workers_alive > 0:
            logger.error("SubprocessEnvManager had workers that didn't signal shutdown")
            for env_worker in self.env_workers:
                if not env_worker.closed and env_worker.process.is_alive():
                    env_worker.process.terminate()
                    logger.error(
                        "A SubprocessEnvManager worker did not shut down correctly so it was forcefully terminated."
                    )
        for env_worker in self.env_workers:
            env_worker.close()

    def _postprocess_steps(
        self, env_steps: List[EnvironmentResponse]

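The comment removed from the worker's cleanup above describes a real multiprocessing pitfall: a process that has put data on a Queue will not exit until its feeder thread has flushed that data into the underlying pipe, so if the parent never reads the queue the child can hang on exit. The old code sidestepped this with Queue.cancel_join_thread(); the new code instead guarantees the parent drains the queue until it sees the CLOSED sentinel. A small standard-library sketch of the pitfall (the payload size is chosen only to overflow a typical pipe buffer; this is not ml-agents code):

from multiprocessing import Process, Queue


def child(q, use_cancel: bool) -> None:
    # A payload too large to fit in the pipe buffer, so it cannot be fully flushed
    # unless someone reads the queue.
    q.put(b"x" * (1 << 22))
    if use_cancel:
        # Tell the queue's feeder thread not to block process exit on unflushed data.
        # Buffered data may be lost, which is acceptable during an abnormal shutdown.
        q.cancel_join_thread()


if __name__ == "__main__":
    q = Queue()
    p = Process(target=child, args=(q, True))
    p.start()
    p.join(timeout=5)
    # With use_cancel=False and no reader, this join would typically time out,
    # because the child blocks at exit trying to flush the 4 MiB item.
    print("child exited cleanly:", not p.is_alive())

The trade-off is that cancel_join_thread() may silently drop buffered data, whereas the sentinel-plus-drain approach keeps the final CLOSED message intact for the manager to observe.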
ml-agents/mlagents/trainers/tests/simple_test_envs.py (9 lines changed)


        else:
            self.action[name] = [[float(self.goal[name])]]
        self.step()


class UnexpectedExceptionEnvironment(SimpleEnvironment):
    def __init__(self, brain_names, use_discrete, to_raise):
        super().__init__(brain_names, use_discrete)
        self.to_raise = to_raise

    def step(self) -> None:
        raise self.to_raise()
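For reference, the new test double simply re-raises whatever exception type it was given on every step() call. An illustrative use, with arguments mirroring the test below and ValueError standing in for any exception type:

from mlagents.trainers.tests.simple_test_envs import UnexpectedExceptionEnvironment

env = UnexpectedExceptionEnvironment(["1D"], use_discrete=True, to_raise=ValueError)
env.step()  # raises ValueError, simulating an environment crashing mid-training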

ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py (31 lines changed)


from mlagents_envs.side_channel.engine_configuration_channel import EngineConfig
from mlagents_envs.side_channel.stats_side_channel import StatsAggregationMethod
from mlagents_envs.exception import UnityEnvironmentException
from mlagents.trainers.tests.simple_test_envs import SimpleEnvironment  # old single import, replaced by:
from mlagents.trainers.tests.simple_test_envs import (
    SimpleEnvironment,
    UnexpectedExceptionEnvironment,
)
from mlagents.trainers.stats import StatsReporter
from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.tests.check_env_trains import (

    assert all(
        val > 0.7 for val in StatsReporter.writers[0].get_last_rewards().values()
    )
    env_manager.close()


class CustomTestOnlyException(Exception):
    pass


@pytest.mark.parametrize("num_envs", [1, 4])
def test_subprocess_failing_step(num_envs):
    def failing_step_env_factory(_worker_id, _config):
        env = UnexpectedExceptionEnvironment(
            ["1D"], use_discrete=True, to_raise=CustomTestOnlyException
        )
        return env

    env_manager = SubprocessEnvManager(
        failing_step_env_factory, EngineConfig.default_config()
    )
    # Expect the exception raised to be routed back up to the top level.
    with pytest.raises(CustomTestOnlyException):
        check_environment_trains(
            failing_step_env_factory(0, []),
            {"1D": ppo_dummy_config()},
            env_manager=env_manager,
            success_threshold=None,
        )
    env_manager.close()
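The pytest.raises assertion works because the worker ships the raised exception object back as the payload of an ENV_EXITED response (see the worker diff above) and the manager re-raises it in the main process. A minimal sketch of that propagation pattern using bare pipes; the names are illustrative only and this is not the ml-agents implementation:

from multiprocessing import Pipe, Process


class CustomError(Exception):
    pass


def run_env(conn) -> None:
    try:
        raise CustomError("environment blew up")
    except Exception as ex:
        # Ship the exception object to the parent instead of dying silently.
        conn.send(("ENV_EXITED", ex))
    finally:
        conn.close()


def recv_or_raise(conn):
    cmd, payload = conn.recv()
    if cmd == "ENV_EXITED":
        raise payload  # re-raise the worker's exception in the parent process
    return payload


if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    proc = Process(target=run_env, args=(child_conn,))
    proc.start()
    try:
        recv_or_raise(parent_conn)
    except CustomError as err:
        print("caught in parent:", err)
    proc.join()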
