from collections import defaultdict
from enum import Enum
from typing import List, Dict, NamedTuple, Any, Optional
import numpy as np
import abc
import csv
import os
import time

from mlagents_envs.logging_util import get_logger
from mlagents_envs.timers import set_gauge
from mlagents.tf_utils import tf, generate_session_config
import horovod.tensorflow as hvd

logger = get_logger(__name__)


class StatsSummary(NamedTuple):
    """
    Aggregate of the values recorded for one statistic: mean, standard
    deviation and number of samples.
    """

    mean: float
    std: float
    num: int

    @staticmethod
    def empty() -> "StatsSummary":
        """
        Return the summary used when no values were recorded (all zeros).
        """
        return StatsSummary(0.0, 0.0, 0)


class StatsPropertyType(Enum):
    """
    Kinds of generic properties that can be passed to StatsWriter.add_property.
    """

    HYPERPARAMETERS = "hyperparameters"
    SELF_PLAY = "selfplay"


class StatsWriter(abc.ABC):
    """
    A StatsWriter abstract class. A StatsWriter takes in a category, key, scalar value,
    and step and writes it out by some method.
    """

    @abc.abstractmethod
    def write_stats(
        self, category: str, values: Dict[str, StatsSummary], step: int
    ) -> None:
        """
        Write out the summarized statistics of a category for a given step.
        :param category: The category that the statistics belong to.
        :param values: Mapping from statistic key to its StatsSummary.
        :param step: The step to record the statistics at.
        """
        pass

    def add_property(
        self, category: str, property_type: StatsPropertyType, value: Any
    ) -> None:
        """
        Add a generic property to the StatsWriter. This could be e.g. a Dict of
        hyperparameters, a max step count, a trainer type, etc. Note that not all
        StatsWriters need to be compatible with all types of properties. For instance,
        a TB writer doesn't need a max step, nor should we write hyperparameters to
        the CSV. The default implementation ignores the property.
        :param category: The category that the property belongs to.
        :param property_type: The type of property.
        :param value: The property itself.
        """
        pass


class GaugeWriter(StatsWriter):
    """
    Write all stats that we receive to the timer gauges, so we can track them
    offline easily.
    """

    @staticmethod
    def sanitize_string(s: str) -> str:
        """
        Clean up special characters in the category and value names.
        Slashes become dots and spaces are removed.
        """
        return s.replace("/", ".").replace(" ", "")

    def write_stats(
        self, category: str, values: Dict[str, StatsSummary], step: int
    ) -> None:
        """
        Set one gauge per statistic, named "<category>.<key>.mean" (sanitized),
        to the mean of that statistic. The step is not used.
        """
        for val, stats_summary in values.items():
            set_gauge(
                GaugeWriter.sanitize_string(f"{category}.{val}.mean"),
                float(stats_summary.mean),
            )


