您最多选择25个主题
主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
367 行
14 KiB
367 行
14 KiB
from collections import defaultdict
|
|
from enum import Enum
|
|
from typing import List, Dict, NamedTuple, Any
|
|
import numpy as np
|
|
import abc
|
|
import csv
|
|
import os
|
|
import time
|
|
|
|
from mlagents_envs.logging_util import get_logger
|
|
from mlagents_envs.timers import set_gauge
|
|
from mlagents.tf_utils import tf, generate_session_config
|
|
import horovod.tensorflow as hvd
|
|
|
|
|
|
# Module-level logger, named after this module; used by the writers below for info and warning output.
logger = get_logger(__name__)
|
|
|
|
|
|
class StatsSummary(NamedTuple):
    """
    Aggregate of a single statistic: its mean, its standard deviation and the
    number of samples it was computed from.
    """

    mean: float
    std: float
    num: int

    @staticmethod
    def empty() -> "StatsSummary":
        """
        Build the all-zero summary (mean 0.0, std 0.0, 0 samples).
        :returns: A StatsSummary whose fields are all zero.
        """
        return StatsSummary(mean=0.0, std=0.0, num=0)
|
|
|
|
|
|
class StatsPropertyType(Enum):
    """
    The kinds of generic property that can be handed to a StatsWriter's add_property.
    """

    # Property carrying hyperparameters.
    HYPERPARAMETERS = "hyperparameters"
    # Property carrying the self-play flag.
    SELF_PLAY = "selfplay"
|
|
|
|
|
|
class StatsWriter(abc.ABC):
    """
    Abstract base class for stat sinks. A StatsWriter takes in a category, a mapping of
    stat keys to StatsSummary values, and a step, and writes them out by some method.
    """

    @abc.abstractmethod
    def write_stats(
        self, category: str, values: Dict[str, StatsSummary], step: int
    ) -> None:
        """
        Write out the given summaries. Concrete writers must override this method.
        :param category: The category that the stats belong to.
        :param values: Mapping from stat key to its StatsSummary.
        :param step: The step the stats are written for.
        """

    def add_property(
        self, category: str, property_type: StatsPropertyType, value: Any
    ) -> None:
        """
        Add a generic property to the StatsWriter. This could be e.g. a Dict of hyperparameters,
        a max step count, a trainer type, etc. Note that not all StatsWriters need to be compatible
        with all types of properties. For instance, a TB writer doesn't need a max step, nor should
        we write hyperparameters to the CSV. The base implementation does nothing.
        :param category: The category that the property belongs to.
        :param property_type: The type of property.
        :param value: The property itself.
        """
        return None
|
|
|
|
|
|
class GaugeWriter(StatsWriter):
    """
    Forward every stat we receive to the timer gauges, so that they can be tracked offline easily.
    """

    @staticmethod
    def sanitize_string(s: str) -> str:
        """
        Clean up special characters in the category and value names:
        every "/" becomes "." and every space is removed.
        :param s: The raw name.
        :returns: The cleaned-up name.
        """
        dotted = s.replace("/", ".")
        return dotted.replace(" ", "")

    def write_stats(
        self, category: str, values: Dict[str, StatsSummary], step: int
    ) -> None:
        """
        Set one gauge per stat, named "{category}.{key}.mean" (after sanitizing), to the
        stat's mean. The std, the sample count and the step are not used here.
        :param category: The category that the stats belong to.
        :param values: Mapping from stat key to its StatsSummary.
        :param step: Unused by this writer.
        """
        for stat_name in values:
            gauge_name = GaugeWriter.sanitize_string(f"{category}.{stat_name}.mean")
            set_gauge(gauge_name, float(values[stat_name].mean))
