浏览代码

Merge pull request #4707 from Unity-Technologies/develop-rm-tf

Removing TensorFlow Trainers
/MLA-1734-demo-provider
GitHub 4 年前
当前提交
903d3afe
共有 42 个文件被更改,包括 113 次插入2783 次删除
  1. 13
      .github/workflows/pytest.yml
  2. 1
      com.unity.ml-agents/CHANGELOG.md
  3. 4
      docs/ML-Agents-Overview.md
  4. 2
      docs/Training-Configuration-File.md
  5. 3
      docs/Training-ML-Agents.md
  6. 17
      docs/Unity-Inference-Engine.md
  7. 2
      ml-agents-envs/setup.py
  8. 25
      ml-agents/mlagents/trainers/cli_utils.py
  9. 7
      ml-agents/mlagents/trainers/learn.py
  10. 75
      ml-agents/mlagents/trainers/ppo/trainer.py
  11. 106
      ml-agents/mlagents/trainers/sac/trainer.py
  12. 10
      ml-agents/mlagents/trainers/settings.py
  13. 2
      ml-agents/mlagents/trainers/stats.py
  14. 2
      ml-agents/mlagents/trainers/tests/__init__.py
  15. 17
      ml-agents/mlagents/trainers/tests/test_rl_trainer.py
  16. 19
      ml-agents/mlagents/trainers/tests/test_trainer_controller.py
  17. 4
      ml-agents/mlagents/trainers/tests/test_training_status.py
  18. 6
      ml-agents/mlagents/trainers/tests/torch/test_ghost.py
  19. 6
      ml-agents/mlagents/trainers/tests/torch/test_hybrid.py
  20. 9
      ml-agents/mlagents/trainers/tests/torch/test_ppo.py
  21. 5
      ml-agents/mlagents/trainers/tests/torch/test_sac.py
  22. 5
      ml-agents/mlagents/trainers/tests/torch/test_simple_rl.py
  23. 56
      ml-agents/mlagents/trainers/trainer/rl_trainer.py
  24. 24
      ml-agents/mlagents/trainers/trainer/trainer_factory.py
  25. 14
      ml-agents/mlagents/trainers/trainer_controller.py
  26. 6
      ml-agents/mlagents/trainers/training_status.py
  27. 1
      ml-agents/setup.py
  28. 11
      ml-agents/tests/yamato/yamato_utils.py
  29. 4
      test_requirements.txt
  30. 175
      ml-agents/mlagents/trainers/model_saver/tf_model_saver.py
  31. 168
      ml-agents/mlagents/trainers/optimizer/tf_optimizer.py
  32. 630
      ml-agents/mlagents/trainers/policy/tf_policy.py
  33. 361
      ml-agents/mlagents/trainers/ppo/optimizer_tf.py
  34. 444
      ml-agents/mlagents/trainers/sac/network.py
  35. 641
      ml-agents/mlagents/trainers/sac/optimizer_tf.py
  36. 8
      test_constraints_min_version.txt
  37. 6
      test_constraints_max_tf2_version.txt
  38. 7
      test_constraints_max_tf1_version.txt
  39. 0
      /ml-agents/mlagents/torch_utils/globals.py

13
.github/workflows/pytest.yml


python-version: [3.6.x, 3.7.x, 3.8.x]
include:
- python-version: 3.6.x
pip_constraints: test_constraints_min_version.txt
pip_constraints: test_constraints_max_tf1_version.txt
pip_constraints: test_constraints_max_tf2_version.txt
steps:
- uses: actions/checkout@v2
- name: Set up Python

# This path is specific to Ubuntu
path: ~/.cache/pip
# Look to see if there is a cache hit for the corresponding requirements file
key: ${{ runner.os }}-pip-${{ hashFiles('ml-agents/setup.py', 'ml-agents-envs/setup.py', 'gym-unity/setup.py', 'test_requirements.txt', matrix.pip_constraints) }}
key: ${{ runner.os }}-pip-${{ hashFiles('ml-agents/setup.py', 'ml-agents-envs/setup.py', 'gym-unity/setup.py', 'test_requirements.txt') }}
restore-keys: |
${{ runner.os }}-pip-
${{ runner.os }}-

# pin pip to workaround https://github.com/pypa/pip/issues/9180
python -m pip install pip==20.2
python -m pip install --upgrade setuptools
python -m pip install --progress-bar=off -e ./ml-agents-envs -c ${{ matrix.pip_constraints }}
python -m pip install --progress-bar=off -e ./ml-agents -c ${{ matrix.pip_constraints }}
python -m pip install --progress-bar=off -r test_requirements.txt -c ${{ matrix.pip_constraints }}
python -m pip install --progress-bar=off -e ./gym-unity -c ${{ matrix.pip_constraints }}
python -m pip install --progress-bar=off -e ./ml-agents-envs
python -m pip install --progress-bar=off -e ./ml-agents
python -m pip install --progress-bar=off -r test_requirements.txt
python -m pip install --progress-bar=off -e ./gym-unity
- name: Save python dependencies
run: |
pip freeze > pip_versions-${{ matrix.python-version }}.txt

1
com.unity.ml-agents/CHANGELOG.md


### Major Changes
#### com.unity.ml-agents (C#)
#### ml-agents / ml-agents-envs / gym-unity (Python)
- TensorFlow trainers have been removed, please use the Torch trainers instead. (#4707)
- PyTorch trainers now support training agents with both continuous and discrete action spaces. (#4702)
### Minor Changes
#### com.unity.ml-agents / com.unity.ml-agents.extensions (C#)

4
docs/ML-Agents-Overview.md


below).
- `rnd`: represents an intrinsic reward signal that encourages exploration
in sparse-reward environments that is defined by the Curiosity module (see
below). (Not available for TensorFlow trainers)
below).
### Deep Reinforcement Learning

of the trained model is used as intrinsic reward. The more an Agent visits a state, the
more accurate the predictions and the lower the rewards which encourages the Agent to
explore new states with higher prediction errors.
__Note:__ RND is not available for TensorFlow trainers (only PyTorch trainers)
### Imitation Learning

2
docs/Training-Configuration-File.md