class ConsoleWriter(StatsWriter):
    """
    A StatsWriter that logs a one-line training summary through the module logger.
    """

    def __init__(self):
        self.training_start_time = time.time()
        # If self-play, we want to print ELO as well as reward
        self.self_play = False
        self.self_play_team = -1

    def write_stats(
        self, category: str, values: Dict[str, StatsSummary], step: int
    ) -> None:
        """
        Log the mean/std of the cumulative reward (and the ELO when self-play is
        enabled), or a notice that no episode completed since the last summary.
        :param category: The category that the statistics belong to.
        :param values: Mapping from statistic key to its StatsSummary.
        :param step: The step the statistics were recorded at.
        """
        is_training = "Not Training."
        if "Is Training" in values:
            stats_summary = values["Is Training"]
            if stats_summary.mean > 0.0:
                is_training = "Training."

        if "Environment/Cumulative Reward" in values:
            stats_summary = values["Environment/Cumulative Reward"]
            rank = hvd.rank()
            logger.info(
                "Horovod Rank: {}, {}: Step: {}. "
                "Time Elapsed: {:0.3f} s "
                "Mean "
                "Reward: {:0.3f}"
                ". Std of Reward: {:0.3f}. {}".format(
                    rank,
                    category,
                    step,
                    time.time() - self.training_start_time,
                    stats_summary.mean,
                    stats_summary.std,
                    is_training,
                )
            )
            if self.self_play and "Self-play/ELO" in values:
                elo_stats = values["Self-play/ELO"]
                logger.info("{} ELO: {:0.3f}. ".format(category, elo_stats.mean))
        else:
            logger.info(
                "{}: Step: {}. No episode was completed since last summary. {}".format(
                    category, step, is_training
                )
            )

    def add_property(
        self, category: str, property_type: StatsPropertyType, value: Any
    ) -> None:
        """
        Log hyperparameters, or record whether self-play is enabled.
        :param category: The category that the property belongs to.
        :param property_type: The type of property.
        :param value: The property itself. Must be a bool for SELF_PLAY.
        """
        if property_type == StatsPropertyType.HYPERPARAMETERS:
            logger.info(
                """Hyperparameters for behavior name {0}: \n{1}""".format(
                    category, self._dict_to_str(value, 0)
                )
            )
        elif property_type == StatsPropertyType.SELF_PLAY:
            assert isinstance(value, bool)
            self.self_play = value

    def _dict_to_str(self, param_dict: Dict[str, Any], num_tabs: int) -> str:
        """
        Takes a parameter dictionary and converts it to a human-readable string.
        Recurses if there are multiple levels of dict. Used to print out hyperparameters.
        :param param_dict: A Dictionary of key, value parameters. Non-dict values
            are returned as str(value).
        :param num_tabs: Current nesting depth, used to indent nested entries.
        :return: A string version of this dictionary.
        """
        if not isinstance(param_dict, dict):
            return str(param_dict)
        # Nested dictionaries start on their own line below the parent key.
        append_newline = "\n" if num_tabs > 0 else ""
        return append_newline + "\n".join(
            [
                "\t"
                + " " * num_tabs
                + "{0}:\t{1}".format(
                    x, self._dict_to_str(param_dict[x], num_tabs + 1)
                )
                for x in param_dict
            ]
        )


