Unity 机器学习代理工具包 (ML-Agents) 是一个开源项目,它使游戏和模拟能够作为训练智能代理的环境。
您最多选择25个主题 主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 
 

419 行
16 KiB

from collections import defaultdict
from enum import Enum
from typing import List, Dict, NamedTuple, Any, Optional
import numpy as np
import abc
import csv
import os
import time
from threading import RLock
from mlagents_envs.logging_util import get_logger
from mlagents_envs.timers import set_gauge
from mlagents.tf_utils import tf, generate_session_config
logger = get_logger(__name__)
class StatsSummary(NamedTuple):
mean: float
std: float
num: int
@staticmethod
def empty() -> "StatsSummary":
return StatsSummary(0.0, 0.0, 0)
class StatsPropertyType(Enum):
HYPERPARAMETERS = "hyperparameters"
SELF_PLAY = "selfplay"
SALIENCY = "saliency"
class StatsWriter(abc.ABC):
"""
A StatsWriter abstract class. A StatsWriter takes in a category, key, scalar value, and step
and writes it out by some method.
"""
@abc.abstractmethod
def write_stats(
self, category: str, values: Dict[str, StatsSummary], step: int
) -> None:
pass
def add_property(
self, category: str, property_type: StatsPropertyType, value: Any
) -> None:
"""
Add a generic property to the StatsWriter. This could be e.g. a Dict of hyperparameters,
a max step count, a trainer type, etc. Note that not all StatsWriters need to be compatible
with all types of properties. For instance, a TB writer doesn't need a max step, nor should
we write hyperparameters to the CSV.
:param category: The category that the property belongs to.
:param type: The type of property.
:param value: The property itself.
"""
pass
class GaugeWriter(StatsWriter):
"""
Write all stats that we recieve to the timer gauges, so we can track them offline easily
"""
@staticmethod
def sanitize_string(s: str) -> str:
"""
Clean up special characters in the category and value names.
"""
return s.replace("/", ".").replace(" ", "")
def write_stats(
self, category: str, values: Dict[str, StatsSummary], step: int
) -> None:
for val, stats_summary in values.items():
set_gauge(
GaugeWriter.sanitize_string(f"{category}.{val}.mean"),
float(stats_summary.mean),
)
class ConsoleWriter(StatsWriter):
def __init__(self):
self.training_start_time = time.time()
# If self-play, we want to print ELO as well as reward
self.self_play = False
self.self_play_team = -1
def write_stats(
self, category: str, values: Dict[str, StatsSummary], step: int
) -> None:
is_training = "Not Training."
if "Is Training" in values:
stats_summary = stats_summary = values["Is Training"]
if stats_summary.mean > 0.0:
is_training = "Training."
if "Environment/Cumulative Reward" in values:
stats_summary = values["Environment/Cumulative Reward"]
logger.info(
"{}: Step: {}. "
"Time Elapsed: {:0.3f} s "
"Mean "
"Reward: {:0.3f}"
". Std of Reward: {:0.3f}. {}".format(
category,
step,
time.time() - self.training_start_time,
stats_summary.mean,
stats_summary.std,
is_training,
)
)
if self.self_play and "Self-play/ELO" in values:
elo_stats = values["Self-play/ELO"]
logger.info(f"{category} ELO: {elo_stats.mean:0.3f}. ")
else:
logger.info(
"{}: Step: {}. No episode was completed since last summary. {}".format(
category, step, is_training
)
)
def add_property(
self, category: str, property_type: StatsPropertyType, value: Any
) -> None:
if property_type == StatsPropertyType.HYPERPARAMETERS:
logger.info(
"""Hyperparameters for behavior name {}: \n{}""".format(
category, self._dict_to_str(value, 0)
)
)
elif property_type == StatsPropertyType.SELF_PLAY:
assert isinstance(value, bool)
self.self_play = value
def _dict_to_str(self, param_dict: Dict[str, Any], num_tabs: int) -> str:
"""
Takes a parameter dictionary and converts it to a human-readable string.
Recurses if there are multiple levels of dict. Used to print out hyperparameters.
param: param_dict: A Dictionary of key, value parameters.
return: A string version of this dictionary.
"""
if not isinstance(param_dict, dict):
return str(param_dict)
else:
append_newline = "\n" if num_tabs > 0 else ""
return append_newline + "\n".join(
[
"\t"
+ " " * num_tabs
+ "{}:\t{}".format(
x, self._dict_to_str(param_dict[x], num_tabs + 1)
)
for x in param_dict
]
)
class TensorboardWriter(StatsWriter):
def __init__(self, base_dir: str, clear_past_data: bool = False):
"""
A StatsWriter that writes to a Tensorboard summary.
:param base_dir: The directory within which to place all the summaries. Tensorboard files will be written to a
{base_dir}/{category} directory.
:param clear_past_data: Whether or not to clean up existing Tensorboard files associated with the base_dir and
category.
"""
self.summary_writers: Dict[str, tf.summary.FileWriter] = {}
self.base_dir: str = base_dir
self._clear_past_data = clear_past_data
self.trajectories = 0
def write_stats(
self, category: str, values: Dict[str, StatsSummary], step: int
) -> None:
self._maybe_create_summary_writer(category)
for key, value in values.items():
summary = tf.Summary()
summary.value.add(tag=f"{key}", simple_value=value.mean)
self.summary_writers[category].add_summary(summary, step)
self.summary_writers[category].flush()
def _maybe_create_summary_writer(self, category: str) -> None:
if category not in self.summary_writers:
filewriter_dir = "{basedir}/{category}".format(
basedir=self.base_dir, category=category
)
os.makedirs(filewriter_dir, exist_ok=True)
if self._clear_past_data:
self._delete_all_events_files(filewriter_dir)
self.summary_writers[category] = tf.summary.FileWriter(filewriter_dir)
def _delete_all_events_files(self, directory_name: str) -> None:
for file_name in os.listdir(directory_name):
if file_name.startswith("events.out"):
logger.warning(
f"{file_name} was left over from a previous run. Deleting."
)
full_fname = os.path.join(directory_name, file_name)
try:
os.remove(full_fname)
except OSError:
logger.warning(
"{} was left over from a previous run and "
"not deleted.".format(full_fname)
)
def add_property(
self, category: str, property_type: StatsPropertyType, value: Any
) -> None:
if property_type == StatsPropertyType.HYPERPARAMETERS:
assert isinstance(value, dict)
summary = self._dict_to_tensorboard("Hyperparameters", value)
self._maybe_create_summary_writer(category)
if summary is not None:
self.summary_writers[category].add_summary(summary, 0)
elif property_type == StatsPropertyType.SALIENCY:
self._maybe_create_summary_writer(category)
# adapted from https://gist.github.com/gyglim/1f8dfb1b5c82627ae3efcfbbadb9f514
def create_summary(label, values):
values = np.array(values)
counts, bin_edges = np.histogram(values, bins=len(values))
hist = tf.HistogramProto()
# value = value / np.sum(value)
# value = np.log(value)
# value = value - np.min(value)
# value = value / np.sum(value)
# for obs, grad in sorted(enumerate(value), reverse=True, key=lambda x: x[1]):
# print(f"Observation {obs} has relevance {grad}")
hist.min = float(np.min(values))
hist.max = float(np.max(values))
hist.num = int(np.prod(values.shape))
hist.sum = float(np.sum(values))
hist.sum_squares = float(np.sum(np.square(values)))
# hist.min = 0.0
# hist.max = float(np.max(value))
# hist.num = len(value)
# hist.sum = float(np.sum(value))
# hist.sum_squares = float(np.sum(value ** 2))
bin_edges = bin_edges[1:]
for edge in bin_edges:
hist.bucket_limit.append(edge)
for c in counts:
hist.bucket.append(c)
return tf.Summary.Value(tag=label, histo=hist)
if isinstance(value, dict):
svals = [create_summary(k,v) for k,v in value.items()]
else:
svals = create_summary("Saliency", value)
# Create and write Summary
# summary = tf.Summary(value=[tf.Summary.Value(tag="Saliency", histo=hist)])
summary = tf.Summary(value=svals)
self.summary_writers[category].add_summary(summary, self.trajectories)
self.summary_writers[category].flush()
self.trajectories += 1
def _dict_to_tensorboard(
self, name: str, input_dict: Dict[str, Any]
) -> Optional[bytes]:
"""
Convert a dict to a Tensorboard-encoded string.
:param name: The name of the text.
:param input_dict: A dictionary that will be displayed in a table on Tensorboard.
"""
try:
with tf.Session(config=generate_session_config()) as sess:
s_op = tf.summary.text(
name,
tf.convert_to_tensor(
[[str(x), str(input_dict[x])] for x in input_dict]
),
)
s = sess.run(s_op)
return s
except Exception:
logger.warning(
f"Could not write {name} summary for Tensorboard: {input_dict}"
)
return None
class CSVWriter(StatsWriter):
def __init__(self, base_dir: str, required_fields: List[str] = None):
"""
A StatsWriter that writes to a Tensorboard summary.
:param base_dir: The directory within which to place the CSV file, which will be {base_dir}/{category}.csv.
:param required_fields: If provided, the CSV writer won't write until these fields have statistics to write for
them.
"""
# We need to keep track of the fields in the CSV, as all rows need the same fields.
self.csv_fields: Dict[str, List[str]] = {}
self.required_fields = required_fields if required_fields else []
self.base_dir: str = base_dir
def write_stats(
self, category: str, values: Dict[str, StatsSummary], step: int
) -> None:
if self._maybe_create_csv_file(category, list(values.keys())):
row = [str(step)]
# Only record the stats that showed up in the first valid row
for key in self.csv_fields[category]:
_val = values.get(key, None)
row.append(str(_val.mean) if _val else "None")
with open(self._get_filepath(category), "a") as file:
writer = csv.writer(file)
writer.writerow(row)
def _maybe_create_csv_file(self, category: str, keys: List[str]) -> bool:
"""
If no CSV file exists and the keys have the required values,
make the CSV file and write hte title row.
Returns True if there is now (or already is) a valid CSV file.
"""
if category not in self.csv_fields:
summary_dir = self.base_dir
os.makedirs(summary_dir, exist_ok=True)
# Only store if the row contains the required fields
if all(item in keys for item in self.required_fields):
self.csv_fields[category] = keys
with open(self._get_filepath(category), "w") as file:
title_row = ["Steps"]
title_row.extend(keys)
writer = csv.writer(file)
writer.writerow(title_row)
return True
return False
return True
def _get_filepath(self, category: str) -> str:
file_dir = os.path.join(self.base_dir, category + ".csv")
return file_dir
class StatsReporter:
writers: List[StatsWriter] = []
stats_dict: Dict[str, Dict[str, List]] = defaultdict(lambda: defaultdict(list))
lock = RLock()
def __init__(self, category: str):
"""
Generic StatsReporter. A category is the broadest type of storage (would
correspond the run name and trainer name, e.g. 3DBalltest_3DBall. A key is the
type of stat it is (e.g. Environment/Reward). Finally the Value is the float value
attached to this stat.
"""
self.category: str = category
@staticmethod
def add_writer(writer: StatsWriter) -> None:
with StatsReporter.lock:
StatsReporter.writers.append(writer)
def add_property(self, property_type: StatsPropertyType, value: Any) -> None:
"""
Add a generic property to the StatsReporter. This could be e.g. a Dict of hyperparameters,
a max step count, a trainer type, etc. Note that not all StatsWriters need to be compatible
with all types of properties. For instance, a TB writer doesn't need a max step, nor should
we write hyperparameters to the CSV.
:param key: The type of property.
:param value: The property itself.
"""
with StatsReporter.lock:
for writer in StatsReporter.writers:
writer.add_property(self.category, property_type, value)
def add_stat(self, key: str, value: float) -> None:
"""
Add a float value stat to the StatsReporter.
:param key: The type of statistic, e.g. Environment/Reward.
:param value: the value of the statistic.
"""
with StatsReporter.lock:
StatsReporter.stats_dict[self.category][key].append(value)
def set_stat(self, key: str, value: float) -> None:
"""
Sets a stat value to a float. This is for values that we don't want to average, and just
want the latest.
:param key: The type of statistic, e.g. Environment/Reward.
:param value: the value of the statistic.
"""
with StatsReporter.lock:
StatsReporter.stats_dict[self.category][key] = [value]
def write_stats(self, step: int) -> None:
"""
Write out all stored statistics that fall under the category specified.
The currently stored values will be averaged, written out as a single value,
and the buffer cleared.
:param step: Training step which to write these stats as.
"""
with StatsReporter.lock:
values: Dict[str, StatsSummary] = {}
for key in StatsReporter.stats_dict[self.category]:
if len(StatsReporter.stats_dict[self.category][key]) > 0:
stat_summary = self.get_stats_summaries(key)
values[key] = stat_summary
for writer in StatsReporter.writers:
writer.write_stats(self.category, values, step)
del StatsReporter.stats_dict[self.category]
def get_stats_summaries(self, key: str) -> StatsSummary:
"""
Get the mean, std, and count of a particular statistic, since last write.
:param key: The type of statistic, e.g. Environment/Reward.
:returns: A StatsSummary NamedTuple containing (mean, std, count).
"""
if len(StatsReporter.stats_dict[self.category][key]) > 0:
return StatsSummary(
mean=np.mean(StatsReporter.stats_dict[self.category][key]),
std=np.std(StatsReporter.stats_dict[self.category][key]),
num=len(StatsReporter.stats_dict[self.category][key]),
)
return StatsSummary.empty()