|
|
|
|
|
|
class ConsoleWriter(StatsWriter):
    """
    A StatsWriter that reports progress and hyperparameters through the module logger.
    """

    def __init__(self):
        # Reference point for the "Time Elapsed" figure in the progress line.
        self.training_start_time = time.time()
        # If self-play, we want to print ELO as well as reward
        self.self_play = False
        self.self_play_team = -1

    def write_stats(
        self, category: str, values: Dict[str, StatsSummary], step: int
    ) -> None:
        """
        Log one progress line for the category. When "Environment/Cumulative Reward" is
        present, the line carries the Horovod rank, elapsed time, and the mean/std of the
        reward, followed by an ELO line if self-play is enabled and "Self-play/ELO" is
        present. Otherwise a "no episode completed" line is logged instead.
        :param category: The category that the stats belong to.
        :param values: Mapping from stat key to its StatsSummary.
        :param step: The step reported in the log line.
        """
        is_training = "Not Training."
        if "Is Training" in values and values["Is Training"].mean > 0.0:
            is_training = "Training."

        if "Environment/Cumulative Reward" not in values:
            logger.info(
                "{}: Step: {}. No episode was completed since last summary. {}".format(
                    category, step, is_training
                )
            )
            return

        reward_summary = values["Environment/Cumulative Reward"]
        rank = hvd.rank()
        elapsed = time.time() - self.training_start_time
        progress_template = (
            "Horovod Rank: {}, {}: Step: {}. Time Elapsed: {:0.3f} s Mean Reward: {:0.3f}"
            ". Std of Reward: {:0.3f}. {}"
        )
        logger.info(
            progress_template.format(
                rank,
                category,
                step,
                elapsed,
                reward_summary.mean,
                reward_summary.std,
                is_training,
            )
        )
        if self.self_play and "Self-play/ELO" in values:
            logger.info(
                "{} ELO: {:0.3f}. ".format(category, values["Self-play/ELO"].mean)
            )

    def add_property(
        self, category: str, property_type: StatsPropertyType, value: Any
    ) -> None:
        """
        Handle the two property types this writer knows about: hyperparameters are logged
        as a human-readable block, and the self-play flag is stored so that write_stats can
        also print ELO. Other property types are ignored.
        :param category: The category that the property belongs to.
        :param property_type: The type of property.
        :param value: The property itself; must be a bool for SELF_PLAY.
        """
        if property_type == StatsPropertyType.HYPERPARAMETERS:
            rendered = self._dict_to_str(value, 0)
            logger.info(
                """Hyperparameters for behavior name {0}: \n{1}""".format(
                    category, rendered
                )
            )
        elif property_type == StatsPropertyType.SELF_PLAY:
            assert isinstance(value, bool)
            self.self_play = value

    def _dict_to_str(self, param_dict: Dict[str, Any], num_tabs: int) -> str:
        """
        Takes a parameter dictionary and converts it to a human-readable string.
        Recurses if there are multiple levels of dict. Used to print out hyperparameters.
        :param param_dict: A Dictionary of key, value parameters. A non-dict value is
            simply converted with str().
        :param num_tabs: Current nesting depth; controls the indentation of each entry and
            whether the block starts on a new line.
        :returns: A string version of this dictionary.
        """
        if not isinstance(param_dict, dict):
            return str(param_dict)

        indent = "\t" + " " * num_tabs
        entries = [
            indent
            + "{0}:\t{1}".format(
                name, self._dict_to_str(param_dict[name], num_tabs + 1)
            )
            for name in param_dict
        ]
        # Nested dictionaries start on their own line, below the key they belong to.
        leading_newline = "\n" if num_tabs > 0 else ""
        return leading_newline + "\n".join(entries)
