Unity 机器学习代理工具包 (ML-Agents) 是一个开源项目,它使游戏和模拟能够作为训练智能代理的环境。
您最多选择25个主题 主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 
 

284 行
10 KiB

# # Unity ML-Agents Toolkit
import logging
from typing import Dict, List, Deque, Any
import os
import tensorflow as tf
import numpy as np
from collections import deque, defaultdict
from mlagents.envs import UnityException, AllBrainInfo, ActionInfoOutputs, BrainInfo
from mlagents.envs.timers import set_gauge
from mlagents.trainers.trainer_metrics import TrainerMetrics
from mlagents.trainers.buffer import Buffer
from mlagents.trainers.tf_policy import Policy
from mlagents.envs import BrainParameters
LOGGER = logging.getLogger("mlagents.trainers")
class UnityTrainerException(UnityException):
"""
Related to errors with the Trainer.
"""
pass
class Trainer(object):
"""This class is the base class for the mlagents.envs.trainers"""
def __init__(
self,
brain: BrainParameters,
trainer_parameters: dict,
training: bool,
run_id: str,
reward_buff_cap: int = 1,
):
"""
Responsible for collecting experiences and training a neural network model.
:BrainParameters brain: Brain to be trained.
:dict trainer_parameters: The parameters for the trainer (dictionary).
:bool training: Whether the trainer is set for training.
:str run_id: The identifier of the current run
:int reward_buff_cap:
"""
self.param_keys: List[str] = []
self.brain_name = brain.brain_name
self.run_id = run_id
self.trainer_parameters = trainer_parameters
self.summary_path = trainer_parameters["summary_path"]
if not os.path.exists(self.summary_path):
os.makedirs(self.summary_path)
self.cumulative_returns_since_policy_update: List[float] = []
self.is_training = training
self.stats: Dict[str, List] = defaultdict(list)
self.trainer_metrics = TrainerMetrics(
path=self.summary_path + ".csv", brain_name=self.brain_name
)
self.summary_writer = tf.summary.FileWriter(self.summary_path)
self._reward_buffer: Deque[float] = deque(maxlen=reward_buff_cap)
self.policy: Policy = None
self.step: int = 0
def check_param_keys(self):
for k in self.param_keys:
if k not in self.trainer_parameters:
raise UnityTrainerException(
"The hyper-parameter {0} could not be found for the {1} trainer of "
"brain {2}.".format(k, self.__class__, self.brain_name)
)
def dict_to_str(self, param_dict: Dict[str, Any], num_tabs: int) -> str:
"""
Takes a parameter dictionary and converts it to a human-readable string.
Recurses if there are multiple levels of dict. Used to print out hyperaparameters.
param: param_dict: A Dictionary of key, value parameters.
return: A string version of this dictionary.
"""
if not isinstance(param_dict, dict):
return str(param_dict)
else:
append_newline = "\n" if num_tabs > 0 else ""
return append_newline + "\n".join(
[
"\t"
+ " " * num_tabs
+ "{0}:\t{1}".format(
x, self.dict_to_str(param_dict[x], num_tabs + 1)
)
for x in param_dict
]
)
def __str__(self) -> str:
return """Hyperparameters for the {0} of brain {1}: \n{2}""".format(
self.__class__.__name__,
self.brain_name,
self.dict_to_str(self.trainer_parameters, 0),
)
@property
def parameters(self) -> Dict[str, Any]:
"""
Returns the trainer parameters of the trainer.
"""
return self.trainer_parameters
@property
def get_max_steps(self) -> float:
"""
Returns the maximum number of steps. Is used to know when the trainer should be stopped.
:return: The maximum number of steps of the trainer
"""
return float(self.trainer_parameters["max_steps"])
@property
def get_step(self) -> int:
"""
Returns the number of steps the trainer has performed
:return: the step count of the trainer
"""
return self.step
@property
def reward_buffer(self) -> Deque[float]:
"""
Returns the reward buffer. The reward buffer contains the cumulative
rewards of the most recent episodes completed by agents using this
trainer.
:return: the reward buffer.
"""
return self._reward_buffer
def increment_step(self, n_steps: int) -> None:
"""
Increment the step count of the trainer
:param n_steps: number of steps to increment the step count by
"""
self.step = self.policy.increment_step(n_steps)
def save_model(self) -> None:
"""
Saves the model
"""
self.policy.save_model(self.get_step)
def export_model(self) -> None:
"""
Exports the model
"""
self.policy.export_model()
def write_training_metrics(self) -> None:
"""
Write training metrics to a CSV file
:return:
"""
self.trainer_metrics.write_training_metrics()
def write_summary(
self, global_step: int, delta_train_start: float, lesson_num: int = 0
) -> None:
"""
Saves training statistics to Tensorboard.
:param delta_train_start: Time elapsed since training started.
:param lesson_num: Current lesson number in curriculum.
:param global_step: The number of steps the simulation has been going for
"""
if (
global_step % self.trainer_parameters["summary_freq"] == 0
and global_step != 0
):
is_training = (
"Training."
if self.is_training and self.get_step <= self.get_max_steps
else "Not Training."
)
step = min(self.get_step, self.get_max_steps)
if len(self.stats["Environment/Cumulative Reward"]) > 0:
mean_reward = np.mean(self.stats["Environment/Cumulative Reward"])
LOGGER.info(
" {}: {}: Step: {}. "
"Time Elapsed: {:0.3f} s "
"Mean "
"Reward: {:0.3f}"
". Std of Reward: {:0.3f}. {}".format(
self.run_id,
self.brain_name,
step,
delta_train_start,
mean_reward,
np.std(self.stats["Environment/Cumulative Reward"]),
is_training,
)
)
set_gauge(f"{self.brain_name}.mean_reward", mean_reward)
else:
LOGGER.info(
" {}: {}: Step: {}. No episode was completed since last summary. {}".format(
self.run_id, self.brain_name, step, is_training
)
)
summary = tf.Summary()
for key in self.stats:
if len(self.stats[key]) > 0:
stat_mean = float(np.mean(self.stats[key]))
summary.value.add(tag="{}".format(key), simple_value=stat_mean)
self.stats[key] = []
summary.value.add(tag="Environment/Lesson", simple_value=lesson_num)
self.summary_writer.add_summary(summary, step)
self.summary_writer.flush()
def write_tensorboard_text(self, key: str, input_dict: Dict[str, Any]) -> None:
"""
Saves text to Tensorboard.
Note: Only works on tensorflow r1.2 or above.
:param key: The name of the text.
:param input_dict: A dictionary that will be displayed in a table on Tensorboard.
"""
try:
with tf.Session() as sess:
s_op = tf.summary.text(
key,
tf.convert_to_tensor(
([[str(x), str(input_dict[x])] for x in input_dict])
),
)
s = sess.run(s_op)
self.summary_writer.add_summary(s, self.get_step)
except Exception:
LOGGER.info(
"Cannot write text summary for Tensorboard. Tensorflow version must be r1.2 or above."
)
pass
def add_experiences(
self,
curr_all_info: AllBrainInfo,
next_all_info: AllBrainInfo,
take_action_outputs: ActionInfoOutputs,
) -> None:
"""
Adds experiences to each agent's experience history.
:param curr_all_info: Dictionary of all current brains and corresponding BrainInfo.
:param next_all_info: Dictionary of all current brains and corresponding BrainInfo.
:param take_action_outputs: The outputs of the Policy's get_action method.
"""
raise UnityTrainerException(
"The process_experiences method was not implemented."
)
def process_experiences(
self, current_info: AllBrainInfo, next_info: AllBrainInfo
) -> None:
"""
Checks agent histories for processing condition, and processes them as necessary.
Processing involves calculating value and advantage targets for model updating step.
:param current_info: Dictionary of all current-step brains and corresponding BrainInfo.
:param next_info: Dictionary of all next-step brains and corresponding BrainInfo.
"""
raise UnityTrainerException(
"The process_experiences method was not implemented."
)
def end_episode(self):
"""
A signal that the Episode has ended. The buffer must be reset.
Get only called when the academy resets.
"""
raise UnityTrainerException("The end_episode method was not implemented.")
def is_ready_update(self):
"""
Returns whether or not the trainer has enough elements to run update model
:return: A boolean corresponding to wether or not update_model() can be run
"""
raise UnityTrainerException("The is_ready_update method was not implemented.")
def update_policy(self):
"""
Uses demonstration_buffer to update model.
"""
raise UnityTrainerException("The update_model method was not implemented.")