您最多选择25个主题
主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
289 行
12 KiB
289 行
12 KiB
# # Unity ML-Agents Toolkit
|
|
# ## ML-Agent Learning (Ghost Trainer)
|
|
|
|
from typing import Deque, Dict, List, Any, cast
|
|
|
|
import numpy as np
|
|
|
|
from mlagents_envs.logging_util import get_logger
|
|
from mlagents.trainers.brain import BrainParameters
|
|
from mlagents.trainers.policy import Policy
|
|
from mlagents.trainers.policy.tf_policy import TFPolicy
|
|
|
|
from mlagents.trainers.trainer import Trainer
|
|
from mlagents.trainers.trajectory import Trajectory
|
|
from mlagents.trainers.agent_processor import AgentManagerQueue
|
|
from mlagents.trainers.stats import StatsPropertyType
|
|
from mlagents.trainers.behavior_id_utils import BehaviorIdentifiers
|
|
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
class GhostTrainer(Trainer):
|
|
def __init__(
|
|
self, trainer, brain_name, reward_buff_cap, trainer_parameters, training, run_id
|
|
):
|
|
"""
|
|
Responsible for collecting experiences and training trainer model via self_play.
|
|
:param trainer: The trainer of the policy/policies being trained with self_play
|
|
:param brain_name: The name of the brain associated with trainer config
|
|
:param reward_buff_cap: Max reward history to track in the reward buffer
|
|
:param trainer_parameters: The parameters for the trainer (dictionary).
|
|
:param training: Whether the trainer is set for training.
|
|
:param run_id: The identifier of the current run
|
|
"""
|
|
|
|
super(GhostTrainer, self).__init__(
|
|
brain_name, trainer_parameters, training, run_id, reward_buff_cap
|
|
)
|
|
|
|
self.trainer = trainer
|
|
|
|
self.internal_policy_queues: List[AgentManagerQueue[Policy]] = []
|
|
self.internal_trajectory_queues: List[AgentManagerQueue[Trajectory]] = []
|
|
self.ignored_trajectory_queues: List[AgentManagerQueue[Trajectory]] = []
|
|
self.learning_policy_queues: Dict[str, AgentManagerQueue[Policy]] = {}
|
|
|
|
# assign ghost's stats collection to wrapped trainer's
|
|
self._stats_reporter = self.trainer.stats_reporter
|
|
# Set the logging to print ELO in the console
|
|
self._stats_reporter.add_property(StatsPropertyType.SELF_PLAY, True)
|
|
|
|
self_play_parameters = trainer_parameters["self_play"]
|
|
self.window = self_play_parameters.get("window", 10)
|
|
self.play_against_current_self_ratio = self_play_parameters.get(
|
|
"play_against_current_self_ratio", 0.5
|
|
)
|
|
self.steps_between_save = self_play_parameters.get("save_steps", 20000)
|
|
self.steps_between_swap = self_play_parameters.get("swap_steps", 20000)
|
|
|
|
self.policies: Dict[str, TFPolicy] = {}
|
|
self.policy_snapshots: List[Any] = []
|
|
self.snapshot_counter: int = 0
|
|
self.learning_behavior_name: str = None
|
|
self.current_policy_snapshot = None
|
|
self.last_save = 0
|
|
self.last_swap = 0
|
|
|
|
# Chosen because it is the initial ELO in Chess
|
|
self.initial_elo: float = self_play_parameters.get("initial_elo", 1200.0)
|
|
self.current_elo: float = self.initial_elo
|
|
self.policy_elos: List[float] = [self.initial_elo] * (
|
|
self.window + 1
|
|
) # for learning policy
|
|
self.current_opponent: int = 0
|
|
|
|
@property
|
|
def get_step(self) -> int:
|
|
"""
|
|
Returns the number of steps the trainer has performed
|
|
:return: the step count of the trainer
|
|
"""
|
|
return self.trainer.get_step
|
|
|
|
@property
|
|
def reward_buffer(self) -> Deque[float]:
|
|
"""
|
|
Returns the reward buffer. The reward buffer contains the cumulative
|
|
rewards of the most recent episodes completed by agents using this
|
|
trainer.
|
|
:return: the reward buffer.
|
|
"""
|
|
return self.trainer.reward_buffer
|
|
|
|
def _process_trajectory(self, trajectory: Trajectory) -> None:
|
|
if trajectory.done_reached and not trajectory.max_step_reached:
|
|
# Assumption is that final reward is 1/.5/0 for win/draw/loss
|
|
final_reward = trajectory.steps[-1].reward
|
|
result = 0.5
|
|
if final_reward > 0:
|
|
result = 1.0
|
|
elif final_reward < 0:
|
|
result = 0.0
|
|
|
|
change = compute_elo_rating_changes(
|
|
self.current_elo, self.policy_elos[self.current_opponent], result
|
|
)
|
|
self.current_elo += change
|
|
self.policy_elos[self.current_opponent] -= change
|
|
opponents = np.array(self.policy_elos, dtype=np.float32)
|
|
self._stats_reporter.add_stat("Self-play/ELO", self.current_elo)
|
|
self._stats_reporter.add_stat(
|
|
"Self-play/Mean Opponent ELO", opponents.mean()
|
|
)
|
|
self._stats_reporter.add_stat("Self-play/Std Opponent ELO", opponents.std())
|
|
|
|
def advance(self) -> None:
|
|
"""
|
|
Steps the trainer, passing trajectories to wrapped trainer and calling trainer advance
|
|
"""
|
|
for traj_queue, internal_traj_queue in zip(
|
|
self.trajectory_queues, self.internal_trajectory_queues
|
|
):
|
|
try:
|
|
# We grab at most the maximum length of the queue.
|
|
# This ensures that even if the queue is being filled faster than it is
|
|
# being emptied, the trajectories in the queue are on-policy.
|
|
for _ in range(traj_queue.maxlen):
|
|
t = traj_queue.get_nowait()
|
|
# adds to wrapped trainers queue
|
|
internal_traj_queue.put(t)
|
|
self._process_trajectory(t)
|
|
except AgentManagerQueue.Empty:
|
|
pass
|
|
|
|
self.next_summary_step = self.trainer.next_summary_step
|
|
|
|
self.trainer.advance()
|
|
|
|
for internal_q in self.internal_policy_queues:
|
|
# Get policies that correspond to the policy queue in question
|
|
try:
|
|
policy = cast(TFPolicy, internal_q.get_nowait())
|
|
self.current_policy_snapshot = policy.get_weights()
|
|
self.learning_policy_queues[internal_q.behavior_id].put(policy)
|
|
except AgentManagerQueue.Empty:
|
|
pass
|
|
|
|
if self.get_step - self.last_save > self.steps_between_save:
|
|
self._save_snapshot(self.trainer.policy)
|
|
self.last_save = self.get_step
|
|
|
|
if self.get_step - self.last_swap > self.steps_between_swap:
|
|
self._swap_snapshots()
|
|
self.last_swap = self.get_step
|
|
|
|
# Dump trajectories from non-learning policy
|
|
for traj_queue in self.ignored_trajectory_queues:
|
|
try:
|
|
for _ in range(traj_queue.maxlen):
|
|
traj_queue.get_nowait()
|
|
except AgentManagerQueue.Empty:
|
|
pass
|
|
|
|
def end_episode(self):
|
|
self.trainer.end_episode()
|
|
|
|
def save_model(self, name_behavior_id: str) -> None:
|
|
self.trainer.save_model(name_behavior_id)
|
|
|
|
def export_model(self, name_behavior_id: str) -> None:
|
|
self.trainer.export_model(name_behavior_id)
|
|
|
|
def create_policy(self, brain_parameters: BrainParameters) -> TFPolicy:
|
|
return self.trainer.create_policy(brain_parameters)
|
|
|
|
def add_policy(self, name_behavior_id: str, policy: TFPolicy) -> None:
|
|
"""
|
|
Adds policy to trainer. For the first policy added, add a trainer
|
|
to the policy and set the learning behavior name to name_behavior_id.
|
|
:param name_behavior_id: Behavior ID that the policy should belong to.
|
|
:param policy: Policy to associate with name_behavior_id.
|
|
"""
|
|
self.policies[name_behavior_id] = policy
|
|
policy.create_tf_graph()
|
|
|
|
# First policy encountered
|
|
if not self.learning_behavior_name:
|
|
weights = policy.get_weights()
|
|
self.current_policy_snapshot = weights
|
|
self.trainer.add_policy(name_behavior_id, policy)
|
|
self._save_snapshot(policy) # Need to save after trainer initializes policy
|
|
self.learning_behavior_name = name_behavior_id
|
|
behavior_id_parsed = BehaviorIdentifiers.from_name_behavior_id(
|
|
self.learning_behavior_name
|
|
)
|
|
team_id = behavior_id_parsed.behavior_ids["team"]
|
|
self._stats_reporter.add_property(StatsPropertyType.SELF_PLAY_TEAM, team_id)
|
|
else:
|
|
# for saving/swapping snapshots
|
|
policy.init_load_weights()
|
|
|
|
def get_policy(self, name_behavior_id: str) -> TFPolicy:
|
|
return self.policies[name_behavior_id]
|
|
|
|
def _save_snapshot(self, policy: TFPolicy) -> None:
|
|
weights = policy.get_weights()
|
|
try:
|
|
self.policy_snapshots[self.snapshot_counter] = weights
|
|
except IndexError:
|
|
self.policy_snapshots.append(weights)
|
|
self.policy_elos[self.snapshot_counter] = self.current_elo
|
|
self.snapshot_counter = (self.snapshot_counter + 1) % self.window
|
|
|
|
def _swap_snapshots(self) -> None:
|
|
for q in self.policy_queues:
|
|
name_behavior_id = q.behavior_id
|
|
# here is the place for a sampling protocol
|
|
if name_behavior_id == self.learning_behavior_name:
|
|
continue
|
|
elif np.random.uniform() < (1 - self.play_against_current_self_ratio):
|
|
x = np.random.randint(len(self.policy_snapshots))
|
|
snapshot = self.policy_snapshots[x]
|
|
else:
|
|
snapshot = self.current_policy_snapshot
|
|
x = "current"
|
|
self.policy_elos[-1] = self.current_elo
|
|
self.current_opponent = -1 if x == "current" else x
|
|
logger.debug(
|
|
"Step {}: Swapping snapshot {} to id {} with {} learning".format(
|
|
self.get_step, x, name_behavior_id, self.learning_behavior_name
|
|
)
|
|
)
|
|
policy = self.get_policy(name_behavior_id)
|
|
policy.load_weights(snapshot)
|
|
q.put(policy)
|
|
|
|
def publish_policy_queue(self, policy_queue: AgentManagerQueue[Policy]) -> None:
|
|
"""
|
|
Adds a policy queue to the list of queues to publish to when this Trainer
|
|
makes a policy update
|
|
:param queue: Policy queue to publish to.
|
|
"""
|
|
super().publish_policy_queue(policy_queue)
|
|
if policy_queue.behavior_id == self.learning_behavior_name:
|
|
|
|
internal_policy_queue: AgentManagerQueue[Policy] = AgentManagerQueue(
|
|
policy_queue.behavior_id
|
|
)
|
|
|
|
self.internal_policy_queues.append(internal_policy_queue)
|
|
self.learning_policy_queues[policy_queue.behavior_id] = policy_queue
|
|
self.trainer.publish_policy_queue(internal_policy_queue)
|
|
|
|
def subscribe_trajectory_queue(
|
|
self, trajectory_queue: AgentManagerQueue[Trajectory]
|
|
) -> None:
|
|
"""
|
|
Adds a trajectory queue to the list of queues for the trainer to ingest Trajectories from.
|
|
:param queue: Trajectory queue to publish to.
|
|
"""
|
|
|
|
if trajectory_queue.behavior_id == self.learning_behavior_name:
|
|
super().subscribe_trajectory_queue(trajectory_queue)
|
|
|
|
internal_trajectory_queue: AgentManagerQueue[
|
|
Trajectory
|
|
] = AgentManagerQueue(trajectory_queue.behavior_id)
|
|
|
|
self.internal_trajectory_queues.append(internal_trajectory_queue)
|
|
self.trainer.subscribe_trajectory_queue(internal_trajectory_queue)
|
|
else:
|
|
self.ignored_trajectory_queues.append(trajectory_queue)
|
|
|
|
|
|
# Taken from https://github.com/Unity-Technologies/ml-agents/pull/1975 and
|
|
# https://metinmediamath.wordpress.com/2013/11/27/how-to-calculate-the-elo-rating-including-example/
|
|
# ELO calculation
|
|
|
|
|
|
def compute_elo_rating_changes(rating1: float, rating2: float, result: float) -> float:
|
|
r1 = pow(10, rating1 / 400)
|
|
r2 = pow(10, rating2 / 400)
|
|
|
|
summed = r1 + r2
|
|
e1 = r1 / summed
|
|
|
|
change = result - e1
|
|
return change
|