# # Unity ML-Agents Toolkit
# ## ML-Agent Learning (Ghost Trainer)

from typing import Deque, Dict, List, Optional, cast

import numpy as np
import logging

# NOTE: the Policy and TFPolicy import paths below are assumptions for this
# excerpt; they supply the types referenced by the annotations that follow.
from mlagents.trainers.policy import Policy
from mlagents.trainers.tf_policy import TFPolicy
from mlagents.trainers.trajectory import Trajectory
from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.stats import StatsPropertyType
from mlagents.trainers.behavior_id_utils import (
    BehaviorIdentifiers,
    create_name_behavior_id,
)

logger = logging.getLogger("mlagents.trainers")

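# GhostTrainer wraps another trainer for self-play: the current learning team
# trains through the wrapped trainer, while the other teams play fixed "ghost"
# snapshots of past policies that are saved and swapped in on a schedule.
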
self._internal_trajectory_queues: Dict[str, AgentManagerQueue[Trajectory]] = {}
self._internal_policy_queues: Dict[str, AgentManagerQueue[Policy]] = {}

self._team_to_name_to_policy_queue: Dict[
    int, Dict[str, AgentManagerQueue[Policy]]
] = {}

self._name_to_parsed_behavior_id: Dict[str, BehaviorIdentifiers] = {}
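
# The internal queues are keyed by bare brain name and feed the wrapped
# trainer; _team_to_name_to_policy_queue fans policies back out per team, and
# _name_to_parsed_behavior_id maps fully qualified behavior ids to their
# parsed (brain name, team id) form.
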
# assign ghost's stats collection to wrapped trainer's
self._stats_reporter = self.trainer.stats_reporter
# Flag this behavior as self-play so ELO is reported (assumed usage of the
# imported StatsPropertyType).
self._stats_reporter.add_property(StatsPropertyType.SELF_PLAY, True)

# self_play_parameters is the "self_play" section of the trainer config.
# window, play_against_current_self_ratio, and save_steps follow the
# documented self-play defaults (assumed; the methods below use them).
self.window = self_play_parameters.get("window", 10)
self.play_against_current_self_ratio = self_play_parameters.get(
    "play_against_current_self_ratio", 0.5
)
self.steps_between_save = self_play_parameters.get("save_steps", 20000)
self.steps_between_swap = self_play_parameters.get("swap_steps", 20000)
self.ghost_step: int = 0
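
# A minimal sketch of the trainer-config block these values come from
# (illustrative; the .get() defaults above are the fallbacks):
#
#   BehaviorName:
#     self_play:
#       window: 10
#       play_against_current_self_ratio: 0.5
#       save_steps: 20000
#       swap_steps: 20000
#       initial_elo: 1200.0
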
|
|
|
|
self.policies: Dict[str, TFPolicy] = {}

# A list of dicts from brain name to a single snapshot for this trainer's policies
self.policy_snapshots: List[Dict[str, List[float]]] = []
self.snapshot_counter: int = 0

# A dict from brain name to the current snapshot of this trainer's policies
self.current_policy_snapshot: Dict[str, List[float]] = {}

# wrapped_trainer_team and learning team need to be separate
# in the situation where new agents are created or destroyed
self.wrapped_trainer_team: Optional[int] = None

self.last_save: int = 0
self.last_swap: int = 0

# Chosen because it is the initial ELO in Chess
self.initial_elo: float = self_play_parameters.get("initial_elo", 1200.0)
# ELO bookkeeping read by _save_snapshot below (assumed initialization).
self.policy_elos: List[float] = [self.initial_elo] * (self.window + 1)
self.current_elo: float = self.initial_elo
|
|
def advance(self) -> None:
    """
    Steps the trainer, passing trajectories to the wrapped trainer and calling
    trainer advance.
    """
    self.next_summary_step = self.trainer.next_summary_step
    self.trainer.advance()

    # CASE 1: Current learning team is managed by this GhostTrainer.
    # If the learning team changes, the following loop over queues will push the
    # new policy into the policy queue for the new learning agent if
    # that policy is managed by this GhostTrainer. Otherwise, it will save the
    # current snapshot.
    # CASE 2: Current learning team is managed by a different GhostTrainer.
    # If the learning team changes to a team managed by this GhostTrainer, this
    # loop will push the current_snapshot into the correct queue. Otherwise,
    # it will continue skipping and swap_snapshot will continue to handle
    # pushing fixed snapshots.
    next_learning_team = self.controller.get_learning_team(self.ghost_step)
    for brain_name in self._internal_policy_queues:
        internal_policy_queue = self._internal_policy_queues[brain_name]
        try:
            # Grab the latest policy the wrapped trainer has published, if any.
            policy = cast(TFPolicy, internal_policy_queue.get_nowait())
            self.current_policy_snapshot[brain_name] = policy.get_weights()
        except AgentManagerQueue.Empty:
            pass
        if next_learning_team in self._team_to_name_to_policy_queue:
            name_to_policy_queue = self._team_to_name_to_policy_queue[
                next_learning_team
            ]
            if brain_name in name_to_policy_queue:
                behavior_id = create_name_behavior_id(
                    brain_name, next_learning_team
                )
                policy = self.get_policy(behavior_id)
                policy.load_weights(self.current_policy_snapshot[brain_name])
                name_to_policy_queue[brain_name].put(policy)

    # Save gating (assumed: the wrapped trainer's step count is exposed as
    # self.get_step, matching last_save/steps_between_save above).
    if self.get_step - self.last_save > self.steps_between_save:
        self._save_snapshot()
        self.last_save = self.get_step

    # Swap either when the learning team changes or on the swap_steps cadence.
    if (
        self._learning_team != next_learning_team
        or self.ghost_step - self.last_swap > self.steps_between_swap
    ):
        self._learning_team = next_learning_team
        self._swap_snapshots()
        self.last_swap = self.ghost_step
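
# Timeline sketch for the logic above (illustrative, swap_steps=20000): the
# learning team keeps training against fixed snapshot policies; every 20000
# ghost steps, or immediately when the controller hands learning to another
# team, _swap_snapshots re-samples the weights used by the non-learning teams.
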
def add_policy(
    self, parsed_behavior_id: BehaviorIdentifiers, policy: TFPolicy
) -> None:
    """
    Adds a policy to this GhostTrainer and, if it belongs to the wrapped
    trainer's team, records it as the current snapshot.
    :param parsed_behavior_id: Behavior ID that the policy should belong to.
    :param policy: Policy to associate with the behavior ID.
    """
    # Registration assumed for this excerpt; get_policy below relies on
    # self.policies being populated here.
    name_behavior_id = parsed_behavior_id.behavior_id
    team_id = parsed_behavior_id.team_id
    self.policies[name_behavior_id] = policy
    self._name_to_parsed_behavior_id[name_behavior_id] = parsed_behavior_id

    # First policy or a new agent on the same team encountered
    if self.wrapped_trainer_team is None or team_id == self.wrapped_trainer_team:
        self.current_policy_snapshot[
            parsed_behavior_id.brain_name
        ] = policy.get_weights()
        self._save_snapshot()  # Need to save after trainer initializes policy
        self._learning_team = self.controller.get_learning_team(self.ghost_step)
        self.wrapped_trainer_team = team_id
    else:
        # Policies for other teams are inference-only here; they are loaded
        # with snapshot weights in _swap_snapshots.
        pass
|
|
""" |
|
|
|
return self.policies[name_behavior_id] |
|
|
|
|
|
|
|
def _save_snapshot(self) -> None:
    """
    Saves a snapshot of the current weights of the policy and maintains the
    policy_snapshots list as a circular buffer of at most self.window entries.
    """
    for brain_name in self.current_policy_snapshot:
        current_snapshot_for_brain_name = self.current_policy_snapshot[brain_name]
        try:
            # Overwrite the slot at snapshot_counter once the buffer is full.
            self.policy_snapshots[self.snapshot_counter][
                brain_name
            ] = current_snapshot_for_brain_name
        except IndexError:
            # Buffer is still growing; append a new snapshot dict.
            self.policy_snapshots.append(
                {brain_name: current_snapshot_for_brain_name}
            )
    self.policy_elos[self.snapshot_counter] = self.current_elo
    self.snapshot_counter = (self.snapshot_counter + 1) % self.window
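
# Circular-buffer behavior of _save_snapshot (illustrative, window=3): saves
# fill indices 0, 1, 2, then the fourth save overwrites index 0, so
# policy_snapshots always holds the self.window most recent snapshots, and
# policy_elos[i] records the learning policy's ELO when snapshot i was taken.
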
""" |
|
|
|
|
|
|
|
for policy_queue in self.policy_queues: |
|
|
|
parsed_behavior_id = self._name_to_parsed_behavior_id[ |
|
|
|
policy_queue.behavior_id |
|
|
|
] |
|
|
|
# Here is the place for a sampling protocol. If the learning team switches |
|
|
|
# immediately before swapping, this first check ensures that the new learning |
|
|
|
# team gets the current policy snapshot. Otherwise, it redundantly swaps |
|
|
|
# the current policy. |
|
|
|
if parsed_behavior_id.team_id != self._learning_team and np.random.uniform() < ( |
|
|
|
1 - self.play_against_current_self_ratio |
|
|
|
): |
|
|
|
for team_id in self._team_to_name_to_policy_queue: |
|
|
|
if team_id == self._learning_team: |
|
|
|
continue |
|
|
|
elif np.random.uniform() < (1 - self.play_against_current_self_ratio): |
|
|
|
x = np.random.randint(len(self.policy_snapshots)) |
|
|
|
snapshot = self.policy_snapshots[x] |
|
|
|
else: |
|
|
|
|
|
|
logger.debug( |
|
|
|
"Step {}: Swapping snapshot {} to id {} with {} learning".format( |
|
|
|
self.ghost_step, |
|
|
|
x, |
|
|
|
parsed_behavior_id.behavior_id, |
|
|
|
self._learning_team, |
|
|
|
name_to_policy_queue = self._team_to_name_to_policy_queue[team_id] |
|
|
|
for brain_name in self._team_to_name_to_policy_queue[team_id]: |
|
|
|
behavior_id = create_name_behavior_id(brain_name, team_id) |
|
|
|
policy = self.get_policy(behavior_id) |
|
|
|
policy.load_weights(snapshot[brain_name]) |
|
|
|
name_to_policy_queue[brain_name].put(policy) |
|
|
|
logger.debug( |
|
|
|
"Step {}: Swapping snapshot {} to id {} with team {} learning".format( |
|
|
|
self.ghost_step, x, behavior_id, self._learning_team |
|
|
|
) |
|
|
|
) |
|
|
|
policy = self.get_policy(parsed_behavior_id.behavior_id) |
|
|
|
policy.load_weights(snapshot) |
|
|
|
policy_queue.put(policy) |
|
|
|
|
|
|
|
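
# Opponent sampling in _swap_snapshots: with play_against_current_self_ratio
# p (default 0.5), each non-learning team receives a uniformly random past
# snapshot with probability 1 - p, and the learning team's current weights
# otherwise.
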
def publish_policy_queue(self, policy_queue: AgentManagerQueue[Policy]) -> None:
    """
    Adds a policy queue for this GhostTrainer to publish to, indexed by the
    queue's team id and brain name.
    :param policy_queue: Policy queue to publish to.
    """
    super().publish_policy_queue(policy_queue)
    parsed_behavior_id = self._name_to_parsed_behavior_id[policy_queue.behavior_id]
    try:
        self._team_to_name_to_policy_queue[parsed_behavior_id.team_id][
            parsed_behavior_id.brain_name
        ] = policy_queue
    except KeyError:
        self._team_to_name_to_policy_queue[parsed_behavior_id.team_id] = {
            parsed_behavior_id.brain_name: policy_queue
        }
    if parsed_behavior_id.team_id == self.wrapped_trainer_team:
        # With a future multiagent trainer, this will be indexed by 'role'
        internal_policy_queue: AgentManagerQueue[Policy] = AgentManagerQueue(
|
|
|