浏览代码

Merge branch 'master' into fix-conflict-base-env

/fix-conflict-base-env
Andrew Cohen 4 年前
当前提交
d624b54b
共有 8 个文件被更改,包括 95 次插入18 次删除
  1. 1
      com.unity.ml-agents/CHANGELOG.md
  2. 4
      com.unity.ml-agents/Runtime/Sensors/ObservationWriter.cs
  3. 10
      ml-agents-envs/mlagents_envs/base_env.py
  4. 2
      ml-agents-envs/mlagents_envs/environment.py
  5. 4
      ml-agents-envs/mlagents_envs/rpc_utils.py
  6. 52
      ml-agents/mlagents/trainers/subprocess_env_manager.py
  7. 9
      ml-agents/mlagents/trainers/tests/simple_test_envs.py
  8. 31
      ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py

1
com.unity.ml-agents/CHANGELOG.md


### Minor Changes
#### com.unity.ml-agents / com.unity.ml-agents.extensions (C#)
#### ml-agents / ml-agents-envs / gym-unity (Python)
- `ActionSpec.validate_action()` now enforces that `UnityEnvironment.set_action_for_agent()` receives a 1D `np.array`.
### Bug Fixes
#### com.unity.ml-agents (C#)

4
com.unity.ml-agents/Runtime/Sensors/ObservationWriter.cs


