
Merge latest develop

/hotfix-v0.9.2a
Ervin Teng, 5 years ago
Current commit
072d2ef8
Showing 36 changed files with 2077 additions and 1345 deletions
  1. ml-agents-envs/mlagents/envs/subprocess_env_manager.py (1)
  2. ml-agents-envs/mlagents/envs/tests/test_timers.py (7)
  3. ml-agents-envs/mlagents/envs/timers.py (63)
  4. ml-agents/mlagents/trainers/bc/models.py (2)
  5. ml-agents/mlagents/trainers/bc/offline_trainer.py (11)
  6. ml-agents/mlagents/trainers/bc/online_trainer.py (11)
  7. ml-agents/mlagents/trainers/bc/trainer.py (31)
  8. ml-agents/mlagents/trainers/buffer.py (171)
  9. ml-agents/mlagents/trainers/components/bc/module.py (38)
  10. ml-agents/mlagents/trainers/components/reward_signals/curiosity/signal.py (68)
  11. ml-agents/mlagents/trainers/components/reward_signals/extrinsic/signal.py (5)
  12. ml-agents/mlagents/trainers/components/reward_signals/gail/model.py (6)
  13. ml-agents/mlagents/trainers/components/reward_signals/gail/signal.py (62)
  14. ml-agents/mlagents/trainers/learn.py (55)
  15. ml-agents/mlagents/trainers/models.py (258)
  16. ml-agents/mlagents/trainers/ppo/models.py (241)
  17. ml-agents/mlagents/trainers/ppo/policy.py (144)
  18. ml-agents/mlagents/trainers/ppo/trainer.py (334)
  19. ml-agents/mlagents/trainers/tests/mock_brain.py (14)
  20. ml-agents/mlagents/trainers/tests/test_buffer.py (71)
  21. ml-agents/mlagents/trainers/tests/test_learn.py (9)
  22. ml-agents/mlagents/trainers/tests/test_ppo.py (2)
  23. ml-agents/mlagents/trainers/tests/test_reward_signals.py (15)
  24. ml-agents/mlagents/trainers/tests/test_trainer_controller.py (289)
  25. ml-agents/mlagents/trainers/tf_policy.py (5)
  26. ml-agents/mlagents/trainers/trainer.py (173)
  27. ml-agents/mlagents/trainers/trainer_controller.py (113)
  28. ml-agents/setup.py (1)
  29. ml-agents/mlagents/trainers/ppo/multi_gpu_policy.py (140)
  30. ml-agents/mlagents/trainers/rl_trainer.py (253)
  31. ml-agents/mlagents/trainers/tests/test_multigpu.py (129)
  32. ml-agents/mlagents/trainers/tests/test_rl_trainer.py (81)
  33. ml-agents/mlagents/trainers/tests/test_simple_rl.py (207)
  34. ml-agents/mlagents/trainers/tests/test_trainer_util.py (315)
  35. ml-agents/mlagents/trainers/trainer_util.py (97)

ml-agents-envs/mlagents/envs/subprocess_env_manager.py (1)


# So after we send back the root timer, we can safely clear them.
# Note that we could randomly return timers a fraction of the time if we wanted to reduce
# the data transferred.
# TODO get gauges from the workers and merge them in the main process too.
step_response = StepResponse(all_brain_info, get_timer_root())
step_queue.put(EnvironmentResponse("step", worker_id, step_response))
reset_timers()

ml-agents-envs/mlagents/envs/tests/test_timers.py (7)


@timers.timed
def decorated_func(x: int = 0, y: float = 1.0) -> str:
timers.set_gauge("my_gauge", x + y)
return f"{x} + {y} = {x + y}"

with timers.hierarchical_timer("top_level"):
for i in range(3):
with timers.hierarchical_timer("multiple"):
decorated_func()
decorated_func(i, i)
raised = False
try:

],
}
],
"gauges": [
{"name": "my_gauge", "value": 4.0, "max": 4.0, "min": 0.0, "count": 3}
],
assert timer_tree == expected_tree

ml-agents-envs/mlagents/envs/timers.py (63)


# # Unity ML-Agents Toolkit
import math
from typing import Any, Callable, Dict, Generator, TypeVar
from typing import Any, Callable, Dict, Generator, List, TypeVar
"""
Lightweight, hierarchical timers for profiling sections of code.

child.merge(other_child_node, is_parallel=is_parallel)
class GaugeNode:
"""
Tracks the most recent value of a metric. This is analogous to gauges in statsd.
"""
__slots__ = ["value", "min_value", "max_value", "count"]
def __init__(self, value: float):
self.value = value
self.min_value = value
self.max_value = value
self.count = 1
def update(self, new_value: float):
self.min_value = min(self.min_value, new_value)
self.max_value = max(self.max_value, new_value)
self.value = new_value
self.count += 1
def as_dict(self) -> Dict[str, float]:
return {
"value": self.value,
"min": self.min_value,
"max": self.max_value,
"count": self.count,
}
class TimerStack:
"""
Tracks all the time spent. Users shouldn't use this directly, they should use the contextmanager below to make

__slots__ = ["root", "stack", "start_time"]
__slots__ = ["root", "stack", "start_time", "gauges"]
self.gauges: Dict[str, GaugeNode] = {}
self.gauges: Dict[str, GaugeNode] = {}
def push(self, name: str) -> TimerNode:
"""

node = self.get_root()
res["name"] = "root"
# Only output gauges at top level
if self.gauges:
res["gauges"] = self._get_gauges()
res["total"] = node.total
res["count"] = node.count

return res
def set_gauge(self, name: str, value: float) -> None:
if math.isnan(value):
return
gauge_node = self.gauges.get(name)
if gauge_node:
gauge_node.update(value)
else:
self.gauges[name] = GaugeNode(value)
def _get_gauges(self) -> List[Dict[str, Any]]:
gauges = []
for gauge_name, gauge_node in self.gauges.items():
gauge_dict: Dict[str, Any] = {"name": gauge_name, **gauge_node.as_dict()}
gauges.append(gauge_dict)
return gauges
# Global instance of a TimerStack. This is generally all that we need for profiling, but you can potentially
# create multiple instances and pass them to the contextmanager

return func(*args, **kwargs)
return wrapped # type: ignore
def set_gauge(name: str, value: float, timer_stack: TimerStack = None) -> None:
"""
Updates the value of the gauge (or creates it if it hasn't been set before).
"""
timer_stack = timer_stack or _global_timer_stack
timer_stack.set_gauge(name, value)
def get_timer_tree(timer_stack: TimerStack = None) -> Dict[str, Any]:
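The timers.py hunk above adds GaugeNode and a module-level set_gauge() helper alongside the existing hierarchical timers. A minimal usage sketch, assuming the mlagents.envs.timers module from this commit is importable (the gauge name and values are illustrative):

```python
# Minimal sketch of the gauge API shown in the hunk above.
from mlagents.envs import timers

with timers.hierarchical_timer("episode"):
    for step in range(5):
        # set_gauge stores the latest value and tracks min/max/count per name.
        timers.set_gauge("reward", float(step))

tree = timers.get_timer_tree()
# Per the hunk above, the root of the returned tree carries a "gauges" list, e.g.
# [{"name": "reward", "value": 4.0, "min": 0.0, "max": 4.0, "count": 5}]
print(tree.get("gauges"))
```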

ml-agents/mlagents/trainers/bc/models.py (2)


self.action_masks = tf.placeholder(
shape=[None, sum(self.act_size)], dtype=tf.float32, name="action_masks"
)
self.sample_action_float, normalized_logits = self.create_discrete_action_masking_layer(
self.sample_action_float, _, normalized_logits = self.create_discrete_action_masking_layer(
tf.concat(policy_branches, axis=1), self.action_masks, self.act_size
)
tf.identity(normalized_logits, name="action")

ml-agents/mlagents/trainers/bc/offline_trainer.py (11)


"The provided demonstration is not compatible with the "
"brain being used for performance evaluation."
)
def __str__(self):
return """Hyperparameters for the Imitation Trainer of brain {0}: \n{1}""".format(
self.brain_name,
"\n".join(
[
"\t{0}:\t{1}".format(x, self.trainer_parameters[x])
for x in self.param_keys
]
),
)

ml-agents/mlagents/trainers/bc/online_trainer.py (11)


int(trainer_parameters["batch_size"] / self.policy.sequence_length), 1
)
def __str__(self):
return """Hyperparameters for the Imitation Trainer of brain {0}: \n{1}""".format(
self.brain_name,
"\n".join(
[
"\t{0}:\t{1}".format(x, self.trainer_parameters[x])
for x in self.param_keys
]
),
)
def add_experiences(
self,
curr_info: AllBrainInfo,

ml-agents/mlagents/trainers/bc/trainer.py (31)


self.demonstration_buffer = Buffer()
self.evaluation_buffer = Buffer()
@property
def parameters(self):
"""
Returns the trainer parameters of the trainer.
"""
return self.trainer_parameters
@property
def get_max_steps(self):
"""
Returns the maximum number of steps. Is used to know when the trainer should be stopped.
:return: The maximum number of steps of the trainer
"""
return float(self.trainer_parameters["max_steps"])
@property
def get_step(self):
"""
Returns the number of steps the trainer has performed
:return: the step count of the trainer
"""
return self.policy.get_current_step()
def increment_step(self, n_steps: int) -> None:
"""
Increment the step count of the trainer
:param n_steps: number of steps to increment the step count by
"""
self.step = self.policy.increment_step(n_steps)
def add_experiences(
self,
curr_info: AllBrainInfo,

ml-agents/mlagents/trainers/buffer.py (171)


import random
from collections import defaultdict
import h5py
from mlagents.envs.exception import UnityException

sequential=True gives [[0,a],[b,c],[d,e]]. If sequential=False gives
[[a,b],[b,c],[c,d],[d,e]]
"""
if training_length == 1:
# When the training length is 1, the method returns a list of elements,
# not a list of sequences of elements.
if sequential:
# The sequences will not have overlapping elements (this involves padding)
leftover = len(self) % training_length
# leftover is the number of elements in the first sequence (this sequence might need 0 padding)
# If batch_size is None : All the elements of the AgentBufferField are returned.
return np.array(self)
# retrieve the maximum number of elements
batch_size = len(self) // training_length + 1 * (leftover != 0)
# The maximum number of sequences taken from a list of length len(self) without overlapping
# with padding is equal to batch_size
if batch_size > (
len(self) // training_length + 1 * (leftover != 0)
):
raise BufferException(
"The batch size and training length requested for get_batch where"
" too large given the current number of data points."
)
if batch_size * training_length > len(self):
padding = np.array(self[-1]) * self.padding_value
return np.array(
[padding] * (training_length - leftover) + self[:]
)
# return the batch_size last elements
if batch_size > len(self):
raise BufferException("Batch size requested is too large")
return np.array(self[-batch_size:])
return np.array(
self[len(self) - batch_size * training_length :]
)
# The training_length is not None, the method returns a list of SEQUENCES of elements
if not sequential:
# The sequences will have overlapping elements
if batch_size is None:
# retrieve the maximum number of elements
batch_size = len(self) - training_length + 1
# The number of sequences of length training_length taken from a list of len(self) elements
# with overlapping is equal to batch_size
if (len(self) - training_length + 1) < batch_size:
raise BufferException(
"The batch size and training length requested for get_batch where"
" too large given the current number of data points."
)
tmp_list = []
for end in range(len(self) - batch_size + 1, len(self) + 1):
tmp_list += [np.array(self[end - training_length : end])]
return np.array(tmp_list)
if sequential:
# The sequences will not have overlapping elements (this involves padding)
leftover = len(self) % training_length
# leftover is the number of elements in the first sequence (this sequence might need 0 padding)
if batch_size is None:
# retrieve the maximum number of elements
batch_size = len(self) // training_length + 1 * (
leftover != 0
)
# The maximum number of sequences taken from a list of length len(self) without overlapping
# with padding is equal to batch_size
if batch_size > (
len(self) // training_length + 1 * (leftover != 0)
):
raise BufferException(
"The batch size and training length requested for get_batch where"
" too large given the current number of data points."
)
tmp_list = []
padding = np.array(self[-1]) * self.padding_value
# The padding is made with zeros and its shape is given by the shape of the last element
for end in range(
len(self), len(self) % training_length, -training_length
)[:batch_size]:
tmp_list += [np.array(self[end - training_length : end])]
if (leftover != 0) and (len(tmp_list) < batch_size):
tmp_list += [
np.array(
[padding] * (training_length - leftover)
+ self[:leftover]
)
]
tmp_list.reverse()
return np.array(tmp_list)
# The sequences will have overlapping elements
if batch_size is None:
# retrieve the maximum number of elements
batch_size = len(self) - training_length + 1
# The number of sequences of length training_length taken from a list of len(self) elements
# with overlapping is equal to batch_size
if (len(self) - training_length + 1) < batch_size:
raise BufferException(
"The batch size and training length requested for get_batch where"
" too large given the current number of data points."
)
tmp_list = []
for end in range(len(self) - batch_size + 1, len(self) + 1):
tmp_list += self[end - training_length : end]
return np.array(tmp_list)
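The docstring above describes two batching modes for sequences of experiences. A standalone numpy sketch of that arithmetic (an illustration only, not the AgentBufferField implementation):

```python
import numpy as np

def to_sequences(data, training_length, sequential, padding_value=0):
    """Illustrates the two get_batch modes described in the docstring above."""
    data = list(data)
    if sequential:
        # Non-overlapping sequences; the first one is left-padded if needed.
        leftover = len(data) % training_length
        if leftover:
            data = [padding_value] * (training_length - leftover) + data
        return np.array(data).reshape(-1, training_length)
    # Overlapping sequences: one per possible end position.
    return np.array(
        [data[end - training_length : end] for end in range(training_length, len(data) + 1)]
    )

print(to_sequences([1, 2, 3, 4, 5], 2, sequential=True))   # [[0 1] [2 3] [4 5]]
print(to_sequences([1, 2, 3, 4, 5], 2, sequential=False))  # [[1 2] [2 3] [3 4] [4 5]]
```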
def reset_field(self):
"""

length = len(self[key])
return True
def shuffle(self, key_list=None):
def shuffle(self, sequence_length, key_list=None):
Shuffles the fields in key_list in a consistent way: The reordering will
be the same across fields.
:param key_list: The fields that must be shuffled.

raise BufferException(
"Unable to shuffle if the fields are not of same length"
)
s = np.arange(len(self[key_list[0]]))
s = np.arange(len(self[key_list[0]]) // sequence_length)
self[key][:] = [self[key][i] for i in s]
tmp = []
for i in s:
tmp += self[key][i * sequence_length : (i + 1) * sequence_length]
self[key][:] = tmp
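The shuffle method now takes a sequence_length so that reordering happens at the granularity of whole sequences, keeping recurrent sequences contiguous. A standalone sketch of that idea (not the AgentBuffer method itself):

```python
import numpy as np

def shuffle_by_sequence(field, sequence_length):
    """Shuffle a flat field in blocks of sequence_length, keeping each block intact."""
    num_sequences = len(field) // sequence_length
    order = np.random.permutation(num_sequences)
    shuffled = []
    for seq_idx in order:
        shuffled += field[seq_idx * sequence_length : (seq_idx + 1) * sequence_length]
    return shuffled

print(shuffle_by_sequence(list(range(8)), sequence_length=2))
# e.g. [4, 5, 0, 1, 6, 7, 2, 3] -- consecutive pairs stay together
```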
def make_mini_batch(self, start, end):
"""

"""
mini_batch = {}
for key in self:
mini_batch[key] = np.array(self[key][start:end])
mini_batch[key] = self[key][start:end]
return mini_batch
def sample_mini_batch(self, batch_size, sequence_length=1):
"""
Creates a mini-batch from a random start and end.
:param batch_size: number of elements to withdraw.
:param sequence_length: Length of sequences to sample.
Number of sequences to sample will be batch_size/sequence_length.
"""
num_seq_to_sample = batch_size // sequence_length
mini_batch = Buffer.AgentBuffer()
buff_len = len(next(iter(self.values())))
num_sequences_in_buffer = buff_len // sequence_length
start_idxes = [
random.randint(0, num_sequences_in_buffer - 1) * sequence_length
for _ in range(num_seq_to_sample)
] # Sample random sequence starts
for i in start_idxes:
for key in self:
mini_batch[key].extend(self[key][i : i + sequence_length])
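A sketch of the sequence-aligned sampling that sample_mini_batch performs above; the buffer length and batch size below are made-up numbers for illustration:

```python
import random

def sample_sequence_starts(buffer_length, batch_size, sequence_length=1):
    """Pick random sequence-aligned start indices, as sample_mini_batch does above."""
    num_seq_to_sample = batch_size // sequence_length
    num_sequences_in_buffer = buffer_length // sequence_length
    return [
        random.randint(0, num_sequences_in_buffer - 1) * sequence_length
        for _ in range(num_seq_to_sample)
    ]

starts = sample_sequence_starts(buffer_length=100, batch_size=32, sequence_length=4)
print(starts)  # 8 start indices, each a multiple of 4, possibly repeated
```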
def save_to_file(self, file_object):
"""
Saves the AgentBuffer to a file-like object.
"""
with h5py.File(file_object) as write_file:
for key, data in self.items():
write_file.create_dataset(
key, data=data, dtype="f", compression="gzip"
)
def load_from_file(self, file_object):
"""
Loads the AgentBuffer from a file-like object.
"""
with h5py.File(file_object) as read_file:
for key in list(read_file.keys()):
self[key] = Buffer.AgentBuffer.AgentBufferField()
# extend() will convert the numpy array's first dimension into list
self[key].extend(read_file[key][()])
def __init__(self):
self.update_buffer = self.AgentBuffer()
super(Buffer, self).__init__()

Resets the update buffer
"""
self.update_buffer.reset_agent()
def truncate_update_buffer(self, max_length, sequence_length=1):
"""
Truncates the update buffer to a certain length.
This can be slow for large buffers. We compensate by cutting further than we need to, so that
we're not truncating at each update. Note that we must truncate an integer number of sequence_lengths
param: max_length: The length at which to truncate the buffer.
"""
current_length = len(next(iter(self.update_buffer.values())))
# make max_length an integer number of sequence_lengths
max_length -= max_length % sequence_length
if current_length > max_length:
for _key in self.update_buffer.keys():
self.update_buffer[_key] = self.update_buffer[_key][
current_length - max_length :
]
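A sketch of the truncation rule described above: max_length is rounded down to a whole number of sequences and only the newest entries are kept (illustration only, not the Buffer method):

```python
def truncate(field, max_length, sequence_length=1):
    """Keep at most max_length entries, rounded down to whole sequences."""
    # Make max_length an integer number of sequence_lengths.
    max_length -= max_length % sequence_length
    if len(field) > max_length:
        field = field[len(field) - max_length:]
    return field

print(truncate(list(range(10)), max_length=7, sequence_length=3))
# [4, 5, 6, 7, 8, 9] -- two full sequences of length 3
```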
def reset_local_buffers(self):
"""

ml-agents/mlagents/trainers/components/bc/module.py (38)


n_epoch = self.num_epoch
for _ in range(n_epoch):
self.demonstration_buffer.update_buffer.shuffle()
self.demonstration_buffer.update_buffer.shuffle(
sequence_length=self.policy.sequence_length
)
for i in range(num_batches):
for i in range(num_batches // self.policy.sequence_length):
start = i * self.n_sequences
end = (i + 1) * self.n_sequences
start = i * self.n_sequences * self.policy.sequence_length
end = (i + 1) * self.n_sequences * self.policy.sequence_length
mini_batch_demo = demo_update_buffer.make_mini_batch(start, end)
run_out = self._update_batch(mini_batch_demo, self.n_sequences)
loss = run_out["loss"]

self.policy.model.batch_size: n_sequences,
self.policy.model.sequence_length: self.policy.sequence_length,
}
feed_dict[self.model.action_in_expert] = mini_batch_demo["actions"]
feed_dict[self.model.action_in_expert] = mini_batch_demo["actions"].reshape(
[-1, self.policy.model.brain.vector_action_space_size[0]]
)
feed_dict[self.model.action_in_expert] = mini_batch_demo["actions"].reshape(
[-1, len(self.policy.model.brain.vector_action_space_size)]
)
feed_dict[self.policy.model.action_masks] = np.ones(
(
self.n_sequences,

if self.policy.model.brain.vector_observation_space_size > 0:
apparent_obs_size = (
self.policy.model.brain.vector_observation_space_size
* self.policy.model.brain.num_stacked_vector_observations
)
feed_dict[self.policy.model.vector_in] = mini_batch_demo[
"vector_obs"
].reshape([-1, apparent_obs_size])
feed_dict[self.policy.model.vector_in] = mini_batch_demo["vector_obs"]
visual_obs = mini_batch_demo["visual_obs%d" % i]
if self.policy.sequence_length > 1 and self.policy.use_recurrent:
(_batch, _seq, _w, _h, _c) = visual_obs.shape
feed_dict[self.policy.model.visual_in[i]] = visual_obs.reshape(
[-1, _w, _h, _c]
)
else:
feed_dict[self.policy.model.visual_in[i]] = visual_obs
feed_dict[self.policy.model.visual_in[i]] = mini_batch_demo[
"visual_obs%d" % i
]
if self.use_recurrent:
feed_dict[self.policy.model.memory_in] = np.zeros(
[self.n_sequences, self.policy.m_size]

"prev_action"
].reshape([-1, len(self.policy.model.act_size)])
]
network_out = self.policy.sess.run(
list(self.out_dict.values()), feed_dict=feed_dict

ml-agents/mlagents/trainers/components/reward_signals/curiosity/signal.py (68)


feed_dict[self.model.next_visual_in[i]] = next_info.visual_observations[i]
if self.policy.use_vec_obs:
feed_dict[self.model.next_vector_in] = next_info.vector_observations
if self.policy.use_recurrent:
if current_info.memories.shape[1] == 0:
current_info.memories = self.policy.make_empty_memory(
len(current_info.agents)
)
feed_dict[self.policy.model.memory_in] = current_info.memories
unscaled_reward = self.policy.sess.run(
self.model.intrinsic_reward, feed_dict=feed_dict
)

forward_total: List[float] = []
inverse_total: List[float] = []
for _ in range(self.num_epoch):
update_buffer.shuffle()
update_buffer.shuffle(sequence_length=self.policy.sequence_length)
buffer = update_buffer
for l in range(len(update_buffer["actions"]) // num_sequences):
start = l * num_sequences

feed_dict = {
self.policy.model.batch_size: num_sequences,
self.policy.model.sequence_length: self.policy.sequence_length,
self.policy.model.mask_input: mini_batch["masks"].flatten(),
self.policy.model.advantage: mini_batch["advantages"].reshape([-1, 1]),
self.policy.model.all_old_log_probs: mini_batch["action_probs"].reshape(
[-1, sum(self.policy.model.act_size)]
),
self.policy.model.mask_input: mini_batch["masks"],
self.policy.model.advantage: mini_batch["advantages"],
self.policy.model.all_old_log_probs: mini_batch["action_probs"],
feed_dict[self.policy.model.output_pre] = mini_batch["actions_pre"].reshape(
[-1, self.policy.model.act_size[0]]
)
feed_dict[self.policy.model.epsilon] = mini_batch[
"random_normal_epsilon"
].reshape([-1, self.policy.model.act_size[0]])
feed_dict[self.policy.model.output_pre] = mini_batch["actions_pre"]
feed_dict[self.policy.model.action_holder] = mini_batch["actions"].reshape(
[-1, len(self.policy.model.act_size)]
)
if self.policy.use_recurrent:
feed_dict[self.policy.model.prev_action] = mini_batch[
"prev_action"
].reshape([-1, len(self.policy.model.act_size)])
feed_dict[self.policy.model.action_masks] = mini_batch[
"action_mask"
].reshape([-1, sum(self.policy.brain.vector_action_space_size)])
feed_dict[self.policy.model.action_holder] = mini_batch["actions"]
feed_dict[self.policy.model.vector_in] = mini_batch["vector_obs"].reshape(
[-1, self.policy.vec_obs_size]
)
feed_dict[self.model.next_vector_in] = mini_batch["next_vector_in"].reshape(
[-1, self.policy.vec_obs_size]
)
feed_dict[self.policy.model.vector_in] = mini_batch["vector_obs"]
feed_dict[self.model.next_vector_in] = mini_batch["next_vector_in"]
_obs = mini_batch["visual_obs%d" % i]
if self.policy.sequence_length > 1 and self.policy.use_recurrent:
(_batch, _seq, _w, _h, _c) = _obs.shape
feed_dict[self.policy.model.visual_in[i]] = _obs.reshape(
[-1, _w, _h, _c]
)
else:
feed_dict[self.policy.model.visual_in[i]] = _obs
feed_dict[self.policy.model.visual_in[i]] = mini_batch[
"visual_obs%d" % i
]
_obs = mini_batch["next_visual_obs%d" % i]
if self.policy.sequence_length > 1 and self.policy.use_recurrent:
(_batch, _seq, _w, _h, _c) = _obs.shape
feed_dict[self.model.next_visual_in[i]] = _obs.reshape(
[-1, _w, _h, _c]
)
else:
feed_dict[self.model.next_visual_in[i]] = _obs
if self.policy.use_recurrent:
mem_in = mini_batch["memory"][:, 0, :]
feed_dict[self.policy.model.memory_in] = mem_in
feed_dict[self.model.next_visual_in[i]] = mini_batch[
"next_visual_obs%d" % i
]
self.has_updated = True
run_out = self.policy._execute_model(feed_dict, self.update_dict)
return run_out
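Several hunks above (here and in the BC and GAIL modules) reshape recurrent visual observations before feeding them to the model. A standalone numpy sketch of that reshape; the image dimensions are illustrative:

```python
import numpy as np

# When sequence_length > 1, visual observations arrive as
# [batch, seq, width, height, channels] and are flattened to
# [batch * seq, width, height, channels] before being fed to the visual encoder.
visual_obs = np.zeros((4, 8, 84, 84, 3))  # 4 sequences of 8 frames each
_batch, _seq, _w, _h, _c = visual_obs.shape
flat = visual_obs.reshape([-1, _w, _h, _c])
print(flat.shape)  # (32, 84, 84, 3)
```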

ml-agents/mlagents/trainers/components/reward_signals/extrinsic/signal.py (5)


param_keys = ["strength", "gamma"]
super().check_config(config_dict, param_keys)
def evaluate_batch(self, mini_batch: Dict[str, np.array]) -> RewardSignalResult:
env_rews = mini_batch["environment_rewards"]
return RewardSignalResult(self.strength * env_rews, env_rews)
def evaluate(
self, current_info: BrainInfo, next_info: BrainInfo
) -> RewardSignalResult:

ml-agents/mlagents/trainers/components/reward_signals/gail/model.py (6)


"""
Creates the input layers for the discriminator
"""
self.done_expert = tf.placeholder(shape=[None, 1], dtype=tf.float32)
self.done_policy = tf.placeholder(shape=[None, 1], dtype=tf.float32)
self.done_expert_holder = tf.placeholder(shape=[None], dtype=tf.float32)
self.done_policy_holder = tf.placeholder(shape=[None], dtype=tf.float32)
self.done_expert = tf.expand_dims(self.done_expert_holder, -1)
self.done_policy = tf.expand_dims(self.done_policy_holder, -1)
if self.policy_model.brain.vector_action_space_type == "continuous":
action_length = self.policy_model.act_size[0]

ml-agents/mlagents/trainers/components/reward_signals/gail/signal.py (62)


feed_dict[
self.policy.model.action_holder
] = next_info.previous_vector_actions
if self.policy.use_recurrent:
if current_info.memories.shape[1] == 0:
current_info.memories = self.policy.make_empty_memory(
len(current_info.agents)
)
feed_dict[self.policy.model.memory_in] = current_info.memories
unscaled_reward = self.policy.sess.run(
self.model.intrinsic_reward, feed_dict=feed_dict
)

n_epoch = self.num_epoch
for _epoch in range(n_epoch):
self.demonstration_buffer.update_buffer.shuffle()
update_buffer.shuffle()
self.demonstration_buffer.update_buffer.shuffle(
sequence_length=self.policy.sequence_length
)
update_buffer.shuffle(sequence_length=self.policy.sequence_length)
if max_batches == 0:
num_batches = possible_batches
else:

:return: Output from update process.
"""
feed_dict: Dict[tf.Tensor, Any] = {
self.model.done_expert: mini_batch_demo["done"].reshape([-1, 1]),
self.model.done_policy: mini_batch_policy["done"].reshape([-1, 1]),
self.model.done_expert_holder: mini_batch_demo["done"],
self.model.done_policy_holder: mini_batch_policy["done"],
feed_dict[self.model.action_in_expert] = np.array(mini_batch_demo["actions"])
feed_dict[self.policy.model.selected_actions] = mini_batch_policy[
"actions"
].reshape([-1, self.policy.model.act_size[0]])
feed_dict[self.model.action_in_expert] = mini_batch_demo["actions"].reshape(
[-1, self.policy.model.act_size[0]]
)
feed_dict[self.policy.model.selected_actions] = mini_batch_policy["actions"]
feed_dict[self.policy.model.action_holder] = mini_batch_policy[
"actions"
].reshape([-1, len(self.policy.model.act_size)])
feed_dict[self.model.action_in_expert] = mini_batch_demo["actions"].reshape(
[-1, len(self.policy.model.act_size)]
)
feed_dict[self.policy.model.action_holder] = mini_batch_policy["actions"]
policy_obs = mini_batch_policy["visual_obs%d" % i]
if self.policy.sequence_length > 1 and self.policy.use_recurrent:
(_batch, _seq, _w, _h, _c) = policy_obs.shape
feed_dict[self.policy.model.visual_in[i]] = policy_obs.reshape(
[-1, _w, _h, _c]
)
else:
feed_dict[self.policy.model.visual_in[i]] = policy_obs
demo_obs = mini_batch_demo["visual_obs%d" % i]
if self.policy.sequence_length > 1 and self.policy.use_recurrent:
(_batch, _seq, _w, _h, _c) = demo_obs.shape
feed_dict[self.model.expert_visual_in[i]] = demo_obs.reshape(
[-1, _w, _h, _c]
)
else:
feed_dict[self.model.expert_visual_in[i]] = demo_obs
feed_dict[self.policy.model.visual_in[i]] = mini_batch_policy[
"visual_obs%d" % i
]
feed_dict[self.model.expert_visual_in[i]] = mini_batch_demo[
"visual_obs%d" % i
]
feed_dict[self.policy.model.vector_in] = mini_batch_policy[
"vector_obs"
].reshape([-1, self.policy.vec_obs_size])
feed_dict[self.model.obs_in_expert] = mini_batch_demo["vector_obs"].reshape(
[-1, self.policy.vec_obs_size]
)
feed_dict[self.policy.model.vector_in] = mini_batch_policy["vector_obs"]
feed_dict[self.model.obs_in_expert] = mini_batch_demo["vector_obs"]
out_dict = {
"gail_loss": self.model.loss,

ml-agents/mlagents/trainers/learn.py (55)


from mlagents.trainers.trainer_controller import TrainerController
from mlagents.trainers.exception import TrainerError
from mlagents.trainers import MetaCurriculumError, MetaCurriculum
from mlagents.trainers.trainer_util import initialize_trainers
from mlagents.envs import UnityEnvironment
from mlagents.envs.sampler_class import SamplerManager
from mlagents.envs.exception import UnityEnvironmentException, SamplerException

lesson = int(run_options["--lesson"])
fast_simulation = not bool(run_options["--slow"])
no_graphics = run_options["--no-graphics"]
multi_gpu = run_options["--multi-gpu"]
trainer_config_path = run_options["<trainer-config-path>"]
sampler_file_path = (
run_options["--sampler"] if run_options["--sampler"] != "None" else None

base_port + (sub_id * num_envs),
)
env = SubprocessEnvManager(env_factory, num_envs)
maybe_meta_curriculum = try_create_meta_curriculum(curriculum_folder, env)
maybe_meta_curriculum = try_create_meta_curriculum(curriculum_folder, env, lesson)
trainers = initialize_trainers(
trainer_config,
env.external_brains,
summaries_dir,
run_id,
model_path,
keep_checkpoints,
train_model,
load_model,
run_seed,
maybe_meta_curriculum,
multi_gpu,
)
trainers,
load_model,
keep_checkpoints,
lesson,
run_seed,
fast_simulation,
sampler_manager,

process_queue.put(True)
# Begin training
tc.start_learning(env, trainer_config)
tc.start_learning(env)
def create_sampler_manager(sampler_file_path, env_reset_params, run_seed=None):

sampler_config = load_config(sampler_file_path)
if ("resampling-interval") in sampler_config:
if "resampling-interval" in sampler_config:
# Filter arguments that do not exist in the environment
resample_interval = sampler_config.pop("resampling-interval")
if (resample_interval <= 0) or (not isinstance(resample_interval, int)):

def try_create_meta_curriculum(
curriculum_folder: Optional[str], env: SubprocessEnvManager
curriculum_folder: Optional[str], env: SubprocessEnvManager, lesson: int
if meta_curriculum:
for brain_name in meta_curriculum.brains_to_curriculums.keys():
if brain_name not in env.external_brains.keys():
raise MetaCurriculumError(
"One of the curricula "
"defined in " + curriculum_folder + " "
"does not have a corresponding "
"Brain. Check that the "
"curriculum file has the same "
"name as the Brain "
"whose curriculum it defines."
)
# TODO: Should be able to start learning at different lesson numbers
# for each curriculum.
meta_curriculum.set_all_curriculums_to_lesson_num(lesson)
for brain_name in meta_curriculum.brains_to_curriculums.keys():
if brain_name not in env.external_brains.keys():
raise MetaCurriculumError(
"One of the curricula "
"defined in " + curriculum_folder + " "
"does not have a corresponding "
"Brain. Check that the "
"curriculum file has the same "
"name as the Brain "
"whose curriculum it defines."
)
return meta_curriculum

--docker-target-name=<dt> Docker volume to store training-specific files [default: None].
--no-graphics Whether to run the environment in no-graphics mode [default: False].
--debug Whether to run ML-Agents in debug mode with detailed logging [default: False].
--multi-gpu Whether to use multiple GPU training [default: False].
"""
options = docopt(_USAGE)

ml-agents/mlagents/trainers/models.py (258)


import logging
from enum import Enum
from typing import Any, Callable, Dict
from typing import Any, Callable, Dict, List
import numpy as np
import tensorflow as tf

ActivationFunction = Callable[[tf.Tensor], tf.Tensor]
EPSILON = 1e-7
class EncoderType(Enum):

:param all_logits: The concatenated unnormalized action probabilities for all branches
:param action_masks: The mask for the logits. Must be of dimension [None x total_number_of_action]
:param action_size: A list containing the number of possible actions for each branch
:return: The action output dimension [batch_size, num_branches] and the concatenated normalized logits
:return: The action output dimension [batch_size, num_branches], the concatenated
normalized probs (after softmax)
and the concatenated normalized log probs
"""
action_idx = [0] + list(np.cumsum(action_size))
branches_logits = [

for i in range(len(action_size))
]
raw_probs = [
tf.multiply(tf.nn.softmax(branches_logits[k]) + 1.0e-10, branch_masks[k])
tf.multiply(tf.nn.softmax(branches_logits[k]) + EPSILON, branch_masks[k])
for k in range(len(action_size))
]
normalized_probs = [

output = tf.concat(
[
tf.multinomial(tf.log(normalized_probs[k]), 1)
tf.multinomial(tf.log(normalized_probs[k] + EPSILON), 1)
for k in range(len(action_size))
],
axis=1,

tf.concat([normalized_probs[k] for k in range(len(action_size))], axis=1),
tf.log(normalized_probs[k] + 1.0e-10)
tf.log(normalized_probs[k] + EPSILON)
for k in range(len(action_size))
],
axis=1,
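The lines above replace the literal 1.0e-10 with the shared EPSILON constant inside the discrete action-masking layer. A standalone numpy sketch of that masking logic for a single action branch (an illustration, not the TensorFlow graph code):

```python
import numpy as np

EPSILON = 1e-7  # matches the constant introduced above

def masked_action_probs(branch_logits, branch_mask):
    """Softmax over one action branch with invalid actions masked out."""
    soft = np.exp(branch_logits - branch_logits.max())
    soft = soft / soft.sum()
    raw = (soft + EPSILON) * branch_mask             # zero out masked actions
    normalized = raw / raw.sum()                     # renormalize over valid ones
    return normalized, np.log(normalized + EPSILON)  # EPSILON keeps the log finite

probs, log_probs = masked_action_probs(np.array([2.0, 1.0, 0.5]), np.array([1.0, 0.0, 1.0]))
print(probs)  # masked entry is ~0; remaining probabilities sum to ~1
```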

h_size: int,
num_layers: int,
vis_encode_type: EncoderType = EncoderType.SIMPLE,
stream_scopes: List[str] = None,
) -> tf.Tensor:
"""
Creates encoding stream for observations.

:param stream_scopes: List of strings (length == num_streams), which contains
the scopes for each of the streams. None if all under the same TF scope.
:return: List of encoded streams.
"""
brain = self.brain

for i in range(num_streams):
visual_encoders = []
hidden_state, hidden_visual = None, None
_scope_add = stream_scopes[i] if stream_scopes else ""
if self.vis_obs_size > 0:
if vis_encode_type == EncoderType.RESNET:
for j in range(brain.number_visual_observations):

activation_fn,
num_layers,
"main_graph_{}_encoder{}".format(i, j),
_scope_add + "main_graph_{}_encoder{}".format(i, j),
False,
)
visual_encoders.append(encoded_visual)

h_size,
activation_fn,
num_layers,
"main_graph_{}_encoder{}".format(i, j),
_scope_add + "main_graph_{}_encoder{}".format(i, j),
False,
)
visual_encoders.append(encoded_visual)

h_size,
activation_fn,
num_layers,
"main_graph_{}_encoder{}".format(i, j),
_scope_add + "main_graph_{}_encoder{}".format(i, j),
False,
)
visual_encoders.append(encoded_visual)

h_size,
activation_fn,
num_layers,
"main_graph_{}".format(i),
_scope_add + "main_graph_{}".format(i),
False,
)
if hidden_state is not None and hidden_visual is not None:

value = tf.layers.dense(hidden_input, 1, name="{}_value".format(name))
self.value_heads[name] = value
self.value = tf.reduce_mean(list(self.value_heads.values()), 0)
def create_cc_actor_critic(
self, h_size: int, num_layers: int, vis_encode_type: EncoderType
) -> None:
"""
Creates Continuous control actor-critic model.
:param h_size: Size of hidden linear layers.
:param num_layers: Number of hidden linear layers.
"""
hidden_streams = self.create_observation_streams(
2, h_size, num_layers, vis_encode_type
)
if self.use_recurrent:
self.memory_in = tf.placeholder(
shape=[None, self.m_size], dtype=tf.float32, name="recurrent_in"
)
_half_point = int(self.m_size / 2)
hidden_policy, memory_policy_out = self.create_recurrent_encoder(
hidden_streams[0],
self.memory_in[:, :_half_point],
self.sequence_length,
name="lstm_policy",
)
hidden_value, memory_value_out = self.create_recurrent_encoder(
hidden_streams[1],
self.memory_in[:, _half_point:],
self.sequence_length,
name="lstm_value",
)
self.memory_out = tf.concat(
[memory_policy_out, memory_value_out], axis=1, name="recurrent_out"
)
else:
hidden_policy = hidden_streams[0]
hidden_value = hidden_streams[1]
mu = tf.layers.dense(
hidden_policy,
self.act_size[0],
activation=None,
kernel_initializer=c_layers.variance_scaling_initializer(factor=0.01),
)
self.log_sigma_sq = tf.get_variable(
"log_sigma_squared",
[self.act_size[0]],
dtype=tf.float32,
initializer=tf.zeros_initializer(),
)
sigma_sq = tf.exp(self.log_sigma_sq)
self.epsilon = tf.placeholder(
shape=[None, self.act_size[0]], dtype=tf.float32, name="epsilon"
)
# Clip and scale output to ensure actions are always within [-1, 1] range.
self.output_pre = mu + tf.sqrt(sigma_sq) * self.epsilon
output_post = tf.clip_by_value(self.output_pre, -3, 3) / 3
self.output = tf.identity(output_post, name="action")
self.selected_actions = tf.stop_gradient(output_post)
# Compute probability of model output.
all_probs = (
-0.5 * tf.square(tf.stop_gradient(self.output_pre) - mu) / sigma_sq
- 0.5 * tf.log(2.0 * np.pi)
- 0.5 * self.log_sigma_sq
)
self.all_log_probs = tf.identity(all_probs, name="action_probs")
self.entropy = 0.5 * tf.reduce_mean(
tf.log(2 * np.pi * np.e) + self.log_sigma_sq
)
self.create_value_heads(self.stream_names, hidden_value)
self.all_old_log_probs = tf.placeholder(
shape=[None, self.act_size[0]], dtype=tf.float32, name="old_probabilities"
)
# We keep these tensors the same name, but use new nodes to keep code parallelism with discrete control.
self.log_probs = tf.reduce_sum(
(tf.identity(self.all_log_probs)), axis=1, keepdims=True
)
self.old_log_probs = tf.reduce_sum(
(tf.identity(self.all_old_log_probs)), axis=1, keepdims=True
)
def create_dc_actor_critic(
self, h_size: int, num_layers: int, vis_encode_type: EncoderType
) -> None:
"""
Creates Discrete control actor-critic model.
:param h_size: Size of hidden linear layers.
:param num_layers: Number of hidden linear layers.
"""
hidden_streams = self.create_observation_streams(
1, h_size, num_layers, vis_encode_type
)
hidden = hidden_streams[0]
if self.use_recurrent:
self.prev_action = tf.placeholder(
shape=[None, len(self.act_size)], dtype=tf.int32, name="prev_action"
)
prev_action_oh = tf.concat(
[
tf.one_hot(self.prev_action[:, i], self.act_size[i])
for i in range(len(self.act_size))
],
axis=1,
)
hidden = tf.concat([hidden, prev_action_oh], axis=1)
self.memory_in = tf.placeholder(
shape=[None, self.m_size], dtype=tf.float32, name="recurrent_in"
)
hidden, memory_out = self.create_recurrent_encoder(
hidden, self.memory_in, self.sequence_length
)
self.memory_out = tf.identity(memory_out, name="recurrent_out")
policy_branches = []
for size in self.act_size:
policy_branches.append(
tf.layers.dense(
hidden,
size,
activation=None,
use_bias=False,
kernel_initializer=c_layers.variance_scaling_initializer(
factor=0.01
),
)
)
self.all_log_probs = tf.concat(
[branch for branch in policy_branches], axis=1, name="action_probs"
)
self.action_masks = tf.placeholder(
shape=[None, sum(self.act_size)], dtype=tf.float32, name="action_masks"
)
output, normalized_logits = self.create_discrete_action_masking_layer(
self.all_log_probs, self.action_masks, self.act_size
)
self.output = tf.identity(output)
self.normalized_logits = tf.identity(normalized_logits, name="action")
self.create_value_heads(self.stream_names, hidden)
self.action_holder = tf.placeholder(
shape=[None, len(policy_branches)], dtype=tf.int32, name="action_holder"
)
self.action_oh = tf.concat(
[
tf.one_hot(self.action_holder[:, i], self.act_size[i])
for i in range(len(self.act_size))
],
axis=1,
)
self.selected_actions = tf.stop_gradient(self.action_oh)
self.all_old_log_probs = tf.placeholder(
shape=[None, sum(self.act_size)], dtype=tf.float32, name="old_probabilities"
)
_, old_normalized_logits = self.create_discrete_action_masking_layer(
self.all_old_log_probs, self.action_masks, self.act_size
)
action_idx = [0] + list(np.cumsum(self.act_size))
self.entropy = tf.reduce_sum(
(
tf.stack(
[
tf.nn.softmax_cross_entropy_with_logits_v2(
labels=tf.nn.softmax(
self.all_log_probs[:, action_idx[i] : action_idx[i + 1]]
),
logits=self.all_log_probs[
:, action_idx[i] : action_idx[i + 1]
],
)
for i in range(len(self.act_size))
],
axis=1,
)
),
axis=1,
)
self.log_probs = tf.reduce_sum(
(
tf.stack(
[
-tf.nn.softmax_cross_entropy_with_logits_v2(
labels=self.action_oh[:, action_idx[i] : action_idx[i + 1]],
logits=normalized_logits[
:, action_idx[i] : action_idx[i + 1]
],
)
for i in range(len(self.act_size))
],
axis=1,
)
),
axis=1,
keepdims=True,
)
self.old_log_probs = tf.reduce_sum(
(
tf.stack(
[
-tf.nn.softmax_cross_entropy_with_logits_v2(
labels=self.action_oh[:, action_idx[i] : action_idx[i + 1]],
logits=old_normalized_logits[
:, action_idx[i] : action_idx[i + 1]
],
)
for i in range(len(self.act_size))
],
axis=1,
)
),
axis=1,
keepdims=True,
)

ml-agents/mlagents/trainers/ppo/models.py (241)


max_step,
)
def create_cc_actor_critic(
self, h_size: int, num_layers: int, vis_encode_type: EncoderType
) -> None:
"""
Creates Continuous control actor-critic model.
:param h_size: Size of hidden linear layers.
:param num_layers: Number of hidden linear layers.
"""
hidden_streams = self.create_observation_streams(
2, h_size, num_layers, vis_encode_type
)
if self.use_recurrent:
self.memory_in = tf.placeholder(
shape=[None, self.m_size], dtype=tf.float32, name="recurrent_in"
)
_half_point = int(self.m_size / 2)
hidden_policy, memory_policy_out = self.create_recurrent_encoder(
hidden_streams[0],
self.memory_in[:, :_half_point],
self.sequence_length,
name="lstm_policy",
)
hidden_value, memory_value_out = self.create_recurrent_encoder(
hidden_streams[1],
self.memory_in[:, _half_point:],
self.sequence_length,
name="lstm_value",
)
self.memory_out = tf.concat(
[memory_policy_out, memory_value_out], axis=1, name="recurrent_out"
)
else:
hidden_policy = hidden_streams[0]
hidden_value = hidden_streams[1]
mu = tf.layers.dense(
hidden_policy,
self.act_size[0],
activation=None,
kernel_initializer=LearningModel.scaled_init(0.01),
)
self.log_sigma_sq = tf.get_variable(
"log_sigma_squared",
[self.act_size[0]],
dtype=tf.float32,
initializer=tf.zeros_initializer(),
)
sigma_sq = tf.exp(self.log_sigma_sq)
self.epsilon = tf.placeholder(
shape=[None, self.act_size[0]], dtype=tf.float32, name="epsilon"
)
# Clip and scale output to ensure actions are always within [-1, 1] range.
self.output_pre = mu + tf.sqrt(sigma_sq) * self.epsilon
output_post = tf.clip_by_value(self.output_pre, -3, 3) / 3
self.output = tf.identity(output_post, name="action")
self.selected_actions = tf.stop_gradient(output_post)
# Compute probability of model output.
all_probs = (
-0.5 * tf.square(tf.stop_gradient(self.output_pre) - mu) / sigma_sq
- 0.5 * tf.log(2.0 * np.pi)
- 0.5 * self.log_sigma_sq
)
self.all_log_probs = tf.identity(all_probs, name="action_probs")
self.entropy = 0.5 * tf.reduce_mean(
tf.log(2 * np.pi * np.e) + self.log_sigma_sq
)
self.create_value_heads(self.stream_names, hidden_value)
self.all_old_log_probs = tf.placeholder(
shape=[None, self.act_size[0]], dtype=tf.float32, name="old_probabilities"
)
# We keep these tensors the same name, but use new nodes to keep code parallelism with discrete control.
self.log_probs = tf.reduce_sum(
(tf.identity(self.all_log_probs)), axis=1, keepdims=True
)
self.old_log_probs = tf.reduce_sum(
(tf.identity(self.all_old_log_probs)), axis=1, keepdims=True
)
def create_dc_actor_critic(
self, h_size: int, num_layers: int, vis_encode_type: EncoderType
) -> None:
"""
Creates Discrete control actor-critic model.
:param h_size: Size of hidden linear layers.
:param num_layers: Number of hidden linear layers.
"""
hidden_streams = self.create_observation_streams(
1, h_size, num_layers, vis_encode_type
)
hidden = hidden_streams[0]
if self.use_recurrent:
self.prev_action = tf.placeholder(
shape=[None, len(self.act_size)], dtype=tf.int32, name="prev_action"
)
prev_action_oh = tf.concat(
[
tf.one_hot(self.prev_action[:, i], self.act_size[i])
for i in range(len(self.act_size))
],
axis=1,
)
hidden = tf.concat([hidden, prev_action_oh], axis=1)
self.memory_in = tf.placeholder(
shape=[None, self.m_size], dtype=tf.float32, name="recurrent_in"
)
hidden, memory_out = self.create_recurrent_encoder(
hidden, self.memory_in, self.sequence_length
)
self.memory_out = tf.identity(memory_out, name="recurrent_out")
policy_branches = []
for size in self.act_size:
policy_branches.append(
tf.layers.dense(
hidden,
size,
activation=None,
use_bias=False,
kernel_initializer=LearningModel.scaled_init(0.01),
)
)
self.all_log_probs = tf.concat(
[branch for branch in policy_branches], axis=1, name="action_probs"
)
self.action_masks = tf.placeholder(
shape=[None, sum(self.act_size)], dtype=tf.float32, name="action_masks"
)
output, _, normalized_logits = self.create_discrete_action_masking_layer(
self.all_log_probs, self.action_masks, self.act_size
)
self.output = tf.identity(output)
self.normalized_logits = tf.identity(normalized_logits, name="action")
self.create_value_heads(self.stream_names, hidden)
self.action_holder = tf.placeholder(
shape=[None, len(policy_branches)], dtype=tf.int32, name="action_holder"
)
self.action_oh = tf.concat(
[
tf.one_hot(self.action_holder[:, i], self.act_size[i])
for i in range(len(self.act_size))
],
axis=1,
)
self.selected_actions = tf.stop_gradient(self.action_oh)
self.all_old_log_probs = tf.placeholder(
shape=[None, sum(self.act_size)], dtype=tf.float32, name="old_probabilities"
)
_, _, old_normalized_logits = self.create_discrete_action_masking_layer(
self.all_old_log_probs, self.action_masks, self.act_size
)
action_idx = [0] + list(np.cumsum(self.act_size))
self.entropy = tf.reduce_sum(
(
tf.stack(
[
tf.nn.softmax_cross_entropy_with_logits_v2(
labels=tf.nn.softmax(
self.all_log_probs[:, action_idx[i] : action_idx[i + 1]]
),
logits=self.all_log_probs[
:, action_idx[i] : action_idx[i + 1]
],
)
for i in range(len(self.act_size))
],
axis=1,
)
),
axis=1,
)
self.log_probs = tf.reduce_sum(
(
tf.stack(
[
-tf.nn.softmax_cross_entropy_with_logits_v2(
labels=self.action_oh[:, action_idx[i] : action_idx[i + 1]],
logits=normalized_logits[
:, action_idx[i] : action_idx[i + 1]
],
)
for i in range(len(self.act_size))
],
axis=1,
)
),
axis=1,
keepdims=True,
)
self.old_log_probs = tf.reduce_sum(
(
tf.stack(
[
-tf.nn.softmax_cross_entropy_with_logits_v2(
labels=self.action_oh[:, action_idx[i] : action_idx[i + 1]],
logits=old_normalized_logits[
:, action_idx[i] : action_idx[i + 1]
],
)
for i in range(len(self.act_size))
],
axis=1,
)
),
axis=1,
keepdims=True,
)
def create_losses(
self, probs, old_probs, value_heads, entropy, beta, epsilon, lr, max_step
):

self.returns_holders[name] = returns_holder
self.old_values[name] = old_value
self.advantage = tf.placeholder(
shape=[None, 1], dtype=tf.float32, name="advantages"
shape=[None], dtype=tf.float32, name="advantages"
advantage = tf.expand_dims(self.advantage, -1)
self.learning_rate = tf.train.polynomial_decay(
lr, self.global_step, max_step, 1e-10, power=1.0
)

self.value_loss = tf.reduce_mean(value_losses)
r_theta = tf.exp(probs - old_probs)
p_opt_a = r_theta * self.advantage
p_opt_a = r_theta * advantage
* self.advantage
* advantage
)
self.policy_loss = -tf.reduce_mean(
tf.dynamic_partition(tf.minimum(p_opt_a, p_opt_b), self.mask, 2)[1]

)
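The advantage placeholder above is now fed flat (shape [None]) and expanded to [None, 1] inside the graph via tf.expand_dims. A small numpy sketch of why the explicit expansion matters when multiplying against the [batch, 1] probability ratio (illustration only):

```python
import numpy as np

r_theta = np.array([[1.1], [0.9], [1.0]])   # probability ratios, shape [batch, 1]
advantages = np.array([2.0, -1.0, 0.5])     # fed flat, shape [batch]

broadcast_bug = r_theta * advantages        # silently broadcasts to [3, 3]
expanded = r_theta * advantages[:, None]    # expand_dims equivalent, stays [3, 1]
print(broadcast_bug.shape, expanded.shape)  # (3, 3) (3, 1)
```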
def create_ppo_optimizer(self):
optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate)
self.update_batch = optimizer.minimize(self.loss)
self.optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate)
self.grads = self.optimizer.compute_gradients(self.loss)
self.update_batch = self.optimizer.minimize(self.loss)
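create_ppo_optimizer now keeps the optimizer and raw gradients (self.grads) around instead of only the minimize op. A hedged sketch of how per-tower gradients could be averaged before applying an update; the actual logic lives in the new multi_gpu_policy.py, which is not reproduced in this diff view:

```python
import numpy as np

# Hypothetical per-variable gradients computed on two devices.
tower_grads = [
    [np.array([0.1, 0.2]), np.array([0.3])],  # gradients from tower/GPU 0
    [np.array([0.3, 0.0]), np.array([0.1])],  # gradients from tower/GPU 1
]
# Average the gradient of each variable across towers before applying it.
averaged = [np.mean(grads_per_var, axis=0) for grads_per_var in zip(*tower_grads)]
print(averaged)  # [array([0.2, 0.1]), array([0.2])]
```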

ml-agents/mlagents/trainers/ppo/policy.py (144)


reward_signal_configs = trainer_params["reward_signals"]
self.create_model(brain, trainer_params, reward_signal_configs, seed)
self.model = PPOModel(
brain,
lr=float(trainer_params["learning_rate"]),
h_size=int(trainer_params["hidden_units"]),
epsilon=float(trainer_params["epsilon"]),
beta=float(trainer_params["beta"]),
max_step=float(trainer_params["max_steps"]),
normalize=trainer_params["normalize"],
use_recurrent=trainer_params["use_recurrent"],
num_layers=int(trainer_params["num_layers"]),
m_size=self.m_size,
seed=seed,
stream_names=list(reward_signal_configs.keys()),
vis_encode_type=EncoderType(
trainer_params.get("vis_encode_type", "simple")
),
)
self.model.create_ppo_optimizer()
# Create reward signals
for reward_signal, config in reward_signal_configs.items():
self.reward_signals[reward_signal] = create_reward_signal(

self.inference_dict = {
"action": self.model.output,
"log_probs": self.model.all_log_probs,
"value": self.model.value_heads,
"value": self.model.value,
"value_heads": self.model.value_heads,
"entropy": self.model.entropy,
"learning_rate": self.model.learning_rate,
}

"update_batch": self.model.update_batch,
}
def create_model(self, brain, trainer_params, reward_signal_configs, seed):
"""
Create PPO model
:param brain: Assigned Brain object.
:param trainer_params: Defined training parameters.
:param reward_signal_configs: Reward signal config
:param seed: Random seed.
"""
with self.graph.as_default():
self.model = PPOModel(
brain=brain,
lr=float(trainer_params["learning_rate"]),
h_size=int(trainer_params["hidden_units"]),
epsilon=float(trainer_params["epsilon"]),
beta=float(trainer_params["beta"]),
max_step=float(trainer_params["max_steps"]),
normalize=trainer_params["normalize"],
use_recurrent=trainer_params["use_recurrent"],
num_layers=int(trainer_params["num_layers"]),
m_size=self.m_size,
seed=seed,
stream_names=list(reward_signal_configs.keys()),
vis_encode_type=EncoderType(
trainer_params.get("vis_encode_type", "simple")
),
)
self.model.create_ppo_optimizer()
@timed
def evaluate(self, brain_info):
"""

:param mini_batch: Experience batch.
:return: Output from update process.
"""
feed_dict = self.construct_feed_dict(self.model, mini_batch, num_sequences)
run_out = self._execute_model(feed_dict, self.update_dict)
return run_out
def construct_feed_dict(self, model, mini_batch, num_sequences):
self.model.mask_input: mini_batch["masks"].flatten(),
self.model.advantage: mini_batch["advantages"].reshape([-1, 1]),
self.model.all_old_log_probs: mini_batch["action_probs"].reshape(
[-1, sum(self.model.act_size)]
),
self.model.mask_input: mini_batch["masks"],
self.model.advantage: mini_batch["advantages"],
self.model.all_old_log_probs: mini_batch["action_probs"],
feed_dict[self.model.returns_holders[name]] = mini_batch[
feed_dict[model.returns_holders[name]] = mini_batch[
].flatten()
feed_dict[self.model.old_values[name]] = mini_batch[
]
feed_dict[model.old_values[name]] = mini_batch[
].flatten()
]
feed_dict[self.model.output_pre] = mini_batch["actions_pre"].reshape(
[-1, self.model.act_size[0]]
)
feed_dict[self.model.epsilon] = mini_batch["random_normal_epsilon"].reshape(
[-1, self.model.act_size[0]]
)
feed_dict[model.output_pre] = mini_batch["actions_pre"]
feed_dict[model.epsilon] = mini_batch["random_normal_epsilon"]
feed_dict[self.model.action_holder] = mini_batch["actions"].reshape(
[-1, len(self.model.act_size)]
)
feed_dict[model.action_holder] = mini_batch["actions"]
feed_dict[self.model.prev_action] = mini_batch["prev_action"].reshape(
[-1, len(self.model.act_size)]
)
feed_dict[self.model.action_masks] = mini_batch["action_mask"].reshape(
[-1, sum(self.brain.vector_action_space_size)]
)
feed_dict[model.prev_action] = mini_batch["prev_action"]
feed_dict[model.action_masks] = mini_batch["action_mask"]
feed_dict[self.model.vector_in] = mini_batch["vector_obs"].reshape(
[-1, self.vec_obs_size]
)
feed_dict[model.vector_in] = mini_batch["vector_obs"]
_obs = mini_batch["visual_obs%d" % i]
if self.sequence_length > 1 and self.use_recurrent:
(_batch, _seq, _w, _h, _c) = _obs.shape
feed_dict[self.model.visual_in[i]] = _obs.reshape([-1, _w, _h, _c])
else:
feed_dict[self.model.visual_in[i]] = _obs
feed_dict[model.visual_in[i]] = mini_batch["visual_obs%d" % i]
mem_in = mini_batch["memory"][:, 0, :]
feed_dict[self.model.memory_in] = mem_in
run_out = self._execute_model(feed_dict, self.update_dict)
return run_out
mem_in = [
mini_batch["memory"][i]
for i in range(0, len(mini_batch["memory"]), self.sequence_length)
]
feed_dict[model.memory_in] = mem_in
return feed_dict
def get_value_estimates(
self, brain_info: BrainInfo, idx: int, done: bool

brain_info.memories = self.make_empty_memory(len(brain_info.agents))
feed_dict[self.model.memory_in] = [brain_info.memories[idx]]
if not self.use_continuous_act and self.use_recurrent:
feed_dict[self.model.prev_action] = brain_info.previous_vector_actions[
idx
].reshape([-1, len(self.model.act_size)])
feed_dict[self.model.prev_action] = [
brain_info.previous_vector_actions[idx]
]
value_estimates = self.sess.run(self.model.value_heads, feed_dict)
value_estimates = {k: float(v) for k, v in value_estimates.items()}

value_estimates[k] = 0.0
return value_estimates
def get_action(self, brain_info: BrainInfo) -> ActionInfo:
"""
Decides actions given observations information, and takes them in environment.
:param brain_info: A dictionary of brain names and BrainInfo from environment.
:return: an ActionInfo containing action, memories, values and an object
to be passed to add experiences
"""
if len(brain_info.agents) == 0:
return ActionInfo([], [], [], None, None)
run_out = self.evaluate(brain_info)
mean_values = np.mean(
np.array(list(run_out.get("value").values())), axis=0
).flatten()
return ActionInfo(
action=run_out.get("action"),
memory=run_out.get("memory_out"),
text=None,
value=mean_values,
outputs=run_out,
)

ml-agents/mlagents/trainers/ppo/trainer.py (334)


import logging
from collections import defaultdict
from typing import List, Any
from typing import List, Any, Dict
import numpy as np

from mlagents.trainers.trainer import Trainer, UnityTrainerException
from mlagents.trainers.ppo.multi_gpu_policy import MultiGpuPPOPolicy, get_devices
from mlagents.trainers.trainer import UnityTrainerException
from mlagents.trainers.rl_trainer import RLTrainer
from mlagents.trainers.components.reward_signals import RewardSignalResult
class PPOTrainer(Trainer):
class PPOTrainer(RLTrainer):
self, brain, reward_buff_cap, trainer_parameters, training, load, seed, run_id
self,
brain,
reward_buff_cap,
trainer_parameters,
training,
load,
seed,
run_id,
multi_gpu,
):
"""
Responsible for collecting experiences and training PPO model.

:param seed: The seed the model will be initialized with
:param run_id: The identifier of the current run
"""
super().__init__(brain, trainer_parameters, training, run_id, reward_buff_cap)
super(PPOTrainer, self).__init__(
brain, trainer_parameters, training, run_id, reward_buff_cap
)
self.param_keys = [
"batch_size",
"beta",

]
self.check_param_keys()
# Make sure we have at least one reward_signal
if not self.trainer_parameters["reward_signals"]:
raise UnityTrainerException(
"No reward signals were defined. At least one must be used with {}.".format(
self.__class__.__name__
)
if multi_gpu and len(get_devices()) > 1:
self.policy = MultiGpuPPOPolicy(
seed, brain, trainer_parameters, self.is_training, load
)
else:
self.policy = PPOPolicy(
seed, brain, trainer_parameters, self.is_training, load
self.step = 0
self.policy = PPOPolicy(seed, brain, trainer_parameters, self.is_training, load)
stats = defaultdict(list)
# collected_rewards is a dictionary from name of reward signal to a dictionary of agent_id to cumulative reward
# used for reporting only. We always want to report the environment reward to Tensorboard, regardless
# of what reward signals are actually present.
self.collected_rewards = {"environment": {}}
self.stats = stats
self.training_buffer = Buffer()
self.episode_steps = {}
def __str__(self):
return """Hyperparameters for the {0} of brain {1}: \n{2}""".format(
self.__class__.__name__,
self.brain_name,
self.dict_to_str(self.trainer_parameters, 0),
)
@property
def parameters(self):
"""
Returns the trainer parameters of the trainer.
"""
return self.trainer_parameters
@property
def get_max_steps(self):
"""
Returns the maximum number of steps. Is used to know when the trainer should be stopped.
:return: The maximum number of steps of the trainer
"""
return float(self.trainer_parameters["max_steps"])
@property
def get_step(self):
"""
Returns the number of steps the trainer has performed
:return: the step count of the trainer
"""
return self.step
def increment_step(self, n_steps: int) -> None:
"""
Increment the step count of the trainer
:param n_steps: number of steps to increment the step count by
"""
self.step = self.policy.increment_step(n_steps)
def construct_curr_info(self, next_info: BrainInfo) -> BrainInfo:
"""
Constructs a BrainInfo which contains the most recent previous experiences for all agents
which correspond to the agents in a provided next_info.
:BrainInfo next_info: A t+1 BrainInfo.
:return: curr_info: Reconstructed BrainInfo to match agents of next_info.
"""
visual_observations: List[List[Any]] = [
[]
] # TODO add types to brain.py methods
vector_observations = []
text_observations = []
memories = []
rewards = []
local_dones = []
max_reacheds = []
agents = []
prev_vector_actions = []
prev_text_actions = []
action_masks = []
for agent_id in next_info.agents:
agent_brain_info = self.training_buffer[agent_id].last_brain_info
if agent_brain_info is None:
agent_brain_info = next_info
agent_index = agent_brain_info.agents.index(agent_id)
for i in range(len(next_info.visual_observations)):
visual_observations[i].append(
agent_brain_info.visual_observations[i][agent_index]
)
vector_observations.append(
agent_brain_info.vector_observations[agent_index]
)
text_observations.append(agent_brain_info.text_observations[agent_index])
if self.policy.use_recurrent:
if len(agent_brain_info.memories) > 0:
memories.append(agent_brain_info.memories[agent_index])
else:
memories.append(self.policy.make_empty_memory(1))
rewards.append(agent_brain_info.rewards[agent_index])
local_dones.append(agent_brain_info.local_done[agent_index])
max_reacheds.append(agent_brain_info.max_reached[agent_index])
agents.append(agent_brain_info.agents[agent_index])
prev_vector_actions.append(
agent_brain_info.previous_vector_actions[agent_index]
)
prev_text_actions.append(
agent_brain_info.previous_text_actions[agent_index]
)
action_masks.append(agent_brain_info.action_masks[agent_index])
if self.policy.use_recurrent:
memories = np.vstack(memories)
curr_info = BrainInfo(
visual_observations,
vector_observations,
text_observations,
memories,
rewards,
agents,
local_dones,
prev_vector_actions,
prev_text_actions,
max_reacheds,
action_masks,
)
return curr_info
def add_experiences(
self,
curr_all_info: AllBrainInfo,
next_all_info: AllBrainInfo,
take_action_outputs: ActionInfoOutputs,
) -> None:
"""
Adds experiences to each agent's experience history.
:param curr_all_info: Dictionary of all current brains and corresponding BrainInfo.
:param next_all_info: Dictionary of all current brains and corresponding BrainInfo.
:param take_action_outputs: The outputs of the Policy's get_action method.
"""
self.trainer_metrics.start_experience_collection_timer()
if take_action_outputs:
self.stats["Policy/Entropy"].append(take_action_outputs["entropy"].mean())
self.stats["Policy/Learning Rate"].append(
take_action_outputs["learning_rate"]
)
for name, signal in self.policy.reward_signals.items():
self.stats[signal.value_name].append(
np.mean(take_action_outputs["value"][name])
)
curr_info = curr_all_info[self.brain_name]
next_info = next_all_info[self.brain_name]
for agent_id in curr_info.agents:
self.training_buffer[agent_id].last_brain_info = curr_info
self.training_buffer[
agent_id
].last_take_action_outputs = take_action_outputs
if curr_info.agents != next_info.agents:
curr_to_use = self.construct_curr_info(next_info)
else:
curr_to_use = curr_info
tmp_rewards_dict = {}
for name, signal in self.policy.reward_signals.items():
tmp_rewards_dict[name] = signal.evaluate(curr_to_use, next_info)
for agent_id in next_info.agents:
stored_info = self.training_buffer[agent_id].last_brain_info
stored_take_action_outputs = self.training_buffer[
agent_id
].last_take_action_outputs
if stored_info is not None:
idx = stored_info.agents.index(agent_id)
next_idx = next_info.agents.index(agent_id)
if not stored_info.local_done[idx]:
for i, _ in enumerate(stored_info.visual_observations):
self.training_buffer[agent_id]["visual_obs%d" % i].append(
stored_info.visual_observations[i][idx]
)
self.training_buffer[agent_id]["next_visual_obs%d" % i].append(
next_info.visual_observations[i][next_idx]
)
if self.policy.use_vec_obs:
self.training_buffer[agent_id]["vector_obs"].append(
stored_info.vector_observations[idx]
)
self.training_buffer[agent_id]["next_vector_in"].append(
next_info.vector_observations[next_idx]
)
if self.policy.use_recurrent:
if stored_info.memories.shape[1] == 0:
stored_info.memories = np.zeros(
(len(stored_info.agents), self.policy.m_size)
)
self.training_buffer[agent_id]["memory"].append(
stored_info.memories[idx]
)
actions = stored_take_action_outputs["action"]
if self.policy.use_continuous_act:
actions_pre = stored_take_action_outputs["pre_action"]
self.training_buffer[agent_id]["actions_pre"].append(
actions_pre[idx]
)
epsilons = stored_take_action_outputs["random_normal_epsilon"]
self.training_buffer[agent_id]["random_normal_epsilon"].append(
epsilons[idx]
)
else:
self.training_buffer[agent_id]["action_mask"].append(
stored_info.action_masks[idx], padding_value=1
)
a_dist = stored_take_action_outputs["log_probs"]
                    # value is a dictionary from reward signal name to the value estimate of the corresponding value head
value = stored_take_action_outputs["value"]
self.training_buffer[agent_id]["actions"].append(actions[idx])
self.training_buffer[agent_id]["prev_action"].append(
stored_info.previous_vector_actions[idx]
)
self.training_buffer[agent_id]["masks"].append(1.0)
self.training_buffer[agent_id]["done"].append(
next_info.local_done[next_idx]
)
for name, reward_result in tmp_rewards_dict.items():
                        # Use the scaled reward to train the agent
self.training_buffer[agent_id][
"{}_rewards".format(name)
].append(reward_result.scaled_reward[next_idx])
self.training_buffer[agent_id][
"{}_value_estimates".format(name)
].append(value[name][idx][0])
self.training_buffer[agent_id]["action_probs"].append(a_dist[idx])
for name, rewards in self.collected_rewards.items():
if agent_id not in rewards:
rewards[agent_id] = 0
if name == "environment":
# Report the reward from the environment
rewards[agent_id] += np.array(next_info.rewards)[next_idx]
else:
# Report the reward signals
rewards[agent_id] += tmp_rewards_dict[name].scaled_reward[
next_idx
]
if not next_info.local_done[next_idx]:
if agent_id not in self.episode_steps:
self.episode_steps[agent_id] = 0
self.episode_steps[agent_id] += 1
self.trainer_metrics.end_experience_collection_timer()
def process_experiences(
self, current_info: AllBrainInfo, new_info: AllBrainInfo
) -> None:

].append(rewards.get(agent_id, 0))
rewards[agent_id] = 0
def add_policy_outputs(
self, take_action_outputs: ActionInfoOutputs, agent_id: str, agent_idx: int
) -> None:
"""
        Takes the output of the last action and stores it in the training buffer.
"""
actions = take_action_outputs["action"]
if self.policy.use_continuous_act:
actions_pre = take_action_outputs["pre_action"]
self.training_buffer[agent_id]["actions_pre"].append(actions_pre[agent_idx])
epsilons = take_action_outputs["random_normal_epsilon"]
self.training_buffer[agent_id]["random_normal_epsilon"].append(
epsilons[agent_idx]
)
a_dist = take_action_outputs["log_probs"]
        # value is a dictionary from reward signal name to the value estimate of the corresponding value head
self.training_buffer[agent_id]["actions"].append(actions[agent_idx])
self.training_buffer[agent_id]["action_probs"].append(a_dist[agent_idx])
def add_rewards_outputs(
self,
value: Dict[str, Any],
rewards_dict: Dict[str, RewardSignalResult],
agent_id: str,
agent_idx: int,
agent_next_idx: int,
) -> None:
"""
        Takes the value output of the last action and stores it in the training buffer.
"""
for name, reward_result in rewards_dict.items():
            # Use the scaled reward to train the agent
self.training_buffer[agent_id]["{}_rewards".format(name)].append(
reward_result.scaled_reward[agent_idx]
)
self.training_buffer[agent_id]["{}_value_estimates".format(name)].append(
value[name][agent_next_idx][0]
)
def end_episode(self):
"""
A signal that the Episode has ended. The buffer must be reset.

:return: A boolean corresponding to whether or not update_model() can be run
"""
size_of_buffer = len(self.training_buffer.update_buffer["actions"])
return size_of_buffer > max(
int(self.trainer_parameters["buffer_size"] / self.policy.sequence_length), 1
)
return size_of_buffer > self.trainer_parameters["buffer_size"]
def update_policy(self):
"""

mean_return=float(np.mean(self.cumulative_returns_since_policy_update)),
)
self.cumulative_returns_since_policy_update = []
batch_size = self.trainer_parameters["batch_size"]
n_sequences = max(
int(self.trainer_parameters["batch_size"] / self.policy.sequence_length), 1
)

)
num_epoch = self.trainer_parameters["num_epoch"]
for _ in range(num_epoch):
self.training_buffer.update_buffer.shuffle()
self.training_buffer.update_buffer.shuffle(
sequence_length=self.policy.sequence_length
)
len(self.training_buffer.update_buffer["actions"]) // n_sequences
0, len(self.training_buffer.update_buffer["actions"]), batch_size
start = l * n_sequences
end = (l + 1) * n_sequences
buffer.make_mini_batch(start, end), n_sequences
buffer.make_mini_batch(l, l + batch_size), n_sequences
)
value_total.append(run_out["value_loss"])
policy_total.append(np.abs(run_out["policy_loss"]))
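# A standalone sketch (illustrative numbers, not the real Buffer API) of the
# index arithmetic changed in update_policy above: the old loop stepped
# through the shuffled update buffer in chunks of n_sequences, while the new
# loop slices it directly in chunks of batch_size (the last slice may be
# partial).
buffer_len = 10
batch_size = 4
sequence_length = 2
n_sequences = max(batch_size // sequence_length, 1)

old_slices = [
    (l * n_sequences, (l + 1) * n_sequences)
    for l in range(buffer_len // n_sequences)
]
new_slices = [(l, l + batch_size) for l in range(0, buffer_len, batch_size)]

print(old_slices)  # [(0, 2), (2, 4), (4, 6), (6, 8), (8, 10)]
print(new_slices)  # [(0, 4), (4, 8), (8, 12)]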

14
ml-agents/mlagents/trainers/tests/mock_brain.py


camrez = {"blackAndWhite": False, "height": 84, "width": 84}
mock_brain.return_value.camera_resolutions = [camrez] * number_visual_observations
mock_brain.return_value.vector_action_space_size = vector_action_space_size
mock_brain.return_value.brain_name = "MockBrain"
return mock_brain()

num_vis_observations=0,
num_vector_acts=2,
discrete=False,
num_discrete_branches=1,
):
"""
Creates a mock BrainInfo with observations. Imitates constant

)
if discrete:
mock_braininfo.return_value.previous_vector_actions = np.array(
num_agents * [1 * [0.5]]
num_agents * [num_discrete_branches * [0.5]]
)
mock_braininfo.return_value.action_masks = np.array(
num_agents * [num_vector_acts * [1.0]]

mock_braininfo.return_value.rewards = num_agents * [1.0]
mock_braininfo.return_value.local_done = num_agents * [False]
mock_braininfo.return_value.text_observations = num_agents * [""]
mock_braininfo.return_value.previous_text_actions = num_agents * [""]
mock_braininfo.return_value.max_reached = num_agents * [100]
mock_braininfo.return_value.action_masks = num_agents * [num_vector_acts * [1.0]]
mock_braininfo.return_value.agents = range(0, num_agents)
return mock_braininfo()

buffer[0]["prev_action"].append(current_brain_info.previous_vector_actions[0])
buffer[0]["masks"].append(1.0)
buffer[0]["advantages"].append(1.0)
buffer[0]["action_probs"].append(np.ones(buffer[0]["actions"][0].shape))
if brain_params.vector_action_space_type == "discrete":
buffer[0]["action_probs"].append(
np.ones(sum(brain_params.vector_action_space_size))
)
else:
buffer[0]["action_probs"].append(np.ones(buffer[0]["actions"][0].shape))
buffer[0]["actions_pre"].append(np.ones(buffer[0]["actions"][0].shape))
buffer[0]["random_normal_epsilon"].append(
np.ones(buffer[0]["actions"][0].shape)

71
ml-agents/mlagents/trainers/tests/test_buffer.py


assert la[i] == lb[i]
def test_buffer():
def construct_fake_buffer():
b = Buffer()
for fake_agent_id in range(4):
for step in range(9):

100 * fake_agent_id + 10 * step + 5,
]
)
return b
def test_buffer():
b = construct_fake_buffer()
assert_array(a, np.array([[171, 172, 173], [181, 182, 183]]))
assert_array(np.array(a), np.array([[171, 172, 173], [181, 182, 183]]))
a,
np.array(a),
[[231, 232, 233], [241, 242, 243], [251, 252, 253]],
[[261, 262, 263], [271, 272, 273], [281, 282, 283]],
[231, 232, 233],
[241, 242, 243],
[251, 252, 253],
[261, 262, 263],
[271, 272, 273],
[281, 282, 283],
]
),
)

assert_array(
a,
np.array(a),
[[251, 252, 253], [261, 262, 263], [271, 272, 273]],
[[261, 262, 263], [271, 272, 273], [281, 282, 283]],
[251, 252, 253],
[261, 262, 263],
[271, 272, 273],
[261, 262, 263],
[271, 272, 273],
[281, 282, 283],
]
),
)

b.append_update_buffer(2, batch_size=None, training_length=2)
assert len(b.update_buffer["action"]) == 10
assert np.array(b.update_buffer["action"]).shape == (10, 2, 2)
assert len(b.update_buffer["action"]) == 20
assert np.array(b.update_buffer["action"]).shape == (20, 2)
assert c["action"].shape == (1, 2, 2)
assert np.array(c["action"]).shape == (1, 2)
def fakerandint(values):
return 19
def test_buffer_sample():
b = construct_fake_buffer()
b.append_update_buffer(3, batch_size=None, training_length=2)
b.append_update_buffer(2, batch_size=None, training_length=2)
# Test non-LSTM
mb = b.update_buffer.sample_mini_batch(batch_size=4, sequence_length=1)
assert mb.keys() == b.update_buffer.keys()
assert np.array(mb["action"]).shape == (4, 2)
# Test LSTM
    # We need to check that we never sample a sequence with a breaking start;
    # this batch_size/sequence_length combination maximizes the probability of catching one
mb = b.update_buffer.sample_mini_batch(batch_size=20, sequence_length=19)
assert mb.keys() == b.update_buffer.keys()
# Should only return one sequence
assert np.array(mb["action"]).shape == (19, 2)
def test_buffer_truncate():
b = construct_fake_buffer()
b.append_update_buffer(3, batch_size=None, training_length=2)
b.append_update_buffer(2, batch_size=None, training_length=2)
# Test non-LSTM
b.truncate_update_buffer(2)
assert len(b.update_buffer["action"]) == 2
b.append_update_buffer(3, batch_size=None, training_length=2)
b.append_update_buffer(2, batch_size=None, training_length=2)
# Test LSTM, truncate should be some multiple of sequence_length
b.truncate_update_buffer(4, sequence_length=3)
assert len(b.update_buffer["action"]) == 3
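# A standalone sketch of the truncation arithmetic exercised by
# test_buffer_truncate above (this is not the Buffer implementation itself):
# when an LSTM is used, the update buffer is truncated to a whole number of
# sequences so no stored sequence is cut in half. The lengths below are
# illustrative values.
def truncated_length(current_len, max_len, sequence_length=1):
    # Keep at most max_len entries, rounded down to a multiple of sequence_length.
    kept = min(current_len, max_len)
    return kept - (kept % sequence_length)

print(truncated_length(6, 2))                     # 2 (non-LSTM case)
print(truncated_length(6, 4, sequence_length=3))  # 3 (4 rounds down to 3)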

9
ml-agents/mlagents/trainers/tests/test_learn.py


"--no-graphics": False,
"<trainer-config-path>": "basic_path",
"--debug": False,
"--multi-gpu": False,
"--sampler": None,
}

with patch.object(TrainerController, "start_learning", MagicMock()):
learn.run_training(0, 0, basic_options(), MagicMock())
mock_init.assert_called_once_with(
{},
"./models/ppo-0",
"./summaries",
"ppo-0",

False,
5,
0,
0,
True,
sampler_manager_mock.return_value,

with patch.object(TrainerController, "start_learning", MagicMock()):
learn.run_training(0, 0, options_with_docker_target, MagicMock())
mock_init.assert_called_once()
assert mock_init.call_args[0][0] == "/dockertarget/models/ppo-0"
assert mock_init.call_args[0][1] == "/dockertarget/summaries"
assert mock_init.call_args[0][1] == "/dockertarget/models/ppo-0"
assert mock_init.call_args[0][2] == "/dockertarget/summaries"

2
ml-agents/mlagents/trainers/tests/test_ppo.py


}
brain_params = BrainParameters("test_brain", 1, 1, [], [2], [], 0)
trainer = PPOTrainer(brain_params, 0, trainer_params, True, False, 0, "0")
trainer = PPOTrainer(brain_params, 0, trainer_params, True, False, 0, "0", False)
policy_mock = mock.Mock()
step_count = 10
policy_mock.increment_step = mock.Mock(return_value=step_count)

15
ml-agents/mlagents/trainers/tests/test_reward_signals.py


VECTOR_ACTION_SPACE = [2]
VECTOR_OBS_SPACE = 8
DISCRETE_ACTION_SPACE = [2]
DISCRETE_ACTION_SPACE = [3, 3, 3, 2]
BUFFER_INIT_SAMPLES = 20
NUM_AGENTS = 12

DISCRETE_ACTION_SPACE if use_discrete else VECTOR_ACTION_SPACE
),
discrete=use_discrete,
num_discrete_branches=len(DISCRETE_ACTION_SPACE),
)
else:
mock_brain = mb.create_mock_brainparams(

DISCRETE_ACTION_SPACE if use_discrete else VECTOR_ACTION_SPACE
),
discrete=use_discrete,
num_discrete_branches=len(DISCRETE_ACTION_SPACE),
)
mb.setup_mock_unityenvironment(mock_env, mock_brain, mock_braininfo)
env = mock_env()

@mock.patch("mlagents.envs.UnityEnvironment")
def test_gail_dc(mock_env, dummy_config, gail_dummy_config):
env, policy = create_ppo_policy_mock(
mock_env, dummy_config, gail_dummy_config, False, True, False
)
reward_signal_eval(env, policy, "gail")
reward_signal_update(env, policy, "gail")
@mock.patch("mlagents.envs.UnityEnvironment")
def test_gail_visual(mock_env, dummy_config, gail_dummy_config):
def test_gail_dc_visual(mock_env, dummy_config, gail_dummy_config):
gail_dummy_config["gail"]["demo_path"] = (
os.path.dirname(os.path.abspath(__file__)) + "/testdcvis.demo"
)

289
ml-agents/mlagents/trainers/tests/test_trainer_controller.py


import os
from mlagents.trainers import ActionInfo
from mlagents.trainers import TrainerMetrics
from mlagents.trainers.ppo.trainer import PPOTrainer
from mlagents.trainers.bc.offline_trainer import OfflineBCTrainer
from mlagents.trainers.bc.online_trainer import OnlineBCTrainer
from mlagents.envs.exception import UnityEnvironmentException
from mlagents.envs.sampler_class import SamplerManager

@pytest.fixture
def dummy_online_bc_config():
return yaml.safe_load(
"""
default:
trainer: online_bc
brain_to_imitate: ExpertBrain
batches_per_epoch: 16
batch_size: 32
beta: 5.0e-3
buffer_size: 512
epsilon: 0.2
gamma: 0.99
hidden_units: 128
lambd: 0.95
learning_rate: 3.0e-4
max_steps: 5.0e4
normalize: true
num_epoch: 5
num_layers: 2
time_horizon: 64
sequence_length: 64
summary_freq: 1000
use_recurrent: false
memory_size: 8
use_curiosity: false
curiosity_strength: 0.0
curiosity_enc_size: 1
"""
)
@pytest.fixture
def dummy_offline_bc_config():
return yaml.safe_load(
"""
default:
trainer: offline_bc
demo_path: """
+ os.path.dirname(os.path.abspath(__file__))
+ """/test.demo
batches_per_epoch: 16
batch_size: 32
beta: 5.0e-3
buffer_size: 512
epsilon: 0.2
gamma: 0.99
hidden_units: 128
lambd: 0.95
learning_rate: 3.0e-4
max_steps: 5.0e4
normalize: true
num_epoch: 5
num_layers: 2
time_horizon: 64
sequence_length: 64
summary_freq: 1000
use_recurrent: false
memory_size: 8
use_curiosity: false
curiosity_strength: 0.0
curiosity_enc_size: 1
"""
)
@pytest.fixture
def dummy_offline_bc_config_with_override():
base = dummy_offline_bc_config()
base["testbrain"] = {}
base["testbrain"]["normalize"] = False
return base
@pytest.fixture
def dummy_bad_config():
return yaml.safe_load(
"""
default:
trainer: incorrect_trainer
brain_to_imitate: ExpertBrain
batches_per_epoch: 16
batch_size: 32
beta: 5.0e-3
buffer_size: 512
epsilon: 0.2
gamma: 0.99
hidden_units: 128
lambd: 0.95
learning_rate: 3.0e-4
max_steps: 5.0e4
normalize: true
num_epoch: 5
num_layers: 2
time_horizon: 64
sequence_length: 64
summary_freq: 1000
use_recurrent: false
memory_size: 8
"""
)
@pytest.fixture
def basic_trainer_controller():
return TrainerController(
model_path="test_model_path",

meta_curriculum=None,
load=True,
keep_checkpoints=False,
lesson=None,
sampler_manager=SamplerManager(None),
sampler_manager=SamplerManager({}),
trainers={},
)

seed = 27
TrainerController(
"",
"",
"1",
1,
None,
True,
False,
False,
None,
seed,
True,
SamplerManager(None),
None,
model_path="",
summaries_dir="",
run_id="1",
save_freq=1,
meta_curriculum=None,
train=True,
training_seed=seed,
fast_simulation=True,
sampler_manager=SamplerManager({}),
resampling_interval=None,
trainers={},
def assert_bc_trainer_constructed(
trainer_cls, input_config, tc, expected_brain_params, expected_config
):
external_brains = {"testbrain": expected_brain_params}
def mock_constructor(self, brain, trainer_parameters, training, load, seed, run_id):
assert brain == expected_brain_params
assert trainer_parameters == expected_config
assert training == tc.train_model
assert load == tc.load_model
assert seed == tc.seed
assert run_id == tc.run_id
with patch.object(trainer_cls, "__init__", mock_constructor):
tc.initialize_trainers(input_config, external_brains)
assert "testbrain" in tc.trainers
assert isinstance(tc.trainers["testbrain"], trainer_cls)
def assert_ppo_trainer_constructed(
input_config, tc, expected_brain_params, expected_config, expected_reward_buff_cap=1
):
external_brains = {"testbrain": expected_brain_params}
def mock_constructor(
self, brain, reward_buff_cap, trainer_parameters, training, load, seed, run_id
):
self.trainer_metrics = TrainerMetrics("", "")
assert brain == expected_brain_params
assert trainer_parameters == expected_config
assert reward_buff_cap == expected_reward_buff_cap
assert training == tc.train_model
assert load == tc.load_model
assert seed == tc.seed
assert run_id == tc.run_id
with patch.object(PPOTrainer, "__init__", mock_constructor):
tc.initialize_trainers(input_config, external_brains)
assert "testbrain" in tc.trainers
assert isinstance(tc.trainers["testbrain"], PPOTrainer)
@patch("mlagents.envs.BrainParameters")
def test_initialize_trainer_parameters_uses_defaults(BrainParametersMock):
brain_params_mock = BrainParametersMock()
tc = basic_trainer_controller()
full_config = dummy_offline_bc_config()
expected_config = full_config["default"]
expected_config["summary_path"] = tc.summaries_dir + "/test_run_id_testbrain"
expected_config["model_path"] = tc.model_path + "/testbrain"
expected_config["keep_checkpoints"] = tc.keep_checkpoints
assert_bc_trainer_constructed(
OfflineBCTrainer, full_config, tc, brain_params_mock, expected_config
)
@patch("mlagents.envs.BrainParameters")
def test_initialize_trainer_parameters_override_defaults(BrainParametersMock):
brain_params_mock = BrainParametersMock()
tc = basic_trainer_controller()
full_config = dummy_offline_bc_config_with_override()
expected_config = full_config["default"]
expected_config["summary_path"] = tc.summaries_dir + "/test_run_id_testbrain"
expected_config["model_path"] = tc.model_path + "/testbrain"
expected_config["keep_checkpoints"] = tc.keep_checkpoints
# Override value from specific brain config
expected_config["normalize"] = False
assert_bc_trainer_constructed(
OfflineBCTrainer, full_config, tc, brain_params_mock, expected_config
)
@patch("mlagents.envs.BrainParameters")
def test_initialize_online_bc_trainer(BrainParametersMock):
brain_params_mock = BrainParametersMock()
tc = basic_trainer_controller()
full_config = dummy_online_bc_config()
expected_config = full_config["default"]
expected_config["summary_path"] = tc.summaries_dir + "/test_run_id_testbrain"
expected_config["model_path"] = tc.model_path + "/testbrain"
expected_config["keep_checkpoints"] = tc.keep_checkpoints
assert_bc_trainer_constructed(
OnlineBCTrainer, full_config, tc, brain_params_mock, expected_config
)
@patch("mlagents.envs.BrainParameters")
def test_initialize_ppo_trainer(BrainParametersMock):
brain_params_mock = BrainParametersMock()
tc = basic_trainer_controller()
full_config = dummy_config()
expected_config = full_config["default"]
expected_config["summary_path"] = tc.summaries_dir + "/test_run_id_testbrain"
expected_config["model_path"] = tc.model_path + "/testbrain"
expected_config["keep_checkpoints"] = tc.keep_checkpoints
assert_ppo_trainer_constructed(full_config, tc, brain_params_mock, expected_config)
@patch("mlagents.envs.BrainParameters")
def test_initialize_invalid_trainer_raises_exception(BrainParametersMock):
tc = basic_trainer_controller()
bad_config = dummy_bad_config()
external_brains = {"testbrain": BrainParametersMock()}
with pytest.raises(UnityEnvironmentException):
tc.initialize_trainers(bad_config, external_brains)
def trainer_controller_with_start_learning_mocks():
trainer_mock = MagicMock()
trainer_mock.get_step = 0

tc, trainer_mock = trainer_controller_with_start_learning_mocks()
tc.train_model = False
trainer_config = dummy_config()
tf_reset_graph.return_value = None
env_mock = MagicMock()

tc.start_learning(env_mock, trainer_config)
tc.start_learning(env_mock)
tc.initialize_trainers.assert_called_once_with(
trainer_config, env_mock.external_brains
)
env_mock.reset.assert_called_once()
assert tc.advance.call_count == 11
tc._export_graph.assert_not_called()

@patch("tensorflow.reset_default_graph")
def test_start_learning_trains_until_max_steps_then_saves(tf_reset_graph):
tc, trainer_mock = trainer_controller_with_start_learning_mocks()
trainer_config = dummy_config()
tf_reset_graph.return_value = None
brain_info_mock = MagicMock()

env_mock.external_brains = MagicMock()
tc.start_learning(env_mock, trainer_config)
tc.start_learning(env_mock)
tc.initialize_trainers.assert_called_once_with(
trainer_config, env_mock.external_brains
)
def test_start_learning_updates_meta_curriculum_lesson_number():
tc, trainer_mock = trainer_controller_with_start_learning_mocks()
trainer_config = dummy_config()
brain_info_mock = MagicMock()
env_mock = MagicMock()
env_mock.close = MagicMock()
env_mock.reset = MagicMock(return_value=brain_info_mock)
meta_curriculum_mock = MagicMock()
meta_curriculum_mock.set_all_curriculums_to_lesson_num = MagicMock()
tc.meta_curriculum = meta_curriculum_mock
tc.lesson = 5
tc.start_learning(env_mock, trainer_config)
meta_curriculum_mock.set_all_curriculums_to_lesson_num.assert_called_once_with(
tc.lesson
)
def trainer_controller_with_take_step_mocks():

5
ml-agents/mlagents/trainers/tf_policy.py


self.graph = tf.Graph()
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
        # For multi-GPU training, set allow_soft_placement to True to allow
        # TensorFlow to place an operation on an alternative device automatically,
        # preventing exceptions when the requested device doesn't support the
        # operation or doesn't exist
config.allow_soft_placement = True
self.sess = tf.Session(config=config, graph=self.graph)
self.saver = None
if self.use_recurrent:

173
ml-agents/mlagents/trainers/trainer.py


# # Unity ML-Agents Toolkit
import logging
from typing import Dict, List, Deque, Any
from collections import deque
from collections import deque, defaultdict
from mlagents.envs import UnityException, AllBrainInfo, ActionInfoOutputs
from mlagents.envs import UnityException, AllBrainInfo, ActionInfoOutputs, BrainInfo
from mlagents.envs.timers import set_gauge
from mlagents.trainers.buffer import Buffer
from mlagents.trainers.tf_policy import Policy
from mlagents.envs import BrainParameters
LOGGER = logging.getLogger("mlagents.trainers")

class Trainer(object):
"""This class is the base class for the mlagents.envs.trainers"""
def __init__(self, brain, trainer_parameters, training, run_id, reward_buff_cap=1):
def __init__(
self,
brain: BrainParameters,
trainer_parameters: dict,
training: bool,
run_id: int,
reward_buff_cap: int = 1,
):
"""
Responsible for collecting experiences and training a neural network model.
:BrainParameters brain: Brain to be trained.

:int reward_buff_cap:
self.param_keys = []
self.param_keys: List[str] = []
self.brain_name = brain.brain_name
self.run_id = run_id
self.trainer_parameters = trainer_parameters

self.cumulative_returns_since_policy_update = []
self.cumulative_returns_since_policy_update: List[float] = []
self.stats = {}
self.stats: Dict[str, List] = defaultdict(list)
self.policy = None
self._reward_buffer = deque(maxlen=reward_buff_cap)
def __str__(self):
return """{} Trainer""".format(self.__class__)
self._reward_buffer: Deque[float] = deque(maxlen=reward_buff_cap)
self.policy: Policy = None
def check_param_keys(self):
for k in self.param_keys:

"brain {2}.".format(k, self.__class__, self.brain_name)
)
def dict_to_str(self, param_dict, num_tabs):
def dict_to_str(self, param_dict: Dict[str, Any], num_tabs: int) -> str:
"""
Takes a parameter dictionary and converts it to a human-readable string.
        Recurses if there are multiple levels of dict. Used to print out hyperparameters.

if not isinstance(param_dict, dict):
return param_dict
return str(param_dict)
else:
append_newline = "\n" if num_tabs > 0 else ""
return append_newline + "\n".join(

]
)
def __str__(self) -> str:
return """Hyperparameters for the {0} of brain {1}: \n{2}""".format(
self.__class__.__name__,
self.brain_name,
self.dict_to_str(self.trainer_parameters, 0),
)
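# A small standalone sketch of the recursive formatting that dict_to_str and
# __str__ above use to print hyperparameters; the exact indentation scheme
# here is an illustrative assumption, not the precise ML-Agents output.
def dict_to_str_sketch(param_dict, num_tabs=0):
    if not isinstance(param_dict, dict):
        return str(param_dict)
    prefix = "\n" if num_tabs > 0 else ""
    return prefix + "\n".join(
        "\t" * num_tabs + "{}: {}".format(k, dict_to_str_sketch(v, num_tabs + 1))
        for k, v in param_dict.items()
    )

print(dict_to_str_sketch({"batch_size": 32, "reward_signals": {"extrinsic": {"gamma": 0.99}}}))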
def parameters(self):
def parameters(self) -> Dict[str, Any]:
raise UnityTrainerException("The parameters property was not implemented.")
return self.trainer_parameters
def graph_scope(self):
"""
Returns the graph scope of the trainer.
"""
raise UnityTrainerException("The graph_scope property was not implemented.")
@property
def get_max_steps(self):
def get_max_steps(self) -> float:
raise UnityTrainerException("The get_max_steps property was not implemented.")
return float(self.trainer_parameters["max_steps"])
def get_step(self):
def get_step(self) -> int:
Returns the number of training steps the trainer has performed
Returns the number of steps the trainer has performed
raise UnityTrainerException("The get_step property was not implemented.")
return self.step
def reward_buffer(self):
def reward_buffer(self) -> Deque[float]:
"""
Returns the reward buffer. The reward buffer contains the cumulative
rewards of the most recent episodes completed by agents using this

def increment_step(self, n_steps: int) -> None:
"""
Increment the step count of the trainer
"""
raise UnityTrainerException("The increment_step method was not implemented.")
def add_experiences(
self,
curr_info: AllBrainInfo,
next_info: AllBrainInfo,
take_action_outputs: ActionInfoOutputs,
) -> None:
:param n_steps: number of steps to increment the step count by
Adds experiences to each agent's experience history.
:param curr_info: Current AllBrainInfo.
:param next_info: Next AllBrainInfo.
:param take_action_outputs: The outputs of the take action method.
"""
raise UnityTrainerException("The add_experiences method was not implemented.")
self.step = self.policy.increment_step(n_steps)
def process_experiences(
self, current_info: AllBrainInfo, next_info: AllBrainInfo
) -> None:
"""
Checks agent histories for processing condition, and processes them as necessary.
Processing involves calculating value and advantage targets for model updating step.
:param current_info: Dictionary of all current-step brains and corresponding BrainInfo.
:param next_info: Dictionary of all next-step brains and corresponding BrainInfo.
"""
raise UnityTrainerException(
"The process_experiences method was not implemented."
)
def end_episode(self):
"""
A signal that the Episode has ended. The buffer must be reset.
        Gets called only when the academy resets.
"""
raise UnityTrainerException("The end_episode method was not implemented.")
def is_ready_update(self):
"""
Returns whether or not the trainer has enough elements to run update model
        :return: A boolean corresponding to whether or not update_model() can be run
"""
raise UnityTrainerException("The is_ready_update method was not implemented.")
def update_policy(self):
"""
Uses demonstration_buffer to update model.
"""
raise UnityTrainerException("The update_model method was not implemented.")
def save_model(self):
def save_model(self) -> None:
def export_model(self):
def export_model(self) -> None:
def write_training_metrics(self):
def write_training_metrics(self) -> None:
"""
Write training metrics to a CSV file
:return:

is_training,
)
)
set_gauge(f"{self.brain_name}.mean_reward", mean_reward)
else:
LOGGER.info(
" {}: {}: Step: {}. No episode was completed since last summary. {}".format(

self.summary_writer.add_summary(summary, step)
self.summary_writer.flush()
def write_tensorboard_text(self, key, input_dict):
def write_tensorboard_text(self, key: str, input_dict: Dict[str, Any]) -> None:
"""
Saves text to Tensorboard.
Note: Only works on tensorflow r1.2 or above.

"Cannot write text summary for Tensorboard. Tensorflow version must be r1.2 or above."
)
pass
def add_experiences(
self,
curr_all_info: AllBrainInfo,
next_all_info: AllBrainInfo,
take_action_outputs: ActionInfoOutputs,
) -> None:
"""
Adds experiences to each agent's experience history.
:param curr_all_info: Dictionary of all current brains and corresponding BrainInfo.
        :param next_all_info: Dictionary of all next brains and corresponding BrainInfo.
:param take_action_outputs: The outputs of the Policy's get_action method.
"""
        raise UnityTrainerException(
            "The add_experiences method was not implemented."
        )
def process_experiences(
self, current_info: AllBrainInfo, next_info: AllBrainInfo
) -> None:
"""
Checks agent histories for processing condition, and processes them as necessary.
Processing involves calculating value and advantage targets for model updating step.
:param current_info: Dictionary of all current-step brains and corresponding BrainInfo.
:param next_info: Dictionary of all next-step brains and corresponding BrainInfo.
"""
raise UnityTrainerException(
"The process_experiences method was not implemented."
)
def end_episode(self):
"""
A signal that the Episode has ended. The buffer must be reset.
        Gets called only when the academy resets.
"""
raise UnityTrainerException("The end_episode method was not implemented.")
def is_ready_update(self):
"""
Returns whether or not the trainer has enough elements to run update model
        :return: A boolean corresponding to whether or not update_model() can be run
"""
raise UnityTrainerException("The is_ready_update method was not implemented.")
def update_policy(self):
"""
Uses demonstration_buffer to update model.
"""
raise UnityTrainerException("The update_model method was not implemented.")
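# A minimal standalone sketch of the contract defined by the Trainer base
# class above: concrete trainers override the methods that would otherwise
# raise UnityTrainerException. "CountingTrainer" is a made-up illustration,
# not a trainer that ships with ML-Agents, and it skips the real constructor
# arguments (brain, trainer_parameters, training, run_id) for brevity.
class TrainerSketch:
    """Stand-in for mlagents.trainers.trainer.Trainer."""

    def add_experiences(self, curr_info, next_info, take_action_outputs):
        raise NotImplementedError

    def process_experiences(self, current_info, next_info):
        raise NotImplementedError

    def is_ready_update(self):
        raise NotImplementedError

    def update_policy(self):
        raise NotImplementedError


class CountingTrainer(TrainerSketch):
    def __init__(self, buffer_size=2):
        self.buffer = []
        self.buffer_size = buffer_size

    def add_experiences(self, curr_info, next_info, take_action_outputs):
        self.buffer.append((curr_info, next_info, take_action_outputs))

    def process_experiences(self, current_info, next_info):
        pass  # nothing to post-process in this toy example

    def is_ready_update(self):
        return len(self.buffer) >= self.buffer_size

    def update_policy(self):
        self.buffer.clear()


t = CountingTrainer()
t.add_experiences({}, {}, {})
t.add_experiences({}, {}, {})
print(t.is_ready_update())  # True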

113
ml-agents/mlagents/trainers/trainer_controller.py


import tensorflow as tf
from time import time
from mlagents.envs import BrainParameters
from mlagents.envs.env_manager import StepInfo
from mlagents.envs.env_manager import EnvManager
from mlagents.envs.exception import (

from mlagents.envs.sampler_class import SamplerManager
from mlagents.envs.timers import hierarchical_timer, get_timer_tree, timed
from mlagents.trainers import Trainer, TrainerMetrics
from mlagents.trainers.ppo.trainer import PPOTrainer
from mlagents.trainers.bc.offline_trainer import OfflineBCTrainer
from mlagents.trainers.bc.online_trainer import OnlineBCTrainer
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs.subprocess_env_manager import SubprocessEnvManager
trainers: Dict[str, Trainer],
load: bool,
keep_checkpoints: int,
lesson: Optional[int],
training_seed: int,
fast_simulation: bool,
sampler_manager: SamplerManager,

:param trainers: Trainers for each brain to train.
:param load: Whether to load the model or randomly initialize.
:param keep_checkpoints: How many model checkpoints to keep.
:param lesson: Start learning from this lesson.
self.trainers = trainers
self.lesson = lesson
self.load_model = load
self.keep_checkpoints = keep_checkpoints
self.trainers: Dict[str, Trainer] = {}
self.seed = training_seed
np.random.seed(self.seed)
tf.set_random_seed(self.seed)
np.random.seed(training_seed)
tf.set_random_seed(training_seed)
def _get_measure_vals(self):
brain_names_to_measure_vals = {}

for brain_name in self.trainers.keys():
self.trainers[brain_name].export_model()
def initialize_trainers(
self,
trainer_config: Dict[str, Any],
external_brains: Dict[str, BrainParameters],
) -> None:
"""
Initialization of the trainers
:param trainer_config: The configurations of the trainers
"""
trainer_parameters_dict = {}
for brain_name in external_brains:
trainer_parameters = trainer_config["default"].copy()
trainer_parameters["summary_path"] = "{basedir}/{name}".format(
basedir=self.summaries_dir, name=str(self.run_id) + "_" + brain_name
)
trainer_parameters["model_path"] = "{basedir}/{name}".format(
basedir=self.model_path, name=brain_name
)
trainer_parameters["keep_checkpoints"] = self.keep_checkpoints
if brain_name in trainer_config:
_brain_key: Any = brain_name
while not isinstance(trainer_config[_brain_key], dict):
_brain_key = trainer_config[_brain_key]
trainer_parameters.update(trainer_config[_brain_key])
trainer_parameters_dict[brain_name] = trainer_parameters.copy()
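            # A standalone sketch of the per-brain override resolution performed above:
            # each brain's parameters start from the "default" section, and a
            # string-valued entry acts as an alias that is followed until a dict is
            # found. The config below is illustrative only.
            # sketch_config = {
            #     "default": {"trainer": "ppo", "batch_size": 32, "normalize": True},
            #     "testbrain": {"normalize": False},
            #     "aliasbrain": "testbrain",  # alias: points at another brain's settings
            # }
            #
            # def resolve(brain_name):
            #     params = dict(sketch_config["default"])
            #     if brain_name in sketch_config:
            #         key = brain_name
            #         while not isinstance(sketch_config[key], dict):
            #             key = sketch_config[key]
            #         params.update(sketch_config[key])
            #     return params
            #
            # resolve("testbrain")   -> {'trainer': 'ppo', 'batch_size': 32, 'normalize': False}
            # resolve("aliasbrain")  -> same overrides, reached through the alias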
for brain_name in external_brains:
if trainer_parameters_dict[brain_name]["trainer"] == "offline_bc":
self.trainers[brain_name] = OfflineBCTrainer(
brain=external_brains[brain_name],
trainer_parameters=trainer_parameters_dict[brain_name],
training=self.train_model,
load=self.load_model,
seed=self.seed,
run_id=self.run_id,
)
elif trainer_parameters_dict[brain_name]["trainer"] == "online_bc":
self.trainers[brain_name] = OnlineBCTrainer(
brain=external_brains[brain_name],
trainer_parameters=trainer_parameters_dict[brain_name],
training=self.train_model,
load=self.load_model,
seed=self.seed,
run_id=self.run_id,
)
elif trainer_parameters_dict[brain_name]["trainer"] == "ppo":
# Find lesson length based on the form of learning
if self.meta_curriculum:
lesson_length = self.meta_curriculum.brains_to_curriculums[
brain_name
].min_lesson_length
else:
lesson_length = 1
self.trainers[brain_name] = PPOTrainer(
brain=external_brains[brain_name],
reward_buff_cap=lesson_length,
trainer_parameters=trainer_parameters_dict[brain_name],
training=self.train_model,
load=self.load_model,
seed=self.seed,
run_id=self.run_id,
)
self.trainer_metrics[brain_name] = self.trainers[
brain_name
].trainer_metrics
else:
raise UnityEnvironmentException(
"The trainer config contains "
"an unknown trainer type for "
"brain {}".format(brain_name)
)
@staticmethod
def _create_model_path(model_path):
try:

else:
trainer.write_summary(global_step, delta_train_start)
def start_learning(
self, env_manager: EnvManager, trainer_config: Dict[str, Any]
) -> None:
# TODO: Should be able to start learning at different lesson numbers
# for each curriculum.
if self.meta_curriculum is not None:
self.meta_curriculum.set_all_curriculums_to_lesson_num(self.lesson)
def start_learning(self, env_manager: EnvManager) -> None:
# Prevent a single session from taking all GPU memory.
self.initialize_trainers(trainer_config, env_manager.external_brains)
for _, t in self.trainers.items():
self.logger.info(t)

env_manager.close()
def end_trainer_episodes(
self, env: BaseUnityEnvironment, lessons_incremented: Dict[str, bool]
self, env: EnvManager, lessons_incremented: Dict[str, bool]
) -> None:
self._reset_env(env)
# Reward buffers reset takes place only for curriculum learning

if changed:
self.trainers[brain_name].reward_buffer.clear()
def reset_env_if_ready(self, env: BaseUnityEnvironment, steps: int) -> None:
def reset_env_if_ready(self, env: EnvManager, steps: int) -> None:
if self.meta_curriculum:
# Get the sizes of the reward buffers.
reward_buff_sizes = {

self.end_trainer_episodes(env, lessons_incremented)
@timed
def advance(self, env: SubprocessEnvManager) -> int:
def advance(self, env: EnvManager) -> int:
with hierarchical_timer("env_step"):
time_start_step = time()
new_step_infos = env.step()

1
ml-agents/setup.py


"pyyaml",
"protobuf>=3.6,<3.7",
"grpcio>=1.11.0,<1.12.0",
"h5py==2.9.0",
'pypiwin32==223;platform_system=="Windows"',
],
python_requires=">=3.6,<3.7",

140
ml-agents/mlagents/trainers/ppo/multi_gpu_policy.py


import logging
import numpy as np
import tensorflow as tf
from tensorflow.python.client import device_lib
from mlagents.envs.timers import timed
from mlagents.trainers.models import EncoderType
from mlagents.trainers.ppo.policy import PPOPolicy
from mlagents.trainers.ppo.models import PPOModel
from mlagents.trainers.components.reward_signals.reward_signal_factory import (
create_reward_signal,
)
from mlagents.trainers.components.bc.module import BCModule
# Variable scope in which created variables will be placed under
TOWER_SCOPE_NAME = "tower"
logger = logging.getLogger("mlagents.trainers")
class MultiGpuPPOPolicy(PPOPolicy):
def __init__(self, seed, brain, trainer_params, is_training, load):
"""
Policy for Proximal Policy Optimization Networks with multi-GPU training
:param seed: Random seed.
:param brain: Assigned Brain object.
:param trainer_params: Defined training parameters.
:param is_training: Whether the model should be trained.
:param load: Whether a pre-trained model will be loaded or a new one created.
"""
super().__init__(seed, brain, trainer_params, is_training, load)
with self.graph.as_default():
avg_grads = self.average_gradients([t.grads for t in self.towers])
self.update_batch = self.model.optimizer.apply_gradients(avg_grads)
self.update_dict = {"update_batch": self.update_batch}
self.update_dict.update(
{
"value_loss_" + str(i): self.towers[i].value_loss
for i in range(len(self.towers))
}
)
self.update_dict.update(
{
"policy_loss_" + str(i): self.towers[i].policy_loss
for i in range(len(self.towers))
}
)
def create_model(self, brain, trainer_params, reward_signal_configs, seed):
"""
Create PPO models, one on each device
:param brain: Assigned Brain object.
:param trainer_params: Defined training parameters.
:param reward_signal_configs: Reward signal config
:param seed: Random seed.
"""
self.devices = get_devices()
self.towers = []
with self.graph.as_default():
with tf.variable_scope(TOWER_SCOPE_NAME, reuse=tf.AUTO_REUSE):
for device in self.devices:
with tf.device(device):
self.towers.append(
PPOModel(
brain=brain,
lr=float(trainer_params["learning_rate"]),
h_size=int(trainer_params["hidden_units"]),
epsilon=float(trainer_params["epsilon"]),
beta=float(trainer_params["beta"]),
max_step=float(trainer_params["max_steps"]),
normalize=trainer_params["normalize"],
use_recurrent=trainer_params["use_recurrent"],
num_layers=int(trainer_params["num_layers"]),
m_size=self.m_size,
seed=seed,
stream_names=list(reward_signal_configs.keys()),
vis_encode_type=EncoderType(
trainer_params.get("vis_encode_type", "simple")
),
)
)
self.towers[-1].create_ppo_optimizer()
self.model = self.towers[0]
@timed
def update(self, mini_batch, num_sequences):
"""
Updates model using buffer.
        :param num_sequences: Number of sequences in the batch.
:param mini_batch: Experience batch.
:return: Output from update process.
"""
feed_dict = {}
device_batch_size = num_sequences // len(self.devices)
device_batches = []
for i in range(len(self.devices)):
device_batches.append(
                {
                    k: v[i * device_batch_size : (i + 1) * device_batch_size]
                    for (k, v) in mini_batch.items()
                }
)
for batch, tower in zip(device_batches, self.towers):
feed_dict.update(self.construct_feed_dict(tower, batch, num_sequences))
out = self._execute_model(feed_dict, self.update_dict)
run_out = {}
run_out["value_loss"] = np.mean(
[out["value_loss_" + str(i)] for i in range(len(self.towers))]
)
run_out["policy_loss"] = np.mean(
[out["policy_loss_" + str(i)] for i in range(len(self.towers))]
)
run_out["update_batch"] = out["update_batch"]
return run_out
def average_gradients(self, tower_grads):
"""
Average gradients from all towers
:param tower_grads: Gradients from all towers
"""
average_grads = []
for grad_and_vars in zip(*tower_grads):
grads = [g for g, _ in grad_and_vars if g is not None]
if not grads:
continue
avg_grad = tf.reduce_mean(tf.stack(grads), 0)
var = grad_and_vars[0][1]
average_grads.append((avg_grad, var))
return average_grads
def get_devices():
"""
Get all available GPU devices
"""
local_device_protos = device_lib.list_local_devices()
devices = [x.name for x in local_device_protos if x.device_type == "GPU"]
return devices
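# A standalone numpy sketch of what MultiGpuPPOPolicy does above: the
# mini-batch is split evenly across the available towers, and the applied
# gradient is the element-wise mean of the per-tower gradients (0.25 is the
# same value expected in test_average_gradients later in this diff). The
# device list and gradient values are illustrative.
import numpy as np

devices = ["/device:GPU:0", "/device:GPU:1"]
num_sequences = 8
device_batch_size = num_sequences // len(devices)
print(device_batch_size)  # 4 sequences handled by each tower

tower_grads = [np.array(0.1), np.array(0.2), np.array(0.3), np.array(0.4)]
avg_grad = np.mean(np.stack(tower_grads), axis=0)
print(round(float(avg_grad), 6))  # 0.25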

253
ml-agents/mlagents/trainers/rl_trainer.py


# # Unity ML-Agents Toolkit
import logging
from typing import Dict, List, Deque, Any
import os
import tensorflow as tf
import numpy as np
from collections import deque, defaultdict
from mlagents.envs import UnityException, AllBrainInfo, ActionInfoOutputs, BrainInfo
from mlagents.trainers.buffer import Buffer
from mlagents.trainers.tf_policy import Policy
from mlagents.trainers.trainer import Trainer, UnityTrainerException
from mlagents.envs import BrainParameters
LOGGER = logging.getLogger("mlagents.trainers")
class RLTrainer(Trainer):
"""
This class is the base class for trainers that use Reward Signals.
Contains methods for adding BrainInfos to the Buffer.
"""
def __init__(self, *args, **kwargs):
super(RLTrainer, self).__init__(*args, **kwargs)
self.step = 0
# Make sure we have at least one reward_signal
if not self.trainer_parameters["reward_signals"]:
raise UnityTrainerException(
"No reward signals were defined. At least one must be used with {}.".format(
self.__class__.__name__
)
)
# collected_rewards is a dictionary from name of reward signal to a dictionary of agent_id to cumulative reward
# used for reporting only. We always want to report the environment reward to Tensorboard, regardless
# of what reward signals are actually present.
self.collected_rewards = {"environment": {}}
self.training_buffer = Buffer()
self.episode_steps = {}
def construct_curr_info(self, next_info: BrainInfo) -> BrainInfo:
"""
Constructs a BrainInfo which contains the most recent previous experiences for all agents
which correspond to the agents in a provided next_info.
:BrainInfo next_info: A t+1 BrainInfo.
:return: curr_info: Reconstructed BrainInfo to match agents of next_info.
"""
visual_observations: List[List[Any]] = [
[]
] # TODO add types to brain.py methods
vector_observations = []
text_observations = []
memories = []
rewards = []
local_dones = []
max_reacheds = []
agents = []
prev_vector_actions = []
prev_text_actions = []
action_masks = []
for agent_id in next_info.agents:
agent_brain_info = self.training_buffer[agent_id].last_brain_info
if agent_brain_info is None:
agent_brain_info = next_info
agent_index = agent_brain_info.agents.index(agent_id)
for i in range(len(next_info.visual_observations)):
visual_observations[i].append(
agent_brain_info.visual_observations[i][agent_index]
)
vector_observations.append(
agent_brain_info.vector_observations[agent_index]
)
text_observations.append(agent_brain_info.text_observations[agent_index])
if self.policy.use_recurrent:
if len(agent_brain_info.memories) > 0:
memories.append(agent_brain_info.memories[agent_index])
else:
memories.append(self.policy.make_empty_memory(1))
rewards.append(agent_brain_info.rewards[agent_index])
local_dones.append(agent_brain_info.local_done[agent_index])
max_reacheds.append(agent_brain_info.max_reached[agent_index])
agents.append(agent_brain_info.agents[agent_index])
prev_vector_actions.append(
agent_brain_info.previous_vector_actions[agent_index]
)
prev_text_actions.append(
agent_brain_info.previous_text_actions[agent_index]
)
action_masks.append(agent_brain_info.action_masks[agent_index])
if self.policy.use_recurrent:
memories = np.vstack(memories)
curr_info = BrainInfo(
visual_observations,
vector_observations,
text_observations,
memories,
rewards,
agents,
local_dones,
prev_vector_actions,
prev_text_actions,
max_reacheds,
action_masks,
)
return curr_info
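# A standalone sketch (illustrative names only) of the index remapping that
# construct_curr_info performs above: when the set of agents changes between
# steps, each agent listed in next_info is looked up in its last stored
# BrainInfo so the reconstructed "current" info lines up agent-for-agent
# with next_info.
last_agents = [101, 102, 103]  # agents present in the stored BrainInfo
last_rewards = [0.1, 0.2, 0.3]
next_agents = [101, 103]       # agent 102 finished its episode

remapped_rewards = [
    last_rewards[last_agents.index(agent_id)] for agent_id in next_agents
]
print(remapped_rewards)  # [0.1, 0.3] -- same ordering as next_agents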
def add_experiences(
self,
curr_all_info: AllBrainInfo,
next_all_info: AllBrainInfo,
take_action_outputs: ActionInfoOutputs,
) -> None:
"""
Adds experiences to each agent's experience history.
:param curr_all_info: Dictionary of all current brains and corresponding BrainInfo.
        :param next_all_info: Dictionary of all next brains and corresponding BrainInfo.
:param take_action_outputs: The outputs of the Policy's get_action method.
"""
self.trainer_metrics.start_experience_collection_timer()
if take_action_outputs:
self.stats["Policy/Entropy"].append(take_action_outputs["entropy"].mean())
self.stats["Policy/Learning Rate"].append(
take_action_outputs["learning_rate"]
)
for name, signal in self.policy.reward_signals.items():
self.stats[signal.value_name].append(
np.mean(take_action_outputs["value_heads"][name])
)
curr_info = curr_all_info[self.brain_name]
next_info = next_all_info[self.brain_name]
for agent_id in curr_info.agents:
self.training_buffer[agent_id].last_brain_info = curr_info
self.training_buffer[
agent_id
].last_take_action_outputs = take_action_outputs
if curr_info.agents != next_info.agents:
curr_to_use = self.construct_curr_info(next_info)
else:
curr_to_use = curr_info
tmp_rewards_dict = {}
for name, signal in self.policy.reward_signals.items():
tmp_rewards_dict[name] = signal.evaluate(curr_to_use, next_info)
for agent_id in next_info.agents:
stored_info = self.training_buffer[agent_id].last_brain_info
stored_take_action_outputs = self.training_buffer[
agent_id
].last_take_action_outputs
if stored_info is not None:
idx = stored_info.agents.index(agent_id)
next_idx = next_info.agents.index(agent_id)
if not stored_info.local_done[idx]:
for i, _ in enumerate(stored_info.visual_observations):
self.training_buffer[agent_id]["visual_obs%d" % i].append(
stored_info.visual_observations[i][idx]
)
self.training_buffer[agent_id]["next_visual_obs%d" % i].append(
next_info.visual_observations[i][next_idx]
)
if self.policy.use_vec_obs:
self.training_buffer[agent_id]["vector_obs"].append(
stored_info.vector_observations[idx]
)
self.training_buffer[agent_id]["next_vector_in"].append(
next_info.vector_observations[next_idx]
)
if self.policy.use_recurrent:
if stored_info.memories.shape[1] == 0:
stored_info.memories = np.zeros(
(len(stored_info.agents), self.policy.m_size)
)
self.training_buffer[agent_id]["memory"].append(
stored_info.memories[idx]
)
self.training_buffer[agent_id]["masks"].append(1.0)
self.training_buffer[agent_id]["done"].append(
next_info.local_done[next_idx]
)
# Add the outputs of the last eval
self.add_policy_outputs(stored_take_action_outputs, agent_id, idx)
                    # Store action masks if necessary
if not self.policy.use_continuous_act:
self.training_buffer[agent_id]["action_mask"].append(
stored_info.action_masks[idx], padding_value=1
)
self.training_buffer[agent_id]["prev_action"].append(
stored_info.previous_vector_actions[idx]
)
values = stored_take_action_outputs["value_heads"]
# Add the value outputs if needed
self.add_rewards_outputs(
values, tmp_rewards_dict, agent_id, idx, next_idx
)
for name, rewards in self.collected_rewards.items():
if agent_id not in rewards:
rewards[agent_id] = 0
if name == "environment":
# Report the reward from the environment
rewards[agent_id] += np.array(next_info.rewards)[next_idx]
else:
# Report the reward signals
rewards[agent_id] += tmp_rewards_dict[name].scaled_reward[
next_idx
]
if not next_info.local_done[next_idx]:
if agent_id not in self.episode_steps:
self.episode_steps[agent_id] = 0
self.episode_steps[agent_id] += 1
self.trainer_metrics.end_experience_collection_timer()
def add_policy_outputs(
self, take_action_outputs: ActionInfoOutputs, agent_id: str, agent_idx: int
) -> None:
"""
        Takes the output of the last action and stores it in the training buffer.
        We break this out from add_experiences since it is highly dependent
on the type of trainer.
:param take_action_outputs: The outputs of the Policy's get_action method.
:param agent_id: the Agent we're adding to.
:param agent_idx: the index of the Agent agent_id
"""
        raise UnityTrainerException(
            "The add_policy_outputs method was not implemented."
        )
def add_rewards_outputs(
self,
value: Dict[str, Any],
rewards_dict: Dict[str, float],
agent_id: str,
agent_idx: int,
agent_next_idx: int,
) -> None:
"""
        Takes the value and evaluated rewards output of the last action and stores
        them in the training buffer. We break this out from add_experiences since it
        is highly dependent on the type of trainer.
        :param value: Dict of value estimates from the Policy's value heads, keyed by reward signal name.
:param rewards_dict: Dict of rewards after evaluation
:param agent_id: the Agent we're adding to.
:param agent_idx: the index of the Agent agent_id in the current brain info
:param agent_next_idx: the index of the Agent agent_id in the next brain info
"""
        raise UnityTrainerException(
            "The add_rewards_outputs method was not implemented."
        )
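# A standalone illustration (not ML-Agents code) of the hook contract above:
# RLTrainer.add_experiences stores the shared fields and then delegates the
# trainer-specific ones to add_policy_outputs / add_rewards_outputs (the PPO
# versions of these hooks appear earlier in this diff). The buffer here is
# just a dict and all values are made up.
from collections import defaultdict

sketch_buffer = defaultdict(list)

def add_policy_outputs(take_action_outputs, agent_idx):
    sketch_buffer["actions"].append(take_action_outputs["action"][agent_idx])
    sketch_buffer["action_probs"].append(take_action_outputs["log_probs"][agent_idx])

def add_rewards_outputs(value, rewards_dict, agent_idx, agent_next_idx):
    for name, rewards in rewards_dict.items():
        sketch_buffer["{}_rewards".format(name)].append(rewards[agent_next_idx])
        sketch_buffer["{}_value_estimates".format(name)].append(value[name][agent_idx][0])

add_policy_outputs({"action": [[0.3]], "log_probs": [[-0.7]]}, agent_idx=0)
add_rewards_outputs(
    value={"extrinsic": [[1.2]]},
    rewards_dict={"extrinsic": [0.5]},
    agent_idx=0,
    agent_next_idx=0,
)
print(dict(sketch_buffer))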

129
ml-agents/mlagents/trainers/tests/test_multigpu.py


import unittest.mock as mock
import pytest
import numpy as np
import tensorflow as tf
import yaml
from mlagents.trainers.ppo.trainer import PPOTrainer
from mlagents.trainers.ppo.multi_gpu_policy import MultiGpuPPOPolicy, get_devices
from mlagents.envs import UnityEnvironment, BrainParameters
from mlagents.envs.mock_communicator import MockCommunicator
from mlagents.trainers.tests.mock_brain import create_mock_brainparams
@pytest.fixture
def dummy_config():
return yaml.safe_load(
"""
trainer: ppo
batch_size: 32
beta: 5.0e-3
buffer_size: 512
epsilon: 0.2
hidden_units: 128
lambd: 0.95
learning_rate: 3.0e-4
max_steps: 5.0e4
normalize: true
num_epoch: 5
num_layers: 2
time_horizon: 64
sequence_length: 64
summary_freq: 1000
use_recurrent: false
memory_size: 8
curiosity_strength: 0.0
curiosity_enc_size: 1
reward_signals:
extrinsic:
strength: 1.0
gamma: 0.99
"""
)
@mock.patch("mlagents.trainers.ppo.multi_gpu_policy.get_devices")
def test_create_model(mock_get_devices, dummy_config):
tf.reset_default_graph()
mock_get_devices.return_value = [
"/device:GPU:0",
"/device:GPU:1",
"/device:GPU:2",
"/device:GPU:3",
]
trainer_parameters = dummy_config
trainer_parameters["model_path"] = ""
trainer_parameters["keep_checkpoints"] = 3
brain = create_mock_brainparams()
policy = MultiGpuPPOPolicy(0, brain, trainer_parameters, False, False)
assert len(policy.towers) == len(mock_get_devices.return_value)
@mock.patch("mlagents.trainers.ppo.multi_gpu_policy.get_devices")
def test_average_gradients(mock_get_devices, dummy_config):
tf.reset_default_graph()
mock_get_devices.return_value = [
"/device:GPU:0",
"/device:GPU:1",
"/device:GPU:2",
"/device:GPU:3",
]
trainer_parameters = dummy_config
trainer_parameters["model_path"] = ""
trainer_parameters["keep_checkpoints"] = 3
brain = create_mock_brainparams()
with tf.Session() as sess:
policy = MultiGpuPPOPolicy(0, brain, trainer_parameters, False, False)
var = tf.Variable(0)
tower_grads = [
[(tf.constant(0.1), var)],
[(tf.constant(0.2), var)],
[(tf.constant(0.3), var)],
[(tf.constant(0.4), var)],
]
avg_grads = policy.average_gradients(tower_grads)
init = tf.global_variables_initializer()
sess.run(init)
run_out = sess.run(avg_grads)
assert run_out == [(0.25, 0)]
@mock.patch("mlagents.trainers.tf_policy.TFPolicy._execute_model")
@mock.patch("mlagents.trainers.ppo.policy.PPOPolicy.construct_feed_dict")
@mock.patch("mlagents.trainers.ppo.multi_gpu_policy.get_devices")
def test_update(
mock_get_devices, mock_construct_feed_dict, mock_execute_model, dummy_config
):
tf.reset_default_graph()
mock_get_devices.return_value = ["/device:GPU:0", "/device:GPU:1"]
mock_construct_feed_dict.return_value = {}
mock_execute_model.return_value = {
"value_loss_0": 0.1,
"value_loss_1": 0.3,
"policy_loss_0": 0.5,
"policy_loss_1": 0.7,
"update_batch": None,
}
trainer_parameters = dummy_config
trainer_parameters["model_path"] = ""
trainer_parameters["keep_checkpoints"] = 3
brain = create_mock_brainparams()
policy = MultiGpuPPOPolicy(0, brain, trainer_parameters, False, False)
mock_mini_batch = mock.Mock()
mock_mini_batch.items.return_value = [("action", [1, 2]), ("value", [3, 4])]
run_out = policy.update(mock_mini_batch, 1)
assert mock_mini_batch.items.call_count == len(mock_get_devices.return_value)
assert mock_construct_feed_dict.call_count == len(mock_get_devices.return_value)
assert run_out["value_loss"] == 0.2
assert run_out["policy_loss"] == 0.6
if __name__ == "__main__":
pytest.main()

81
ml-agents/mlagents/trainers/tests/test_rl_trainer.py


import unittest.mock as mock
import pytest
import yaml
import mlagents.trainers.tests.mock_brain as mb
import numpy as np
from mlagents.trainers.rl_trainer import RLTrainer
@pytest.fixture
def dummy_config():
return yaml.safe_load(
"""
summary_path: "test/"
reward_signals:
extrinsic:
strength: 1.0
gamma: 0.99
"""
)
def create_mock_brain():
mock_brain = mb.create_mock_brainparams(
vector_action_space_type="continuous",
vector_action_space_size=[2],
vector_observation_space_size=8,
number_visual_observations=1,
)
return mock_brain
def create_rl_trainer():
mock_brainparams = create_mock_brain()
trainer = RLTrainer(mock_brainparams, dummy_config(), True, 0)
return trainer
def create_mock_all_brain_info(brain_info):
return {"MockBrain": brain_info}
def create_mock_policy():
mock_policy = mock.Mock()
mock_policy.reward_signals = {}
return mock_policy
@mock.patch("mlagents.trainers.rl_trainer.RLTrainer.add_policy_outputs")
@mock.patch("mlagents.trainers.rl_trainer.RLTrainer.add_rewards_outputs")
def test_rl_trainer(add_policy_outputs, add_rewards_outputs):
trainer = create_rl_trainer()
trainer.policy = create_mock_policy()
fake_action_outputs = {
"action": [0.1, 0.1],
"value_heads": {},
"entropy": np.array([1.0]),
"learning_rate": 1.0,
}
mock_braininfo = mb.create_mock_braininfo(
num_agents=2,
num_vector_observations=8,
num_vector_acts=2,
num_vis_observations=1,
)
trainer.add_experiences(
create_mock_all_brain_info(mock_braininfo),
create_mock_all_brain_info(mock_braininfo),
fake_action_outputs,
)
# Remove one of the agents
next_mock_braininfo = mb.create_mock_braininfo(
num_agents=1,
num_vector_observations=8,
num_vector_acts=2,
num_vis_observations=1,
)
brain_info = trainer.construct_curr_info(next_mock_braininfo)
# assert construct_curr_info worked properly
assert len(brain_info.agents) == 1

207
ml-agents/mlagents/trainers/tests/test_simple_rl.py


import math
import random
import tempfile
import pytest
import yaml
from typing import Any, Dict
from mlagents.trainers.trainer_controller import TrainerController
from mlagents.trainers.trainer_util import initialize_trainers
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs import BrainInfo, AllBrainInfo, BrainParameters
from mlagents.envs.communicator_objects import AgentInfoProto
from mlagents.envs.simple_env_manager import SimpleEnvManager
from mlagents.envs.sampler_class import SamplerManager
BRAIN_NAME = __name__
OBS_SIZE = 1
STEP_SIZE = 0.1
TIME_PENALTY = 0.001
MIN_STEPS = int(1.0 / STEP_SIZE) + 1
SUCCESS_REWARD = 1.0 + MIN_STEPS * TIME_PENALTY
def clamp(x, min_val, max_val):
return max(min_val, min(x, max_val))
class Simple1DEnvironment(BaseUnityEnvironment):
"""
    Very simple "game" - the agent has a position on [-1, 1] and a goal of either -1 or 1. It receives a positive reward
    for reaching the goal, a negative reward for reaching the opposite end, and a small time penalty on every other step.
    The position is incremented by the action amount (clamped to [-STEP_SIZE, STEP_SIZE]).
"""
def __init__(self, use_discrete):
super().__init__()
self.discrete = use_discrete
self._brains: Dict[str, BrainParameters] = {}
self._brains[BRAIN_NAME] = BrainParameters(
brain_name=BRAIN_NAME,
vector_observation_space_size=OBS_SIZE,
num_stacked_vector_observations=1,
camera_resolutions=[],
vector_action_space_size=[2] if use_discrete else [1],
vector_action_descriptions=["moveDirection"],
vector_action_space_type=0 if use_discrete else 1,
)
# state
self.position = 0.0
self.step_count = 0
self.random = random.Random(str(self._brains))
self.goal = random.choice([-1, 1])
def step(
self,
vector_action: Dict[str, Any] = None,
memory: Dict[str, Any] = None,
text_action: Dict[str, Any] = None,
value: Dict[str, Any] = None,
) -> AllBrainInfo:
assert vector_action is not None
if self.discrete:
act = vector_action[BRAIN_NAME][0][0]
delta = 1 if act else -1
else:
delta = vector_action[BRAIN_NAME][0][0]
delta = clamp(delta, -STEP_SIZE, STEP_SIZE)
self.position += delta
self.position = clamp(self.position, -1, 1)
self.step_count += 1
done = self.position >= 1.0 or self.position <= -1.0
if done:
reward = SUCCESS_REWARD * self.position * self.goal
else:
reward = -TIME_PENALTY
agent_info = AgentInfoProto(
stacked_vector_observation=[self.goal] * OBS_SIZE, reward=reward, done=done
)
if done:
self._reset_agent()
return {
BRAIN_NAME: BrainInfo.from_agent_proto(
0, [agent_info], self._brains[BRAIN_NAME]
)
}
def _reset_agent(self):
self.position = 0.0
self.step_count = 0
self.goal = random.choice([-1, 1])
def reset(
self,
config: Dict[str, float] = None,
train_mode: bool = True,
custom_reset_parameters: Any = None,
) -> AllBrainInfo: # type: ignore
self._reset_agent()
agent_info = AgentInfoProto(
stacked_vector_observation=[self.goal] * OBS_SIZE,
done=False,
max_step_reached=False,
)
return {
BRAIN_NAME: BrainInfo.from_agent_proto(
0, [agent_info], self._brains[BRAIN_NAME]
)
}
@property
def global_done(self):
return False
@property
def external_brains(self) -> Dict[str, BrainParameters]:
return self._brains
@property
def reset_parameters(self) -> Dict[str, str]:
return {}
def close(self):
pass
def _check_environment_trains(env):
config = """
default:
trainer: ppo
batch_size: 16
beta: 5.0e-3
buffer_size: 64
epsilon: 0.2
hidden_units: 128
lambd: 0.95
learning_rate: 5.0e-3
max_steps: 2500
memory_size: 256
normalize: false
num_epoch: 3
num_layers: 2
time_horizon: 64
sequence_length: 64
summary_freq: 500
use_recurrent: false
reward_signals:
extrinsic:
strength: 1.0
gamma: 0.99
"""
# Create controller and begin training.
with tempfile.TemporaryDirectory() as dir:
run_id = "id"
save_freq = 99999
seed = 1337
trainer_config = yaml.safe_load(config)
env_manager = SimpleEnvManager(env)
trainers = initialize_trainers(
trainer_config=trainer_config,
external_brains=env_manager.external_brains,
summaries_dir=dir,
run_id=run_id,
model_path=dir,
keep_checkpoints=1,
train_model=True,
load_model=False,
seed=seed,
meta_curriculum=None,
multi_gpu=False,
)
print(trainers)
tc = TrainerController(
trainers=trainers,
summaries_dir=dir,
model_path=dir,
run_id=run_id,
meta_curriculum=None,
train=True,
training_seed=seed,
fast_simulation=True,
sampler_manager=SamplerManager(None),
resampling_interval=None,
save_freq=save_freq,
)
# Begin training
tc.start_learning(env_manager)
print(tc._get_measure_vals())
for brain_name, mean_reward in tc._get_measure_vals().items():
assert not math.isnan(mean_reward)
assert mean_reward > 0.99
@pytest.mark.parametrize("use_discrete", [True, False])
def test_simple_rl(use_discrete):
env = Simple1DEnvironment(use_discrete=use_discrete)
_check_environment_trains(env)
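# Worked arithmetic for the constants in Simple1DEnvironment above: with
# STEP_SIZE = 0.1 the agent needs at least 11 steps to move from 0 to the
# goal, so SUCCESS_REWARD is sized to outweigh the accumulated time penalty
# and _check_environment_trains can require a mean reward above 0.99.
STEP_SIZE = 0.1
TIME_PENALTY = 0.001
MIN_STEPS = int(1.0 / STEP_SIZE) + 1             # 11
SUCCESS_REWARD = 1.0 + MIN_STEPS * TIME_PENALTY  # 1.011

# Best case: MIN_STEPS - 1 penalized steps, then the terminal success reward.
best_case_return = SUCCESS_REWARD - (MIN_STEPS - 1) * TIME_PENALTY
print(MIN_STEPS, round(SUCCESS_REWARD, 3), round(best_case_return, 3))  # 11 1.011 1.001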

315
ml-agents/mlagents/trainers/tests/test_trainer_util.py


import pytest
import yaml
import os
from unittest.mock import patch
import mlagents.trainers.trainer_util as trainer_util
from mlagents.trainers.trainer_metrics import TrainerMetrics
from mlagents.trainers.ppo.trainer import PPOTrainer
from mlagents.trainers.bc.offline_trainer import OfflineBCTrainer
from mlagents.trainers.bc.online_trainer import OnlineBCTrainer
from mlagents.envs.exception import UnityEnvironmentException
@pytest.fixture
def dummy_config():
return yaml.safe_load(
"""
default:
trainer: ppo
batch_size: 32
beta: 5.0e-3
buffer_size: 512
epsilon: 0.2
gamma: 0.99
hidden_units: 128
lambd: 0.95
learning_rate: 3.0e-4
max_steps: 5.0e4
normalize: true
num_epoch: 5
num_layers: 2
time_horizon: 64
sequence_length: 64
summary_freq: 1000
use_recurrent: false
memory_size: 8
use_curiosity: false
curiosity_strength: 0.0
curiosity_enc_size: 1
"""
)
@pytest.fixture
def dummy_online_bc_config():
return yaml.safe_load(
"""
default:
trainer: online_bc
brain_to_imitate: ExpertBrain
batches_per_epoch: 16
batch_size: 32
beta: 5.0e-3
buffer_size: 512
epsilon: 0.2
gamma: 0.99
hidden_units: 128
lambd: 0.95
learning_rate: 3.0e-4
max_steps: 5.0e4
normalize: true
num_epoch: 5
num_layers: 2
time_horizon: 64
sequence_length: 64
summary_freq: 1000
use_recurrent: false
memory_size: 8
use_curiosity: false
curiosity_strength: 0.0
curiosity_enc_size: 1
"""
)
@pytest.fixture
def dummy_offline_bc_config():
return yaml.safe_load(
"""
default:
trainer: offline_bc
demo_path: """
+ os.path.dirname(os.path.abspath(__file__))
+ """/test.demo
batches_per_epoch: 16
batch_size: 32
beta: 5.0e-3
buffer_size: 512
epsilon: 0.2
gamma: 0.99
hidden_units: 128
lambd: 0.95
learning_rate: 3.0e-4
max_steps: 5.0e4
normalize: true
num_epoch: 5
num_layers: 2
time_horizon: 64
sequence_length: 64
summary_freq: 1000
use_recurrent: false
memory_size: 8
use_curiosity: false
curiosity_strength: 0.0
curiosity_enc_size: 1
"""
)
@pytest.fixture
def dummy_offline_bc_config_with_override():
base = dummy_offline_bc_config()
base["testbrain"] = {}
base["testbrain"]["normalize"] = False
return base
@pytest.fixture
def dummy_bad_config():
return yaml.safe_load(
"""
default:
trainer: incorrect_trainer
brain_to_imitate: ExpertBrain
batches_per_epoch: 16
batch_size: 32
beta: 5.0e-3
buffer_size: 512
epsilon: 0.2
gamma: 0.99
hidden_units: 128
lambd: 0.95
learning_rate: 3.0e-4
max_steps: 5.0e4
normalize: true
num_epoch: 5
num_layers: 2
time_horizon: 64
sequence_length: 64
summary_freq: 1000
use_recurrent: false
memory_size: 8
"""
)
@patch("mlagents.envs.BrainParameters")
def test_initialize_trainer_parameters_override_defaults(BrainParametersMock):
summaries_dir = "test_dir"
run_id = "testrun"
model_path = "model_dir"
keep_checkpoints = 1
train_model = True
load_model = False
seed = 11
base_config = dummy_offline_bc_config_with_override()
expected_config = base_config["default"]
expected_config["summary_path"] = summaries_dir + f"/{run_id}_testbrain"
expected_config["model_path"] = model_path + "/testbrain"
expected_config["keep_checkpoints"] = keep_checkpoints
# Override value from specific brain config
expected_config["normalize"] = False
brain_params_mock = BrainParametersMock()
external_brains = {"testbrain": brain_params_mock}
    def mock_constructor(
        self, brain, trainer_parameters, training, load, seed_, run_id_
    ):
        assert brain == brain_params_mock
        assert trainer_parameters == expected_config
        assert training == train_model
        assert load == load_model
        # The constructor arguments are renamed so these compare against the outer
        # test values instead of trivially comparing a parameter with itself.
        assert seed_ == seed
        assert run_id_ == run_id
with patch.object(OfflineBCTrainer, "__init__", mock_constructor):
trainers = trainer_util.initialize_trainers(
trainer_config=base_config,
external_brains=external_brains,
summaries_dir=summaries_dir,
run_id=run_id,
model_path=model_path,
keep_checkpoints=keep_checkpoints,
train_model=train_model,
load_model=load_model,
seed=seed,
)
assert "testbrain" in trainers
assert isinstance(trainers["testbrain"], OfflineBCTrainer)
@patch("mlagents.envs.BrainParameters")
def test_initialize_online_bc_trainer(BrainParametersMock):
summaries_dir = "test_dir"
run_id = "testrun"
model_path = "model_dir"
keep_checkpoints = 1
train_model = True
load_model = False
seed = 11
base_config = dummy_online_bc_config()
expected_config = base_config["default"]
expected_config["summary_path"] = summaries_dir + f"/{run_id}_testbrain"
expected_config["model_path"] = model_path + "/testbrain"
expected_config["keep_checkpoints"] = keep_checkpoints
brain_params_mock = BrainParametersMock()
external_brains = {"testbrain": brain_params_mock}
    def mock_constructor(
        self, brain, trainer_parameters, training, load, seed_, run_id_
    ):
        assert brain == brain_params_mock
        assert trainer_parameters == expected_config
        assert training == train_model
        assert load == load_model
        # The constructor arguments are renamed so these compare against the outer
        # test values instead of trivially comparing a parameter with itself.
        assert seed_ == seed
        assert run_id_ == run_id
with patch.object(OnlineBCTrainer, "__init__", mock_constructor):
trainers = trainer_util.initialize_trainers(
trainer_config=base_config,
external_brains=external_brains,
summaries_dir=summaries_dir,
run_id=run_id,
model_path=model_path,
keep_checkpoints=keep_checkpoints,
train_model=train_model,
load_model=load_model,
seed=seed,
)
assert "testbrain" in trainers
assert isinstance(trainers["testbrain"], OnlineBCTrainer)
@patch("mlagents.envs.BrainParameters")
def test_initialize_ppo_trainer(BrainParametersMock):
brain_params_mock = BrainParametersMock()
external_brains = {"testbrain": BrainParametersMock()}
summaries_dir = "test_dir"
run_id = "testrun"
model_path = "model_dir"
keep_checkpoints = 1
train_model = True
load_model = False
seed = 11
expected_reward_buff_cap = 1
base_config = dummy_config()
expected_config = base_config["default"]
expected_config["summary_path"] = summaries_dir + f"/{run_id}_testbrain"
expected_config["model_path"] = model_path + "/testbrain"
expected_config["keep_checkpoints"] = keep_checkpoints
    def mock_constructor(
        self,
        brain,
        reward_buff_cap,
        trainer_parameters,
        training,
        load,
        seed_,
        run_id_,
        multi_gpu_,
    ):
        self.trainer_metrics = TrainerMetrics("", "")
        assert brain == brain_params_mock
        assert trainer_parameters == expected_config
        assert reward_buff_cap == expected_reward_buff_cap
        assert training == train_model
        assert load == load_model
        # Renamed arguments avoid self-comparisons; initialize_trainers is called
        # below without multi_gpu, so the default of False is expected here.
        assert seed_ == seed
        assert run_id_ == run_id
        assert multi_gpu_ is False
with patch.object(PPOTrainer, "__init__", mock_constructor):
trainers = trainer_util.initialize_trainers(
trainer_config=base_config,
external_brains=external_brains,
summaries_dir=summaries_dir,
run_id=run_id,
model_path=model_path,
keep_checkpoints=keep_checkpoints,
train_model=train_model,
load_model=load_model,
seed=seed,
)
assert "testbrain" in trainers
assert isinstance(trainers["testbrain"], PPOTrainer)
@patch("mlagents.envs.BrainParameters")
def test_initialize_invalid_trainer_raises_exception(BrainParametersMock):
summaries_dir = "test_dir"
run_id = "testrun"
model_path = "model_dir"
keep_checkpoints = 1
train_model = True
load_model = False
seed = 11
bad_config = dummy_bad_config()
external_brains = {"testbrain": BrainParametersMock()}
with pytest.raises(UnityEnvironmentException):
trainer_util.initialize_trainers(
trainer_config=bad_config,
external_brains=external_brains,
summaries_dir=summaries_dir,
run_id=run_id,
model_path=model_path,
keep_checkpoints=keep_checkpoints,
train_model=train_model,
load_model=load_model,
seed=seed,
)

97
ml-agents/mlagents/trainers/trainer_util.py


from typing import Any, Dict
from mlagents.trainers import MetaCurriculum
from mlagents.envs.exception import UnityEnvironmentException
from mlagents.trainers import Trainer
from mlagents.envs.brain import BrainParameters
from mlagents.trainers.ppo.trainer import PPOTrainer
from mlagents.trainers.bc.offline_trainer import OfflineBCTrainer
from mlagents.trainers.bc.online_trainer import OnlineBCTrainer
def initialize_trainers(
trainer_config: Dict[str, Any],
external_brains: Dict[str, BrainParameters],
summaries_dir: str,
run_id: str,
model_path: str,
keep_checkpoints: int,
train_model: bool,
load_model: bool,
seed: int,
meta_curriculum: MetaCurriculum = None,
multi_gpu: bool = False,
) -> Dict[str, Trainer]:
"""
Initializes trainers given a provided trainer configuration and set of brains from the environment, as well as
some general training session options.
:param trainer_config: Original trainer configuration loaded from YAML
:param external_brains: BrainParameters provided by the Unity environment
:param summaries_dir: Directory to store trainer summary statistics
:param run_id: Run ID to associate with this training run
:param model_path: Path to save the model
:param keep_checkpoints: How many model checkpoints to keep
:param train_model: Whether to train the model (vs. run inference)
:param load_model: Whether to load the model or randomly initialize
:param seed: The random seed to use
:param meta_curriculum: Optional meta_curriculum, used to determine a reward buffer length for PPOTrainer
:param multi_gpu: Whether to use multi-GPU training
    :return: Dictionary mapping brain names to their initialized Trainer objects
"""
trainers = {}
trainer_parameters_dict = {}
for brain_name in external_brains:
trainer_parameters = trainer_config["default"].copy()
trainer_parameters["summary_path"] = "{basedir}/{name}".format(
basedir=summaries_dir, name=str(run_id) + "_" + brain_name
)
trainer_parameters["model_path"] = "{basedir}/{name}".format(
basedir=model_path, name=brain_name
)
trainer_parameters["keep_checkpoints"] = keep_checkpoints
if brain_name in trainer_config:
_brain_key: Any = brain_name
while not isinstance(trainer_config[_brain_key], dict):
_brain_key = trainer_config[_brain_key]
trainer_parameters.update(trainer_config[_brain_key])
trainer_parameters_dict[brain_name] = trainer_parameters.copy()
for brain_name in external_brains:
if trainer_parameters_dict[brain_name]["trainer"] == "offline_bc":
trainers[brain_name] = OfflineBCTrainer(
external_brains[brain_name],
trainer_parameters_dict[brain_name],
train_model,
load_model,
seed,
run_id,
)
elif trainer_parameters_dict[brain_name]["trainer"] == "online_bc":
trainers[brain_name] = OnlineBCTrainer(
external_brains[brain_name],
trainer_parameters_dict[brain_name],
train_model,
load_model,
seed,
run_id,
)
elif trainer_parameters_dict[brain_name]["trainer"] == "ppo":
trainers[brain_name] = PPOTrainer(
external_brains[brain_name],
meta_curriculum.brains_to_curriculums[brain_name].min_lesson_length
if meta_curriculum
else 1,
trainer_parameters_dict[brain_name],
train_model,
load_model,
seed,
run_id,
multi_gpu,
)
else:
raise UnityEnvironmentException(
"The trainer config contains "
"an unknown trainer type for "
"brain {}".format(brain_name)
)
return trainers
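# Illustrative sketch (not part of this module) of the per-brain resolution performed
# above: a brain's entry in the config may be a dict of overrides, or a string alias
# naming another section, which the while-loop follows until it reaches a dict.
# Section and brain names here are hypothetical.
_example_trainer_config = {
    "default": {"trainer": "ppo", "batch_size": 1024},
    "shared_settings": {"trainer": "ppo", "batch_size": 256},
    "BrainA": {"batch_size": 64},  # plain per-brain override of the defaults
    "BrainB": "shared_settings",  # string alias resolved by the while-loop above
}
# With this config, initialize_trainers would give BrainA a batch_size of 64,
# BrainB a batch_size of 256, and any other brain the "default" value of 1024.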