
Move advance() logic for environment manager out of trainer_controller (#3234)

This PR moves the AgentManagers from the TrainerController into the env_manager. This way, the TrainerController only needs to create the components (Trainers, AgentManagers) and call advance() on the EnvManager and the Trainers.
/asymm-envs
GitHub · 5 years ago
Current commit: ca96b293
12 files changed, with 140 additions and 128 deletions.
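At a high level, the responsibility split after this change looks like the stand-alone sketch below. These are illustrative stub classes, not the ml-agents types: the point is that the environment manager now owns the per-behavior AgentManagers, drains their policy queues, and routes step results itself, so a controller only wires components together and calls advance().

from queue import Empty, Queue
from typing import Dict, List


class StubAgentManager:
    """Stand-in for AgentManager: receives experiences, exposes a policy queue."""

    def __init__(self) -> None:
        self.policy = None
        self.policy_queue: Queue = Queue()
        self.experiences: List[dict] = []

    def add_experiences(self, step_result: dict) -> None:
        self.experiences.append(step_result)


class StubEnvManager:
    """Stand-in for EnvManager: owns the managers and routes results itself."""

    def __init__(self) -> None:
        self.agent_managers: Dict[str, StubAgentManager] = {}

    def set_agent_manager(self, behavior_id: str, manager: StubAgentManager) -> None:
        self.agent_managers[behavior_id] = manager

    def _step(self) -> List[dict]:
        # A real implementation steps the Unity workers; we fabricate one result.
        return [{"behavior": "testbrain", "obs": [0.0]}]

    def advance(self) -> int:
        # Pick up any policies the trainers have published since the last step.
        for manager in self.agent_managers.values():
            try:
                manager.policy = manager.policy_queue.get_nowait()
            except Empty:
                pass
        # Step the environment and hand results straight to the managers.
        step_infos = self._step()
        for step_info in step_infos:
            self.agent_managers[step_info["behavior"]].add_experiences(step_info)
        return len(step_infos)


# Controller side after the PR: wire up, then just call advance() everywhere.
env_manager = StubEnvManager()
env_manager.set_agent_manager("testbrain", StubAgentManager())
assert env_manager.advance() == 1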
1. ml-agents/mlagents/trainers/action_info.py (4 changes)
2. ml-agents/mlagents/trainers/agent_processor.py (11 changes)
3. ml-agents/mlagents/trainers/brain_conversion_utils.py (7 changes)
4. ml-agents/mlagents/trainers/env_manager.py (63 changes)
5. ml-agents/mlagents/trainers/simple_env_manager.py (4 changes)
6. ml-agents/mlagents/trainers/subprocess_env_manager.py (4 changes)
7. ml-agents/mlagents/trainers/tests/test_agent_processor.py (2 changes)
8. ml-agents/mlagents/trainers/tests/test_policy.py (2 changes)
9. ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py (46 changes)
10. ml-agents/mlagents/trainers/tests/test_trainer_controller.py (54 changes)
11. ml-agents/mlagents/trainers/tf_policy.py (4 changes)
12. ml-agents/mlagents/trainers/trainer_controller.py (67 changes)

ml-agents/mlagents/trainers/action_info.py (4 changes)


     value: Any
     outputs: ActionInfoOutputs
     agent_ids: List[AgentId]
+
+    @staticmethod
+    def empty() -> "ActionInfo":
+        return ActionInfo([], [], {}, [])
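The new helper is pure shorthand for the hand-built literal it replaces in tf_policy.py and the tests below; the four values line up with the NamedTuple's four fields (the first field sits above this hunk's context).

from mlagents.trainers.action_info import ActionInfo

# empty() is equivalent to the literal removed elsewhere in this PR.
assert ActionInfo.empty() == ActionInfo([], [], {}, [])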

ml-agents/mlagents/trainers/agent_processor.py (11 changes)


 from mlagents.trainers.policy import Policy
 from mlagents.trainers.action_info import ActionInfo, ActionInfoOutputs
 from mlagents.trainers.stats import StatsReporter
-from mlagents.trainers.env_manager import get_global_agent_id
+from mlagents.trainers.brain_conversion_utils import get_global_agent_id

 T = TypeVar("T")

         :param trajectory_queue: Trajectory queue to publish to.
         """
         self.trajectory_queues.append(trajectory_queue)
+
+    def end_episode(self) -> None:
+        """
+        Ends the episode, terminating the current trajectory and stopping stats collection for that
+        episode. Used for forceful reset (e.g. in curriculum or generalization training.)
+        """
+        self.experience_buffers.clear()
+        self.episode_rewards.clear()
+        self.episode_steps.clear()

 class AgentManagerQueue(Generic[T]):

ml-agents/mlagents/trainers/brain_conversion_utils.py (7 changes)


     return BrainParameters(
         name, int(vec_size), cam_res, a_size, [], vector_action_space_type
     )

+def get_global_agent_id(worker_id: int, agent_id: int) -> str:
+    """
+    Create an agent id that is unique across environment workers using the worker_id.
+    """
+    return f"${worker_id}-{agent_id}"

ml-agents/mlagents/trainers/env_manager.py (63 changes)


 from abc import ABC, abstractmethod
 import logging
-from mlagents.trainers.policy import Policy
+from mlagents.trainers.tf_policy import TFPolicy
+from mlagents.trainers.agent_processor import AgentManager, AgentManagerQueue

-def get_global_agent_id(worker_id: int, agent_id: int) -> str:
-    """
-    Create an agent id that is unique across environment workers using the worker_id.
-    """
-    return f"${worker_id}-{agent_id}"

 logger = logging.getLogger("mlagents.trainers")

 class EnvironmentStep(NamedTuple):

 class EnvManager(ABC):
     def __init__(self):
-        self.policies: Dict[AgentGroup, Policy] = {}
+        self.policies: Dict[AgentGroup, TFPolicy] = {}
+        self.agent_managers: Dict[AgentGroup, AgentManager] = {}

-    def set_policy(self, brain_name: AgentGroup, policy: Policy) -> None:
+    def set_policy(self, brain_name: AgentGroup, policy: TFPolicy) -> None:
+        if brain_name in self.agent_managers:
+            self.agent_managers[brain_name].policy = policy

+    def set_agent_manager(self, brain_name: AgentGroup, manager: AgentManager) -> None:
+        self.agent_managers[brain_name] = manager

-    def step(self) -> List[EnvironmentStep]:
+    def _step(self) -> List[EnvironmentStep]:

-    def reset(self, config: Dict = None) -> List[EnvironmentStep]:
+    def _reset_env(self, config: Dict = None) -> List[EnvironmentStep]:

+    def reset(self, config: Dict = None) -> int:
+        for manager in self.agent_managers.values():
+            manager.end_episode()
+        return self._process_step_infos(self._reset_env(config))

     @property
     @abstractmethod

     @abstractmethod
     def close(self):
         pass

+    def advance(self):
+        # Get new policies if found
+        for brain_name in self.external_brains:
+            try:
+                _policy = self.agent_managers[brain_name].policy_queue.get_nowait()
+                self.set_policy(brain_name, _policy)
+            except AgentManagerQueue.Empty:
+                pass
+        # Step the environment
+        new_step_infos = self._step()
+        # Add to AgentProcessor
+        num_step_infos = self._process_step_infos(new_step_infos)
+        return num_step_infos
+
+    def _process_step_infos(self, step_infos: List[EnvironmentStep]) -> int:
+        for step_info in step_infos:
+            for name_behavior_id in step_info.name_behavior_ids:
+                if name_behavior_id not in self.agent_managers:
+                    logger.warning(
+                        "Agent manager was not created for behavior id {}.".format(
+                            name_behavior_id
+                        )
+                    )
+                    continue
+                self.agent_managers[name_behavior_id].add_experiences(
+                    step_info.current_all_step_result[name_behavior_id],
+                    step_info.worker_id,
+                    step_info.brain_name_to_action_info.get(
+                        name_behavior_id, ActionInfo.empty()
+                    ),
+                )
+        return len(step_infos)
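The policy handoff in advance() is deliberately non-blocking: trainers publish updated policies to a per-behavior queue, and the next advance() installs whatever is available, treating an empty queue as "no update yet" rather than an error. A minimal stand-alone sketch of that pattern, using Python's queue module in place of AgentManagerQueue:

from queue import Empty, Queue

policy_queue: Queue = Queue()        # stand-in for AgentManagerQueue
policy_queue.put("updated-policy")   # trainer side: publish after a model update

# Env-manager side, run once per advance(): an empty queue simply means the
# trainer had nothing new this step, so the previous policy stays in place.
try:
    current_policy = policy_queue.get_nowait()
except Empty:
    current_policy = "previous-policy"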

ml-agents/mlagents/trainers/simple_env_manager.py (4 changes)


         self.previous_step: EnvironmentStep = EnvironmentStep.empty(0)
         self.previous_all_action_info: Dict[str, ActionInfo] = {}

-    def step(self) -> List[EnvironmentStep]:
+    def _step(self) -> List[EnvironmentStep]:
         all_action_info = self._take_step(self.previous_step)
         self.previous_all_action_info = all_action_info

         self.previous_step = step_info
         return [step_info]

-    def reset(
+    def _reset_env(
         self, config: Dict[AgentGroup, float] = None
     ) -> List[EnvironmentStep]:  # type: ignore
         if config is not None:

ml-agents/mlagents/trainers/subprocess_env_manager.py (4 changes)


             env_worker.send("step", env_action_info)
             env_worker.waiting = True

-    def step(self) -> List[EnvironmentStep]:
+    def _step(self) -> List[EnvironmentStep]:
         # Queue steps for any workers which aren't in the "waiting" state.
         self._queue_steps()

         step_infos = self._postprocess_steps(worker_steps)
         return step_infos

-    def reset(self, config: Optional[Dict] = None) -> List[EnvironmentStep]:
+    def _reset_env(self, config: Optional[Dict] = None) -> List[EnvironmentStep]:
         while any(ew.waiting for ew in self.env_workers):
             if not self.step_queue.empty():
                 step = self.step_queue.get_nowait()

ml-agents/mlagents/trainers/tests/test_agent_processor.py (2 changes)


     )
     processor.publish_trajectory_queue(tqueue)
     # This is like the initial state after the env reset
-    processor.add_experiences(mock_step, 0, ActionInfo([], [], {}, []))
+    processor.add_experiences(mock_step, 0, ActionInfo.empty())
     for _ in range(5):
         processor.add_experiences(mock_step, 0, fake_action_info)

ml-agents/mlagents/trainers/tests/test_policy.py (2 changes)


     dummy_groupspec = AgentGroupSpec([(1,)], "continuous", 1)
     no_agent_step = BatchedStepResult.empty(dummy_groupspec)
     result = policy.get_action(no_agent_step)
-    assert result == ActionInfo([], [], {}, [])
+    assert result == ActionInfo.empty()

 def test_take_action_returns_nones_on_missing_values():

ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py (46 changes)


     EnvironmentResponse,
     StepResponse,
 )
+from mlagents.trainers.env_manager import EnvironmentStep
 from mlagents_envs.base_env import BaseEnv
 from mlagents_envs.side_channel.engine_configuration_channel import EngineConfig

             mock_env_factory, EngineConfig.default_config(), 1
         )
         params = {"test": "params"}
-        manager.reset(params)
+        manager._reset_env(params)
         manager.env_workers[0].send.assert_called_with("reset", (params))

     def test_reset_collects_results_from_all_envs(self):

         )
         params = {"test": "params"}
-        res = manager.reset(params)
+        res = manager._reset_env(params)
         for i, env in enumerate(manager.env_workers):
             env.send.assert_called_with("reset", (params))
             env.recv.assert_called()

         manager.env_workers[2].previous_step = last_steps[2]
         manager.env_workers[2].waiting = True
         manager._take_step = Mock(return_value=step_mock)
-        res = manager.step()
+        res = manager._step()
         for i, env in enumerate(manager.env_workers):
             if i < 2:
                 env.send.assert_called_with("step", step_mock)

             manager.env_workers[0].previous_step,
             manager.env_workers[1].previous_step,
         ]

+    @mock.patch("mlagents.trainers.subprocess_env_manager.SubprocessEnvManager._step")
+    @mock.patch(
+        "mlagents.trainers.subprocess_env_manager.SubprocessEnvManager.external_brains",
+        new_callable=mock.PropertyMock,
+    )
+    def test_advance(self, external_brains_mock, step_mock):
+        brain_name = "testbrain"
+        action_info_dict = {brain_name: MagicMock()}
+        SubprocessEnvManager.create_worker = lambda em, worker_id, step_queue, env_factory, engine_c: MockEnvWorker(
+            worker_id, EnvironmentResponse("step", worker_id, worker_id)
+        )
+        env_manager = SubprocessEnvManager(
+            mock_env_factory, EngineConfig.default_config(), 3
+        )
+        external_brains_mock.return_value = [brain_name]
+        agent_manager_mock = mock.Mock()
+        env_manager.set_agent_manager(brain_name, agent_manager_mock)
+        step_info_dict = {brain_name: Mock()}
+        step_info = EnvironmentStep(step_info_dict, 0, action_info_dict)
+        step_mock.return_value = [step_info]
+        env_manager.advance()
+        # Test add_experiences
+        env_manager._step.assert_called_once()
+        agent_manager_mock.add_experiences.assert_called_once_with(
+            step_info.current_all_step_result[brain_name],
+            0,
+            step_info.brain_name_to_action_info[brain_name],
+        )
+        # Test policy queue
+        mock_policy = mock.Mock()
+        agent_manager_mock.policy_queue.get_nowait.return_value = mock_policy
+        env_manager.advance()
+        assert env_manager.policies[brain_name] == mock_policy
+        assert agent_manager_mock.policy == mock_policy

ml-agents/mlagents/trainers/tests/test_trainer_controller.py (54 changes)


-from unittest.mock import MagicMock, Mock, patch
+from unittest.mock import MagicMock, patch

-from mlagents.trainers.subprocess_env_manager import EnvironmentStep
 from mlagents.trainers.sampler_class import SamplerManager

     return tc, trainer_mock

-def test_take_step_adds_experiences_to_trainer_and_trains(
+def test_advance_adds_experiences_to_trainer_and_trains(

-    action_info_dict = {brain_name: MagicMock()}
-    brain_info_dict = {brain_name: Mock()}
-    old_step_info = EnvironmentStep(brain_info_dict, 0, action_info_dict)
-    new_step_info = EnvironmentStep(brain_info_dict, 0, action_info_dict)
     trainer_mock._is_ready_update = MagicMock(return_value=True)
-    env_mock.step.return_value = [new_step_info]
-    env_mock.reset.return_value = [old_step_info]
     tc.brain_name_to_identifier[brain_name].add(brain_name)

-    env_mock.step.assert_called_once()
-    manager_mock = tc.managers[brain_name]
-    manager_mock.add_experiences.assert_called_once_with(
-        new_step_info.current_all_step_result[brain_name],
-        0,
-        new_step_info.brain_name_to_action_info[brain_name],
-    )
     trainer_mock.advance.assert_called_once()

 def test_take_step_if_not_training(trainer_controller_with_take_step_mocks):
     tc, trainer_mock = trainer_controller_with_take_step_mocks
     tc.train_model = False

     brain_name = "testbrain"
-    action_info_dict = {brain_name: MagicMock()}
-    brain_info_dict = {brain_name: Mock()}
-    old_step_info = EnvironmentStep(brain_info_dict, 0, action_info_dict)
-    new_step_info = EnvironmentStep(brain_info_dict, 0, action_info_dict)
     trainer_mock._is_ready_update = MagicMock(return_value=False)

     env_mock = MagicMock()
-    env_mock.step.return_value = [new_step_info]
-    env_mock.reset.return_value = [old_step_info]
     tc.brain_name_to_identifier[brain_name].add(brain_name)

     tc.advance(env_mock)
     env_mock.reset.assert_not_called()
-    env_mock.step.assert_called_once()
-    manager_mock = tc.managers[brain_name]
-    manager_mock.add_experiences.assert_called_once_with(
-        new_step_info.current_all_step_result[brain_name],
-        0,
-        new_step_info.brain_name_to_action_info[brain_name],
-    )
+    env_mock.advance.assert_called_once()
     trainer_mock.advance.assert_called_once()

ml-agents/mlagents/trainers/tf_policy.py (4 changes)


 from mlagents.trainers import tensorflow_to_barracuda as tf2bc
 from mlagents.trainers.trajectory import SplitObservations
 from mlagents.trainers.buffer import AgentBuffer
-from mlagents.trainers.env_manager import get_global_agent_id
+from mlagents.trainers.brain_conversion_utils import get_global_agent_id
 from mlagents_envs.base_env import BatchedStepResult

         to be passed to add experiences
         """
         if batched_step_result.n_agents() == 0:
-            return ActionInfo([], [], {}, [])
+            return ActionInfo.empty()
         agents_done = [
             agent

ml-agents/mlagents/trainers/trainer_controller.py (67 changes)


 import sys
 import json
 import logging
-from typing import Dict, List, Optional, Set
+from typing import Dict, Optional, Set

-from mlagents.trainers.env_manager import EnvManager, EnvironmentStep
+from mlagents.trainers.env_manager import EnvManager
 from mlagents_envs.exception import (
     UnityEnvironmentException,
     UnityCommunicationException,

 from mlagents.trainers.trainer import Trainer
 from mlagents.trainers.meta_curriculum import MetaCurriculum
 from mlagents.trainers.trainer_util import TrainerFactory
-from mlagents.trainers.action_info import ActionInfo
-from mlagents.trainers.agent_processor import AgentManager, AgentManagerQueue
+from mlagents.trainers.agent_processor import AgentManager

 class TrainerController(object):

         """
         self.trainers: Dict[str, Trainer] = {}
         self.brain_name_to_identifier: Dict[str, Set] = defaultdict(set)
-        self.managers: Dict[str, AgentManager] = {}
         self.trainer_factory = trainer_factory
         self.model_path = model_path
         self.summaries_dir = summaries_dir

             "permissions are set correctly.".format(model_path)
         )

-    def _reset_env(self, env: EnvManager) -> List[EnvironmentStep]:
+    def _reset_env(self, env: EnvManager) -> None:
         """Resets the environment.

         Returns:

             self.meta_curriculum.get_config() if self.meta_curriculum else {}
         )
         sampled_reset_param.update(new_meta_curriculum_config)
-        return env.reset(config=sampled_reset_param)
+        env.reset(config=sampled_reset_param)

     def _should_save_model(self, global_step: int) -> bool:
         return (

         policy = trainer.create_policy(env_manager.external_brains[name_behavior_id])
         trainer.add_policy(name_behavior_id, policy)
-        env_manager.set_policy(name_behavior_id, policy)
-        self.brain_name_to_identifier[brain_name].add(name_behavior_id)
         agent_manager = AgentManager(
             policy,
             name_behavior_id,

-        self.managers[name_behavior_id] = agent_manager
+        env_manager.set_agent_manager(name_behavior_id, agent_manager)
+        env_manager.set_policy(name_behavior_id, policy)
+        self.brain_name_to_identifier[brain_name].add(name_behavior_id)

     def _create_trainers_and_managers(
         self, env_manager: EnvManager, behavior_ids: Set[str]

         global_step = 0
         last_brain_behavior_ids: Set[str] = set()
         try:
-            initial_step = self._reset_env(env_manager)
-            self._process_step_infos(initial_step)
+            self._reset_env(env_manager)
             while self._not_done_training():
                 external_brain_behavior_ids = set(env_manager.external_brains.keys())
                 new_behavior_ids = external_brain_behavior_ids - last_brain_behavior_ids

     def end_trainer_episodes(
         self, env: EnvManager, lessons_incremented: Dict[str, bool]
     ) -> None:
-        reset_step = self._reset_env(env)
-        self._process_step_infos(reset_step)
+        self._reset_env(env)
         # Reward buffers reset takes place only for curriculum learning
         # else no reset.
         for trainer in self.trainers.values():

         if meta_curriculum_reset or generalization_reset:
             self.end_trainer_episodes(env, lessons_incremented)

-    def _get_and_process_experiences(self, env: EnvManager) -> int:
-        with hierarchical_timer("env_step"):
-            # Get new policies if found
-            for brain_name in self.trainers.keys():
-                for name_behavior_id in self.brain_name_to_identifier[brain_name]:
-                    try:
-                        _policy = self.managers[
-                            name_behavior_id
-                        ].policy_queue.get_nowait()
-                        env.set_policy(name_behavior_id, _policy)
-                    except AgentManagerQueue.Empty:
-                        pass
-            # Step the environment
-            new_step_infos = env.step()
-        # Add to AgentProcessor
-        num_step_infos = self._process_step_infos(new_step_infos)
-        return num_step_infos
-
-    def _process_step_infos(self, step_infos: List[EnvironmentStep]) -> int:
-        for step_info in step_infos:
-            for name_behavior_id in step_info.name_behavior_ids:
-                if name_behavior_id not in self.managers:
-                    self.logger.warning(
-                        "Agent manager was not created for behavior id {}.".format(
-                            name_behavior_id
-                        )
-                    )
-                    continue
-                self.managers[name_behavior_id].add_experiences(
-                    step_info.current_all_step_result[name_behavior_id],
-                    step_info.worker_id,
-                    step_info.brain_name_to_action_info.get(
-                        name_behavior_id, ActionInfo([], [], {}, [])
-                    ),
-                )
-        return len(step_infos)

-        num_steps = self._get_and_process_experiences(env)
+        with hierarchical_timer("env_step"):
+            num_steps = env.advance()
         # Report current lesson
         if self.meta_curriculum:
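The net effect on the controller is visible above: the policy-queue draining and step-info routing are gone, and stepping reduces to a single delegated call. A condensed, hypothetical check in the spirit of the updated tests (not code from the PR; the test name is illustrative):

from unittest.mock import MagicMock

def test_controller_delegates_stepping_to_env_manager():
    env_mock = MagicMock()
    env_mock.advance.return_value = 1

    # What the controller loop body reduces to after this PR.
    num_steps = env_mock.advance()

    env_mock.advance.assert_called_once()
    env_mock.step.assert_not_called()
    assert num_steps == 1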