|
|
|
|
|
|
class TensorboardWriter(StatsWriter):
    def __init__(self, base_dir: str, clear_past_data: bool = False):
        """
        A StatsWriter that writes to a Tensorboard summary.
        :param base_dir: The directory within which to place all the summaries. Tensorboard files will be written to a
        {base_dir}/{category} directory.
        :param clear_past_data: Whether or not to clean up existing Tensorboard files associated with the base_dir and
        category.
        """
        # One FileWriter per category, created lazily on first use.
        self.summary_writers: Dict[str, tf.summary.FileWriter] = {}
        self.base_dir: str = base_dir
        self._clear_past_data = clear_past_data

    def write_stats(
        self, category: str, values: Dict[str, StatsSummary], step: int
    ) -> None:
        """
        Write the mean of every stat as a scalar summary tagged with the stat key.
        :param category: The category that the stats belong to; selects the summary writer.
        :param values: Mapping from stat key to its StatsSummary. Only the mean is written.
        :param step: The step the summaries are recorded at.
        """
        self._maybe_create_summary_writer(category)
        writer = self.summary_writers[category]
        for key, value in values.items():
            summary = tf.Summary()
            summary.value.add(tag="{}".format(key), simple_value=value.mean)
            writer.add_summary(summary, step)
        # A single flush after all summaries of this call have been queued is enough.
        writer.flush()

    def _maybe_create_summary_writer(self, category: str) -> None:
        """
        Create the FileWriter for the category if it does not exist yet, making its
        directory and, when requested at construction, deleting old events files first.
        :param category: The category whose writer is needed.
        """
        if category in self.summary_writers:
            return
        filewriter_dir = "{basedir}/{category}".format(
            basedir=self.base_dir, category=category
        )
        os.makedirs(filewriter_dir, exist_ok=True)
        if self._clear_past_data:
            self._delete_all_events_files(filewriter_dir)
        self.summary_writers[category] = tf.summary.FileWriter(filewriter_dir)

    def _delete_all_events_files(self, directory_name: str) -> None:
        """
        Remove every file in the directory whose name starts with "events.out".
        A file that cannot be removed is reported with a warning and left in place.
        :param directory_name: The directory to clean.
        """
        for file_name in os.listdir(directory_name):
            if not file_name.startswith("events.out"):
                continue
            logger.warning(
                "{} was left over from a previous run. Deleting.".format(file_name)
            )
            full_fname = os.path.join(directory_name, file_name)
            try:
                os.remove(full_fname)
            except OSError:
                logger.warning(
                    "{} was left over from a previous run and "
                    "not deleted.".format(full_fname)
                )

    def add_property(
        self, category: str, property_type: StatsPropertyType, value: Any
    ) -> None:
        """
        Record hyperparameters as a text summary at step 0. Other property types are ignored.
        :param category: The category that the property belongs to.
        :param property_type: The type of property.
        :param value: The property itself; must be a dict for HYPERPARAMETERS.
        """
        if property_type == StatsPropertyType.HYPERPARAMETERS:
            assert isinstance(value, dict)
            text = self._dict_to_tensorboard("Hyperparameters", value)
            self._maybe_create_summary_writer(category)
            # _dict_to_tensorboard returns "" when the text summary could not be built.
            # That placeholder is not a summary, so do not hand it to the writer.
            if text:
                self.summary_writers[category].add_summary(text, 0)

    def _dict_to_tensorboard(self, name: str, input_dict: Dict[str, Any]) -> str:
        """
        Convert a dict to a Tensorboard-encoded string.
        :param name: The name of the text.
        :param input_dict: A dictionary that will be displayed in a table on Tensorboard.
        :returns: The evaluated text summary, or "" if it could not be produced
            (a warning is logged in that case).
        """
        try:
            with tf.Session(config=generate_session_config()) as sess:
                # Two-column table: one [key, value] row per dictionary entry.
                s_op = tf.summary.text(
                    name,
                    tf.convert_to_tensor(
                        ([[str(x), str(input_dict[x])] for x in input_dict])
                    ),
                )
                s = sess.run(s_op)
                return s
        except Exception:
            # Deliberately best-effort: a failed text summary must not stop training.
            logger.warning("Could not write text summary for Tensorboard.")
            return ""
|
|
|
|
|
|
class CSVWriter(StatsWriter):
    def __init__(self, base_dir: str, required_fields: List[str] = None):
        """
        A StatsWriter that writes to a CSV file.
        :param base_dir: The directory within which to place the CSV file, which will be {base_dir}/{category}.csv.
        :param required_fields: If provided, the CSV writer won't write until these fields have statistics to write for
        them.
        """
        # We need to keep track of the fields in the CSV, as all rows need the same fields.
        self.csv_fields: Dict[str, List[str]] = {}
        self.required_fields = required_fields if required_fields else []
        self.base_dir: str = base_dir

    def write_stats(
        self, category: str, values: Dict[str, StatsSummary], step: int
    ) -> None:
        """
        Append one row (the step followed by the mean of each tracked field) to the
        category's CSV file. Nothing is written until a valid file exists for the category.
        :param category: The category that the stats belong to; selects the CSV file.
        :param values: Mapping from stat key to its StatsSummary. Only the mean is written.
        :param step: The step recorded in the first column.
        """
        if not self._maybe_create_csv_file(category, list(values.keys())):
            return
        row = [str(step)]
        # Only record the stats that showed up in the first valid row
        for key in self.csv_fields[category]:
            _val = values.get(key, None)
            # A field missing from this batch is written as the literal string "None".
            row.append(str(_val.mean) if _val is not None else "None")
        # newline="" lets the csv module control line endings itself, as its
        # documentation requires; otherwise rows can end in "\r\r\n" on Windows.
        with open(self._get_filepath(category), "a", newline="") as file:
            csv.writer(file).writerow(row)

    def _maybe_create_csv_file(self, category: str, keys: List[str]) -> bool:
        """
        If no CSV file exists and the keys have the required values,
        make the CSV file and write the title row.
        Returns True if there is now (or already is) a valid CSV file.
        :param category: The category whose CSV file is needed.
        :param keys: The stat keys available in the current batch.
        """
        if category in self.csv_fields:
            return True
        summary_dir = self.base_dir
        os.makedirs(summary_dir, exist_ok=True)
        # Only store if the row contains the required fields
        if not all(item in keys for item in self.required_fields):
            return False
        self.csv_fields[category] = keys
        with open(self._get_filepath(category), "w", newline="") as file:
            title_row = ["Steps"]
            title_row.extend(keys)
            csv.writer(file).writerow(title_row)
        return True

    def _get_filepath(self, category: str) -> str:
        """
        :param category: The category whose CSV file is wanted.
        :returns: The path {base_dir}/{category}.csv.
        """
        file_dir = os.path.join(self.base_dir, category + ".csv")
        return file_dir