class TensorboardWriter(StatsWriter):
    def __init__(self, base_dir: str, clear_past_data: bool = False):
        """
        A StatsWriter that writes to a Tensorboard summary.
        :param base_dir: The directory within which to place all the summaries. Tensorboard files will be written to a
        {base_dir}/{category} directory.
        :param clear_past_data: Whether or not to clean up existing Tensorboard files associated with the base_dir and
        category.
        """
        self.summary_writers: Dict[str, tf.summary.FileWriter] = {}
        self.base_dir: str = base_dir
        self._clear_past_data = clear_past_data

    def write_stats(
        self, category: str, values: Dict[str, StatsSummary], step: int
    ) -> None:
        """
        Write the mean of every statistic as a scalar summary, then flush.
        :param category: The category that the statistics belong to.
        :param values: Mapping from statistic key to its StatsSummary.
        :param step: The step to record the statistics at.
        """
        self._maybe_create_summary_writer(category)
        writer = self.summary_writers[category]
        for key, value in values.items():
            summary = tf.Summary()
            summary.value.add(tag="{}".format(key), simple_value=value.mean)
            writer.add_summary(summary, step)
            writer.flush()

    def _maybe_create_summary_writer(self, category: str) -> None:
        """
        Create the FileWriter (and its directory) for a category on first use,
        optionally deleting events files left over in that directory.
        """
        if category not in self.summary_writers:
            filewriter_dir = "{basedir}/{category}".format(
                basedir=self.base_dir, category=category
            )
            os.makedirs(filewriter_dir, exist_ok=True)
            if self._clear_past_data:
                self._delete_all_events_files(filewriter_dir)
            self.summary_writers[category] = tf.summary.FileWriter(filewriter_dir)

    def _delete_all_events_files(self, directory_name: str) -> None:
        """
        Remove every file in the directory whose name starts with "events.out".
        Failures to delete are logged and otherwise ignored.
        """
        for file_name in os.listdir(directory_name):
            if file_name.startswith("events.out"):
                logger.warning(
                    "{} was left over from a previous run. Deleting.".format(file_name)
                )
                full_fname = os.path.join(directory_name, file_name)
                try:
                    os.remove(full_fname)
                except OSError:
                    logger.warning(
                        "{} was left over from a previous run and "
                        "not deleted.".format(full_fname)
                    )

    def add_property(
        self, category: str, property_type: StatsPropertyType, value: Any
    ) -> None:
        """
        Write hyperparameters as a text summary at step 0. Other property types
        are ignored.
        :param category: The category that the property belongs to.
        :param property_type: The type of property.
        :param value: The property itself. Must be a dict for HYPERPARAMETERS.
        """
        if property_type == StatsPropertyType.HYPERPARAMETERS:
            assert isinstance(value, dict)
            text = self._dict_to_tensorboard("Hyperparameters", value)
            self._maybe_create_summary_writer(category)
            # _dict_to_tensorboard returns "" when the summary could not be
            # built; there is nothing to write in that case.
            if text:
                self.summary_writers[category].add_summary(text, 0)

    def _dict_to_tensorboard(self, name: str, input_dict: Dict[str, Any]) -> str:
        """
        Convert a dict to a Tensorboard-encoded string.
        :param name: The name of the text.
        :param input_dict: A dictionary that will be displayed in a table on Tensorboard.
        :return: The evaluated text summary, or "" if it could not be created.
        """
        try:
            with tf.Session(config=generate_session_config()) as sess:
                s_op = tf.summary.text(
                    name,
                    tf.convert_to_tensor(
                        ([[str(x), str(input_dict[x])] for x in input_dict])
                    ),
                )
                s = sess.run(s_op)
                return s
        except Exception:
            # Best effort: a failed text summary must not stop training.
            logger.warning("Could not write text summary for Tensorboard.")
            return ""


class CSVWriter(StatsWriter):
    def __init__(self, base_dir: str, required_fields: Optional[List[str]] = None):
        """
        A StatsWriter that writes to a CSV file.
        :param base_dir: The directory within which to place the CSV file, which will be {base_dir}/{category}.csv.
        :param required_fields: If provided, the CSV writer won't write until these fields have statistics to write for
        them.
        """
        # We need to keep track of the fields in the CSV, as all rows need the same fields.
        self.csv_fields: Dict[str, List[str]] = {}
        self.required_fields = required_fields if required_fields else []
        self.base_dir: str = base_dir

    def write_stats(
        self, category: str, values: Dict[str, StatsSummary], step: int
    ) -> None:
        """
        Append one row (step followed by the mean of each tracked field) to the
        category's CSV file, creating the file first if needed. Nothing is
        written until the required fields are present.
        :param category: The category that the statistics belong to.
        :param values: Mapping from statistic key to its StatsSummary.
        :param step: The step to record the statistics at.
        """
        if self._maybe_create_csv_file(category, list(values.keys())):
            row = [str(step)]
            # Only record the stats that showed up in the first valid row
            for key in self.csv_fields[category]:
                _val = values.get(key, None)
                row.append(str(_val.mean) if _val is not None else "None")
            # newline="" lets the csv module control line endings itself.
            with open(self._get_filepath(category), "a", newline="") as file:
                writer = csv.writer(file)
                writer.writerow(row)

    def _maybe_create_csv_file(self, category: str, keys: List[str]) -> bool:
        """
        If no CSV file exists and the keys have the required values,
        make the CSV file and write the title row.
        Returns True if there is now (or already is) a valid CSV file.
        """
        if category not in self.csv_fields:
            summary_dir = self.base_dir
            os.makedirs(summary_dir, exist_ok=True)
            # Only store if the row contains the required fields
            if all(item in keys for item in self.required_fields):
                self.csv_fields[category] = keys
                # newline="" lets the csv module control line endings itself.
                with open(self._get_filepath(category), "w", newline="") as file:
                    title_row = ["Steps"]
                    title_row.extend(keys)
                    writer = csv.writer(file)
                    writer.writerow(title_row)
                return True
            return False
        return True

    def _get_filepath(self, category: str) -> str:
        """
        Return the path of the CSV file for a category: {base_dir}/{category}.csv.
        """
        file_dir = os.path.join(self.base_dir, category + ".csv")
        return file_dir


