|
|
|
|
|
|
# # Unity ML-Agents Toolkit |
|
|
|
import logging |
|
|
|
from typing import Dict, List, Deque, Any |
|
|
|
import time
import abc
from collections import deque

from mlagents.tf_utils import tf
|
|
|
from mlagents_envs.timers import set_gauge |
|
|
|
from mlagents.trainers.tf_policy import TFPolicy |
|
|
|
from mlagents.trainers.stats import StatsReporter |
|
|
|
from mlagents.trainers.trajectory import Trajectory
from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.policy import Policy
from mlagents.trainers.exception import UnityTrainerException
from mlagents_envs.timers import hierarchical_timer
|
|
|
|
|
|
|
LOGGER = logging.getLogger("mlagents.trainers") |
|
|
|
|
|
|
|
|
|
|
class Trainer(abc.ABC):
    """This class is the base class for the mlagents.trainers."""

    def __init__(
        self,
        brain_name: str,
        trainer_parameters: dict,
        training: bool,
        run_id: str,
        reward_buff_cap: int = 1,
    ):
        """
        Responsible for collecting experiences and training a neural network model.
        :str brain_name: The name of the brain this trainer is training.
        :dict trainer_parameters: The parameters for the trainer (dictionary).
        :bool training: Whether the trainer is set for training.
        :str run_id: The identifier of the current run
        :int reward_buff_cap: Max reward history to track in the reward buffer
        """
        self.param_keys: List[str] = []
        self.brain_name = brain_name
        self.run_id = run_id
        self.trainer_parameters = trainer_parameters
        self.summary_path = trainer_parameters["summary_path"]
        self.stats_reporter = StatsReporter(self.summary_path)
        self.cumulative_returns_since_policy_update: List[float] = []
        self.is_training = training
        self._reward_buffer: Deque[float] = deque(maxlen=reward_buff_cap)
        self.policy_queues: List[AgentManagerQueue[Policy]] = []
        self.trajectory_queues: List[AgentManagerQueue[Trajectory]] = []
        self.step: int = 0
        self.training_start_time = time.time()
        self.summary_freq = self.trainer_parameters["summary_freq"]
        self.next_update_step = self.summary_freq
|
|
|
|
|
|
|
def _check_param_keys(self): |
|
|
|
for k in self.param_keys: |
|
|
|
if k not in self.trainer_parameters: |
|
|
|
raise UnityTrainerException( |
|
|
|
"The hyper-parameter {0} could not be found for the {1} trainer of " |
|
|
|
"brain {2}.".format(k, self.__class__, self.brain_name) |
|
|
|
) |
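
    # A concrete trainer subclass is expected to populate `param_keys` before
    # calling `_check_param_keys()`. A minimal sketch (the key names below are
    # illustrative, not the required set):
    #
    #     self.param_keys = ["batch_size", "summary_freq", "max_steps"]
    #     self._check_param_keys()  # raises UnityTrainerException on a missing key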
|
|
|
|
|
|
|
def write_tensorboard_text(self, key: str, input_dict: Dict[str, Any]) -> None: |
|
|
|
""" |
|
|
|
Saves text to Tensorboard. |
|
|
|
Note: Only works on tensorflow r1.2 or above. |
|
|
|
:param key: The name of the text. |
|
|
|
:param input_dict: A dictionary that will be displayed in a table on Tensorboard. |
|
|
|
""" |
|
|
|
try: |
|
|
|
with tf.Session() as sess: |
|
|
|
s_op = tf.summary.text( |
|
|
|
key, |
|
|
|
tf.convert_to_tensor( |
|
|
|
([[str(x), str(input_dict[x])] for x in input_dict]) |
|
|
|
), |
|
|
|
) |
|
|
|
s = sess.run(s_op) |
|
|
|
self.stats_reporter.write_text(s, self.get_step) |
|
|
|
except Exception: |
|
|
|
LOGGER.info("Could not write text summary for Tensorboard.") |
|
|
|
pass |
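
    # Example (hypothetical call site): a trainer can log its hyper-parameter
    # dictionary as a Tensorboard text table, e.g.
    #
    #     self.write_tensorboard_text("Hyperparameters", self.trainer_parameters)
    #
    # Any failure (for example on an older TensorFlow) is caught and logged above.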
|
|
|
|
|
|
|
def _dict_to_str(self, param_dict: Dict[str, Any], num_tabs: int) -> str: |
|
|
|
""" |
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
@property |
|
|
|
def parameters(self) -> Dict[str, Any]: |
|
|
|
""" |
|
|
|
Returns the trainer parameters of the trainer. |
|
|
|
""" |
|
|
|
return self.trainer_parameters |
|
|
|
|
|
|
|
@property |
|
|
|
def get_max_steps(self) -> int: |
|
|
|
""" |
|
|
|
Returns the maximum number of steps. Is used to know when the trainer should be stopped. |
|
|
|
:return: The maximum number of steps of the trainer |
|
|
|
""" |
|
|
|
return int(float(self.trainer_parameters["max_steps"])) |
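
    # Note: "max_steps" is parsed with float() first so that YAML values written
    # in scientific notation (e.g. the string "5.0e5") are accepted; int() then
    # truncates the result (5.0e5 -> 500000).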
|
|
|
|
|
|
|
@property |
|
|
|
def get_step(self) -> int: |
|
|
|
""" |
|
|
|
Returns the number of steps the trainer has performed |
|
|
|
:return: the step count of the trainer |
|
|
|
""" |
|
|
|
return self.step |
|
|
|
|
|
|
|
@property |
|
|
|
def should_still_train(self) -> bool: |
|
|
|
        """
        Returns whether or not the trainer should train. A Trainer could
        stop training if it wasn't training to begin with, or if max_steps
        is reached.
        """
        return self.is_training and self.get_step <= self.get_max_steps

    @abc.abstractmethod
    def training_progress(self) -> float:
        """
        Returns a float between 0 and 1 indicating how far along in the training progress the Trainer is.
        If 1, the Trainer wasn't training to begin with, or max_steps is reached.
        """
        pass
|
|
|
|
|
|
|
@property |
|
|
|
def reward_buffer(self) -> Deque[float]: |
|
|
|
|
|
|
""" |
|
|
|
return self._reward_buffer |
|
|
|
|
|
|
|
def _increment_step(self, n_steps: int, name_behavior_id: str) -> None: |
|
|
|
""" |
|
|
|
Increment the step count of the trainer |
|
|
|
:param n_steps: number of steps to increment the step count by |
|
|
|
""" |
|
|
|
self.step += n_steps |
|
|
|
self.next_update_step = self.step + ( |
|
|
|
self.summary_freq - self.step % self.summary_freq |
|
|
|
) |
|
|
|
p = self.get_policy(name_behavior_id) |
|
|
|
if p: |
|
|
|
p.increment_step(n_steps) |
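
    # Worked example of the rounding above: with summary_freq = 1000 and
    # self.step = 1230 after the increment, next_update_step becomes
    # 1230 + (1000 - 1230 % 1000) = 2000, i.e. the next multiple of summary_freq.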
|
|
|
|
|
|
|
    @abc.abstractmethod
    def save_model(self, name_behavior_id: str) -> None:
        """
        Saves the model.
        """
        self.get_policy(name_behavior_id).save_model(self.get_step)

    @abc.abstractmethod
    def export_model(self, name_behavior_id: str) -> None:
        """
        Exports the model.
        """
        self.get_policy(name_behavior_id).export_model()
|
|
|
|
|
|
|
def _write_summary(self, step: int) -> None: |
|
|
|
""" |
|
|
|
Saves training statistics to Tensorboard. |
|
|
|
""" |
|
|
|
is_training = "Training." if self.should_still_train else "Not Training." |
|
|
|
stats_summary = self.stats_reporter.get_stats_summaries( |
|
|
|
"Environment/Cumulative Reward" |
|
|
|
) |
|
|
|
if stats_summary.num > 0: |
|
|
|
LOGGER.info( |
|
|
|
" {}: {}: Step: {}. " |
|
|
|
"Time Elapsed: {:0.3f} s " |
|
|
|
"Mean " |
|
|
|
"Reward: {:0.3f}" |
|
|
|
". Std of Reward: {:0.3f}. {}".format( |
|
|
|
self.run_id, |
|
|
|
self.brain_name, |
|
|
|
step, |
|
|
|
time.time() - self.training_start_time, |
|
|
|
stats_summary.mean, |
|
|
|
stats_summary.std, |
|
|
|
is_training, |
|
|
|
) |
|
|
|
) |
|
|
|
set_gauge(f"{self.brain_name}.mean_reward", stats_summary.mean) |
|
|
|
else: |
|
|
|
LOGGER.info( |
|
|
|
" {}: {}: Step: {}. No episode was completed since last summary. {}".format( |
|
|
|
self.run_id, self.brain_name, step, is_training |
|
|
|
) |
|
|
|
) |
|
|
|
self.stats_reporter.write_stats(int(step)) |
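
    # The set_gauge call above records the latest mean reward under a key of the
    # form "<brain_name>.mean_reward", e.g. "3DBall.mean_reward" for a brain
    # named "3DBall" (name purely illustrative).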
|
|
|
|
|
|
|
@abc.abstractmethod |
|
|
|
def _process_trajectory(self, trajectory: Trajectory) -> None: |
|
|
|
""" |
|
|
|
Takes a trajectory and processes it, putting it into the update buffer. |
|
|
|
:param trajectory: The Trajectory tuple containing the steps to be processed. |
|
|
|
""" |
|
|
|
self._maybe_write_summary(self.get_step + len(trajectory.steps)) |
|
|
|
self._increment_step(len(trajectory.steps), trajectory.behavior_id) |
|
|
|
|
|
|
|
def _maybe_write_summary(self, step_after_process: int) -> None: |
|
|
|
""" |
|
|
|
If processing the trajectory will make the step exceed the next summary write, |
|
|
|
write the summary. This logic ensures summaries are written on the update step and not in between. |
|
|
|
:param step_after_process: the step count after processing the next trajectory. |
|
|
|
""" |
|
|
|
if step_after_process >= self.next_update_step and self.get_step != 0: |
|
|
|
self._write_summary(self.next_update_step) |
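
    # Worked example: with summary_freq = 1000, next_update_step = 2000 and
    # get_step = 1980, a trajectory of 30 steps gives step_after_process = 2010,
    # so the summary for step 2000 is written before the step count moves past it.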
|
|
|
|
|
|
|
|
|
|
@abc.abstractmethod |
|
|
|
def end_episode(self): |
|
|
|
|
|
|
        """
        A signal that the episode has ended. The buffer must be reset.
        Only called when the Academy resets.
        """
        pass
|
|
|
|
|
|
|
@abc.abstractmethod |
|
|
|
def _is_ready_update(self): |
|
|
|
""" |
|
|
|
Returns whether or not the trainer has enough elements to run update model |
|
|
|
        :return: A boolean corresponding to whether or not update_model() can be run
        """
        return False
|
|
|
|
|
|
|
@abc.abstractmethod |
|
|
|
def _update_policy(self): |
|
|
|
""" |
|
|
|
Uses demonstration_buffer to update model. |
|
|
|
        """
        pass
|
|
|
|
|
|
|
def advance(self) -> None: |
|
|
|
""" |
|
|
|
Steps the trainer, taking in trajectories and updates if ready. |
|
|
|
""" |
|
|
|
with hierarchical_timer("process_trajectory"): |
|
|
|
for traj_queue in self.trajectory_queues: |
|
|
|
try: |
|
|
|
t = traj_queue.get_nowait() |
|
|
|
self._process_trajectory(t) |
|
|
|
except AgentManagerQueue.Empty: |
|
|
|
pass |
|
|
|
if self.should_still_train: |
|
|
|
if self._is_ready_update(): |
|
|
|
with hierarchical_timer("_update_policy"): |
|
|
|
self._update_policy() |
|
|
|
for q in self.policy_queues: |
|
|
|
# Get policies that correspond to the policy queue in question |
|
|
|
q.put(self.get_policy(q.behavior_id)) |
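
    # Rough usage sketch (names are illustrative): a trainer controller wires up
    # the queues and then drives the trainer once per environment step, e.g.
    #
    #     trainer.subscribe_trajectory_queue(trajectory_queue)
    #     trainer.publish_policy_queue(policy_queue)
    #     while trainer.should_still_train:
    #         trainer.advance()
    #
    # advance() drains any pending trajectories and, when _is_ready_update()
    # returns True, performs a policy update and pushes the updated policy to
    # every published policy queue.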
|
|
|
|
|
|
|
def publish_policy_queue(self, policy_queue: AgentManagerQueue[Policy]) -> None: |
|
|
|
""" |
|
|
|