
Move add_experiences out of trainer, add Trajectories (#3067)

/asymm-envs
GitHub · 5 years ago
Current commit
2fd305e7
32 files changed, with 1,261 insertions and 837 deletions
  1. ml-agents/mlagents/trainers/action_info.py (3 changed lines)
  2. ml-agents/mlagents/trainers/agent_processor.py (198 changed lines)
  3. ml-agents/mlagents/trainers/buffer.py (29 changed lines)
  4. ml-agents/mlagents/trainers/components/bc/module.py (1 changed line)
  5. ml-agents/mlagents/trainers/components/reward_signals/extrinsic/signal.py (2 changed lines)
  6. ml-agents/mlagents/trainers/curriculum.py (13 changed lines)
  7. ml-agents/mlagents/trainers/demo_loader.py (32 changed lines)
  8. ml-agents/mlagents/trainers/learn.py (6 changed lines)
  9. ml-agents/mlagents/trainers/models.py (26 changed lines)
  10. ml-agents/mlagents/trainers/ppo/policy.py (45 changed lines)
  11. ml-agents/mlagents/trainers/ppo/trainer.py (212 changed lines)
  12. ml-agents/mlagents/trainers/rl_trainer.py (246 changed lines)
  13. ml-agents/mlagents/trainers/sac/policy.py (2 changed lines)
  14. ml-agents/mlagents/trainers/sac/trainer.py (140 changed lines)
  15. ml-agents/mlagents/trainers/tests/mock_brain.py (45 changed lines)
  16. ml-agents/mlagents/trainers/tests/test_buffer.py (95 changed lines)
  17. ml-agents/mlagents/trainers/tests/test_meta_curriculum.py (2 changed lines)
  18. ml-agents/mlagents/trainers/tests/test_ppo.py (174 changed lines)
  19. ml-agents/mlagents/trainers/tests/test_rl_trainer.py (48 changed lines)
  20. ml-agents/mlagents/trainers/tests/test_sac.py (38 changed lines)
  21. ml-agents/mlagents/trainers/tests/test_simple_rl.py (3 changed lines)
  22. ml-agents/mlagents/trainers/tests/test_trainer_controller.py (22 changed lines)
  23. ml-agents/mlagents/trainers/tests/test_trainer_util.py (4 changed lines)
  24. ml-agents/mlagents/trainers/tf_policy.py (62 changed lines)
  25. ml-agents/mlagents/trainers/trainer.py (108 changed lines)
  26. ml-agents/mlagents/trainers/trainer_controller.py (39 changed lines)
  27. ml-agents/mlagents/trainers/trainer_util.py (4 changed lines)
  28. ml-agents/mlagents/trainers/stats.py (118 changed lines)
  29. ml-agents/mlagents/trainers/tests/test_agent_processor.py (63 changed lines)
  30. ml-agents/mlagents/trainers/tests/test_stats.py (80 changed lines)
  31. ml-agents/mlagents/trainers/tests/test_trajectory.py (110 changed lines)
  32. ml-agents/mlagents/trainers/trajectory.py (128 changed lines)

ml-agents/mlagents/trainers/action_info.py (3 changed lines)


from typing import NamedTuple, Any, Dict
import numpy as np
ActionInfoOutputs = Dict[str, Any]
ActionInfoOutputs = Dict[str, np.ndarray]
class ActionInfo(NamedTuple):

ml-agents/mlagents/trainers/agent_processor.py (198 changed lines)


from typing import List, Union
import sys
from typing import List, Dict
from collections import defaultdict, Counter
from mlagents.trainers.buffer import AgentBuffer, BufferException
from mlagents.trainers.trainer import Trainer
from mlagents.trainers.trajectory import Trajectory, AgentExperience
from mlagents.trainers.brain import BrainInfo
from mlagents.trainers.tf_policy import TFPolicy
from mlagents.trainers.action_info import ActionInfoOutputs
from mlagents.trainers.stats import StatsReporter
class ProcessingBuffer(dict):
class AgentProcessor:
ProcessingBuffer contains a dictionary of AgentBuffer. The AgentBuffers are indexed by agent_id.
AgentProcessor contains a dictionary of per-agent trajectory buffers. The buffers are indexed by agent_id.
The buffer also contains an update_buffer, which is the buffer used when updating the model.
One AgentProcessor should be created per agent group.
def __str__(self):
return "local_buffers :\n{0}".format(
"\n".join(["\tagent {0} :{1}".format(k, str(self[k])) for k in self.keys()])
)
def __getitem__(self, key):
if key not in self.keys():
self[key] = AgentBuffer()
return super().__getitem__(key)
def reset_local_buffers(self) -> None:
def __init__(
self,
trainer: Trainer,
policy: TFPolicy,
stats_reporter: StatsReporter,
max_trajectory_length: int = sys.maxsize,
):
Resets all the local AgentBuffers.
Create an AgentProcessor.
:param trainer: Trainer instance connected to this AgentProcessor. The trainer is given a trajectory
when it is finished.
:param policy: Policy instance associated with this AgentProcessor.
:param max_trajectory_length: Maximum length of a trajectory before it is added to the trainer.
:param stats_reporter: StatsReporter object used to write the stats. Usually, this comes from the Trainer.
for buf in self.values():
buf.reset_agent()
self.experience_buffers: Dict[str, List[AgentExperience]] = defaultdict(list)
self.last_brain_info: Dict[str, BrainInfo] = {}
self.last_take_action_outputs: Dict[str, ActionInfoOutputs] = {}
# Note: this is needed until we switch to AgentExperiences as the data input type.
# We still need some info from the policy (memories, previous actions)
# that really should be gathered by the env-manager.
self.policy = policy
self.episode_steps: Counter = Counter()
self.episode_rewards: Dict[str, float] = defaultdict(float)
self.stats_reporter = stats_reporter
self.trainer = trainer
self.max_trajectory_length = max_trajectory_length
def append_to_update_buffer(
def add_experiences(
update_buffer: AgentBuffer,
agent_id: Union[int, str],
key_list: List[str] = None,
batch_size: int = None,
training_length: int = None,
curr_info: BrainInfo,
next_info: BrainInfo,
take_action_outputs: ActionInfoOutputs,
Appends the buffer of an agent to the update buffer.
:param update_buffer: A reference to an AgentBuffer to append the agent's buffer to
:param agent_id: The id of the agent which data will be appended
:param key_list: The fields that must be added. If None: all fields will be appended.
:param batch_size: The number of elements that must be appended. If None: All of them will be.
:param training_length: The length of the samples that must be appended. If None: only takes one element.
Adds experiences to each agent's experience history.
:param curr_info: current BrainInfo.
:param next_info: next BrainInfo.
:param take_action_outputs: The outputs of the Policy's get_action method.
if key_list is None:
key_list = self[agent_id].keys()
if not self[agent_id].check_length(key_list):
raise BufferException(
"The length of the fields {0} for agent {1} were not of same length".format(
key_list, agent_id
)
if take_action_outputs:
self.stats_reporter.add_stat(
"Policy/Entropy", take_action_outputs["entropy"].mean()
for field_key in key_list:
update_buffer[field_key].extend(
self[agent_id][field_key].get_batch(
batch_size=batch_size, training_length=training_length
)
self.stats_reporter.add_stat(
"Policy/Learning Rate", take_action_outputs["learning_rate"]
def append_all_agent_batch_to_update_buffer(
self,
update_buffer: AgentBuffer,
key_list: List[str] = None,
batch_size: int = None,
training_length: int = None,
) -> None:
"""
Appends the buffer of all agents to the update buffer.
:param key_list: The fields that must be added. If None: all fields will be appended.
:param batch_size: The number of elements that must be appended. If None: All of them will be.
:param training_length: The length of the samples that must be appended. If None: only takes one element.
"""
for agent_id in self.keys():
self.append_to_update_buffer(
update_buffer, agent_id, key_list, batch_size, training_length
)
for agent_id in curr_info.agents:
self.last_brain_info[agent_id] = curr_info
self.last_take_action_outputs[agent_id] = take_action_outputs
# Store the environment reward
tmp_environment_reward = next_info.rewards
for next_idx, agent_id in enumerate(next_info.agents):
stored_info = self.last_brain_info.get(agent_id, None)
if stored_info is not None:
stored_take_action_outputs = self.last_take_action_outputs[agent_id]
idx = stored_info.agents.index(agent_id)
obs = []
if not stored_info.local_done[idx]:
for i, _ in enumerate(stored_info.visual_observations):
obs.append(stored_info.visual_observations[i][idx])
if self.policy.use_vec_obs:
obs.append(stored_info.vector_observations[idx])
if self.policy.use_recurrent:
memory = self.policy.retrieve_memories([agent_id])[0, :]
else:
memory = None
done = next_info.local_done[next_idx]
max_step = next_info.max_reached[next_idx]
# Add the outputs of the last eval
action = stored_take_action_outputs["action"][idx]
if self.policy.use_continuous_act:
action_pre = stored_take_action_outputs["pre_action"][idx]
else:
action_pre = None
action_probs = stored_take_action_outputs["log_probs"][idx]
action_masks = stored_info.action_masks[idx]
prev_action = self.policy.retrieve_previous_action([agent_id])[0, :]
experience = AgentExperience(
obs=obs,
reward=tmp_environment_reward[next_idx],
done=done,
action=action,
action_probs=action_probs,
action_pre=action_pre,
action_mask=action_masks,
prev_action=prev_action,
max_step=max_step,
memory=memory,
)
# Add the value outputs if needed
self.experience_buffers[agent_id].append(experience)
self.episode_rewards[agent_id] += tmp_environment_reward[next_idx]
if (
next_info.local_done[next_idx]
or (
len(self.experience_buffers[agent_id])
>= self.max_trajectory_length
)
) and len(self.experience_buffers[agent_id]) > 0:
# Make next AgentExperience
next_obs = []
for i, _ in enumerate(next_info.visual_observations):
next_obs.append(next_info.visual_observations[i][next_idx])
if self.policy.use_vec_obs:
next_obs.append(next_info.vector_observations[next_idx])
trajectory = Trajectory(
steps=self.experience_buffers[agent_id],
agent_id=agent_id,
next_obs=next_obs,
)
# This will eventually be replaced with a queue
self.trainer.process_trajectory(trajectory)
self.experience_buffers[agent_id] = []
if next_info.local_done[next_idx]:
self.stats_reporter.add_stat(
"Environment/Cumulative Reward",
self.episode_rewards.get(agent_id, 0),
)
self.stats_reporter.add_stat(
"Environment/Episode Length",
self.episode_steps.get(agent_id, 0),
)
del self.episode_steps[agent_id]
del self.episode_rewards[agent_id]
elif not next_info.local_done[next_idx]:
self.episode_steps[agent_id] += 1
self.policy.save_previous_action(
curr_info.agents, take_action_outputs["action"]
)
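The core of the new AgentProcessor is the per-agent buffering shown above: experiences accumulate under each agent_id, and a Trajectory is handed to the trainer as soon as the agent finishes an episode or max_trajectory_length is reached. Below is a minimal, self-contained sketch of that pattern; MiniProcessor, FakeExperience, and FakeTrajectory are illustrative stand-ins, not part of this change.
# --- illustrative sketch (not part of the diff) ---
from collections import defaultdict
from typing import Callable, Dict, List, NamedTuple


class FakeExperience(NamedTuple):  # stand-in for AgentExperience
    obs: float
    reward: float
    done: bool


class FakeTrajectory(NamedTuple):  # stand-in for Trajectory
    steps: List[FakeExperience]
    agent_id: str


class MiniProcessor:
    """Per-agent buffering: emit a trajectory on episode end or max length."""

    def __init__(self, handle_trajectory: Callable[[FakeTrajectory], None], max_trajectory_length: int = 3):
        self.experience_buffers: Dict[str, List[FakeExperience]] = defaultdict(list)
        self.handle_trajectory = handle_trajectory  # stands in for trainer.process_trajectory
        self.max_trajectory_length = max_trajectory_length

    def add_experience(self, agent_id: str, exp: FakeExperience) -> None:
        buf = self.experience_buffers[agent_id]
        buf.append(exp)
        if exp.done or len(buf) >= self.max_trajectory_length:
            self.handle_trajectory(FakeTrajectory(steps=list(buf), agent_id=agent_id))
            self.experience_buffers[agent_id] = []


received: List[FakeTrajectory] = []
proc = MiniProcessor(received.append, max_trajectory_length=3)
for step in range(5):
    proc.add_experience("agent_0", FakeExperience(obs=float(step), reward=1.0, done=(step == 4)))
print([len(t.steps) for t in received])  # [3, 2]
# --- end sketch ---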

ml-agents/mlagents/trainers/buffer.py (29 changed lines)


for _key in self.keys():
self[_key] = self[_key][current_length - max_length :]
def resequence_and_append(
self,
target_buffer: "AgentBuffer",
key_list: List[str] = None,
batch_size: int = None,
training_length: int = None,
) -> None:
"""
Takes in a batch size and training length (sequence length), and appends this AgentBuffer to target_buffer
properly padded for LSTM use. Optionally, use key_list to restrict which fields are inserted into the new
buffer.
:param target_buffer: The buffer which to append the samples to.
:param key_list: The fields that must be added. If None: all fields will be appended.
:param batch_size: The number of elements that must be appended. If None: All of them will be.
:param training_length: The length of the samples that must be appended. If None: only takes one element.
"""
if key_list is None:
key_list = list(self.keys())
if not self.check_length(key_list):
raise BufferException(
"The length of the fields {0} were not of same length".format(key_list)
)
for field_key in key_list:
target_buffer[field_key].extend(
self[field_key].get_batch(
batch_size=batch_size, training_length=training_length
)
)
@property
def num_experiences(self) -> int:
"""

ml-agents/mlagents/trainers/components/bc/module.py (1 changed line)


feed_dict[self.policy.model.prev_action] = mini_batch_demo[
"prev_action"
]
network_out = self.policy.sess.run(
list(self.out_dict.values()), feed_dict=feed_dict
)

ml-agents/mlagents/trainers/components/reward_signals/extrinsic/signal.py (2 changed lines)


return RewardSignalResult(scaled_reward, unscaled_reward)
def evaluate_batch(self, mini_batch: Dict[str, np.array]) -> RewardSignalResult:
env_rews = np.array(mini_batch["environment_rewards"])
env_rews = np.array(mini_batch["environment_rewards"], dtype=np.float32)
return RewardSignalResult(self.strength * env_rews, env_rews)

ml-agents/mlagents/trainers/curriculum.py (13 changed lines)


import os
import json
import math
from typing import Dict, Any, TextIO
from .exception import CurriculumConfigError, CurriculumLoadingError

)
@property
def lesson_num(self):
def lesson_num(self) -> int:
def lesson_num(self, lesson_num):
def lesson_num(self, lesson_num: int) -> None:
def increment_lesson(self, measure_val):
def increment_lesson(self, measure_val: float) -> bool:
"""
Increments the lesson number depending on the progress given.
:param measure_val: Measure of progress (either reward or percentage

return True
return False
def get_config(self, lesson=None):
def get_config(self, lesson: int = None) -> Dict[str, Any]:
"""
Returns reset parameters which correspond to the lesson.
:param lesson: The lesson you want to get the config of. If None, the

return config
@staticmethod
def load_curriculum_file(location):
def load_curriculum_file(location: str) -> None:
try:
with open(location) as data_file:
return Curriculum._load_curriculum(data_file)

)
@staticmethod
def _load_curriculum(fp):
def _load_curriculum(fp: TextIO) -> None:
try:
return json.load(fp)
except json.decoder.JSONDecodeError as e:

ml-agents/mlagents/trainers/demo_loader.py (32 changed lines)


from typing import List, Tuple
import numpy as np
from mlagents.trainers.buffer import AgentBuffer
from mlagents.trainers.agent_processor import ProcessingBuffer
from mlagents.trainers.brain import BrainParameters, BrainInfo
from mlagents_envs.communicator_objects.agent_info_action_pair_pb2 import (
AgentInfoActionPairProto,

sequence_length: int,
) -> AgentBuffer:
# Create and populate buffer using experiences
demo_process_buffer = ProcessingBuffer()
demo_buffer = AgentBuffer()
demo_raw_buffer = AgentBuffer()
demo_processed_buffer = AgentBuffer()
for idx, experience in enumerate(pair_infos):
if idx > len(pair_infos) - 2:
break

previous_action = np.array(
pair_infos[idx - 1].action_info.vector_actions, dtype=np.float32
)
demo_process_buffer[0].last_brain_info = current_brain_info
demo_process_buffer[0]["done"].append(next_brain_info.local_done[0])
demo_process_buffer[0]["rewards"].append(next_brain_info.rewards[0])
demo_raw_buffer["done"].append(next_brain_info.local_done[0])
demo_raw_buffer["rewards"].append(next_brain_info.rewards[0])
demo_process_buffer[0]["visual_obs%d" % i].append(
demo_raw_buffer["visual_obs%d" % i].append(
demo_process_buffer[0]["vector_obs"].append(
demo_raw_buffer["vector_obs"].append(
demo_process_buffer[0]["actions"].append(
current_pair_info.action_info.vector_actions
)
demo_process_buffer[0]["prev_action"].append(previous_action)
demo_raw_buffer["actions"].append(current_pair_info.action_info.vector_actions)
demo_raw_buffer["prev_action"].append(previous_action)
demo_process_buffer.append_to_update_buffer(
demo_buffer, 0, batch_size=None, training_length=sequence_length
demo_raw_buffer.resequence_and_append(
demo_processed_buffer, batch_size=None, training_length=sequence_length
demo_process_buffer.reset_local_buffers()
demo_process_buffer.append_to_update_buffer(
demo_buffer, 0, batch_size=None, training_length=sequence_length
demo_raw_buffer.reset_agent()
demo_raw_buffer.resequence_and_append(
demo_processed_buffer, batch_size=None, training_length=sequence_length
return demo_buffer
return demo_processed_buffer
@timed
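The loop above now writes demonstrations directly into a single AgentBuffer and resequences it once at the end. A hypothetical call into this path might look like the following, assuming a recorded demonstration file exists at the given path and that demo_to_buffer is the public wrapper around this loop (both the path and the wrapper are assumptions, not shown in this excerpt).
# --- illustrative sketch (not part of the diff) ---
from mlagents.trainers.demo_loader import demo_to_buffer  # assumed public wrapper

sequence_length = 64  # should match the policy's sequence_length so LSTM padding lines up
brain_params, demo_buffer = demo_to_buffer("./demos/Example.demo", sequence_length)  # path is hypothetical
print(brain_params.brain_name, demo_buffer.num_experiences)
# --- end sketch ---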

ml-agents/mlagents/trainers/learn.py (6 changed lines)


from mlagents.trainers.exception import TrainerError
from mlagents.trainers.meta_curriculum import MetaCurriculum
from mlagents.trainers.trainer_util import load_config, TrainerFactory
from mlagents.trainers.stats import TensorboardWriter, StatsReporter
from mlagents_envs.environment import UnityEnvironment
from mlagents.trainers.sampler_class import SamplerManager
from mlagents.trainers.exception import SamplerException

)
trainer_config = load_config(trainer_config_path)
port = options.base_port + (sub_id * options.num_envs)
# Configure Tensorboard Writers and StatsReporter
tb_writer = TensorboardWriter(summaries_dir)
StatsReporter.add_writer(tb_writer)
if options.env_path is None:
port = 5004 # This is the in-Editor training port
env_factory = create_environment_factory(
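The new stats pipeline is wired up in the excerpt above: a TensorboardWriter is registered globally on StatsReporter, and individual reporters then fan their values out to every registered writer. A minimal sketch of that flow follows; only add_writer, add_stat, and get_stats_summaries appear in this diff, so the StatsReporter constructor argument and the stat key below are assumptions.
# --- illustrative sketch (not part of the diff) ---
from mlagents.trainers.stats import TensorboardWriter, StatsReporter

StatsReporter.add_writer(TensorboardWriter("./summaries"))   # global writer registration
reporter = StatsReporter("my_run")                           # assumed constructor argument
reporter.add_stat("Environment/Cumulative Reward", 1.5)
print(reporter.get_stats_summaries("Environment/Cumulative Reward").num)  # 1
# --- end sketch ---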

ml-agents/mlagents/trainers/models.py (26 changed lines)


[],
trainable=False,
dtype=tf.int32,
initializer=tf.ones_initializer(),
initializer=tf.zeros_initializer(),
)
self.running_mean = tf.get_variable(
"running_mean",

self.update_normalization = self.create_normalizer_update(vector_obs)
def create_normalizer_update(self, vector_input):
mean_current_observation = tf.reduce_mean(vector_input, axis=0)
new_mean = self.running_mean + (
mean_current_observation - self.running_mean
) / tf.cast(tf.add(self.normalization_steps, 1), tf.float32)
new_variance = self.running_variance + (mean_current_observation - new_mean) * (
mean_current_observation - self.running_mean
# Based on Welford's algorithm for running mean and standard deviation, for batch updates. Discussion here:
# https://stackoverflow.com/questions/56402955/whats-the-formula-for-welfords-algorithm-for-variance-std-with-batch-updates
steps_increment = tf.shape(vector_input)[0]
total_new_steps = tf.add(self.normalization_steps, steps_increment)
# Compute the incremental update and divide by the new total number of steps.
input_to_old_mean = tf.subtract(vector_input, self.running_mean)
new_mean = self.running_mean + tf.reduce_sum(
input_to_old_mean / tf.cast(total_new_steps, dtype=tf.float32), axis=0
)
# Compute difference of input to the new mean for Welford update
input_to_new_mean = tf.subtract(vector_input, new_mean)
new_variance = self.running_variance + tf.reduce_sum(
input_to_new_mean * input_to_old_mean, axis=0
update_norm_step = tf.assign(
self.normalization_steps, self.normalization_steps + 1
)
update_norm_step = tf.assign(self.normalization_steps, total_new_steps)
return tf.group([update_mean, update_variance, update_norm_step])
@staticmethod
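The change above switches the normalizer from a per-observation update to a batched Welford update. A plain NumPy restatement of the same arithmetic is given below; the helper name is illustrative, and the initial values (zero mean and steps, unit variance) follow the zeros_initializer above and the note in test_normalization. It reproduces the numbers asserted in that test further down.
# --- illustrative sketch (not part of the diff) ---
import numpy as np

def welford_batch_update(mean, variance, steps, batch):
    # variance is kept unnormalized, as in the TF variables above; divide by the
    # step count to recover the actual variance.
    batch = np.asarray(batch, dtype=np.float64)
    new_steps = steps + len(batch)
    input_to_old_mean = batch - mean
    new_mean = mean + input_to_old_mean.sum(axis=0) / new_steps
    input_to_new_mean = batch - new_mean
    new_variance = variance + (input_to_new_mean * input_to_old_mean).sum(axis=0)
    return new_mean, new_variance, new_steps

mean, var, steps = np.zeros(1), np.ones(1), 0   # assumed initial values (see note above)
mean, var, steps = welford_batch_update(mean, var, steps, [[0.0]] * 3 + [[1.0]] * 3)
print(mean[0], (var[0] - 1) / steps)            # 0.5 0.25
mean, var, steps = welford_batch_update(mean, var, steps, [[1.0]] * 10)
print(mean[0], round((var[0] - 1) / steps, 3))  # 0.8125 0.152
# --- end sketch ---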

ml-agents/mlagents/trainers/ppo/policy.py (45 changed lines)


from mlagents.tf_utils import tf
from mlagents_envs.timers import timed
from mlagents.trainers.brain import BrainInfo, BrainParameters
from mlagents.trainers.brain import BrainParameters
from mlagents.trainers.models import EncoderType, LearningRateSchedule
from mlagents.trainers.ppo.models import PPOModel
from mlagents.trainers.tf_policy import TFPolicy

{
"action": self.model.output,
"log_probs": self.model.all_log_probs,
"value_heads": self.model.value_heads,
"value": self.model.value,
"entropy": self.model.entropy,
"learning_rate": self.model.learning_rate,
}

]
feed_dict[model.memory_in] = mem_in
return feed_dict
def get_value_estimates(
self, brain_info: BrainInfo, idx: int, done: bool
) -> Dict[str, float]:
"""
Generates value estimates for bootstrapping.
:param brain_info: BrainInfo to be used for bootstrapping.
:param idx: Index in BrainInfo of agent.
:param done: Whether or not this is the last element of the episode, in which case the value estimate will be 0.
:return: The value estimate dictionary with key being the name of the reward signal and the value the
corresponding value estimate.
"""
feed_dict: Dict[tf.Tensor, Any] = {
self.model.batch_size: 1,
self.model.sequence_length: 1,
}
for i in range(len(brain_info.visual_observations)):
feed_dict[self.model.visual_in[i]] = [
brain_info.visual_observations[i][idx]
]
if self.use_vec_obs:
feed_dict[self.model.vector_in] = [brain_info.vector_observations[idx]]
agent_id = brain_info.agents[idx]
if self.use_recurrent:
feed_dict[self.model.memory_in] = self.retrieve_memories([agent_id])
if not self.use_continuous_act and self.use_recurrent:
feed_dict[self.model.prev_action] = self.retrieve_previous_action(
[agent_id]
)
value_estimates = self.sess.run(self.model.value_heads, feed_dict)
value_estimates = {k: float(v) for k, v in value_estimates.items()}
# If we're done, reassign all of the value estimates that need terminal states.
if done:
for k in value_estimates:
if self.reward_signals[k].use_terminal_states:
value_estimates[k] = 0.0
return value_estimates
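The final block of get_value_estimates encodes the bootstrapping rule: when the episode actually terminated, any reward signal that treats termination as a true terminal state gets a bootstrap value of 0. A toy restatement, with an illustrative flag dictionary standing in for reward_signals[k].use_terminal_states:
# --- illustrative sketch (not part of the diff) ---
value_estimates = {"extrinsic": 1.7, "curiosity": 0.3}
use_terminal_states = {"extrinsic": True, "curiosity": False}  # illustrative flags
done = True
if done:
    for name in value_estimates:
        if use_terminal_states[name]:
            value_estimates[name] = 0.0   # no future return beyond a true terminal state
print(value_estimates)  # {'extrinsic': 0.0, 'curiosity': 0.3}
# --- end sketch ---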

ml-agents/mlagents/trainers/ppo/trainer.py (212 changed lines)


import logging
from collections import defaultdict
from typing import Dict
from mlagents.trainers.brain import BrainInfo
from mlagents.trainers.rl_trainer import RLTrainer, AllRewardsOutput
from mlagents.trainers.action_info import ActionInfoOutputs
from mlagents.trainers.rl_trainer import RLTrainer
from mlagents.trainers.trajectory import Trajectory
logger = logging.getLogger("mlagents.trainers")

self.policy = self.ppo_policy
for _reward_signal in self.policy.reward_signals.keys():
self.collected_rewards[_reward_signal] = {}
self.collected_rewards[_reward_signal] = defaultdict(lambda: 0)
def process_experiences(
self, current_info: BrainInfo, next_info: BrainInfo
) -> None:
def process_trajectory(self, trajectory: Trajectory) -> None:
Checks agent histories against the processing condition, and processes them as necessary.
Takes a trajectory and processes it, putting it into the update buffer.
:param current_info: current BrainInfo.
:param next_info: next BrainInfo.
:param trajectory: The Trajectory tuple containing the steps to be processed.
if self.is_training:
self.policy.update_normalization(next_info.vector_observations)
for l in range(len(next_info.agents)):
agent_actions = self.processing_buffer[next_info.agents[l]]["actions"]
if (
next_info.local_done[l]
or len(agent_actions) > self.trainer_parameters["time_horizon"]
) and len(agent_actions) > 0:
agent_id = next_info.agents[l]
if next_info.max_reached[l]:
bootstrapping_info = self.processing_buffer[
agent_id
].last_brain_info
idx = bootstrapping_info.agents.index(agent_id)
else:
bootstrapping_info = next_info
idx = l
value_next = self.ppo_policy.get_value_estimates(
bootstrapping_info,
idx,
next_info.local_done[l] and not next_info.max_reached[l],
)
agent_id = trajectory.agent_id # All the agents should have the same ID
tmp_advantages = []
tmp_returns = []
for name in self.policy.reward_signals:
bootstrap_value = value_next[name]
# Add to episode_steps
self.episode_steps[agent_id] += len(trajectory.steps)
local_rewards = self.processing_buffer[agent_id][
"{}_rewards".format(name)
].get_batch()
local_value_estimates = self.processing_buffer[agent_id][
"{}_value_estimates".format(name)
].get_batch()
local_advantage = get_gae(
rewards=local_rewards,
value_estimates=local_value_estimates,
value_next=bootstrap_value,
gamma=self.policy.reward_signals[name].gamma,
lambd=self.trainer_parameters["lambd"],
)
local_return = local_advantage + local_value_estimates
# This is later used as a target for the different value estimates
self.processing_buffer[agent_id]["{}_returns".format(name)].set(
local_return
)
self.processing_buffer[agent_id]["{}_advantage".format(name)].set(
local_advantage
)
tmp_advantages.append(local_advantage)
tmp_returns.append(local_return)
agent_buffer_trajectory = trajectory.to_agentbuffer()
# Update the normalization
if self.is_training:
self.policy.update_normalization(agent_buffer_trajectory["vector_obs"])
global_advantages = list(
np.mean(np.array(tmp_advantages, dtype=np.float32), axis=0)
)
global_returns = list(
np.mean(np.array(tmp_returns, dtype=np.float32), axis=0)
)
self.processing_buffer[agent_id]["advantages"].set(global_advantages)
self.processing_buffer[agent_id]["discounted_returns"].set(
global_returns
)
# Get all value estimates
value_estimates = self.policy.get_batched_value_estimates(
agent_buffer_trajectory
)
for name, v in value_estimates.items():
agent_buffer_trajectory["{}_value_estimates".format(name)].extend(v)
self.stats_reporter.add_stat(
self.policy.reward_signals[name].value_name, np.mean(v)
)
self.processing_buffer.append_to_update_buffer(
self.update_buffer,
agent_id,
batch_size=None,
training_length=self.policy.sequence_length,
)
value_next = self.policy.get_value_estimates(
trajectory.next_obs,
agent_id,
trajectory.done_reached and not trajectory.max_step_reached,
)
self.processing_buffer[agent_id].reset_agent()
if next_info.local_done[l]:
self.stats["Environment/Episode Length"].append(
self.episode_steps.get(agent_id, 0)
)
self.episode_steps[agent_id] = 0
for name, rewards in self.collected_rewards.items():
if name == "environment":
self.cumulative_returns_since_policy_update.append(
rewards.get(agent_id, 0)
)
self.stats["Environment/Cumulative Reward"].append(
rewards.get(agent_id, 0)
)
self.reward_buffer.appendleft(rewards.get(agent_id, 0))
rewards[agent_id] = 0
else:
self.stats[
self.policy.reward_signals[name].stat_name
].append(rewards.get(agent_id, 0))
rewards[agent_id] = 0
# Evaluate all reward functions
self.collected_rewards["environment"][agent_id] += np.sum(
agent_buffer_trajectory["environment_rewards"]
)
for name, reward_signal in self.policy.reward_signals.items():
evaluate_result = reward_signal.evaluate_batch(
agent_buffer_trajectory
).scaled_reward
agent_buffer_trajectory["{}_rewards".format(name)].extend(evaluate_result)
# Report the reward signals
self.collected_rewards[name][agent_id] += np.sum(evaluate_result)
def add_policy_outputs(
self, take_action_outputs: ActionInfoOutputs, agent_id: str, agent_idx: int
) -> None:
"""
Takes the output of the last action and stores it in the training buffer.
"""
actions = take_action_outputs["action"]
if self.policy.use_continuous_act:
actions_pre = take_action_outputs["pre_action"]
self.processing_buffer[agent_id]["actions_pre"].append(
actions_pre[agent_idx]
)
a_dist = take_action_outputs["log_probs"]
# value is a dictionary from name of reward to value estimate of the value head
self.processing_buffer[agent_id]["actions"].append(actions[agent_idx])
self.processing_buffer[agent_id]["action_probs"].append(a_dist[agent_idx])
# Compute GAE and returns
tmp_advantages = []
tmp_returns = []
for name in self.policy.reward_signals:
bootstrap_value = value_next[name]
def add_rewards_outputs(
self,
rewards_out: AllRewardsOutput,
values: Dict[str, np.ndarray],
agent_id: str,
agent_idx: int,
agent_next_idx: int,
) -> None:
"""
Takes the value output of the last action and stores it in the training buffer.
"""
for name, reward_result in rewards_out.reward_signals.items():
# 0 because we use the scaled reward to train the agent
self.processing_buffer[agent_id]["{}_rewards".format(name)].append(
reward_result.scaled_reward[agent_next_idx]
local_rewards = agent_buffer_trajectory[
"{}_rewards".format(name)
].get_batch()
local_value_estimates = agent_buffer_trajectory[
"{}_value_estimates".format(name)
].get_batch()
local_advantage = get_gae(
rewards=local_rewards,
value_estimates=local_value_estimates,
value_next=bootstrap_value,
gamma=self.policy.reward_signals[name].gamma,
lambd=self.trainer_parameters["lambd"],
self.processing_buffer[agent_id]["{}_value_estimates".format(name)].append(
values[name][agent_idx][0]
)
local_return = local_advantage + local_value_estimates
# This is later used as a target for the different value estimates
agent_buffer_trajectory["{}_returns".format(name)].set(local_return)
agent_buffer_trajectory["{}_advantage".format(name)].set(local_advantage)
tmp_advantages.append(local_advantage)
tmp_returns.append(local_return)
# Get global advantages
global_advantages = list(
np.mean(np.array(tmp_advantages, dtype=np.float32), axis=0)
)
global_returns = list(np.mean(np.array(tmp_returns, dtype=np.float32), axis=0))
agent_buffer_trajectory["advantages"].set(global_advantages)
agent_buffer_trajectory["discounted_returns"].set(global_returns)
# Append to update buffer
agent_buffer_trajectory.resequence_and_append(
self.update_buffer, training_length=self.policy.sequence_length
)
# If this was a terminal trajectory, append stats and reset reward collection
if trajectory.done_reached:
self._update_end_episode_stats(agent_id)
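The loop above computes a per-reward-signal advantage via get_gae and sets the corresponding returns (advantage plus value estimate) as value-head targets. For reference, here is a standalone NumPy restatement of the standard GAE recurrence that get_gae implements; it is an illustrative re-statement under assumed default gamma/lambd, not a copy of the trainer code.
# --- illustrative sketch (not part of the diff) ---
import numpy as np

def discount_rewards(r, gamma=0.99, value_next=0.0):
    """Discounted sum of a reward sequence, bootstrapped with value_next."""
    discounted_r = np.zeros_like(r)
    running_add = value_next
    for t in reversed(range(r.size)):
        running_add = running_add * gamma + r[t]
        discounted_r[t] = running_add
    return discounted_r

def get_gae_sketch(rewards, value_estimates, value_next=0.0, gamma=0.99, lambd=0.95):
    """Generalized Advantage Estimation: discounted sum of TD residuals."""
    values = np.append(value_estimates, value_next)
    delta_t = rewards + gamma * values[1:] - values[:-1]
    return discount_rewards(delta_t, gamma=gamma * lambd)

rewards = np.array([1.0, 1.0, 1.0], dtype=np.float32)
values = np.array([0.5, 0.5, 0.5], dtype=np.float32)
advantages = get_gae_sketch(rewards, values, value_next=0.5)
returns = advantages + values   # the "{}_returns" targets set above
print(advantages, returns)
# --- end sketch ---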
def is_ready_update(self):
"""

batch_update_stats[stat_name].append(value)
for stat, stat_list in batch_update_stats.items():
self.stats[stat].append(np.mean(stat_list))
self.stats_reporter.add_stat(stat, np.mean(stat_list))
self.stats[stat].append(val)
self.stats_reporter.add_stat(stat, val)
self.clear_update_buffer()
self.trainer_metrics.end_policy_update()

ml-agents/mlagents/trainers/rl_trainer.py (246 changed lines)


# # Unity ML-Agents Toolkit
import logging
from typing import Dict, List, Any, NamedTuple
import numpy as np
from typing import Dict
from collections import defaultdict
from mlagents.trainers.brain import BrainInfo
from mlagents.trainers.action_info import ActionInfoOutputs
from mlagents.trainers.agent_processor import ProcessingBuffer
from mlagents.trainers.trainer import Trainer, UnityTrainerException
from mlagents.trainers.components.reward_signals import RewardSignalResult

class AllRewardsOutput(NamedTuple):
"""
This class stores all of the outputs of the reward signals,
as well as the raw reward from the environment.
"""
reward_signals: RewardSignalResults
environment: np.ndarray
class RLTrainer(Trainer):
"""
This class is the base class for trainers that use Reward Signals.

# collected_rewards is a dictionary from name of reward signal to a dictionary of agent_id to cumulative reward
# used for reporting only. We always want to report the environment reward to Tensorboard, regardless
# of what reward signals are actually present.
self.collected_rewards = {"environment": {}}
self.processing_buffer = ProcessingBuffer()
self.update_buffer = AgentBuffer()
self.episode_steps = {}
def construct_curr_info(self, next_info: BrainInfo) -> BrainInfo:
"""
Constructs a BrainInfo which contains the most recent previous experiences for all agents
which correspond to the agents in a provided next_info.
:BrainInfo next_info: A t+1 BrainInfo.
:return: curr_info: Reconstructed BrainInfo to match agents of next_info.
"""
visual_observations: List[List[Any]] = [
[] for _ in next_info.visual_observations
] # TODO add types to brain.py methods
vector_observations = []
rewards = []
local_dones = []
max_reacheds = []
agents = []
action_masks = []
for agent_id in next_info.agents:
agent_brain_info = self.processing_buffer[agent_id].last_brain_info
if agent_brain_info is None:
agent_brain_info = next_info
agent_index = agent_brain_info.agents.index(agent_id)
for i in range(len(next_info.visual_observations)):
visual_observations[i].append(
agent_brain_info.visual_observations[i][agent_index]
)
vector_observations.append(
agent_brain_info.vector_observations[agent_index]
)
rewards.append(agent_brain_info.rewards[agent_index])
local_dones.append(agent_brain_info.local_done[agent_index])
max_reacheds.append(agent_brain_info.max_reached[agent_index])
agents.append(agent_brain_info.agents[agent_index])
action_masks.append(agent_brain_info.action_masks[agent_index])
curr_info = BrainInfo(
visual_observations,
vector_observations,
rewards,
agents,
local_dones,
max_reacheds,
action_masks,
)
return curr_info
def add_experiences(
self,
curr_info: BrainInfo,
next_info: BrainInfo,
take_action_outputs: ActionInfoOutputs,
) -> None:
"""
Adds experiences to each agent's experience history.
:param curr_info: current BrainInfo.
:param next_info: next BrainInfo.
:param take_action_outputs: The outputs of the Policy's get_action method.
"""
self.trainer_metrics.start_experience_collection_timer()
if take_action_outputs:
self.stats["Policy/Entropy"].append(take_action_outputs["entropy"].mean())
self.stats["Policy/Learning Rate"].append(
take_action_outputs["learning_rate"]
)
for name, signal in self.policy.reward_signals.items():
self.stats[signal.value_name].append(
np.mean(take_action_outputs["value_heads"][name])
)
for agent_id in curr_info.agents:
self.processing_buffer[agent_id].last_brain_info = curr_info
self.processing_buffer[
agent_id
].last_take_action_outputs = take_action_outputs
if curr_info.agents != next_info.agents:
curr_to_use = self.construct_curr_info(next_info)
else:
curr_to_use = curr_info
# Evaluate and store the reward signals
tmp_reward_signal_outs = {}
for name, signal in self.policy.reward_signals.items():
tmp_reward_signal_outs[name] = signal.evaluate(
curr_to_use, take_action_outputs["action"], next_info
)
# Store the environment reward
tmp_environment = np.array(next_info.rewards, dtype=np.float32)
rewards_out = AllRewardsOutput(
reward_signals=tmp_reward_signal_outs, environment=tmp_environment
)
for agent_id in next_info.agents:
stored_info = self.processing_buffer[agent_id].last_brain_info
stored_take_action_outputs = self.processing_buffer[
agent_id
].last_take_action_outputs
if stored_info is not None:
idx = stored_info.agents.index(agent_id)
next_idx = next_info.agents.index(agent_id)
if not stored_info.local_done[idx]:
for i, _ in enumerate(stored_info.visual_observations):
self.processing_buffer[agent_id]["visual_obs%d" % i].append(
stored_info.visual_observations[i][idx]
)
self.processing_buffer[agent_id][
"next_visual_obs%d" % i
].append(next_info.visual_observations[i][next_idx])
if self.policy.use_vec_obs:
self.processing_buffer[agent_id]["vector_obs"].append(
stored_info.vector_observations[idx]
)
self.processing_buffer[agent_id]["next_vector_in"].append(
next_info.vector_observations[next_idx]
)
if self.policy.use_recurrent:
self.processing_buffer[agent_id]["memory"].append(
self.policy.retrieve_memories([agent_id])[0, :]
)
self.processing_buffer[agent_id]["masks"].append(1.0)
self.processing_buffer[agent_id]["done"].append(
next_info.local_done[next_idx]
)
# Add the outputs of the last eval
self.add_policy_outputs(stored_take_action_outputs, agent_id, idx)
# Store action masks if necessary
if not self.policy.use_continuous_act:
self.processing_buffer[agent_id]["action_mask"].append(
stored_info.action_masks[idx], padding_value=1
)
self.processing_buffer[agent_id]["prev_action"].append(
self.policy.retrieve_previous_action([agent_id])[0, :]
)
values = stored_take_action_outputs["value_heads"]
# Add the value outputs if needed
self.add_rewards_outputs(
rewards_out, values, agent_id, idx, next_idx
)
for name, rewards in self.collected_rewards.items():
if agent_id not in rewards:
rewards[agent_id] = 0
if name == "environment":
# Report the reward from the environment
rewards[agent_id] += rewards_out.environment[next_idx]
else:
# Report the reward signals
rewards[agent_id] += rewards_out.reward_signals[
name
].scaled_reward[next_idx]
if not next_info.local_done[next_idx]:
if agent_id not in self.episode_steps:
self.episode_steps[agent_id] = 0
self.episode_steps[agent_id] += 1
self.policy.save_previous_action(
curr_info.agents, take_action_outputs["action"]
)
self.trainer_metrics.end_experience_collection_timer()
self.collected_rewards: Dict[str, Dict[str, int]] = {
"environment": defaultdict(lambda: 0)
}
self.update_buffer: AgentBuffer = AgentBuffer()
self.episode_steps: Dict[str, int] = defaultdict(lambda: 0)
def end_episode(self) -> None:
"""

self.processing_buffer.reset_local_buffers()
for agent_id in self.episode_steps:
self.episode_steps[agent_id] = 0
for rewards in self.collected_rewards.values():

def _update_end_episode_stats(self, agent_id: str) -> None:
self.episode_steps[agent_id] = 0
for name, rewards in self.collected_rewards.items():
if name == "environment":
self.cumulative_returns_since_policy_update.append(
rewards.get(agent_id, 0)
)
self.reward_buffer.appendleft(rewards.get(agent_id, 0))
rewards[agent_id] = 0
else:
self.stats_reporter.add_stat(
self.policy.reward_signals[name].stat_name, rewards.get(agent_id, 0)
)
rewards[agent_id] = 0
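The collected_rewards bookkeeping above is purely for reporting: each reward signal keeps a per-agent running total (a defaultdict of zeros) that is flushed to the stats reporter and reset when that agent's episode ends. In miniature:
# --- illustrative sketch (not part of the diff) ---
from collections import defaultdict

collected_rewards = {"environment": defaultdict(lambda: 0)}
collected_rewards["environment"]["agent_0"] += 1.0
collected_rewards["environment"]["agent_0"] += 0.5

# _update_end_episode_stats-style flush and reset for one agent:
episode_return = collected_rewards["environment"].get("agent_0", 0)
collected_rewards["environment"]["agent_0"] = 0
print(episode_return)  # 1.5
# --- end sketch ---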
def clear_update_buffer(self) -> None:
"""
Clear the buffers that have been built up during inference. If

def add_policy_outputs(
self, take_action_outputs: ActionInfoOutputs, agent_id: str, agent_idx: int
) -> None:
"""
Takes the output of the last action and stores it in the training buffer.
We break this out from add_experiences since it is highly dependent
on the type of trainer.
:param take_action_outputs: The outputs of the Policy's get_action method.
:param agent_id: the Agent we're adding to.
:param agent_idx: the index of the Agent agent_id
"""
raise UnityTrainerException(
"The add_policy_outputs method was not implemented."
)
def add_rewards_outputs(
self,
rewards_out: AllRewardsOutput,
values: Dict[str, np.ndarray],
agent_id: str,
agent_idx: int,
agent_next_idx: int,
) -> None:
"""
Takes the value and evaluated rewards output of the last action and stores it
in the training buffer. We break this out from add_experiences since it is
highly dependent on the type of trainer.
:param rewards_out: The AllRewardsOutput from the reward signal evaluations.
:param values: Dict of value estimates after evaluation.
:param agent_id: the Agent we're adding to.
:param agent_idx: the index of the Agent agent_id in the current brain info
:param agent_next_idx: the index of the Agent agent_id in the next brain info
"""
raise UnityTrainerException(
"The add_rewards_outputs method was not implemented."
)
def advance(self):
"""
Eventually logic from TrainerController.advance() will live here.
"""
self.clear_update_buffer()

ml-agents/mlagents/trainers/sac/policy.py (2 changed lines)


{
"action": self.model.output,
"log_probs": self.model.all_log_probs,
"value_heads": self.model.value_heads,
"value": self.model.value,
"entropy": self.model.entropy,
"learning_rate": self.model.learning_rate,
}

ml-agents/mlagents/trainers/sac/trainer.py (140 changed lines)


import numpy as np
from mlagents.trainers.brain import BrainInfo
from mlagents.trainers.action_info import ActionInfoOutputs
from mlagents.trainers.rl_trainer import RLTrainer, AllRewardsOutput
from mlagents.trainers.rl_trainer import RLTrainer
from mlagents.trainers.trajectory import Trajectory, SplitObservations
LOGGER = logging.getLogger("mlagents.trainers")

)
for _reward_signal in self.policy.reward_signals.keys():
self.collected_rewards[_reward_signal] = {}
self.episode_steps = {}
self.collected_rewards[_reward_signal] = defaultdict(lambda: 0)
def save_model(self) -> None:
"""

)
)
def add_policy_outputs(
self, take_action_outputs: ActionInfoOutputs, agent_id: str, agent_idx: int
) -> None:
def process_trajectory(self, trajectory: Trajectory) -> None:
Takes the output of the last action and stores it in the training buffer.
Takes a trajectory and processes it, putting it into the replay buffer.
actions = take_action_outputs["action"]
self.processing_buffer[agent_id]["actions"].append(actions[agent_idx])
last_step = trajectory.steps[-1]
agent_id = trajectory.agent_id # All the agents should have the same ID
def add_rewards_outputs(
self,
rewards_out: AllRewardsOutput,
values: Dict[str, np.ndarray],
agent_id: str,
agent_idx: int,
agent_next_idx: int,
) -> None:
"""
Takes the value output of the last action and stores it in the training buffer.
"""
self.processing_buffer[agent_id]["environment_rewards"].append(
rewards_out.environment[agent_next_idx]
)
# Add to episode_steps
self.episode_steps[agent_id] += len(trajectory.steps)
def process_experiences(
self, current_info: BrainInfo, next_info: BrainInfo
) -> None:
"""
Checks agent histories against the processing condition, and processes them as necessary.
:param current_info: current BrainInfo.
:param next_info: next BrainInfo.
"""
agent_buffer_trajectory = trajectory.to_agentbuffer()
# Update the normalization
self.policy.update_normalization(next_info.vector_observations)
for l in range(len(next_info.agents)):
agent_actions = self.processing_buffer[next_info.agents[l]]["actions"]
if (
next_info.local_done[l]
or len(agent_actions) >= self.trainer_parameters["time_horizon"]
) and len(agent_actions) > 0:
agent_id = next_info.agents[l]
self.policy.update_normalization(agent_buffer_trajectory["vector_obs"])
# Bootstrap using last brain info. Set last element to duplicate obs and remove dones.
if next_info.max_reached[l]:
bootstrapping_info = self.processing_buffer[
agent_id
].last_brain_info
idx = bootstrapping_info.agents.index(agent_id)
for i, obs in enumerate(bootstrapping_info.visual_observations):
self.processing_buffer[agent_id]["next_visual_obs%d" % i][
-1
] = obs[idx]
if self.policy.use_vec_obs:
self.processing_buffer[agent_id]["next_vector_in"][
-1
] = bootstrapping_info.vector_observations[idx]
self.processing_buffer[agent_id]["done"][-1] = False
# Evaluate all reward functions for reporting purposes
self.collected_rewards["environment"][agent_id] += np.sum(
agent_buffer_trajectory["environment_rewards"]
)
for name, reward_signal in self.policy.reward_signals.items():
evaluate_result = reward_signal.evaluate_batch(
agent_buffer_trajectory
).scaled_reward
# Report the reward signals
self.collected_rewards[name][agent_id] += np.sum(evaluate_result)
self.processing_buffer.append_to_update_buffer(
self.update_buffer,
agent_id,
batch_size=None,
training_length=self.policy.sequence_length,
)
# Get all value estimates for reporting purposes
value_estimates = self.policy.get_batched_value_estimates(
agent_buffer_trajectory
)
for name, v in value_estimates.items():
self.stats_reporter.add_stat(
self.policy.reward_signals[name].value_name, np.mean(v)
)
self.processing_buffer[agent_id].reset_agent()
if next_info.local_done[l]:
self.stats["Environment/Episode Length"].append(
self.episode_steps.get(agent_id, 0)
)
self.episode_steps[agent_id] = 0
for name, rewards in self.collected_rewards.items():
if name == "environment":
self.cumulative_returns_since_policy_update.append(
rewards.get(agent_id, 0)
)
self.stats["Environment/Cumulative Reward"].append(
rewards.get(agent_id, 0)
)
self.reward_buffer.appendleft(rewards.get(agent_id, 0))
rewards[agent_id] = 0
else:
self.stats[
self.policy.reward_signals[name].stat_name
].append(rewards.get(agent_id, 0))
rewards[agent_id] = 0
# Bootstrap using the last step rather than the bootstrap step if max step is reached.
# Set last element to duplicate obs and remove dones.
if last_step.max_step:
vec_vis_obs = SplitObservations.from_observations(last_step.obs)
for i, obs in enumerate(vec_vis_obs.visual_observations):
agent_buffer_trajectory["next_visual_obs%d" % i][-1] = obs
if vec_vis_obs.vector_observations.size > 1:
agent_buffer_trajectory["next_vector_in"][
-1
] = vec_vis_obs.vector_observations
agent_buffer_trajectory["done"][-1] = False
# Append to update buffer
agent_buffer_trajectory.resequence_and_append(
self.update_buffer, training_length=self.policy.sequence_length
)
if trajectory.done_reached:
self._update_end_episode_stats(agent_id)
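The block above implements SAC's off-policy bootstrap trick: when the episode was cut short by max_step rather than by a real termination, the final transition's next observation is overwritten with a duplicate of the last observation and its done flag is cleared, so the replay buffer still bootstraps through it. A plain-Python restatement with stand-in data:
# --- illustrative sketch (not part of the diff) ---
last_obs = [0.1, 0.2, 0.3]                      # vector part of the final step's observation
agent_buffer_trajectory = {                     # stand-in for the AgentBuffer fields touched above
    "next_vector_in": [[0.0, 0.0, 0.0], [0.05, 0.10, 0.15]],
    "done": [False, True],
}
max_step_reached = True
if max_step_reached:
    agent_buffer_trajectory["next_vector_in"][-1] = last_obs  # duplicate obs
    agent_buffer_trajectory["done"][-1] = False               # remove the done so SAC bootstraps
print(agent_buffer_trajectory)
# --- end sketch ---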
def is_ready_update(self) -> bool:
"""

)
for stat, stat_list in batch_update_stats.items():
self.stats[stat].append(np.mean(stat_list))
self.stats_reporter.add_stat(stat, np.mean(stat_list))
self.stats[stat].append(val)
self.stats_reporter.add_stat(stat, val)
def update_reward_signals(self) -> None:
"""

for stat_name, value in update_stats.items():
batch_update_stats[stat_name].append(value)
for stat, stat_list in batch_update_stats.items():
self.stats[stat].append(np.mean(stat_list))
self.stats_reporter.add_stat(stat, np.mean(stat_list))

ml-agents/mlagents/trainers/tests/mock_brain.py (45 changed lines)


from mlagents.trainers.brain import CameraResolution, BrainParameters
from mlagents.trainers.buffer import AgentBuffer
from mlagents.trainers.agent_processor import ProcessingBuffer
def create_mock_brainparams(

def create_buffer(brain_infos, brain_params, sequence_length, memory_size=8):
buffer = ProcessingBuffer()
buffer = AgentBuffer()
update_buffer = AgentBuffer()
# Make a buffer
for idx, experience in enumerate(brain_infos):

next_brain_info = brain_infos[idx + 1]
buffer[0].last_brain_info = current_brain_info
buffer[0]["done"].append(next_brain_info.local_done[0])
buffer[0]["rewards"].append(next_brain_info.rewards[0])
buffer.last_brain_info = current_brain_info
buffer["done"].append(next_brain_info.local_done[0])
buffer["rewards"].append(next_brain_info.rewards[0])
buffer[0]["visual_obs%d" % i].append(
buffer["visual_obs%d" % i].append(
buffer[0]["next_visual_obs%d" % i].append(
buffer["next_visual_obs%d" % i].append(
buffer[0]["vector_obs"].append(current_brain_info.vector_observations[0])
buffer[0]["next_vector_in"].append(
current_brain_info.vector_observations[0]
)
buffer["vector_obs"].append(current_brain_info.vector_observations[0])
buffer["next_vector_in"].append(current_brain_info.vector_observations[0])
buffer[0]["actions"].append(np.zeros(fake_action_size, dtype=np.float32))
buffer[0]["prev_action"].append(np.zeros(fake_action_size, dtype=np.float32))
buffer[0]["masks"].append(1.0)
buffer[0]["advantages"].append(1.0)
buffer["actions"].append(np.zeros(fake_action_size, dtype=np.float32))
buffer["prev_action"].append(np.zeros(fake_action_size, dtype=np.float32))
buffer["masks"].append(1.0)
buffer["advantages"].append(1.0)
buffer[0]["action_probs"].append(
buffer["action_probs"].append(
buffer[0]["action_probs"].append(
np.ones(buffer[0]["actions"][0].shape, dtype=np.float32)
buffer["action_probs"].append(
np.ones(buffer["actions"][0].shape, dtype=np.float32)
buffer[0]["actions_pre"].append(
np.ones(buffer[0]["actions"][0].shape, dtype=np.float32)
buffer["actions_pre"].append(
np.ones(buffer["actions"][0].shape, dtype=np.float32)
buffer[0]["action_mask"].append(
buffer["action_mask"].append(
buffer[0]["memory"].append(np.ones(memory_size, dtype=np.float32))
buffer["memory"].append(np.ones(memory_size, dtype=np.float32))
buffer.append_to_update_buffer(
update_buffer, 0, batch_size=None, training_length=sequence_length
buffer.resequence_and_append(
update_buffer, batch_size=None, training_length=sequence_length
)
return update_buffer

ml-agents/mlagents/trainers/tests/test_buffer.py (95 changed lines)


import numpy as np
from mlagents.trainers.buffer import AgentBuffer
from mlagents.trainers.agent_processor import ProcessingBuffer
def assert_array(a, b):

assert la[i] == lb[i]
def construct_fake_processing_buffer():
b = ProcessingBuffer()
for fake_agent_id in range(4):
for step in range(9):
b[fake_agent_id]["vector_observation"].append(
[
100 * fake_agent_id + 10 * step + 1,
100 * fake_agent_id + 10 * step + 2,
100 * fake_agent_id + 10 * step + 3,
]
)
b[fake_agent_id]["action"].append(
[
100 * fake_agent_id + 10 * step + 4,
100 * fake_agent_id + 10 * step + 5,
]
)
def construct_fake_buffer(fake_agent_id):
b = AgentBuffer()
for step in range(9):
b["vector_observation"].append(
[
100 * fake_agent_id + 10 * step + 1,
100 * fake_agent_id + 10 * step + 2,
100 * fake_agent_id + 10 * step + 3,
]
)
b["action"].append(
[100 * fake_agent_id + 10 * step + 4, 100 * fake_agent_id + 10 * step + 5]
)
b = construct_fake_processing_buffer()
a = b[1]["vector_observation"].get_batch(
agent_1_buffer = construct_fake_buffer(1)
agent_2_buffer = construct_fake_buffer(2)
agent_3_buffer = construct_fake_buffer(3)
a = agent_1_buffer["vector_observation"].get_batch(
a = b[2]["vector_observation"].get_batch(
a = agent_2_buffer["vector_observation"].get_batch(
batch_size=2, training_length=3, sequential=True
)
assert_array(

]
),
)
a = b[2]["vector_observation"].get_batch(
a = agent_2_buffer["vector_observation"].get_batch(
batch_size=2, training_length=3, sequential=False
)
assert_array(

]
),
)
b[4].reset_agent()
assert len(b[4]) == 0
agent_1_buffer.reset_agent()
assert agent_1_buffer.num_experiences == 0
b.append_to_update_buffer(update_buffer, 3, batch_size=None, training_length=2)
b.append_to_update_buffer(update_buffer, 2, batch_size=None, training_length=2)
agent_2_buffer.resequence_and_append(
update_buffer, batch_size=None, training_length=2
)
agent_3_buffer.resequence_and_append(
update_buffer, batch_size=None, training_length=2
)
assert len(update_buffer["action"]) == 20
assert np.array(update_buffer["action"]).shape == (20, 2)

def test_buffer_sample():
b = construct_fake_processing_buffer()
agent_1_buffer = construct_fake_buffer(1)
agent_2_buffer = construct_fake_buffer(2)
b.append_to_update_buffer(update_buffer, 3, batch_size=None, training_length=2)
b.append_to_update_buffer(update_buffer, 2, batch_size=None, training_length=2)
agent_1_buffer.resequence_and_append(
update_buffer, batch_size=None, training_length=2
)
agent_2_buffer.resequence_and_append(
update_buffer, batch_size=None, training_length=2
)
# Test non-LSTM
mb = update_buffer.sample_mini_batch(batch_size=4, sequence_length=1)
assert mb.keys() == update_buffer.keys()

def test_num_experiences():
b = construct_fake_processing_buffer()
agent_1_buffer = construct_fake_buffer(1)
agent_2_buffer = construct_fake_buffer(2)
b.append_to_update_buffer(update_buffer, 3, batch_size=None, training_length=2)
b.append_to_update_buffer(update_buffer, 2, batch_size=None, training_length=2)
agent_1_buffer.resequence_and_append(
update_buffer, batch_size=None, training_length=2
)
agent_2_buffer.resequence_and_append(
update_buffer, batch_size=None, training_length=2
)
assert len(update_buffer["action"]) == 20
assert update_buffer.num_experiences == 20

b = construct_fake_processing_buffer()
agent_1_buffer = construct_fake_buffer(1)
agent_2_buffer = construct_fake_buffer(2)
b.append_to_update_buffer(update_buffer, 3, batch_size=None, training_length=2)
b.append_to_update_buffer(update_buffer, 2, batch_size=None, training_length=2)
agent_1_buffer.resequence_and_append(
update_buffer, batch_size=None, training_length=2
)
agent_2_buffer.resequence_and_append(
update_buffer, batch_size=None, training_length=2
)
b.append_to_update_buffer(update_buffer, 3, batch_size=None, training_length=2)
b.append_to_update_buffer(update_buffer, 2, batch_size=None, training_length=2)
agent_1_buffer.resequence_and_append(
update_buffer, batch_size=None, training_length=2
)
agent_2_buffer.resequence_and_append(
update_buffer, batch_size=None, training_length=2
)
# Test LSTM: truncation should leave a multiple of sequence_length
update_buffer.truncate(4, sequence_length=3)
assert update_buffer.num_experiences == 3

ml-agents/mlagents/trainers/tests/test_meta_curriculum.py (2 changed lines)


hidden_units: 128
lambd: 0.95
learning_rate: 5.0e-3
max_steps: 100
max_steps: 200
memory_size: 256
normalize: false
num_epoch: 3

ml-agents/mlagents/trainers/tests/test_ppo.py (174 changed lines)


from mlagents.trainers.ppo.models import PPOModel
from mlagents.trainers.ppo.trainer import PPOTrainer, discount_rewards
from mlagents.trainers.ppo.policy import PPOPolicy
from mlagents.trainers.rl_trainer import AllRewardsOutput
from mlagents.trainers.components.reward_signals import RewardSignalResult
from mlagents.trainers.tests.test_trajectory import make_fake_trajectory
from mlagents.trainers.brain_conversion_utils import (
step_result_to_brain_info,
group_spec_to_brain_parameters,

sequence_length: 64
summary_freq: 1000
use_recurrent: false
normalize: true
memory_size: 8
curiosity_strength: 0.0
curiosity_enc_size: 1

@mock.patch("mlagents_envs.environment.UnityEnvironment.get_communicator")
def test_ppo_get_value_estimates(mock_communicator, mock_launcher, dummy_config):
tf.reset_default_graph()
mock_communicator.return_value = MockCommunicator(
discrete_action=False, visual_inputs=0
brain_params = BrainParameters(
brain_name="test_brain",
vector_observation_space_size=1,
camera_resolutions=[],
vector_action_space_size=[2],
vector_action_descriptions=[],
vector_action_space_type=0,
env = UnityEnvironment(" ")
env.reset()
brain_name = env.get_agent_groups()[0]
brain_info = step_result_to_brain_info(
env.get_step_result(brain_name), env.get_agent_group_spec(brain_name)
dummy_config["summary_path"] = "./summaries/test_trainer_summary"
dummy_config["model_path"] = "./models/test_trainer_models/TestModel"
policy = PPOPolicy(0, brain_params, dummy_config, False, False)
time_horizon = 15
trajectory = make_fake_trajectory(
length=time_horizon,
max_step_complete=True,
vec_obs_size=1,
num_vis_obs=0,
action_space=2,
brain_params = group_spec_to_brain_parameters(
brain_name, env.get_agent_group_spec(brain_name)
)
trainer_parameters = dummy_config
model_path = brain_name
trainer_parameters["model_path"] = model_path
trainer_parameters["keep_checkpoints"] = 3
policy = PPOPolicy(0, brain_params, trainer_parameters, False, False)
run_out = policy.get_value_estimates(brain_info, 0, done=False)
run_out = policy.get_value_estimates(trajectory.next_obs, "test_agent", done=False)
run_out = policy.get_value_estimates(brain_info, 0, done=True)
run_out = policy.get_value_estimates(trajectory.next_obs, "test_agent", done=True)
for key, val in run_out.items():
assert type(key) is str
assert val == 0.0

run_out = policy.get_value_estimates(brain_info, 0, done=True)
run_out = policy.get_value_estimates(trajectory.next_obs, "test_agent", done=True)
env.close()
agentbuffer = trajectory.to_agentbuffer()
batched_values = policy.get_batched_value_estimates(agentbuffer)
for values in batched_values.values():
assert len(values) == 15
def test_ppo_model_cc_vector():

trainer.update_policy()
def test_add_rewards_output(dummy_config):
def test_process_trajectory(dummy_config):
brain_params = BrainParameters(
brain_name="test_brain",
vector_observation_space_size=1,
camera_resolutions=[],
vector_action_space_size=[2],
vector_action_descriptions=[],
vector_action_space_type=0,
)
dummy_config["summary_path"] = "./summaries/test_trainer_summary"
dummy_config["model_path"] = "./models/test_trainer_models/TestModel"
trainer = PPOTrainer(brain_params, 0, dummy_config, True, False, 0, "0", False)
time_horizon = 15
trajectory = make_fake_trajectory(
length=time_horizon,
max_step_complete=True,
vec_obs_size=1,
num_vis_obs=0,
action_space=2,
)
trainer.process_trajectory(trajectory)
# Check that trainer put trajectory in update buffer
assert trainer.update_buffer.num_experiences == 15
# Check that GAE worked
assert (
"advantages" in trainer.update_buffer
and "discounted_returns" in trainer.update_buffer
)
# Check that the stats are being collected as episode isn't complete
for reward in trainer.collected_rewards.values():
for agent in reward.values():
assert agent > 0
# Add a terminal trajectory
trajectory = make_fake_trajectory(
length=time_horizon + 1,
max_step_complete=False,
vec_obs_size=1,
num_vis_obs=0,
action_space=2,
)
trainer.process_trajectory(trajectory)
# Check that the stats are reset as episode is finished
for reward in trainer.collected_rewards.values():
for agent in reward.values():
assert agent == 0
assert trainer.stats_reporter.get_stats_summaries("Policy/Extrinsic Reward").num > 0
def test_normalization(dummy_config):
brain_params = BrainParameters(
brain_name="test_brain",
vector_observation_space_size=1,

dummy_config["summary_path"] = "./summaries/test_trainer_summary"
dummy_config["model_path"] = "./models/test_trainer_models/TestModel"
trainer = PPOTrainer(brain_params, 0, dummy_config, True, False, 0, "0", False)
rewardsout = AllRewardsOutput(
reward_signals={
"extrinsic": RewardSignalResult(
scaled_reward=np.array([1.0, 1.0], dtype=np.float32),
unscaled_reward=np.array([1.0, 1.0], dtype=np.float32),
)
},
environment=np.array([1.0, 1.0], dtype=np.float32),
time_horizon = 6
trajectory = make_fake_trajectory(
length=time_horizon,
max_step_complete=True,
vec_obs_size=1,
num_vis_obs=0,
action_space=2,
)
# Change half of the obs to 0
for i in range(3):
trajectory.steps[i].obs[0] = np.zeros(1, dtype=np.float32)
trainer.process_trajectory(trajectory)
# Check that the running mean and variance is correct
steps, mean, variance = trainer.ppo_policy.sess.run(
[
trainer.policy.model.normalization_steps,
trainer.policy.model.running_mean,
trainer.policy.model.running_variance,
]
values = {"extrinsic": np.array([[2.0]], dtype=np.float32)}
agent_id = "123"
idx = 0
# make sure that we're grabbing from the next_idx for rewards. If we're not, the test will fail.
next_idx = 1
trainer.add_rewards_outputs(
rewardsout,
values=values,
agent_id=agent_id,
agent_idx=idx,
agent_next_idx=next_idx,
assert steps == 6
assert mean[0] == 0.5
# Note: variance is divided by number of steps, and initialized to 1 to avoid
# divide by 0. The right answer is 0.25
assert (variance[0] - 1) / steps == 0.25
# Make another update, this time with all 1's
time_horizon = 10
trajectory = make_fake_trajectory(
length=time_horizon,
max_step_complete=True,
vec_obs_size=1,
num_vis_obs=0,
action_space=2,
assert trainer.processing_buffer[agent_id]["extrinsic_value_estimates"][0] == 2.0
assert trainer.processing_buffer[agent_id]["extrinsic_rewards"][0] == 1.0
trainer.process_trajectory(trajectory)
# Check that the running mean and variance is correct
steps, mean, variance = trainer.ppo_policy.sess.run(
[
trainer.policy.model.normalization_steps,
trainer.policy.model.running_mean,
trainer.policy.model.running_variance,
]
)
assert steps == 16
assert mean[0] == 0.8125
assert (variance[0] - 1) / steps == pytest.approx(0.152, abs=0.01)
if __name__ == "__main__":

ml-agents/mlagents/trainers/tests/test_rl_trainer.py (48 changed lines)


import unittest.mock as mock
import pytest
from mlagents.trainers.tests.test_buffer import construct_fake_processing_buffer
from mlagents.trainers.buffer import AgentBuffer
from mlagents.trainers.tests.test_buffer import construct_fake_buffer
def dummy_config():

return mock_policy
@mock.patch("mlagents.trainers.rl_trainer.RLTrainer.add_policy_outputs")
@mock.patch("mlagents.trainers.rl_trainer.RLTrainer.add_rewards_outputs")
@pytest.mark.parametrize("num_vis_obs", [0, 1, 2], ids=["vec", "1 viz", "2 viz"])
def test_rl_trainer(add_policy_outputs, add_rewards_outputs, num_vis_obs):
def test_rl_trainer():
trainer.policy = create_mock_policy()
fake_action_outputs = {
"action": [0.1, 0.1],
"value_heads": {},
"entropy": np.array([1.0], dtype=np.float32),
"learning_rate": 1.0,
}
mock_braininfo = mb.create_mock_braininfo(
num_agents=2,
num_vector_observations=8,
num_vector_acts=2,
num_vis_observations=num_vis_obs,
)
trainer.add_experiences(mock_braininfo, mock_braininfo, fake_action_outputs)
# Remove one of the agents
next_mock_braininfo = mb.create_mock_braininfo(
num_agents=1,
num_vector_observations=8,
num_vector_acts=2,
num_vis_observations=num_vis_obs,
)
brain_info = trainer.construct_curr_info(next_mock_braininfo)
# assert construct_curr_info worked properly
assert len(brain_info.agents) == 1
assert len(brain_info.visual_observations) == num_vis_obs
assert len(brain_info.vector_observations) == 1
agent_id = "0"
trainer.episode_steps[agent_id] = 3
trainer.collected_rewards["extrinsic"] = {agent_id: 3}
assert len(trainer.processing_buffer[agent_id]["action"]) == 0
for rewards in trainer.collected_rewards.values():
for agent_id in rewards:
assert rewards[agent_id] == 0

trainer = create_rl_trainer()
trainer.processing_buffer = construct_fake_processing_buffer()
trainer.update_buffer = AgentBuffer()
trainer.processing_buffer.append_to_update_buffer(
trainer.update_buffer, 2, batch_size=None, training_length=2
)
trainer.update_buffer = construct_fake_buffer(0)
trainer.clear_update_buffer()
for _, arr in trainer.update_buffer.items():
assert len(arr) == 0

38
ml-agents/mlagents/trainers/tests/test_sac.py


from mlagents.trainers.sac.trainer import SACTrainer
from mlagents.trainers.tests import mock_brain as mb
from mlagents.trainers.tests.mock_brain import make_brain_parameters
from mlagents.trainers.tests.test_trajectory import make_fake_trajectory
@pytest.fixture

# Wipe Trainer and try to load
trainer2 = SACTrainer(mock_brain, 1, trainer_params, True, True, 0, 0)
assert trainer2.update_buffer.num_experiences == buffer_len
def test_process_trajectory(dummy_config):
brain_params = make_brain_parameters(
discrete_action=False, visual_inputs=0, vec_obs_size=6
)
dummy_config["summary_path"] = "./summaries/test_trainer_summary"
dummy_config["model_path"] = "./models/test_trainer_models/TestModel"
trainer = SACTrainer(brain_params, 0, dummy_config, True, False, 0, "0")
trajectory = make_fake_trajectory(
length=15, max_step_complete=True, vec_obs_size=6, num_vis_obs=0, action_space=2
)
trainer.process_trajectory(trajectory)
# Check that trainer put trajectory in update buffer
assert trainer.update_buffer.num_experiences == 15
# Check that the stats are being collected as episode isn't complete
for reward in trainer.collected_rewards.values():
for agent in reward.values():
assert agent > 0
# Add a terminal trajectory
trajectory = make_fake_trajectory(
length=15,
max_step_complete=False,
vec_obs_size=6,
num_vis_obs=0,
action_space=2,
)
trainer.process_trajectory(trajectory)
# Check that the stats are reset as episode is finished
for reward in trainer.collected_rewards.values():
for agent in reward.values():
assert agent == 0
assert trainer.stats_reporter.get_stats_summaries("Policy/Extrinsic Reward").num > 0
if __name__ == "__main__":

3
ml-agents/mlagents/trainers/tests/test_simple_rl.py


from mlagents.trainers.brain import BrainParameters
from mlagents.trainers.simple_env_manager import SimpleEnvManager
from mlagents.trainers.sampler_class import SamplerManager
from mlagents.trainers.stats import StatsReporter
from mlagents_envs.side_channel.float_properties_channel import FloatPropertiesChannel
BRAIN_NAME = __name__

run_id = "id"
save_freq = 99999
seed = 1337
StatsReporter.writers.clear() # Clear StatsReporters so we don't write to file
trainer_config = yaml.safe_load(config)
env_manager = SimpleEnvManager(env, FloatPropertiesChannel())
trainer_factory = TrainerFactory(

22
ml-agents/mlagents/trainers/tests/test_trainer_controller.py


import pytest
from mlagents.tf_utils import tf
from mlagents.trainers.trainer_controller import TrainerController
from mlagents.trainers.trainer_controller import TrainerController, AgentManager
from mlagents.trainers.subprocess_env_manager import EnvironmentStep
from mlagents.trainers.sampler_class import SamplerManager

trainer_mock.parameters = {"some": "parameter"}
trainer_mock.write_tensorboard_text = MagicMock()
processor_mock = MagicMock()
tc.managers = {"testbrain": AgentManager(processor=processor_mock)}
return tc, trainer_mock

env_mock.reset.return_value = [old_step_info]
tc.advance(env_mock)
trainer_mock.add_experiences.assert_called_once_with(
processor_mock = tc.managers[brain_name].processor
processor_mock.add_experiences.assert_called_once_with(
trainer_mock.process_experiences.assert_called_once_with(
new_step_info.previous_all_brain_info[brain_name],
new_step_info.current_all_brain_info[brain_name],
)
trainer_mock.update_policy.assert_called_once()
trainer_mock.increment_step.assert_called_once()

tc.advance(env_mock)
env_mock.reset.assert_not_called()
env_mock.step.assert_called_once()
trainer_mock.add_experiences.assert_called_once_with(
processor_mock = tc.managers[brain_name].processor
processor_mock.add_experiences.assert_called_once_with(
)
trainer_mock.process_experiences.assert_called_once_with(
new_step_info.previous_all_brain_info[brain_name],
new_step_info.current_all_brain_info[brain_name],
)
trainer_mock.advance.assert_called_once()

4
ml-agents/mlagents/trainers/tests/test_trainer_util.py


base_config = dummy_config_with_override
expected_config = base_config["default"]
expected_config["summary_path"] = summaries_dir + f"/{run_id}_testbrain"
expected_config["summary_path"] = f"{run_id}_testbrain"
expected_config["model_path"] = model_path + "/testbrain"
expected_config["keep_checkpoints"] = keep_checkpoints

base_config = dummy_config
expected_config = base_config["default"]
expected_config["summary_path"] = summaries_dir + f"/{run_id}_testbrain"
expected_config["summary_path"] = f"{run_id}_testbrain"
expected_config["model_path"] = model_path + "/testbrain"
expected_config["keep_checkpoints"] = keep_checkpoints

62
ml-agents/mlagents/trainers/tf_policy.py


from tensorflow.python.platform import gfile
from tensorflow.python.framework import graph_util
from mlagents.trainers import tensorflow_to_barracuda as tf2bc
from mlagents.trainers.trajectory import SplitObservations
from mlagents.trainers.buffer import AgentBuffer
from mlagents.trainers.brain import BrainInfo

self.model.update_normalization,
feed_dict={self.model.vector_in: vector_obs},
)
def get_batched_value_estimates(self, batch: AgentBuffer) -> Dict[str, np.ndarray]:
feed_dict: Dict[tf.Tensor, Any] = {
self.model.batch_size: batch.num_experiences,
self.model.sequence_length: 1, # We want to feed data in batch-wise, not time-wise.
}
if self.use_vec_obs:
feed_dict[self.model.vector_in] = batch["vector_obs"]
if self.model.vis_obs_size > 0:
for i in range(len(self.model.visual_in)):
_obs = batch["visual_obs%d" % i]
feed_dict[self.model.visual_in[i]] = _obs
if self.use_recurrent:
feed_dict[self.model.memory_in] = batch["memory"]
if not self.use_continuous_act and self.use_recurrent:
feed_dict[self.model.prev_action] = batch["prev_action"]
value_estimates = self.sess.run(self.model.value_heads, feed_dict)
value_estimates = {k: np.squeeze(v, axis=1) for k, v in value_estimates.items()}
return value_estimates
def get_value_estimates(
self, next_obs: List[np.ndarray], agent_id: str, done: bool
) -> Dict[str, float]:
"""
Generates value estimates for bootstrapping.
        :param next_obs: Observations following the final step of the trajectory, used for bootstrapping.
        :param agent_id: ID of the agent whose memories and previous action should be retrieved.
:param done: Whether or not this is the last element of the episode, in which case the value estimate will be 0.
:return: The value estimate dictionary with key being the name of the reward signal and the value the
corresponding value estimate.
"""
feed_dict: Dict[tf.Tensor, Any] = {
self.model.batch_size: 1,
self.model.sequence_length: 1,
}
vec_vis_obs = SplitObservations.from_observations(next_obs)
for i in range(len(vec_vis_obs.visual_observations)):
feed_dict[self.model.visual_in[i]] = [vec_vis_obs.visual_observations[i]]
if self.use_vec_obs:
feed_dict[self.model.vector_in] = [vec_vis_obs.vector_observations]
if self.use_recurrent:
feed_dict[self.model.memory_in] = self.retrieve_memories([agent_id])
if not self.use_continuous_act and self.use_recurrent:
feed_dict[self.model.prev_action] = self.retrieve_previous_action(
[agent_id]
)
value_estimates = self.sess.run(self.model.value_heads, feed_dict)
value_estimates = {k: float(v) for k, v in value_estimates.items()}
# If we're done, reassign all of the value estimates that need terminal states.
if done:
for k in value_estimates:
if self.reward_signals[k].use_terminal_states:
value_estimates[k] = 0.0
return value_estimates
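A minimal sketch of how a trainer can use this method to bootstrap at the end of a trajectory; policy and trajectory here are assumed to be a TFPolicy and a Trajectory produced elsewhere in this change:

value_next = policy.get_value_estimates(
    trajectory.next_obs,
    trajectory.agent_id,
    trajectory.done_reached and not trajectory.max_step_reached,
)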
@property
def vis_obs_size(self):

108
ml-agents/mlagents/trainers/trainer.py


# # Unity ML-Agents Toolkit
import logging
from typing import Dict, List, Deque, Any
import os
import numpy as np
from collections import deque, defaultdict
from collections import deque
from mlagents.trainers.action_info import ActionInfoOutputs
from mlagents.trainers.brain import BrainParameters, BrainInfo
from mlagents.trainers.stats import StatsReporter
from mlagents.trainers.trajectory import Trajectory
from mlagents.trainers.brain import BrainParameters
LOGGER = logging.getLogger("mlagents.trainers")

self.run_id = run_id
self.trainer_parameters = trainer_parameters
self.summary_path = trainer_parameters["summary_path"]
if not os.path.exists(self.summary_path):
os.makedirs(self.summary_path)
self.stats_reporter = StatsReporter(self.summary_path)
self.stats: Dict[str, List] = defaultdict(list)
self.summary_writer = tf.summary.FileWriter(self.summary_path)
self._reward_buffer: Deque[float] = deque(maxlen=reward_buff_cap)
self.policy: TFPolicy = None # type: ignore # this will always get set
self.step: int = 0

"brain {2}.".format(k, self.__class__, self.brain_name)
)
def write_tensorboard_text(self, key: str, input_dict: Dict[str, Any]) -> None:
"""
Saves text to Tensorboard.
Note: Only works on tensorflow r1.2 or above.
:param key: The name of the text.
:param input_dict: A dictionary that will be displayed in a table on Tensorboard.
"""
try:
with tf.Session() as sess:
s_op = tf.summary.text(
key,
tf.convert_to_tensor(
([[str(x), str(input_dict[x])] for x in input_dict])
),
)
s = sess.run(s_op)
self.stats_reporter.write_text(s, self.get_step)
except Exception:
LOGGER.info("Could not write text summary for Tensorboard.")
pass
def dict_to_str(self, param_dict: Dict[str, Any], num_tabs: int) -> str:
"""
Takes a parameter dictionary and converts it to a human-readable string.

"""
self.trainer_metrics.write_training_metrics()
def write_summary(
self, global_step: int, delta_train_start: float, lesson_num: int = 0
) -> None:
def write_summary(self, global_step: int, delta_train_start: float) -> None:
:param lesson_num: Current lesson number in curriculum.
:param global_step: The number of steps the simulation has been going for
"""
if (

else "Not Training."
)
step = min(self.get_step, self.get_max_steps)
if len(self.stats["Environment/Cumulative Reward"]) > 0:
mean_reward = np.mean(self.stats["Environment/Cumulative Reward"])
stats_summary = self.stats_reporter.get_stats_summaries(
"Environment/Cumulative Reward"
)
if stats_summary.num > 0:
LOGGER.info(
" {}: {}: Step: {}. "
"Time Elapsed: {:0.3f} s "

self.brain_name,
step,
delta_train_start,
mean_reward,
np.std(self.stats["Environment/Cumulative Reward"]),
stats_summary.mean,
stats_summary.std,
set_gauge(f"{self.brain_name}.mean_reward", mean_reward)
set_gauge(f"{self.brain_name}.mean_reward", stats_summary.mean)
else:
LOGGER.info(
" {}: {}: Step: {}. No episode was completed since last summary. {}".format(

summary = tf.Summary()
for key in self.stats:
if len(self.stats[key]) > 0:
stat_mean = float(np.mean(self.stats[key]))
summary.value.add(tag="{}".format(key), simple_value=stat_mean)
self.stats[key] = []
summary.value.add(tag="Environment/Lesson", simple_value=lesson_num)
self.summary_writer.add_summary(summary, step)
self.summary_writer.flush()
def write_tensorboard_text(self, key: str, input_dict: Dict[str, Any]) -> None:
"""
Saves text to Tensorboard.
Note: Only works on tensorflow r1.2 or above.
:param key: The name of the text.
:param input_dict: A dictionary that will be displayed in a table on Tensorboard.
"""
try:
with tf.Session() as sess:
s_op = tf.summary.text(
key,
tf.convert_to_tensor(
([[str(x), str(input_dict[x])] for x in input_dict])
),
)
s = sess.run(s_op)
self.summary_writer.add_summary(s, self.get_step)
except Exception:
LOGGER.info(
"Cannot write text summary for Tensorboard. Tensorflow version must be r1.2 or above."
)
pass
self.stats_reporter.write_stats(int(step))
def add_experiences(
self,
curr_info: BrainInfo,
next_info: BrainInfo,
take_action_outputs: ActionInfoOutputs,
) -> None:
def process_trajectory(self, trajectory: Trajectory) -> None:
Adds experiences to each agent's experience history.
:param curr_info: current BrainInfo.
:param next_info: next BrainInfo.
:param take_action_outputs: The outputs of the Policy's get_action method.
"""
raise UnityTrainerException("The add_experiences method was not implemented.")
def process_experiences(
self, current_info: BrainInfo, next_info: BrainInfo
) -> None:
"""
Checks agent histories for processing condition, and processes them as necessary.
Takes a trajectory and processes it, putting it into the update buffer.
:param current_info: current BrainInfo.
:param next_info: next BrainInfo.
:param trajectory: The Trajectory tuple containing the steps to be processed.
"""
raise UnityTrainerException(
"The process_experiences method was not implemented."

39
ml-agents/mlagents/trainers/trainer_controller.py


"""Launches trainers for each External Brains in a Unity Environment."""
import os
import sys
from typing import Dict, List, Optional, Set
from typing import Dict, List, Optional, Set, NamedTuple
import numpy as np
from mlagents.tf_utils import tf

from mlagents.trainers.trainer import Trainer, TrainerMetrics
from mlagents.trainers.meta_curriculum import MetaCurriculum
from mlagents.trainers.trainer_util import TrainerFactory
from mlagents.trainers.agent_processor import AgentProcessor
class AgentManager(NamedTuple):
processor: AgentProcessor
class TrainerController(object):

:param resampling_interval: Specifies number of simulation steps after which reset parameters are resampled.
"""
self.trainers: Dict[str, Trainer] = {}
self.managers: Dict[str, AgentManager] = {}
self.trainer_factory = trainer_factory
self.model_path = model_path
self.summaries_dir = summaries_dir

self.meta_curriculum
and brain_name in self.meta_curriculum.brains_to_curriculums
):
trainer.write_summary(
global_step,
delta_train_start,
lesson_num=self.meta_curriculum.brains_to_curriculums[
brain_name
].lesson_num,
)
else:
trainer.write_summary(global_step, delta_train_start)
lesson_num = self.meta_curriculum.brains_to_curriculums[
brain_name
].lesson_num
trainer.stats_reporter.add_stat("Environment/Lesson", lesson_num)
trainer.write_summary(global_step, delta_train_start)
def start_trainer(self, trainer: Trainer, env_manager: EnvManager) -> None:
self.trainers[trainer.brain_name] = trainer

env_manager.external_brains[name]
)
self.start_trainer(trainer, env_manager)
agent_manager = AgentManager(
processor=AgentProcessor(
trainer,
trainer.policy,
trainer.stats_reporter,
trainer.parameters.get("time_horizon", sys.maxsize),
)
)
self.managers[name] = agent_manager
last_brain_names = external_brains
n_steps = self.advance(env_manager)
for i in range(n_steps):

if brain_name in self.trainer_metrics:
self.trainer_metrics[brain_name].add_delta_step(delta_time_step)
if step_info.has_actions_for_brain(brain_name):
trainer.add_experiences(
_processor = self.managers[brain_name].processor
_processor.add_experiences(
)
trainer.process_experiences(
step_info.previous_all_brain_info[brain_name],
step_info.current_all_brain_info[brain_name],
)
for brain_name, trainer in self.trainers.items():
if brain_name in self.trainer_metrics:

4
ml-agents/mlagents/trainers/trainer_util.py


)
trainer_parameters = trainer_config.get("default", {}).copy()
trainer_parameters["summary_path"] = "{basedir}/{name}".format(
basedir=summaries_dir, name=str(run_id) + "_" + brain_name
)
trainer_parameters["summary_path"] = str(run_id) + "_" + brain_name
trainer_parameters["model_path"] = "{basedir}/{name}".format(
basedir=model_path, name=brain_name
)

118
ml-agents/mlagents/trainers/stats.py


from collections import defaultdict
from typing import List, Dict, NamedTuple
import numpy as np
import abc
import os
from mlagents.tf_utils import tf
class StatsWriter(abc.ABC):
"""
A StatsWriter abstract class. A StatsWriter takes in a category, key, scalar value, and step
and writes it out by some method.
"""
@abc.abstractmethod
def write_stats(self, category: str, key: str, value: float, step: int) -> None:
pass
@abc.abstractmethod
def write_text(self, category: str, text: str, step: int) -> None:
pass
class TensorboardWriter(StatsWriter):
def __init__(self, base_dir: str):
self.summary_writers: Dict[str, tf.summary.FileWriter] = {}
self.base_dir: str = base_dir
def write_stats(self, category: str, key: str, value: float, step: int) -> None:
self._maybe_create_summary_writer(category)
summary = tf.Summary()
summary.value.add(tag="{}".format(key), simple_value=value)
self.summary_writers[category].add_summary(summary, step)
self.summary_writers[category].flush()
def _maybe_create_summary_writer(self, category: str) -> None:
if category not in self.summary_writers:
filewriter_dir = "{basedir}/{category}".format(
basedir=self.base_dir, category=category
)
os.makedirs(filewriter_dir, exist_ok=True)
self.summary_writers[category] = tf.summary.FileWriter(filewriter_dir)
def write_text(self, category: str, text: str, step: int) -> None:
self._maybe_create_summary_writer(category)
self.summary_writers[category].add_summary(text, step)
class StatsSummary(NamedTuple):
mean: float
std: float
num: int
class StatsReporter:
writers: List[StatsWriter] = []
stats_dict: Dict[str, Dict[str, List]] = defaultdict(lambda: defaultdict(list))
def __init__(self, category):
"""
        Generic StatsReporter. A category is the broadest level of storage (it would
        correspond to the run name and trainer name, e.g. 3DBalltest_3DBall). A key is
        the type of stat (e.g. Environment/Reward). Finally, the value is the float
        attached to that stat.
"""
self.category: str = category
@staticmethod
def add_writer(writer: StatsWriter) -> None:
StatsReporter.writers.append(writer)
def add_stat(self, key: str, value: float) -> None:
"""
        Add a float value stat to the StatsReporter under this reporter's category.
        :param key: The type of statistic, e.g. Environment/Reward.
        :param value: The value of the statistic.
"""
StatsReporter.stats_dict[self.category][key].append(value)
def write_stats(self, step: int) -> None:
"""
        Write out all statistics stored under this reporter's category.
        The currently stored values are averaged per key, written out as a single value,
        and the buffer is cleared.
        :param step: Training step at which to record these stats.
"""
for key in StatsReporter.stats_dict[self.category]:
if len(StatsReporter.stats_dict[self.category][key]) > 0:
stat_mean = float(np.mean(StatsReporter.stats_dict[self.category][key]))
for writer in StatsReporter.writers:
writer.write_stats(self.category, key, stat_mean, step)
del StatsReporter.stats_dict[self.category]
def write_text(self, text: str, step: int) -> None:
"""
        Write out some text under this reporter's category.
        :param text: The text to write out.
        :param step: Training step at which to record this text.
"""
for writer in StatsReporter.writers:
writer.write_text(self.category, text, step)
def get_stats_summaries(self, key: str) -> StatsSummary:
"""
        Get the mean, std, and count of a particular statistic since the last write.
:param key: The type of statistic, e.g. Environment/Reward.
:returns: A StatsSummary NamedTuple containing (mean, std, count).
"""
return StatsSummary(
mean=np.mean(StatsReporter.stats_dict[self.category][key]),
std=np.std(StatsReporter.stats_dict[self.category][key]),
num=len(StatsReporter.stats_dict[self.category][key]),
)
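A minimal usage sketch, assuming a summaries directory of ./summaries and a category name of 3DBall_run1 (both illustrative): register a writer once, then report keyed values through a StatsReporter for that category.

StatsReporter.add_writer(TensorboardWriter("./summaries"))  # shared by all reporters
reporter = StatsReporter("3DBall_run1")
reporter.add_stat("Environment/Cumulative Reward", 1.0)
reporter.add_stat("Environment/Cumulative Reward", 3.0)
print(reporter.get_stats_summaries("Environment/Cumulative Reward").mean)  # 2.0
reporter.write_stats(step=100)  # averages per key, writes to TensorBoard, clears the buffer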

63
ml-agents/mlagents/trainers/tests/test_agent_processor.py


import unittest.mock as mock
import pytest
import mlagents.trainers.tests.mock_brain as mb
import numpy as np
from mlagents.trainers.agent_processor import AgentProcessor
from mlagents.trainers.stats import StatsReporter
def create_mock_brain():
mock_brain = mb.create_mock_brainparams(
vector_action_space_type="continuous",
vector_action_space_size=[2],
vector_observation_space_size=8,
number_visual_observations=1,
)
return mock_brain
def create_mock_policy():
mock_policy = mock.Mock()
mock_policy.reward_signals = {}
mock_policy.retrieve_memories.return_value = np.zeros((1, 1), dtype=np.float32)
mock_policy.retrieve_previous_action.return_value = np.zeros(
(1, 1), dtype=np.float32
)
return mock_policy
@pytest.mark.parametrize("num_vis_obs", [0, 1, 2], ids=["vec", "1 viz", "2 viz"])
def test_agentprocessor(num_vis_obs):
policy = create_mock_policy()
trainer = mock.Mock()
processor = AgentProcessor(
trainer,
policy,
max_trajectory_length=5,
stats_reporter=StatsReporter("testcat"),
)
fake_action_outputs = {
"action": [0.1, 0.1],
"entropy": np.array([1.0], dtype=np.float32),
"learning_rate": 1.0,
"pre_action": [0.1, 0.1],
"log_probs": [0.1, 0.1],
}
mock_braininfo = mb.create_mock_braininfo(
num_agents=2,
num_vector_observations=8,
num_vector_acts=2,
num_vis_observations=num_vis_obs,
)
for i in range(5):
processor.add_experiences(mock_braininfo, mock_braininfo, fake_action_outputs)
# Assert that two trajectories have been added to the Trainer
assert len(trainer.process_trajectory.call_args_list) == 2
# Assert that the trajectory is of length 5
trajectory = trainer.process_trajectory.call_args_list[0][0][0]
assert len(trajectory.steps) == 5
# Assert that the AgentProcessor is empty
assert len(processor.experience_buffers[0]) == 0

80
ml-agents/mlagents/trainers/tests/test_stats.py


import unittest.mock as mock
import os
import pytest
import tempfile
from mlagents.trainers.stats import StatsReporter, TensorboardWriter
def test_stat_reporter_add_summary_write():
# Test add_writer
StatsReporter.writers.clear()
mock_writer1 = mock.Mock()
mock_writer2 = mock.Mock()
StatsReporter.add_writer(mock_writer1)
StatsReporter.add_writer(mock_writer2)
assert len(StatsReporter.writers) == 2
# Test add_stats and summaries
statsreporter1 = StatsReporter("category1")
statsreporter2 = StatsReporter("category2")
for i in range(10):
statsreporter1.add_stat("key1", float(i))
statsreporter2.add_stat("key2", float(i))
statssummary1 = statsreporter1.get_stats_summaries("key1")
statssummary2 = statsreporter2.get_stats_summaries("key2")
assert statssummary1.num == 10
assert statssummary2.num == 10
assert statssummary1.mean == 4.5
assert statssummary2.mean == 4.5
assert statssummary1.std == pytest.approx(2.9, abs=0.1)
assert statssummary2.std == pytest.approx(2.9, abs=0.1)
# Test write_stats
step = 10
statsreporter1.write_stats(step)
mock_writer1.write_stats.assert_called_once_with("category1", "key1", 4.5, step)
mock_writer2.write_stats.assert_called_once_with("category1", "key1", 4.5, step)
def test_stat_reporter_text():
# Test add_writer
mock_writer = mock.Mock()
StatsReporter.writers.clear()
StatsReporter.add_writer(mock_writer)
assert len(StatsReporter.writers) == 1
statsreporter1 = StatsReporter("category1")
# Test write_text
step = 10
statsreporter1.write_text("this is a text", step)
mock_writer.write_text.assert_called_once_with("category1", "this is a text", step)
@mock.patch("mlagents.tf_utils.tf.Summary")
@mock.patch("mlagents.tf_utils.tf.summary.FileWriter")
def test_tensorboard_writer(mock_filewriter, mock_summary):
# Test write_stats
category = "category1"
with tempfile.TemporaryDirectory(prefix="unittest-") as base_dir:
tb_writer = TensorboardWriter(base_dir)
tb_writer.write_stats("category1", "key1", 1.0, 10)
# Test that the filewriter has been created and the directory has been created.
filewriter_dir = "{basedir}/{category}".format(
basedir=base_dir, category=category
)
assert os.path.exists(filewriter_dir)
mock_filewriter.assert_called_once_with(filewriter_dir)
# Test that the filewriter was written to and the summary was added.
mock_summary.return_value.value.add.assert_called_once_with(
tag="key1", simple_value=1.0
)
mock_filewriter.return_value.add_summary.assert_called_once_with(
mock_summary.return_value, 10
)
mock_filewriter.return_value.flush.assert_called_once()

110
ml-agents/mlagents/trainers/tests/test_trajectory.py


import numpy as np
import pytest
from mlagents.trainers.trajectory import AgentExperience, Trajectory, SplitObservations
VEC_OBS_SIZE = 6
ACTION_SIZE = 4
def make_fake_trajectory(
length: int,
max_step_complete: bool = False,
vec_obs_size: int = VEC_OBS_SIZE,
num_vis_obs: int = 1,
action_space: int = ACTION_SIZE,
) -> Trajectory:
"""
Makes a fake trajectory of length length. If max_step_complete,
the trajectory is terminated by a max step rather than a done.
"""
steps_list = []
for i in range(length - 1):
obs = []
for i in range(num_vis_obs):
obs.append(np.ones((84, 84, 3), dtype=np.float32))
obs.append(np.ones(vec_obs_size, dtype=np.float32))
reward = 1.0
done = False
action = np.zeros(action_space, dtype=np.float32)
action_probs = np.ones(action_space, dtype=np.float32)
action_pre = np.zeros(action_space, dtype=np.float32)
action_mask = np.ones(action_space, dtype=np.float32)
prev_action = np.ones(action_space, dtype=np.float32)
max_step = False
memory = np.ones(10, dtype=np.float32)
agent_id = "test_agent"
experience = AgentExperience(
obs=obs,
reward=reward,
done=done,
action=action,
action_probs=action_probs,
action_pre=action_pre,
action_mask=action_mask,
prev_action=prev_action,
max_step=max_step,
memory=memory,
)
steps_list.append(experience)
last_experience = AgentExperience(
obs=obs,
reward=reward,
done=not max_step_complete,
action=action,
action_probs=action_probs,
action_pre=action_pre,
action_mask=action_mask,
prev_action=prev_action,
max_step=max_step_complete,
memory=memory,
)
steps_list.append(last_experience)
return Trajectory(steps=steps_list, agent_id=agent_id, next_obs=obs)
@pytest.mark.parametrize("num_visual_obs", [0, 1, 2])
@pytest.mark.parametrize("num_vec_obs", [0, 1])
def test_split_obs(num_visual_obs, num_vec_obs):
obs = []
for i in range(num_visual_obs):
obs.append(np.ones((84, 84, 3), dtype=np.float32))
for i in range(num_vec_obs):
obs.append(np.ones(VEC_OBS_SIZE, dtype=np.float32))
split_observations = SplitObservations.from_observations(obs)
if num_vec_obs == 1:
assert len(split_observations.vector_observations) == VEC_OBS_SIZE
else:
assert len(split_observations.vector_observations) == 0
    # Assert the number of visual observations.
assert len(split_observations.visual_observations) == num_visual_obs
def test_trajectory_to_agentbuffer():
length = 15
wanted_keys = [
"next_visual_obs0",
"visual_obs0",
"vector_obs",
"next_vector_in",
"memory",
"masks",
"done",
"actions_pre",
"actions",
"action_probs",
"action_mask",
"prev_action",
"environment_rewards",
]
wanted_keys = set(wanted_keys)
trajectory = make_fake_trajectory(length=length)
agentbuffer = trajectory.to_agentbuffer()
seen_keys = set()
for key, field in agentbuffer.items():
assert len(field) == length
seen_keys.add(key)
assert seen_keys == wanted_keys

128
ml-agents/mlagents/trainers/trajectory.py


from typing import List, NamedTuple
import numpy as np
from mlagents.trainers.buffer import AgentBuffer
class AgentExperience(NamedTuple):
obs: List[np.ndarray]
reward: float
done: bool
action: np.ndarray
action_probs: np.ndarray
action_pre: np.ndarray # TODO: Remove this
action_mask: np.ndarray
prev_action: np.ndarray
max_step: bool
memory: np.ndarray
class SplitObservations(NamedTuple):
vector_observations: np.ndarray
visual_observations: List[np.ndarray]
@staticmethod
def from_observations(obs: List[np.ndarray]) -> "SplitObservations":
"""
Divides a List of numpy arrays into a SplitObservations NamedTuple.
This allows you to access the vector and visual observations directly,
without enumerating the list over and over.
:param obs: List of numpy arrays (observation)
:returns: A SplitObservations object.
"""
vis_obs_list: List[np.ndarray] = []
vec_obs_list: List[np.ndarray] = []
for observation in obs:
if len(observation.shape) == 1:
vec_obs_list.append(observation)
if len(observation.shape) == 3:
vis_obs_list.append(observation)
vec_obs = (
np.concatenate(vec_obs_list, axis=0)
if len(vec_obs_list) > 0
else np.array([], dtype=np.float32)
)
return SplitObservations(
vector_observations=vec_obs, visual_observations=vis_obs_list
)
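For example (the shapes are illustrative), a mixed observation list splits as follows:

obs = [np.ones((84, 84, 3), dtype=np.float32), np.ones(6, dtype=np.float32)]
split = SplitObservations.from_observations(obs)
# split.vector_observations is the concatenated 1-D array of shape (6,), and
# split.visual_observations is a one-element list of (84, 84, 3) arrays.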
class Trajectory(NamedTuple):
steps: List[AgentExperience]
next_obs: List[
np.ndarray
] # Observation following the trajectory, for bootstrapping
agent_id: str
def to_agentbuffer(self) -> AgentBuffer:
"""
        Converts a Trajectory to an AgentBuffer.
        :returns: AgentBuffer containing one entry per step of the trajectory; the
        observation that follows the final step is taken from next_obs and stored
        in the buffer's next_* fields.
"""
agent_buffer_trajectory = AgentBuffer()
vec_vis_obs = SplitObservations.from_observations(self.steps[0].obs)
for step, exp in enumerate(self.steps):
if step < len(self.steps) - 1:
next_vec_vis_obs = SplitObservations.from_observations(
self.steps[step + 1].obs
)
else:
next_vec_vis_obs = SplitObservations.from_observations(self.next_obs)
for i, _ in enumerate(vec_vis_obs.visual_observations):
agent_buffer_trajectory["visual_obs%d" % i].append(
vec_vis_obs.visual_observations[i]
)
agent_buffer_trajectory["next_visual_obs%d" % i].append(
next_vec_vis_obs.visual_observations[i]
)
agent_buffer_trajectory["vector_obs"].append(
vec_vis_obs.vector_observations
)
agent_buffer_trajectory["next_vector_in"].append(
next_vec_vis_obs.vector_observations
)
if exp.memory is not None:
agent_buffer_trajectory["memory"].append(exp.memory)
agent_buffer_trajectory["masks"].append(1.0)
agent_buffer_trajectory["done"].append(exp.done)
# Add the outputs of the last eval
if exp.action_pre is not None:
actions_pre = exp.action_pre
agent_buffer_trajectory["actions_pre"].append(actions_pre)
            # Store the actions taken and their probabilities.
agent_buffer_trajectory["actions"].append(exp.action)
agent_buffer_trajectory["action_probs"].append(exp.action_probs)
# Store action masks if necessary. Eventually these will be
# None for continuous actions
if exp.action_mask is not None:
agent_buffer_trajectory["action_mask"].append(
exp.action_mask, padding_value=1
)
agent_buffer_trajectory["prev_action"].append(exp.prev_action)
agent_buffer_trajectory["environment_rewards"].append(exp.reward)
# Store the next visual obs as the current
vec_vis_obs = next_vec_vis_obs
return agent_buffer_trajectory
@property
def done_reached(self) -> bool:
"""
Returns true if trajectory is terminated with a Done.
"""
return self.steps[-1].done
@property
def max_step_reached(self) -> bool:
"""
Returns true if trajectory was terminated because max steps was reached.
"""
return self.steps[-1].max_step
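Putting the pieces together, a minimal sketch of the intended flow; the trajectory variable is assumed to come from an AgentProcessor, and nothing here is meant to reproduce the exact PPO or SAC logic:

agent_buffer = trajectory.to_agentbuffer()  # one entry per AgentExperience
episode_reward = sum(agent_buffer["environment_rewards"])
# done_reached distinguishes a real terminal state from a max-step interruption,
# which is what tf_policy.get_value_estimates uses to decide whether to zero the
# terminal value estimates.
bootstrap_needed = not trajectory.done_reached or trajectory.max_step_reached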