浏览代码

Remove TrainerMetrics and add CSVWriter using new StatsWriter API (#3108)

/asymm-envs
GitHub 5 年前
当前提交
2ac242f7
共有 10 个文件被更改,包括 145 次插入229 次删除
  1. 10
      ml-agents/mlagents/trainers/learn.py
  2. 5
      ml-agents/mlagents/trainers/ppo/trainer.py
  3. 5
      ml-agents/mlagents/trainers/sac/trainer.py
  4. 94
      ml-agents/mlagents/trainers/stats.py
  5. 59
      ml-agents/mlagents/trainers/tests/test_stats.py
  6. 3
      ml-agents/mlagents/trainers/tests/test_trainer_util.py
  7. 11
      ml-agents/mlagents/trainers/trainer.py
  8. 19
      ml-agents/mlagents/trainers/trainer_controller.py
  9. 46
      ml-agents/mlagents/trainers/tests/test_trainer_metrics.py
  10. 122
      ml-agents/mlagents/trainers/trainer_metrics.py

10
ml-agents/mlagents/trainers/learn.py


from mlagents.trainers.exception import TrainerError
from mlagents.trainers.meta_curriculum import MetaCurriculum
from mlagents.trainers.trainer_util import load_config, TrainerFactory
from mlagents.trainers.stats import TensorboardWriter, StatsReporter
from mlagents.trainers.stats import TensorboardWriter, CSVWriter, StatsReporter
from mlagents_envs.environment import UnityEnvironment
from mlagents.trainers.sampler_class import SamplerManager
from mlagents.trainers.exception import SamplerException

trainer_config = load_config(trainer_config_path)
port = options.base_port + (sub_id * options.num_envs)
# Configure Tensorboard Writers and StatsReporter
# Configure CSV, Tensorboard Writers and StatsReporter
# We assume reward and episode length are needed in the CSV.
csv_writer = CSVWriter(
summaries_dir,
required_fields=["Environment/Cumulative Reward", "Environment/Episode Length"],
)
StatsReporter.add_writer(csv_writer)
if options.env_path is None:
port = 5004 # This is the in Editor Training Port

5
ml-agents/mlagents/trainers/ppo/trainer.py


The reward signal generators must be updated in this method at their own pace.
"""
buffer_length = self.update_buffer.num_experiences
self.trainer_metrics.start_policy_update_timer(
number_experiences=buffer_length,
mean_return=float(np.mean(self.cumulative_returns_since_policy_update)),
)
self.cumulative_returns_since_policy_update.clear()
# Make sure batch_size is a multiple of sequence length. During training, we

for stat, val in update_stats.items():
self.stats_reporter.add_stat(stat, val)
self.clear_update_buffer()
self.trainer_metrics.end_policy_update()
def discount_rewards(r, gamma=0.99, value_next=0.0):

5
ml-agents/mlagents/trainers/sac/trainer.py


If reward_signal_train_interval is met, update the reward signals from the buffer.
"""
if self.step % self.train_interval == 0:
self.trainer_metrics.start_policy_update_timer(
number_experiences=self.update_buffer.num_experiences,
mean_return=float(np.mean(self.cumulative_returns_since_policy_update)),
)
self.trainer_metrics.end_policy_update()
def update_sac_policy(self) -> None:
"""

94
ml-agents/mlagents/trainers/stats.py


from typing import List, Dict, NamedTuple
import numpy as np
import abc
import csv
class StatsSummary(NamedTuple):
mean: float
std: float
num: int
class StatsWriter(abc.ABC):
"""
A StatsWriter abstract class. A StatsWriter takes in a category, key, scalar value, and step

@abc.abstractmethod
def write_stats(self, category: str, key: str, value: float, step: int) -> None:
def write_stats(
self, category: str, values: Dict[str, StatsSummary], step: int
) -> None:
pass
@abc.abstractmethod

class TensorboardWriter(StatsWriter):
def __init__(self, base_dir: str):
"""
A StatsWriter that writes to a Tensorboard summary.
:param base_dir: The directory within which to place all the summaries. Tensorboard files will be written to a
{base_dir}/{category} directory.
"""
def write_stats(self, category: str, key: str, value: float, step: int) -> None:
def write_stats(
self, category: str, values: Dict[str, StatsSummary], step: int
) -> None:
summary = tf.Summary()
summary.value.add(tag="{}".format(key), simple_value=value)
self.summary_writers[category].add_summary(summary, step)
self.summary_writers[category].flush()
for key, value in values.items():
summary = tf.Summary()
summary.value.add(tag="{}".format(key), simple_value=value.mean)
self.summary_writers[category].add_summary(summary, step)
self.summary_writers[category].flush()
def _maybe_create_summary_writer(self, category: str) -> None:
if category not in self.summary_writers:

self.summary_writers[category].add_summary(text, step)
class StatsSummary(NamedTuple):
mean: float
std: float
num: int
class CSVWriter(StatsWriter):
def __init__(self, base_dir: str, required_fields: List[str] = None):
"""
A StatsWriter that writes to a Tensorboard summary.
:param base_dir: The directory within which to place the CSV file, which will be {base_dir}/{category}.csv.
:param required_fields: If provided, the CSV writer won't write until these fields have statistics to write for
them.
"""
# We need to keep track of the fields in the CSV, as all rows need the same fields.
self.csv_fields: Dict[str, List[str]] = {}
self.required_fields = required_fields if required_fields else []
self.base_dir: str = base_dir
def write_stats(
self, category: str, values: Dict[str, StatsSummary], step: int
) -> None:
if self._maybe_create_csv_file(category, list(values.keys())):
row = [str(step)]
# Only record the stats that showed up in the first valid row
for key in self.csv_fields[category]:
_val = values.get(key, None)
row.append(str(_val.mean) if _val else "None")
with open(self._get_filepath(category), "a") as file:
writer = csv.writer(file)
writer.writerow(row)
def _maybe_create_csv_file(self, category: str, keys: List[str]) -> bool:
"""
If no CSV file exists and the keys have the required values,
make the CSV file and write hte title row.
Returns True if there is now (or already is) a valid CSV file.
"""
if category not in self.csv_fields:
summary_dir = self.base_dir
os.makedirs(summary_dir, exist_ok=True)
# Only store if the row contains the required fields
if all(item in keys for item in self.required_fields):
self.csv_fields[category] = keys
with open(self._get_filepath(category), "w") as file:
title_row = ["Steps"]
title_row.extend(keys)
writer = csv.writer(file)
writer.writerow(title_row)
return True
return False
return True
def _get_filepath(self, category: str) -> str:
file_dir = os.path.join(self.base_dir, category + ".csv")
return file_dir
def write_text(self, category: str, text: str, step: int) -> None:
pass
class StatsReporter:

:param category: The category which to write out the stats.
:param step: Training step which to write these stats as.
"""
values: Dict[str, StatsSummary] = {}
stat_mean = float(np.mean(StatsReporter.stats_dict[self.category][key]))
for writer in StatsReporter.writers:
writer.write_stats(self.category, key, stat_mean, step)
stat_summary = self.get_stats_summaries(key)
values[key] = stat_summary
for writer in StatsReporter.writers:
writer.write_stats(self.category, values, step)
del StatsReporter.stats_dict[self.category]
def write_text(self, text: str, step: int) -> None:

59
ml-agents/mlagents/trainers/tests/test_stats.py


import os
import pytest
import tempfile
import csv
from mlagents.trainers.stats import StatsReporter, TensorboardWriter
from mlagents.trainers.stats import (
StatsReporter,
TensorboardWriter,
CSVWriter,
StatsSummary,
)
def test_stat_reporter_add_summary_write():

# Test write_stats
step = 10
statsreporter1.write_stats(step)
mock_writer1.write_stats.assert_called_once_with("category1", "key1", 4.5, step)
mock_writer2.write_stats.assert_called_once_with("category1", "key1", 4.5, step)
mock_writer1.write_stats.assert_called_once_with(
"category1", {"key1": statssummary1}, step
)
mock_writer2.write_stats.assert_called_once_with(
"category1", {"key1": statssummary1}, step
)
def test_stat_reporter_text():

category = "category1"
with tempfile.TemporaryDirectory(prefix="unittest-") as base_dir:
tb_writer = TensorboardWriter(base_dir)
tb_writer.write_stats("category1", "key1", 1.0, 10)
statssummary1 = StatsSummary(mean=1.0, std=1.0, num=1)
tb_writer.write_stats("category1", {"key1": statssummary1}, 10)
# Test that the filewriter has been created and the directory has been created.
filewriter_dir = "{basedir}/{category}".format(

mock_summary.return_value, 10
)
mock_filewriter.return_value.flush.assert_called_once()
def test_csv_writer():
# Test write_stats
category = "category1"
with tempfile.TemporaryDirectory(prefix="unittest-") as base_dir:
csv_writer = CSVWriter(base_dir, required_fields=["key1", "key2"])
statssummary1 = StatsSummary(mean=1.0, std=1.0, num=1)
csv_writer.write_stats("category1", {"key1": statssummary1}, 10)
# Test that the filewriter has been created and the directory has been created.
filewriter_dir = "{basedir}/{category}.csv".format(
basedir=base_dir, category=category
)
# The required keys weren't in the stats
assert not os.path.exists(filewriter_dir)
csv_writer.write_stats(
"category1", {"key1": statssummary1, "key2": statssummary1}, 10
)
csv_writer.write_stats(
"category1", {"key1": statssummary1, "key2": statssummary1}, 20
)
# The required keys were in the stats
assert os.path.exists(filewriter_dir)
with open(filewriter_dir) as csv_file:
csv_reader = csv.reader(csv_file, delimiter=",")
line_count = 0
for row in csv_reader:
if line_count == 0:
assert "key1" in row
assert "key2" in row
assert "Steps" in row
line_count += 1
else:
assert len(row) == 3
line_count += 1
assert line_count == 3

3
ml-agents/mlagents/trainers/tests/test_trainer_util.py


import mlagents.trainers.trainer_util as trainer_util
from mlagents.trainers.trainer_util import load_config, _load_config
from mlagents.trainers.trainer_metrics import TrainerMetrics
from mlagents.trainers.ppo.trainer import PPOTrainer
from mlagents.trainers.exception import TrainerConfigError
from mlagents.trainers.brain import BrainParameters

run_id,
multi_gpu,
):
self.trainer_metrics = TrainerMetrics("", "")
assert brain == brain_params_mock
assert trainer_parameters == expected_config
assert reward_buff_cap == expected_reward_buff_cap

run_id,
multi_gpu,
):
self.trainer_metrics = TrainerMetrics("", "")
assert brain == brain_params_mock
assert trainer_parameters == expected_config
assert reward_buff_cap == expected_reward_buff_cap

11
ml-agents/mlagents/trainers/trainer.py


from mlagents_envs.exception import UnityException
from mlagents_envs.timers import set_gauge
from mlagents.trainers.trainer_metrics import TrainerMetrics
from mlagents.trainers.tf_policy import TFPolicy
from mlagents.trainers.stats import StatsReporter
from mlagents.trainers.trajectory import Trajectory

self.stats_reporter = StatsReporter(self.summary_path)
self.cumulative_returns_since_policy_update: List[float] = []
self.is_training = training
self.trainer_metrics = TrainerMetrics(
path=self.summary_path + ".csv", brain_name=self.brain_name
)
self._reward_buffer: Deque[float] = deque(maxlen=reward_buff_cap)
self.policy: TFPolicy = None # type: ignore # this will always get set
self.step: int = 0

Exports the model
"""
self.policy.export_model()
def write_training_metrics(self) -> None:
"""
Write training metrics to a CSV file
:return:
"""
self.trainer_metrics.write_training_metrics()
def write_summary(self, global_step: int, delta_train_start: float) -> None:
"""

19
ml-agents/mlagents/trainers/trainer_controller.py


)
from mlagents.trainers.sampler_class import SamplerManager
from mlagents_envs.timers import hierarchical_timer, get_timer_tree, timed
from mlagents.trainers.trainer import Trainer, TrainerMetrics
from mlagents.trainers.trainer import Trainer
from mlagents.trainers.meta_curriculum import MetaCurriculum
from mlagents.trainers.trainer_util import TrainerFactory
from mlagents.trainers.agent_processor import AgentProcessor

self.run_id = run_id
self.save_freq = save_freq
self.train_model = train
self.trainer_metrics: Dict[str, TrainerMetrics] = {}
self.meta_curriculum = meta_curriculum
self.training_start_time = time()
self.sampler_manager = sampler_manager

)
self._save_model()
def _write_training_metrics(self):
"""
Write all CSV metrics
:return:
"""
for brain_name in self.trainers.keys():
if brain_name in self.trainer_metrics:
self.trainers[brain_name].write_training_metrics()
def _write_timing_tree(self) -> None:
timing_path = f"{self.summaries_dir}/{self.run_id}_timers.json"
try:

self._save_model_when_interrupted()
pass
if self.train_model:
self._write_training_metrics()
self._export_graph()
self._write_timing_tree()

@timed
def advance(self, env: EnvManager) -> int:
with hierarchical_timer("env_step"):
time_start_step = time()
delta_time_step = time() - time_start_step
if brain_name in self.trainer_metrics:
self.trainer_metrics[brain_name].add_delta_step(delta_time_step)
if step_info.has_actions_for_brain(brain_name):
_processor = self.managers[brain_name].processor
_processor.add_experiences(

)
for brain_name, trainer in self.trainers.items():
if brain_name in self.trainer_metrics:
self.trainer_metrics[brain_name].add_delta_step(delta_time_step)
if self.train_model and trainer.get_step <= trainer.get_max_steps:
trainer.increment_step(len(new_step_infos))
if trainer.is_ready_update():

46
ml-agents/mlagents/trainers/tests/test_trainer_metrics.py


import unittest.mock as mock
from mlagents.trainers.trainer_metrics import TrainerMetrics
class TestTrainerMetrics:
def test_field_names(self):
field_names = [
"Brain name",
"Time to update policy",
"Time since start of training",
"Time for last experience collection",
"Number of experiences used for training",
"Mean return",
]
from mlagents.trainers.trainer_metrics import FIELD_NAMES
assert FIELD_NAMES == field_names
@mock.patch(
"mlagents.trainers.trainer_metrics.time", mock.MagicMock(return_value=42)
)
def test_experience_collection_timer(self):
mock_path = "fake"
mock_brain_name = "fake"
trainer_metrics = TrainerMetrics(path=mock_path, brain_name=mock_brain_name)
trainer_metrics.start_experience_collection_timer()
trainer_metrics.end_experience_collection_timer()
assert trainer_metrics.delta_last_experience_collection == 0
@mock.patch(
"mlagents.trainers.trainer_metrics.time", mock.MagicMock(return_value=42)
)
def test_policy_update_timer(self):
mock_path = "fake"
mock_brain_name = "fake"
fake_buffer_length = 350
fake_mean_return = 0.3
trainer_metrics = TrainerMetrics(path=mock_path, brain_name=mock_brain_name)
trainer_metrics.start_experience_collection_timer()
trainer_metrics.end_experience_collection_timer()
trainer_metrics.start_policy_update_timer(
number_experiences=fake_buffer_length, mean_return=fake_mean_return
)
trainer_metrics.end_policy_update()
fake_row = [mock_brain_name, 0, 0, 0, 350, "0.300"]
assert trainer_metrics.rows[0] == fake_row

122
ml-agents/mlagents/trainers/trainer_metrics.py


# # Unity ML-Agents Toolkit
import logging
import csv
from time import time
from typing import List, Optional
LOGGER = logging.getLogger("mlagents.trainers")
FIELD_NAMES = [
"Brain name",
"Time to update policy",
"Time since start of training",
"Time for last experience collection",
"Number of experiences used for training",
"Mean return",
]
class TrainerMetrics:
"""
Helper class to track, write training metrics. Tracks time since object
of this class is initialized.
"""
def __init__(self, path: str, brain_name: str):
"""
:str path: Fully qualified path where CSV is stored.
:str brain_name: Identifier for the Brain which we are training
"""
self.path = path
self.brain_name = brain_name
self.rows: List[List[Optional[str]]] = []
self.time_start_experience_collection: Optional[float] = None
self.time_training_start = time()
self.last_buffer_length: Optional[int] = None
self.last_mean_return: Optional[float] = None
self.time_policy_update_start: Optional[float] = None
self.delta_last_experience_collection: Optional[float] = None
self.delta_policy_update: Optional[float] = None
def start_experience_collection_timer(self) -> None:
"""
Inform Metrics class that experience collection is starting. Intended to be idempotent
"""
if self.time_start_experience_collection is None:
self.time_start_experience_collection = time()
def end_experience_collection_timer(self) -> None:
"""
Inform Metrics class that experience collection is done.
"""
if self.time_start_experience_collection:
curr_delta = time() - self.time_start_experience_collection
if self.delta_last_experience_collection is None:
self.delta_last_experience_collection = curr_delta
else:
self.delta_last_experience_collection += curr_delta
self.time_start_experience_collection = None
def add_delta_step(self, delta: float) -> None:
"""
Inform Metrics class about time to step in environment.
"""
if self.delta_last_experience_collection:
self.delta_last_experience_collection += delta
else:
self.delta_last_experience_collection = delta
def start_policy_update_timer(
self, number_experiences: int, mean_return: float
) -> None:
"""
Inform Metrics class that policy update has started.
:int number_experiences: Number of experiences in Buffer at this point.
:float mean_return: Return averaged across all cumulative returns since last policy update
"""
self.last_buffer_length = number_experiences
self.last_mean_return = mean_return
self.time_policy_update_start = time()
def _add_row(self, delta_train_start: float) -> None:
row: List[Optional[str]] = [self.brain_name]
row.extend(
format(c, ".3f") if isinstance(c, float) else c
for c in [
self.delta_policy_update,
delta_train_start,
self.delta_last_experience_collection,
self.last_buffer_length,
self.last_mean_return,
]
)
self.delta_last_experience_collection = None
self.rows.append(row)
def end_policy_update(self) -> None:
"""
Inform Metrics class that policy update has started.
"""
if self.time_policy_update_start:
self.delta_policy_update = time() - self.time_policy_update_start
else:
self.delta_policy_update = 0
delta_train_start = time() - self.time_training_start
LOGGER.debug(
f" Policy Update Training Metrics for {self.brain_name}: "
f"\n\t\tTime to update Policy: {self.delta_policy_update:0.3f} s \n"
f"\t\tTime elapsed since training: {delta_train_start:0.3f} s \n"
f"\t\tTime for experience collection: {(self.delta_last_experience_collection or 0):0.3f} s \n"
f"\t\tBuffer Length: {self.last_buffer_length or 0} \n"
f"\t\tReturns : {(self.last_mean_return or 0):0.3f}\n"
)
self._add_row(delta_train_start)
def write_training_metrics(self) -> None:
"""
Write Training Metrics to CSV
"""
with open(self.path, "w") as file:
writer = csv.writer(file)
writer.writerow(FIELD_NAMES)
for row in self.rows:
writer.writerow(row)
正在加载...
取消
保存