# # Unity ML-Agents Toolkit
# ## ML-Agent Learning (Ghost Trainer)
from typing import Deque, Dict, List, Any, Optional, cast
import numpy as np
from mlagents_envs.logging_util import get_logger
from mlagents.trainers.brain import BrainParameters
from mlagents.trainers.policy import Policy
from mlagents.trainers.policy.tf_policy import TFPolicy
from mlagents.trainers.trainer import Trainer
from mlagents.trainers.trajectory import Trajectory
from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.stats import StatsPropertyType
from mlagents.trainers.behavior_id_utils import BehaviorIdentifiers
logger = get_logger(__name__)


class GhostTrainer(Trainer):
    def __init__(
        self, trainer, brain_name, reward_buff_cap, trainer_parameters, training, run_id
    ):
        """
        Responsible for collecting experiences and training the wrapped trainer's
        model via self-play.
        :param trainer: The trainer of the policy/policies being trained with self_play
        :param brain_name: The name of the brain associated with trainer config
        :param reward_buff_cap: Max reward history to track in the reward buffer
        :param trainer_parameters: The parameters for the trainer (dictionary).
        :param training: Whether the trainer is set for training.
        :param run_id: The identifier of the current run
        """
        super(GhostTrainer, self).__init__(
            brain_name, trainer_parameters, training, run_id, reward_buff_cap
        )

        self.trainer = trainer

        self.internal_policy_queues: List[AgentManagerQueue[Policy]] = []
        self.internal_trajectory_queues: List[AgentManagerQueue[Trajectory]] = []
        self.ignored_trajectory_queues: List[AgentManagerQueue[Trajectory]] = []
        self.learning_policy_queues: Dict[str, AgentManagerQueue[Policy]] = {}

        # assign ghost's stats collection to wrapped trainer's
        self._stats_reporter = self.trainer.stats_reporter
        # Set the logging to print ELO in the console
        self._stats_reporter.add_property(StatsPropertyType.SELF_PLAY, True)

        self_play_parameters = trainer_parameters["self_play"]
        self.window = self_play_parameters.get("window", 10)
        self.play_against_current_self_ratio = self_play_parameters.get(
            "play_against_current_self_ratio", 0.5
        )
        self.steps_between_save = self_play_parameters.get("save_steps", 20000)
        self.steps_between_swap = self_play_parameters.get("swap_steps", 20000)

        self.policies: Dict[str, TFPolicy] = {}
        self.policy_snapshots: List[Any] = []
        self.snapshot_counter: int = 0
        self.learning_behavior_name: Optional[str] = None
        self.current_policy_snapshot = None
        self.last_save = 0
        self.last_swap = 0

        # Chosen because it is the initial ELO in Chess
        self.initial_elo: float = self_play_parameters.get("initial_elo", 1200.0)
        self.current_elo: float = self.initial_elo
        self.policy_elos: List[float] = [self.initial_elo] * (
            self.window + 1
        )  # for learning policy
        self.current_opponent: int = 0
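
    # Illustrative example (added comment, not in the original file): the `self_play`
    # section read above from trainer_parameters is a plain dictionary; with the
    # defaults used by this constructor it would be equivalent to:
    #
    #   {
    #       "window": 10,
    #       "play_against_current_self_ratio": 0.5,
    #       "save_steps": 20000,
    #       "swap_steps": 20000,
    #       "initial_elo": 1200.0,
    #   }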

    @property
    def get_step(self) -> int:
        """
        Returns the number of steps the trainer has performed
        :return: the step count of the trainer
        """
        return self.trainer.get_step

    @property
    def reward_buffer(self) -> Deque[float]:
        """
        Returns the reward buffer. The reward buffer contains the cumulative
        rewards of the most recent episodes completed by agents using this
        trainer.
        :return: the reward buffer.
        """
        return self.trainer.reward_buffer

    def _process_trajectory(self, trajectory: Trajectory) -> None:
        if trajectory.done_reached and not trajectory.max_step_reached:
            # Assumption is that final reward is 1/.5/0 for win/draw/loss
            final_reward = trajectory.steps[-1].reward
            result = 0.5
            if final_reward > 0:
                result = 1.0
            elif final_reward < 0:
                result = 0.0

            change = compute_elo_rating_changes(
                self.current_elo, self.policy_elos[self.current_opponent], result
            )
            self.current_elo += change
            self.policy_elos[self.current_opponent] -= change
            opponents = np.array(self.policy_elos, dtype=np.float32)
            self._stats_reporter.add_stat("Self-play/ELO", self.current_elo)
            self._stats_reporter.add_stat(
                "Self-play/Mean Opponent ELO", opponents.mean()
            )
            self._stats_reporter.add_stat("Self-play/Std Opponent ELO", opponents.std())
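
    # Note (added comment, not in the original file): the ELO update above is zero-sum
    # between the learning policy and the opponent snapshot it just played; e.g. with
    # equal ratings, a win moves 0.5 rating points from the opponent's slot into
    # self.current_elo (see compute_elo_rating_changes at the bottom of this module).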

    def advance(self) -> None:
        """
        Steps the trainer, passing trajectories to the wrapped trainer and calling trainer advance
        """
        for traj_queue, internal_traj_queue in zip(
            self.trajectory_queues, self.internal_trajectory_queues
        ):
            try:
                # We grab at most the maximum length of the queue.
                # This ensures that even if the queue is being filled faster than it is
                # being emptied, the trajectories in the queue are on-policy.
                for _ in range(traj_queue.maxlen):
                    t = traj_queue.get_nowait()
                    # adds to wrapped trainer's queue
                    internal_traj_queue.put(t)
                    self._process_trajectory(t)
            except AgentManagerQueue.Empty:
                pass

        self.next_summary_step = self.trainer.next_summary_step
        self.trainer.advance()

        for internal_q in self.internal_policy_queues:
            # Get policies that correspond to the policy queue in question
            try:
                policy = cast(TFPolicy, internal_q.get_nowait())
                self.current_policy_snapshot = policy.get_weights()
                self.learning_policy_queues[internal_q.behavior_id].put(policy)
            except AgentManagerQueue.Empty:
                pass

        if self.get_step - self.last_save > self.steps_between_save:
            self._save_snapshot(self.trainer.policy)
            self.last_save = self.get_step

        if self.get_step - self.last_swap > self.steps_between_swap:
            self._swap_snapshots()
            self.last_swap = self.get_step

        # Dump trajectories from non-learning policy
        for traj_queue in self.ignored_trajectory_queues:
            try:
                for _ in range(traj_queue.maxlen):
                    traj_queue.get_nowait()
            except AgentManagerQueue.Empty:
                pass

    def end_episode(self):
        self.trainer.end_episode()

    def save_model(self, name_behavior_id: str) -> None:
        self.trainer.save_model(name_behavior_id)

    def export_model(self, name_behavior_id: str) -> None:
        self.trainer.export_model(name_behavior_id)

    def create_policy(self, brain_parameters: BrainParameters) -> TFPolicy:
        return self.trainer.create_policy(brain_parameters)

    def add_policy(self, name_behavior_id: str, policy: TFPolicy) -> None:
        """
        Adds policy to trainer. The first policy added is handed to the wrapped
        trainer, and the learning behavior name is set to name_behavior_id.
        :param name_behavior_id: Behavior ID that the policy should belong to.
        :param policy: Policy to associate with name_behavior_id.
        """
        self.policies[name_behavior_id] = policy
        policy.create_tf_graph()

        # First policy encountered
        if not self.learning_behavior_name:
            weights = policy.get_weights()
            self.current_policy_snapshot = weights
            self.trainer.add_policy(name_behavior_id, policy)
            self._save_snapshot(policy)  # Need to save after trainer initializes policy
            self.learning_behavior_name = name_behavior_id
            behavior_id_parsed = BehaviorIdentifiers.from_name_behavior_id(
                self.learning_behavior_name
            )
            team_id = behavior_id_parsed.behavior_ids["team"]
            self._stats_reporter.add_property(StatsPropertyType.SELF_PLAY_TEAM, team_id)
        else:
            # for saving/swapping snapshots
            policy.init_load_weights()

    def get_policy(self, name_behavior_id: str) -> TFPolicy:
        return self.policies[name_behavior_id]

    def _save_snapshot(self, policy: TFPolicy) -> None:
        weights = policy.get_weights()
        try:
            self.policy_snapshots[self.snapshot_counter] = weights
        except IndexError:
            self.policy_snapshots.append(weights)
        self.policy_elos[self.snapshot_counter] = self.current_elo
        self.snapshot_counter = (self.snapshot_counter + 1) % self.window
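
    # Note (added comment, not in the original file): policy_snapshots behaves as a
    # fixed-size ring buffer of length `window`; once it is full, the oldest snapshot
    # (and its entry in policy_elos) is overwritten in round-robin order.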

    def _swap_snapshots(self) -> None:
        for q in self.policy_queues:
            name_behavior_id = q.behavior_id
            # here is the place for a sampling protocol
            if name_behavior_id == self.learning_behavior_name:
                continue
            elif np.random.uniform() < (1 - self.play_against_current_self_ratio):
                x = np.random.randint(len(self.policy_snapshots))
                snapshot = self.policy_snapshots[x]
            else:
                snapshot = self.current_policy_snapshot
                x = "current"
            self.policy_elos[-1] = self.current_elo
            self.current_opponent = -1 if x == "current" else x
            logger.debug(
                "Step {}: Swapping snapshot {} to id {} with {} learning".format(
                    self.get_step, x, name_behavior_id, self.learning_behavior_name
                )
            )
            policy = self.get_policy(name_behavior_id)
            policy.load_weights(snapshot)
            q.put(policy)
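
    # Note (added comment, not in the original file): with probability
    # play_against_current_self_ratio the opponent is given the current policy
    # snapshot, otherwise a past snapshot is sampled uniformly from policy_snapshots.
    # The last slot of policy_elos (index -1) is reserved for the "current self" case.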

    def publish_policy_queue(self, policy_queue: AgentManagerQueue[Policy]) -> None:
        """
        Adds a policy queue to the list of queues to publish to when this Trainer
        makes a policy update.
        :param policy_queue: Policy queue to publish to.
        """
        super().publish_policy_queue(policy_queue)
        if policy_queue.behavior_id == self.learning_behavior_name:
            internal_policy_queue: AgentManagerQueue[Policy] = AgentManagerQueue(
                policy_queue.behavior_id
            )
            self.internal_policy_queues.append(internal_policy_queue)
            self.learning_policy_queues[policy_queue.behavior_id] = policy_queue
            self.trainer.publish_policy_queue(internal_policy_queue)

    def subscribe_trajectory_queue(
        self, trajectory_queue: AgentManagerQueue[Trajectory]
    ) -> None:
        """
        Adds a trajectory queue to the list of queues for the trainer to ingest Trajectories from.
        :param trajectory_queue: Trajectory queue to read from.
        """
        if trajectory_queue.behavior_id == self.learning_behavior_name:
            super().subscribe_trajectory_queue(trajectory_queue)
            internal_trajectory_queue: AgentManagerQueue[
                Trajectory
            ] = AgentManagerQueue(trajectory_queue.behavior_id)
            self.internal_trajectory_queues.append(internal_trajectory_queue)
            self.trainer.subscribe_trajectory_queue(internal_trajectory_queue)
        else:
            self.ignored_trajectory_queues.append(trajectory_queue)
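
    # Note (added comment, not in the original file): queues for the learning behavior
    # are mirrored into internal queues wired to the wrapped trainer, so it only sees
    # experience from the learning team; trajectory queues for other (ghost) behaviors
    # are kept only so advance() can drain and discard them.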


# Taken from https://github.com/Unity-Technologies/ml-agents/pull/1975 and
# https://metinmediamath.wordpress.com/2013/11/27/how-to-calculate-the-elo-rating-including-example/
# ELO calculation
def compute_elo_rating_changes(rating1: float, rating2: float, result: float) -> float:
    """
    Returns the ELO rating change for the rating1 player after a game against a
    rating2 player, where result is 1.0 for a win, 0.5 for a draw and 0.0 for a loss.
    """
    r1 = pow(10, rating1 / 400)
    r2 = pow(10, rating2 / 400)

    summed = r1 + r2
    e1 = r1 / summed

    change = result - e1
    return change
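

# Illustrative sanity check (not part of the original module): with equal ratings the
# expected score e1 is 0.5, so a win (result=1.0) yields a change of +0.5, which
# GhostTrainer adds to the learning policy's ELO and subtracts from the opponent's.
if __name__ == "__main__":
    delta = compute_elo_rating_changes(1200.0, 1200.0, 1.0)
    assert abs(delta - 0.5) < 1e-6
    print("ELO change for a win between equally rated players:", delta)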