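        # Queue bookkeeping for the GhostTrainer, which wraps a regular
        # trainer for self-play:
        # - internal_policy_queues / internal_trajectory_queues shuttle
        #   policies and trajectories to and from the wrapped trainer.
        # - ignored_trajectory_queues hold trajectories from the non-learning
        #   policy; they are drained in advance() but never trained on.
        # - learning_policy_queues maps behavior ids to the policy queues of
        #   the learning behavior.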
        self.internal_policy_queues: List[AgentManagerQueue[Policy]] = []
        self.internal_trajectory_queues: List[AgentManagerQueue[Trajectory]] = []
        self.ignored_trajectory_queues: List[AgentManagerQueue[Trajectory]] = []
        self.learning_policy_queues: Dict[str, AgentManagerQueue[Policy]] = {}

        # assign ghost's stats collection to wrapped trainer's
        self.stats_reporter = self.trainer.stats_reporter

    def advance(self) -> None:
        """
        Steps the trainer, passing trajectories through to the wrapped
        trainer's queues.
        """
        for traj_queue, internal_traj_queue in zip(
            self.trajectory_queues, self.internal_trajectory_queues
        ):
            try:
                # We grab at most the maximum length of the queue.
                # This ensures that even if the queue is being filled faster than it is
                # being emptied, the trajectories in the queue are on-policy.
                for _ in range(traj_queue.maxlen):
                    t = traj_queue.get_nowait()
                    # adds to the wrapped trainer's queue
                    internal_traj_queue.put(t)
                    self._process_trajectory(t)
            except AgentManagerQueue.Empty:
                pass
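
        # Once enough steps have elapsed, swap the active policy snapshot
        # (opponent rotation in self-play).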
        if self.get_step - self.last_swap > self.steps_between_swap:
            self._swap_snapshots()
            self.last_swap = self.get_step

        # Dump trajectories from non-learning policy
        for traj_queue in self.ignored_trajectory_queues:
            try:
                for _ in range(traj_queue.maxlen):
                    traj_queue.get_nowait()
            except AgentManagerQueue.Empty:
                pass

    def end_episode(self):
        """
        Forwards the end-of-episode call to the wrapped trainer.
        """
        self.trainer.end_episode()

    def subscribe_trajectory_queue(
        self, trajectory_queue: AgentManagerQueue[Trajectory]
    ) -> None:
        # Mirror queues for the learning behavior into the wrapped trainer;
        # all other queues are tracked only so they can be drained.
        if trajectory_queue.behavior_id == self.learning_behavior_name:
            internal_trajectory_queue: AgentManagerQueue[Trajectory] = AgentManagerQueue(
                trajectory_queue.behavior_id
            )
            self.internal_trajectory_queues.append(internal_trajectory_queue)
            self.trainer.subscribe_trajectory_queue(internal_trajectory_queue)
        else:
            self.ignored_trajectory_queues.append(trajectory_queue)

# Taken from https://github.com/Unity-Technologies/ml-agents/pull/1975 and
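

# ---------------------------------------------------------------------------
# Illustrative sketch, not part of ml-agents: the bounded, non-blocking drain
# pattern used in advance() above, shown against a minimal stand-in queue.
# The real AgentManagerQueue lives in ml-agents; the stand-in below mimics
# only the parts of its interface that advance() relies on (put, get_nowait,
# maxlen, and the Empty exception).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import queue as _queue

    class StandInQueue:
        class Empty(Exception):
            pass

        def __init__(self, maxlen: int) -> None:
            self.maxlen = maxlen
            self._queue: _queue.Queue = _queue.Queue(maxsize=maxlen)

        def put(self, item) -> None:
            self._queue.put(item)

        def get_nowait(self):
            try:
                return self._queue.get_nowait()
            except _queue.Empty:
                raise self.Empty()

    q = StandInQueue(maxlen=3)
    for i in range(3):
        q.put(i)

    # Drain at most maxlen items per call, exactly as advance() does. Even if
    # a producer outpaces the consumer, items never sit in the queue for more
    # than one drain cycle, which keeps the consumed trajectories on-policy.
    drained = []
    try:
        for _ in range(q.maxlen):
            drained.append(q.get_nowait())
    except StandInQueue.Empty:
        pass
    print(drained)  # [0, 1, 2]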