浏览代码

Raise exceptions from environment subprocesses (#3680)

This commit surfaces exceptions from environment worker subprocesses,
and changes the SubprocessEnvManager to raise those exceptions when
caught.  Additionally TrainerController was changed to treat environment
exceptions differently than KeyboardInterrupts.  We now raise the
environment exceptions after exporting the model, so that ML-Agents will
correctly exit with a non-zero return code.
/develop/add-fire
GitHub 5 年前
当前提交
807a1441
共有 4 个文件被更改,包括 115 次插入50 次删除
  1. 1
      com.unity.ml-agents/CHANGELOG.md
  2. 104
      ml-agents/mlagents/trainers/subprocess_env_manager.py
  3. 46
      ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py
  4. 14
      ml-agents/mlagents/trainers/trainer_controller.py

1
com.unity.ml-agents/CHANGELOG.md


- The way that UnityEnvironment decides the port was changed. If no port is specified, the behavior will depend on the `file_name` parameter. If it is `None`, 5004 (the editor port) will be used; otherwise 5005 (the base environment port) will be used.
- Fixed an issue where switching models using `SetModel()` during training would use an excessive amount of memory. (#3664)
- Environment subprocesses now close immediately on timeout or wrong API version. (#3679)
- Fixed an issue where exceptions from environments provided a returncode of 0. (#3680)
## [0.15.0-preview] - 2020-03-18
### Major Changes

104
ml-agents/mlagents/trainers/subprocess_env_manager.py


import logging
from typing import Dict, NamedTuple, List, Any, Optional, Callable, Set, Tuple
import cloudpickle
import enum
from mlagents_envs.exception import UnityCommunicationException, UnityTimeOutException
from mlagents_envs.exception import (
UnityCommunicationException,
UnityTimeOutException,
UnityEnvironmentException,
)
from multiprocessing import Process, Pipe, Queue
from multiprocessing.connection import Connection
from queue import Empty as EmptyQueueException

logger = logging.getLogger("mlagents.trainers")
class EnvironmentCommand(NamedTuple):
name: str
class EnvironmentCommand(enum.Enum):
STEP = 1
EXTERNAL_BRAINS = 2
GET_PROPERTIES = 3
RESET = 4
CLOSE = 5
ENV_EXITED = 6
class EnvironmentRequest(NamedTuple):
cmd: EnvironmentCommand
name: str
cmd: EnvironmentCommand
worker_id: int
payload: Any

self.previous_all_action_info: Dict[str, ActionInfo] = {}
self.waiting = False
def send(self, name: str, payload: Any = None) -> None:
def send(self, cmd: EnvironmentCommand, payload: Any = None) -> None:
cmd = EnvironmentCommand(name, payload)
self.conn.send(cmd)
req = EnvironmentRequest(cmd, payload)
self.conn.send(req)
except (BrokenPipeError, EOFError):
raise UnityCommunicationException("UnityEnvironment worker: send failed.")

if response.cmd == EnvironmentCommand.ENV_EXITED:
env_exception: Exception = response.payload
raise env_exception
return response
except (BrokenPipeError, EOFError):
raise UnityCommunicationException("UnityEnvironment worker: recv failed.")

self.conn.send(EnvironmentCommand("close"))
self.conn.send(EnvironmentRequest(EnvironmentCommand.CLOSE))
except (BrokenPipeError, EOFError):
logger.debug(
f"UnityEnvWorker {self.worker_id} got exception trying to close."

engine_configuration_channel = EngineConfigurationChannel()
engine_configuration_channel.set_configuration(engine_configuration)
stats_channel = StatsSideChannel()
env: BaseEnv = env_factory(
worker_id,
[shared_float_properties, engine_configuration_channel, stats_channel],
)
env: BaseEnv = None
def _send_response(cmd_name, payload):
def _send_response(cmd_name: EnvironmentCommand, payload: Any) -> None:
parent_conn.send(EnvironmentResponse(cmd_name, worker_id, payload))
def _generate_all_results() -> AllStepResult:

return result
try:
env = env_factory(
worker_id,
[shared_float_properties, engine_configuration_channel, stats_channel],
)
cmd: EnvironmentCommand = parent_conn.recv()
if cmd.name == "step":
all_action_info = cmd.payload
req: EnvironmentRequest = parent_conn.recv()
if req.cmd == EnvironmentCommand.STEP:
all_action_info = req.payload
for brain_name, action_info in all_action_info.items():
if len(action_info.action) != 0:
env.set_actions(brain_name, action_info.action)

step_response = StepResponse(
all_step_result, get_timer_root(), env_stats
)
step_queue.put(EnvironmentResponse("step", worker_id, step_response))
step_queue.put(
EnvironmentResponse(
EnvironmentCommand.STEP, worker_id, step_response
)
)
elif cmd.name == "external_brains":
_send_response("external_brains", external_brains())
elif cmd.name == "get_properties":
elif req.cmd == EnvironmentCommand.EXTERNAL_BRAINS:
_send_response(EnvironmentCommand.EXTERNAL_BRAINS, external_brains())
elif req.cmd == EnvironmentCommand.GET_PROPERTIES:
_send_response("get_properties", reset_params)
elif cmd.name == "reset":
for k, v in cmd.payload.items():
_send_response(EnvironmentCommand.GET_PROPERTIES, reset_params)
elif req.cmd == EnvironmentCommand.RESET:
for k, v in req.payload.items():
_send_response("reset", all_step_result)
elif cmd.name == "close":
_send_response(EnvironmentCommand.RESET, all_step_result)
elif req.cmd == EnvironmentCommand.CLOSE:
except (KeyboardInterrupt, UnityCommunicationException, UnityTimeOutException):
except (
KeyboardInterrupt,
UnityCommunicationException,
UnityTimeOutException,
UnityEnvironmentException,
) as ex:
step_queue.put(EnvironmentResponse("env_close", worker_id, None))
step_queue.put(
EnvironmentResponse(EnvironmentCommand.ENV_EXITED, worker_id, ex)
)
_send_response(EnvironmentCommand.ENV_EXITED, ex)
finally:
# If this worker has put an item in the step queue that hasn't been processed by the EnvManager, the process
# will hang until the item is processed. We avoid this behavior by using Queue.cancel_join_thread()

step_queue.cancel_join_thread()
step_queue.close()
env.close()
if env is not None:
env.close()
logger.debug(f"UnityEnvironment worker {worker_id} done.")

if not env_worker.waiting:
env_action_info = self._take_step(env_worker.previous_step)
env_worker.previous_all_action_info = env_action_info
env_worker.send("step", env_action_info)
env_worker.send(EnvironmentCommand.STEP, env_action_info)
env_worker.waiting = True
def _step(self) -> List[EnvironmentStep]:

while len(worker_steps) < 1:
try:
while True:
step = self.step_queue.get_nowait()
if step.name == "env_close":
raise UnityCommunicationException(
"At least one of the environments has closed."
)
step: EnvironmentResponse = self.step_queue.get_nowait()
if step.cmd == EnvironmentCommand.ENV_EXITED:
env_exception: Exception = step.payload
raise env_exception
self.env_workers[step.worker_id].waiting = False
if step.worker_id not in step_workers:
worker_steps.append(step)

self.env_workers[step.worker_id].waiting = False
# First enqueue reset commands for all workers so that they reset in parallel
for ew in self.env_workers:
ew.send("reset", config)
ew.send(EnvironmentCommand.RESET, config)
# Next (synchronously) collect the reset observations from each worker in sequence
for ew in self.env_workers:
ew.previous_step = EnvironmentStep(ew.recv().payload, ew.worker_id, {}, {})

def external_brains(self) -> Dict[AgentGroup, BrainParameters]:
self.env_workers[0].send("external_brains")
self.env_workers[0].send(EnvironmentCommand.EXTERNAL_BRAINS)
self.env_workers[0].send("get_properties")
self.env_workers[0].send(EnvironmentCommand.GET_PROPERTIES)
return self.env_workers[0].recv().payload
def close(self) -> None:

46
ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py


SubprocessEnvManager,
EnvironmentResponse,
StepResponse,
EnvironmentCommand,
from mlagents_envs.exception import UnityEnvironmentException
from mlagents.trainers.tests.simple_test_envs import SimpleEnvironment
from mlagents.trainers.stats import StatsReporter
from mlagents.trainers.tests.test_simple_rl import (

def create_worker_mock(worker_id, step_queue, env_factor, engine_c):
return MockEnvWorker(worker_id, EnvironmentResponse("reset", worker_id, worker_id))
return MockEnvWorker(
worker_id, EnvironmentResponse(EnvironmentCommand.RESET, worker_id, worker_id)
)
class SubprocessEnvManagerTest(unittest.TestCase):

)
params = {"test": "params"}
manager._reset_env(params)
manager.env_workers[0].send.assert_called_with("reset", (params))
manager.env_workers[0].send.assert_called_with(
EnvironmentCommand.RESET, (params)
)
@mock.patch(
"mlagents.trainers.subprocess_env_manager.SubprocessEnvManager.create_worker"

params = {"test": "params"}
res = manager._reset_env(params)
for i, env in enumerate(manager.env_workers):
env.send.assert_called_with("reset", (params))
env.send.assert_called_with(EnvironmentCommand.RESET, (params))
env.recv.assert_called()
# Check that the "last steps" are set to the value returned for each step
self.assertEqual(

)
manager.step_queue = Mock()
manager.step_queue.get_nowait.side_effect = [
EnvironmentResponse("step", 0, StepResponse(0, None, {})),
EnvironmentResponse("step", 1, StepResponse(1, None, {})),
EnvironmentResponse(EnvironmentCommand.STEP, 0, StepResponse(0, None, {})),
EnvironmentResponse(EnvironmentCommand.STEP, 1, StepResponse(1, None, {})),
EmptyQueue(),
]
step_mock = Mock()

res = manager._step()
for i, env in enumerate(manager.env_workers):
if i < 2:
env.send.assert_called_with("step", step_mock)
env.send.assert_called_with(EnvironmentCommand.STEP, step_mock)
manager.step_queue.get_nowait.assert_called()
# Check that the "last steps" are set to the value returned for each step
self.assertEqual(

env_manager.advance()
assert env_manager.policies[brain_name] == mock_policy
assert agent_manager_mock.policy == mock_policy
def simple_env_factory(worker_id, config):
env = SimpleEnvironment(["1D"], use_discrete=True)
return env
def simple_env_factory(worker_id, config):
env = SimpleEnvironment(["1D"], use_discrete=True)
return env
env_manager = SubprocessEnvManager(
simple_env_factory, EngineConfig.default_config(), num_envs
)

val > 0.7 for val in StatsReporter.writers[0].get_last_rewards().values()
)
env_manager.close()
@pytest.mark.parametrize("num_envs", [1, 4])
def test_subprocess_env_raises_errors(num_envs):
def failing_env_factory(worker_id, config):
import time
# Sleep momentarily to allow time for the EnvManager to be waiting for the
# subprocess response. We won't be able to capture failures from the subprocess
# that cause it to close the pipe before we can send the first message.
time.sleep(0.1)
raise UnityEnvironmentException()
env_manager = SubprocessEnvManager(
failing_env_factory, EngineConfig.default_config(), num_envs
)
with pytest.raises(UnityEnvironmentException):
env_manager.reset()
env_manager.close()

14
ml-agents/mlagents/trainers/trainer_controller.py


# Final save Tensorflow model
if global_step != 0 and self.train_model:
self._save_model()
except (KeyboardInterrupt, UnityCommunicationException):
except (
KeyboardInterrupt,
UnityCommunicationException,
UnityEnvironmentException,
) as ex:
pass
if isinstance(ex, KeyboardInterrupt):
pass
else:
# If the environment failed, we want to make sure to raise
# the exception so we exit the process with an return code of 1.
raise ex
if self.train_model:
self._export_graph()

正在加载...
取消
保存