| `time_horizon` | (default = `64`) How many steps of experience to collect per-agent before adding it to the experience buffer. When this limit is reached before the end of an episode, a value estimate is used to predict the overall expected reward from the agent's current state. As such, this parameter trades off between a less biased, but higher variance estimate (long time horizon) and more biased, but less varied estimate (short time horizon). In cases where there are frequent rewards within an episode, or episodes are prohibitively large, a smaller number can be more ideal. This number should be large enough to capture all the important behavior within a sequence of an agent's actions. <br><br> Typical range: `32` - `2048` |
| `max_steps` | (default = `500000`) Total number of steps (i.e., observation collected and action taken) that must be taken in the environment (or across all environments if using multiple in parallel) before ending the training process. If you have multiple agents with the same behavior name within your environment, all steps taken by those agents will contribute to the same `max_steps` count. <br><br>Typical range: `5e5` - `1e7` |
| `keep_checkpoints` | (default = `5`) The maximum number of model checkpoints to keep. Checkpoints are saved after the number of steps specified by the checkpoint_interval option. Once the maximum number of checkpoints has been reached, the oldest checkpoint is deleted when saving a new checkpoint. |
| `checkpoint_interval` | (default = `500000`) The number of experiences collected between each checkpoint by the trainer. A maximum of `keep_checkpoints` checkpoints are saved before old ones are deleted. Each checkpoint saves the `.onnx` (and `.nn` if using TensorFlow) files in `results/` folder.|
| `checkpoint_interval` | (default = `500000`) The number of experiences collected between each checkpoint by the trainer. A maximum of `keep_checkpoints` checkpoints are saved before old ones are deleted. Each checkpoint saves the `.onnx` files in `results/` folder.|
| `init_path` | (default = None) Initialize trainer from a previously saved model. Note that the prior run should have used the same trainer configurations as the current run, and have been saved with the same version of ML-Agents. <br><br>You should provide the full path to the folder where the checkpoints were saved, e.g. `./models/{run-id}/{behavior_name}`. This option is provided in case you want to initialize different behaviors from different runs; in most cases, it is sufficient to use the `--initialize-from` CLI parameter to initialize all models from the same run. |
| `threaded` | (default = `true`) By default, model updates can happen while the environment is being stepped. This violates the [on-policy](https://spinningup.openai.com/en/latest/user/algorithms.html#the-on-policy-algorithms) assumption of PPO slightly in exchange for a training speedup. To maintain the strict on-policyness of PPO, you can disable parallel updates by setting `threaded` to `false`. There is usually no reason to turn `threaded` off for SAC. |
| `hyperparameters -> learning_rate` | (default = `3e-4`) Initial learning rate for gradient descent. Corresponds to the strength of each gradient descent update step. This should typically be decreased if training is unstable, and the reward does not consistently increase. <br><br>Typical range: `1e-5` - `1e-3` |

3
docs/Training-ML-Agents.md


save_steps: 50000
swap_steps: 2000
team_change: 100000
# use TensorFlow backend
framework: tensorflow
```
Here is an equivalent file if we use an SAC trainer instead. Notice that the

17
docs/Unity-Inference-Engine.md


might be non-fatal build time errors when target platform includes Graphics API
that does not support **Unity Compute Shaders**.
## Supported formats
There are currently two supported model formats:
- Barracuda (`.nn`) files use a proprietary format produced by the
[`tensorflow_to_barracuda.py`]() script.
- ONNX (`.onnx`) files use an
[industry-standard open format](https://onnx.ai/about.html) produced by the
[tf2onnx package](https://github.com/onnx/tensorflow-onnx).
Export to ONNX is used if using PyTorch (the default). To enable it
while using TensorFlow, make sure `tf2onnx>=1.6.1` is installed in pip.
## Using the Unity Inference Engine
When using a model, drag the model file into the **Model** field in the

Barracuda directly, instead of trying to run it through ML-Agents.
## Model inference outside of Unity
We do not provide support for inference anywhere outside of Unity. The
`frozen_graph_def.pb` and `.onnx` files produced by training are open formats
for TensorFlow and ONNX respectively; if you wish to convert these to another
We do not provide support for inference anywhere outside of Unity. The `.onnx` files produced by training use the open format ONNX; if you wish to convert a `.onnx` file to another
format or run inference with them, refer to their documentation.

2
ml-agents-envs/setup.py


install_requires=[
"cloudpickle",
"grpcio>=1.11.0",
"numpy>=1.14.1,<1.19.0",
"numpy>=1.14.1",
"Pillow>=4.2.1",
"protobuf>=3.6",
"pyyaml>=3.1.0",

25
ml-agents/mlagents/trainers/cli_utils.py


from mlagents.trainers.exception import TrainerConfigError
from mlagents_envs.environment import UnityEnvironment
import argparse
from mlagents_envs import logging_util
logger = logging_util.get_logger(__name__)
class RaiseRemovedWarning(argparse.Action):
"""
Internal custom Action to raise warning when argument is called.
"""
def __init__(self, nargs=0, **kwargs):
super().__init__(nargs=nargs, **kwargs)
def __call__(self, arg_parser, namespace, values, option_string=None):
logger.warning(f"The command line argument {option_string} was removed.")
class DetectDefault(argparse.Action):

argparser.add_argument(
"--torch",
default=False,
action=DetectDefaultStoreTrue,
help="Use the PyTorch framework. Note that this option is not required anymore as PyTorch is the"
"default framework, and will be removed in the next release.",
action=RaiseRemovedWarning,
help="(Removed) Use the PyTorch framework.",
action=DetectDefaultStoreTrue,
help="(Deprecated) Use the TensorFlow framework instead of PyTorch. Install TensorFlow "
"before using this option.",
action=RaiseRemovedWarning,
help="(Removed) Use the TensorFlow framework.",
)
eng_conf = argparser.add_argument_group(title="Engine Configuration")

7
ml-agents/mlagents/trainers/learn.py


import mlagents.trainers
import mlagents_envs
from mlagents import tf_utils
from mlagents.trainers.trainer_controller import TrainerController
from mlagents.trainers.environment_parameter_manager import EnvironmentParameterManager
from mlagents.trainers.trainer import TrainerFactory

GaugeWriter,
ConsoleWriter,
)
from mlagents.trainers.cli_utils import parser, DetectDefault
from mlagents.trainers.cli_utils import parser
from mlagents_envs.environment import UnityEnvironment
from mlagents.trainers.settings import RunOptions

param_manager=env_parameter_manager,
init_path=maybe_init_path,
multi_gpu=False,
force_torch="torch" in DetectDefault.non_default_args,
force_tensorflow="tensorflow" in DetectDefault.non_default_args,
)
# Create controller and begin training.
tc = TrainerController(

log_level = logging_util.DEBUG
else:
log_level = logging_util.INFO
# disable noisy warnings from tensorflow
tf_utils.set_warnings_enabled(False)
logging_util.set_log_level(log_level)

75
ml-agents/mlagents/trainers/ppo/trainer.py


from mlagents.trainers.ppo.optimizer_torch import TorchPPOOptimizer
from mlagents.trainers.trajectory import Trajectory
from mlagents.trainers.behavior_id_utils import BehaviorIdentifiers
from mlagents.trainers.settings import TrainerSettings, PPOSettings, FrameworkType
from mlagents.trainers.torch.components.reward_providers.base_reward_provider import (
BaseRewardProvider,
)
from mlagents import tf_utils
if tf_utils.is_available():
from mlagents.trainers.policy.tf_policy import TFPolicy
from mlagents.trainers.ppo.optimizer_tf import PPOOptimizer
else:
TFPolicy = None # type: ignore
PPOOptimizer = None # type: ignore
from mlagents.trainers.settings import TrainerSettings, PPOSettings
logger = get_logger(__name__)

for name, v in value_estimates.items():
agent_buffer_trajectory[f"{name}_value_estimates"].extend(v)
if isinstance(self.optimizer.reward_signals[name], BaseRewardProvider):
self._stats_reporter.add_stat(
f"Policy/{self.optimizer.reward_signals[name].name.capitalize()} Value Estimate",
np.mean(v),
)
else:
self._stats_reporter.add_stat(
self.optimizer.reward_signals[name].value_name, np.mean(v)
)
self._stats_reporter.add_stat(
f"Policy/{self.optimizer.reward_signals[name].name.capitalize()} Value Estimate",
np.mean(v),
)
# Evaluate all reward functions
self.collected_rewards["environment"][agent_id] += np.sum(

# BaseRewardProvider is a PyTorch-based reward signal
if isinstance(reward_signal, BaseRewardProvider):
evaluate_result = (
reward_signal.evaluate(agent_buffer_trajectory)
* reward_signal.strength
)
else: # reward_signal is a TensorFlow-based RewardSignal class
evaluate_result = reward_signal.evaluate_batch(
agent_buffer_trajectory
).scaled_reward
evaluate_result = (
reward_signal.evaluate(agent_buffer_trajectory) * reward_signal.strength
)
agent_buffer_trajectory[f"{name}_rewards"].extend(evaluate_result)
# Report the reward signals
self.collected_rewards[name][agent_id] += np.sum(evaluate_result)

self._clear_update_buffer()
return True
def create_tf_policy(
self,
parsed_behavior_id: BehaviorIdentifiers,
behavior_spec: BehaviorSpec,
create_graph: bool = False,
) -> TFPolicy:
"""
Creates a policy with a Tensorflow backend and PPO hyperparameters
:param parsed_behavior_id:
:param behavior_spec: specifications for policy construction
:param create_graph: whether to create the Tensorflow graph on construction
:return policy
"""
policy = TFPolicy(
self.seed,
behavior_spec,
self.trainer_settings,
condition_sigma_on_obs=False, # Faster training for PPO
create_tf_graph=create_graph,
)
return policy
def create_torch_policy(
self, parsed_behavior_id: BehaviorIdentifiers, behavior_spec: BehaviorSpec
) -> TorchPolicy:

)
return policy
def create_ppo_optimizer(self) -> PPOOptimizer:
if self.framework == FrameworkType.PYTORCH:
return TorchPPOOptimizer( # type: ignore
cast(TorchPolicy, self.policy), self.trainer_settings # type: ignore
) # type: ignore
else:
return PPOOptimizer( # type: ignore
cast(TFPolicy, self.policy), self.trainer_settings # type: ignore
) # type: ignore
def create_ppo_optimizer(self) -> TorchPPOOptimizer:
return TorchPPOOptimizer( # type: ignore
cast(TorchPolicy, self.policy), self.trainer_settings # type: ignore
) # type: ignore
def add_policy(
self, parsed_behavior_id: BehaviorIdentifiers, policy: Policy

106
ml-agents/mlagents/trainers/sac/trainer.py


from mlagents.trainers.sac.optimizer_torch import TorchSACOptimizer
from mlagents.trainers.trajectory import Trajectory, SplitObservations
from mlagents.trainers.behavior_id_utils import BehaviorIdentifiers
from mlagents.trainers.settings import TrainerSettings, SACSettings, FrameworkType
from mlagents.trainers.torch.components.reward_providers import BaseRewardProvider
from mlagents import tf_utils
if tf_utils.is_available():
from mlagents.trainers.policy.tf_policy import TFPolicy
from mlagents.trainers.sac.optimizer_tf import SACOptimizer
else:
TFPolicy = None # type: ignore
SACOptimizer = None # type: ignore
from mlagents.trainers.settings import TrainerSettings, SACSettings
logger = get_logger(__name__)

agent_buffer_trajectory["environment_rewards"]
)
for name, reward_signal in self.optimizer.reward_signals.items():
# BaseRewardProvider is a PyTorch-based reward signal
if isinstance(reward_signal, BaseRewardProvider):
evaluate_result = (
reward_signal.evaluate(agent_buffer_trajectory)
* reward_signal.strength
)
else: # reward_signal uses TensorFlow
evaluate_result = reward_signal.evaluate_batch(
agent_buffer_trajectory
).scaled_reward
evaluate_result = (
reward_signal.evaluate(agent_buffer_trajectory) * reward_signal.strength
)
# Report the reward signals
self.collected_rewards[name][agent_id] += np.sum(evaluate_result)

agent_buffer_trajectory, trajectory.next_obs, trajectory.done_reached
)
for name, v in value_estimates.items():
# BaseRewardProvider is a PyTorch-based reward signal
if isinstance(self.optimizer.reward_signals[name], BaseRewardProvider):
self._stats_reporter.add_stat(
f"Policy/{self.optimizer.reward_signals[name].name.capitalize()} Value",
np.mean(v),
)
else: # TensorFlow reward signal
self._stats_reporter.add_stat(
self.optimizer.reward_signals[name].value_name, np.mean(v)
)
self._stats_reporter.add_stat(
f"Policy/{self.optimizer.reward_signals[name].name.capitalize()} Value",
np.mean(v),
)
# Bootstrap using the last step rather than the bootstrap step if max step is reached.
# Set last element to duplicate obs and remove dones.

)
)
def create_tf_policy(
self,
parsed_behavior_id: BehaviorIdentifiers,
behavior_spec: BehaviorSpec,
create_graph: bool = False,
) -> TFPolicy:
"""
Creates a policy with a Tensorflow backend and SAC hyperparameters
:param parsed_behavior_id:
:param behavior_spec: specifications for policy construction
:param create_graph: whether to create the Tensorflow graph on construction
:return policy
"""
policy = TFPolicy(
self.seed,
behavior_spec,
self.trainer_settings,
tanh_squash=True,
reparameterize=True,
create_tf_graph=create_graph,
)
self.maybe_load_replay_buffer()
return policy
def create_torch_policy(
self, parsed_behavior_id: BehaviorIdentifiers, behavior_spec: BehaviorSpec
) -> TorchPolicy:

)
# Get rewards for each reward
for name, signal in self.optimizer.reward_signals.items():
# BaseRewardProvider is a PyTorch-based reward signal
if isinstance(signal, BaseRewardProvider):
sampled_minibatch[f"{name}_rewards"] = (
signal.evaluate(sampled_minibatch) * signal.strength
)
else: # reward_signal is a TensorFlow-based RewardSignal class
sampled_minibatch[f"{name}_rewards"] = signal.evaluate_batch(
sampled_minibatch
).scaled_reward
sampled_minibatch[f"{name}_rewards"] = (
signal.evaluate(sampled_minibatch) * signal.strength
)
update_stats = self.optimizer.update(sampled_minibatch, n_sequences)
for stat_name, value in update_stats.items():

) / self.reward_signal_update_steps > self.reward_signal_steps_per_update:
# Get minibatches for reward signal update if needed
reward_signal_minibatches = {}
for name, signal in self.optimizer.reward_signals.items():
for name in self.optimizer.reward_signals.keys():
# BaseRewardProvider is a PyTorch-based reward signal
if not isinstance(signal, BaseRewardProvider):
# Some signals don't need a minibatch to be sampled - so we don't!
if signal.update_dict:
reward_signal_minibatches[name] = buffer.sample_mini_batch(
self.hyperparameters.batch_size,
sequence_length=self.policy.sequence_length,
)
else: # TensorFlow reward signal
if name != "extrinsic":
reward_signal_minibatches[name] = buffer.sample_mini_batch(
self.hyperparameters.batch_size,
sequence_length=self.policy.sequence_length,
)
if name != "extrinsic":
reward_signal_minibatches[name] = buffer.sample_mini_batch(
self.hyperparameters.batch_size,
sequence_length=self.policy.sequence_length,
)
update_stats = self.optimizer.update_reward_signals(
reward_signal_minibatches, n_sequences
)

self._stats_reporter.add_stat(stat, np.mean(stat_list))
def create_sac_optimizer(self) -> TorchSACOptimizer:
if self.framework == FrameworkType.PYTORCH:
return TorchSACOptimizer( # type: ignore
cast(TorchPolicy, self.policy), self.trainer_settings # type: ignore
) # type: ignore
else:
return SACOptimizer( # type: ignore
cast(TFPolicy, self.policy), self.trainer_settings # type: ignore
) # type: ignore
return TorchSACOptimizer( # type: ignore
cast(TorchPolicy, self.policy), self.trainer_settings # type: ignore
) # type: ignore
def add_policy(
self, parsed_behavior_id: BehaviorIdentifiers, policy: Policy

10
ml-agents/mlagents/trainers/settings.py


return _mapping[self]
class FrameworkType(Enum):
TENSORFLOW: str = "tensorflow"
PYTORCH: str = "pytorch"
@attr.s(auto_attribs=True)
class TrainerSettings(ExportableSettings):
default_override: ClassVar[Optional["TrainerSettings"]] = None

threaded: bool = True
self_play: Optional[SelfPlaySettings] = None
behavioral_cloning: Optional[BehavioralCloningSettings] = None
framework: FrameworkType = FrameworkType.PYTORCH
cattr.register_structure_hook(
Dict[RewardSignalType, RewardSignalSettings], RewardSignalSettings.structure

d_copy.update(cattr.unstructure(TrainerSettings.default_override))
deep_update_dict(d_copy, d)
if "framework" in d_copy:
logger.warning("Framework option was deprecated but was specified")
d_copy.pop("framework", None)
for key, val in d_copy.items():
if attr.has(type(val)):

2
ml-agents/mlagents/trainers/stats.py


from mlagents_envs.logging_util import get_logger
from mlagents_envs.timers import set_gauge
from torch.utils.tensorboard import SummaryWriter
from mlagents.tf_utils.globals import get_rank
from mlagents.torch_utils.globals import get_rank
logger = get_logger(__name__)

2
ml-agents/mlagents/trainers/tests/__init__.py


# tb[-2] is the wrapper function, e.g. np_array_no_float64
# we want the calling function, so use tb[-3]
filename = tb[-3].filename
# Only raise if this came from mlagents code, not tensorflow
# Only raise if this came from mlagents code
if (
"ml-agents/mlagents" in filename
or "ml-agents-envs/mlagents" in filename

17
ml-agents/mlagents/trainers/tests/test_rl_trainer.py


from mlagents.trainers.trainer.rl_trainer import RLTrainer
from mlagents.trainers.tests.test_buffer import construct_fake_buffer
from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.settings import TrainerSettings, FrameworkType
from mlagents.trainers.settings import TrainerSettings
from mlagents_envs.base_env import ActionSpec

super()._process_trajectory(trajectory)
def create_rl_trainer(framework=FrameworkType.TENSORFLOW):
def create_rl_trainer():
TrainerSettings(
max_steps=100, checkpoint_interval=10, summary_freq=20, framework=framework
),
TrainerSettings(max_steps=100, checkpoint_interval=10, summary_freq=20),
True,
False,
"mock_model_path",

assert mocked_save_model.call_count == 0
@pytest.mark.parametrize(
"framework", [FrameworkType.TENSORFLOW, FrameworkType.PYTORCH], ids=["tf", "torch"]
)
def test_summary_checkpoint(mock_add_checkpoint, mock_write_summary, framework):
trainer = create_rl_trainer(framework)
def test_summary_checkpoint(mock_add_checkpoint, mock_write_summary):
trainer = create_rl_trainer()
mock_policy = mock.Mock()
trainer.add_policy("TestBrain", mock_policy)
trajectory_queue = AgentManagerQueue("testbrain")

calls = [mock.call(trainer.brain_name, step) for step in checkpoint_range]
trainer.model_saver.save_checkpoint.assert_has_calls(calls, any_order=True)
export_ext = "nn" if trainer.framework == FrameworkType.TENSORFLOW else "onnx"
export_ext = "onnx"
add_checkpoint_calls = [
mock.call(

19
ml-agents/mlagents/trainers/tests/test_trainer_controller.py


from unittest.mock import MagicMock, patch
import pytest
from mlagents.torch_utils import torch
from mlagents.tf_utils import tf
from mlagents.trainers.trainer_controller import TrainerController
from mlagents.trainers.environment_parameter_manager import EnvironmentParameterManager
from mlagents.trainers.ghost.controller import GhostController

@patch("numpy.random.seed")
@patch.object(tf, "set_random_seed")
def test_initialization_seed(numpy_random_seed, tensorflow_set_seed):
@patch.object(torch, "manual_seed")
def test_initialization_seed(numpy_random_seed, torch_set_seed):
seed = 27
trainer_factory_mock = MagicMock()
trainer_factory_mock.ghost_controller = GhostController()

training_seed=seed,
)
numpy_random_seed.assert_called_with(seed)
tensorflow_set_seed.assert_called_with(seed)
torch_set_seed.assert_called_with(seed)
@pytest.fixture

return tc, trainer_mock
@patch.object(tf, "reset_default_graph")
tf_reset_graph, trainer_controller_with_start_learning_mocks
trainer_controller_with_start_learning_mocks
tf_reset_graph.return_value = None
env_mock = MagicMock()
env_mock.close = MagicMock()

tc.start_learning(env_mock)
tf_reset_graph.assert_called_once()
@patch.object(tf, "reset_default_graph")
tf_reset_graph, trainer_controller_with_start_learning_mocks
trainer_controller_with_start_learning_mocks
tf_reset_graph.return_value = None
brain_info_mock = MagicMock()
env_mock = MagicMock()

tc.start_learning(env_mock)
tf_reset_graph.assert_called_once()
env_mock.reset.assert_called_once()
assert tc.advance.call_count == trainer_mock.get_max_steps + 1
tc._save_models.assert_called_once()

4
ml-agents/mlagents/trainers/tests/test_training_status.py


version_statsmetadata = StatusMetaData(mlagents_version="test")
default_metadata.check_compatibility(version_statsmetadata)
tf_version_statsmetadata = StatusMetaData(tensorflow_version="test")
default_metadata.check_compatibility(tf_version_statsmetadata)
torch_version_statsmetadata = StatusMetaData(torch_version="test")
default_metadata.check_compatibility(torch_version_statsmetadata)
# Assert that 2 warnings have been thrown
assert len(cm.output) == 2

6
ml-agents/mlagents/trainers/tests/torch/test_ghost.py


from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.tests import mock_brain as mb
from mlagents.trainers.tests.test_trajectory import make_fake_trajectory
from mlagents.trainers.settings import TrainerSettings, SelfPlaySettings, FrameworkType
from mlagents.trainers.settings import TrainerSettings, SelfPlaySettings
return TrainerSettings(
self_play=SelfPlaySettings(), framework=FrameworkType.PYTORCH
)
return TrainerSettings(self_play=SelfPlaySettings())
VECTOR_ACTION_SPACE = 1

6
ml-agents/mlagents/trainers/tests/torch/test_hybrid.py


MemoryEnvironment,
)
from mlagents.trainers.settings import NetworkSettings, FrameworkType
from mlagents.trainers.settings import NetworkSettings
from mlagents.trainers.tests.dummy_config import ppo_dummy_config, sac_dummy_config
from mlagents.trainers.tests.check_env_trains import check_environment_trains

PPO_TORCH_CONFIG = attr.evolve(ppo_dummy_config(), framework=FrameworkType.PYTORCH)
SAC_TORCH_CONFIG = attr.evolve(sac_dummy_config(), framework=FrameworkType.PYTORCH)
PPO_TORCH_CONFIG = ppo_dummy_config()
SAC_TORCH_CONFIG = sac_dummy_config()
@pytest.mark.parametrize("action_size", [(1, 1), (2, 2), (1, 2), (2, 1)])

9
ml-agents/mlagents/trainers/tests/torch/test_ppo.py


import pytest
import numpy as np
from mlagents.tf_utils import tf
import attr
from mlagents.trainers.ppo.optimizer_torch import TorchPPOOptimizer

from mlagents.trainers.settings import NetworkSettings, FrameworkType
from mlagents.trainers.settings import NetworkSettings
from mlagents.trainers.tests.dummy_config import ( # noqa: F401; pylint: disable=unused-variable
ppo_dummy_config,
curiosity_dummy_config,

@pytest.fixture
def dummy_config():
return attr.evolve(ppo_dummy_config(), framework=FrameworkType.PYTORCH)
return ppo_dummy_config()
VECTOR_ACTION_SPACE = 2

@pytest.mark.parametrize("rnn", [True, False], ids=["rnn", "no_rnn"])
def test_ppo_optimizer_update(dummy_config, rnn, visual, discrete):
# Test evaluate
tf.reset_default_graph()
optimizer = create_test_ppo_optimizer(
dummy_config, use_rnn=rnn, use_discrete=discrete, use_visual=visual
)

dummy_config, curiosity_dummy_config, rnn, visual, discrete # noqa: F811
):
# Test evaluate
tf.reset_default_graph()
dummy_config.reward_signals = curiosity_dummy_config
optimizer = create_test_ppo_optimizer(
dummy_config, use_rnn=rnn, use_discrete=discrete, use_visual=visual

def test_ppo_optimizer_update_gail(gail_dummy_config, dummy_config): # noqa: F811
# Test evaluate
dummy_config.reward_signals = gail_dummy_config
config = attr.evolve(ppo_dummy_config(), framework=FrameworkType.PYTORCH)
config = ppo_dummy_config()
optimizer = create_test_ppo_optimizer(
config, use_rnn=False, use_discrete=False, use_visual=False
)

5
ml-agents/mlagents/trainers/tests/torch/test_sac.py


import pytest
from mlagents.torch_utils import torch
import attr
from mlagents.trainers.settings import NetworkSettings, FrameworkType
from mlagents.trainers.settings import NetworkSettings
from mlagents.trainers.tests.dummy_config import ( # noqa: F401; pylint: disable=unused-variable
sac_dummy_config,
curiosity_dummy_config,

@pytest.fixture
def dummy_config():
return attr.evolve(sac_dummy_config(), framework=FrameworkType.PYTORCH)
return sac_dummy_config()
VECTOR_ACTION_SPACE = 2

5
ml-agents/mlagents/trainers/tests/torch/test_simple_rl.py


GAILSettings,
RewardSignalType,
EncoderType,
FrameworkType,
)
from mlagents_envs.communicator_objects.demonstration_meta_pb2 import (

BRAIN_NAME = "1D"
PPO_TORCH_CONFIG = attr.evolve(ppo_dummy_config(), framework=FrameworkType.PYTORCH)
SAC_TORCH_CONFIG = attr.evolve(sac_dummy_config(), framework=FrameworkType.PYTORCH)
PPO_TORCH_CONFIG = ppo_dummy_config()
SAC_TORCH_CONFIG = sac_dummy_config()
@pytest.mark.parametrize("action_sizes", [(0, 1), (1, 0)])

56
ml-agents/mlagents/trainers/trainer/rl_trainer.py


from mlagents.trainers.behavior_id_utils import BehaviorIdentifiers
from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.trajectory import Trajectory
from mlagents.trainers.settings import TrainerSettings, FrameworkType
from mlagents.trainers.settings import TrainerSettings
from mlagents.trainers.exception import UnityTrainerException
from mlagents import tf_utils
if tf_utils.is_available():
from mlagents.trainers.policy.tf_policy import TFPolicy
from mlagents.trainers.model_saver.tf_model_saver import TFModelSaver
else:
TFPolicy = None # type: ignore
TFModelSaver = None # type: ignore
logger = get_logger(__name__)

self._stats_reporter.add_property(
StatsPropertyType.HYPERPARAMETERS, self.trainer_settings.as_dict()
)
self.framework = self.trainer_settings.framework
if self.framework == FrameworkType.TENSORFLOW and not tf_utils.is_available():
raise UnityTrainerException(
"To use the TensorFlow backend, install the TensorFlow Python package first."
)
logger.debug(f"Using framework {self.framework.value}")
self.framework, self.trainer_settings, self.artifact_path, self.load
self.trainer_settings, self.artifact_path, self.load
)
def end_episode(self) -> None:

behavior_spec: BehaviorSpec,
create_graph: bool = False,
) -> Policy:
if self.framework == FrameworkType.PYTORCH:
return self.create_torch_policy(parsed_behavior_id, behavior_spec)
else:
return self.create_tf_policy(
parsed_behavior_id, behavior_spec, create_graph=create_graph
)
return self.create_torch_policy(parsed_behavior_id, behavior_spec)
@abc.abstractmethod
def create_torch_policy(

"""
pass
@abc.abstractmethod
def create_tf_policy(
self,
parsed_behavior_id: BehaviorIdentifiers,
behavior_spec: BehaviorSpec,
create_graph: bool = False,
) -> TFPolicy:
"""
Create a Policy object that uses the TensorFlow backend.
"""
pass
framework: str, trainer_settings: TrainerSettings, model_path: str, load: bool
trainer_settings: TrainerSettings, model_path: str, load: bool
if framework == FrameworkType.PYTORCH:
model_saver = TorchModelSaver( # type: ignore
trainer_settings, model_path, load
)
else:
model_saver = TFModelSaver( # type: ignore
trainer_settings, model_path, load
)
model_saver = TorchModelSaver( # type: ignore
trainer_settings, model_path, load
)
return model_saver
def _policy_mean_reward(self) -> Optional[float]:

"Trainer has multiple policies, but default behavior only saves the first."
)
checkpoint_path = self.model_saver.save_checkpoint(self.brain_name, self.step)
export_ext = "nn" if self.framework == FrameworkType.TENSORFLOW else "onnx"
export_ext = "onnx"
new_checkpoint = ModelCheckpoint(
int(self.step),
f"{checkpoint_path}.{export_ext}",

model_checkpoint = self._checkpoint()
self.model_saver.copy_final_model(model_checkpoint.file_path)
export_ext = "nn" if self.framework == FrameworkType.TENSORFLOW else "onnx"
export_ext = "onnx"
final_checkpoint = attr.evolve(
model_checkpoint, file_path=f"{self.model_saver.model_path}.{export_ext}"
)

24
ml-agents/mlagents/trainers/trainer/trainer_factory.py


from mlagents.trainers.sac.trainer import SACTrainer
from mlagents.trainers.ghost.trainer import GhostTrainer
from mlagents.trainers.ghost.controller import GhostController
from mlagents.trainers.settings import TrainerSettings, TrainerType, FrameworkType
from mlagents.trainers.settings import TrainerSettings, TrainerType
logger = get_logger(__name__)

param_manager: EnvironmentParameterManager,
init_path: str = None,
multi_gpu: bool = False,
force_torch: bool = False,
force_tensorflow: bool = False,
):
"""
The TrainerFactory generates the Trainers based on the configuration passed as

the EnvironmentParameters must change.
:param init_path: Path from which to load model.
:param multi_gpu: If True, multi-gpu will be used. (currently not available)
:param force_torch: If True, the Trainers will all use the PyTorch framework
instead of what is specified in the config YAML.
:param force_tensorflow: If True, thee Trainers will all use the TensorFlow
framework.
"""
self.trainer_config = trainer_config
self.output_path = output_path

self.param_manager = param_manager
self.multi_gpu = multi_gpu
self.ghost_controller = GhostController()
self._force_torch = force_torch
self._force_tf = force_tensorflow
def generate(self, behavior_name: str) -> Trainer:
if behavior_name not in self.trainer_config.keys():

)
trainer_settings = self.trainer_config[behavior_name]
if self._force_torch:
trainer_settings.framework = FrameworkType.PYTORCH
logger.warning(
"Note that specifying --torch is not required anymore as PyTorch is the default framework."
)
if self._force_tf:
trainer_settings.framework = FrameworkType.TENSORFLOW
logger.warning(
"Setting the framework to TensorFlow. TensorFlow trainers will be deprecated in the future."
)
if self._force_torch:
logger.warning(
"Both --torch and --tensorflow CLI options were specified. Using TensorFlow."
)
return TrainerFactory._initialize_trainer(
trainer_settings,
behavior_name,

14
ml-agents/mlagents/trainers/trainer_controller.py


from collections import defaultdict
import numpy as np
from mlagents.tf_utils import tf
from mlagents import tf_utils
from mlagents_envs.logging_util import get_logger
from mlagents.trainers.env_manager import EnvManager, EnvironmentStep

from mlagents.trainers.trainer import TrainerFactory
from mlagents.trainers.behavior_id_utils import BehaviorIdentifiers
from mlagents.trainers.agent_processor import AgentManager
from mlagents.tf_utils.globals import get_rank
from mlagents.torch_utils.globals import get_rank
class TrainerController:

:param param_manager: EnvironmentParameterManager object which stores information about all
environment parameters.
:param train: Whether to train model, or only run inference.
:param training_seed: Seed to use for Numpy and Tensorflow random number generation.
:param training_seed: Seed to use for Numpy and Torch random number generation.
:param threaded: Whether or not to run trainers in a separate thread. Disable for testing/debugging.
"""
self.trainers: Dict[str, Trainer] = {}

self.trainer_threads: List[threading.Thread] = []
self.kill_trainers = False
np.random.seed(training_seed)
if tf_utils.is_available():
tf.set_random_seed(training_seed)
torch_utils.torch.manual_seed(training_seed)
self.rank = get_rank()

self.trainer_threads.append(trainerthread)
policy = trainer.create_policy(
parsed_behavior_id, env_manager.training_behaviors[name_behavior_id]
parsed_behavior_id,
env_manager.training_behaviors[name_behavior_id],
create_graph=True,
)
trainer.add_policy(parsed_behavior_id, policy)

@timed
def start_learning(self, env_manager: EnvManager) -> None:
self._create_output_path(self.output_path)
if tf_utils.is_available():
tf.reset_default_graph()
try:
# Initial reset
self._reset_env(env_manager)

6
ml-agents/mlagents/trainers/training_status.py


import cattr
from mlagents.torch_utils import torch
from mlagents.tf_utils import tf, is_available as tf_is_available
from mlagents_envs.logging_util import get_logger
from mlagents.trainers import __version__
from mlagents.trainers.exception import TrainerError

stats_format_version: str = STATUS_FORMAT_VERSION
mlagents_version: str = __version__
torch_version: str = torch.__version__
tensorflow_version: str = tf.__version__ if tf_is_available() else -1
def to_dict(self) -> Dict[str, str]:
return cattr.unstructure(self)

if self.mlagents_version != other.mlagents_version:
logger.warning(
"Checkpoint was loaded from a different version of ML-Agents. Some things may not resume properly."
)
if self.tensorflow_version != other.tensorflow_version:
logger.warning(
"Tensorflow checkpoint was saved with a different version of Tensorflow. Model may not resume properly."
)
if self.torch_version != other.torch_version:
logger.warning(

1
ml-agents/setup.py


]
},
cmdclass={"verify": VerifyVersionCommand},
extras_require={"tensorflow": ["tensorflow>=1.14,<3.0", "six>=1.12.0"]},
)

11
ml-agents/tests/yamato/yamato_utils.py


# Set up the venv and install mlagents
subprocess.check_call(f"python -m venv {venv_path}", shell=True)
pip_commands = [
"--upgrade pip",
"--upgrade setuptools",
# TODO build these and publish to internal pypi
"~/tensorflow_pkg/tensorflow-2.0.0-cp37-cp37m-macosx_10_14_x86_64.whl",
"tf2onnx==1.6.1",
]
pip_commands = ["--upgrade pip", "--upgrade setuptools"]
# TODO build these and publish to internal pypi
"~/tensorflow_pkg/tensorflow-2.0.0-cp37-cp37m-macosx_10_14_x86_64.whl",
"tf2onnx==1.6.1",
]
else:
# Local install

4
test_requirements.txt


pytest-cov==2.6.1
pytest-xdist==1.34.0
# Tensorflow tests are here for the time being, before they are used in the codebase.
tensorflow>=1.14,<3.0
tf2onnx>=1.5.5

175
ml-agents/mlagents/trainers/model_saver/tf_model_saver.py


import os
import shutil
from typing import Optional, Union, cast
from mlagents_envs.exception import UnityPolicyException
from mlagents_envs.logging_util import get_logger
from mlagents.tf_utils import tf
from mlagents.trainers.model_saver.model_saver import BaseModelSaver
from mlagents.trainers.tf.model_serialization import export_policy_model
from mlagents.trainers.settings import TrainerSettings, SerializationSettings
from mlagents.trainers.policy.tf_policy import TFPolicy
from mlagents.trainers.optimizer.tf_optimizer import TFOptimizer
from mlagents.trainers import __version__
logger = get_logger(__name__)
class TFModelSaver(BaseModelSaver):
"""
ModelSaver class for TensorFlow
"""
def __init__(
self, trainer_settings: TrainerSettings, model_path: str, load: bool = False
):
super().__init__()
self.model_path = model_path
self.initialize_path = trainer_settings.init_path
self._keep_checkpoints = trainer_settings.keep_checkpoints
self.load = load
# Currently only support saving one policy. This is the one to be saved.
self.policy: Optional[TFPolicy] = None
self.graph = None
self.sess = None
self.tf_saver = None
def register(self, module: Union[TFPolicy, TFOptimizer]) -> None:
if isinstance(module, TFPolicy):
self._register_policy(module)
elif isinstance(module, TFOptimizer):
self._register_optimizer(module)
else:
raise UnityPolicyException(
"Registering Object of unsupported type {} to Saver ".format(
type(module)
)
)
def _register_policy(self, policy: TFPolicy) -> None:
if self.policy is None:
self.policy = policy
self.graph = self.policy.graph
self.sess = self.policy.sess
with self.policy.graph.as_default():
self.tf_saver = tf.train.Saver(max_to_keep=self._keep_checkpoints)
def save_checkpoint(self, behavior_name: str, step: int) -> str:
checkpoint_path = os.path.join(self.model_path, f"{behavior_name}-{step}")
# Save the TF checkpoint and graph definition
if self.graph:
with self.graph.as_default():
if self.tf_saver:
self.tf_saver.save(self.sess, f"{checkpoint_path}.ckpt")
tf.train.write_graph(
self.graph, self.model_path, "raw_graph_def.pb", as_text=False
)
# also save the policy so we have optimized model files for each checkpoint
self.export(checkpoint_path, behavior_name)
return checkpoint_path
def export(self, output_filepath: str, behavior_name: str) -> None:
# save model if there is only one worker or
# only on worker-0 if there are multiple workers
if self.policy and self.policy.rank is not None and self.policy.rank != 0:
return
if self.graph is None:
logger.info("No model to export")
return
export_policy_model(
self.model_path, output_filepath, behavior_name, self.graph, self.sess
)
def initialize_or_load(self, policy: Optional[TFPolicy] = None) -> None:
# If there is an initialize path, load from that. Else, load from the set model path.
# If load is set to True, don't reset steps to 0. Else, do. This allows a user to,
# e.g., resume from an initialize path.
if policy is None:
policy = self.policy
policy = cast(TFPolicy, policy)
reset_steps = not self.load
if self.initialize_path is not None:
self._load_graph(
policy, self.initialize_path, reset_global_steps=reset_steps
)
elif self.load:
self._load_graph(policy, self.model_path, reset_global_steps=reset_steps)
else:
policy.initialize()
TFPolicy.broadcast_global_variables(0)
def _load_graph(
self, policy: TFPolicy, model_path: str, reset_global_steps: bool = False
) -> None:
# This prevents normalizer init up from executing on load
policy.first_normalization_update = False
with policy.graph.as_default():
logger.info(f"Loading model from {model_path}.")
ckpt = tf.train.get_checkpoint_state(model_path)
if ckpt is None:
raise UnityPolicyException(
"The model {} could not be loaded. Make "
"sure you specified the right "
"--run-id and that the previous run you are loading from had the same "
"behavior names.".format(model_path)
)
if self.tf_saver:
try:
self.tf_saver.restore(policy.sess, ckpt.model_checkpoint_path)
except tf.errors.NotFoundError:
raise UnityPolicyException(
"The model {} was found but could not be loaded. Make "
"sure the model is from the same version of ML-Agents, has the same behavior parameters, "
"and is using the same trainer configuration as the current run.".format(
model_path
)
)
self._check_model_version(__version__)
if reset_global_steps:
policy.set_step(0)
logger.info(
"Starting training from step 0 and saving to {}.".format(
self.model_path
)
)
else:
logger.info(f"Resuming training from step {policy.get_current_step()}.")
def _check_model_version(self, version: str) -> None:
"""
Checks whether the model being loaded was created with the same version of
ML-Agents, and throw a warning if not so.
"""
if self.policy is not None and self.policy.version_tensors is not None:
loaded_ver = tuple(
num.eval(session=self.sess) for num in self.policy.version_tensors
)
if loaded_ver != TFPolicy._convert_version_string(version):
logger.warning(
f"The model checkpoint you are loading from was saved with ML-Agents version "
f"{loaded_ver[0]}.{loaded_ver[1]}.{loaded_ver[2]} but your current ML-Agents"
f"version is {version}. Model may not behave properly."
)
def copy_final_model(self, source_nn_path: str) -> None:
"""
Copy the .nn file at the given source to the destination.
Also copies the corresponding .onnx file if it exists.
"""
final_model_name = os.path.splitext(source_nn_path)[0]
if SerializationSettings.convert_to_barracuda:
source_path = f"{final_model_name}.nn"
destination_path = f"{self.model_path}.nn"
shutil.copyfile(source_path, destination_path)
logger.info(f"Copied {source_path} to {destination_path}.")
if SerializationSettings.convert_to_onnx:
try:
source_path = f"{final_model_name}.onnx"
destination_path = f"{self.model_path}.onnx"
shutil.copyfile(source_path, destination_path)
logger.info(f"Copied {source_path} to {destination_path}.")
except OSError:
pass

168
ml-agents/mlagents/trainers/optimizer/tf_optimizer.py


from typing import Dict, Any, List, Tuple, Optional
import numpy as np
from mlagents.tf_utils.tf import tf
from mlagents.trainers.buffer import AgentBuffer
from mlagents.trainers.policy.tf_policy import TFPolicy
from mlagents.trainers.optimizer import Optimizer
from mlagents.trainers.trajectory import SplitObservations
from mlagents.trainers.tf.components.reward_signals.reward_signal_factory import (
create_reward_signal,
)
from mlagents.trainers.settings import TrainerSettings, RewardSignalType
from mlagents.trainers.tf.components.bc.module import BCModule
class TFOptimizer(Optimizer): # pylint: disable=W0223
def __init__(self, policy: TFPolicy, trainer_params: TrainerSettings):
super().__init__()
self.sess = policy.sess
self.policy = policy
self.update_dict: Dict[str, tf.Tensor] = {}
self.value_heads: Dict[str, tf.Tensor] = {}
self.create_reward_signals(trainer_params.reward_signals)
self.memory_in: tf.Tensor = None
self.memory_out: tf.Tensor = None
self.m_size: int = 0
self.bc_module: Optional[BCModule] = None
# Create pretrainer if needed
if trainer_params.behavioral_cloning is not None:
self.bc_module = BCModule(
self.policy,
trainer_params.behavioral_cloning,
policy_learning_rate=trainer_params.hyperparameters.learning_rate,
default_batch_size=trainer_params.hyperparameters.batch_size,
default_num_epoch=3,
)
def get_trajectory_value_estimates(
self, batch: AgentBuffer, next_obs: List[np.ndarray], done: bool
) -> Tuple[Dict[str, np.ndarray], Dict[str, float]]:
feed_dict: Dict[tf.Tensor, Any] = {
self.policy.batch_size_ph: batch.num_experiences,
self.policy.sequence_length_ph: batch.num_experiences, # We want to feed data in batch-wise, not time-wise.
}
if self.policy.vec_obs_size > 0:
feed_dict[self.policy.vector_in] = batch["vector_obs"]
if self.policy.vis_obs_size > 0:
for i in range(len(self.policy.visual_in)):
_obs = batch["visual_obs%d" % i]
feed_dict[self.policy.visual_in[i]] = _obs
if self.policy.use_recurrent:
feed_dict[self.policy.memory_in] = [
np.zeros((self.policy.m_size), dtype=np.float32)
]
feed_dict[self.memory_in] = [np.zeros((self.m_size), dtype=np.float32)]
if self.policy.prev_action is not None:
feed_dict[self.policy.prev_action] = batch["prev_action"]
if self.policy.use_recurrent:
value_estimates, policy_mem, value_mem = self.sess.run(
[self.value_heads, self.policy.memory_out, self.memory_out], feed_dict
)
prev_action = (
batch["discrete_action"][-1]
if not self.policy.use_continuous_act
else None
)
else:
value_estimates = self.sess.run(self.value_heads, feed_dict)
prev_action = None
policy_mem = None
value_mem = None
value_estimates = {k: np.squeeze(v, axis=1) for k, v in value_estimates.items()}
# We do this in a separate step to feed the memory outs - a further optimization would
# be to append to the obs before running sess.run.
final_value_estimates = self._get_value_estimates(
next_obs, done, policy_mem, value_mem, prev_action
)
return value_estimates, final_value_estimates
def _get_value_estimates(
self,
next_obs: List[np.ndarray],
done: bool,
policy_memory: np.ndarray = None,
value_memory: np.ndarray = None,
prev_action: np.ndarray = None,
) -> Dict[str, float]:
"""
Generates value estimates for bootstrapping.
:param experience: AgentExperience to be used for bootstrapping.
:param done: Whether or not this is the last element of the episode, in which case the value estimate will be 0.
:return: The value estimate dictionary with key being the name of the reward signal and the value the
corresponding value estimate.
"""
feed_dict: Dict[tf.Tensor, Any] = {
self.policy.batch_size_ph: 1,
self.policy.sequence_length_ph: 1,
}
vec_vis_obs = SplitObservations.from_observations(next_obs)
for i in range(len(vec_vis_obs.visual_observations)):
feed_dict[self.policy.visual_in[i]] = [vec_vis_obs.visual_observations[i]]
if self.policy.vec_obs_size > 0:
feed_dict[self.policy.vector_in] = [vec_vis_obs.vector_observations]
if policy_memory is not None:
feed_dict[self.policy.memory_in] = policy_memory
if value_memory is not None:
feed_dict[self.memory_in] = value_memory
if prev_action is not None:
feed_dict[self.policy.prev_action] = [prev_action]
value_estimates = self.sess.run(self.value_heads, feed_dict)
value_estimates = {k: float(v) for k, v in value_estimates.items()}
# If we're done, reassign all of the value estimates that need terminal states.
if done:
for k in value_estimates:
if self.reward_signals[k].use_terminal_states:
value_estimates[k] = 0.0
return value_estimates
def create_reward_signals(
self, reward_signal_configs: Dict[RewardSignalType, Any]
) -> None:
"""
Create reward signals
:param reward_signal_configs: Reward signal config.
"""
# Create reward signals
for reward_signal, settings in reward_signal_configs.items():
# Name reward signals by string in case we have duplicates later
self.reward_signals[reward_signal.value] = create_reward_signal(
self.policy, reward_signal, settings
)
self.update_dict.update(
self.reward_signals[reward_signal.value].update_dict
)
@classmethod
def create_optimizer_op(
cls, learning_rate: tf.Tensor, name: str = "Adam"
) -> tf.train.Optimizer:
return tf.train.AdamOptimizer(learning_rate=learning_rate, name=name)
def _execute_model(
self, feed_dict: Dict[tf.Tensor, np.ndarray], out_dict: Dict[str, tf.Tensor]
) -> Dict[str, np.ndarray]:
"""
Executes model.
:param feed_dict: Input dictionary mapping nodes to input data.
:param out_dict: Output dictionary mapping names to nodes.
:return: Dictionary mapping names to input data.
"""
network_out = self.sess.run(list(out_dict.values()), feed_dict=feed_dict)
run_out = dict(zip(list(out_dict.keys()), network_out))
return run_out
def _make_zero_mem(self, m_size: int, length: int) -> List[np.ndarray]:
return [
np.zeros((m_size), dtype=np.float32)
for i in range(0, length, self.policy.sequence_length)
]

630
ml-agents/mlagents/trainers/policy/tf_policy.py


from typing import Any, Dict, List, Optional, Tuple, Callable
import numpy as np
from distutils.version import LooseVersion
from mlagents_envs.timers import timed
from mlagents.tf_utils import tf
from mlagents import tf_utils
from mlagents_envs.exception import UnityException
from mlagents_envs.logging_util import get_logger
from mlagents.trainers.policy import Policy
from mlagents.trainers.action_info import ActionInfo
from mlagents.trainers.trajectory import SplitObservations
from mlagents.trainers.torch.action_log_probs import LogProbsTuple
from mlagents.trainers.behavior_id_utils import get_global_agent_id
from mlagents_envs.base_env import DecisionSteps, ActionTuple, BehaviorSpec
from mlagents.trainers.tf.models import ModelUtils
from mlagents.trainers.settings import TrainerSettings, EncoderType
from mlagents.trainers import __version__
from mlagents.trainers.tf.distributions import (
GaussianDistribution,
MultiCategoricalDistribution,
)
from mlagents.tf_utils.globals import get_rank
logger = get_logger(__name__)
# This is the version number of the inputs and outputs of the model, and
# determines compatibility with inference in Barracuda.
MODEL_FORMAT_VERSION = 2
EPSILON = 1e-6 # Small value to avoid divide by zero
class UnityPolicyException(UnityException):
"""
Related to errors with the Trainer.
"""
pass
class TFPolicy(Policy):
"""
Contains a learning model, and the necessary
functions to save/load models and create the input placeholders.
"""
# Callback function used at the start of training to synchronize weights.
# By default, this nothing.
# If this needs to be used, it should be done from outside ml-agents.
broadcast_global_variables: Callable[[int], None] = lambda root_rank: None
def __init__(
self,
seed: int,
behavior_spec: BehaviorSpec,
trainer_settings: TrainerSettings,
tanh_squash: bool = False,
reparameterize: bool = False,
condition_sigma_on_obs: bool = True,
create_tf_graph: bool = True,
):
"""
Initialized the policy.
:param seed: Random seed to use for TensorFlow.
:param brain: The corresponding Brain for this policy.
:param trainer_settings: The trainer parameters.
"""
super().__init__(
seed,
behavior_spec,
trainer_settings,
tanh_squash,
reparameterize,
condition_sigma_on_obs,
)
if (
self.behavior_spec.action_spec.continuous_size > 0
and self.behavior_spec.action_spec.discrete_size > 0
):
raise UnityPolicyException(
"TensorFlow does not support continuous and discrete actions on the same behavior. "
"Please run with the Torch framework."
)
# for ghost trainer save/load snapshots
self.assign_phs: List[tf.Tensor] = []
self.assign_ops: List[tf.Operation] = []
self.update_dict: Dict[str, tf.Tensor] = {}
self.inference_dict: Dict[str, tf.Tensor] = {}
self.first_normalization_update: bool = False
self.graph = tf.Graph()
self.sess = tf.Session(
config=tf_utils.generate_session_config(), graph=self.graph
)
self._initialize_tensorflow_references()
self.grads = None
self.update_batch: Optional[tf.Operation] = None
self.trainable_variables: List[tf.Variable] = []
self.rank = get_rank()
if create_tf_graph:
self.create_tf_graph()
def get_trainable_variables(self) -> List[tf.Variable]:
"""
Returns a List of the trainable variables in this policy. if create_tf_graph hasn't been called,
returns empty list.
"""
return self.trainable_variables
def create_tf_graph(self) -> None:
"""
Builds the tensorflow graph needed for this policy.
"""
with self.graph.as_default():
tf.set_random_seed(self.seed)
_vars = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)
if len(_vars) > 0:
# We assume the first thing created in the graph is the Policy. If
# already populated, don't create more tensors.
return
self.create_input_placeholders()
encoded = self._create_encoder(
self.visual_in,
self.processed_vector_in,
self.h_size,
self.num_layers,
self.vis_encode_type,
)
if self.use_continuous_act:
self._create_cc_actor(
encoded,
self.tanh_squash,
self.reparameterize,
self.condition_sigma_on_obs,
)
else:
self._create_dc_actor(encoded)
self.trainable_variables = tf.get_collection(
tf.GraphKeys.TRAINABLE_VARIABLES, scope="policy"
)
self.trainable_variables += tf.get_collection(
tf.GraphKeys.TRAINABLE_VARIABLES, scope="lstm"
) # LSTMs need to be root scope for Barracuda export
self.inference_dict = {
"action": self.output,
"log_probs": self.all_log_probs,
"entropy": self.entropy,
}
if self.use_continuous_act:
self.inference_dict["pre_action"] = self.output_pre
if self.use_recurrent:
self.inference_dict["memory_out"] = self.memory_out
# We do an initialize to make the Policy usable out of the box. If an optimizer is needed,
# it will re-load the full graph
self.initialize()
# Create assignment ops for Ghost Trainer
self.init_load_weights()
def _create_encoder(
self,
visual_in: List[tf.Tensor],
vector_in: tf.Tensor,
h_size: int,
num_layers: int,
vis_encode_type: EncoderType,
) -> tf.Tensor:
"""
Creates an encoder for visual and vector observations.
:param h_size: Size of hidden linear layers.
:param num_layers: Number of hidden linear layers.
:param vis_encode_type: Type of visual encoder to use if visual input.
:return: The hidden layer (tf.Tensor) after the encoder.
"""
with tf.variable_scope("policy"):
encoded = ModelUtils.create_observation_streams(
self.visual_in,
self.processed_vector_in,
1,
h_size,
num_layers,
vis_encode_type,
)[0]
return encoded
@staticmethod
def _convert_version_string(version_string: str) -> Tuple[int, ...]:
"""
Converts the version string into a Tuple of ints (major_ver, minor_ver, patch_ver).
:param version_string: The semantic-versioned version string (X.Y.Z).
:return: A Tuple containing (major_ver, minor_ver, patch_ver).
"""
ver = LooseVersion(version_string)
return tuple(map(int, ver.version[0:3]))
def initialize(self):
with self.graph.as_default():
init = tf.global_variables_initializer()
self.sess.run(init)
def get_weights(self):
with self.graph.as_default():
_vars = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)
values = [v.eval(session=self.sess) for v in _vars]
return values
def init_load_weights(self):
with self.graph.as_default():
_vars = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)
values = [v.eval(session=self.sess) for v in _vars]
for var, value in zip(_vars, values):
assign_ph = tf.placeholder(var.dtype, shape=value.shape)
self.assign_phs.append(assign_ph)
self.assign_ops.append(tf.assign(var, assign_ph))
def load_weights(self, values):
if len(self.assign_ops) == 0:
logger.warning(
"Calling load_weights in tf_policy but assign_ops is empty. Did you forget to call init_load_weights?"
)
with self.graph.as_default():
feed_dict = {}
for assign_ph, value in zip(self.assign_phs, values):
feed_dict[assign_ph] = value
self.sess.run(self.assign_ops, feed_dict=feed_dict)
@timed
def evaluate(
self, decision_requests: DecisionSteps, global_agent_ids: List[str]
) -> Dict[str, Any]:
"""
Evaluates policy for the agent experiences provided.
:param decision_requests: DecisionSteps object containing inputs.
:param global_agent_ids: The global (with worker ID) agent ids of the data in the batched_step_result.
:return: Outputs from network as defined by self.inference_dict.
"""
feed_dict = {
self.batch_size_ph: len(decision_requests),
self.sequence_length_ph: 1,
}
if self.use_recurrent:
if not self.use_continuous_act:
feed_dict[self.prev_action] = self.retrieve_previous_action(
global_agent_ids
)
feed_dict[self.memory_in] = self.retrieve_memories(global_agent_ids)
feed_dict = self.fill_eval_dict(feed_dict, decision_requests)
run_out = self._execute_model(feed_dict, self.inference_dict)
return run_out
def get_action(
self, decision_requests: DecisionSteps, worker_id: int = 0
) -> ActionInfo:
"""
Decides actions given observations information, and takes them in environment.
:param decision_requests: A dictionary of brain names and DecisionSteps from environment.
:param worker_id: In parallel environment training, the unique id of the environment worker that
the DecisionSteps came from. Used to construct a globally unique id for each agent.
:return: an ActionInfo containing action, memories, values and an object
to be passed to add experiences
"""
if len(decision_requests) == 0:
return ActionInfo.empty()
global_agent_ids = [
get_global_agent_id(worker_id, int(agent_id))
for agent_id in decision_requests.agent_id
] # For 1-D array, the iterator order is correct.
run_out = self.evaluate( # pylint: disable=assignment-from-no-return
decision_requests, global_agent_ids
)
self.save_memories(global_agent_ids, run_out.get("memory_out"))
# For Compatibility with buffer changes for hybrid action support
if "log_probs" in run_out:
log_probs_tuple = LogProbsTuple()
if self.behavior_spec.action_spec.is_continuous():
log_probs_tuple.add_continuous(run_out["log_probs"])
else:
log_probs_tuple.add_discrete(run_out["log_probs"])
run_out["log_probs"] = log_probs_tuple
if "action" in run_out:
action_tuple = ActionTuple()
env_action_tuple = ActionTuple()
if self.behavior_spec.action_spec.is_continuous():
action_tuple.add_continuous(run_out["pre_action"])
env_action_tuple.add_continuous(run_out["action"])
else:
action_tuple.add_discrete(run_out["action"])
env_action_tuple.add_discrete(run_out["action"])
run_out["action"] = action_tuple
run_out["env_action"] = env_action_tuple
self.check_nan_action(run_out.get("action"))
return ActionInfo(
action=run_out.get("action"),
env_action=run_out.get("env_action"),
value=run_out.get("value"),
outputs=run_out,
agent_ids=decision_requests.agent_id,
)
def update(self, mini_batch, num_sequences):
"""
Performs update of the policy.
:param num_sequences: Number of experience trajectories in batch.
:param mini_batch: Batch of experiences.
:return: Results of update.
"""
raise UnityPolicyException("The update function was not implemented.")
def _execute_model(self, feed_dict, out_dict):
"""
Executes model.
:param feed_dict: Input dictionary mapping nodes to input data.
:param out_dict: Output dictionary mapping names to nodes.
:return: Dictionary mapping names to input data.
"""
network_out = self.sess.run(list(out_dict.values()), feed_dict=feed_dict)
run_out = dict(zip(list(out_dict.keys()), network_out))
return run_out
def fill_eval_dict(self, feed_dict, batched_step_result):
vec_vis_obs = SplitObservations.from_observations(batched_step_result.obs)
for i, _ in enumerate(vec_vis_obs.visual_observations):
feed_dict[self.visual_in[i]] = vec_vis_obs.visual_observations[i]
if self.use_vec_obs:
feed_dict[self.vector_in] = vec_vis_obs.vector_observations
if not self.use_continuous_act:
mask = np.ones(
(
len(batched_step_result),
sum(self.behavior_spec.action_spec.discrete_branches),
),
dtype=np.float32,
)
if batched_step_result.action_mask is not None:
mask = 1 - np.concatenate(batched_step_result.action_mask, axis=1)
feed_dict[self.action_masks] = mask
return feed_dict
def get_current_step(self):
"""
Gets current model step.
:return: current model step.
"""
step = self.sess.run(self.global_step)
return step
def set_step(self, step: int) -> int:
"""
Sets current model step to step without creating additional ops.
:param step: Step to set the current model step to.
:return: The step the model was set to.
"""
current_step = self.get_current_step()
# Increment a positive or negative number of steps.
return self.increment_step(step - current_step)
def increment_step(self, n_steps):
"""
Increments model step.
"""
out_dict = {
"global_step": self.global_step,
"increment_step": self.increment_step_op,
}
feed_dict = {self.steps_to_increment: n_steps}
return self.sess.run(out_dict, feed_dict=feed_dict)["global_step"]
def get_inference_vars(self):
"""
:return:list of inference var names
"""
return list(self.inference_dict.keys())
def get_update_vars(self):
"""
:return:list of update var names
"""
return list(self.update_dict.keys())
def update_normalization(self, vector_obs: np.ndarray) -> None:
"""
If this policy normalizes vector observations, this will update the norm values in the graph.
:param vector_obs: The vector observations to add to the running estimate of the distribution.
"""
if self.use_vec_obs and self.normalize:
if self.first_normalization_update:
self.sess.run(
self.init_normalization_op, feed_dict={self.vector_in: vector_obs}
)
self.first_normalization_update = False
else:
self.sess.run(
self.update_normalization_op, feed_dict={self.vector_in: vector_obs}
)
@property
def use_vis_obs(self):
return self.vis_obs_size > 0
@property
def use_vec_obs(self):
return self.vec_obs_size > 0
def _initialize_tensorflow_references(self):
self.value_heads: Dict[str, tf.Tensor] = {}
self.normalization_steps: Optional[tf.Variable] = None
self.running_mean: Optional[tf.Variable] = None
self.running_variance: Optional[tf.Variable] = None
self.init_normalization_op: Optional[tf.Operation] = None
self.update_normalization_op: Optional[tf.Operation] = None
self.value: Optional[tf.Tensor] = None
self.all_log_probs: tf.Tensor = None
self.total_log_probs: Optional[tf.Tensor] = None
self.entropy: Optional[tf.Tensor] = None
self.output_pre: Optional[tf.Tensor] = None
self.output: Optional[tf.Tensor] = None
self.selected_actions: tf.Tensor = None
self.action_masks: Optional[tf.Tensor] = None
self.prev_action: Optional[tf.Tensor] = None
self.memory_in: Optional[tf.Tensor] = None
self.memory_out: Optional[tf.Tensor] = None
self.version_tensors: Optional[Tuple[tf.Tensor, tf.Tensor, tf.Tensor]] = None
def create_input_placeholders(self):
with self.graph.as_default():
(
self.global_step,
self.increment_step_op,
self.steps_to_increment,
) = ModelUtils.create_global_steps()
self.vector_in, self.visual_in = ModelUtils.create_input_placeholders(
self.behavior_spec.observation_shapes
)
if self.normalize:
self.first_normalization_update = True
normalization_tensors = ModelUtils.create_normalizer(self.vector_in)
self.update_normalization_op = normalization_tensors.update_op
self.init_normalization_op = normalization_tensors.init_op
self.normalization_steps = normalization_tensors.steps
self.running_mean = normalization_tensors.running_mean
self.running_variance = normalization_tensors.running_variance
self.processed_vector_in = ModelUtils.normalize_vector_obs(
self.vector_in,
self.running_mean,
self.running_variance,
self.normalization_steps,
)
else:
self.processed_vector_in = self.vector_in
self.update_normalization_op = None
self.batch_size_ph = tf.placeholder(
shape=None, dtype=tf.int32, name="batch_size"
)
self.sequence_length_ph = tf.placeholder(
shape=None, dtype=tf.int32, name="sequence_length"
)
self.mask_input = tf.placeholder(
shape=[None], dtype=tf.float32, name="masks"
)
# Only needed for PPO, but needed for BC module
self.epsilon = tf.placeholder(
shape=[None, self.act_size[0]], dtype=tf.float32, name="epsilon"
)
self.mask = tf.cast(self.mask_input, tf.int32)
tf.Variable(
int(self.behavior_spec.action_spec.is_continuous()),
name="is_continuous_control",
trainable=False,
dtype=tf.int32,
)
int_version = TFPolicy._convert_version_string(__version__)
major_ver_t = tf.Variable(
int_version[0],
name="trainer_major_version",
trainable=False,
dtype=tf.int32,
)
minor_ver_t = tf.Variable(
int_version[1],
name="trainer_minor_version",
trainable=False,
dtype=tf.int32,
)
patch_ver_t = tf.Variable(
int_version[2],
name="trainer_patch_version",
trainable=False,
dtype=tf.int32,
)
self.version_tensors = (major_ver_t, minor_ver_t, patch_ver_t)
tf.Variable(
MODEL_FORMAT_VERSION,
name="version_number",
trainable=False,
dtype=tf.int32,
)
tf.Variable(
self.m_size, name="memory_size", trainable=False, dtype=tf.int32
)
if self.behavior_spec.action_spec.is_continuous():
tf.Variable(
self.act_size[0],
name="action_output_shape",
trainable=False,
dtype=tf.int32,
)
else:
tf.Variable(
sum(self.act_size),
name="action_output_shape",
trainable=False,
dtype=tf.int32,
)
def _create_cc_actor(
self,
encoded: tf.Tensor,
tanh_squash: bool = False,
reparameterize: bool = False,
condition_sigma_on_obs: bool = True,
) -> None:
"""
Creates Continuous control actor-critic model.
:param h_size: Size of hidden linear layers.
:param num_layers: Number of hidden linear layers.
:param vis_encode_type: Type of visual encoder to use if visual input.
:param tanh_squash: Whether to use a tanh function, or a clipped output.
:param reparameterize: Whether we are using the resampling trick to update the policy.
"""
if self.use_recurrent:
self.memory_in = tf.placeholder(
shape=[None, self.m_size], dtype=tf.float32, name="recurrent_in"
)
hidden_policy, memory_policy_out = ModelUtils.create_recurrent_encoder(
encoded, self.memory_in, self.sequence_length_ph, name="lstm_policy"
)
self.memory_out = tf.identity(memory_policy_out, name="recurrent_out")
else:
hidden_policy = encoded
with tf.variable_scope("policy"):
distribution = GaussianDistribution(
hidden_policy,
self.act_size,
reparameterize=reparameterize,
tanh_squash=tanh_squash,
condition_sigma=condition_sigma_on_obs,
)
if tanh_squash:
self.output_pre = distribution.sample
self.output = tf.identity(self.output_pre, name="action")
else:
self.output_pre = distribution.sample
# Clip and scale output to ensure actions are always within [-1, 1] range.
output_post = tf.clip_by_value(self.output_pre, -3, 3) / 3
self.output = tf.identity(output_post, name="action")
self.selected_actions = tf.stop_gradient(self.output)
self.all_log_probs = tf.identity(distribution.log_probs, name="action_probs")
self.entropy = distribution.entropy
# We keep these tensors the same name, but use new nodes to keep code parallelism with discrete control.
self.total_log_probs = distribution.total_log_probs
def _create_dc_actor(self, encoded: tf.Tensor) -> None:
"""
Creates Discrete control actor-critic model.
:param h_size: Size of hidden linear layers.
:param num_layers: Number of hidden linear layers.
:param vis_encode_type: Type of visual encoder to use if visual input.
"""
if self.use_recurrent:
self.prev_action = tf.placeholder(
shape=[None, len(self.act_size)], dtype=tf.int32, name="prev_action"
)
prev_action_oh = tf.concat(
[
tf.one_hot(self.prev_action[:, i], self.act_size[i])
for i in range(len(self.act_size))
],
axis=1,
)
hidden_policy = tf.concat([encoded, prev_action_oh], axis=1)
self.memory_in = tf.placeholder(
shape=[None, self.m_size], dtype=tf.float32, name="recurrent_in"
)
hidden_policy, memory_policy_out = ModelUtils.create_recurrent_encoder(
hidden_policy,
self.memory_in,
self.sequence_length_ph,
name="lstm_policy",
)
self.memory_out = tf.identity(memory_policy_out, "recurrent_out")
else:
hidden_policy = encoded
self.action_masks = tf.placeholder(
shape=[None, sum(self.act_size)], dtype=tf.float32, name="action_masks"
)
with tf.variable_scope("policy"):
distribution = MultiCategoricalDistribution(
hidden_policy, self.act_size, self.action_masks
)
# It's important that we are able to feed_dict a value into this tensor to get the
# right one-hot encoding, so we can't do identity on it.
self.output = distribution.sample
self.all_log_probs = tf.identity(distribution.log_probs, name="action")
self.selected_actions = tf.stop_gradient(
distribution.sample_onehot
) # In discrete, these are onehot
self.entropy = distribution.entropy
self.total_log_probs = distribution.total_log_probs

361
ml-agents/mlagents/trainers/ppo/optimizer_tf.py


from typing import Optional, Any, Dict, cast
import numpy as np
from mlagents.tf_utils import tf
from mlagents_envs.timers import timed
from mlagents.trainers.tf.models import ModelUtils, EncoderType
from mlagents.trainers.policy.tf_policy import TFPolicy
from mlagents.trainers.optimizer.tf_optimizer import TFOptimizer
from mlagents.trainers.buffer import AgentBuffer
from mlagents.trainers.settings import TrainerSettings, PPOSettings
class PPOOptimizer(TFOptimizer):
def __init__(self, policy: TFPolicy, trainer_params: TrainerSettings):
"""
Takes a Policy and a Dict of trainer parameters and creates an Optimizer around the policy.
The PPO optimizer has a value estimator and a loss function.
:param policy: A TFPolicy object that will be updated by this PPO Optimizer.
:param trainer_params: Trainer parameters dictionary that specifies the properties of the trainer.
"""
# Create the graph here to give more granular control of the TF graph to the Optimizer.
policy.create_tf_graph()
with policy.graph.as_default():
with tf.variable_scope("optimizer/"):
super().__init__(policy, trainer_params)
hyperparameters: PPOSettings = cast(
PPOSettings, trainer_params.hyperparameters
)
lr = float(hyperparameters.learning_rate)
self._schedule = hyperparameters.learning_rate_schedule
epsilon = float(hyperparameters.epsilon)
beta = float(hyperparameters.beta)
max_step = float(trainer_params.max_steps)
policy_network_settings = policy.network_settings
h_size = int(policy_network_settings.hidden_units)
num_layers = policy_network_settings.num_layers
vis_encode_type = policy_network_settings.vis_encode_type
self.burn_in_ratio = 0.0
self.stream_names = list(self.reward_signals.keys())
self.tf_optimizer_op: Optional[tf.train.Optimizer] = None
self.grads = None
self.update_batch: Optional[tf.Operation] = None
self.stats_name_to_update_name = {
"Losses/Value Loss": "value_loss",
"Losses/Policy Loss": "policy_loss",
"Policy/Learning Rate": "learning_rate",
"Policy/Epsilon": "decay_epsilon",
"Policy/Beta": "decay_beta",
}
if self.policy.use_recurrent:
self.m_size = self.policy.m_size
self.memory_in = tf.placeholder(
shape=[None, self.m_size],
dtype=tf.float32,
name="recurrent_value_in",
)
if num_layers < 1:
num_layers = 1
if policy.use_continuous_act:
self._create_cc_critic(h_size, num_layers, vis_encode_type)
else:
self._create_dc_critic(h_size, num_layers, vis_encode_type)
self.learning_rate = ModelUtils.create_schedule(
self._schedule,
lr,
self.policy.global_step,
int(max_step),
min_value=1e-10,
)
self._create_losses(
self.policy.total_log_probs,
self.old_log_probs,
self.value_heads,
self.policy.entropy,
beta,
epsilon,
lr,
max_step,
)
self._create_ppo_optimizer_ops()
self.update_dict.update(
{
"value_loss": self.value_loss,
"policy_loss": self.abs_policy_loss,
"update_batch": self.update_batch,
"learning_rate": self.learning_rate,
"decay_epsilon": self.decay_epsilon,
"decay_beta": self.decay_beta,
}
)
def _create_cc_critic(
self, h_size: int, num_layers: int, vis_encode_type: EncoderType
) -> None:
"""
Creates Continuous control critic (value) network.
:param h_size: Size of hidden linear layers.
:param num_layers: Number of hidden linear layers.
:param vis_encode_type: The type of visual encoder to use.
"""
hidden_stream = ModelUtils.create_observation_streams(
self.policy.visual_in,
self.policy.processed_vector_in,
1,
h_size,
num_layers,
vis_encode_type,
)[0]
if self.policy.use_recurrent:
hidden_value, memory_value_out = ModelUtils.create_recurrent_encoder(
hidden_stream,
self.memory_in,
self.policy.sequence_length_ph,
name="lstm_value",
)
self.memory_out = memory_value_out
else:
hidden_value = hidden_stream
self.value_heads, self.value = ModelUtils.create_value_heads(
self.stream_names, hidden_value
)
self.all_old_log_probs = tf.placeholder(
shape=[None, sum(self.policy.act_size)],
dtype=tf.float32,
name="old_probabilities",
)
self.old_log_probs = tf.reduce_sum(
(tf.identity(self.all_old_log_probs)), axis=1, keepdims=True
)
def _create_dc_critic(
self, h_size: int, num_layers: int, vis_encode_type: EncoderType
) -> None:
"""
Creates Discrete control critic (value) network.
:param h_size: Size of hidden linear layers.
:param num_layers: Number of hidden linear layers.
:param vis_encode_type: The type of visual encoder to use.
"""
hidden_stream = ModelUtils.create_observation_streams(
self.policy.visual_in,
self.policy.processed_vector_in,
1,
h_size,
num_layers,
vis_encode_type,
)[0]
if self.policy.use_recurrent:
hidden_value, memory_value_out = ModelUtils.create_recurrent_encoder(
hidden_stream,
self.memory_in,
self.policy.sequence_length_ph,
name="lstm_value",
)
self.memory_out = memory_value_out
else:
hidden_value = hidden_stream
self.value_heads, self.value = ModelUtils.create_value_heads(
self.stream_names, hidden_value
)
self.all_old_log_probs = tf.placeholder(
shape=[None, sum(self.policy.act_size)],
dtype=tf.float32,
name="old_probabilities",
)
# Break old log log_probs into separate branches
old_log_prob_branches = ModelUtils.break_into_branches(
self.all_old_log_probs, self.policy.act_size
)
_, _, old_normalized_logits = ModelUtils.create_discrete_action_masking_layer(
old_log_prob_branches, self.policy.action_masks, self.policy.act_size
)
action_idx = [0] + list(np.cumsum(self.policy.act_size))
self.old_log_probs = tf.reduce_sum(
(
tf.stack(
[
-tf.nn.softmax_cross_entropy_with_logits_v2(
labels=self.policy.selected_actions[
:, action_idx[i] : action_idx[i + 1]
],
logits=old_normalized_logits[
:, action_idx[i] : action_idx[i + 1]
],
)
for i in range(len(self.policy.act_size))
],
axis=1,
)
),
axis=1,
keepdims=True,
)
def _create_losses(
self, probs, old_probs, value_heads, entropy, beta, epsilon, lr, max_step
):
"""
Creates training-specific Tensorflow ops for PPO models.
:param probs: Current policy probabilities
:param old_probs: Past policy probabilities
:param value_heads: Value estimate tensors from each value stream
:param beta: Entropy regularization strength
:param entropy: Current policy entropy
:param epsilon: Value for policy-divergence threshold
:param lr: Learning rate
:param max_step: Total number of training steps.
"""
self.returns_holders = {}
self.old_values = {}
for name in value_heads.keys():
returns_holder = tf.placeholder(
shape=[None], dtype=tf.float32, name=f"{name}_returns"
)
old_value = tf.placeholder(
shape=[None], dtype=tf.float32, name=f"{name}_value_estimate"
)
self.returns_holders[name] = returns_holder
self.old_values[name] = old_value
self.advantage = tf.placeholder(
shape=[None], dtype=tf.float32, name="advantages"
)
advantage = tf.expand_dims(self.advantage, -1)
self.decay_epsilon = ModelUtils.create_schedule(
self._schedule, epsilon, self.policy.global_step, max_step, min_value=0.1
)
self.decay_beta = ModelUtils.create_schedule(
self._schedule, beta, self.policy.global_step, max_step, min_value=1e-5
)
value_losses = []
for name, head in value_heads.items():
clipped_value_estimate = self.old_values[name] + tf.clip_by_value(
tf.reduce_sum(head, axis=1) - self.old_values[name],
-self.decay_epsilon,
self.decay_epsilon,
)
v_opt_a = tf.squared_difference(
self.returns_holders[name], tf.reduce_sum(head, axis=1)
)
v_opt_b = tf.squared_difference(
self.returns_holders[name], clipped_value_estimate
)
value_loss = tf.reduce_mean(
tf.dynamic_partition(tf.maximum(v_opt_a, v_opt_b), self.policy.mask, 2)[
1
]
)
value_losses.append(value_loss)
self.value_loss = tf.reduce_mean(value_losses)
r_theta = tf.exp(probs - old_probs)
p_opt_a = r_theta * advantage
p_opt_b = (
tf.clip_by_value(
r_theta, 1.0 - self.decay_epsilon, 1.0 + self.decay_epsilon
)
* advantage
)
self.policy_loss = -tf.reduce_mean(
tf.dynamic_partition(tf.minimum(p_opt_a, p_opt_b), self.policy.mask, 2)[1]
)
# For cleaner stats reporting
self.abs_policy_loss = tf.abs(self.policy_loss)
self.loss = (
self.policy_loss
+ 0.5 * self.value_loss
- self.decay_beta
* tf.reduce_mean(tf.dynamic_partition(entropy, self.policy.mask, 2)[1])
)
def _create_ppo_optimizer_ops(self):
self.tf_optimizer_op = self.create_optimizer_op(self.learning_rate)
self.grads = self.tf_optimizer_op.compute_gradients(self.loss)
self.update_batch = self.tf_optimizer_op.minimize(self.loss)
@timed
def update(self, batch: AgentBuffer, num_sequences: int) -> Dict[str, float]:
"""
Performs update on model.
:param mini_batch: Batch of experiences.
:param num_sequences: Number of sequences to process.
:return: Results of update.
"""
feed_dict = self._construct_feed_dict(batch, num_sequences)
stats_needed = self.stats_name_to_update_name
update_stats = {}
# Collect feed dicts for all reward signals.
for _, reward_signal in self.reward_signals.items():
feed_dict.update(
reward_signal.prepare_update(self.policy, batch, num_sequences)
)
stats_needed.update(reward_signal.stats_name_to_update_name)
update_vals = self._execute_model(feed_dict, self.update_dict)
for stat_name, update_name in stats_needed.items():
update_stats[stat_name] = update_vals[update_name]
return update_stats
def _construct_feed_dict(
self, mini_batch: AgentBuffer, num_sequences: int
) -> Dict[tf.Tensor, Any]:
# Do an optional burn-in for memories
num_burn_in = int(self.burn_in_ratio * self.policy.sequence_length)
burn_in_mask = np.ones((self.policy.sequence_length), dtype=np.float32)
burn_in_mask[range(0, num_burn_in)] = 0
burn_in_mask = np.tile(burn_in_mask, num_sequences)
feed_dict = {
self.policy.batch_size_ph: num_sequences,
self.policy.sequence_length_ph: self.policy.sequence_length,
self.policy.mask_input: mini_batch["masks"] * burn_in_mask,
self.advantage: mini_batch["advantages"],
}
for name in self.reward_signals:
feed_dict[self.returns_holders[name]] = mini_batch[f"{name}_returns"]
feed_dict[self.old_values[name]] = mini_batch[f"{name}_value_estimates"]
if self.policy.use_continuous_act: # For hybrid action buffer support
feed_dict[self.all_old_log_probs] = mini_batch["continuous_log_probs"]
feed_dict[self.policy.output_pre] = mini_batch["continuous_action"]
else:
feed_dict[self.all_old_log_probs] = mini_batch["discrete_log_probs"]
feed_dict[self.policy.output] = mini_batch["discrete_action"]
if self.policy.use_recurrent:
feed_dict[self.policy.prev_action] = mini_batch["prev_action"]
feed_dict[self.policy.action_masks] = mini_batch["action_mask"]
if "vector_obs" in mini_batch:
feed_dict[self.policy.vector_in] = mini_batch["vector_obs"]
if self.policy.vis_obs_size > 0:
for i, _ in enumerate(self.policy.visual_in):
feed_dict[self.policy.visual_in[i]] = mini_batch["visual_obs%d" % i]
if self.policy.use_recurrent:
feed_dict[self.policy.memory_in] = [
mini_batch["memory"][i]
for i in range(
0, len(mini_batch["memory"]), self.policy.sequence_length
)
]
feed_dict[self.memory_in] = self._make_zero_mem(
self.m_size, mini_batch.num_experiences
)
return feed_dict

444
ml-agents/mlagents/trainers/sac/network.py


from typing import Dict, Optional
from mlagents.tf_utils import tf
from mlagents.trainers.tf.models import ModelUtils
from mlagents.trainers.settings import EncoderType
LOG_STD_MAX = 2
LOG_STD_MIN = -20
EPSILON = 1e-6 # Small value to avoid divide by zero
DISCRETE_TARGET_ENTROPY_SCALE = 0.2 # Roughly equal to e-greedy 0.05
CONTINUOUS_TARGET_ENTROPY_SCALE = 1.0 # TODO: Make these an optional hyperparam.
POLICY_SCOPE = ""
TARGET_SCOPE = "target_network"
class SACNetwork:
"""
Base class for an SAC network. Implements methods for creating the actor and critic heads.
"""
def __init__(
self,
policy=None,
m_size=None,
h_size=128,
normalize=False,
use_recurrent=False,
num_layers=2,
stream_names=None,
vis_encode_type=EncoderType.SIMPLE,
):
self.normalize = normalize
self.use_recurrent = use_recurrent
self.num_layers = num_layers
self.stream_names = stream_names
self.h_size = h_size
self.activ_fn = ModelUtils.swish
self.sequence_length_ph = tf.placeholder(
shape=None, dtype=tf.int32, name="sac_sequence_length"
)
self.policy_memory_in: Optional[tf.Tensor] = None
self.policy_memory_out: Optional[tf.Tensor] = None
self.value_memory_in: Optional[tf.Tensor] = None
self.value_memory_out: Optional[tf.Tensor] = None
self.q1: Optional[tf.Tensor] = None
self.q2: Optional[tf.Tensor] = None
self.q1_p: Optional[tf.Tensor] = None
self.q2_p: Optional[tf.Tensor] = None
self.q1_memory_in: Optional[tf.Tensor] = None
self.q2_memory_in: Optional[tf.Tensor] = None
self.q1_memory_out: Optional[tf.Tensor] = None
self.q2_memory_out: Optional[tf.Tensor] = None
self.prev_action: Optional[tf.Tensor] = None
self.action_masks: Optional[tf.Tensor] = None
self.external_action_in: Optional[tf.Tensor] = None
self.log_sigma_sq: Optional[tf.Tensor] = None
self.entropy: Optional[tf.Tensor] = None
self.deterministic_output: Optional[tf.Tensor] = None
self.normalized_logprobs: Optional[tf.Tensor] = None
self.action_probs: Optional[tf.Tensor] = None
self.output_oh: Optional[tf.Tensor] = None
self.output_pre: Optional[tf.Tensor] = None
self.value_vars = None
self.q_vars = None
self.critic_vars = None
self.policy_vars = None
self.q1_heads: Dict[str, tf.Tensor] = None
self.q2_heads: Dict[str, tf.Tensor] = None
self.q1_pheads: Dict[str, tf.Tensor] = None
self.q2_pheads: Dict[str, tf.Tensor] = None
self.policy = policy
def get_vars(self, scope):
return tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope=scope)
def join_scopes(self, scope_1, scope_2):
"""
Joins two scopes. Does so safetly (i.e., if one of the two scopes doesn't
exist, don't add any backslashes)
"""
if not scope_1:
return scope_2
if not scope_2:
return scope_1
else:
return "/".join(filter(None, [scope_1, scope_2]))
def create_value_heads(self, stream_names, hidden_input):
"""
Creates one value estimator head for each reward signal in stream_names.
Also creates the node corresponding to the mean of all the value heads in self.value.
self.value_head is a dictionary of stream name to node containing the value estimator head for that signal.
:param stream_names: The list of reward signal names
:param hidden_input: The last layer of the Critic. The heads will consist of one dense hidden layer on top
of the hidden input.
"""
self.value_heads = {}
for name in stream_names:
value = tf.layers.dense(hidden_input, 1, name=f"{name}_value")
self.value_heads[name] = value
self.value = tf.reduce_mean(list(self.value_heads.values()), 0)
def _create_cc_critic(self, hidden_value, scope, create_qs=True):
"""
Creates just the critic network
"""
scope = self.join_scopes(scope, "critic")
self.create_sac_value_head(
self.stream_names,
hidden_value,
self.num_layers,
self.h_size,
self.join_scopes(scope, "value"),
)
self.external_action_in = tf.placeholder(
shape=[None, self.policy.act_size[0]],
dtype=tf.float32,
name="external_action_in",
)
self.value_vars = self.get_vars(self.join_scopes(scope, "value"))
if create_qs:
hidden_q = tf.concat([hidden_value, self.external_action_in], axis=-1)
hidden_qp = tf.concat([hidden_value, self.policy.output], axis=-1)
self.q1_heads, self.q2_heads, self.q1, self.q2 = self.create_q_heads(
self.stream_names,
hidden_q,
self.num_layers,
self.h_size,
self.join_scopes(scope, "q"),
)
self.q1_pheads, self.q2_pheads, self.q1_p, self.q2_p = self.create_q_heads(
self.stream_names,
hidden_qp,
self.num_layers,
self.h_size,
self.join_scopes(scope, "q"),
reuse=True,
)
self.q_vars = self.get_vars(self.join_scopes(scope, "q"))
self.critic_vars = self.get_vars(scope)
def _create_dc_critic(self, hidden_value, scope, create_qs=True):
"""
Creates just the critic network
"""
scope = self.join_scopes(scope, "critic")
self.create_sac_value_head(
self.stream_names,
hidden_value,
self.num_layers,
self.h_size,
self.join_scopes(scope, "value"),
)
self.value_vars = self.get_vars("/".join([scope, "value"]))
if create_qs:
self.q1_heads, self.q2_heads, self.q1, self.q2 = self.create_q_heads(
self.stream_names,
hidden_value,
self.num_layers,
self.h_size,
self.join_scopes(scope, "q"),
num_outputs=sum(self.policy.act_size),
)
self.q1_pheads, self.q2_pheads, self.q1_p, self.q2_p = self.create_q_heads(
self.stream_names,
hidden_value,
self.num_layers,
self.h_size,
self.join_scopes(scope, "q"),
reuse=True,
num_outputs=sum(self.policy.act_size),
)
self.q_vars = self.get_vars(scope)
self.critic_vars = self.get_vars(scope)
def create_sac_value_head(
self, stream_names, hidden_input, num_layers, h_size, scope
):
"""
Creates one value estimator head for each reward signal in stream_names.
Also creates the node corresponding to the mean of all the value heads in self.value.
self.value_head is a dictionary of stream name to node containing the value estimator head for that signal.
:param stream_names: The list of reward signal names
:param hidden_input: The last layer of the Critic. The heads will consist of one dense hidden layer on top
of the hidden input.
:param num_layers: Number of hidden layers for value network
:param h_size: size of hidden layers for value network
:param scope: TF scope for value network.
"""
with tf.variable_scope(scope):
value_hidden = ModelUtils.create_vector_observation_encoder(
hidden_input, h_size, self.activ_fn, num_layers, "encoder", False
)
if self.use_recurrent:
value_hidden, memory_out = ModelUtils.create_recurrent_encoder(
value_hidden,
self.value_memory_in,
self.sequence_length_ph,
name="lstm_value",
)
self.value_memory_out = memory_out
self.create_value_heads(stream_names, value_hidden)
def create_q_heads(
self,
stream_names,
hidden_input,
num_layers,
h_size,
scope,
reuse=False,
num_outputs=1,
):
"""
Creates two q heads for each reward signal in stream_names.
Also creates the node corresponding to the mean of all the value heads in self.value.
self.value_head is a dictionary of stream name to node containing the value estimator head for that signal.
:param stream_names: The list of reward signal names
:param hidden_input: The last layer of the Critic. The heads will consist of one dense hidden layer on top
of the hidden input.
:param num_layers: Number of hidden layers for Q network
:param h_size: size of hidden layers for Q network
:param scope: TF scope for Q network.
:param reuse: Whether or not to reuse variables. Useful for creating Q of policy.
:param num_outputs: Number of outputs of each Q function. If discrete, equal to number of actions.
"""
with tf.variable_scope(self.join_scopes(scope, "q1_encoding"), reuse=reuse):
q1_hidden = ModelUtils.create_vector_observation_encoder(
hidden_input, h_size, self.activ_fn, num_layers, "q1_encoder", reuse
)
if self.use_recurrent:
q1_hidden, memory_out = ModelUtils.create_recurrent_encoder(
q1_hidden,
self.q1_memory_in,
self.sequence_length_ph,
name="lstm_q1",
)
self.q1_memory_out = memory_out
q1_heads = {}
for name in stream_names:
_q1 = tf.layers.dense(q1_hidden, num_outputs, name=f"{name}_q1")
q1_heads[name] = _q1
q1 = tf.reduce_mean(list(q1_heads.values()), axis=0)
with tf.variable_scope(self.join_scopes(scope, "q2_encoding"), reuse=reuse):
q2_hidden = ModelUtils.create_vector_observation_encoder(
hidden_input, h_size, self.activ_fn, num_layers, "q2_encoder", reuse
)
if self.use_recurrent:
q2_hidden, memory_out = ModelUtils.create_recurrent_encoder(
q2_hidden,
self.q2_memory_in,
self.sequence_length_ph,
name="lstm_q2",
)
self.q2_memory_out = memory_out
q2_heads = {}
for name in stream_names:
_q2 = tf.layers.dense(q2_hidden, num_outputs, name=f"{name}_q2")
q2_heads[name] = _q2
q2 = tf.reduce_mean(list(q2_heads.values()), axis=0)
return q1_heads, q2_heads, q1, q2
class SACTargetNetwork(SACNetwork):
"""
Instantiation for the SAC target network. Only contains a single
value estimator and is updated from the Policy Network.
"""
def __init__(
self,
policy,
m_size=None,
h_size=128,
normalize=False,
use_recurrent=False,
num_layers=2,
stream_names=None,
vis_encode_type=EncoderType.SIMPLE,
):
super().__init__(
policy,
m_size,
h_size,
normalize,
use_recurrent,
num_layers,
stream_names,
vis_encode_type,
)
with tf.variable_scope(TARGET_SCOPE):
self.vector_in, self.visual_in = ModelUtils.create_input_placeholders(
self.policy.behavior_spec.observation_shapes
)
if self.policy.normalize:
normalization_tensors = ModelUtils.create_normalizer(self.vector_in)
self.update_normalization_op = normalization_tensors.update_op
self.normalization_steps = normalization_tensors.steps
self.running_mean = normalization_tensors.running_mean
self.running_variance = normalization_tensors.running_variance
self.processed_vector_in = ModelUtils.normalize_vector_obs(
self.vector_in,
self.running_mean,
self.running_variance,
self.normalization_steps,
)
else:
self.processed_vector_in = self.vector_in
self.update_normalization_op = None
if self.policy.use_recurrent:
self.memory_in = tf.placeholder(
shape=[None, m_size], dtype=tf.float32, name="target_recurrent_in"
)
self.value_memory_in = self.memory_in
hidden_streams = ModelUtils.create_observation_streams(
self.visual_in,
self.processed_vector_in,
1,
self.h_size,
0,
vis_encode_type=vis_encode_type,
stream_scopes=["critic/value/"],
)
if self.policy.use_continuous_act:
self._create_cc_critic(hidden_streams[0], TARGET_SCOPE, create_qs=False)
else:
self._create_dc_critic(hidden_streams[0], TARGET_SCOPE, create_qs=False)
if self.use_recurrent:
self.memory_out = tf.concat(
self.value_memory_out, axis=1
) # Needed for Barracuda to work
def copy_normalization(self, mean, variance, steps):
"""
Copies the mean, variance, and steps into the normalizers of the
input of this SACNetwork. Used to copy the normalizer from the policy network
to the target network.
param mean: Tensor containing the mean.
param variance: Tensor containing the variance
param steps: Tensor containing the number of steps.
"""
update_mean = tf.assign(self.running_mean, mean)
update_variance = tf.assign(self.running_variance, variance)
update_norm_step = tf.assign(self.normalization_steps, steps)
return tf.group([update_mean, update_variance, update_norm_step])
class SACPolicyNetwork(SACNetwork):
"""
Instantiation for SAC policy network. Contains a dual Q estimator,
a value estimator, and a reference to the actual policy network.
"""
def __init__(
self,
policy,
m_size=None,
h_size=128,
normalize=False,
use_recurrent=False,
num_layers=2,
stream_names=None,
vis_encode_type=EncoderType.SIMPLE,
):
super().__init__(
policy,
m_size,
h_size,
normalize,
use_recurrent,
num_layers,
stream_names,
vis_encode_type,
)
if self.policy.use_recurrent:
self._create_memory_ins(m_size)
hidden_critic = self._create_observation_in(vis_encode_type)
# Use the sequence length of the policy
self.sequence_length_ph = self.policy.sequence_length_ph
if self.policy.use_continuous_act:
self._create_cc_critic(hidden_critic, POLICY_SCOPE)
else:
self._create_dc_critic(hidden_critic, POLICY_SCOPE)
if self.use_recurrent:
mem_outs = [self.value_memory_out, self.q1_memory_out, self.q2_memory_out]
self.memory_out = tf.concat(mem_outs, axis=1)
def _create_memory_ins(self, m_size):
"""
Creates the memory input placeholders for LSTM.
:param m_size: the total size of the memory.
"""
self.memory_in = tf.placeholder(
shape=[None, m_size * 3], dtype=tf.float32, name="value_recurrent_in"
)
# Re-break-up for each network
num_mems = 3
input_size = self.memory_in.get_shape().as_list()[1]
mem_ins = []
for i in range(num_mems):
_start = input_size // num_mems * i
_end = input_size // num_mems * (i + 1)
mem_ins.append(self.memory_in[:, _start:_end])
self.value_memory_in = mem_ins[0]
self.q1_memory_in = mem_ins[1]
self.q2_memory_in = mem_ins[2]
def _create_observation_in(self, vis_encode_type):
"""
Creates the observation inputs, and a CNN if needed,
:param vis_encode_type: Type of CNN encoder.
:param share_ac_cnn: Whether or not to share the actor and critic CNNs.
:return A tuple of (hidden_policy, hidden_critic). We don't save it to self since they're used
once and thrown away.
"""
with tf.variable_scope(POLICY_SCOPE):
hidden_streams = ModelUtils.create_observation_streams(
self.policy.visual_in,
self.policy.processed_vector_in,
1,
self.h_size,
0,
vis_encode_type=vis_encode_type,
stream_scopes=["critic/value/"],
)
hidden_critic = hidden_streams[0]
return hidden_critic

641
ml-agents/mlagents/trainers/sac/optimizer_tf.py


import numpy as np
from typing import Dict, List, Optional, Any, Mapping, cast
from mlagents.tf_utils import tf
from mlagents_envs.logging_util import get_logger
from mlagents.trainers.sac.network import SACPolicyNetwork, SACTargetNetwork
from mlagents.trainers.tf.models import ModelUtils
from mlagents.trainers.optimizer.tf_optimizer import TFOptimizer
from mlagents.trainers.policy.tf_policy import TFPolicy
from mlagents.trainers.buffer import AgentBuffer
from mlagents_envs.timers import timed
from mlagents.trainers.settings import TrainerSettings, SACSettings
EPSILON = 1e-6 # Small value to avoid divide by zero
logger = get_logger(__name__)
POLICY_SCOPE = ""
TARGET_SCOPE = "target_network"
class SACOptimizer(TFOptimizer):
def __init__(self, policy: TFPolicy, trainer_params: TrainerSettings):
"""
Takes a Unity environment and model-specific hyper-parameters and returns the
appropriate PPO agent model for the environment.
:param brain: Brain parameters used to generate specific network graph.
:param lr: Learning rate.
:param lr_schedule: Learning rate decay schedule.
:param h_size: Size of hidden layers
:param init_entcoef: Initial value for entropy coefficient. Set lower to learn faster,
set higher to explore more.
:return: a sub-class of PPOAgent tailored to the environment.
:param max_step: Total number of training steps.
:param normalize: Whether to normalize vector observation input.
:param use_recurrent: Whether to use an LSTM layer in the network.
:param num_layers: Number of hidden layers between encoded input and policy & value layers
:param tau: Strength of soft-Q update.
:param m_size: Size of brain memory.
"""
# Create the graph here to give more granular control of the TF graph to the Optimizer.
policy.create_tf_graph()
with policy.graph.as_default():
with tf.variable_scope(""):
super().__init__(policy, trainer_params)
hyperparameters: SACSettings = cast(
SACSettings, trainer_params.hyperparameters
)
lr = hyperparameters.learning_rate
lr_schedule = hyperparameters.learning_rate_schedule
max_step = trainer_params.max_steps
self.tau = hyperparameters.tau
self.init_entcoef = hyperparameters.init_entcoef
self.policy = policy
self.act_size = policy.act_size
policy_network_settings = policy.network_settings
h_size = policy_network_settings.hidden_units
num_layers = policy_network_settings.num_layers
vis_encode_type = policy_network_settings.vis_encode_type
self.tau = hyperparameters.tau
self.burn_in_ratio = 0.0
# Non-exposed SAC parameters
self.discrete_target_entropy_scale = (
0.2 # Roughly equal to e-greedy 0.05
)
self.continuous_target_entropy_scale = 1.0
stream_names = list(self.reward_signals.keys())
# Use to reduce "survivor bonus" when using Curiosity or GAIL.
self.gammas = [
_val.gamma for _val in trainer_params.reward_signals.values()
]
self.use_dones_in_backup = {
name: tf.Variable(1.0) for name in stream_names
}
self.disable_use_dones = {
name: self.use_dones_in_backup[name].assign(0.0)
for name in stream_names
}
if num_layers < 1:
num_layers = 1
self.target_init_op: List[tf.Tensor] = []
self.target_update_op: List[tf.Tensor] = []
self.update_batch_policy: Optional[tf.Operation] = None
self.update_batch_value: Optional[tf.Operation] = None
self.update_batch_entropy: Optional[tf.Operation] = None
self.policy_network = SACPolicyNetwork(
policy=self.policy,
m_size=self.policy.m_size, # 3x policy.m_size
h_size=h_size,
normalize=self.policy.normalize,
use_recurrent=self.policy.use_recurrent,
num_layers=num_layers,
stream_names=stream_names,
vis_encode_type=vis_encode_type,
)
self.target_network = SACTargetNetwork(
policy=self.policy,
m_size=self.policy.m_size, # 1x policy.m_size
h_size=h_size,
normalize=self.policy.normalize,
use_recurrent=self.policy.use_recurrent,
num_layers=num_layers,
stream_names=stream_names,
vis_encode_type=vis_encode_type,
)
# The optimizer's m_size is 3 times the policy (Q1, Q2, and Value)
self.m_size = 3 * self.policy.m_size
self._create_inputs_and_outputs()
self.learning_rate = ModelUtils.create_schedule(
lr_schedule,
lr,
self.policy.global_step,
int(max_step),
min_value=1e-10,
)
self._create_losses(
self.policy_network.q1_heads,
self.policy_network.q2_heads,
lr,
int(max_step),
stream_names,
discrete=not self.policy.use_continuous_act,
)
self._create_sac_optimizer_ops()
self.selected_actions = (
self.policy.selected_actions
) # For GAIL and other reward signals
if self.policy.normalize:
target_update_norm = self.target_network.copy_normalization(
self.policy.running_mean,
self.policy.running_variance,
self.policy.normalization_steps,
)
# Update the normalization of the optimizer when the policy does.
self.policy.update_normalization_op = tf.group(
[self.policy.update_normalization_op, target_update_norm]
)
self.stats_name_to_update_name = {
"Losses/Value Loss": "value_loss",
"Losses/Policy Loss": "policy_loss",
"Losses/Q1 Loss": "q1_loss",
"Losses/Q2 Loss": "q2_loss",
"Policy/Entropy Coeff": "entropy_coef",
"Policy/Learning Rate": "learning_rate",
}
self.update_dict = {
"value_loss": self.total_value_loss,
"policy_loss": self.policy_loss,
"q1_loss": self.q1_loss,
"q2_loss": self.q2_loss,
"entropy_coef": self.ent_coef,
"update_batch": self.update_batch_policy,
"update_value": self.update_batch_value,
"update_entropy": self.update_batch_entropy,
"learning_rate": self.learning_rate,
}
def _create_inputs_and_outputs(self) -> None:
"""
Assign the higher-level SACModel's inputs and outputs to those of its policy or
target network.
"""
self.vector_in = self.policy.vector_in
self.visual_in = self.policy.visual_in
self.next_vector_in = self.target_network.vector_in
self.next_visual_in = self.target_network.visual_in
self.sequence_length_ph = self.policy.sequence_length_ph
self.next_sequence_length_ph = self.target_network.sequence_length_ph
if not self.policy.use_continuous_act:
self.action_masks = self.policy_network.action_masks
else:
self.output_pre = self.policy_network.output_pre
# Don't use value estimate during inference.
self.value = tf.identity(
self.policy_network.value, name="value_estimate_unused"
)
self.value_heads = self.policy_network.value_heads
self.dones_holder = tf.placeholder(
shape=[None], dtype=tf.float32, name="dones_holder"
)
if self.policy.use_recurrent:
self.memory_in = self.policy_network.memory_in
self.memory_out = self.policy_network.memory_out
if not self.policy.use_continuous_act:
self.prev_action = self.policy_network.prev_action
self.next_memory_in = self.target_network.memory_in
def _create_losses(
self,
q1_streams: Dict[str, tf.Tensor],
q2_streams: Dict[str, tf.Tensor],
lr: tf.Tensor,
max_step: int,
stream_names: List[str],
discrete: bool = False,
) -> None:
"""
Creates training-specific Tensorflow ops for SAC models.
:param q1_streams: Q1 streams from policy network
:param q1_streams: Q2 streams from policy network
:param lr: Learning rate
:param max_step: Total number of training steps.
:param stream_names: List of reward stream names.
:param discrete: Whether or not to use discrete action losses.
"""
if discrete:
self.target_entropy = [
self.discrete_target_entropy_scale * np.log(i).astype(np.float32)
for i in self.act_size
]
discrete_action_probs = tf.exp(self.policy.all_log_probs)
per_action_entropy = discrete_action_probs * self.policy.all_log_probs
else:
self.target_entropy = (
-1
* self.continuous_target_entropy_scale
* np.prod(self.act_size[0]).astype(np.float32)
)
self.rewards_holders = {}
self.min_policy_qs = {}
for name in stream_names:
if discrete:
_branched_mpq1 = ModelUtils.break_into_branches(
self.policy_network.q1_pheads[name] * discrete_action_probs,
self.act_size,
)
branched_mpq1 = tf.stack(
[
tf.reduce_sum(_br, axis=1, keep_dims=True)
for _br in _branched_mpq1
]
)
_q1_p_mean = tf.reduce_mean(branched_mpq1, axis=0)
_branched_mpq2 = ModelUtils.break_into_branches(
self.policy_network.q2_pheads[name] * discrete_action_probs,
self.act_size,
)
branched_mpq2 = tf.stack(
[
tf.reduce_sum(_br, axis=1, keep_dims=True)
for _br in _branched_mpq2
]
)
_q2_p_mean = tf.reduce_mean(branched_mpq2, axis=0)
self.min_policy_qs[name] = tf.minimum(_q1_p_mean, _q2_p_mean)
else:
self.min_policy_qs[name] = tf.minimum(
self.policy_network.q1_pheads[name],
self.policy_network.q2_pheads[name],
)
rewards_holder = tf.placeholder(
shape=[None], dtype=tf.float32, name=f"{name}_rewards"
)
self.rewards_holders[name] = rewards_holder
q1_losses = []
q2_losses = []
# Multiple q losses per stream
expanded_dones = tf.expand_dims(self.dones_holder, axis=-1)
for i, name in enumerate(stream_names):
_expanded_rewards = tf.expand_dims(self.rewards_holders[name], axis=-1)
q_backup = tf.stop_gradient(
_expanded_rewards
+ (1.0 - self.use_dones_in_backup[name] * expanded_dones)
* self.gammas[i]
* self.target_network.value_heads[name]
)
if discrete:
# We need to break up the Q functions by branch, and update them individually.
branched_q1_stream = ModelUtils.break_into_branches(
self.policy.selected_actions * q1_streams[name], self.act_size
)
branched_q2_stream = ModelUtils.break_into_branches(
self.policy.selected_actions * q2_streams[name], self.act_size
)
# Reduce each branch into scalar
branched_q1_stream = [
tf.reduce_sum(_branch, axis=1, keep_dims=True)
for _branch in branched_q1_stream
]
branched_q2_stream = [
tf.reduce_sum(_branch, axis=1, keep_dims=True)
for _branch in branched_q2_stream
]
q1_stream = tf.reduce_mean(branched_q1_stream, axis=0)
q2_stream = tf.reduce_mean(branched_q2_stream, axis=0)
else:
q1_stream = q1_streams[name]
q2_stream = q2_streams[name]
_q1_loss = 0.5 * tf.reduce_mean(
tf.to_float(self.policy.mask)
* tf.squared_difference(q_backup, q1_stream)
)
_q2_loss = 0.5 * tf.reduce_mean(
tf.to_float(self.policy.mask)
* tf.squared_difference(q_backup, q2_stream)
)
q1_losses.append(_q1_loss)
q2_losses.append(_q2_loss)
self.q1_loss = tf.reduce_mean(q1_losses)
self.q2_loss = tf.reduce_mean(q2_losses)
# Learn entropy coefficient
if discrete:
# Create a log_ent_coef for each branch
self.log_ent_coef = tf.get_variable(
"log_ent_coef",
dtype=tf.float32,
initializer=np.log([self.init_entcoef] * len(self.act_size)).astype(
np.float32
),
trainable=True,
)
else:
self.log_ent_coef = tf.get_variable(
"log_ent_coef",
dtype=tf.float32,
initializer=np.log(self.init_entcoef).astype(np.float32),
trainable=True,
)
self.ent_coef = tf.exp(self.log_ent_coef)
if discrete:
# We also have to do a different entropy and target_entropy per branch.
branched_per_action_ent = ModelUtils.break_into_branches(
per_action_entropy, self.act_size
)
branched_ent_sums = tf.stack(
[
tf.reduce_sum(_lp, axis=1, keep_dims=True) + _te
for _lp, _te in zip(branched_per_action_ent, self.target_entropy)
],
axis=1,
)
self.entropy_loss = -tf.reduce_mean(
tf.to_float(self.policy.mask)
* tf.reduce_mean(
self.log_ent_coef
* tf.squeeze(tf.stop_gradient(branched_ent_sums), axis=2),
axis=1,
)
)
# Same with policy loss, we have to do the loss per branch and average them,
# so that larger branches don't get more weight.
# The equivalent KL divergence from Eq 10 of Haarnoja et al. is also pi*log(pi) - Q
branched_q_term = ModelUtils.break_into_branches(
discrete_action_probs * self.policy_network.q1_p, self.act_size
)
branched_policy_loss = tf.stack(
[
tf.reduce_sum(self.ent_coef[i] * _lp - _qt, axis=1, keep_dims=True)
for i, (_lp, _qt) in enumerate(
zip(branched_per_action_ent, branched_q_term)
)
]
)
self.policy_loss = tf.reduce_mean(
tf.to_float(self.policy.mask) * tf.squeeze(branched_policy_loss)
)
# Do vbackup entropy bonus per branch as well.
branched_ent_bonus = tf.stack(
[
tf.reduce_sum(self.ent_coef[i] * _lp, axis=1, keep_dims=True)
for i, _lp in enumerate(branched_per_action_ent)
]
)
value_losses = []
for name in stream_names:
v_backup = tf.stop_gradient(
self.min_policy_qs[name]
- tf.reduce_mean(branched_ent_bonus, axis=0)
)
value_losses.append(
0.5
* tf.reduce_mean(
tf.to_float(self.policy.mask)
* tf.squared_difference(
self.policy_network.value_heads[name], v_backup
)
)
)
else:
self.entropy_loss = -tf.reduce_mean(
self.log_ent_coef
* tf.to_float(self.policy.mask)
* tf.stop_gradient(
tf.reduce_sum(
self.policy.all_log_probs + self.target_entropy,
axis=1,
keep_dims=True,
)
)
)
batch_policy_loss = tf.reduce_mean(
self.ent_coef * self.policy.all_log_probs - self.policy_network.q1_p,
axis=1,
)
self.policy_loss = tf.reduce_mean(
tf.to_float(self.policy.mask) * batch_policy_loss
)
value_losses = []
for name in stream_names:
v_backup = tf.stop_gradient(
self.min_policy_qs[name]
- tf.reduce_sum(self.ent_coef * self.policy.all_log_probs, axis=1)
)
value_losses.append(
0.5
* tf.reduce_mean(
tf.to_float(self.policy.mask)
* tf.squared_difference(
self.policy_network.value_heads[name], v_backup
)
)
)
self.value_loss = tf.reduce_mean(value_losses)
self.total_value_loss = self.q1_loss + self.q2_loss + self.value_loss
self.entropy = self.policy_network.entropy
def _create_sac_optimizer_ops(self) -> None:
"""
Creates the Adam optimizers and update ops for SAC, including
the policy, value, and entropy updates, as well as the target network update.
"""
policy_optimizer = self.create_optimizer_op(
learning_rate=self.learning_rate, name="sac_policy_opt"
)
entropy_optimizer = self.create_optimizer_op(
learning_rate=self.learning_rate, name="sac_entropy_opt"
)
value_optimizer = self.create_optimizer_op(
learning_rate=self.learning_rate, name="sac_value_opt"
)
self.target_update_op = [
tf.assign(target, (1 - self.tau) * target + self.tau * source)
for target, source in zip(
self.target_network.value_vars, self.policy_network.value_vars
)
]
logger.debug("value_vars")
self.print_all_vars(self.policy_network.value_vars)
logger.debug("targvalue_vars")
self.print_all_vars(self.target_network.value_vars)
logger.debug("critic_vars")
self.print_all_vars(self.policy_network.critic_vars)
logger.debug("q_vars")
self.print_all_vars(self.policy_network.q_vars)
logger.debug("policy_vars")
policy_vars = self.policy.get_trainable_variables()
self.print_all_vars(policy_vars)
self.target_init_op = [
tf.assign(target, source)
for target, source in zip(
self.target_network.value_vars, self.policy_network.value_vars
)
]
self.update_batch_policy = policy_optimizer.minimize(
self.policy_loss, var_list=policy_vars
)
# Make sure policy is updated first, then value, then entropy.
with tf.control_dependencies([self.update_batch_policy]):
self.update_batch_value = value_optimizer.minimize(
self.total_value_loss, var_list=self.policy_network.critic_vars
)
# Add entropy coefficient optimization operation
with tf.control_dependencies([self.update_batch_value]):
self.update_batch_entropy = entropy_optimizer.minimize(
self.entropy_loss, var_list=self.log_ent_coef
)
def print_all_vars(self, variables):
for _var in variables:
logger.debug(_var)
@timed
def update(self, batch: AgentBuffer, num_sequences: int) -> Dict[str, float]:
"""
Updates model using buffer.
:param num_sequences: Number of trajectories in batch.
:param batch: Experience mini-batch.
:param update_target: Whether or not to update target value network
:param reward_signal_batches: Minibatches to use for updating the reward signals,
indexed by name. If none, don't update the reward signals.
:return: Output from update process.
"""
feed_dict = self._construct_feed_dict(self.policy, batch, num_sequences)
stats_needed = self.stats_name_to_update_name
update_stats: Dict[str, float] = {}
update_vals = self._execute_model(feed_dict, self.update_dict)
for stat_name, update_name in stats_needed.items():
update_stats[stat_name] = update_vals[update_name]
# Update target network. By default, target update happens at every policy update.
self.sess.run(self.target_update_op)
return update_stats
def update_reward_signals(
self, reward_signal_minibatches: Mapping[str, AgentBuffer], num_sequences: int
) -> Dict[str, float]:
"""
Only update the reward signals.
:param reward_signal_batches: Minibatches to use for updating the reward signals,
indexed by name. If none, don't update the reward signals.
"""
# Collect feed dicts for all reward signals.
feed_dict: Dict[tf.Tensor, Any] = {}
update_dict: Dict[str, tf.Tensor] = {}
update_stats: Dict[str, float] = {}
stats_needed: Dict[str, str] = {}
if reward_signal_minibatches:
self.add_reward_signal_dicts(
feed_dict,
update_dict,
stats_needed,
reward_signal_minibatches,
num_sequences,
)
update_vals = self._execute_model(feed_dict, update_dict)
for stat_name, update_name in stats_needed.items():
update_stats[stat_name] = update_vals[update_name]
return update_stats
def add_reward_signal_dicts(
self,
feed_dict: Dict[tf.Tensor, Any],
update_dict: Dict[str, tf.Tensor],
stats_needed: Dict[str, str],
reward_signal_minibatches: Mapping[str, AgentBuffer],
num_sequences: int,
) -> None:
"""
Adds the items needed for reward signal updates to the feed_dict and stats_needed dict.
:param feed_dict: Feed dict needed update
:param update_dit: Update dict that needs update
:param stats_needed: Stats needed to get from the update.
:param reward_signal_minibatches: Minibatches to use for updating the reward signals,
indexed by name.
"""
for name, r_batch in reward_signal_minibatches.items():
feed_dict.update(
self.reward_signals[name].prepare_update(
self.policy, r_batch, num_sequences
)
)
update_dict.update(self.reward_signals[name].update_dict)
stats_needed.update(self.reward_signals[name].stats_name_to_update_name)
def _construct_feed_dict(
self, policy: TFPolicy, batch: AgentBuffer, num_sequences: int
) -> Dict[tf.Tensor, Any]:
"""
Builds the feed dict for updating the SAC model.
:param model: The model to update. May be different when, e.g. using multi-GPU.
:param batch: Mini-batch to use to update.
:param num_sequences: Number of LSTM sequences in batch.
"""
# Do an optional burn-in for memories
num_burn_in = int(self.burn_in_ratio * self.policy.sequence_length)
burn_in_mask = np.ones((self.policy.sequence_length), dtype=np.float32)
burn_in_mask[range(0, num_burn_in)] = 0
burn_in_mask = np.tile(burn_in_mask, num_sequences)
feed_dict = {
policy.batch_size_ph: num_sequences,
policy.sequence_length_ph: self.policy.sequence_length,
self.next_sequence_length_ph: self.policy.sequence_length,
self.policy.mask_input: batch["masks"] * burn_in_mask,
}
for name in self.reward_signals:
feed_dict[self.rewards_holders[name]] = batch[f"{name}_rewards"]
if self.policy.use_continuous_act:
feed_dict[self.policy_network.external_action_in] = batch[
"continuous_action"
]
else:
feed_dict[policy.output] = batch["discrete_action"]
if self.policy.use_recurrent:
feed_dict[policy.prev_action] = batch["prev_action"]
feed_dict[policy.action_masks] = batch["action_mask"]
if self.policy.use_vec_obs:
feed_dict[policy.vector_in] = batch["vector_obs"]
feed_dict[self.next_vector_in] = batch["next_vector_in"]
if self.policy.vis_obs_size > 0:
for i, _ in enumerate(policy.visual_in):
_obs = batch["visual_obs%d" % i]
feed_dict[policy.visual_in[i]] = _obs
for i, _ in enumerate(self.next_visual_in):
_obs = batch["next_visual_obs%d" % i]
feed_dict[self.next_visual_in[i]] = _obs
if self.policy.use_recurrent:
feed_dict[policy.memory_in] = [
batch["memory"][i]
for i in range(0, len(batch["memory"]), self.policy.sequence_length)
]
feed_dict[self.policy_network.memory_in] = self._make_zero_mem(
self.m_size, batch.num_experiences
)
feed_dict[self.target_network.memory_in] = self._make_zero_mem(
self.m_size // 3, batch.num_experiences
)
feed_dict[self.dones_holder] = batch["done"]
return feed_dict

8
test_constraints_min_version.txt


# pip constraints to use the *lowest* versions allowed in ml-agents/setup.py
grpcio==1.11.0
numpy==1.14.1
Pillow==4.2.1
protobuf==3.6
tensorflow==1.14.0
h5py==2.9.0
tensorboard==1.15.0

6
test_constraints_max_tf2_version.txt


# pip constraints to use the *highest* versions allowed in ml-agents/setup.py
# For projects with upper bounds, we should periodically update this list to the latest release version
grpcio>=1.23.0
numpy>=1.17.2
tensorflow==2.3.0
h5py>=2.10.0

7
test_constraints_max_tf1_version.txt


# pip constraints to use the *highest* versions allowed in ml-agents/setup.py
# with the exception of tensorflow, which is constrained to <2
# For projects with upper bounds, we should periodically update this list to the latest release version
grpcio>=1.23.0
numpy>=1.17.2
tensorflow>=1.15.2,<2.0.0
h5py>=2.10.0

/ml-agents/mlagents/tf_utils/globals.py → /ml-agents/mlagents/torch_utils/globals.py

正在加载...
取消
保存