class StatsReporter:
    # Both containers are class attributes, so they are shared by every
    # StatsReporter instance.
    writers: List[StatsWriter] = []
    stats_dict: Dict[str, Dict[str, List]] = defaultdict(lambda: defaultdict(list))

    def __init__(self, category: str):
        """
        Generic StatsReporter. A category is the broadest type of storage (would
        correspond the run name and trainer name, e.g. 3DBalltest_3DBall. A key is the
        type of stat it is (e.g. Environment/Reward). Finally the Value is the float value
        attached to this stat.
        """
        self.category: str = category

    @staticmethod
    def add_writer(writer: StatsWriter) -> None:
        """
        Register a writer that will receive the stats of all StatsReporters.
        :param writer: The StatsWriter to add.
        """
        StatsReporter.writers.append(writer)

    def add_property(self, property_type: StatsPropertyType, value: Any) -> None:
        """
        Add a generic property to the StatsReporter. This could be e.g. a Dict of
        hyperparameters, a max step count, a trainer type, etc. Note that not all
        StatsWriters need to be compatible with all types of properties. For instance,
        a TB writer doesn't need a max step, nor should we write hyperparameters to
        the CSV.
        :param property_type: The type of property.
        :param value: The property itself.
        """
        for writer in StatsReporter.writers:
            writer.add_property(self.category, property_type, value)

    def add_stat(self, key: str, value: float) -> None:
        """
        Add a float value stat to the StatsReporter.
        :param key: The type of statistic, e.g. Environment/Reward.
        :param value: the value of the statistic.
        """
        StatsReporter.stats_dict[self.category][key].append(value)

    def set_stat(self, key: str, value: float) -> None:
        """
        Sets a stat value to a float. This is for values that we don't
        want to average, and just want the latest.
        :param key: The type of statistic, e.g. Environment/Reward.
        :param value: the value of the statistic.
        """
        StatsReporter.stats_dict[self.category][key] = [value]

    def write_stats(self, step: int) -> None:
        """
        Write out all stored statistics that fall under the category specified.
        The currently stored values will be averaged, written out as a single value,
        and the buffer cleared.
        :param step: Training step which to write these stats as.
        """
        values: Dict[str, StatsSummary] = {}
        for key in StatsReporter.stats_dict[self.category]:
            if len(StatsReporter.stats_dict[self.category][key]) > 0:
                stat_summary = self.get_stats_summaries(key)
                values[key] = stat_summary
        for writer in StatsReporter.writers:
            writer.write_stats(self.category, values, step)
        # Drop the whole category buffer so the next window starts empty.
        del StatsReporter.stats_dict[self.category]

    def get_stats_summaries(self, key: str) -> StatsSummary:
        """
        Get the mean, std, and count of a particular statistic, since last write.
        :param key: The type of statistic, e.g. Environment/Reward.
        :returns: A StatsSummary NamedTuple containing (mean, std, count).
        """
        recorded = StatsReporter.stats_dict[self.category][key]
        if len(recorded) > 0:
            return StatsSummary(
                mean=np.mean(recorded), std=np.std(recorded), num=len(recorded)
            )
        return StatsSummary.empty()