您最多选择25个主题
主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
326 行
12 KiB
326 行
12 KiB
# # Unity ML-Agents Toolkit
|
|
from typing import Dict, List, Deque, Any
|
|
import time
|
|
import abc
|
|
|
|
from mlagents.tf_utils import tf
|
|
from mlagents import tf_utils
|
|
|
|
from collections import deque
|
|
|
|
from mlagents_envs.timers import set_gauge
|
|
from mlagents_envs.logging_util import get_logger
|
|
from mlagents.model_serialization import export_policy_model, SerializationSettings
|
|
from mlagents.trainers.policy.tf_policy import TFPolicy
|
|
from mlagents.trainers.stats import StatsReporter
|
|
from mlagents.trainers.trajectory import Trajectory
|
|
from mlagents.trainers.agent_processor import AgentManagerQueue
|
|
from mlagents.trainers.brain import BrainParameters
|
|
from mlagents.trainers.policy import Policy
|
|
from mlagents.trainers.exception import UnityTrainerException
|
|
from mlagents_envs.timers import hierarchical_timer
|
|
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
class Trainer(abc.ABC):
|
|
"""This class is the base class for the mlagents_envs.trainers"""
|
|
|
|
def __init__(
|
|
self,
|
|
brain_name: str,
|
|
trainer_parameters: dict,
|
|
training: bool,
|
|
run_id: str,
|
|
reward_buff_cap: int = 1,
|
|
):
|
|
"""
|
|
Responsible for collecting experiences and training a neural network model.
|
|
:BrainParameters brain: Brain to be trained.
|
|
:dict trainer_parameters: The parameters for the trainer (dictionary).
|
|
:bool training: Whether the trainer is set for training.
|
|
:str run_id: The identifier of the current run
|
|
:int reward_buff_cap:
|
|
"""
|
|
self.param_keys: List[str] = []
|
|
self.brain_name = brain_name
|
|
self.run_id = run_id
|
|
self.trainer_parameters = trainer_parameters
|
|
self.summary_path = trainer_parameters["summary_path"]
|
|
self.stats_reporter = StatsReporter(self.summary_path)
|
|
self.cumulative_returns_since_policy_update: List[float] = []
|
|
self.is_training = training
|
|
self._reward_buffer: Deque[float] = deque(maxlen=reward_buff_cap)
|
|
self.policy_queues: List[AgentManagerQueue[Policy]] = []
|
|
self.trajectory_queues: List[AgentManagerQueue[Trajectory]] = []
|
|
self.step: int = 0
|
|
self.training_start_time = time.time()
|
|
self.summary_freq = self.trainer_parameters["summary_freq"]
|
|
self.next_summary_step = self.summary_freq
|
|
|
|
def _check_param_keys(self):
|
|
for k in self.param_keys:
|
|
if k not in self.trainer_parameters:
|
|
raise UnityTrainerException(
|
|
"The hyper-parameter {0} could not be found for the {1} trainer of "
|
|
"brain {2}.".format(k, self.__class__, self.brain_name)
|
|
)
|
|
|
|
def write_tensorboard_text(self, key: str, input_dict: Dict[str, Any]) -> None:
|
|
"""
|
|
Saves text to Tensorboard.
|
|
Note: Only works on tensorflow r1.2 or above.
|
|
:param key: The name of the text.
|
|
:param input_dict: A dictionary that will be displayed in a table on Tensorboard.
|
|
"""
|
|
try:
|
|
with tf.Session(config=tf_utils.generate_session_config()) as sess:
|
|
s_op = tf.summary.text(
|
|
key,
|
|
tf.convert_to_tensor(
|
|
([[str(x), str(input_dict[x])] for x in input_dict])
|
|
),
|
|
)
|
|
s = sess.run(s_op)
|
|
self.stats_reporter.write_text(s, self.get_step)
|
|
except Exception:
|
|
logger.info("Could not write text summary for Tensorboard.")
|
|
pass
|
|
|
|
def _dict_to_str(self, param_dict: Dict[str, Any], num_tabs: int) -> str:
|
|
"""
|
|
Takes a parameter dictionary and converts it to a human-readable string.
|
|
Recurses if there are multiple levels of dict. Used to print out hyperaparameters.
|
|
param: param_dict: A Dictionary of key, value parameters.
|
|
return: A string version of this dictionary.
|
|
"""
|
|
if not isinstance(param_dict, dict):
|
|
return str(param_dict)
|
|
else:
|
|
append_newline = "\n" if num_tabs > 0 else ""
|
|
return append_newline + "\n".join(
|
|
[
|
|
"\t"
|
|
+ " " * num_tabs
|
|
+ "{0}:\t{1}".format(
|
|
x, self._dict_to_str(param_dict[x], num_tabs + 1)
|
|
)
|
|
for x in param_dict
|
|
]
|
|
)
|
|
|
|
def __str__(self) -> str:
|
|
return """Hyperparameters for the {0} of brain {1}: \n{2}""".format(
|
|
self.__class__.__name__,
|
|
self.brain_name,
|
|
self._dict_to_str(self.trainer_parameters, 0),
|
|
)
|
|
|
|
@property
|
|
def parameters(self) -> Dict[str, Any]:
|
|
"""
|
|
Returns the trainer parameters of the trainer.
|
|
"""
|
|
return self.trainer_parameters
|
|
|
|
@property
|
|
def get_max_steps(self) -> int:
|
|
"""
|
|
Returns the maximum number of steps. Is used to know when the trainer should be stopped.
|
|
:return: The maximum number of steps of the trainer
|
|
"""
|
|
return int(float(self.trainer_parameters["max_steps"]))
|
|
|
|
@property
|
|
def get_step(self) -> int:
|
|
"""
|
|
Returns the number of steps the trainer has performed
|
|
:return: the step count of the trainer
|
|
"""
|
|
return self.step
|
|
|
|
@property
|
|
def should_still_train(self) -> bool:
|
|
"""
|
|
Returns whether or not the trainer should train. A Trainer could
|
|
stop training if it wasn't training to begin with, or if max_steps
|
|
is reached.
|
|
"""
|
|
return self.is_training and self.get_step <= self.get_max_steps
|
|
|
|
@property
|
|
def reward_buffer(self) -> Deque[float]:
|
|
"""
|
|
Returns the reward buffer. The reward buffer contains the cumulative
|
|
rewards of the most recent episodes completed by agents using this
|
|
trainer.
|
|
:return: the reward buffer.
|
|
"""
|
|
return self._reward_buffer
|
|
|
|
def _increment_step(self, n_steps: int, name_behavior_id: str) -> None:
|
|
"""
|
|
Increment the step count of the trainer
|
|
:param n_steps: number of steps to increment the step count by
|
|
"""
|
|
self.step += n_steps
|
|
self.next_summary_step = self._get_next_summary_step()
|
|
p = self.get_policy(name_behavior_id)
|
|
if p:
|
|
p.increment_step(n_steps)
|
|
|
|
def _get_next_summary_step(self) -> int:
|
|
"""
|
|
Get the next step count that should result in a summary write.
|
|
"""
|
|
return self.step + (self.summary_freq - self.step % self.summary_freq)
|
|
|
|
def save_model(self, name_behavior_id: str) -> None:
|
|
"""
|
|
Saves the model
|
|
"""
|
|
self.get_policy(name_behavior_id).save_model(self.get_step)
|
|
|
|
def export_model(self, name_behavior_id: str) -> None:
|
|
"""
|
|
Exports the model
|
|
"""
|
|
policy = self.get_policy(name_behavior_id)
|
|
settings = SerializationSettings(policy.model_path, policy.brain.brain_name)
|
|
export_policy_model(settings, policy.graph, policy.sess)
|
|
|
|
def _write_summary(self, step: int) -> None:
|
|
"""
|
|
Saves training statistics to Tensorboard.
|
|
"""
|
|
is_training = "Training." if self.should_still_train else "Not Training."
|
|
stats_summary = self.stats_reporter.get_stats_summaries(
|
|
"Environment/Cumulative Reward"
|
|
)
|
|
if stats_summary.num > 0:
|
|
logger.info(
|
|
"{}: {}: Step: {}. "
|
|
"Time Elapsed: {:0.3f} s "
|
|
"Mean "
|
|
"Reward: {:0.3f}"
|
|
". Std of Reward: {:0.3f}. {}".format(
|
|
self.run_id,
|
|
self.brain_name,
|
|
step,
|
|
time.time() - self.training_start_time,
|
|
stats_summary.mean,
|
|
stats_summary.std,
|
|
is_training,
|
|
)
|
|
)
|
|
set_gauge(f"{self.brain_name}.mean_reward", stats_summary.mean)
|
|
else:
|
|
logger.info(
|
|
" {}: {}: Step: {}. No episode was completed since last summary. {}".format(
|
|
self.run_id, self.brain_name, step, is_training
|
|
)
|
|
)
|
|
self.stats_reporter.write_stats(int(step))
|
|
|
|
@abc.abstractmethod
|
|
def _process_trajectory(self, trajectory: Trajectory) -> None:
|
|
"""
|
|
Takes a trajectory and processes it, putting it into the update buffer.
|
|
:param trajectory: The Trajectory tuple containing the steps to be processed.
|
|
"""
|
|
self._maybe_write_summary(self.get_step + len(trajectory.steps))
|
|
self._increment_step(len(trajectory.steps), trajectory.behavior_id)
|
|
|
|
def _maybe_write_summary(self, step_after_process: int) -> None:
|
|
"""
|
|
If processing the trajectory will make the step exceed the next summary write,
|
|
write the summary. This logic ensures summaries are written on the update step and not in between.
|
|
:param step_after_process: the step count after processing the next trajectory.
|
|
"""
|
|
if step_after_process >= self.next_summary_step and self.get_step != 0:
|
|
self._write_summary(self.next_summary_step)
|
|
|
|
@abc.abstractmethod
|
|
def end_episode(self):
|
|
"""
|
|
A signal that the Episode has ended. The buffer must be reset.
|
|
Get only called when the academy resets.
|
|
"""
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def create_policy(self, brain_parameters: BrainParameters) -> TFPolicy:
|
|
"""
|
|
Creates policy
|
|
"""
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def add_policy(self, name_behavior_id: str, policy: TFPolicy) -> None:
|
|
"""
|
|
Adds policy to trainer
|
|
"""
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def get_policy(self, name_behavior_id: str) -> TFPolicy:
|
|
"""
|
|
Gets policy from trainer
|
|
"""
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def _is_ready_update(self):
|
|
"""
|
|
Returns whether or not the trainer has enough elements to run update model
|
|
:return: A boolean corresponding to wether or not update_model() can be run
|
|
"""
|
|
return False
|
|
|
|
@abc.abstractmethod
|
|
def _update_policy(self):
|
|
"""
|
|
Uses demonstration_buffer to update model.
|
|
"""
|
|
pass
|
|
|
|
def advance(self) -> None:
|
|
"""
|
|
Steps the trainer, taking in trajectories and updates if ready.
|
|
"""
|
|
with hierarchical_timer("process_trajectory"):
|
|
for traj_queue in self.trajectory_queues:
|
|
# We grab at most the maximum length of the queue.
|
|
# This ensures that even if the queue is being filled faster than it is
|
|
# being emptied, the trajectories in the queue are on-policy.
|
|
for _ in range(traj_queue.maxlen):
|
|
try:
|
|
t = traj_queue.get_nowait()
|
|
self._process_trajectory(t)
|
|
except AgentManagerQueue.Empty:
|
|
break
|
|
if self.should_still_train:
|
|
if self._is_ready_update():
|
|
with hierarchical_timer("_update_policy"):
|
|
self._update_policy()
|
|
for q in self.policy_queues:
|
|
# Get policies that correspond to the policy queue in question
|
|
q.put(self.get_policy(q.behavior_id))
|
|
|
|
def publish_policy_queue(self, policy_queue: AgentManagerQueue[Policy]) -> None:
|
|
"""
|
|
Adds a policy queue to the list of queues to publish to when this Trainer
|
|
makes a policy update
|
|
:param queue: Policy queue to publish to.
|
|
"""
|
|
self.policy_queues.append(policy_queue)
|
|
|
|
def subscribe_trajectory_queue(
|
|
self, trajectory_queue: AgentManagerQueue[Trajectory]
|
|
) -> None:
|
|
"""
|
|
Adds a trajectory queue to the list of queues for the trainer to ingest Trajectories from.
|
|
:param queue: Trajectory queue to publish to.
|
|
"""
|
|
self.trajectory_queues.append(trajectory_queue)
|