|
|
|
|
|
|
# # Unity ML-Agents Toolkit |
|
|
|
import logging |
|
|
|
from typing import Dict, List, Deque, Any |
|
|
|
import time |
|
|
|
import abc |
|
|
|
|
|
|
|
from collections import deque |
|
|
|
|
|
|
from mlagents.trainers.stats import StatsReporter, StatsPropertyType |
|
|
|
from mlagents.model_serialization import SerializationSettings, export_policy_model
from mlagents.trainers.tf_policy import TFPolicy
from mlagents.trainers.policy import Policy
from mlagents.trainers.trajectory import Trajectory
from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.exception import UnityTrainerException
|
|
|
from mlagents_envs.timers import hierarchical_timer |
|
|
|
|
|
|
|
logger = logging.getLogger("mlagents.trainers") |
|
|
|
|
|
|
|
|
|
|
class Trainer(abc.ABC):
    """Base class for all trainer implementations in mlagents.trainers."""

    def __init__(
        self,
        brain_name: str,
        trainer_parameters: dict,
        training: bool,
        run_id: str,
        reward_buff_cap: int = 1,
    ):
        """
        Responsible for collecting experiences and training a neural network model.
        :param brain_name: Name of the behavior this trainer is responsible for.
        :param trainer_parameters: Dictionary of parameters for the trainer.
        :param training: Whether the trainer is set for training.
        :param run_id: Identifier of the current run.
        :param reward_buff_cap: Capacity of the reward buffer, in episodes.
        """
        self.param_keys: List[str] = []
        self.brain_name = brain_name
        self.run_id = run_id
|
|
|
self.trainer_parameters = trainer_parameters |
|
|
|
self.summary_path = trainer_parameters["summary_path"] |
|
|
|
|
|
|
self.cumulative_returns_since_policy_update: List[float] = [] |
|
|
|
self._stats_reporter = StatsReporter(self.summary_path) |
|
|
|
        self.is_training = training
        self._reward_buffer: Deque[float] = deque(maxlen=reward_buff_cap)
        self.policy_queues: List[AgentManagerQueue[Policy]] = []
        self.trajectory_queues: List[AgentManagerQueue[Trajectory]] = []
        self.step: int = 0
        self.training_start_time = time.time()
        self.summary_freq = self.trainer_parameters["summary_freq"]
        self.next_summary_step = self.summary_freq
|
|
|
self.stats_reporter.add_property( |
|
|
|
StatsPropertyType.HYPERPARAMETERS, self.trainer_parameters |
|
|
|
) |
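        # Only "summary_path" and "summary_freq" are read directly by this base
        # class; concrete trainers validate their own required keys through
        # self.param_keys in _check_param_keys(). As an illustration (real
        # configs carry many more entries), the minimal dict this class needs
        # looks roughly like:
        #     {"summary_path": "./summaries/run-behavior", "summary_freq": 10000}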
|
|
|
|
|
|
|
@property |
|
|
|
def stats_reporter(self): |
|
|
|
""" |
|
|
|
Returns the stats reporter associated with this Trainer. |
|
|
|
""" |
|
|
|
return self._stats_reporter |
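    # Illustrative use of the stats reporter: concrete trainers typically record
    # per-update metrics through this property, e.g.
    #     self.stats_reporter.add_stat("Policy/Learning Rate", lr)
    # The same add_stat()/write_stats() calls are used by _write_summary() below.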
|
|
|
|
|
|
|
    def _check_param_keys(self):
        for k in self.param_keys:
            if k not in self.trainer_parameters:
                raise UnityTrainerException(
                    f"The hyper-parameter {k} could not be found for the {self.brain_name} trainer."
                )

    @property
    def reward_buffer(self) -> Deque[float]:
        """
        Returns the reward buffer, which holds the cumulative rewards of the
        most recently completed episodes for this trainer.
        :return: the reward buffer.
        """
        return self._reward_buffer
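    # Illustrative note: concrete trainers push each finished episode's cumulative
    # reward into this buffer, e.g. self._reward_buffer.appendleft(episode_return),
    # so it always holds the returns of the most recent episodes (capped at
    # reward_buff_cap).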
|
|
|
|
|
|
|
def _increment_step(self, n_steps: int, name_behavior_id: str) -> None: |
|
|
|
""" |
|
|
|
Increment the step count of the trainer |
|
|
|
        :param n_steps: Number of steps to increment the step count by.
        :param name_behavior_id: Name of the behavior whose policy step count should also be incremented.
        """
|
|
|
self.step += n_steps |
|
|
|
self.next_summary_step = self._get_next_summary_step() |
|
|
|
p = self.get_policy(name_behavior_id) |
|
|
|
if p: |
|
|
|
p.increment_step(n_steps) |
|
|
|
|
|
|
|
def _get_next_summary_step(self) -> int: |
|
|
|
""" |
|
|
|
Get the next step count that should result in a summary write. |
|
|
|
""" |
|
|
|
return self.step + (self.summary_freq - self.step % self.summary_freq) |
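    # Worked example: with summary_freq = 10000 and step = 12345, the next
    # summary step is 12345 + (10000 - 12345 % 10000) = 20000, i.e. the next
    # multiple of summary_freq strictly greater than the current step.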
|
|
|
|
|
|
|
def save_model(self, name_behavior_id: str) -> None: |
|
|
|
""" |
|
|
|
        Saves and exports the model for the given behavior id.
        """
        policy = self.get_policy(name_behavior_id)
        settings = SerializationSettings(policy.model_path, policy.brain.brain_name)
|
|
|
export_policy_model(settings, policy.graph, policy.sess) |
|
|
|
|
|
|
|
def _write_summary(self, step: int) -> None: |
|
|
|
""" |
|
|
|
Saves training statistics to Tensorboard. |
|
|
|
""" |
|
|
|
self.stats_reporter.add_stat("Is Training", float(self.should_still_train)) |
|
|
|
self.stats_reporter.write_stats(int(step)) |
|
|
|
|
|
|
|
@abc.abstractmethod |
|
|
|
def _process_trajectory(self, trajectory: Trajectory) -> None: |
|
|
|
""" |
|
|
|
Takes a trajectory and processes it, putting it into the update buffer. |
|
|
|
:param trajectory: The Trajectory tuple containing the steps to be processed. |
|
|
|
""" |
|
|
|
self._maybe_write_summary(self.get_step + len(trajectory.steps)) |
|
|
|
self._increment_step(len(trajectory.steps), trajectory.behavior_id) |
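    # Sketch of how a subclass is expected to extend this hook ("MyTrainer" and
    # the buffer handling are illustrative, not part of this base class):
    #
    #     class MyTrainer(Trainer):
    #         def _process_trajectory(self, trajectory: Trajectory) -> None:
    #             super()._process_trajectory(trajectory)  # step/summary bookkeeping
    #             # ...convert trajectory.steps into this trainer's update buffer...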
|
|
|
|
|
|
|
def _maybe_write_summary(self, step_after_process: int) -> None: |
|
|
|
""" |
|
|
|
If processing the trajectory will make the step exceed the next summary write, |
|
|
|
write the summary. This logic ensures summaries are written on the update step and not in between. |
|
|
|
:param step_after_process: the step count after processing the next trajectory. |
|
|
|
""" |
|
|
|
if step_after_process >= self.next_summary_step and self.get_step != 0: |
|
|
|
self._write_summary(self.next_summary_step) |
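    # Worked example: with summary_freq = 1000, step = 990 and
    # next_summary_step = 1000, processing a 20-step trajectory gives
    # step_after_process = 1010 >= 1000, so stats are written and attributed
    # to step 1000 (the scheduled summary step) rather than 1010.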
|
|
|
|
|
|
|
@abc.abstractmethod |
|
|
|
def end_episode(self): |
|
|
|
""" |
|
|
|
|
|
|
@abc.abstractmethod |
|
|
|
def add_policy(self, name_behavior_id: str, policy: TFPolicy) -> None: |
|
|
|
""" |
|
|
|
        Adds policy to trainer.
|
|
|
""" |
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
    @abc.abstractmethod
    def get_policy(self, name_behavior_id: str) -> TFPolicy:
        """
        Gets policy from trainer.
        """
        pass

    @abc.abstractmethod
    def _is_ready_update(self):
        """
        Returns whether or not the trainer has enough elements to run an update of the model.
        :return: A boolean corresponding to whether or not _update_policy() can be run.
        """
        return False
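    # Illustrative override (hypothetical attribute/key names; real trainers
    # define their own readiness condition):
    #
    #     def _is_ready_update(self):
    #         return self.update_buffer.num_experiences >= self.trainer_parameters["buffer_size"]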
|
|
|
|
|
|
|
@abc.abstractmethod |
|
|
|
def _update_policy(self): |
|
|
|
""" |
|
|
|
Uses demonstration_buffer to update model. |
|
|
|
Advances the trainer. Typically, this means grabbing trajectories |
|
|
|
from all subscribed trajectory queues (self.trajectory_queues), and updating |
|
|
|
a policy using the steps in them, and if needed pushing a new policy onto the right |
|
|
|
policy queues (self.policy_queues). |
|
|
|
def advance(self) -> None: |
|
|
|
""" |
|
|
|
Steps the trainer, taking in trajectories and updates if ready. |
|
|
|
""" |
|
|
|
with hierarchical_timer("process_trajectory"): |
|
|
|
for traj_queue in self.trajectory_queues: |
|
|
|
# We grab at most the maximum length of the queue. |
|
|
|
# This ensures that even if the queue is being filled faster than it is |
|
|
|
# being emptied, the trajectories in the queue are on-policy. |
|
|
|
for _ in range(traj_queue.maxlen): |
|
|
|
try: |
|
|
|
t = traj_queue.get_nowait() |
|
|
|
self._process_trajectory(t) |
|
|
|
except AgentManagerQueue.Empty: |
|
|
|
break |
|
|
|
if self.should_still_train: |
|
|
|
if self._is_ready_update(): |
|
|
|
with hierarchical_timer("_update_policy"): |
|
|
|
self._update_policy() |
|
|
|
for q in self.policy_queues: |
|
|
|
# Get policies that correspond to the policy queue in question |
|
|
|
q.put(self.get_policy(q.behavior_id)) |
|
|
|
|
|
|
|
    def publish_policy_queue(self, policy_queue: AgentManagerQueue[Policy]) -> None:
        """
        Adds a policy queue to the list of queues to publish to when this trainer's policy is updated.
        :param policy_queue: Policy queue to publish to.
        """
|
|
|
self.policy_queues.append(policy_queue) |
|
|
|
|
|
|
|
|
|
|
""" |
|
|
|
Adds a trajectory queue to the list of queues for the trainer to ingest Trajectories from. |
|
|
|
:param queue: Trajectory queue to publish to. |
|
|
|
:param trajectory_queue: Trajectory queue to read from. |
|
|
|
""" |
|
|
|
self.trajectory_queues.append(trajectory_queue) |
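# Illustrative wiring sketch (not part of this module): a controller-style loop
# connects a concrete trainer to an AgentManager roughly like this. "MyTrainer",
# "agent_manager" and "behavior_id" are hypothetical placeholders.
#
#     trainer = MyTrainer(behavior_id, trainer_parameters, True, run_id)
#     trainer.subscribe_trajectory_queue(agent_manager.trajectory_queue)
#     policy_queue = AgentManagerQueue(behavior_id)
#     trainer.publish_policy_queue(policy_queue)
#     while trainer.should_still_train:
#         trainer.advance()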