
Add Trajectory/Policy Queues, move Trainer logic to advance() (#3113)

/asymm-envs
GitHub, 5 years ago
Current commit
bec2e8f0
15 files changed, 486 insertions and 304 deletions
  1. config/sac_trainer_config.yaml (71 changes)
  2. config/trainer_config.yaml (72 changes)
  3. docs/Migrating.md (10 changes)
  4. ml-agents/mlagents/trainers/agent_processor.py (83 changes)
  5. ml-agents/mlagents/trainers/ppo/trainer.py (9 changes)
  6. ml-agents/mlagents/trainers/rl_trainer.py (14 changes)
  7. ml-agents/mlagents/trainers/sac/trainer.py (9 changes)
  8. ml-agents/mlagents/trainers/stats.py (13 changes)
  9. ml-agents/mlagents/trainers/tests/test_agent_processor.py (39 changes)
  10. ml-agents/mlagents/trainers/tests/test_ppo.py (36 changes)
  11. ml-agents/mlagents/trainers/tests/test_rl_trainer.py (39 changes)
  12. ml-agents/mlagents/trainers/tests/test_sac.py (10 changes)
  13. ml-agents/mlagents/trainers/tests/test_trainer_controller.py (28 changes)
  14. ml-agents/mlagents/trainers/trainer.py (198 changes)
  15. ml-agents/mlagents/trainers/trainer_controller.py (159 changes)

config/sac_trainer_config.yaml (71 changes)


init_entcoef: 1.0
learning_rate: 3.0e-4
learning_rate_schedule: constant
max_steps: 5.0e4
max_steps: 5.0e5
memory_size: 256
normalize: false
num_update: 1

sequence_length: 64
summary_freq: 1000
summary_freq: 10000
tau: 0.005
use_recurrent: false
vis_encode_type: simple

normalize: false
batch_size: 256
buffer_size: 500000
max_steps: 1.0e5
max_steps: 2.0e6
max_steps: 5.0e5
max_steps: 2.0e7
summary_freq: 1000
summary_freq: 20000
max_steps: 5.0e4
max_steps: 1.5e7
summary_freq: 2000
summary_freq: 60000
max_steps: 1.0e6
max_steps: 3e7
summary_freq: 2000
summary_freq: 20000
time_horizon: 128
init_entcoef: 0.1
num_layers: 2

max_steps: 1.0e6
max_steps: 3e7
summary_freq: 2000
summary_freq: 20000
time_horizon: 128
num_layers: 2
init_entcoef: 0.1

max_steps: 5.0e5
max_steps: 5.0e6
summary_freq: 2000
summary_freq: 20000
time_horizon: 128
init_entcoef: 0.1
num_layers: 2

max_steps: 5.0e5
max_steps: 5.0e6
summary_freq: 2000
summary_freq: 20000
time_horizon: 128
init_entcoef: 0.1
num_layers: 2

summary_freq: 2000
summary_freq: 30000
time_horizon: 128
batch_size: 128
buffer_init_steps: 10000

init_entcoef: 0.01
max_steps: 5.0e5
max_steps: 1.0e7
sequence_length: 16
tau: 0.01
use_recurrent: false

hidden_units: 256
buffer_init_steps: 1000
num_layers: 1
max_steps: 5.0e5
max_steps: 1.0e7
buffer_size: 500000
init_entcoef: 0.01
tau: 0.01

normalize: true
batch_size: 64
buffer_size: 12000
summary_freq: 1000
summary_freq: 12000
time_horizon: 1000
hidden_units: 64
init_entcoef: 0.5

batch_size: 256
summary_freq: 1000
summary_freq: 12000
max_steps: 2e5
max_steps: 4e6
CrawlerStatic:
normalize: true

buffer_size: 500000
buffer_init_steps: 2000
max_steps: 5e5
summary_freq: 3000
max_steps: 5e6
summary_freq: 30000
init_entcoef: 1.0
num_layers: 3
hidden_units: 512

time_horizon: 1000
batch_size: 256
buffer_size: 500000
summary_freq: 3000
summary_freq: 30000
max_steps: 1e6
max_steps: 1e7
hidden_units: 512
reward_signals:
extrinsic:

time_horizon: 1000
batch_size: 256
buffer_size: 500000
max_steps: 2e6
summary_freq: 3000
max_steps: 2e7
summary_freq: 30000
num_layers: 4
train_interval: 2
hidden_units: 512

time_horizon: 1000
batch_size: 128
buffer_size: 500000
max_steps: 2e5
summary_freq: 3000
max_steps: 2e7
summary_freq: 60000
Hallway:
sequence_length: 32

init_entcoef: 0.1
max_steps: 5.0e5
max_steps: 1.0e7
summary_freq: 1000
time_horizon: 64
use_recurrent: true

memory_size: 256
gamma: 0.99
batch_size: 64
max_steps: 5.0e5
summary_freq: 1000
max_steps: 1.0e7
time_horizon: 64
use_recurrent: true

gamma: 0.99
buffer_size: 1024
batch_size: 64
max_steps: 5.0e5
summary_freq: 1000
max_steps: 3.0e6
summary_freq: 60000
time_horizon: 64
GridWorld:

init_entcoef: 0.5
buffer_init_steps: 1000
buffer_size: 50000
max_steps: 50000
summary_freq: 2000
max_steps: 500000
summary_freq: 20000
time_horizon: 5
reward_signals:
extrinsic:

config/trainer_config.yaml (72 changes)


lambd: 0.95
learning_rate: 3.0e-4
learning_rate_schedule: linear
max_steps: 5.0e4
max_steps: 5.0e5
memory_size: 256
normalize: false
num_epoch: 3

summary_freq: 1000
summary_freq: 10000
use_recurrent: false
vis_encode_type: simple
reward_signals:

beta: 5.0e-3
batch_size: 1024
buffer_size: 10240
max_steps: 1.0e5
max_steps: 2.0e6
max_steps: 1.0e6
max_steps: 2.0e7
max_steps: 5.0e4
max_steps: 1.5e7
summary_freq: 2000
summary_freq: 60000
max_steps: 1.0e6
max_steps: 3e7
summary_freq: 2000
summary_freq: 20000
max_steps: 1.0e6
max_steps: 3e7
summary_freq: 2000
summary_freq: 20000
max_steps: 5.0e5
max_steps: 5.0e6
learning_rate: 1e-3
batch_size: 128
num_epoch: 3

summary_freq: 2000
summary_freq: 20000
max_steps: 5.0e5
max_steps: 5.0e6
learning_rate: 1e-3
batch_size: 320
num_epoch: 3

summary_freq: 2000
summary_freq: 20000
summary_freq: 2000
summary_freq: 30000
time_horizon: 128
batch_size: 128
buffer_size: 2048

max_steps: 5.0e5
max_steps: 1.0e7
num_epoch: 3
reward_signals:
extrinsic:

hidden_units: 256
num_layers: 1
beta: 1.0e-2
max_steps: 5.0e5
max_steps: 1.0e7
num_epoch: 3
reward_signals:
extrinsic:

normalize: true
batch_size: 64
buffer_size: 12000
summary_freq: 1000
summary_freq: 12000
time_horizon: 1000
lambd: 0.99
beta: 0.001

batch_size: 1200
buffer_size: 12000
summary_freq: 1000
summary_freq: 12000
time_horizon: 1000
max_steps: 5.0e5
beta: 0.001

Tennis:
normalize: true
max_steps: 2e5
max_steps: 4e6
CrawlerStatic:
normalize: true

buffer_size: 20240
max_steps: 1e6
summary_freq: 3000
max_steps: 1e7
summary_freq: 30000
num_layers: 3
hidden_units: 512
reward_signals:

time_horizon: 1000
batch_size: 2024
buffer_size: 20240
max_steps: 1e6
summary_freq: 3000
max_steps: 1e7
summary_freq: 30000
num_layers: 3
hidden_units: 512
reward_signals:

time_horizon: 1000
batch_size: 2048
buffer_size: 20480
max_steps: 2e6
summary_freq: 3000
max_steps: 2e7
summary_freq: 30000
num_layers: 3
hidden_units: 512
reward_signals:

time_horizon: 1000
batch_size: 2024
buffer_size: 20240
max_steps: 1e6
summary_freq: 3000
max_steps: 2e7
summary_freq: 60000
reward_signals:
extrinsic:
strength: 1.0

num_epoch: 3
buffer_size: 1024
batch_size: 128
max_steps: 5.0e5
summary_freq: 1000
max_steps: 1.0e7
summary_freq: 10000
time_horizon: 64
VisualHallway:

num_epoch: 3
buffer_size: 1024
batch_size: 64
max_steps: 5.0e5
summary_freq: 1000
max_steps: 1.0e7
summary_freq: 10000
time_horizon: 64
VisualPushBlock:

num_epoch: 3
buffer_size: 1024
batch_size: 64
max_steps: 5.0e5
summary_freq: 1000
max_steps: 3.0e6
summary_freq: 60000
time_horizon: 64
GridWorld:

hidden_units: 256
beta: 5.0e-3
buffer_size: 256
max_steps: 50000
summary_freq: 2000
max_steps: 500000
summary_freq: 20000
time_horizon: 5
reward_signals:
extrinsic:

docs/Migrating.md (10 changes)


# Migrating
## Migrating from 0.12 to latest
## Migrating from 0.13 to latest
### Important changes
* Trainer steps are now counted per-Agent, not per-environment as in previous versions. For instance, if you have 10 Agents in the scene, 20 environment steps now correspond to 200 steps as printed in the terminal and in Tensorboard.
### Steps to Migrate
* Multiply `max_steps` and `summary_freq` in your `trainer_config.yaml` by the number of Agents in the scene (see the sketch at the end of this file's diff).
## Migrating from ML-Agents toolkit v0.12.0 to v0.13.0
### Important changes
* The low level Python API has changed. You can look at the document [Low Level Python API documentation](Python-API.md) for more information. This should only affect you if you're writing a custom trainer; if you use `mlagents-learn` for training, this should be a transparent change.
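
Since trainer steps are now counted per-Agent, the config diffs above scale `max_steps` and `summary_freq` for every example environment. As a rough illustration of the migration step described in Migrating.md, here is a minimal sketch that applies the same scaling to an existing config; it is not part of this commit, assumes PyYAML is installed, and the file paths and agent count are placeholders.

    # Hypothetical migration helper, not part of the commit: scale max_steps and
    # summary_freq for the new per-Agent step counting. Paths and NUM_AGENTS are
    # placeholders; adjust them for your project.
    import yaml

    NUM_AGENTS = 10  # number of Agents using this behavior in the scene

    with open("config/trainer_config.yaml") as f:
        config = yaml.safe_load(f)

    for behavior, params in config.items():
        if not isinstance(params, dict):
            continue
        if "max_steps" in params:
            # max_steps may load as a string such as "5.0e5", so go through float().
            params["max_steps"] = int(float(params["max_steps"]) * NUM_AGENTS)
        if "summary_freq" in params:
            params["summary_freq"] = int(params["summary_freq"]) * NUM_AGENTS

    with open("config/trainer_config_migrated.yaml", "w") as f:
        yaml.safe_dump(config, f)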

ml-agents/mlagents/trainers/agent_processor.py (83 changes)


import sys
from typing import List, Dict
from collections import defaultdict, Counter
from typing import List, Dict, Deque, TypeVar, Generic
from collections import defaultdict, Counter, deque
from mlagents.trainers.trainer import Trainer
from mlagents.trainers.policy import Policy
T = TypeVar("T")
class AgentProcessor:
"""

def __init__(
self,
trainer: Trainer,
policy: TFPolicy,
behavior_id: str,
stats_reporter: StatsReporter,

self.episode_steps: Counter = Counter()
self.episode_rewards: Dict[str, float] = defaultdict(float)
self.stats_reporter = stats_reporter
self.trainer = trainer
self.trajectory_queues: List[AgentManagerQueue[Trajectory]] = []
self.behavior_id = behavior_id
def add_experiences(

next_obs=next_obs,
behavior_id=self.behavior_id,
)
# This will eventually be replaced with a queue
self.trainer.process_trajectory(trajectory)
for traj_queue in self.trajectory_queues:
traj_queue.put(trajectory)
self.experience_buffers[agent_id] = []
if next_info.local_done[next_idx]:
self.stats_reporter.add_stat(

self.policy.save_previous_action(
curr_info.agents, take_action_outputs["action"]
)
def publish_trajectory_queue(
self, trajectory_queue: "AgentManagerQueue[Trajectory]"
) -> None:
"""
Adds a trajectory queue to the list of queues to publish to when this AgentProcessor
assembles a Trajectory
:param trajectory_queue: Trajectory queue to publish to.
"""
self.trajectory_queues.append(trajectory_queue)
class AgentManagerQueue(Generic[T]):
"""
Queue used by the AgentManager. Note that we make our own class here because in most implementations
deque is sufficient and faster. However, if we want to switch to multiprocessing, we'll need to change
out this implementation.
"""
class Empty(Exception):
"""
Exception for when the queue is empty.
"""
pass
def __init__(self, behavior_id: str):
"""
Initializes an AgentManagerQueue. Note that we can give it a behavior_id so that it can be identified
separately from an AgentManager.
"""
self.queue: Deque[T] = deque()
self.behavior_id = behavior_id
def empty(self) -> bool:
return len(self.queue) == 0
def get_nowait(self) -> T:
try:
return self.queue.popleft()
except IndexError:
raise self.Empty("The AgentManagerQueue is empty.")
def put(self, item: T) -> None:
self.queue.append(item)
class AgentManager(AgentProcessor):
"""
An AgentManager is an AgentProcessor that also holds a single trajectory and policy queue.
Note: this leaves room for adding AgentProcessors that publish multiple trajectory queues.
"""
def __init__(
self,
policy: TFPolicy,
behavior_id: str,
stats_reporter: StatsReporter,
max_trajectory_length: int = sys.maxsize,
):
super().__init__(policy, behavior_id, stats_reporter, max_trajectory_length)
self.trajectory_queue: AgentManagerQueue[Trajectory] = AgentManagerQueue(
self.behavior_id
)
self.policy_queue: AgentManagerQueue[Policy] = AgentManagerQueue(
self.behavior_id
)
self.publish_trajectory_queue(self.trajectory_queue)
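
A hedged usage sketch of the AgentManagerQueue added above: the AgentProcessor side puts assembled items, and a consumer drains them with get_nowait(), catching the Empty exception. It assumes an ml-agents checkout that contains this class; plain strings stand in for Trajectory objects, and the behavior name is a placeholder.

    from mlagents.trainers.agent_processor import AgentManagerQueue

    # Placeholder behavior name; any string identifying the behavior works.
    queue: AgentManagerQueue[str] = AgentManagerQueue(behavior_id="3DBall")

    # Producer side (AgentProcessor / AgentManager): publish assembled items.
    queue.put("trajectory-1")
    queue.put("trajectory-2")

    # Consumer side (Trainer.advance): drain without blocking.
    while True:
        try:
            item = queue.get_nowait()
        except AgentManagerQueue.Empty:
            break
        print(f"processing {item} for behavior {queue.behavior_id}")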

ml-agents/mlagents/trainers/ppo/trainer.py (9 changes)


"model_path",
"reward_signals",
]
self.check_param_keys()
self._check_param_keys()
def process_trajectory(self, trajectory: Trajectory) -> None:
def _process_trajectory(self, trajectory: Trajectory) -> None:
super()._process_trajectory(trajectory)
agent_id = trajectory.agent_id # All the agents should have the same ID
# Add to episode_steps

agent_id, self.get_policy(trajectory.behavior_id)
)
def is_ready_update(self):
def _is_ready_update(self):
"""
Returns whether or not the trainer has enough elements to run update model
:return: A boolean corresponding to whether or not update_model() can be run

def update_policy(self):
def _update_policy(self):
"""
Uses demonstration_buffer to update the policy.
The reward signal generators must be updated in this method at their own pace.

ml-agents/mlagents/trainers/rl_trainer.py (14 changes)


RewardSignalResults = Dict[str, RewardSignalResult]
class RLTrainer(Trainer):
class RLTrainer(Trainer): # pylint: disable=abstract-method
Contains methods for adding BrainInfos to the Buffer.
"""
def __init__(self, *args, **kwargs):

def clear_update_buffer(self) -> None:
"""
Clear the buffers that have been built up during inference. If
we're not training, this should be called instead of update_policy.
Clear the buffers that have been built up during inference.
def advance(self) -> None:
"""
Steps the trainer, taking in trajectories and updates if ready
"""
super().advance()
if not self.is_training:
self.clear_update_buffer()
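
The override above is small but central: RLTrainer.advance() defers to the base class for queue handling and then discards buffered experiences when the trainer is running inference only. The following is a rough, self-contained sketch of that pattern using stand-in classes rather than the real ml-agents types, kept only to illustrate the control flow.

    class BaseTrainer:
        def __init__(self, training: bool) -> None:
            self.is_training = training

        def advance(self) -> None:
            # Stand-in for Trainer.advance(): pull trajectories, update if ready.
            pass

    class BufferedTrainer(BaseTrainer):
        def __init__(self, training: bool) -> None:
            super().__init__(training)
            self.update_buffer: list = []

        def clear_update_buffer(self) -> None:
            # Discard experiences accumulated during inference.
            self.update_buffer.clear()

        def advance(self) -> None:
            super().advance()
            if not self.is_training:
                # In inference-only mode, never let the buffer grow.
                self.clear_update_buffer()

    t = BufferedTrainer(training=False)
    t.update_buffer.append("experience")
    t.advance()
    assert t.update_buffer == []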

ml-agents/mlagents/trainers/sac/trainer.py (9 changes)


"vis_encode_type",
]
self.check_param_keys()
self._check_param_keys()
self.load = load
self.seed = seed
self.policy: SACPolicy = None # type: ignore

)
)
def process_trajectory(self, trajectory: Trajectory) -> None:
def _process_trajectory(self, trajectory: Trajectory) -> None:
super()._process_trajectory(trajectory)
last_step = trajectory.steps[-1]
agent_id = trajectory.agent_id # All the agents should have the same ID

agent_id, self.get_policy(trajectory.behavior_id)
)
def is_ready_update(self) -> bool:
def _is_ready_update(self) -> bool:
"""
Returns whether or not the trainer has enough elements to run update model
:return: A boolean corresponding to whether or not update_model() can be run

)
@timed
def update_policy(self) -> None:
def _update_policy(self) -> None:
"""
If train_interval is met, update the SAC policy given the current reward signals.
If reward_signal_train_interval is met, update the reward signals from the buffer.

ml-agents/mlagents/trainers/stats.py (13 changes)


def add_stat(self, key: str, value: float) -> None:
"""
Add a float value stat to the StatsReporter.
:param category: The highest categorization of the statistic, e.g. behavior name.
def set_stat(self, key: str, value: float) -> None:
"""
Sets a stat value to a float. This is for values that we don't want to average, and just
want the latest.
:param key: The type of statistic, e.g. Environment/Reward.
:param value: the value of the statistic.
"""
StatsReporter.stats_dict[self.category][key] = [value]
:param category: The category which to write out the stats.
:param step: Training step which to write these stats as.
"""
values: Dict[str, StatsSummary] = {}

def write_text(self, text: str, step: int) -> None:
"""
Write out some text.
:param category: The highest categorization of the statistic, e.g. behavior name.
:param text: The text to write out.
:param step: Training step which to write these stats as.
"""

def get_stats_summaries(self, key: str) -> StatsSummary:
"""
Get the mean, std, and count of a particular statistic, since last write.
:param category: The highest categorization of the statistic, e.g. behavior name.
:param key: The type of statistic, e.g. Environment/Reward.
:returns: A StatsSummary NamedTuple containing (mean, std, count).
"""

ml-agents/mlagents/trainers/tests/test_agent_processor.py (39 changes)


import pytest
import mlagents.trainers.tests.mock_brain as mb
import numpy as np
from mlagents.trainers.agent_processor import AgentProcessor
from mlagents.trainers.agent_processor import (
AgentProcessor,
AgentManager,
AgentManagerQueue,
)
from mlagents.trainers.trajectory import Trajectory
from mlagents.trainers.stats import StatsReporter

@pytest.mark.parametrize("num_vis_obs", [0, 1, 2], ids=["vec", "1 viz", "2 viz"])
def test_agentprocessor(num_vis_obs):
policy = create_mock_policy()
trainer = mock.Mock()
tqueue = mock.Mock()
trainer,
policy,
name_behavior_id,
max_trajectory_length=5,

num_vector_acts=2,
num_vis_observations=num_vis_obs,
)
processor.publish_trajectory_queue(tqueue)
assert len(trainer.process_trajectory.call_args_list) == 2
assert len(tqueue.put.call_args_list) == 2
trajectory = trainer.process_trajectory.call_args_list[0][0][0]
trajectory = tqueue.put.call_args_list[0][0][0]
def test_agent_manager():
policy = create_mock_policy()
name_behavior_id = "test_brain_name"
manager = AgentManager(
policy,
name_behavior_id,
max_trajectory_length=5,
stats_reporter=StatsReporter("testcat"),
)
assert len(manager.trajectory_queues) == 1
assert isinstance(manager.trajectory_queues[0], AgentManagerQueue)
def test_agent_manager_queue():
queue = AgentManagerQueue(behavior_id="testbehavior")
trajectory = mock.Mock(spec=Trajectory)
assert queue.empty()
queue.put(trajectory)
assert not queue.empty()
queue_traj = queue.get_nowait()
assert isinstance(queue_traj, Trajectory)
assert queue.empty()

ml-agents/mlagents/trainers/tests/test_ppo.py (36 changes)


from mlagents.trainers.models import EncoderType, LearningModel
from mlagents.trainers.trainer import UnityTrainerException
from mlagents.trainers.brain import BrainParameters, CameraResolution
from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents_envs.environment import UnityEnvironment
from mlagents_envs.mock_communicator import MockCommunicator
from mlagents.trainers.tests import mock_brain as mb

trainer = PPOTrainer(
brain_params.brain_name, 0, trainer_params, True, False, 0, "0", False
)
policy_mock = mock.Mock()
policy_mock = mock.Mock(spec=PPOPolicy)
) # 10 hacked becausee this function is no longer called through trainer
) # 10 hacked because this function is no longer called through trainer
trainer.policy = policy_mock
trainer.add_policy("testbehavior", policy_mock)
trainer.increment_step(5)
print(trainer.policy.increment_step(5))
trainer._increment_step(5, "testbehavior")
policy_mock.increment_step.assert_called_with(5)
assert trainer.step == step_count

buffer["curiosity_value_estimates"] = buffer["rewards"]
trainer.update_buffer = buffer
trainer.update_policy()
trainer._update_policy()
trainer.update_policy()
trainer._update_policy()
trainer.update_policy()
trainer._update_policy()
def test_process_trajectory(dummy_config):

)
dummy_config["summary_path"] = "./summaries/test_trainer_summary"
dummy_config["model_path"] = "./models/test_trainer_models/TestModel"
trainer = PPOTrainer(
brain_params.brain_name, 0, dummy_config, True, False, 0, "0", False
)
trainer = PPOTrainer(brain_params, 0, dummy_config, True, False, 0, "0", False)
policy = trainer.create_policy(brain_params)
trainer.add_policy(brain_params.brain_name, policy)
trajectory_queue = AgentManagerQueue("testbrain")
trainer.subscribe_trajectory_queue(trajectory_queue)
time_horizon = 15
trajectory = make_fake_trajectory(
length=time_horizon,

action_space=2,
)
policy = trainer.create_policy(brain_params)
trainer.add_policy(brain_params.brain_name, policy)
trainer.process_trajectory(trajectory)
trajectory_queue.put(trajectory)
trainer.advance()
# Check that trainer put trajectory in update buffer
assert trainer.update_buffer.num_experiences == 15

num_vis_obs=0,
action_space=2,
)
trainer.process_trajectory(trajectory)
trajectory_queue.put(trajectory)
trainer.advance()
# Check that the stats are reset as episode is finished
for reward in trainer.collected_rewards.values():

policy = trainer.create_policy(brain_params)
trainer.add_policy(brain_params.brain_name, policy)
trainer.process_trajectory(trajectory)
trainer._process_trajectory(trajectory)
# Check that the running mean and variance is correct
steps, mean, variance = trainer.policy.sess.run(

num_vis_obs=0,
action_space=2,
)
trainer.process_trajectory(trajectory)
trainer._process_trajectory(trajectory)
# Check that the running mean and variance is correct
steps, mean, variance = trainer.policy.sess.run(

ml-agents/mlagents/trainers/tests/test_rl_trainer.py (39 changes)


import unittest.mock as mock
from unittest import mock
import numpy as np
from mlagents.trainers.rl_trainer import RLTrainer
from mlagents.trainers.tests.test_buffer import construct_fake_buffer

"""
summary_path: "test/"
summary_freq: 1000
reward_signals:
extrinsic:
strength: 1.0

return mock_brain
def create_rl_trainer():
mock_brainparams = create_mock_brain()
trainer = RLTrainer(mock_brainparams.brain_name, dummy_config(), True, 0)
return trainer
# Add concrete implementations of abstract methods
class FakeTrainer(RLTrainer):
def get_policy(self, name_behavior_id):
return mock.Mock()
def _is_ready_update(self):
return True
def _update_policy(self):
pass
def add_policy(self):
pass
def create_policy(self):
return mock.Mock()
def create_mock_all_brain_info(brain_info):
return {"MockBrain": brain_info}
def _process_trajectory(self, trajectory):
super()._process_trajectory(trajectory)
def create_mock_policy():
mock_policy = mock.Mock()
mock_policy.reward_signals = {}
mock_policy.retrieve_memories.return_value = np.zeros((1, 1), dtype=np.float32)
mock_policy.retrieve_previous_action.return_value = np.zeros(
(1, 1), dtype=np.float32
)
return mock_policy
def create_rl_trainer():
mock_brainparams = create_mock_brain()
trainer = FakeTrainer(mock_brainparams, dummy_config(), True, 0)
return trainer
def test_rl_trainer():

ml-agents/mlagents/trainers/tests/test_sac.py (10 changes)


from mlagents.trainers.sac.models import SACModel
from mlagents.trainers.sac.policy import SACPolicy
from mlagents.trainers.sac.trainer import SACTrainer
from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.tests import mock_brain as mb
from mlagents.trainers.tests.mock_brain import make_brain_parameters
from mlagents.trainers.tests.test_trajectory import make_fake_trajectory

policy = trainer.create_policy(brain_params)
trainer.add_policy(brain_params.brain_name, policy)
trajectory_queue = AgentManagerQueue("testbrain")
trainer.subscribe_trajectory_queue(trajectory_queue)
trainer.process_trajectory(trajectory)
trajectory_queue.put(trajectory)
trainer.advance()
# Check that trainer put trajectory in update buffer
assert trainer.update_buffer.num_experiences == 15

num_vis_obs=0,
action_space=2,
)
trainer.process_trajectory(trajectory)
trajectory_queue.put(trajectory)
trainer.advance()
# Check that the stats are reset as episode is finished
for reward in trainer.collected_rewards.values():

ml-agents/mlagents/trainers/tests/test_trainer_controller.py (28 changes)


import pytest
from mlagents.tf_utils import tf
from mlagents.trainers.trainer_controller import TrainerController, AgentManager
from mlagents.trainers.trainer_controller import TrainerController
from mlagents.trainers.subprocess_env_manager import EnvironmentStep
from mlagents.trainers.sampler_class import SamplerManager

trainer_mock = MagicMock()
trainer_mock.get_step = 0
trainer_mock.get_max_steps = 5
trainer_mock.should_still_train = True
trainer_mock.parameters = {"some": "parameter"}
trainer_mock.write_tensorboard_text = MagicMock()

def take_step_sideeffect(env):
tc.trainers["testbrain"].get_step += 1
if (
not tc.trainers["testbrain"].get_step
<= tc.trainers["testbrain"].get_max_steps
):
tc.trainers["testbrain"].should_still_train = False
if tc.trainers["testbrain"].get_step > 10:
raise KeyboardInterrupt
return 1

trainer_mock.parameters = {"some": "parameter"}
trainer_mock.write_tensorboard_text = MagicMock()
processor_mock = MagicMock()
tc.managers = {"testbrain": AgentManager(processor=processor_mock)}
tc.managers = {"testbrain": MagicMock()}
return tc, trainer_mock

brain_info_dict = {brain_name: Mock()}
old_step_info = EnvironmentStep(brain_info_dict, brain_info_dict, action_info_dict)
new_step_info = EnvironmentStep(brain_info_dict, brain_info_dict, action_info_dict)
trainer_mock.is_ready_update = MagicMock(return_value=True)
trainer_mock._is_ready_update = MagicMock(return_value=True)
env_mock = MagicMock()
env_mock.step.return_value = [new_step_info]

env_mock.reset.assert_not_called()
env_mock.step.assert_called_once()
processor_mock = tc.managers[brain_name].processor
processor_mock.add_experiences.assert_called_once_with(
manager_mock = tc.managers[brain_name]
manager_mock.add_experiences.assert_called_once_with(
trainer_mock.update_policy.assert_called_once()
trainer_mock.increment_step.assert_called_once()
trainer_mock.advance.assert_called_once()
def test_take_step_if_not_training(trainer_controller_with_take_step_mocks):

old_step_info = EnvironmentStep(brain_info_dict, brain_info_dict, action_info_dict)
new_step_info = EnvironmentStep(brain_info_dict, brain_info_dict, action_info_dict)
trainer_mock.is_ready_update = MagicMock(return_value=False)
trainer_mock._is_ready_update = MagicMock(return_value=False)
env_mock = MagicMock()
env_mock.step.return_value = [new_step_info]

tc.advance(env_mock)
env_mock.reset.assert_not_called()
env_mock.step.assert_called_once()
processor_mock = tc.managers[brain_name].processor
processor_mock.add_experiences.assert_called_once_with(
manager_mock = tc.managers[brain_name]
manager_mock.add_experiences.assert_called_once_with(
new_step_info.previous_all_brain_info[brain_name],
new_step_info.current_all_brain_info[brain_name],
new_step_info.brain_name_to_action_info[brain_name].outputs,

ml-agents/mlagents/trainers/trainer.py (198 changes)


# # Unity ML-Agents Toolkit
import logging
from typing import Dict, List, Deque, Any
import time
import abc
from mlagents.tf_utils import tf

from mlagents.trainers.tf_policy import TFPolicy
from mlagents.trainers.stats import StatsReporter
from mlagents.trainers.trajectory import Trajectory
from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.policy import Policy
from mlagents_envs.timers import hierarchical_timer
LOGGER = logging.getLogger("mlagents.trainers")

pass
class Trainer(object):
class Trainer(abc.ABC):
"""This class is the base class for the mlagents_envs.trainers"""
def __init__(

self.cumulative_returns_since_policy_update: List[float] = []
self.is_training = training
self._reward_buffer: Deque[float] = deque(maxlen=reward_buff_cap)
self.policy_queues: List[AgentManagerQueue[Policy]] = []
self.trajectory_queues: List[AgentManagerQueue[Trajectory]] = []
self.training_start_time = time.time()
self.summary_freq = self.trainer_parameters["summary_freq"]
self.next_update_step = self.summary_freq
def check_param_keys(self):
def _check_param_keys(self):
for k in self.param_keys:
if k not in self.trainer_parameters:
raise UnityTrainerException(

LOGGER.info("Could not write text summary for Tensorboard.")
pass
def dict_to_str(self, param_dict: Dict[str, Any], num_tabs: int) -> str:
def _dict_to_str(self, param_dict: Dict[str, Any], num_tabs: int) -> str:
"""
Takes a parameter dictionary and converts it to a human-readable string.
Recurses if there are multiple levels of dict. Used to print out hyperparameters.

"\t"
+ " " * num_tabs
+ "{0}:\t{1}".format(
x, self.dict_to_str(param_dict[x], num_tabs + 1)
x, self._dict_to_str(param_dict[x], num_tabs + 1)
)
for x in param_dict
]

return """Hyperparameters for the {0} of brain {1}: \n{2}""".format(
self.__class__.__name__,
self.brain_name,
self.dict_to_str(self.trainer_parameters, 0),
self._dict_to_str(self.trainer_parameters, 0),
)
@property

return self.trainer_parameters
@property
def get_max_steps(self) -> float:
def get_max_steps(self) -> int:
return float(self.trainer_parameters["max_steps"])
return int(float(self.trainer_parameters["max_steps"]))
@property
def get_step(self) -> int:

return self.step
@property
def should_still_train(self) -> bool:
"""
Returns whether or not the trainer should train. A Trainer could
stop training if it wasn't training to begin with, or if max_steps
is reached.
"""
return self.is_training and self.get_step <= self.get_max_steps
@property
def reward_buffer(self) -> Deque[float]:
"""
Returns the reward buffer. The reward buffer contains the cumulative

"""
return self._reward_buffer
def increment_step(self, n_steps: int) -> None:
def _increment_step(self, n_steps: int, name_behavior_id: str) -> None:
self.next_update_step = self.step + (
self.summary_freq - self.step % self.summary_freq
)
p = self.get_policy(name_behavior_id)
if p:
p.increment_step(n_steps)
def save_model(self, name_behavior_id: str) -> None:
"""

"""
self.get_policy(name_behavior_id).export_model()
def write_summary(self, global_step: int, delta_train_start: float) -> None:
def _write_summary(self, step: int) -> None:
:param delta_train_start: Time elapsed since training started.
:param global_step: The number of steps the simulation has been going for
if (
global_step % self.trainer_parameters["summary_freq"] == 0
and global_step != 0
):
is_training = (
"Training."
if self.is_training and self.get_step <= self.get_max_steps
else "Not Training."
is_training = "Training." if self.should_still_train else "Not Training."
stats_summary = self.stats_reporter.get_stats_summaries(
"Environment/Cumulative Reward"
)
if stats_summary.num > 0:
LOGGER.info(
" {}: {}: Step: {}. "
"Time Elapsed: {:0.3f} s "
"Mean "
"Reward: {:0.3f}"
". Std of Reward: {:0.3f}. {}".format(
self.run_id,
self.brain_name,
step,
time.time() - self.training_start_time,
stats_summary.mean,
stats_summary.std,
is_training,
)
step = min(self.get_step, self.get_max_steps)
stats_summary = self.stats_reporter.get_stats_summaries(
"Environment/Cumulative Reward"
)
if stats_summary.num > 0:
LOGGER.info(
" {}: {}: Step: {}. "
"Time Elapsed: {:0.3f} s "
"Mean "
"Reward: {:0.3f}"
". Std of Reward: {:0.3f}. {}".format(
self.run_id,
self.brain_name,
step,
delta_train_start,
stats_summary.mean,
stats_summary.std,
is_training,
)
)
set_gauge(f"{self.brain_name}.mean_reward", stats_summary.mean)
else:
LOGGER.info(
" {}: {}: Step: {}. No episode was completed since last summary. {}".format(
self.run_id, self.brain_name, step, is_training
)
set_gauge(f"{self.brain_name}.mean_reward", stats_summary.mean)
else:
LOGGER.info(
" {}: {}: Step: {}. No episode was completed since last summary. {}".format(
self.run_id, self.brain_name, step, is_training
self.stats_reporter.write_stats(int(step))
)
self.stats_reporter.write_stats(int(step))
def process_trajectory(self, trajectory: Trajectory) -> None:
@abc.abstractmethod
def _process_trajectory(self, trajectory: Trajectory) -> None:
Processing involves calculating value and advantage targets for model updating step.
raise UnityTrainerException(
"The process_experiences method was not implemented."
)
self._maybe_write_summary(self.get_step + len(trajectory.steps))
self._increment_step(len(trajectory.steps), trajectory.behavior_id)
def _maybe_write_summary(self, step_after_process: int) -> None:
"""
If processing the trajectory will make the step exceed the next summary write,
write the summary. This logic ensures summaries are written on the update step and not in between.
:param step_after_process: the step count after processing the next trajectory.
"""
if step_after_process >= self.next_update_step and self.get_step != 0:
self._write_summary(self.next_update_step)
@abc.abstractmethod
raise UnityTrainerException("The end_episode method was not implemented.")
pass
@abc.abstractmethod
def create_policy(self, brain_parameters: BrainParameters) -> TFPolicy:
"""
Creates policy
"""
pass
@abc.abstractmethod
def add_policy(self, name_behavior_id: str, policy: TFPolicy) -> None:
"""
Adds policy to trainer
"""
pass
@abc.abstractmethod
def get_policy(self, name_behavior_id: str) -> TFPolicy:
"""
Gets policy from trainer
"""
pass
def is_ready_update(self):
@abc.abstractmethod
def _is_ready_update(self):
raise UnityTrainerException("The is_ready_update method was not implemented.")
return False
def update_policy(self):
@abc.abstractmethod
def _update_policy(self):
raise UnityTrainerException("The update_model method was not implemented.")
pass
def create_policy(self, brain_parameters: BrainParameters) -> TFPolicy:
def advance(self) -> None:
Creates policy
Steps the trainer, taking in trajectories and updates if ready.
raise UnityTrainerException("The create_policy method was not implemented.")
with hierarchical_timer("process_trajectory"):
for traj_queue in self.trajectory_queues:
try:
t = traj_queue.get_nowait()
self._process_trajectory(t)
except AgentManagerQueue.Empty:
pass
if self.should_still_train:
if self._is_ready_update():
with hierarchical_timer("_update_policy"):
self._update_policy()
for q in self.policy_queues:
# Get policies that correspond to the policy queue in question
q.put(self.get_policy(q.behavior_id))
def add_policy(self, name_behavior_id: str, policy: TFPolicy) -> None:
def publish_policy_queue(self, policy_queue: AgentManagerQueue[Policy]) -> None:
Adds policy to trainer
Adds a policy queue to the list of queues to publish to when this Trainer
makes a policy update
:param queue: Policy queue to publish to.
raise UnityTrainerException("The add_policy method was not implemented")
self.policy_queues.append(policy_queue)
def get_policy(self, name_behavior_id: str) -> TFPolicy:
def subscribe_trajectory_queue(
self, trajectory_queue: AgentManagerQueue[Trajectory]
) -> None:
Gets policy from trainer
Adds a trajectory queue to the list of queues for the trainer to ingest Trajectories from.
:param queue: Trajectory queue to publish to.
raise UnityTrainerException("The get_policy method was not implemented.")
def advance(self) -> None:
pass
self.trajectory_queues.append(trajectory_queue)
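
The heart of the change is that Trainer.advance() now pulls trajectories from its subscribed queues, hands them to _process_trajectory(), and after _update_policy() pushes the refreshed policy onto every published policy queue. The sketch below mirrors that flow with plain deques and stub functions so it runs standalone; it is an illustration of the data flow, not the real Trainer code.

    from collections import deque
    from typing import Deque

    trajectory_queue: Deque[str] = deque(["traj-1", "traj-2"])  # filled by an AgentManager
    policy_queue: Deque[str] = deque()                          # read by an AgentManager

    def _process_trajectory(traj: str) -> None:
        print(f"adding {traj} to the update buffer")

    def _is_ready_update() -> bool:
        return True  # e.g. enough experiences have been buffered

    def _update_policy() -> str:
        return "updated-policy"

    def advance() -> None:
        # 1) Drain every subscribed trajectory queue without blocking.
        while trajectory_queue:
            _process_trajectory(trajectory_queue.popleft())
        # 2) If enough data has accumulated, update and publish the new policy
        #    so the acting side can swap it in.
        if _is_ready_update():
            policy_queue.append(_update_policy())

    advance()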

ml-agents/mlagents/trainers/trainer_controller.py (159 changes)


import sys
import json
import logging
from typing import Dict, List, Optional, Set, NamedTuple
from typing import Dict, List, Optional, Set
from time import time
from mlagents.trainers.env_manager import EnvManager, EnvironmentStep
from mlagents_envs.exception import (

from mlagents.trainers.trainer import Trainer
from mlagents.trainers.meta_curriculum import MetaCurriculum
from mlagents.trainers.trainer_util import TrainerFactory
from mlagents.trainers.agent_processor import AgentProcessor
class AgentManager(NamedTuple):
processor: AgentProcessor
from mlagents.trainers.agent_processor import AgentManager, AgentManagerQueue
class TrainerController(object):

self.save_freq = save_freq
self.train_model = train
self.meta_curriculum = meta_curriculum
self.training_start_time = time()
self.sampler_manager = sampler_manager
self.resampling_interval = resampling_interval
np.random.seed(training_seed)

if brain_name not in self.trainers:
continue
if curriculum.measure == "progress":
measure_val = (
self.trainers[brain_name].get_step
/ self.trainers[brain_name].get_max_steps
measure_val = self.trainers[brain_name].get_step / float(
self.trainers[brain_name].get_max_steps
)
brain_names_to_measure_vals[brain_name] = measure_val
elif curriculum.measure == "reward":

def _not_done_training(self) -> bool:
return (
any(t.get_step <= t.get_max_steps for k, t in self.trainers.items())
any(t.should_still_train for t in self.trainers.values())
def write_to_tensorboard(self, global_step: int) -> None:
for brain_name, trainer in self.trainers.items():
# Write training statistics to Tensorboard.
delta_train_start = time() - self.training_start_time
if (
self.meta_curriculum
and brain_name in self.meta_curriculum.brains_to_curricula
):
lesson_num = self.meta_curriculum.brains_to_curricula[
brain_name
].lesson_num
trainer.stats_reporter.add_stat("Environment/Lesson", lesson_num)
trainer.write_summary(global_step, delta_train_start)
def _create_trainer_and_manager(
self, env_manager: EnvManager, name_behavior_id: str
) -> None:
try:
brain_name, _ = name_behavior_id.split("?")
except ValueError:
brain_name = name_behavior_id
try:
trainer = self.trainers[brain_name]
except KeyError:
trainer = self.trainer_factory.generate(brain_name)
self.trainers[brain_name] = trainer
self.logger.info(trainer)
if self.train_model:
trainer.write_tensorboard_text("Hyperparameters", trainer.parameters)
policy = trainer.create_policy(env_manager.external_brains[name_behavior_id])
trainer.add_policy(name_behavior_id, policy)
env_manager.set_policy(name_behavior_id, policy)
self.brain_name_to_identifier[brain_name].add(name_behavior_id)
agent_manager = AgentManager(
policy,
name_behavior_id,
trainer.stats_reporter,
trainer.parameters.get("time_horizon", sys.maxsize),
)
trainer.publish_policy_queue(agent_manager.policy_queue)
trainer.subscribe_trajectory_queue(agent_manager.trajectory_queue)
self.managers[name_behavior_id] = agent_manager
def start_learning(self, env_manager: EnvManager) -> None:
self._create_model_path(self.model_path)

external_brain_behavior_ids = set(env_manager.external_brains.keys())
new_behavior_ids = external_brain_behavior_ids - last_brain_behavior_ids
for name_behavior_id in new_behavior_ids:
try:
brain_name, _ = name_behavior_id.split("?")
except ValueError:
brain_name = name_behavior_id
try:
trainer = self.trainers[brain_name]
except KeyError:
trainer = self.trainer_factory.generate(brain_name)
self.trainers[brain_name] = trainer
self.logger.info(trainer)
if self.train_model:
trainer.write_tensorboard_text(
"Hyperparameters", trainer.parameters
)
policy = trainer.create_policy(
env_manager.external_brains[name_behavior_id]
)
trainer.add_policy(name_behavior_id, policy)
env_manager.set_policy(name_behavior_id, policy)
self.brain_name_to_identifier[brain_name].add(name_behavior_id)
agent_manager = AgentManager(
processor=AgentProcessor(
trainer,
policy,
name_behavior_id,
trainer.stats_reporter,
trainer.parameters.get("time_horizon", sys.maxsize),
)
)
self.managers[name_behavior_id] = agent_manager
self._create_trainer_and_manager(env_manager, name_behavior_id)
n_steps = self.advance(env_manager)
for _ in range(n_steps):
global_step += 1

self._save_model()
self.write_to_tensorboard(global_step)
# Final save Tensorflow model
if global_step != 0 and self.train_model:
self._save_model()

if meta_curriculum_reset or generalization_reset:
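
Finally, a hedged sketch of the wiring that _create_trainer_and_manager() performs for each behavior: the AgentManager's trajectory queue is subscribed by the Trainer, and the Trainer publishes updated policies to the AgentManager's policy queue. The classes below are stand-ins, and the behavior identifier is a placeholder; only the wiring pattern follows the diff.

    class SimpleQueue(list):
        """Stand-in for AgentManagerQueue."""

    class StubAgentManager:
        def __init__(self, behavior_id: str) -> None:
            self.behavior_id = behavior_id
            self.trajectory_queue = SimpleQueue()
            self.policy_queue = SimpleQueue()

    class StubTrainer:
        def __init__(self) -> None:
            self.trajectory_queues = []
            self.policy_queues = []

        def subscribe_trajectory_queue(self, q: SimpleQueue) -> None:
            self.trajectory_queues.append(q)

        def publish_policy_queue(self, q: SimpleQueue) -> None:
            self.policy_queues.append(q)

    trainer = StubTrainer()
    manager = StubAgentManager("3DBall?team=0")  # placeholder name_behavior_id
    trainer.publish_policy_queue(manager.policy_queue)
    trainer.subscribe_trajectory_queue(manager.trajectory_queue)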