{
m_TensorShape = new TensorShape(m_Batch, shape[0]);
}
else if (shape.Length == 2)
{
m_TensorShape = new TensorShape(new int[] { m_Batch, 1, shape[0], shape[1] });
}
else
{
m_TensorShape = new TensorShape(m_Batch, shape[0], shape[1], shape[2]);

10
ml-agents-envs/mlagents_envs/base_env.py


return ActionTuple(continuous=_continuous, discrete=_discrete)
def _validate_action(
self, actions: ActionTuple, n_agents: int, name: str
self, actions: ActionTuple, n_agents: Optional[int], name: str
_expected_shape = (n_agents, self.continuous_size)
_expected_shape = (
(n_agents, self.continuous_size) if n_agents else (self.continuous_size,)
)
if actions.continuous.shape != _expected_shape:
raise UnityActionException(
f"The behavior {name} needs a continuous input of dimension "

_expected_shape = (n_agents, self.discrete_size)
_expected_shape = (
(n_agents, self.discrete_size) if n_agents else (self.discrete_size,)
)
if actions.discrete.shape != _expected_shape:
raise UnityActionException(
f"The behavior {name} needs a discrete input of dimension "

2
ml-agents-envs/mlagents_envs/environment.py


return
action_spec = self._env_specs[behavior_name].action_spec
num_agents = len(self._env_state[behavior_name][0])
action = action_spec._validate_action(action, num_agents, behavior_name)
action = action_spec._validate_action(action, None, behavior_name)
if behavior_name not in self._env_actions:
self._env_actions[behavior_name] = action_spec.empty_action(num_agents)
try:

4
ml-agents-envs/mlagents_envs/rpc_utils.py


], # pylint: disable=unsubscriptable-object
) -> np.ndarray:
if len(agent_info_list) == 0:
return np.zeros((0, shape[0]), dtype=np.float32)
return np.zeros((0,) + shape, dtype=np.float32)
np_obs = np.array(
[
agent_obs.observations[obs_index].float_data.data

)
).reshape((len(agent_info_list),) + shape)
_raise_on_nan_and_inf(np_obs, "observations")
return np_obs

52
ml-agents/mlagents/trainers/subprocess_env_manager.py


from typing import Dict, NamedTuple, List, Any, Optional, Callable, Set
import cloudpickle
import enum
import time
from mlagents_envs.environment import UnityEnvironment
from mlagents_envs.exception import (

logger = logging_util.get_logger(__name__)
WORKER_SHUTDOWN_TIMEOUT_S = 10
class EnvironmentCommand(enum.Enum):

RESET = 4
CLOSE = 5
ENV_EXITED = 6
CLOSED = 7
class EnvironmentRequest(NamedTuple):

self.previous_step: EnvironmentStep = EnvironmentStep.empty(worker_id)
self.previous_all_action_info: Dict[str, ActionInfo] = {}
self.waiting = False
self.closed = False
def send(self, cmd: EnvironmentCommand, payload: Any = None) -> None:
try:

except (BrokenPipeError, EOFError):
raise UnityCommunicationException("UnityEnvironment worker: recv failed.")
def close(self):
def request_close(self):
try:
self.conn.send(EnvironmentRequest(EnvironmentCommand.CLOSE))
except (BrokenPipeError, EOFError):

pass
logger.debug(f"UnityEnvWorker {self.worker_id} joining process.")
self.process.join()
def worker(

EnvironmentResponse(EnvironmentCommand.ENV_EXITED, worker_id, ex)
)
_send_response(EnvironmentCommand.ENV_EXITED, ex)
except Exception as ex:
logger.error(
f"UnityEnvironment worker {worker_id}: environment raised an unexpected exception."
)
step_queue.put(
EnvironmentResponse(EnvironmentCommand.ENV_EXITED, worker_id, ex)
)
_send_response(EnvironmentCommand.ENV_EXITED, ex)
# If this worker has put an item in the step queue that hasn't been processed by the EnvManager, the process
# will hang until the item is processed. We avoid this behavior by using Queue.cancel_join_thread()
# See https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue.cancel_join_thread for
# more info.
step_queue.cancel_join_thread()
step_queue.close()
parent_conn.close()
step_queue.put(EnvironmentResponse(EnvironmentCommand.CLOSED, worker_id, None))
step_queue.close()
class SubprocessEnvManager(EnvManager):

super().__init__()
self.env_workers: List[UnityEnvWorker] = []
self.step_queue: Queue = Queue()
self.workers_alive = 0
for worker_idx in range(n_env):
self.env_workers.append(
self.create_worker(

self.workers_alive += 1
@staticmethod
def create_worker(

def close(self) -> None:
# Shut the manager down: ask every worker to close, then drain the step queue
# until each worker has reported CLOSED or the shutdown timeout expires.
# NOTE(review): this text comes from a flattened diff view, so removed and added
# lines may be interleaved below - confirm against the real file.
logger.debug("SubprocessEnvManager closing.")
# Issue the close request to all workers up front, before waiting on any of them.
for env_worker in self.env_workers:
env_worker.request_close()
# Pull messages out of the queue until every worker has CLOSED or we time out.
deadline = time.time() + WORKER_SHUTDOWN_TIMEOUT_S
while self.workers_alive > 0 and time.time() < deadline:
try:
# Non-blocking poll: an empty queue raises EmptyQueueException, which is
# swallowed below so the loop keeps polling until the deadline.
step: EnvironmentResponse = self.step_queue.get_nowait()
env_worker = self.env_workers[step.worker_id]
# The `closed` flag ensures each worker is counted at most once.
if step.cmd == EnvironmentCommand.CLOSED and not env_worker.closed:
env_worker.closed = True
self.workers_alive -= 1
# Discard all other messages.
except EmptyQueueException:
pass
# Sanity check to kill zombie workers and report an issue if they occur.
if self.workers_alive > 0:
logger.error("SubprocessEnvManager had workers that didn't signal shutdown")
for env_worker in self.env_workers:
# Only terminate workers that never reported CLOSED and are still running.
if not env_worker.closed and env_worker.process.is_alive():
env_worker.process.terminate()
logger.error(
"A SubprocessEnvManager worker did not shut down correctly so it was forcefully terminated."
)
# NOTE(review): the worker method appears to have been renamed from close() to
# request_close() in this same diff; this loop may be a removed-side line rather
# than part of the final method - TODO confirm.
for env_worker in self.env_workers:
env_worker.close()
def _postprocess_steps(
self, env_steps: List[EnvironmentResponse]

9
ml-agents/mlagents/trainers/tests/simple_test_envs.py


np.array([], dtype=np.int32),
)
self.step()
class UnexpectedExceptionEnvironment(SimpleEnvironment):
    """Test environment whose ``step`` always raises.

    ``to_raise`` is called with no arguments on every ``step`` and the object it
    returns is raised, so passing an exception class raises a fresh instance.
    """

    def __init__(self, brain_names, use_discrete, to_raise):
        """Set up the base environment and remember what ``step`` should raise."""
        super().__init__(brain_names, use_discrete)
        self.to_raise = to_raise

    def step(self) -> None:
        """Raise the configured exception instead of advancing the environment."""
        make_exception = self.to_raise
        raise make_exception()

31
ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py


from mlagents_envs.side_channel.engine_configuration_channel import EngineConfig
from mlagents_envs.side_channel.stats_side_channel import StatsAggregationMethod
from mlagents_envs.exception import UnityEnvironmentException
from mlagents.trainers.tests.simple_test_envs import SimpleEnvironment
from mlagents.trainers.tests.simple_test_envs import (
SimpleEnvironment,
UnexpectedExceptionEnvironment,
)
from mlagents.trainers.stats import StatsReporter
from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.tests.check_env_trains import (

assert all(
val > 0.7 for val in StatsReporter.writers[0].get_last_rewards().values()
)
env_manager.close()
class CustomTestOnlyException(Exception):
    """Marker exception used by tests to recognise an error they raised themselves."""
@pytest.mark.parametrize("num_envs", [1, 4])
def test_subprocess_failing_step(num_envs):
    """Check that an exception raised inside a worker's ``step`` reaches the caller.

    Each worker is built around an ``UnexpectedExceptionEnvironment`` whose
    ``step`` raises ``CustomTestOnlyException``; training with that manager is
    expected to surface the same exception type.
    """

    def failing_step_env_factory(_worker_id, _config):
        return UnexpectedExceptionEnvironment(
            ["1D"], use_discrete=True, to_raise=CustomTestOnlyException
        )

    # Forward the parametrized worker count; without it both parametrizations
    # would build the same default-sized manager.
    # NOTE(review): assumes the constructor's worker-count parameter is named
    # ``n_env`` (the name read inside SubprocessEnvManager.__init__) - confirm.
    env_manager = SubprocessEnvManager(
        failing_step_env_factory, EngineConfig.default_config(), n_env=num_envs
    )
    try:
        # Expect the exception raised to be routed back up to the top level.
        with pytest.raises(CustomTestOnlyException):
            check_environment_trains(
                failing_step_env_factory(0, []),
                {"1D": ppo_dummy_config()},
                env_manager=env_manager,
                success_threshold=None,
            )
    finally:
        # Always shut the workers down, even if the expectation above fails,
        # so a failing test does not leave subprocesses behind.
        env_manager.close()

正在加载...
取消
保存