
[refactor] Run Trainers in separate threads (#3690)

GitHub · 5 years ago
Current commit: 4d23200b
20 files changed, 262 insertions(+), 115 deletions(-)
  1. com.unity.ml-agents/CHANGELOG.md (2)
  2. config/sac_trainer_config.yaml (21)
  3. docs/Migrating.md (4)
  4. docs/Training-ML-Agents.md (4)
  5. docs/Training-PPO.md (9)
  6. docs/Training-SAC.md (40)
  7. ml-agents/mlagents/trainers/agent_processor.py (52)
  8. ml-agents/mlagents/trainers/env_manager.py (12)
  9. ml-agents/mlagents/trainers/ghost/trainer.py (4)
  10. ml-agents/mlagents/trainers/ppo/trainer.py (1)
  11. ml-agents/mlagents/trainers/sac/trainer.py (90)
  12. ml-agents/mlagents/trainers/tests/test_reward_signals.py (3)
  13. ml-agents/mlagents/trainers/tests/test_rl_trainer.py (23)
  14. ml-agents/mlagents/trainers/tests/test_sac.py (32)
  15. ml-agents/mlagents/trainers/tests/test_simple_rl.py (10)
  16. ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py (12)
  17. ml-agents/mlagents/trainers/tests/test_trainer_controller.py (3)
  18. ml-agents/mlagents/trainers/trainer/rl_trainer.py (14)
  19. ml-agents/mlagents/trainers/trainer/trainer.py (10)
  20. ml-agents/mlagents/trainers/trainer_controller.py (31)

com.unity.ml-agents/CHANGELOG.md (2)


overwrite the existing files. (#3705)
- `StackingSensor` was changed from `internal` visibility to `public`
- Updated Barracuda to 0.6.3-preview.
- Model updates can now happen asynchronously with environment steps for better performance. (#3690)
- `num_updates` and `train_interval` for SAC were replaced with `steps_per_update`. (#3690)
### Bug Fixes

config/sac_trainer_config.yaml (21)


max_steps: 5.0e5
memory_size: 128
normalize: false
num_update: 1
train_interval: 1
steps_per_update: 10
num_layers: 2
time_horizon: 64
sequence_length: 64

buffer_size: 500000
max_steps: 2.0e6
init_entcoef: 0.05
train_interval: 1
max_steps: 2.0e6
max_steps: 1.0e6
num_layers: 2
hidden_units: 64
summary_freq: 20000

init_entcoef: 0.05
hidden_units: 256
summary_freq: 60000
summary_freq: 100000
time_horizon: 64
num_layers: 2

normalize: true
time_horizon: 1000
batch_size: 256
train_interval: 2
steps_per_update: 20
max_steps: 5e6
max_steps: 3e6
summary_freq: 30000
init_entcoef: 1.0
num_layers: 3

batch_size: 256
buffer_size: 500000
summary_freq: 30000
train_interval: 2
steps_per_update: 20
max_steps: 1e7
max_steps: 5e6
hidden_units: 512
reward_signals:
extrinsic:

max_steps: 2e7
summary_freq: 30000
num_layers: 4
train_interval: 2
steps_per_update: 30
hidden_units: 512
reward_signals:
extrinsic:

batch_size: 128
buffer_size: 500000
max_steps: 2e7
steps_per_update: 20
summary_freq: 60000
Hallway:

memory_size: 128
init_entcoef: 0.1
max_steps: 1.0e7
max_steps: 5.0e6
summary_freq: 10000
time_horizon: 64
use_recurrent: true

docs/Migrating.md (4)


- The signature of `Agent.Heuristic()` was changed to take a `float[]` as a
parameter, instead of returning the array. This was done to prevent a common
source of error where users would return arrays of the wrong size.
- `num_updates` and `train_interval` for SAC have been replaced with `steps_per_update`.
### Steps to Migrate

- If your Agent class overrides `Heuristic()`, change the signature to
`public override void Heuristic(float[] actionsOut)` and assign values to
`actionsOut` instead of returning an array.
- Set `steps_per_update` to be roughly equal to the number of agents in your environment,
multiplied by `num_updates` and divided by `train_interval`.
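
As a concrete illustration of that rule of thumb, here is a small sketch (not part of the migration guide; the agent count and old settings below are hypothetical):

# Hypothetical values for the conversion described in the step above.
num_agents = 16       # agents in the environment
num_updates = 1       # old `num_update` value
train_interval = 1    # old `train_interval` value

steps_per_update = num_agents * num_updates / train_interval
print(steps_per_update)  # 16.0 -> set `steps_per_update: 16` in the trainer config
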
## Migrating from 0.14 to 0.15

docs/Training-ML-Agents.md (4)


| tau | How aggressively to update the target network used for bootstrapping value estimation in SAC. | SAC |
| time_horizon | How many steps of experience to collect per-agent before adding it to the experience buffer. | PPO, SAC |
| trainer | The type of training to perform: "ppo", "sac", "offline_bc" or "online_bc". | PPO, SAC |
| train_interval | How often to update the agent. | SAC |
| num_update | Number of mini-batches to update the agent with during each update. | SAC |
| steps_per_update | Ratio of agent steps per mini-batch update. | SAC |
| threaded | Run the trainer in a parallel thread from the environment steps. (Default: true) | PPO, SAC |
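
For illustration only (not part of this diff), a minimal sketch of how a trainer could read the two options added to this table, using the defaults that appear elsewhere in the change (`steps_per_update` defaults to 1 and `threaded` to true); the surrounding keys and values are assumptions:

# Hypothetical slice of a parsed trainer configuration.
trainer_parameters = {
    "trainer": "sac",
    "steps_per_update": 10,  # ratio of agent steps per mini-batch update
    "threaded": True,        # run the trainer in a separate thread from env stepping
}

steps_per_update = trainer_parameters.get("steps_per_update", 1)  # DEFAULT_STEPS_PER_UPDATE
threaded = trainer_parameters.get("threaded", True)
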
For specific advice on setting hyperparameters based on the type of training you
are conducting, see:

docs/Training-PPO.md (9)


in most cases, it is sufficient to use the `--initialize-from` CLI parameter to initialize
all models from the same run.
### (Optional) Advanced: Disable Threading
By default, PPO model updates can happen while the environment is being stepped. This violates the
[on-policy](https://spinningup.openai.com/en/latest/user/algorithms.html#the-on-policy-algorithms)
assumption of PPO slightly in exchange for a 10-20% training speedup. To maintain the
strict on-policyness of PPO, you can disable parallel updates by setting `threaded` to `false`.
Default Value: `true`
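
As a sketch of what disabling threading looks like in practice (the layout and other values below are illustrative, only the `threaded` key comes from this change):

# Hypothetical trainer config for a PPO behavior with threading disabled
# to enforce strictly on-policy updates.
ppo_config = {
    "MyBehavior": {
        "trainer": "ppo",
        "batch_size": 1024,
        "buffer_size": 10240,
        "threaded": False,  # update the model only between environment steps
    }
}
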
## Training Statistics
To view training statistics, use TensorBoard. For information on launching and

docs/Training-SAC.md (40)


Curiosity reward, which can be used to encourage exploration in sparse extrinsic reward
environments.
#### Number of Updates for Reward Signal (Optional)
#### Steps Per Update for Reward Signal (Optional)
`reward_signal_num_update` for the reward signals corresponds to the number of mini batches sampled
and used for updating the reward signals during each
update. By default, we update the reward signals once every time the main policy is updated.
`reward_signal_steps_per_update` for the reward signals corresponds to the number of steps per mini batch sampled
and used for updating the reward signals. By default, we update the reward signals once every time the main policy is updated.
we may want to update the policy N times, then update the reward signal (GAIL) M times.
We can change `train_interval` and `num_update` of SAC to N, as well as `reward_signal_num_update`
under `reward_signals` to M to accomplish this. By default, `reward_signal_num_update` is set to
`num_update`.
we may want to update the reward signal (GAIL) M times for every update of the policy.
We can change `steps_per_update` of SAC to N, as well as `reward_signal_steps_per_update`
under `reward_signals` to N / M to accomplish this. By default, `reward_signal_steps_per_update` is set to
`steps_per_update`.
Typical Range: `num_update`
Typical Range: `steps_per_update`
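
A small worked example of the N / M relationship described above (the values are hypothetical):

# Policy updated once per N agent steps; reward signal (e.g. GAIL) updated M times
# for every policy update.
N = 20   # steps_per_update for the SAC policy
M = 4    # desired reward-signal updates per policy update

reward_signal_steps_per_update = N / M
print(reward_signal_steps_per_update)  # 5.0 agent steps per reward-signal update
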
### Buffer Size

Typical Range: `1` - `5`
### Number of Updates
### Steps Per Update
`num_update` corresponds to the number of mini batches sampled and used for training during each
training event. In SAC, a single "update" corresponds to grabbing a batch of size `batch_size` from the experience
replay buffer, and using this mini batch to update the models. Typically, this can be left at 1.
However, to imitate the training procedure in certain papers (e.g.
[Kostrikov et al.](http://arxiv.org/abs/1809.02925), [Blondé et al.](http://arxiv.org/abs/1809.02064)),
we may want to update N times with different mini batches before grabbing additional samples.
We can change `train_interval` and `num_update` to N to accomplish this.
`steps_per_update` corresponds to the average ratio of agent steps (actions) taken to updates made of the agent's
policy. In SAC, a single "update" corresponds to grabbing a batch of size `batch_size` from the experience
replay buffer, and using this mini batch to update the models. Note that it is not guaranteed that after
exactly `steps_per_update` steps an update will be made, only that the ratio will hold true over many steps.
Typically, `steps_per_update` should be greater than or equal to 1. Note that setting `steps_per_update` lower will
improve sample efficiency (reduce the number of steps required to train)
but increase the CPU time spent performing updates. For most environments where steps are fairly fast (e.g. our example
environments) `steps_per_update` equal to the number of agents in the scene is a good balance.
For slow environments (steps take 0.1 seconds or more) reducing `steps_per_update` may improve training speed.
We can also change `steps_per_update` to lower than 1 to update more often than once per step, though this will
usually result in a slowdown unless the environment is very slow.
Typical Range: `1`
Typical Range: `1` - `20`
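
To make the "ratio over many steps" point concrete, here is a small sketch (not taken from the trainer; `updates_due` is a made-up helper) of how many updates are owed at a given step count:

def updates_due(step: int, updates_so_far: int, steps_per_update: float) -> int:
    """How many more updates keep the updates-to-steps ratio near 1/steps_per_update."""
    updates_so_far = max(1, updates_so_far)  # avoid divide-by-zero, as the trainer does
    extra = 0
    while step / (updates_so_far + extra) > steps_per_update:
        extra += 1
    return extra

# With steps_per_update=10, after 100 agent steps and 5 updates made so far,
# 5 more updates are owed; the ratio is only maintained on average, not per step.
print(updates_due(100, 5, 10))  # 5
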
### Tau

ml-agents/mlagents/trainers/agent_processor.py (52)


import sys
from typing import List, Dict, Deque, TypeVar, Generic, Tuple, Any, Union
from collections import defaultdict, Counter, deque
from typing import List, Dict, TypeVar, Generic, Tuple, Any, Union
from collections import defaultdict, Counter
import queue
from mlagents_envs.base_env import (
DecisionSteps,

pass
def __init__(self, behavior_id: str, maxlen: int = 1000):
def __init__(self, behavior_id: str, maxlen: int = 20):
self.maxlen: int = maxlen
self.queue: Deque[T] = deque(maxlen=self.maxlen)
self.behavior_id = behavior_id
self._maxlen: int = maxlen
self._queue: queue.Queue = queue.Queue(maxsize=maxlen)
self._behavior_id = behavior_id
@property
def maxlen(self):
"""
The maximum length of the queue.
:return: Maximum length of the queue.
"""
return self._maxlen
@property
def behavior_id(self):
"""
The Behavior ID of this queue.
:return: Behavior ID associated with the queue.
"""
return self._behavior_id
def qsize(self) -> int:
"""
Returns the approximate size of the queue. Note that values may differ
depending on the underlying queue implementation.
"""
return self._queue.qsize()
return len(self.queue) == 0
return self._queue.empty()
"""
Gets the next item from the queue, throwing an AgentManagerQueue.Empty exception
if the queue is empty.
"""
return self.queue.popleft()
except IndexError:
return self._queue.get_nowait()
except queue.Empty:
self.queue.append(item)
self._queue.put(item)
class AgentManager(AgentProcessor):

self.trajectory_queue: AgentManagerQueue[Trajectory] = AgentManagerQueue(
self.behavior_id
)
# NOTE: we make policy queues of infinite length to avoid lockups of the trainers.
# In the environment manager, we make sure to empty the policy queue before continuing to produce steps.
self.behavior_id
self.behavior_id, maxlen=0
)
self.publish_trajectory_queue(self.trajectory_queue)
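
For readability, a condensed sketch of the thread-safe queue that replaces the old deque-backed one, reconstructed from the fragments above (docstrings and some typing details are trimmed, so treat it as an approximation rather than the verbatim class):

import queue
from typing import Generic, TypeVar

T = TypeVar("T")

class AgentManagerQueue(Generic[T]):
    class Empty(Exception):
        """Raised by get_nowait() when the queue has no items."""

    def __init__(self, behavior_id: str, maxlen: int = 20):
        # maxlen=0 produces an unbounded queue.Queue; policy queues use this
        # so that trainers are never blocked when publishing a new policy.
        self._maxlen = maxlen
        self._queue: queue.Queue = queue.Queue(maxsize=maxlen)
        self._behavior_id = behavior_id

    @property
    def maxlen(self) -> int:
        return self._maxlen

    @property
    def behavior_id(self) -> str:
        return self._behavior_id

    def qsize(self) -> int:
        # Approximate size; may race with concurrent producers and consumers.
        return self._queue.qsize()

    def empty(self) -> bool:
        return self._queue.empty()

    def get_nowait(self) -> T:
        try:
            return self._queue.get_nowait()
        except queue.Empty:
            raise self.Empty("The AgentManagerQueue is empty.")

    def put(self, item: T) -> None:
        self._queue.put(item)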

ml-agents/mlagents/trainers/env_manager.py (12)


if self.first_step_infos is not None:
self._process_step_infos(self.first_step_infos)
self.first_step_infos = None
# Get new policies if found
# Get new policies if found. Always get the latest policy.
_policy = None
_policy = self.agent_managers[brain_name].policy_queue.get_nowait()
self.set_policy(brain_name, _policy)
# We make sure to empty the policy queue before continuing to produce steps.
# This halts the trainers until the policy queue is empty.
while True:
_policy = self.agent_managers[brain_name].policy_queue.get_nowait()
pass
if _policy is not None:
self.set_policy(brain_name, _policy)
# Step the environment
new_step_infos = self._step()
# Add to AgentProcessor
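
The drain pattern above, shown as a self-contained sketch (the queue argument and the follow-up `set_policy` call are stand-ins for the ones in this file):

from mlagents.trainers.agent_processor import AgentManagerQueue

def get_latest_policy(policy_queue):
    """Empty the policy queue and return the newest policy, or None if none arrived."""
    latest = None
    try:
        while True:
            latest = policy_queue.get_nowait()
    except AgentManagerQueue.Empty:
        pass
    return latest

If a policy did arrive, the manager calls set_policy(brain_name, latest) before stepping, so environment steps always act with the most recent policy.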

ml-agents/mlagents/trainers/ghost/trainer.py (4)


# We grab at most the maximum length of the queue.
# This ensures that even if the queue is being filled faster than it is
# being emptied, the trajectories in the queue are on-policy.
for _ in range(trajectory_queue.maxlen):
for _ in range(trajectory_queue.qsize()):
t = trajectory_queue.get_nowait()
# adds to wrapped trainers queue
internal_trajectory_queue.put(t)

else:
# Dump trajectories from non-learning policy
try:
for _ in range(trajectory_queue.maxlen):
for _ in range(trajectory_queue.qsize()):
t = trajectory_queue.get_nowait()
# count ghost steps
self.ghost_step += len(t.steps)

ml-agents/mlagents/trainers/ppo/trainer.py (1)


for stat, val in update_stats.items():
self._stats_reporter.add_stat(stat, val)
self._clear_update_buffer()
return True
def create_policy(
self, parsed_behavior_id: BehaviorIdentifiers, brain_parameters: BrainParameters

ml-agents/mlagents/trainers/sac/trainer.py (90)


logger = get_logger(__name__)
BUFFER_TRUNCATE_PERCENT = 0.8
DEFAULT_STEPS_PER_UPDATE = 1
class SACTrainer(RLTrainer):

"init_entcoef",
"max_steps",
"normalize",
"num_update",
"steps_per_update",
"sequence_length",
"summary_freq",
"tau",

self.optimizer: SACOptimizer = None # type: ignore
self.step = 0
self.train_interval = (
trainer_parameters["train_interval"]
if "train_interval" in trainer_parameters
else 1
# Don't count buffer_init_steps in steps_per_update ratio, but also don't divide-by-0
self.update_steps = max(1, self.trainer_parameters["buffer_init_steps"])
self.reward_signal_update_steps = max(
1, self.trainer_parameters["buffer_init_steps"]
self.reward_signal_updates_per_train = (
trainer_parameters["reward_signals"]["reward_signal_num_update"]
if "reward_signal_num_update" in trainer_parameters["reward_signals"]
else trainer_parameters["num_update"]
self.steps_per_update = (
trainer_parameters["steps_per_update"]
if "steps_per_update" in trainer_parameters
else DEFAULT_STEPS_PER_UPDATE
)
self.reward_signal_steps_per_update = (
trainer_parameters["reward_signals"]["reward_signal_steps_per_update"]
if "reward_signal_steps_per_update" in trainer_parameters["reward_signals"]
else self.steps_per_update
)
self.checkpoint_replay_buffer = (

def _is_ready_update(self) -> bool:
"""
Returns whether or not the trainer has enough elements to run update model
:return: A boolean corresponding to whether or not update_model() can be run
:return: A boolean corresponding to whether or not _update_policy() can be run
"""
return (
self.update_buffer.num_experiences >= self.trainer_parameters["batch_size"]

@timed
def _update_policy(self) -> None:
def _update_policy(self) -> bool:
If train_interval is met, update the SAC policy given the current reward signals.
If reward_signal_train_interval is met, update the reward signals from the buffer.
Update the SAC policy and reward signals. The reward signal generators are updated using different mini batches.
By default we imitate http://arxiv.org/abs/1809.02925 and similar papers, where the policy is updated
N times, then the reward signals are updated N times.
:return: Whether or not the policy was updated.
if self.step % self.train_interval == 0:
self.update_sac_policy()
self.update_reward_signals()
policy_was_updated = self._update_sac_policy()
self._update_reward_signals()
return policy_was_updated
def create_policy(
self, parsed_behavior_id: BehaviorIdentifiers, brain_parameters: BrainParameters

return policy
def update_sac_policy(self) -> None:
def _update_sac_policy(self) -> bool:
Uses demonstration_buffer to update the policy.
The reward signal generators are updated using different mini batches.
If we want to imitate http://arxiv.org/abs/1809.02925 and similar papers, where the policy is updated
N times, then the reward signals are updated N times, then reward_signal_updates_per_train
is greater than 1 and the reward signals are not updated in parallel.
Uses update_buffer to update the policy. We sample the update_buffer and update
until the steps_per_update ratio is met.
has_updated = False
num_updates = self.trainer_parameters["num_update"]
for _ in range(num_updates):
while self.step / self.update_steps > self.steps_per_update:
logger.debug("Updating SAC policy at step {}".format(self.step))
buffer = self.update_buffer
if (

for stat_name, value in update_stats.items():
batch_update_stats[stat_name].append(value)
self.update_steps += 1
for stat, stat_list in batch_update_stats.items():
self._stats_reporter.add_stat(stat, np.mean(stat_list))
has_updated = True
if self.optimizer.bc_module:
update_stats = self.optimizer.bc_module.update()
for stat, val in update_stats.items():
self._stats_reporter.add_stat(stat, val)
# Truncate update buffer if necessary. Truncate more than we need to, to avoid truncating
# a large buffer at each update.
if self.update_buffer.num_experiences > self.trainer_parameters["buffer_size"]:

return has_updated
for stat, stat_list in batch_update_stats.items():
self._stats_reporter.add_stat(stat, np.mean(stat_list))
if self.optimizer.bc_module:
update_stats = self.optimizer.bc_module.update()
for stat, val in update_stats.items():
self._stats_reporter.add_stat(stat, val)
def update_reward_signals(self) -> None:
def _update_reward_signals(self) -> None:
"""
Iterate through the reward signals and update them. Unlike in PPO,
do it separate from the policy so that it can be done at a different

and policy are updated in parallel.
"""
buffer = self.update_buffer
num_updates = self.reward_signal_updates_per_train
for _ in range(num_updates):
while (
self.step / self.reward_signal_update_steps
> self.reward_signal_steps_per_update
):
# Get minibatches for reward signal update if needed
reward_signal_minibatches = {}
for name, signal in self.optimizer.reward_signals.items():

)
for stat_name, value in update_stats.items():
batch_update_stats[stat_name].append(value)
for stat, stat_list in batch_update_stats.items():
self._stats_reporter.add_stat(stat, np.mean(stat_list))
self.reward_signal_update_steps += 1
for stat, stat_list in batch_update_stats.items():
self._stats_reporter.add_stat(stat, np.mean(stat_list))
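
Taken together, the two counters above drive a pair of ratio-based loops. The sketch below condenses that control flow; the `trainer` attributes mirror the names in this file, while the two callables are hypothetical stand-ins for the sampling-and-update code:

def run_due_updates(trainer, do_policy_update, do_reward_signal_update) -> bool:
    """Run mini-batch updates until the step/update ratios drop back to their targets."""
    policy_was_updated = False
    while trainer.step / trainer.update_steps > trainer.steps_per_update:
        do_policy_update()                    # one batch_size sample + SAC update
        trainer.update_steps += 1
        policy_was_updated = True
    while (
        trainer.step / trainer.reward_signal_update_steps
        > trainer.reward_signal_steps_per_update
    ):
        do_reward_signal_update()             # one mini-batch reward-signal update
        trainer.reward_signal_update_steps += 1
    return policy_was_updated
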
def add_policy(
self, parsed_behavior_id: BehaviorIdentifiers, policy: TFPolicy

ml-agents/mlagents/trainers/tests/test_reward_signals.py (3)


max_steps: 5.0e4
memory_size: 256
normalize: false
num_update: 1
train_interval: 1
steps_per_update: 1
num_layers: 2
time_horizon: 64
sequence_length: 64

ml-agents/mlagents/trainers/tests/test_rl_trainer.py (23)


import yaml
from unittest import mock
import pytest
import mlagents.trainers.tests.mock_brain as mb
from mlagents.trainers.trainer.rl_trainer import RLTrainer
from mlagents.trainers.tests.test_buffer import construct_fake_buffer

# Add concrete implementations of abstract methods
class FakeTrainer(RLTrainer):
def set_is_policy_updating(self, is_updating):
self.update_policy = is_updating
def get_policy(self, name_behavior_id):
return mock.Mock()

def _update_policy(self):
pass
return self.update_policy
def add_policy(self):
pass

def create_rl_trainer():
mock_brainparams = create_mock_brain()
trainer = FakeTrainer(mock_brainparams, dummy_config(), True, 0)
trainer.set_is_policy_updating(True)
return trainer

def test_advance(mocked_clear_update_buffer):
trainer = create_rl_trainer()
trajectory_queue = AgentManagerQueue("testbrain")
policy_queue = AgentManagerQueue("testbrain")
time_horizon = 15
trainer.publish_policy_queue(policy_queue)
time_horizon = 10
trajectory = mb.make_fake_trajectory(
length=time_horizon,
max_step_complete=True,

trajectory_queue.put(trajectory)
trainer.advance()
policy_queue.get_nowait()
for _ in range(0, 5):
trajectory_queue.put(trajectory)
trainer.advance()
# Check that there is stuff in the policy queue
policy_queue.get_nowait()
# Check that if the policy doesn't update, we don't push it to the queue
trainer.set_is_policy_updating(False)
# Check that there is nothing in the policy queue
with pytest.raises(AgentManagerQueue.Empty):
policy_queue.get_nowait()
# Check that the buffer has been cleared
assert not trainer.should_still_train

ml-agents/mlagents/trainers/tests/test_sac.py (32)


return yaml.safe_load(
"""
trainer: sac
batch_size: 32
batch_size: 8
buffer_size: 10240
buffer_init_steps: 0
hidden_units: 32

memory_size: 10
normalize: true
num_update: 1
train_interval: 1
steps_per_update: 1
num_layers: 1
time_horizon: 64
sequence_length: 16

trainer.add_policy(brain_params, policy)
def test_process_trajectory(dummy_config):
def test_advance(dummy_config):
dummy_config["steps_per_update"] = 20
policy_queue = AgentManagerQueue("testbrain")
trainer.publish_policy_queue(policy_queue)
trajectory = make_fake_trajectory(
length=15,

action_space=[2],
is_discrete=False,
)
trajectory_queue.put(trajectory)
trainer.advance()

# Add a terminal trajectory
trajectory = make_fake_trajectory(
length=15,
length=6,
is_discrete=False,
)
trajectory_queue.put(trajectory)
trainer.advance()

assert (
trainer.stats_reporter.get_stats_summaries("Policy/Extrinsic Reward").mean > 0
)
# Make sure there is a policy on the queue
policy_queue.get_nowait()
# Add another trajectory. The added steps are fewer than the ~20 more needed for
# another update at steps_per_update=20, so there should NOT be a policy on the queue.
trajectory = make_fake_trajectory(
length=5,
max_step_complete=False,
vec_obs_size=6,
num_vis_obs=0,
action_space=[2],
is_discrete=False,
)
trajectory_queue.put(trajectory)
trainer.advance()
with pytest.raises(AgentManagerQueue.Empty):
policy_queue.get_nowait()
def test_bad_config(dummy_config):

ml-agents/mlagents/trainers/tests/test_simple_rl.py (10)


sequence_length: 64
summary_freq: 500
use_recurrent: false
threaded: false
reward_signals:
extrinsic:
strength: 1.0

max_steps: 1000
memory_size: 16
normalize: false
num_update: 1
train_interval: 1
steps_per_update: 1
num_layers: 1
time_horizon: 64
sequence_length: 32

curiosity_enc_size: 128
demo_path: None
vis_encode_type: simple
threaded: false
reward_signals:
extrinsic:
strength: 1.0

StatsReporter.writers.clear() # Clear StatsReporters so we don't write to file
debug_writer = DebugWriter()
StatsReporter.add_writer(debug_writer)
# Make sure threading is turned off for determinism
trainer_config["threading"] = False
if env_manager is None:
env_manager = SimpleEnvManager(env, FloatPropertiesChannel())
trainer_factory = TrainerFactory(

override_vals = {
"batch_size": 64,
"use_recurrent": True,
"max_steps": 3000,
"max_steps": 5000,
"steps_per_update": 2,
}
config = generate_config(SAC_CONFIG, override_vals)
_check_environment_trains(env, config)

ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py (12)


from mlagents_envs.exception import UnityEnvironmentException
from mlagents.trainers.tests.simple_test_envs import SimpleEnvironment
from mlagents.trainers.stats import StatsReporter
from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.tests.test_simple_rl import (
_check_environment_trains,
PPO_CONFIG,

)
external_brains_mock.return_value = [brain_name]
agent_manager_mock = mock.Mock()
mock_policy = mock.Mock()
agent_manager_mock.policy_queue.get_nowait.side_effect = [
mock_policy,
mock_policy,
AgentManagerQueue.Empty(),
]
env_manager.set_agent_manager(brain_name, agent_manager_mock)
step_info_dict = {brain_name: (Mock(), Mock())}

)
# Test policy queue
mock_policy = mock.Mock()
agent_manager_mock.policy_queue.get_nowait.return_value = mock_policy
env_manager.advance()
assert env_manager.policies[brain_name] == mock_policy
assert agent_manager_mock.policy == mock_policy

env_manager = SubprocessEnvManager(
simple_env_factory, EngineConfig.default_config(), num_envs
)
trainer_config = generate_config(PPO_CONFIG)
trainer_config = generate_config(PPO_CONFIG, override_vals={"max_steps": 5000})
# Run PPO using env_manager
_check_environment_trains(
simple_env_factory(0, []),

ml-agents/mlagents/trainers/tests/test_trainer_controller.py (3)


env_mock.reset.assert_not_called()
env_mock.advance.assert_called_once()
trainer_mock.advance.assert_called_once()
# May have been called many times due to thread
assert trainer_mock.advance.call_count > 0

ml-agents/mlagents/trainers/trainer/rl_trainer.py (14)


return False
@abc.abstractmethod
def _update_policy(self):
def _update_policy(self) -> bool:
:return: Whether or not the policy was updated.
"""
pass

def advance(self) -> None:
"""
Steps the trainer, taking in trajectories and updates if ready.
Will block and wait briefly if there are no trajectories.
"""
with hierarchical_timer("process_trajectory"):
for traj_queue in self.trajectory_queues:

for _ in range(traj_queue.maxlen):
for _ in range(traj_queue.qsize()):
try:
t = traj_queue.get_nowait()
self._process_trajectory(t)

if self._is_ready_update():
with hierarchical_timer("_update_policy"):
self._update_policy()
for q in self.policy_queues:
# Get policies that correspond to the policy queue in question
q.put(self.get_policy(q.behavior_id))
if self._update_policy():
for q in self.policy_queues:
# Get policies that correspond to the policy queue in question
q.put(self.get_policy(q.behavior_id))
else:
self._clear_update_buffer()
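
Condensed, the new advance() flow looks roughly like this. It is a sketch assembled from the fragments in this change, not the verbatim method; timers and the brief blocking wait mentioned in the docstring are omitted, and `should_still_train` comes from the base Trainer class shown below:

from mlagents.trainers.agent_processor import AgentManagerQueue

def advance(self) -> None:
    # Drain whatever trajectories are queued right now and feed them to the trainer.
    for traj_queue in self.trajectory_queues:
        for _ in range(traj_queue.qsize()):
            try:
                self._process_trajectory(traj_queue.get_nowait())
            except AgentManagerQueue.Empty:
                break
    if self.should_still_train:
        if self._is_ready_update():
            # Only push a policy to the queues when an update actually happened.
            if self._update_policy():
                for q in self.policy_queues:
                    q.put(self.get_policy(q.behavior_id))
    else:
        # No longer training: drop buffered experience instead of updating.
        self._clear_update_buffer()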

ml-agents/mlagents/trainers/trainer/trainer.py (10)


self.run_id = run_id
self.trainer_parameters = trainer_parameters
self.summary_path = trainer_parameters["summary_path"]
self._threaded = trainer_parameters.get("threaded", True)
self._stats_reporter = StatsReporter(self.summary_path)
self.is_training = training
self._reward_buffer: Deque[float] = deque(maxlen=reward_buff_cap)

:return: the step count of the trainer
"""
return self.step
@property
def threaded(self) -> bool:
"""
Whether or not to run the trainer in a thread. True allows the trainer to
update the policy while the environment is taking steps. Set to False to
enforce strict on-policy updates (i.e. don't update the policy when taking steps.)
"""
return self._threaded
@property
def should_still_train(self) -> bool:

ml-agents/mlagents/trainers/trainer_controller.py (31)


import os
import sys
from typing import Dict, Optional, Set
import threading
from typing import Dict, Optional, Set, List
from collections import defaultdict
import numpy as np

:param training_seed: Seed to use for Numpy and Tensorflow random number generation.
:param sampler_manager: SamplerManager object handles samplers for resampling the reset parameters.
:param resampling_interval: Specifies number of simulation steps after which reset parameters are resampled.
:param threaded: Whether or not to run trainers in a separate thread. Disable for testing/debugging.
"""
self.trainers: Dict[str, Trainer] = {}
self.brain_name_to_identifier: Dict[str, Set] = defaultdict(set)

self.meta_curriculum = meta_curriculum
self.sampler_manager = sampler_manager
self.resampling_interval = resampling_interval
self.trainer_threads: List[threading.Thread] = []
self.kill_trainers = False
np.random.seed(training_seed)
tf.set_random_seed(training_seed)

trainer.publish_policy_queue(agent_manager.policy_queue)
trainer.subscribe_trajectory_queue(agent_manager.trajectory_queue)
if trainer.threaded:
# Start trainer thread
trainerthread = threading.Thread(
target=self.trainer_update_func, args=(trainer,), daemon=True
)
trainerthread.start()
self.trainer_threads.append(trainerthread)
def _create_trainers_and_managers(
self, env_manager: EnvManager, behavior_ids: Set[str]

self.reset_env_if_ready(env_manager, global_step)
if self._should_save_model(global_step):
self._save_model()
# Stop advancing trainers
self.kill_trainers = True
# Final save Tensorflow model
if global_step != 0 and self.train_model:
self._save_model()

UnityEnvironmentException,
) as ex:
self.kill_trainers = True
if self.train_model:
self._save_model_when_interrupted()

"Environment/Lesson", curr.lesson_num
)
# Advance trainers. This can be done in a separate loop in the future.
with hierarchical_timer("trainer_advance"):
for trainer in self.trainers.values():
trainer.advance()
for trainer in self.trainers.values():
if not trainer.threaded:
with hierarchical_timer("trainer_advance"):
trainer.advance()
def trainer_update_func(self, trainer: Trainer) -> None:
while not self.kill_trainers:
with hierarchical_timer("trainer_advance"):
trainer.advance()
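
A compact sketch of the threading pattern introduced here (simplified; `MiniController` is a made-up name and only models the pieces shown in this file):

import threading

class MiniController:
    def __init__(self):
        self.kill_trainers = False       # set to True to stop all trainer threads
        self.trainer_threads: list = []

    def start_trainer_thread(self, trainer) -> None:
        # Threaded trainers run advance() continuously in a daemon thread.
        t = threading.Thread(target=self.trainer_update_func, args=(trainer,), daemon=True)
        t.start()
        self.trainer_threads.append(t)

    def trainer_update_func(self, trainer) -> None:
        while not self.kill_trainers:
            trainer.advance()

    def advance_non_threaded(self, trainers) -> None:
        # Trainers with threaded=False are stepped in the main environment loop instead.
        for trainer in trainers:
            if not trainer.threaded:
                trainer.advance()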