|
|
|
|
|
|
class StatsReporter:
    # Both attributes live on the class, so every StatsReporter instance shares the same
    # list of writers and the same buffer of values (category -> key -> list of values).
    writers: List[StatsWriter] = []
    stats_dict: Dict[str, Dict[str, List]] = defaultdict(lambda: defaultdict(list))

    def __init__(self, category: str):
        """
        Generic StatsReporter. A category is the broadest type of storage (would
        correspond to the run name and trainer name, e.g. 3DBalltest_3DBall). A key is the
        type of stat it is (e.g. Environment/Reward). Finally the Value is the float value
        attached to this stat.
        :param category: The category this reporter stores and writes stats under.
        """
        self.category: str = category

    @staticmethod
    def add_writer(writer: StatsWriter) -> None:
        """
        Register a writer; it will receive the stats and properties of all reporters.
        :param writer: The StatsWriter to register.
        """
        StatsReporter.writers.append(writer)

    def add_property(self, property_type: StatsPropertyType, value: Any) -> None:
        """
        Add a generic property to the StatsReporter. This could be e.g. a Dict of hyperparameters,
        a max step count, a trainer type, etc. Note that not all StatsWriters need to be compatible
        with all types of properties. For instance, a TB writer doesn't need a max step, nor should
        we write hyperparameters to the CSV.
        :param property_type: The type of property.
        :param value: The property itself.
        """
        for registered_writer in StatsReporter.writers:
            registered_writer.add_property(self.category, property_type, value)

    def add_stat(self, key: str, value: float) -> None:
        """
        Add a float value stat to the StatsReporter.
        :param key: The type of statistic, e.g. Environment/Reward.
        :param value: the value of the statistic.
        """
        StatsReporter.stats_dict[self.category][key].append(value)

    def set_stat(self, key: str, value: float) -> None:
        """
        Sets a stat value to a float. This is for values that we don't want to average, and just
        want the latest.
        :param key: The type of statistic, e.g. Environment/Reward.
        :param value: the value of the statistic.
        """
        StatsReporter.stats_dict[self.category][key] = [value]

    def write_stats(self, step: int) -> None:
        """
        Write out all stored statistics that fall under the category specified.
        The currently stored values will be averaged, written out as a single value,
        and the buffer cleared.
        :param step: Training step which to write these stats as.
        """
        buffered = StatsReporter.stats_dict[self.category]
        # Keys whose buffer is empty are left out of what the writers receive.
        values: Dict[str, StatsSummary] = {
            key: self.get_stats_summaries(key)
            for key, samples in buffered.items()
            if len(samples) > 0
        }
        for registered_writer in StatsReporter.writers:
            registered_writer.write_stats(self.category, values, step)
        # Drop everything buffered for this category now that it has been written.
        del StatsReporter.stats_dict[self.category]

    def get_stats_summaries(self, key: str) -> StatsSummary:
        """
        Get the mean, std, and count of a particular statistic, since last write.
        :param key: The type of statistic, e.g. Environment/Reward.
        :returns: A StatsSummary NamedTuple containing (mean, std, count); the empty
            summary if nothing is stored for the key.
        """
        samples = StatsReporter.stats_dict[self.category][key]
        if len(samples) == 0:
            return StatsSummary.empty()
        return StatsSummary(
            mean=np.mean(samples), std=np.std(samples), num=len(samples)
        )
|