|
|
|
|
|
|
# # Unity ML-Agents Toolkit |
|
|
|
# ## ML-Agent Learning (Ghost Trainer) |
|
|
|
|
|
|
|
from typing import Deque, Dict, List, Any, cast |
|
|
|
from typing import Deque, Dict, List, cast |
|
|
|
|
|
|
|
import numpy as np |
|
|
|
import logging |
|
|
|
|
|
|
from mlagents.trainers.trajectory import Trajectory |
|
|
|
from mlagents.trainers.agent_processor import AgentManagerQueue |
|
|
|
from mlagents.trainers.stats import StatsPropertyType |
|
|
|
from mlagents.trainers.behavior_id_utils import BehaviorIdentifiers |
|
|
|
from mlagents.trainers.behavior_id_utils import ( |
|
|
|
BehaviorIdentifiers, |
|
|
|
create_name_behavior_id, |
|
|
|
) |
|
|
|
""" |
|
|
|
The GhostTrainer trains agents in adversarial games (there are teams in opposition) using a self-play mechanism. |
|
|
|
In adversarial settings with self-play, at any time, there is only a single learning team. The other team(s) is |
|
|
|
"ghosted" which means that its agents are executing fixed policies and not learning. The GhostTrainer wraps |
|
|
|
a standard RL trainer which trains the learning team and ensures that only the trajectories collected |
|
|
|
by the learning team are used for training. The GhostTrainer also maintains past policy snapshots to be used |
|
|
|
as the fixed policies when the team is not learning. The GhostTrainer is 1:1 with brain_names as the other |
|
|
|
trainers, and is responsible for one or more teams. Note, a GhostTrainer can have only one team in |
|
|
|
asymmetric games where there is only one team with a particular behavior i.e. Hide and Seek. |
|
|
|
The GhostController manages high level coordination between multiple ghost trainers. The learning team id |
|
|
|
is cycled throughout a training run. |
|
|
|
""" |
|
|
|
|
|
|
|
def __init__( |
|
|
|
self, |
|
|
|
trainer, |
|
|
|
|
|
|
run_id, |
|
|
|
): |
|
|
|
""" |
|
|
|
Responsible for collecting experiences and training trainer model via self_play. |
|
|
|
Creates a GhostTrainer. |
|
|
|
:param controller: Object that coordinates all ghost trainers |
|
|
|
:param controller: GhostController that coordinates all ghost trainers and calculates ELO |
|
|
|
:param reward_buff_cap: Max reward history to track in the reward buffer |
|
|
|
:param trainer_parameters: The parameters for the trainer (dictionary). |
|
|
|
:param training: Whether the trainer is set for training. |
|
|
|
|
|
|
|
|
|
|
self._internal_trajectory_queues: Dict[str, AgentManagerQueue[Trajectory]] = {} |
|
|
|
self._internal_policy_queues: Dict[str, AgentManagerQueue[Policy]] = {} |
|
|
|
|
|
|
|
self._team_to_name_to_policy_queue: Dict[ |
|
|
|
int, Dict[str, AgentManagerQueue[Policy]] |
|
|
|
] = {} |
|
|
|
|
|
|
|
self._name_to_parsed_behavior_id: Dict[str, BehaviorIdentifiers] = {} |
|
|
|
|
|
|
|
|
|
|
self.steps_between_swap = self_play_parameters.get("swap_steps", 20000) |
|
|
|
self.ghost_step: int = 0 |
|
|
|
|
|
|
|
self.policies: Dict[str, TFPolicy] = {} |
|
|
|
self.policy_snapshots: List[Any] = [] |
|
|
|
# A list of dicts from brain name to a single snapshot for this trainer's policies |
|
|
|
self.policy_snapshots: List[Dict[str, List[float]]] = [] |
|
|
|
|
|
|
|
# A dict from brain name to the current snapshot of this trainer's policies |
|
|
|
self.current_policy_snapshot: Dict[str, List[float]] = {} |
|
|
|
|
|
|
|
self.policies: Dict[str, TFPolicy] = {} |
|
|
|
|
|
|
|
# wrapped_training_team and learning team need to be separate |
|
|
|
# in the situation where new agents are created destroyed |
|
|
|
|
|
|
self.wrapped_trainer_team: int = None |
|
|
|
self.current_policy_snapshot = None |
|
|
|
self.last_save = 0 |
|
|
|
self.last_swap = 0 |
|
|
|
self.last_save: int = 0 |
|
|
|
self.last_swap: int = 0 |
|
|
|
|
|
|
|
# Chosen because it is the initial ELO in Chess |
|
|
|
self.initial_elo: float = self_play_parameters.get("initial_elo", 1200.0) |
|
|
|
|
|
|
|
|
|
|
@property |
|
|
|
def current_elo(self) -> float: |
|
|
|
""" |
|
|
|
Gets ELO of current policy which is always last in the list |
|
|
|
:return: ELO of current policy |
|
|
|
""" |
|
|
|
""" |
|
|
|
Changes elo of current policy which is always last in the list |
|
|
|
:param change: Amount to change current elo by |
|
|
|
""" |
|
|
|
""" |
|
|
|
Get elo of current opponent policy |
|
|
|
:return: ELO of current opponent policy |
|
|
|
""" |
|
|
|
""" |
|
|
|
Changes elo of current opponent policy |
|
|
|
:param change: Amount to change current opponent elo by |
|
|
|
""" |
|
|
|
""" |
|
|
|
Determines the final result of an episode and asks the GhostController |
|
|
|
to calculate the ELO change. The GhostController changes the ELO |
|
|
|
of the opponent policy since this may be in a different GhostTrainer |
|
|
|
i.e. in asymmetric games. We assume the last reward determines the winner. |
|
|
|
:param trajectory: Trajectory. |
|
|
|
""" |
|
|
|
if trajectory.done_reached and not trajectory.max_step_reached: |
|
|
|
# Assumption is that final reward is 1/.5/0 for win/draw/loss |
|
|
|
final_reward = trajectory.steps[-1].reward |
|
|
|
|
|
|
self.next_summary_step = self.trainer.next_summary_step |
|
|
|
self.trainer.advance() |
|
|
|
|
|
|
|
for policy_queue in self.policy_queues: |
|
|
|
parsed_behavior_id = self._name_to_parsed_behavior_id[ |
|
|
|
policy_queue.behavior_id |
|
|
|
] |
|
|
|
if parsed_behavior_id.team_id == self._learning_team: |
|
|
|
# With a future multiagent trainer, this will be indexed by 'role' |
|
|
|
internal_policy_queue = self._internal_policy_queues[ |
|
|
|
parsed_behavior_id.brain_name |
|
|
|
# CASE 1: Current learning team is managed by this GhostTrainer. |
|
|
|
# If the learning team changes, the following loop over queues will push the |
|
|
|
# new policy into the policy queue for the new learning agent if |
|
|
|
# that policy is managed by this GhostTrainer. Otherwise, it will save the current snapshot. |
|
|
|
# CASE 2: Current learning team is managed by a different GhostTrainer. |
|
|
|
# If the learning team changes to a team managed by this GhostTrainer, this loop |
|
|
|
# will push the current_snapshot into the correct queue. Otherwise, |
|
|
|
# it will continue skipping and swap_snapshot will continue to handle |
|
|
|
# pushing fixed snapshots |
|
|
|
next_learning_team = self.controller.get_learning_team(self.ghost_step) |
|
|
|
for brain_name in self._internal_policy_queues: |
|
|
|
internal_policy_queue = self._internal_policy_queues[brain_name] |
|
|
|
try: |
|
|
|
policy = cast(TFPolicy, internal_policy_queue.get_nowait()) |
|
|
|
self.current_policy_snapshot[brain_name] = policy.get_weights() |
|
|
|
except AgentManagerQueue.Empty: |
|
|
|
pass |
|
|
|
if next_learning_team in self._team_to_name_to_policy_queue: |
|
|
|
name_to_policy_queue = self._team_to_name_to_policy_queue[ |
|
|
|
next_learning_team |
|
|
|
# Get policies that correspond to the policy queue in question |
|
|
|
try: |
|
|
|
policy = cast(TFPolicy, internal_policy_queue.get_nowait()) |
|
|
|
self.current_policy_snapshot = policy.get_weights() |
|
|
|
policy_queue.put(policy) |
|
|
|
except AgentManagerQueue.Empty: |
|
|
|
pass |
|
|
|
|
|
|
|
self._learning_team = self.controller.get_learning_team(self.get_step) |
|
|
|
# self._learning_team = self.controller.get_learning_team(self.ghost_step) |
|
|
|
if brain_name in name_to_policy_queue: |
|
|
|
behavior_id = create_name_behavior_id( |
|
|
|
brain_name, next_learning_team |
|
|
|
) |
|
|
|
policy = self.get_policy(behavior_id) |
|
|
|
policy.load_weights(self.current_policy_snapshot[brain_name]) |
|
|
|
name_to_policy_queue[brain_name].put(policy) |
|
|
|
self._save_snapshot(self.trainer.policy) |
|
|
|
self._save_snapshot() |
|
|
|
if self.ghost_step - self.last_swap > self.steps_between_swap: |
|
|
|
if ( |
|
|
|
self._learning_team != next_learning_team |
|
|
|
or self.ghost_step - self.last_swap > self.steps_between_swap |
|
|
|
): |
|
|
|
self._learning_team = next_learning_team |
|
|
|
""" |
|
|
|
Forwarding call to wrapped trainers end_episode |
|
|
|
""" |
|
|
|
""" |
|
|
|
Forwarding call to wrapped trainers save_model |
|
|
|
""" |
|
|
|
""" |
|
|
|
Forwarding call to wrapped trainers export_model |
|
|
|
""" |
|
|
|
""" |
|
|
|
Creates policy with the wrapped trainer's create_policy function |
|
|
|
""" |
|
|
|
return self.trainer.create_policy(brain_parameters) |
|
|
|
|
|
|
|
def add_policy( |
|
|
|
|
|
|
Adds policy to trainer. For the first policy added, add a trainer |
|
|
|
to the policy and set the learning behavior name to name_behavior_id. |
|
|
|
Adds policy to trainer. The first policy encountered sets the wrapped |
|
|
|
trainer team. This is to ensure that all agents from the same multi-agent |
|
|
|
team are grouped. All policies associated with this team are added to the |
|
|
|
wrapped trainer to be trained. |
|
|
|
:param name_behavior_id: Behavior ID that the policy should belong to. |
|
|
|
:param policy: Policy to associate with name_behavior_id. |
|
|
|
""" |
|
|
|
|
|
|
|
|
|
|
# First policy or a new agent on the same team encountered |
|
|
|
if self.wrapped_trainer_team is None or team_id == self.wrapped_trainer_team: |
|
|
|
weights = policy.get_weights() |
|
|
|
self.current_policy_snapshot = weights |
|
|
|
self.current_policy_snapshot[ |
|
|
|
parsed_behavior_id.brain_name |
|
|
|
] = policy.get_weights() |
|
|
|
self._save_snapshot() # Need to save after trainer initializes policy |
|
|
|
self._save_snapshot(policy) # Need to save after trainer initializes policy |
|
|
|
self._learning_team = self.controller.get_learning_team(self.ghost_step) |
|
|
|
self.wrapped_trainer_team = team_id |
|
|
|
else: |
|
|
|
|
|
|
def get_policy(self, name_behavior_id: str) -> TFPolicy: |
|
|
|
""" |
|
|
|
Gets policy associated with name_behavior_id |
|
|
|
:param name_behavior_id: Fully qualified behavior name |
|
|
|
:return: Policy associated with name_behavior_id |
|
|
|
""" |
|
|
|
def _save_snapshot(self, policy: TFPolicy) -> None: |
|
|
|
weights = policy.get_weights() |
|
|
|
try: |
|
|
|
self.policy_snapshots[self.snapshot_counter] = weights |
|
|
|
except IndexError: |
|
|
|
self.policy_snapshots.append(weights) |
|
|
|
def _save_snapshot(self) -> None: |
|
|
|
""" |
|
|
|
Saves a snapshot of the current weights of the policy and maintains the policy_snapshots |
|
|
|
according to the window size |
|
|
|
""" |
|
|
|
for brain_name in self.current_policy_snapshot: |
|
|
|
current_snapshot_for_brain_name = self.current_policy_snapshot[brain_name] |
|
|
|
|
|
|
|
try: |
|
|
|
self.policy_snapshots[self.snapshot_counter][ |
|
|
|
brain_name |
|
|
|
] = current_snapshot_for_brain_name |
|
|
|
except IndexError: |
|
|
|
self.policy_snapshots.append( |
|
|
|
{brain_name: current_snapshot_for_brain_name} |
|
|
|
) |
|
|
|
for policy_queue in self.policy_queues: |
|
|
|
parsed_behavior_id = self._name_to_parsed_behavior_id[ |
|
|
|
policy_queue.behavior_id |
|
|
|
] |
|
|
|
# here is the place for a sampling protocol |
|
|
|
if parsed_behavior_id.team_id == self._learning_team: |
|
|
|
""" |
|
|
|
Swaps the appropriate weight to the policy and pushes it to respective policy queues |
|
|
|
""" |
|
|
|
|
|
|
|
for team_id in self._team_to_name_to_policy_queue: |
|
|
|
if team_id == self._learning_team: |
|
|
|
continue |
|
|
|
elif np.random.uniform() < (1 - self.play_against_current_self_ratio): |
|
|
|
x = np.random.randint(len(self.policy_snapshots)) |
|
|
|
|
|
|
x = "current" |
|
|
|
self.current_opponent = -1 if x == "current" else x |
|
|
|
logger.debug( |
|
|
|
"Step {}: Swapping snapshot {} to id {} with {} learning".format( |
|
|
|
self.ghost_step, |
|
|
|
x, |
|
|
|
parsed_behavior_id.behavior_id, |
|
|
|
self._learning_team, |
|
|
|
name_to_policy_queue = self._team_to_name_to_policy_queue[team_id] |
|
|
|
for brain_name in self._team_to_name_to_policy_queue[team_id]: |
|
|
|
behavior_id = create_name_behavior_id(brain_name, team_id) |
|
|
|
policy = self.get_policy(behavior_id) |
|
|
|
policy.load_weights(snapshot[brain_name]) |
|
|
|
name_to_policy_queue[brain_name].put(policy) |
|
|
|
logger.debug( |
|
|
|
"Step {}: Swapping snapshot {} to id {} with team {} learning".format( |
|
|
|
self.ghost_step, x, behavior_id, self._learning_team |
|
|
|
) |
|
|
|
) |
|
|
|
policy = self.get_policy(parsed_behavior_id.behavior_id) |
|
|
|
policy.load_weights(snapshot) |
|
|
|
policy_queue.put(policy) |
|
|
|
Adds a policy queue to the list of queues to publish to when this Trainer |
|
|
|
makes a policy update |
|
|
|
Adds a policy queue for every member of the team to the list of queues to publish to when this Trainer |
|
|
|
makes a policy update. Creates an internal policy queue for the wrapped |
|
|
|
trainer to push to. The GhostTrainer pushes all policies to the env. |
|
|
|
try: |
|
|
|
self._team_to_name_to_policy_queue[parsed_behavior_id.team_id][ |
|
|
|
parsed_behavior_id.brain_name |
|
|
|
] = policy_queue |
|
|
|
except KeyError: |
|
|
|
self._team_to_name_to_policy_queue[parsed_behavior_id.team_id] = { |
|
|
|
parsed_behavior_id.brain_name: policy_queue |
|
|
|
} |
|
|
|
if parsed_behavior_id.team_id == self.wrapped_trainer_team: |
|
|
|
# With a future multiagent trainer, this will be indexed by 'role' |
|
|
|
internal_policy_queue: AgentManagerQueue[Policy] = AgentManagerQueue( |
|
|
|
|
|
|
self, trajectory_queue: AgentManagerQueue[Trajectory] |
|
|
|
) -> None: |
|
|
|
""" |
|
|
|
Adds a trajectory queue to the list of queues for the trainer to ingest Trajectories from. |
|
|
|
Adds a trajectory queue for every member of the team to the list of queues for the trainer |
|
|
|
to ingest Trajectories from. Creates an internal trajectory queue to push trajectories from |
|
|
|
the learning team. The wrapped trainer subscribes to this queue. |
|
|
|
:param queue: Trajectory queue to publish to. |
|
|
|
""" |
|
|
|
super().subscribe_trajectory_queue(trajectory_queue) |
|
|
|