浏览代码

Removing TensorFlow Trainers

/develop/rm-rf-new-models
vincentpierre 4 年前
当前提交
b863af57
共有 28 个文件被更改,包括 70 次插入2608 次删除
  1. 21
      ml-agents/mlagents/trainers/cli_utils.py
  2. 7
      ml-agents/mlagents/trainers/learn.py
  3. 46
      ml-agents/mlagents/trainers/ppo/trainer.py
  4. 45
      ml-agents/mlagents/trainers/sac/trainer.py
  5. 10
      ml-agents/mlagents/trainers/settings.py
  6. 2
      ml-agents/mlagents/trainers/stats.py
  7. 2
      ml-agents/mlagents/trainers/subprocess_env_manager.py
  8. 17
      ml-agents/mlagents/trainers/tests/test_rl_trainer.py
  9. 15
      ml-agents/mlagents/trainers/tests/test_trainer_controller.py
  10. 4
      ml-agents/mlagents/trainers/tests/test_training_status.py
  11. 6
      ml-agents/mlagents/trainers/tests/torch/test_ghost.py
  12. 9
      ml-agents/mlagents/trainers/tests/torch/test_ppo.py
  13. 5
      ml-agents/mlagents/trainers/tests/torch/test_sac.py
  14. 5
      ml-agents/mlagents/trainers/tests/torch/test_simple_rl.py
  15. 56
      ml-agents/mlagents/trainers/trainer/rl_trainer.py
  16. 24
      ml-agents/mlagents/trainers/trainer/trainer_factory.py
  17. 12
      ml-agents/mlagents/trainers/trainer_controller.py
  18. 6
      ml-agents/mlagents/trainers/training_status.py
  19. 175
      ml-agents/mlagents/trainers/model_saver/tf_model_saver.py
  20. 166
      ml-agents/mlagents/trainers/optimizer/tf_optimizer.py
  21. 602
      ml-agents/mlagents/trainers/policy/tf_policy.py
  22. 360
      ml-agents/mlagents/trainers/ppo/optimizer_tf.py
  23. 444
      ml-agents/mlagents/trainers/sac/network.py
  24. 639
      ml-agents/mlagents/trainers/sac/optimizer_tf.py
  25. 0
      /ml-agents/mlagents/torch_utils/globals.py

21
ml-agents/mlagents/trainers/cli_utils.py


from mlagents.trainers.exception import TrainerConfigError
from mlagents_envs.environment import UnityEnvironment
import argparse
from mlagents_envs import logging_util
logger = logging_util.get_logger(__name__)
class RaiseDeprecationWarning(argparse.Action):
"""
Internal custom Action to raise warning when argument is called.
"""
def __init__(self, nargs=0, **kwargs):
super().__init__(nargs=nargs, **kwargs)
def __call__(self, arg_parser, namespace, values, option_string=None):
logger.warning(f"The command line argument {option_string} was deprecated")
class DetectDefault(argparse.Action):

argparser.add_argument(
"--torch",
default=False,
action=DetectDefaultStoreTrue,
help="Use the PyTorch framework. Note that this option is not required anymore as PyTorch is the"
action=RaiseDeprecationWarning,
help="(Deprecated) Use the PyTorch framework. Note that this option is not required anymore as PyTorch is the"
action=DetectDefaultStoreTrue,
action=RaiseDeprecationWarning,
help="(Deprecated) Use the TensorFlow framework instead of PyTorch. Install TensorFlow "
"before using this option.",
)

7
ml-agents/mlagents/trainers/learn.py


import mlagents.trainers
import mlagents_envs
from mlagents import tf_utils
from mlagents.trainers.trainer_controller import TrainerController
from mlagents.trainers.environment_parameter_manager import EnvironmentParameterManager
from mlagents.trainers.trainer import TrainerFactory

GaugeWriter,
ConsoleWriter,
)
from mlagents.trainers.cli_utils import parser, DetectDefault
from mlagents.trainers.cli_utils import parser
from mlagents_envs.environment import UnityEnvironment
from mlagents.trainers.settings import RunOptions

param_manager=env_parameter_manager,
init_path=maybe_init_path,
multi_gpu=False,
force_torch="torch" in DetectDefault.non_default_args,
force_tensorflow="tensorflow" in DetectDefault.non_default_args,
)
# Create controller and begin training.
tc = TrainerController(

log_level = logging_util.DEBUG
else:
log_level = logging_util.INFO
# disable noisy warnings from tensorflow
tf_utils.set_warnings_enabled(False)
logging_util.set_log_level(log_level)

46
ml-agents/mlagents/trainers/ppo/trainer.py


from mlagents.trainers.ppo.optimizer_torch import TorchPPOOptimizer
from mlagents.trainers.trajectory import Trajectory
from mlagents.trainers.behavior_id_utils import BehaviorIdentifiers
from mlagents.trainers.settings import TrainerSettings, PPOSettings, FrameworkType
from mlagents.trainers.settings import TrainerSettings, PPOSettings
from mlagents import tf_utils
if tf_utils.is_available():
from mlagents.trainers.policy.tf_policy import TFPolicy
from mlagents.trainers.ppo.optimizer_tf import PPOOptimizer
else:
TFPolicy = None # type: ignore
PPOOptimizer = None # type: ignore
logger = get_logger(__name__)

self._clear_update_buffer()
return True
def create_tf_policy(
self,
parsed_behavior_id: BehaviorIdentifiers,
behavior_spec: BehaviorSpec,
create_graph: bool = False,
) -> TFPolicy:
"""
Creates a policy with a Tensorflow backend and PPO hyperparameters
:param parsed_behavior_id:
:param behavior_spec: specifications for policy construction
:param create_graph: whether to create the Tensorflow graph on construction
:return policy
"""
policy = TFPolicy(
self.seed,
behavior_spec,
self.trainer_settings,
condition_sigma_on_obs=False, # Faster training for PPO
create_tf_graph=create_graph,
)
return policy
def create_torch_policy(
self, parsed_behavior_id: BehaviorIdentifiers, behavior_spec: BehaviorSpec
) -> TorchPolicy:

)
return policy
def create_ppo_optimizer(self) -> PPOOptimizer:
if self.framework == FrameworkType.PYTORCH:
return TorchPPOOptimizer( # type: ignore
cast(TorchPolicy, self.policy), self.trainer_settings # type: ignore
) # type: ignore
else:
return PPOOptimizer( # type: ignore
cast(TFPolicy, self.policy), self.trainer_settings # type: ignore
) # type: ignore
def create_ppo_optimizer(self) -> TorchPPOOptimizer:
return TorchPPOOptimizer( # type: ignore
cast(TorchPolicy, self.policy), self.trainer_settings # type: ignore
) # type: ignore
def add_policy(
self, parsed_behavior_id: BehaviorIdentifiers, policy: Policy

45
ml-agents/mlagents/trainers/sac/trainer.py


from mlagents.trainers.sac.optimizer_torch import TorchSACOptimizer
from mlagents.trainers.trajectory import Trajectory, SplitObservations
from mlagents.trainers.behavior_id_utils import BehaviorIdentifiers
from mlagents.trainers.settings import TrainerSettings, SACSettings, FrameworkType
from mlagents.trainers.settings import TrainerSettings, SACSettings
from mlagents import tf_utils
if tf_utils.is_available():
from mlagents.trainers.policy.tf_policy import TFPolicy
from mlagents.trainers.sac.optimizer_tf import SACOptimizer
else:
TFPolicy = None # type: ignore
SACOptimizer = None # type: ignore
logger = get_logger(__name__)

)
)
def create_tf_policy(
self,
parsed_behavior_id: BehaviorIdentifiers,
behavior_spec: BehaviorSpec,
create_graph: bool = False,
) -> TFPolicy:
"""
Creates a policy with a Tensorflow backend and SAC hyperparameters
:param parsed_behavior_id:
:param behavior_spec: specifications for policy construction
:param create_graph: whether to create the Tensorflow graph on construction
:return policy
"""
policy = TFPolicy(
self.seed,
behavior_spec,
self.trainer_settings,
tanh_squash=True,
reparameterize=True,
create_tf_graph=create_graph,
)
self.maybe_load_replay_buffer()
return policy
def create_torch_policy(
self, parsed_behavior_id: BehaviorIdentifiers, behavior_spec: BehaviorSpec
) -> TorchPolicy:

self._stats_reporter.add_stat(stat, np.mean(stat_list))
def create_sac_optimizer(self) -> TorchSACOptimizer:
if self.framework == FrameworkType.PYTORCH:
return TorchSACOptimizer( # type: ignore
cast(TorchPolicy, self.policy), self.trainer_settings # type: ignore
) # type: ignore
else:
return SACOptimizer( # type: ignore
cast(TFPolicy, self.policy), self.trainer_settings # type: ignore
) # type: ignore
return TorchSACOptimizer( # type: ignore
cast(TorchPolicy, self.policy), self.trainer_settings # type: ignore
) # type: ignore
def add_policy(
self, parsed_behavior_id: BehaviorIdentifiers, policy: Policy

10
ml-agents/mlagents/trainers/settings.py


return _mapping[self]
class FrameworkType(Enum):
TENSORFLOW: str = "tensorflow"
PYTORCH: str = "pytorch"
@attr.s(auto_attribs=True)
class TrainerSettings(ExportableSettings):
default_override: ClassVar[Optional["TrainerSettings"]] = None

threaded: bool = True
self_play: Optional[SelfPlaySettings] = None
behavioral_cloning: Optional[BehavioralCloningSettings] = None
framework: FrameworkType = FrameworkType.PYTORCH
cattr.register_structure_hook(
Dict[RewardSignalType, RewardSignalSettings], RewardSignalSettings.structure

d_copy.update(cattr.unstructure(TrainerSettings.default_override))
deep_update_dict(d_copy, d)
if "framework" in d_copy:
logger.warning("Framework option was deprecated but was specified")
d_copy.pop("framework", None)
for key, val in d_copy.items():
if attr.has(type(val)):

2
ml-agents/mlagents/trainers/stats.py


from mlagents_envs.logging_util import get_logger
from mlagents_envs.timers import set_gauge
from torch.utils.tensorboard import SummaryWriter
from mlagents.tf_utils.globals import get_rank
from mlagents.torch_utils.globals import get_rank
logger = get_logger(__name__)

2
ml-agents/mlagents/trainers/subprocess_env_manager.py


req: EnvironmentRequest = parent_conn.recv()
if req.cmd == EnvironmentCommand.STEP:
all_action_info = req.payload
# print("\n")
# print(brain_name, action_info.action)
if len(action_info.action) != 0:
env.set_actions(brain_name, action_info.action)
env.step()

17
ml-agents/mlagents/trainers/tests/test_rl_trainer.py


from mlagents.trainers.trainer.rl_trainer import RLTrainer
from mlagents.trainers.tests.test_buffer import construct_fake_buffer
from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.settings import TrainerSettings, FrameworkType
from mlagents.trainers.settings import TrainerSettings
from mlagents_envs.base_env import ActionSpec

super()._process_trajectory(trajectory)
def create_rl_trainer(framework=FrameworkType.TENSORFLOW):
def create_rl_trainer():
TrainerSettings(
max_steps=100, checkpoint_interval=10, summary_freq=20, framework=framework
),
TrainerSettings(max_steps=100, checkpoint_interval=10, summary_freq=20),
True,
False,
"mock_model_path",

assert mocked_save_model.call_count == 0
@pytest.mark.parametrize(
"framework", [FrameworkType.TENSORFLOW, FrameworkType.PYTORCH], ids=["tf", "torch"]
)
def test_summary_checkpoint(mock_add_checkpoint, mock_write_summary, framework):
trainer = create_rl_trainer(framework)
def test_summary_checkpoint(mock_add_checkpoint, mock_write_summary):
trainer = create_rl_trainer()
mock_policy = mock.Mock()
trainer.add_policy("TestBrain", mock_policy)
trajectory_queue = AgentManagerQueue("testbrain")

calls = [mock.call(trainer.brain_name, step) for step in checkpoint_range]
trainer.model_saver.save_checkpoint.assert_has_calls(calls, any_order=True)
export_ext = "nn" if trainer.framework == FrameworkType.TENSORFLOW else "onnx"
export_ext = "onnx"
add_checkpoint_calls = [
mock.call(

15
ml-agents/mlagents/trainers/tests/test_trainer_controller.py


from unittest.mock import MagicMock, patch
import pytest
from mlagents.torch_utils import torch
from mlagents.tf_utils import tf
from mlagents.trainers.trainer_controller import TrainerController
from mlagents.trainers.environment_parameter_manager import EnvironmentParameterManager
from mlagents.trainers.ghost.controller import GhostController

@patch("numpy.random.seed")
@patch.object(tf, "set_random_seed")
@patch.object(torch, "manual_seed")
def test_initialization_seed(numpy_random_seed, tensorflow_set_seed):
seed = 27
trainer_factory_mock = MagicMock()

return tc, trainer_mock
@patch.object(tf, "reset_default_graph")
tf_reset_graph, trainer_controller_with_start_learning_mocks
trainer_controller_with_start_learning_mocks
tf_reset_graph.return_value = None
env_mock = MagicMock()
env_mock.close = MagicMock()
env_mock.reset = MagicMock()

tf_reset_graph.assert_called_once()
@patch.object(tf, "reset_default_graph")
tf_reset_graph, trainer_controller_with_start_learning_mocks
trainer_controller_with_start_learning_mocks
tf_reset_graph.return_value = None
brain_info_mock = MagicMock()
env_mock = MagicMock()

tc.start_learning(env_mock)
tf_reset_graph.assert_called_once()
env_mock.reset.assert_called_once()
assert tc.advance.call_count == trainer_mock.get_max_steps + 1
tc._save_models.assert_called_once()

4
ml-agents/mlagents/trainers/tests/test_training_status.py


version_statsmetadata = StatusMetaData(mlagents_version="test")
default_metadata.check_compatibility(version_statsmetadata)
tf_version_statsmetadata = StatusMetaData(tensorflow_version="test")
default_metadata.check_compatibility(tf_version_statsmetadata)
torch_version_statsmetadata = StatusMetaData(torch_version="test")
default_metadata.check_compatibility(torch_version_statsmetadata)
# Assert that 2 warnings have been thrown
assert len(cm.output) == 2

6
ml-agents/mlagents/trainers/tests/torch/test_ghost.py


from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.tests import mock_brain as mb
from mlagents.trainers.tests.test_trajectory import make_fake_trajectory
from mlagents.trainers.settings import TrainerSettings, SelfPlaySettings, FrameworkType
from mlagents.trainers.settings import TrainerSettings, SelfPlaySettings
return TrainerSettings(
self_play=SelfPlaySettings(), framework=FrameworkType.PYTORCH
)
return TrainerSettings(self_play=SelfPlaySettings())
VECTOR_ACTION_SPACE = 1

9
ml-agents/mlagents/trainers/tests/torch/test_ppo.py


import pytest
import numpy as np
from mlagents.tf_utils import tf
import attr
from mlagents.trainers.ppo.optimizer_torch import TorchPPOOptimizer

from mlagents.trainers.settings import NetworkSettings, FrameworkType
from mlagents.trainers.settings import NetworkSettings
from mlagents.trainers.tests.dummy_config import ( # noqa: F401; pylint: disable=unused-variable
ppo_dummy_config,
curiosity_dummy_config,

@pytest.fixture
def dummy_config():
return attr.evolve(ppo_dummy_config(), framework=FrameworkType.PYTORCH)
return ppo_dummy_config()
VECTOR_ACTION_SPACE = 2

@pytest.mark.parametrize("rnn", [True, False], ids=["rnn", "no_rnn"])
def test_ppo_optimizer_update(dummy_config, rnn, visual, discrete):
# Test evaluate
tf.reset_default_graph()
optimizer = create_test_ppo_optimizer(
dummy_config, use_rnn=rnn, use_discrete=discrete, use_visual=visual
)

dummy_config, curiosity_dummy_config, rnn, visual, discrete # noqa: F811
):
# Test evaluate
tf.reset_default_graph()
dummy_config.reward_signals = curiosity_dummy_config
optimizer = create_test_ppo_optimizer(
dummy_config, use_rnn=rnn, use_discrete=discrete, use_visual=visual

def test_ppo_optimizer_update_gail(gail_dummy_config, dummy_config): # noqa: F811
# Test evaluate
dummy_config.reward_signals = gail_dummy_config
config = attr.evolve(ppo_dummy_config(), framework=FrameworkType.PYTORCH)
config = ppo_dummy_config()
optimizer = create_test_ppo_optimizer(
config, use_rnn=False, use_discrete=False, use_visual=False
)

5
ml-agents/mlagents/trainers/tests/torch/test_sac.py


import pytest
from mlagents.torch_utils import torch
import attr
from mlagents.trainers.settings import NetworkSettings, FrameworkType
from mlagents.trainers.settings import NetworkSettings
from mlagents.trainers.tests.dummy_config import ( # noqa: F401; pylint: disable=unused-variable
sac_dummy_config,
curiosity_dummy_config,

@pytest.fixture
def dummy_config():
return attr.evolve(sac_dummy_config(), framework=FrameworkType.PYTORCH)
return sac_dummy_config()
VECTOR_ACTION_SPACE = 2

5
ml-agents/mlagents/trainers/tests/torch/test_simple_rl.py


GAILSettings,
RewardSignalType,
EncoderType,
FrameworkType,
)
from mlagents_envs.communicator_objects.demonstration_meta_pb2 import (

BRAIN_NAME = "1D"
PPO_TORCH_CONFIG = attr.evolve(ppo_dummy_config(), framework=FrameworkType.PYTORCH)
SAC_TORCH_CONFIG = attr.evolve(sac_dummy_config(), framework=FrameworkType.PYTORCH)
PPO_TORCH_CONFIG = ppo_dummy_config()
SAC_TORCH_CONFIG = sac_dummy_config()
@pytest.mark.parametrize("use_discrete", [True, False])

56
ml-agents/mlagents/trainers/trainer/rl_trainer.py


from mlagents.trainers.behavior_id_utils import BehaviorIdentifiers
from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.trajectory import Trajectory
from mlagents.trainers.settings import TrainerSettings, FrameworkType
from mlagents.trainers.settings import TrainerSettings
from mlagents.trainers.exception import UnityTrainerException
from mlagents import tf_utils
if tf_utils.is_available():
from mlagents.trainers.policy.tf_policy import TFPolicy
from mlagents.trainers.model_saver.tf_model_saver import TFModelSaver
else:
TFPolicy = None # type: ignore
TFModelSaver = None # type: ignore
logger = get_logger(__name__)

self._stats_reporter.add_property(
StatsPropertyType.HYPERPARAMETERS, self.trainer_settings.as_dict()
)
self.framework = self.trainer_settings.framework
if self.framework == FrameworkType.TENSORFLOW and not tf_utils.is_available():
raise UnityTrainerException(
"To use the TensorFlow backend, install the TensorFlow Python package first."
)
logger.debug(f"Using framework {self.framework.value}")
self.framework, self.trainer_settings, self.artifact_path, self.load
self.trainer_settings, self.artifact_path, self.load
)
def end_episode(self) -> None:

behavior_spec: BehaviorSpec,
create_graph: bool = False,
) -> Policy:
if self.framework == FrameworkType.PYTORCH:
return self.create_torch_policy(parsed_behavior_id, behavior_spec)
else:
return self.create_tf_policy(
parsed_behavior_id, behavior_spec, create_graph=create_graph
)
return self.create_torch_policy(parsed_behavior_id, behavior_spec)
@abc.abstractmethod
def create_torch_policy(

"""
pass
@abc.abstractmethod
def create_tf_policy(
self,
parsed_behavior_id: BehaviorIdentifiers,
behavior_spec: BehaviorSpec,
create_graph: bool = False,
) -> TFPolicy:
"""
Create a Policy object that uses the TensorFlow backend.
"""
pass
framework: str, trainer_settings: TrainerSettings, model_path: str, load: bool
trainer_settings: TrainerSettings, model_path: str, load: bool
if framework == FrameworkType.PYTORCH:
model_saver = TorchModelSaver( # type: ignore
trainer_settings, model_path, load
)
else:
model_saver = TFModelSaver( # type: ignore
trainer_settings, model_path, load
)
model_saver = TorchModelSaver( # type: ignore
trainer_settings, model_path, load
)
return model_saver
def _policy_mean_reward(self) -> Optional[float]:

"Trainer has multiple policies, but default behavior only saves the first."
)
checkpoint_path = self.model_saver.save_checkpoint(self.brain_name, self.step)
export_ext = "nn" if self.framework == FrameworkType.TENSORFLOW else "onnx"
export_ext = "onnx"
new_checkpoint = ModelCheckpoint(
int(self.step),
f"{checkpoint_path}.{export_ext}",

model_checkpoint = self._checkpoint()
self.model_saver.copy_final_model(model_checkpoint.file_path)
export_ext = "nn" if self.framework == FrameworkType.TENSORFLOW else "onnx"
export_ext = "onnx"
final_checkpoint = attr.evolve(
model_checkpoint, file_path=f"{self.model_saver.model_path}.{export_ext}"
)

24
ml-agents/mlagents/trainers/trainer/trainer_factory.py


from mlagents.trainers.sac.trainer import SACTrainer
from mlagents.trainers.ghost.trainer import GhostTrainer
from mlagents.trainers.ghost.controller import GhostController
from mlagents.trainers.settings import TrainerSettings, TrainerType, FrameworkType
from mlagents.trainers.settings import TrainerSettings, TrainerType
logger = get_logger(__name__)

param_manager: EnvironmentParameterManager,
init_path: str = None,
multi_gpu: bool = False,
force_torch: bool = False,
force_tensorflow: bool = False,
):
"""
The TrainerFactory generates the Trainers based on the configuration passed as

the EnvironmentParameters must change.
:param init_path: Path from which to load model.
:param multi_gpu: If True, multi-gpu will be used. (currently not available)
:param force_torch: If True, the Trainers will all use the PyTorch framework
instead of what is specified in the config YAML.
:param force_tensorflow: If True, thee Trainers will all use the TensorFlow
framework.
"""
self.trainer_config = trainer_config
self.output_path = output_path

self.param_manager = param_manager
self.multi_gpu = multi_gpu
self.ghost_controller = GhostController()
self._force_torch = force_torch
self._force_tf = force_tensorflow
def generate(self, behavior_name: str) -> Trainer:
if behavior_name not in self.trainer_config.keys():

)
trainer_settings = self.trainer_config[behavior_name]
if self._force_torch:
trainer_settings.framework = FrameworkType.PYTORCH
logger.warning(
"Note that specifying --torch is not required anymore as PyTorch is the default framework."
)
if self._force_tf:
trainer_settings.framework = FrameworkType.TENSORFLOW
logger.warning(
"Setting the framework to TensorFlow. TensorFlow trainers will be deprecated in the future."
)
if self._force_torch:
logger.warning(
"Both --torch and --tensorflow CLI options were specified. Using TensorFlow."
)
return TrainerFactory._initialize_trainer(
trainer_settings,
behavior_name,

12
ml-agents/mlagents/trainers/trainer_controller.py


from collections import defaultdict
import numpy as np
from mlagents.tf_utils import tf
from mlagents import tf_utils
from mlagents_envs.logging_util import get_logger
from mlagents.trainers.env_manager import EnvManager, EnvironmentStep

from mlagents.trainers.trainer import TrainerFactory
from mlagents.trainers.behavior_id_utils import BehaviorIdentifiers
from mlagents.trainers.agent_processor import AgentManager
from mlagents.tf_utils.globals import get_rank
from mlagents.torch_utils.globals import get_rank
class TrainerController:

self.trainer_threads: List[threading.Thread] = []
self.kill_trainers = False
np.random.seed(training_seed)
if tf_utils.is_available():
tf.set_random_seed(training_seed)
torch_utils.torch.manual_seed(training_seed)
self.rank = get_rank()

self.trainer_threads.append(trainerthread)
policy = trainer.create_policy(
parsed_behavior_id, env_manager.training_behaviors[name_behavior_id]
parsed_behavior_id,
env_manager.training_behaviors[name_behavior_id],
create_graph=True,
)
trainer.add_policy(parsed_behavior_id, policy)

@timed
def start_learning(self, env_manager: EnvManager) -> None:
self._create_output_path(self.output_path)
if tf_utils.is_available():
tf.reset_default_graph()
try:
# Initial reset
self._reset_env(env_manager)

6
ml-agents/mlagents/trainers/training_status.py


import cattr
from mlagents.torch_utils import torch
from mlagents.tf_utils import tf, is_available as tf_is_available
from mlagents_envs.logging_util import get_logger
from mlagents.trainers import __version__
from mlagents.trainers.exception import TrainerError

stats_format_version: str = STATUS_FORMAT_VERSION
mlagents_version: str = __version__
torch_version: str = torch.__version__
tensorflow_version: str = tf.__version__ if tf_is_available() else -1
def to_dict(self) -> Dict[str, str]:
return cattr.unstructure(self)

if self.mlagents_version != other.mlagents_version:
logger.warning(
"Checkpoint was loaded from a different version of ML-Agents. Some things may not resume properly."
)
if self.tensorflow_version != other.tensorflow_version:
logger.warning(
"Tensorflow checkpoint was saved with a different version of Tensorflow. Model may not resume properly."
)
if self.torch_version != other.torch_version:
logger.warning(

175
ml-agents/mlagents/trainers/model_saver/tf_model_saver.py


import os
import shutil
from typing import Optional, Union, cast
from mlagents_envs.exception import UnityPolicyException
from mlagents_envs.logging_util import get_logger
from mlagents.tf_utils import tf
from mlagents.trainers.model_saver.model_saver import BaseModelSaver
from mlagents.trainers.tf.model_serialization import export_policy_model
from mlagents.trainers.settings import TrainerSettings, SerializationSettings
from mlagents.trainers.policy.tf_policy import TFPolicy
from mlagents.trainers.optimizer.tf_optimizer import TFOptimizer
from mlagents.trainers import __version__
logger = get_logger(__name__)
class TFModelSaver(BaseModelSaver):
"""
ModelSaver class for TensorFlow
"""
def __init__(
self, trainer_settings: TrainerSettings, model_path: str, load: bool = False
):
super().__init__()
self.model_path = model_path
self.initialize_path = trainer_settings.init_path
self._keep_checkpoints = trainer_settings.keep_checkpoints
self.load = load
# Currently only support saving one policy. This is the one to be saved.
self.policy: Optional[TFPolicy] = None
self.graph = None
self.sess = None
self.tf_saver = None
def register(self, module: Union[TFPolicy, TFOptimizer]) -> None:
if isinstance(module, TFPolicy):
self._register_policy(module)
elif isinstance(module, TFOptimizer):
self._register_optimizer(module)
else:
raise UnityPolicyException(
"Registering Object of unsupported type {} to Saver ".format(
type(module)
)
)
def _register_policy(self, policy: TFPolicy) -> None:
if self.policy is None:
self.policy = policy
self.graph = self.policy.graph
self.sess = self.policy.sess
with self.policy.graph.as_default():
self.tf_saver = tf.train.Saver(max_to_keep=self._keep_checkpoints)
def save_checkpoint(self, behavior_name: str, step: int) -> str:
checkpoint_path = os.path.join(self.model_path, f"{behavior_name}-{step}")
# Save the TF checkpoint and graph definition
if self.graph:
with self.graph.as_default():
if self.tf_saver:
self.tf_saver.save(self.sess, f"{checkpoint_path}.ckpt")
tf.train.write_graph(
self.graph, self.model_path, "raw_graph_def.pb", as_text=False
)
# also save the policy so we have optimized model files for each checkpoint
self.export(checkpoint_path, behavior_name)
return checkpoint_path
def export(self, output_filepath: str, behavior_name: str) -> None:
# save model if there is only one worker or
# only on worker-0 if there are multiple workers
if self.policy and self.policy.rank is not None and self.policy.rank != 0:
return
if self.graph is None:
logger.info("No model to export")
return
export_policy_model(
self.model_path, output_filepath, behavior_name, self.graph, self.sess
)
def initialize_or_load(self, policy: Optional[TFPolicy] = None) -> None:
# If there is an initialize path, load from that. Else, load from the set model path.
# If load is set to True, don't reset steps to 0. Else, do. This allows a user to,
# e.g., resume from an initialize path.
if policy is None:
policy = self.policy
policy = cast(TFPolicy, policy)
reset_steps = not self.load
if self.initialize_path is not None:
self._load_graph(
policy, self.initialize_path, reset_global_steps=reset_steps
)
elif self.load:
self._load_graph(policy, self.model_path, reset_global_steps=reset_steps)
else:
policy.initialize()
TFPolicy.broadcast_global_variables(0)
def _load_graph(
self, policy: TFPolicy, model_path: str, reset_global_steps: bool = False
) -> None:
# This prevents normalizer init up from executing on load
policy.first_normalization_update = False
with policy.graph.as_default():
logger.info(f"Loading model from {model_path}.")
ckpt = tf.train.get_checkpoint_state(model_path)
if ckpt is None:
raise UnityPolicyException(
"The model {} could not be loaded. Make "
"sure you specified the right "
"--run-id and that the previous run you are loading from had the same "
"behavior names.".format(model_path)
)
if self.tf_saver:
try:
self.tf_saver.restore(policy.sess, ckpt.model_checkpoint_path)
except tf.errors.NotFoundError:
raise UnityPolicyException(
"The model {} was found but could not be loaded. Make "
"sure the model is from the same version of ML-Agents, has the same behavior parameters, "
"and is using the same trainer configuration as the current run.".format(
model_path
)
)
self._check_model_version(__version__)
if reset_global_steps:
policy.set_step(0)
logger.info(
"Starting training from step 0 and saving to {}.".format(
self.model_path
)
)
else:
logger.info(f"Resuming training from step {policy.get_current_step()}.")
def _check_model_version(self, version: str) -> None:
"""
Checks whether the model being loaded was created with the same version of
ML-Agents, and throw a warning if not so.
"""
if self.policy is not None and self.policy.version_tensors is not None:
loaded_ver = tuple(
num.eval(session=self.sess) for num in self.policy.version_tensors
)
if loaded_ver != TFPolicy._convert_version_string(version):
logger.warning(
f"The model checkpoint you are loading from was saved with ML-Agents version "
f"{loaded_ver[0]}.{loaded_ver[1]}.{loaded_ver[2]} but your current ML-Agents"
f"version is {version}. Model may not behave properly."
)
def copy_final_model(self, source_nn_path: str) -> None:
"""
Copy the .nn file at the given source to the destination.
Also copies the corresponding .onnx file if it exists.
"""
final_model_name = os.path.splitext(source_nn_path)[0]
if SerializationSettings.convert_to_barracuda:
source_path = f"{final_model_name}.nn"
destination_path = f"{self.model_path}.nn"
shutil.copyfile(source_path, destination_path)
logger.info(f"Copied {source_path} to {destination_path}.")
if SerializationSettings.convert_to_onnx:
try:
source_path = f"{final_model_name}.onnx"
destination_path = f"{self.model_path}.onnx"
shutil.copyfile(source_path, destination_path)
logger.info(f"Copied {source_path} to {destination_path}.")
except OSError:
pass

166
ml-agents/mlagents/trainers/optimizer/tf_optimizer.py


from typing import Dict, Any, List, Tuple, Optional
import numpy as np
from mlagents.tf_utils.tf import tf
from mlagents.trainers.buffer import AgentBuffer
from mlagents.trainers.policy.tf_policy import TFPolicy
from mlagents.trainers.optimizer import Optimizer
from mlagents.trainers.trajectory import SplitObservations
from mlagents.trainers.tf.components.reward_signals.reward_signal_factory import (
create_reward_signal,
)
from mlagents.trainers.settings import TrainerSettings, RewardSignalType
from mlagents.trainers.tf.components.bc.module import BCModule
class TFOptimizer(Optimizer): # pylint: disable=W0223
def __init__(self, policy: TFPolicy, trainer_params: TrainerSettings):
super().__init__()
self.sess = policy.sess
self.policy = policy
self.update_dict: Dict[str, tf.Tensor] = {}
self.value_heads: Dict[str, tf.Tensor] = {}
self.create_reward_signals(trainer_params.reward_signals)
self.memory_in: tf.Tensor = None
self.memory_out: tf.Tensor = None
self.m_size: int = 0
self.bc_module: Optional[BCModule] = None
# Create pretrainer if needed
if trainer_params.behavioral_cloning is not None:
self.bc_module = BCModule(
self.policy,
trainer_params.behavioral_cloning,
policy_learning_rate=trainer_params.hyperparameters.learning_rate,
default_batch_size=trainer_params.hyperparameters.batch_size,
default_num_epoch=3,
)
def get_trajectory_value_estimates(
self, batch: AgentBuffer, next_obs: List[np.ndarray], done: bool
) -> Tuple[Dict[str, np.ndarray], Dict[str, float]]:
feed_dict: Dict[tf.Tensor, Any] = {
self.policy.batch_size_ph: batch.num_experiences,
self.policy.sequence_length_ph: batch.num_experiences, # We want to feed data in batch-wise, not time-wise.
}
if self.policy.vec_obs_size > 0:
feed_dict[self.policy.vector_in] = batch["vector_obs"]
if self.policy.vis_obs_size > 0:
for i in range(len(self.policy.visual_in)):
_obs = batch["visual_obs%d" % i]
feed_dict[self.policy.visual_in[i]] = _obs
if self.policy.use_recurrent:
feed_dict[self.policy.memory_in] = [
np.zeros((self.policy.m_size), dtype=np.float32)
]
feed_dict[self.memory_in] = [np.zeros((self.m_size), dtype=np.float32)]
if self.policy.prev_action is not None:
feed_dict[self.policy.prev_action] = batch["prev_action"]
if self.policy.use_recurrent:
value_estimates, policy_mem, value_mem = self.sess.run(
[self.value_heads, self.policy.memory_out, self.memory_out], feed_dict
)
prev_action = (
batch["actions"][-1] if not self.policy.use_continuous_act else None
)
else:
value_estimates = self.sess.run(self.value_heads, feed_dict)
prev_action = None
policy_mem = None
value_mem = None
value_estimates = {k: np.squeeze(v, axis=1) for k, v in value_estimates.items()}
# We do this in a separate step to feed the memory outs - a further optimization would
# be to append to the obs before running sess.run.
final_value_estimates = self._get_value_estimates(
next_obs, done, policy_mem, value_mem, prev_action
)
return value_estimates, final_value_estimates
def _get_value_estimates(
self,
next_obs: List[np.ndarray],
done: bool,
policy_memory: np.ndarray = None,
value_memory: np.ndarray = None,
prev_action: np.ndarray = None,
) -> Dict[str, float]:
"""
Generates value estimates for bootstrapping.
:param experience: AgentExperience to be used for bootstrapping.
:param done: Whether or not this is the last element of the episode, in which case the value estimate will be 0.
:return: The value estimate dictionary with key being the name of the reward signal and the value the
corresponding value estimate.
"""
feed_dict: Dict[tf.Tensor, Any] = {
self.policy.batch_size_ph: 1,
self.policy.sequence_length_ph: 1,
}
vec_vis_obs = SplitObservations.from_observations(next_obs)
for i in range(len(vec_vis_obs.visual_observations)):
feed_dict[self.policy.visual_in[i]] = [vec_vis_obs.visual_observations[i]]
if self.policy.vec_obs_size > 0:
feed_dict[self.policy.vector_in] = [vec_vis_obs.vector_observations]
if policy_memory is not None:
feed_dict[self.policy.memory_in] = policy_memory
if value_memory is not None:
feed_dict[self.memory_in] = value_memory
if prev_action is not None:
feed_dict[self.policy.prev_action] = [prev_action]
value_estimates = self.sess.run(self.value_heads, feed_dict)
value_estimates = {k: float(v) for k, v in value_estimates.items()}
# If we're done, reassign all of the value estimates that need terminal states.
if done:
for k in value_estimates:
if self.reward_signals[k].use_terminal_states:
value_estimates[k] = 0.0
return value_estimates
def create_reward_signals(
self, reward_signal_configs: Dict[RewardSignalType, Any]
) -> None:
"""
Create reward signals
:param reward_signal_configs: Reward signal config.
"""
# Create reward signals
for reward_signal, settings in reward_signal_configs.items():
# Name reward signals by string in case we have duplicates later
self.reward_signals[reward_signal.value] = create_reward_signal(
self.policy, reward_signal, settings
)
self.update_dict.update(
self.reward_signals[reward_signal.value].update_dict
)
@classmethod
def create_optimizer_op(
cls, learning_rate: tf.Tensor, name: str = "Adam"
) -> tf.train.Optimizer:
return tf.train.AdamOptimizer(learning_rate=learning_rate, name=name)
def _execute_model(
self, feed_dict: Dict[tf.Tensor, np.ndarray], out_dict: Dict[str, tf.Tensor]
) -> Dict[str, np.ndarray]:
"""
Executes model.
:param feed_dict: Input dictionary mapping nodes to input data.
:param out_dict: Output dictionary mapping names to nodes.
:return: Dictionary mapping names to input data.
"""
network_out = self.sess.run(list(out_dict.values()), feed_dict=feed_dict)
run_out = dict(zip(list(out_dict.keys()), network_out))
return run_out
def _make_zero_mem(self, m_size: int, length: int) -> List[np.ndarray]:
return [
np.zeros((m_size), dtype=np.float32)
for i in range(0, length, self.policy.sequence_length)
]

602
ml-agents/mlagents/trainers/policy/tf_policy.py


from typing import Any, Dict, List, Optional, Tuple, Callable
import numpy as np
from distutils.version import LooseVersion
from mlagents_envs.timers import timed
from mlagents.tf_utils import tf
from mlagents import tf_utils
from mlagents_envs.exception import UnityException
from mlagents_envs.base_env import BehaviorSpec
from mlagents_envs.logging_util import get_logger
from mlagents.trainers.policy import Policy
from mlagents.trainers.action_info import ActionInfo
from mlagents.trainers.trajectory import SplitObservations
from mlagents.trainers.behavior_id_utils import get_global_agent_id
from mlagents_envs.base_env import DecisionSteps
from mlagents.trainers.tf.models import ModelUtils
from mlagents.trainers.settings import TrainerSettings, EncoderType
from mlagents.trainers import __version__
from mlagents.trainers.tf.distributions import (
GaussianDistribution,
MultiCategoricalDistribution,
)
from mlagents.tf_utils.globals import get_rank
logger = get_logger(__name__)
# This is the version number of the inputs and outputs of the model, and
# determines compatibility with inference in Barracuda.
MODEL_FORMAT_VERSION = 2
EPSILON = 1e-6 # Small value to avoid divide by zero
class UnityPolicyException(UnityException):
"""
Related to errors with the Trainer.
"""
pass
class TFPolicy(Policy):
"""
Contains a learning model, and the necessary
functions to save/load models and create the input placeholders.
"""
# Callback function used at the start of training to synchronize weights.
# By default, this nothing.
# If this needs to be used, it should be done from outside ml-agents.
broadcast_global_variables: Callable[[int], None] = lambda root_rank: None
def __init__(
self,
seed: int,
behavior_spec: BehaviorSpec,
trainer_settings: TrainerSettings,
tanh_squash: bool = False,
reparameterize: bool = False,
condition_sigma_on_obs: bool = True,
create_tf_graph: bool = True,
):
"""
Initialized the policy.
:param seed: Random seed to use for TensorFlow.
:param brain: The corresponding Brain for this policy.
:param trainer_settings: The trainer parameters.
"""
super().__init__(
seed,
behavior_spec,
trainer_settings,
tanh_squash,
reparameterize,
condition_sigma_on_obs,
)
# for ghost trainer save/load snapshots
self.assign_phs: List[tf.Tensor] = []
self.assign_ops: List[tf.Operation] = []
self.update_dict: Dict[str, tf.Tensor] = {}
self.inference_dict: Dict[str, tf.Tensor] = {}
self.first_normalization_update: bool = False
self.graph = tf.Graph()
self.sess = tf.Session(
config=tf_utils.generate_session_config(), graph=self.graph
)
self._initialize_tensorflow_references()
self.grads = None
self.update_batch: Optional[tf.Operation] = None
self.trainable_variables: List[tf.Variable] = []
self.rank = get_rank()
if create_tf_graph:
self.create_tf_graph()
def get_trainable_variables(self) -> List[tf.Variable]:
"""
Returns a List of the trainable variables in this policy. if create_tf_graph hasn't been called,
returns empty list.
"""
return self.trainable_variables
def create_tf_graph(self) -> None:
"""
Builds the tensorflow graph needed for this policy.
"""
with self.graph.as_default():
tf.set_random_seed(self.seed)
_vars = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)
if len(_vars) > 0:
# We assume the first thing created in the graph is the Policy. If
# already populated, don't create more tensors.
return
self.create_input_placeholders()
encoded = self._create_encoder(
self.visual_in,
self.processed_vector_in,
self.h_size,
self.num_layers,
self.vis_encode_type,
)
if self.use_continuous_act:
self._create_cc_actor(
encoded,
self.tanh_squash,
self.reparameterize,
self.condition_sigma_on_obs,
)
else:
self._create_dc_actor(encoded)
self.trainable_variables = tf.get_collection(
tf.GraphKeys.TRAINABLE_VARIABLES, scope="policy"
)
self.trainable_variables += tf.get_collection(
tf.GraphKeys.TRAINABLE_VARIABLES, scope="lstm"
) # LSTMs need to be root scope for Barracuda export
self.inference_dict = {
"action": self.output,
"log_probs": self.all_log_probs,
"entropy": self.entropy,
}
if self.use_continuous_act:
self.inference_dict["pre_action"] = self.output_pre
if self.use_recurrent:
self.inference_dict["memory_out"] = self.memory_out
# We do an initialize to make the Policy usable out of the box. If an optimizer is needed,
# it will re-load the full graph
self.initialize()
# Create assignment ops for Ghost Trainer
self.init_load_weights()
def _create_encoder(
self,
visual_in: List[tf.Tensor],
vector_in: tf.Tensor,
h_size: int,
num_layers: int,
vis_encode_type: EncoderType,
) -> tf.Tensor:
"""
Creates an encoder for visual and vector observations.
:param h_size: Size of hidden linear layers.
:param num_layers: Number of hidden linear layers.
:param vis_encode_type: Type of visual encoder to use if visual input.
:return: The hidden layer (tf.Tensor) after the encoder.
"""
with tf.variable_scope("policy"):
encoded = ModelUtils.create_observation_streams(
self.visual_in,
self.processed_vector_in,
1,
h_size,
num_layers,
vis_encode_type,
)[0]
return encoded
@staticmethod
def _convert_version_string(version_string: str) -> Tuple[int, ...]:
"""
Converts the version string into a Tuple of ints (major_ver, minor_ver, patch_ver).
:param version_string: The semantic-versioned version string (X.Y.Z).
:return: A Tuple containing (major_ver, minor_ver, patch_ver).
"""
ver = LooseVersion(version_string)
return tuple(map(int, ver.version[0:3]))
def initialize(self):
with self.graph.as_default():
init = tf.global_variables_initializer()
self.sess.run(init)
def get_weights(self):
with self.graph.as_default():
_vars = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)
values = [v.eval(session=self.sess) for v in _vars]
return values
def init_load_weights(self):
with self.graph.as_default():
_vars = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)
values = [v.eval(session=self.sess) for v in _vars]
for var, value in zip(_vars, values):
assign_ph = tf.placeholder(var.dtype, shape=value.shape)
self.assign_phs.append(assign_ph)
self.assign_ops.append(tf.assign(var, assign_ph))
def load_weights(self, values):
if len(self.assign_ops) == 0:
logger.warning(
"Calling load_weights in tf_policy but assign_ops is empty. Did you forget to call init_load_weights?"
)
with self.graph.as_default():
feed_dict = {}
for assign_ph, value in zip(self.assign_phs, values):
feed_dict[assign_ph] = value
self.sess.run(self.assign_ops, feed_dict=feed_dict)
@timed
def evaluate(
self, decision_requests: DecisionSteps, global_agent_ids: List[str]
) -> Dict[str, Any]:
"""
Evaluates policy for the agent experiences provided.
:param decision_requests: DecisionSteps object containing inputs.
:param global_agent_ids: The global (with worker ID) agent ids of the data in the batched_step_result.
:return: Outputs from network as defined by self.inference_dict.
"""
feed_dict = {
self.batch_size_ph: len(decision_requests),
self.sequence_length_ph: 1,
}
if self.use_recurrent:
if not self.use_continuous_act:
feed_dict[self.prev_action] = self.retrieve_previous_action(
global_agent_ids
)
feed_dict[self.memory_in] = self.retrieve_memories(global_agent_ids)
feed_dict = self.fill_eval_dict(feed_dict, decision_requests)
run_out = self._execute_model(feed_dict, self.inference_dict)
return run_out
def get_action(
self, decision_requests: DecisionSteps, worker_id: int = 0
) -> ActionInfo:
"""
Decides actions given observations information, and takes them in environment.
:param decision_requests: A dictionary of brain names and DecisionSteps from environment.
:param worker_id: In parallel environment training, the unique id of the environment worker that
the DecisionSteps came from. Used to construct a globally unique id for each agent.
:return: an ActionInfo containing action, memories, values and an object
to be passed to add experiences
"""
if len(decision_requests) == 0:
return ActionInfo.empty()
global_agent_ids = [
get_global_agent_id(worker_id, int(agent_id))
for agent_id in decision_requests.agent_id
] # For 1-D array, the iterator order is correct.
run_out = self.evaluate( # pylint: disable=assignment-from-no-return
decision_requests, global_agent_ids
)
self.save_memories(global_agent_ids, run_out.get("memory_out"))
self.check_nan_action(run_out.get("action"))
return ActionInfo(
action=run_out.get("action"),
value=run_out.get("value"),
outputs=run_out,
agent_ids=decision_requests.agent_id,
)
def update(self, mini_batch, num_sequences):
"""
Performs update of the policy.
:param num_sequences: Number of experience trajectories in batch.
:param mini_batch: Batch of experiences.
:return: Results of update.
"""
raise UnityPolicyException("The update function was not implemented.")
def _execute_model(self, feed_dict, out_dict):
"""
Executes model.
:param feed_dict: Input dictionary mapping nodes to input data.
:param out_dict: Output dictionary mapping names to nodes.
:return: Dictionary mapping names to input data.
"""
network_out = self.sess.run(list(out_dict.values()), feed_dict=feed_dict)
run_out = dict(zip(list(out_dict.keys()), network_out))
return run_out
def fill_eval_dict(self, feed_dict, batched_step_result):
vec_vis_obs = SplitObservations.from_observations(batched_step_result.obs)
for i, _ in enumerate(vec_vis_obs.visual_observations):
feed_dict[self.visual_in[i]] = vec_vis_obs.visual_observations[i]
if self.use_vec_obs:
feed_dict[self.vector_in] = vec_vis_obs.vector_observations
if not self.use_continuous_act:
mask = np.ones(
(
len(batched_step_result),
sum(self.behavior_spec.action_spec.discrete_branches),
),
dtype=np.float32,
)
if batched_step_result.action_mask is not None:
mask = 1 - np.concatenate(batched_step_result.action_mask, axis=1)
feed_dict[self.action_masks] = mask
return feed_dict
def get_current_step(self):
"""
Gets current model step.
:return: current model step.
"""
step = self.sess.run(self.global_step)
return step
def set_step(self, step: int) -> int:
"""
Sets current model step to step without creating additional ops.
:param step: Step to set the current model step to.
:return: The step the model was set to.
"""
current_step = self.get_current_step()
# Increment a positive or negative number of steps.
return self.increment_step(step - current_step)
def increment_step(self, n_steps):
"""
Increments model step.
"""
out_dict = {
"global_step": self.global_step,
"increment_step": self.increment_step_op,
}
feed_dict = {self.steps_to_increment: n_steps}
return self.sess.run(out_dict, feed_dict=feed_dict)["global_step"]
def get_inference_vars(self):
"""
:return:list of inference var names
"""
return list(self.inference_dict.keys())
def get_update_vars(self):
"""
:return:list of update var names
"""
return list(self.update_dict.keys())
def update_normalization(self, vector_obs: np.ndarray) -> None:
"""
If this policy normalizes vector observations, this will update the norm values in the graph.
:param vector_obs: The vector observations to add to the running estimate of the distribution.
"""
if self.use_vec_obs and self.normalize:
if self.first_normalization_update:
self.sess.run(
self.init_normalization_op, feed_dict={self.vector_in: vector_obs}
)
self.first_normalization_update = False
else:
self.sess.run(
self.update_normalization_op, feed_dict={self.vector_in: vector_obs}
)
@property
def use_vis_obs(self):
return self.vis_obs_size > 0
@property
def use_vec_obs(self):
return self.vec_obs_size > 0
def _initialize_tensorflow_references(self):
self.value_heads: Dict[str, tf.Tensor] = {}
self.normalization_steps: Optional[tf.Variable] = None
self.running_mean: Optional[tf.Variable] = None
self.running_variance: Optional[tf.Variable] = None
self.init_normalization_op: Optional[tf.Operation] = None
self.update_normalization_op: Optional[tf.Operation] = None
self.value: Optional[tf.Tensor] = None
self.all_log_probs: tf.Tensor = None
self.total_log_probs: Optional[tf.Tensor] = None
self.entropy: Optional[tf.Tensor] = None
self.output_pre: Optional[tf.Tensor] = None
self.output: Optional[tf.Tensor] = None
self.selected_actions: tf.Tensor = None
self.action_masks: Optional[tf.Tensor] = None
self.prev_action: Optional[tf.Tensor] = None
self.memory_in: Optional[tf.Tensor] = None
self.memory_out: Optional[tf.Tensor] = None
self.version_tensors: Optional[Tuple[tf.Tensor, tf.Tensor, tf.Tensor]] = None
def create_input_placeholders(self):
with self.graph.as_default():
(
self.global_step,
self.increment_step_op,
self.steps_to_increment,
) = ModelUtils.create_global_steps()
self.vector_in, self.visual_in = ModelUtils.create_input_placeholders(
self.behavior_spec.observation_shapes
)
if self.normalize:
self.first_normalization_update = True
normalization_tensors = ModelUtils.create_normalizer(self.vector_in)
self.update_normalization_op = normalization_tensors.update_op
self.init_normalization_op = normalization_tensors.init_op
self.normalization_steps = normalization_tensors.steps
self.running_mean = normalization_tensors.running_mean
self.running_variance = normalization_tensors.running_variance
self.processed_vector_in = ModelUtils.normalize_vector_obs(
self.vector_in,
self.running_mean,
self.running_variance,
self.normalization_steps,
)
else:
self.processed_vector_in = self.vector_in
self.update_normalization_op = None
self.batch_size_ph = tf.placeholder(
shape=None, dtype=tf.int32, name="batch_size"
)
self.sequence_length_ph = tf.placeholder(
shape=None, dtype=tf.int32, name="sequence_length"
)
self.mask_input = tf.placeholder(
shape=[None], dtype=tf.float32, name="masks"
)
# Only needed for PPO, but needed for BC module
self.epsilon = tf.placeholder(
shape=[None, self.act_size[0]], dtype=tf.float32, name="epsilon"
)
self.mask = tf.cast(self.mask_input, tf.int32)
tf.Variable(
int(self.behavior_spec.action_spec.is_continuous()),
name="is_continuous_control",
trainable=False,
dtype=tf.int32,
)
int_version = TFPolicy._convert_version_string(__version__)
major_ver_t = tf.Variable(
int_version[0],
name="trainer_major_version",
trainable=False,
dtype=tf.int32,
)
minor_ver_t = tf.Variable(
int_version[1],
name="trainer_minor_version",
trainable=False,
dtype=tf.int32,
)
patch_ver_t = tf.Variable(
int_version[2],
name="trainer_patch_version",
trainable=False,
dtype=tf.int32,
)
self.version_tensors = (major_ver_t, minor_ver_t, patch_ver_t)
tf.Variable(
MODEL_FORMAT_VERSION,
name="version_number",
trainable=False,
dtype=tf.int32,
)
tf.Variable(
self.m_size, name="memory_size", trainable=False, dtype=tf.int32
)
if self.behavior_spec.action_spec.is_continuous():
tf.Variable(
self.act_size[0],
name="action_output_shape",
trainable=False,
dtype=tf.int32,
)
else:
tf.Variable(
sum(self.act_size),
name="action_output_shape",
trainable=False,
dtype=tf.int32,
)
def _create_cc_actor(
self,
encoded: tf.Tensor,
tanh_squash: bool = False,
reparameterize: bool = False,
condition_sigma_on_obs: bool = True,
) -> None:
"""
Creates Continuous control actor-critic model.
:param h_size: Size of hidden linear layers.
:param num_layers: Number of hidden linear layers.
:param vis_encode_type: Type of visual encoder to use if visual input.
:param tanh_squash: Whether to use a tanh function, or a clipped output.
:param reparameterize: Whether we are using the resampling trick to update the policy.
"""
if self.use_recurrent:
self.memory_in = tf.placeholder(
shape=[None, self.m_size], dtype=tf.float32, name="recurrent_in"
)
hidden_policy, memory_policy_out = ModelUtils.create_recurrent_encoder(
encoded, self.memory_in, self.sequence_length_ph, name="lstm_policy"
)
self.memory_out = tf.identity(memory_policy_out, name="recurrent_out")
else:
hidden_policy = encoded
with tf.variable_scope("policy"):
distribution = GaussianDistribution(
hidden_policy,
self.act_size,
reparameterize=reparameterize,
tanh_squash=tanh_squash,
condition_sigma=condition_sigma_on_obs,
)
if tanh_squash:
self.output_pre = distribution.sample
self.output = tf.identity(self.output_pre, name="action")
else:
self.output_pre = distribution.sample
# Clip and scale output to ensure actions are always within [-1, 1] range.
output_post = tf.clip_by_value(self.output_pre, -3, 3) / 3
self.output = tf.identity(output_post, name="action")
self.selected_actions = tf.stop_gradient(self.output)
self.all_log_probs = tf.identity(distribution.log_probs, name="action_probs")
self.entropy = distribution.entropy
# We keep these tensors the same name, but use new nodes to keep code parallelism with discrete control.
self.total_log_probs = distribution.total_log_probs
def _create_dc_actor(self, encoded: tf.Tensor) -> None:
"""
Creates Discrete control actor-critic model.
:param h_size: Size of hidden linear layers.
:param num_layers: Number of hidden linear layers.
:param vis_encode_type: Type of visual encoder to use if visual input.
"""
if self.use_recurrent:
self.prev_action = tf.placeholder(
shape=[None, len(self.act_size)], dtype=tf.int32, name="prev_action"
)
prev_action_oh = tf.concat(
[
tf.one_hot(self.prev_action[:, i], self.act_size[i])
for i in range(len(self.act_size))
],
axis=1,
)
hidden_policy = tf.concat([encoded, prev_action_oh], axis=1)
self.memory_in = tf.placeholder(
shape=[None, self.m_size], dtype=tf.float32, name="recurrent_in"
)
hidden_policy, memory_policy_out = ModelUtils.create_recurrent_encoder(
hidden_policy,
self.memory_in,
self.sequence_length_ph,
name="lstm_policy",
)
self.memory_out = tf.identity(memory_policy_out, "recurrent_out")
else:
hidden_policy = encoded
self.action_masks = tf.placeholder(
shape=[None, sum(self.act_size)], dtype=tf.float32, name="action_masks"
)
with tf.variable_scope("policy"):
distribution = MultiCategoricalDistribution(
hidden_policy, self.act_size, self.action_masks
)
# It's important that we are able to feed_dict a value into this tensor to get the
# right one-hot encoding, so we can't do identity on it.
self.output = distribution.sample
self.all_log_probs = tf.identity(distribution.log_probs, name="action")
self.selected_actions = tf.stop_gradient(
distribution.sample_onehot
) # In discrete, these are onehot
self.entropy = distribution.entropy
self.total_log_probs = distribution.total_log_probs

360
ml-agents/mlagents/trainers/ppo/optimizer_tf.py


from typing import Optional, Any, Dict, cast
import numpy as np
from mlagents.tf_utils import tf
from mlagents_envs.timers import timed
from mlagents.trainers.tf.models import ModelUtils, EncoderType
from mlagents.trainers.policy.tf_policy import TFPolicy
from mlagents.trainers.optimizer.tf_optimizer import TFOptimizer
from mlagents.trainers.buffer import AgentBuffer
from mlagents.trainers.settings import TrainerSettings, PPOSettings
class PPOOptimizer(TFOptimizer):
def __init__(self, policy: TFPolicy, trainer_params: TrainerSettings):
"""
Takes a Policy and a Dict of trainer parameters and creates an Optimizer around the policy.
The PPO optimizer has a value estimator and a loss function.
:param policy: A TFPolicy object that will be updated by this PPO Optimizer.
:param trainer_params: Trainer parameters dictionary that specifies the properties of the trainer.
"""
# Create the graph here to give more granular control of the TF graph to the Optimizer.
policy.create_tf_graph()
with policy.graph.as_default():
with tf.variable_scope("optimizer/"):
super().__init__(policy, trainer_params)
hyperparameters: PPOSettings = cast(
PPOSettings, trainer_params.hyperparameters
)
lr = float(hyperparameters.learning_rate)
self._schedule = hyperparameters.learning_rate_schedule
epsilon = float(hyperparameters.epsilon)
beta = float(hyperparameters.beta)
max_step = float(trainer_params.max_steps)
policy_network_settings = policy.network_settings
h_size = int(policy_network_settings.hidden_units)
num_layers = policy_network_settings.num_layers
vis_encode_type = policy_network_settings.vis_encode_type
self.burn_in_ratio = 0.0
self.stream_names = list(self.reward_signals.keys())
self.tf_optimizer_op: Optional[tf.train.Optimizer] = None
self.grads = None
self.update_batch: Optional[tf.Operation] = None
self.stats_name_to_update_name = {
"Losses/Value Loss": "value_loss",
"Losses/Policy Loss": "policy_loss",
"Policy/Learning Rate": "learning_rate",
"Policy/Epsilon": "decay_epsilon",
"Policy/Beta": "decay_beta",
}
if self.policy.use_recurrent:
self.m_size = self.policy.m_size
self.memory_in = tf.placeholder(
shape=[None, self.m_size],
dtype=tf.float32,
name="recurrent_value_in",
)
if num_layers < 1:
num_layers = 1
if policy.use_continuous_act:
self._create_cc_critic(h_size, num_layers, vis_encode_type)
else:
self._create_dc_critic(h_size, num_layers, vis_encode_type)
self.learning_rate = ModelUtils.create_schedule(
self._schedule,
lr,
self.policy.global_step,
int(max_step),
min_value=1e-10,
)
self._create_losses(
self.policy.total_log_probs,
self.old_log_probs,
self.value_heads,
self.policy.entropy,
beta,
epsilon,
lr,
max_step,
)
self._create_ppo_optimizer_ops()
self.update_dict.update(
{
"value_loss": self.value_loss,
"policy_loss": self.abs_policy_loss,
"update_batch": self.update_batch,
"learning_rate": self.learning_rate,
"decay_epsilon": self.decay_epsilon,
"decay_beta": self.decay_beta,
}
)
def _create_cc_critic(
self, h_size: int, num_layers: int, vis_encode_type: EncoderType
) -> None:
"""
Creates Continuous control critic (value) network.
:param h_size: Size of hidden linear layers.
:param num_layers: Number of hidden linear layers.
:param vis_encode_type: The type of visual encoder to use.
"""
hidden_stream = ModelUtils.create_observation_streams(
self.policy.visual_in,
self.policy.processed_vector_in,
1,
h_size,
num_layers,
vis_encode_type,
)[0]
if self.policy.use_recurrent:
hidden_value, memory_value_out = ModelUtils.create_recurrent_encoder(
hidden_stream,
self.memory_in,
self.policy.sequence_length_ph,
name="lstm_value",
)
self.memory_out = memory_value_out
else:
hidden_value = hidden_stream
self.value_heads, self.value = ModelUtils.create_value_heads(
self.stream_names, hidden_value
)
self.all_old_log_probs = tf.placeholder(
shape=[None, sum(self.policy.act_size)],
dtype=tf.float32,
name="old_probabilities",
)
self.old_log_probs = tf.reduce_sum(
(tf.identity(self.all_old_log_probs)), axis=1, keepdims=True
)
def _create_dc_critic(
self, h_size: int, num_layers: int, vis_encode_type: EncoderType
) -> None:
"""
Creates Discrete control critic (value) network.
:param h_size: Size of hidden linear layers.
:param num_layers: Number of hidden linear layers.
:param vis_encode_type: The type of visual encoder to use.
"""
hidden_stream = ModelUtils.create_observation_streams(
self.policy.visual_in,
self.policy.processed_vector_in,
1,
h_size,
num_layers,
vis_encode_type,
)[0]
if self.policy.use_recurrent:
hidden_value, memory_value_out = ModelUtils.create_recurrent_encoder(
hidden_stream,
self.memory_in,
self.policy.sequence_length_ph,
name="lstm_value",
)
self.memory_out = memory_value_out
else:
hidden_value = hidden_stream
self.value_heads, self.value = ModelUtils.create_value_heads(
self.stream_names, hidden_value
)
self.all_old_log_probs = tf.placeholder(
shape=[None, sum(self.policy.act_size)],
dtype=tf.float32,
name="old_probabilities",
)
# Break old log log_probs into separate branches
old_log_prob_branches = ModelUtils.break_into_branches(
self.all_old_log_probs, self.policy.act_size
)
_, _, old_normalized_logits = ModelUtils.create_discrete_action_masking_layer(
old_log_prob_branches, self.policy.action_masks, self.policy.act_size
)
action_idx = [0] + list(np.cumsum(self.policy.act_size))
self.old_log_probs = tf.reduce_sum(
(
tf.stack(
[
-tf.nn.softmax_cross_entropy_with_logits_v2(
labels=self.policy.selected_actions[
:, action_idx[i] : action_idx[i + 1]
],
logits=old_normalized_logits[
:, action_idx[i] : action_idx[i + 1]
],
)
for i in range(len(self.policy.act_size))
],
axis=1,
)
),
axis=1,
keepdims=True,
)
def _create_losses(
self, probs, old_probs, value_heads, entropy, beta, epsilon, lr, max_step
):
"""
Creates training-specific Tensorflow ops for PPO models.
:param probs: Current policy probabilities
:param old_probs: Past policy probabilities
:param value_heads: Value estimate tensors from each value stream
:param beta: Entropy regularization strength
:param entropy: Current policy entropy
:param epsilon: Value for policy-divergence threshold
:param lr: Learning rate
:param max_step: Total number of training steps.
"""
self.returns_holders = {}
self.old_values = {}
for name in value_heads.keys():
returns_holder = tf.placeholder(
shape=[None], dtype=tf.float32, name=f"{name}_returns"
)
old_value = tf.placeholder(
shape=[None], dtype=tf.float32, name=f"{name}_value_estimate"
)
self.returns_holders[name] = returns_holder
self.old_values[name] = old_value
self.advantage = tf.placeholder(
shape=[None], dtype=tf.float32, name="advantages"
)
advantage = tf.expand_dims(self.advantage, -1)
self.decay_epsilon = ModelUtils.create_schedule(
self._schedule, epsilon, self.policy.global_step, max_step, min_value=0.1
)
self.decay_beta = ModelUtils.create_schedule(
self._schedule, beta, self.policy.global_step, max_step, min_value=1e-5
)
value_losses = []
for name, head in value_heads.items():
clipped_value_estimate = self.old_values[name] + tf.clip_by_value(
tf.reduce_sum(head, axis=1) - self.old_values[name],
-self.decay_epsilon,
self.decay_epsilon,
)
v_opt_a = tf.squared_difference(
self.returns_holders[name], tf.reduce_sum(head, axis=1)
)
v_opt_b = tf.squared_difference(
self.returns_holders[name], clipped_value_estimate
)
value_loss = tf.reduce_mean(
tf.dynamic_partition(tf.maximum(v_opt_a, v_opt_b), self.policy.mask, 2)[
1
]
)
value_losses.append(value_loss)
self.value_loss = tf.reduce_mean(value_losses)
r_theta = tf.exp(probs - old_probs)
p_opt_a = r_theta * advantage
p_opt_b = (
tf.clip_by_value(
r_theta, 1.0 - self.decay_epsilon, 1.0 + self.decay_epsilon
)
* advantage
)
self.policy_loss = -tf.reduce_mean(
tf.dynamic_partition(tf.minimum(p_opt_a, p_opt_b), self.policy.mask, 2)[1]
)
# For cleaner stats reporting
self.abs_policy_loss = tf.abs(self.policy_loss)
self.loss = (
self.policy_loss
+ 0.5 * self.value_loss
- self.decay_beta
* tf.reduce_mean(tf.dynamic_partition(entropy, self.policy.mask, 2)[1])
)
def _create_ppo_optimizer_ops(self):
self.tf_optimizer_op = self.create_optimizer_op(self.learning_rate)
self.grads = self.tf_optimizer_op.compute_gradients(self.loss)
self.update_batch = self.tf_optimizer_op.minimize(self.loss)
@timed
def update(self, batch: AgentBuffer, num_sequences: int) -> Dict[str, float]:
"""
Performs update on model.
:param mini_batch: Batch of experiences.
:param num_sequences: Number of sequences to process.
:return: Results of update.
"""
feed_dict = self._construct_feed_dict(batch, num_sequences)
stats_needed = self.stats_name_to_update_name
update_stats = {}
# Collect feed dicts for all reward signals.
for _, reward_signal in self.reward_signals.items():
feed_dict.update(
reward_signal.prepare_update(self.policy, batch, num_sequences)
)
stats_needed.update(reward_signal.stats_name_to_update_name)
update_vals = self._execute_model(feed_dict, self.update_dict)
for stat_name, update_name in stats_needed.items():
update_stats[stat_name] = update_vals[update_name]
return update_stats
def _construct_feed_dict(
self, mini_batch: AgentBuffer, num_sequences: int
) -> Dict[tf.Tensor, Any]:
# Do an optional burn-in for memories
num_burn_in = int(self.burn_in_ratio * self.policy.sequence_length)
burn_in_mask = np.ones((self.policy.sequence_length), dtype=np.float32)
burn_in_mask[range(0, num_burn_in)] = 0
burn_in_mask = np.tile(burn_in_mask, num_sequences)
feed_dict = {
self.policy.batch_size_ph: num_sequences,
self.policy.sequence_length_ph: self.policy.sequence_length,
self.policy.mask_input: mini_batch["masks"] * burn_in_mask,
self.advantage: mini_batch["advantages"],
self.all_old_log_probs: mini_batch["action_probs"],
}
for name in self.reward_signals:
feed_dict[self.returns_holders[name]] = mini_batch[f"{name}_returns"]
feed_dict[self.old_values[name]] = mini_batch[f"{name}_value_estimates"]
if self.policy.output_pre is not None and "actions_pre" in mini_batch:
feed_dict[self.policy.output_pre] = mini_batch["actions_pre"]
else:
feed_dict[self.policy.output] = mini_batch["actions"]
if self.policy.use_recurrent:
feed_dict[self.policy.prev_action] = mini_batch["prev_action"]
feed_dict[self.policy.action_masks] = mini_batch["action_mask"]
if "vector_obs" in mini_batch:
feed_dict[self.policy.vector_in] = mini_batch["vector_obs"]
if self.policy.vis_obs_size > 0:
for i, _ in enumerate(self.policy.visual_in):
feed_dict[self.policy.visual_in[i]] = mini_batch["visual_obs%d" % i]
if self.policy.use_recurrent:
feed_dict[self.policy.memory_in] = [
mini_batch["memory"][i]
for i in range(
0, len(mini_batch["memory"]), self.policy.sequence_length
)
]
feed_dict[self.memory_in] = self._make_zero_mem(
self.m_size, mini_batch.num_experiences
)
return feed_dict

444
ml-agents/mlagents/trainers/sac/network.py


from typing import Dict, Optional
from mlagents.tf_utils import tf
from mlagents.trainers.tf.models import ModelUtils
from mlagents.trainers.settings import EncoderType
LOG_STD_MAX = 2
LOG_STD_MIN = -20
EPSILON = 1e-6 # Small value to avoid divide by zero
DISCRETE_TARGET_ENTROPY_SCALE = 0.2 # Roughly equal to e-greedy 0.05
CONTINUOUS_TARGET_ENTROPY_SCALE = 1.0 # TODO: Make these an optional hyperparam.
POLICY_SCOPE = ""
TARGET_SCOPE = "target_network"
class SACNetwork:
"""
Base class for an SAC network. Implements methods for creating the actor and critic heads.
"""
def __init__(
self,
policy=None,
m_size=None,
h_size=128,
normalize=False,
use_recurrent=False,
num_layers=2,
stream_names=None,
vis_encode_type=EncoderType.SIMPLE,
):
self.normalize = normalize
self.use_recurrent = use_recurrent
self.num_layers = num_layers
self.stream_names = stream_names
self.h_size = h_size
self.activ_fn = ModelUtils.swish
self.sequence_length_ph = tf.placeholder(
shape=None, dtype=tf.int32, name="sac_sequence_length"
)
self.policy_memory_in: Optional[tf.Tensor] = None
self.policy_memory_out: Optional[tf.Tensor] = None
self.value_memory_in: Optional[tf.Tensor] = None
self.value_memory_out: Optional[tf.Tensor] = None
self.q1: Optional[tf.Tensor] = None
self.q2: Optional[tf.Tensor] = None
self.q1_p: Optional[tf.Tensor] = None
self.q2_p: Optional[tf.Tensor] = None
self.q1_memory_in: Optional[tf.Tensor] = None
self.q2_memory_in: Optional[tf.Tensor] = None
self.q1_memory_out: Optional[tf.Tensor] = None
self.q2_memory_out: Optional[tf.Tensor] = None
self.prev_action: Optional[tf.Tensor] = None
self.action_masks: Optional[tf.Tensor] = None
self.external_action_in: Optional[tf.Tensor] = None
self.log_sigma_sq: Optional[tf.Tensor] = None
self.entropy: Optional[tf.Tensor] = None
self.deterministic_output: Optional[tf.Tensor] = None
self.normalized_logprobs: Optional[tf.Tensor] = None
self.action_probs: Optional[tf.Tensor] = None
self.output_oh: Optional[tf.Tensor] = None
self.output_pre: Optional[tf.Tensor] = None
self.value_vars = None
self.q_vars = None
self.critic_vars = None
self.policy_vars = None
self.q1_heads: Dict[str, tf.Tensor] = None
self.q2_heads: Dict[str, tf.Tensor] = None
self.q1_pheads: Dict[str, tf.Tensor] = None
self.q2_pheads: Dict[str, tf.Tensor] = None
self.policy = policy
def get_vars(self, scope):
return tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope=scope)
def join_scopes(self, scope_1, scope_2):
"""
Joins two scopes. Does so safetly (i.e., if one of the two scopes doesn't
exist, don't add any backslashes)
"""
if not scope_1:
return scope_2
if not scope_2:
return scope_1
else:
return "/".join(filter(None, [scope_1, scope_2]))
def create_value_heads(self, stream_names, hidden_input):
"""
Creates one value estimator head for each reward signal in stream_names.
Also creates the node corresponding to the mean of all the value heads in self.value.
self.value_head is a dictionary of stream name to node containing the value estimator head for that signal.
:param stream_names: The list of reward signal names
:param hidden_input: The last layer of the Critic. The heads will consist of one dense hidden layer on top
of the hidden input.
"""
self.value_heads = {}
for name in stream_names:
value = tf.layers.dense(hidden_input, 1, name=f"{name}_value")
self.value_heads[name] = value
self.value = tf.reduce_mean(list(self.value_heads.values()), 0)
def _create_cc_critic(self, hidden_value, scope, create_qs=True):
"""
Creates just the critic network
"""
scope = self.join_scopes(scope, "critic")
self.create_sac_value_head(
self.stream_names,
hidden_value,
self.num_layers,
self.h_size,
self.join_scopes(scope, "value"),
)
self.external_action_in = tf.placeholder(
shape=[None, self.policy.act_size[0]],
dtype=tf.float32,
name="external_action_in",
)
self.value_vars = self.get_vars(self.join_scopes(scope, "value"))
if create_qs:
hidden_q = tf.concat([hidden_value, self.external_action_in], axis=-1)
hidden_qp = tf.concat([hidden_value, self.policy.output], axis=-1)
self.q1_heads, self.q2_heads, self.q1, self.q2 = self.create_q_heads(
self.stream_names,
hidden_q,
self.num_layers,
self.h_size,
self.join_scopes(scope, "q"),
)
self.q1_pheads, self.q2_pheads, self.q1_p, self.q2_p = self.create_q_heads(
self.stream_names,
hidden_qp,
self.num_layers,
self.h_size,
self.join_scopes(scope, "q"),
reuse=True,
)
self.q_vars = self.get_vars(self.join_scopes(scope, "q"))
self.critic_vars = self.get_vars(scope)
def _create_dc_critic(self, hidden_value, scope, create_qs=True):
"""
Creates just the critic network
"""
scope = self.join_scopes(scope, "critic")
self.create_sac_value_head(
self.stream_names,
hidden_value,
self.num_layers,
self.h_size,
self.join_scopes(scope, "value"),
)
self.value_vars = self.get_vars("/".join([scope, "value"]))
if create_qs:
self.q1_heads, self.q2_heads, self.q1, self.q2 = self.create_q_heads(
self.stream_names,
hidden_value,
self.num_layers,
self.h_size,
self.join_scopes(scope, "q"),
num_outputs=sum(self.policy.act_size),
)
self.q1_pheads, self.q2_pheads, self.q1_p, self.q2_p = self.create_q_heads(
self.stream_names,
hidden_value,
self.num_layers,
self.h_size,
self.join_scopes(scope, "q"),
reuse=True,
num_outputs=sum(self.policy.act_size),
)
self.q_vars = self.get_vars(scope)
self.critic_vars = self.get_vars(scope)
def create_sac_value_head(
self, stream_names, hidden_input, num_layers, h_size, scope
):
"""
Creates one value estimator head for each reward signal in stream_names.
Also creates the node corresponding to the mean of all the value heads in self.value.
self.value_head is a dictionary of stream name to node containing the value estimator head for that signal.
:param stream_names: The list of reward signal names
:param hidden_input: The last layer of the Critic. The heads will consist of one dense hidden layer on top
of the hidden input.
:param num_layers: Number of hidden layers for value network
:param h_size: size of hidden layers for value network
:param scope: TF scope for value network.
"""
with tf.variable_scope(scope):
value_hidden = ModelUtils.create_vector_observation_encoder(
hidden_input, h_size, self.activ_fn, num_layers, "encoder", False
)
if self.use_recurrent:
value_hidden, memory_out = ModelUtils.create_recurrent_encoder(
value_hidden,
self.value_memory_in,
self.sequence_length_ph,
name="lstm_value",
)
self.value_memory_out = memory_out
self.create_value_heads(stream_names, value_hidden)
def create_q_heads(
self,
stream_names,
hidden_input,
num_layers,
h_size,
scope,
reuse=False,
num_outputs=1,
):
"""
Creates two q heads for each reward signal in stream_names.
Also creates the node corresponding to the mean of all the value heads in self.value.
self.value_head is a dictionary of stream name to node containing the value estimator head for that signal.
:param stream_names: The list of reward signal names
:param hidden_input: The last layer of the Critic. The heads will consist of one dense hidden layer on top
of the hidden input.
:param num_layers: Number of hidden layers for Q network
:param h_size: size of hidden layers for Q network
:param scope: TF scope for Q network.
:param reuse: Whether or not to reuse variables. Useful for creating Q of policy.
:param num_outputs: Number of outputs of each Q function. If discrete, equal to number of actions.
"""
with tf.variable_scope(self.join_scopes(scope, "q1_encoding"), reuse=reuse):
q1_hidden = ModelUtils.create_vector_observation_encoder(
hidden_input, h_size, self.activ_fn, num_layers, "q1_encoder", reuse
)
if self.use_recurrent:
q1_hidden, memory_out = ModelUtils.create_recurrent_encoder(
q1_hidden,
self.q1_memory_in,
self.sequence_length_ph,
name="lstm_q1",
)
self.q1_memory_out = memory_out
q1_heads = {}
for name in stream_names:
_q1 = tf.layers.dense(q1_hidden, num_outputs, name=f"{name}_q1")
q1_heads[name] = _q1
q1 = tf.reduce_mean(list(q1_heads.values()), axis=0)
with tf.variable_scope(self.join_scopes(scope, "q2_encoding"), reuse=reuse):
q2_hidden = ModelUtils.create_vector_observation_encoder(
hidden_input, h_size, self.activ_fn, num_layers, "q2_encoder", reuse
)
if self.use_recurrent:
q2_hidden, memory_out = ModelUtils.create_recurrent_encoder(
q2_hidden,
self.q2_memory_in,
self.sequence_length_ph,
name="lstm_q2",
)
self.q2_memory_out = memory_out
q2_heads = {}
for name in stream_names:
_q2 = tf.layers.dense(q2_hidden, num_outputs, name=f"{name}_q2")
q2_heads[name] = _q2
q2 = tf.reduce_mean(list(q2_heads.values()), axis=0)
return q1_heads, q2_heads, q1, q2
class SACTargetNetwork(SACNetwork):
"""
Instantiation for the SAC target network. Only contains a single
value estimator and is updated from the Policy Network.
"""
def __init__(
self,
policy,
m_size=None,
h_size=128,
normalize=False,
use_recurrent=False,
num_layers=2,
stream_names=None,
vis_encode_type=EncoderType.SIMPLE,
):
super().__init__(
policy,
m_size,
h_size,
normalize,
use_recurrent,
num_layers,
stream_names,
vis_encode_type,
)
with tf.variable_scope(TARGET_SCOPE):
self.vector_in, self.visual_in = ModelUtils.create_input_placeholders(
self.policy.behavior_spec.observation_shapes
)
if self.policy.normalize:
normalization_tensors = ModelUtils.create_normalizer(self.vector_in)
self.update_normalization_op = normalization_tensors.update_op
self.normalization_steps = normalization_tensors.steps
self.running_mean = normalization_tensors.running_mean
self.running_variance = normalization_tensors.running_variance
self.processed_vector_in = ModelUtils.normalize_vector_obs(
self.vector_in,
self.running_mean,
self.running_variance,
self.normalization_steps,
)
else:
self.processed_vector_in = self.vector_in
self.update_normalization_op = None
if self.policy.use_recurrent:
self.memory_in = tf.placeholder(
shape=[None, m_size], dtype=tf.float32, name="target_recurrent_in"
)
self.value_memory_in = self.memory_in
hidden_streams = ModelUtils.create_observation_streams(
self.visual_in,
self.processed_vector_in,
1,
self.h_size,
0,
vis_encode_type=vis_encode_type,
stream_scopes=["critic/value/"],
)
if self.policy.use_continuous_act:
self._create_cc_critic(hidden_streams[0], TARGET_SCOPE, create_qs=False)
else:
self._create_dc_critic(hidden_streams[0], TARGET_SCOPE, create_qs=False)
if self.use_recurrent:
self.memory_out = tf.concat(
self.value_memory_out, axis=1
) # Needed for Barracuda to work
def copy_normalization(self, mean, variance, steps):
"""
Copies the mean, variance, and steps into the normalizers of the
input of this SACNetwork. Used to copy the normalizer from the policy network
to the target network.
param mean: Tensor containing the mean.
param variance: Tensor containing the variance
param steps: Tensor containing the number of steps.
"""
update_mean = tf.assign(self.running_mean, mean)
update_variance = tf.assign(self.running_variance, variance)
update_norm_step = tf.assign(self.normalization_steps, steps)
return tf.group([update_mean, update_variance, update_norm_step])
class SACPolicyNetwork(SACNetwork):
"""
Instantiation for SAC policy network. Contains a dual Q estimator,
a value estimator, and a reference to the actual policy network.
"""
def __init__(
self,
policy,
m_size=None,
h_size=128,
normalize=False,
use_recurrent=False,
num_layers=2,
stream_names=None,
vis_encode_type=EncoderType.SIMPLE,
):
super().__init__(
policy,
m_size,
h_size,
normalize,
use_recurrent,
num_layers,
stream_names,
vis_encode_type,
)
if self.policy.use_recurrent:
self._create_memory_ins(m_size)
hidden_critic = self._create_observation_in(vis_encode_type)
# Use the sequence length of the policy
self.sequence_length_ph = self.policy.sequence_length_ph
if self.policy.use_continuous_act:
self._create_cc_critic(hidden_critic, POLICY_SCOPE)
else:
self._create_dc_critic(hidden_critic, POLICY_SCOPE)
if self.use_recurrent:
mem_outs = [self.value_memory_out, self.q1_memory_out, self.q2_memory_out]
self.memory_out = tf.concat(mem_outs, axis=1)
def _create_memory_ins(self, m_size):
"""
Creates the memory input placeholders for LSTM.
:param m_size: the total size of the memory.
"""
self.memory_in = tf.placeholder(
shape=[None, m_size * 3], dtype=tf.float32, name="value_recurrent_in"
)
# Re-break-up for each network
num_mems = 3
input_size = self.memory_in.get_shape().as_list()[1]
mem_ins = []
for i in range(num_mems):
_start = input_size // num_mems * i
_end = input_size // num_mems * (i + 1)
mem_ins.append(self.memory_in[:, _start:_end])
self.value_memory_in = mem_ins[0]
self.q1_memory_in = mem_ins[1]
self.q2_memory_in = mem_ins[2]
def _create_observation_in(self, vis_encode_type):
"""
Creates the observation inputs, and a CNN if needed,
:param vis_encode_type: Type of CNN encoder.
:param share_ac_cnn: Whether or not to share the actor and critic CNNs.
:return A tuple of (hidden_policy, hidden_critic). We don't save it to self since they're used
once and thrown away.
"""
with tf.variable_scope(POLICY_SCOPE):
hidden_streams = ModelUtils.create_observation_streams(
self.policy.visual_in,
self.policy.processed_vector_in,
1,
self.h_size,
0,
vis_encode_type=vis_encode_type,
stream_scopes=["critic/value/"],
)
hidden_critic = hidden_streams[0]
return hidden_critic

639
ml-agents/mlagents/trainers/sac/optimizer_tf.py


import numpy as np
from typing import Dict, List, Optional, Any, Mapping, cast
from mlagents.tf_utils import tf
from mlagents_envs.logging_util import get_logger
from mlagents.trainers.sac.network import SACPolicyNetwork, SACTargetNetwork
from mlagents.trainers.tf.models import ModelUtils
from mlagents.trainers.optimizer.tf_optimizer import TFOptimizer
from mlagents.trainers.policy.tf_policy import TFPolicy
from mlagents.trainers.buffer import AgentBuffer
from mlagents_envs.timers import timed
from mlagents.trainers.settings import TrainerSettings, SACSettings
EPSILON = 1e-6 # Small value to avoid divide by zero
logger = get_logger(__name__)
POLICY_SCOPE = ""
TARGET_SCOPE = "target_network"
class SACOptimizer(TFOptimizer):
def __init__(self, policy: TFPolicy, trainer_params: TrainerSettings):
"""
Takes a Unity environment and model-specific hyper-parameters and returns the
appropriate PPO agent model for the environment.
:param brain: Brain parameters used to generate specific network graph.
:param lr: Learning rate.
:param lr_schedule: Learning rate decay schedule.
:param h_size: Size of hidden layers
:param init_entcoef: Initial value for entropy coefficient. Set lower to learn faster,
set higher to explore more.
:return: a sub-class of PPOAgent tailored to the environment.
:param max_step: Total number of training steps.
:param normalize: Whether to normalize vector observation input.
:param use_recurrent: Whether to use an LSTM layer in the network.
:param num_layers: Number of hidden layers between encoded input and policy & value layers
:param tau: Strength of soft-Q update.
:param m_size: Size of brain memory.
"""
# Create the graph here to give more granular control of the TF graph to the Optimizer.
policy.create_tf_graph()
with policy.graph.as_default():
with tf.variable_scope(""):
super().__init__(policy, trainer_params)
hyperparameters: SACSettings = cast(
SACSettings, trainer_params.hyperparameters
)
lr = hyperparameters.learning_rate
lr_schedule = hyperparameters.learning_rate_schedule
max_step = trainer_params.max_steps
self.tau = hyperparameters.tau
self.init_entcoef = hyperparameters.init_entcoef
self.policy = policy
self.act_size = policy.act_size
policy_network_settings = policy.network_settings
h_size = policy_network_settings.hidden_units
num_layers = policy_network_settings.num_layers
vis_encode_type = policy_network_settings.vis_encode_type
self.tau = hyperparameters.tau
self.burn_in_ratio = 0.0
# Non-exposed SAC parameters
self.discrete_target_entropy_scale = (
0.2 # Roughly equal to e-greedy 0.05
)
self.continuous_target_entropy_scale = 1.0
stream_names = list(self.reward_signals.keys())
# Use to reduce "survivor bonus" when using Curiosity or GAIL.
self.gammas = [
_val.gamma for _val in trainer_params.reward_signals.values()
]
self.use_dones_in_backup = {
name: tf.Variable(1.0) for name in stream_names
}
self.disable_use_dones = {
name: self.use_dones_in_backup[name].assign(0.0)
for name in stream_names
}
if num_layers < 1:
num_layers = 1
self.target_init_op: List[tf.Tensor] = []
self.target_update_op: List[tf.Tensor] = []
self.update_batch_policy: Optional[tf.Operation] = None
self.update_batch_value: Optional[tf.Operation] = None
self.update_batch_entropy: Optional[tf.Operation] = None
self.policy_network = SACPolicyNetwork(
policy=self.policy,
m_size=self.policy.m_size, # 3x policy.m_size
h_size=h_size,
normalize=self.policy.normalize,
use_recurrent=self.policy.use_recurrent,
num_layers=num_layers,
stream_names=stream_names,
vis_encode_type=vis_encode_type,
)
self.target_network = SACTargetNetwork(
policy=self.policy,
m_size=self.policy.m_size, # 1x policy.m_size
h_size=h_size,
normalize=self.policy.normalize,
use_recurrent=self.policy.use_recurrent,
num_layers=num_layers,
stream_names=stream_names,
vis_encode_type=vis_encode_type,
)
# The optimizer's m_size is 3 times the policy (Q1, Q2, and Value)
self.m_size = 3 * self.policy.m_size
self._create_inputs_and_outputs()
self.learning_rate = ModelUtils.create_schedule(
lr_schedule,
lr,
self.policy.global_step,
int(max_step),
min_value=1e-10,
)
self._create_losses(
self.policy_network.q1_heads,
self.policy_network.q2_heads,
lr,
int(max_step),
stream_names,
discrete=not self.policy.use_continuous_act,
)
self._create_sac_optimizer_ops()
self.selected_actions = (
self.policy.selected_actions
) # For GAIL and other reward signals
if self.policy.normalize:
target_update_norm = self.target_network.copy_normalization(
self.policy.running_mean,
self.policy.running_variance,
self.policy.normalization_steps,
)
# Update the normalization of the optimizer when the policy does.
self.policy.update_normalization_op = tf.group(
[self.policy.update_normalization_op, target_update_norm]
)
self.stats_name_to_update_name = {
"Losses/Value Loss": "value_loss",
"Losses/Policy Loss": "policy_loss",
"Losses/Q1 Loss": "q1_loss",
"Losses/Q2 Loss": "q2_loss",
"Policy/Entropy Coeff": "entropy_coef",
"Policy/Learning Rate": "learning_rate",
}
self.update_dict = {
"value_loss": self.total_value_loss,
"policy_loss": self.policy_loss,
"q1_loss": self.q1_loss,
"q2_loss": self.q2_loss,
"entropy_coef": self.ent_coef,
"update_batch": self.update_batch_policy,
"update_value": self.update_batch_value,
"update_entropy": self.update_batch_entropy,
"learning_rate": self.learning_rate,
}
def _create_inputs_and_outputs(self) -> None:
"""
Assign the higher-level SACModel's inputs and outputs to those of its policy or
target network.
"""
self.vector_in = self.policy.vector_in
self.visual_in = self.policy.visual_in
self.next_vector_in = self.target_network.vector_in
self.next_visual_in = self.target_network.visual_in
self.sequence_length_ph = self.policy.sequence_length_ph
self.next_sequence_length_ph = self.target_network.sequence_length_ph
if not self.policy.use_continuous_act:
self.action_masks = self.policy_network.action_masks
else:
self.output_pre = self.policy_network.output_pre
# Don't use value estimate during inference.
self.value = tf.identity(
self.policy_network.value, name="value_estimate_unused"
)
self.value_heads = self.policy_network.value_heads
self.dones_holder = tf.placeholder(
shape=[None], dtype=tf.float32, name="dones_holder"
)
if self.policy.use_recurrent:
self.memory_in = self.policy_network.memory_in
self.memory_out = self.policy_network.memory_out
if not self.policy.use_continuous_act:
self.prev_action = self.policy_network.prev_action
self.next_memory_in = self.target_network.memory_in
def _create_losses(
self,
q1_streams: Dict[str, tf.Tensor],
q2_streams: Dict[str, tf.Tensor],
lr: tf.Tensor,
max_step: int,
stream_names: List[str],
discrete: bool = False,
) -> None:
"""
Creates training-specific Tensorflow ops for SAC models.
:param q1_streams: Q1 streams from policy network
:param q1_streams: Q2 streams from policy network
:param lr: Learning rate
:param max_step: Total number of training steps.
:param stream_names: List of reward stream names.
:param discrete: Whether or not to use discrete action losses.
"""
if discrete:
self.target_entropy = [
self.discrete_target_entropy_scale * np.log(i).astype(np.float32)
for i in self.act_size
]
discrete_action_probs = tf.exp(self.policy.all_log_probs)
per_action_entropy = discrete_action_probs * self.policy.all_log_probs
else:
self.target_entropy = (
-1
* self.continuous_target_entropy_scale
* np.prod(self.act_size[0]).astype(np.float32)
)
self.rewards_holders = {}
self.min_policy_qs = {}
for name in stream_names:
if discrete:
_branched_mpq1 = ModelUtils.break_into_branches(
self.policy_network.q1_pheads[name] * discrete_action_probs,
self.act_size,
)
branched_mpq1 = tf.stack(
[
tf.reduce_sum(_br, axis=1, keep_dims=True)
for _br in _branched_mpq1
]
)
_q1_p_mean = tf.reduce_mean(branched_mpq1, axis=0)
_branched_mpq2 = ModelUtils.break_into_branches(
self.policy_network.q2_pheads[name] * discrete_action_probs,
self.act_size,
)
branched_mpq2 = tf.stack(
[
tf.reduce_sum(_br, axis=1, keep_dims=True)
for _br in _branched_mpq2
]
)
_q2_p_mean = tf.reduce_mean(branched_mpq2, axis=0)
self.min_policy_qs[name] = tf.minimum(_q1_p_mean, _q2_p_mean)
else:
self.min_policy_qs[name] = tf.minimum(
self.policy_network.q1_pheads[name],
self.policy_network.q2_pheads[name],
)
rewards_holder = tf.placeholder(
shape=[None], dtype=tf.float32, name=f"{name}_rewards"
)
self.rewards_holders[name] = rewards_holder
q1_losses = []
q2_losses = []
# Multiple q losses per stream
expanded_dones = tf.expand_dims(self.dones_holder, axis=-1)
for i, name in enumerate(stream_names):
_expanded_rewards = tf.expand_dims(self.rewards_holders[name], axis=-1)
q_backup = tf.stop_gradient(
_expanded_rewards
+ (1.0 - self.use_dones_in_backup[name] * expanded_dones)
* self.gammas[i]
* self.target_network.value_heads[name]
)
if discrete:
# We need to break up the Q functions by branch, and update them individually.
branched_q1_stream = ModelUtils.break_into_branches(
self.policy.selected_actions * q1_streams[name], self.act_size
)
branched_q2_stream = ModelUtils.break_into_branches(
self.policy.selected_actions * q2_streams[name], self.act_size
)
# Reduce each branch into scalar
branched_q1_stream = [
tf.reduce_sum(_branch, axis=1, keep_dims=True)
for _branch in branched_q1_stream
]
branched_q2_stream = [
tf.reduce_sum(_branch, axis=1, keep_dims=True)
for _branch in branched_q2_stream
]
q1_stream = tf.reduce_mean(branched_q1_stream, axis=0)
q2_stream = tf.reduce_mean(branched_q2_stream, axis=0)
else:
q1_stream = q1_streams[name]
q2_stream = q2_streams[name]
_q1_loss = 0.5 * tf.reduce_mean(
tf.to_float(self.policy.mask)
* tf.squared_difference(q_backup, q1_stream)
)
_q2_loss = 0.5 * tf.reduce_mean(
tf.to_float(self.policy.mask)
* tf.squared_difference(q_backup, q2_stream)
)
q1_losses.append(_q1_loss)
q2_losses.append(_q2_loss)
self.q1_loss = tf.reduce_mean(q1_losses)
self.q2_loss = tf.reduce_mean(q2_losses)
# Learn entropy coefficient
if discrete:
# Create a log_ent_coef for each branch
self.log_ent_coef = tf.get_variable(
"log_ent_coef",
dtype=tf.float32,
initializer=np.log([self.init_entcoef] * len(self.act_size)).astype(
np.float32
),
trainable=True,
)
else:
self.log_ent_coef = tf.get_variable(
"log_ent_coef",
dtype=tf.float32,
initializer=np.log(self.init_entcoef).astype(np.float32),
trainable=True,
)
self.ent_coef = tf.exp(self.log_ent_coef)
if discrete:
# We also have to do a different entropy and target_entropy per branch.
branched_per_action_ent = ModelUtils.break_into_branches(
per_action_entropy, self.act_size
)
branched_ent_sums = tf.stack(
[
tf.reduce_sum(_lp, axis=1, keep_dims=True) + _te
for _lp, _te in zip(branched_per_action_ent, self.target_entropy)
],
axis=1,
)
self.entropy_loss = -tf.reduce_mean(
tf.to_float(self.policy.mask)
* tf.reduce_mean(
self.log_ent_coef
* tf.squeeze(tf.stop_gradient(branched_ent_sums), axis=2),
axis=1,
)
)
# Same with policy loss, we have to do the loss per branch and average them,
# so that larger branches don't get more weight.
# The equivalent KL divergence from Eq 10 of Haarnoja et al. is also pi*log(pi) - Q
branched_q_term = ModelUtils.break_into_branches(
discrete_action_probs * self.policy_network.q1_p, self.act_size
)
branched_policy_loss = tf.stack(
[
tf.reduce_sum(self.ent_coef[i] * _lp - _qt, axis=1, keep_dims=True)
for i, (_lp, _qt) in enumerate(
zip(branched_per_action_ent, branched_q_term)
)
]
)
self.policy_loss = tf.reduce_mean(
tf.to_float(self.policy.mask) * tf.squeeze(branched_policy_loss)
)
# Do vbackup entropy bonus per branch as well.
branched_ent_bonus = tf.stack(
[
tf.reduce_sum(self.ent_coef[i] * _lp, axis=1, keep_dims=True)
for i, _lp in enumerate(branched_per_action_ent)
]
)
value_losses = []
for name in stream_names:
v_backup = tf.stop_gradient(
self.min_policy_qs[name]
- tf.reduce_mean(branched_ent_bonus, axis=0)
)
value_losses.append(
0.5
* tf.reduce_mean(
tf.to_float(self.policy.mask)
* tf.squared_difference(
self.policy_network.value_heads[name], v_backup
)
)
)
else:
self.entropy_loss = -tf.reduce_mean(
self.log_ent_coef
* tf.to_float(self.policy.mask)
* tf.stop_gradient(
tf.reduce_sum(
self.policy.all_log_probs + self.target_entropy,
axis=1,
keep_dims=True,
)
)
)
batch_policy_loss = tf.reduce_mean(
self.ent_coef * self.policy.all_log_probs - self.policy_network.q1_p,
axis=1,
)
self.policy_loss = tf.reduce_mean(
tf.to_float(self.policy.mask) * batch_policy_loss
)
value_losses = []
for name in stream_names:
v_backup = tf.stop_gradient(
self.min_policy_qs[name]
- tf.reduce_sum(self.ent_coef * self.policy.all_log_probs, axis=1)
)
value_losses.append(
0.5
* tf.reduce_mean(
tf.to_float(self.policy.mask)
* tf.squared_difference(
self.policy_network.value_heads[name], v_backup
)
)
)
self.value_loss = tf.reduce_mean(value_losses)
self.total_value_loss = self.q1_loss + self.q2_loss + self.value_loss
self.entropy = self.policy_network.entropy
def _create_sac_optimizer_ops(self) -> None:
"""
Creates the Adam optimizers and update ops for SAC, including
the policy, value, and entropy updates, as well as the target network update.
"""
policy_optimizer = self.create_optimizer_op(
learning_rate=self.learning_rate, name="sac_policy_opt"
)
entropy_optimizer = self.create_optimizer_op(
learning_rate=self.learning_rate, name="sac_entropy_opt"
)
value_optimizer = self.create_optimizer_op(
learning_rate=self.learning_rate, name="sac_value_opt"
)
self.target_update_op = [
tf.assign(target, (1 - self.tau) * target + self.tau * source)
for target, source in zip(
self.target_network.value_vars, self.policy_network.value_vars
)
]
logger.debug("value_vars")
self.print_all_vars(self.policy_network.value_vars)
logger.debug("targvalue_vars")
self.print_all_vars(self.target_network.value_vars)
logger.debug("critic_vars")
self.print_all_vars(self.policy_network.critic_vars)
logger.debug("q_vars")
self.print_all_vars(self.policy_network.q_vars)
logger.debug("policy_vars")
policy_vars = self.policy.get_trainable_variables()
self.print_all_vars(policy_vars)
self.target_init_op = [
tf.assign(target, source)
for target, source in zip(
self.target_network.value_vars, self.policy_network.value_vars
)
]
self.update_batch_policy = policy_optimizer.minimize(
self.policy_loss, var_list=policy_vars
)
# Make sure policy is updated first, then value, then entropy.
with tf.control_dependencies([self.update_batch_policy]):
self.update_batch_value = value_optimizer.minimize(
self.total_value_loss, var_list=self.policy_network.critic_vars
)
# Add entropy coefficient optimization operation
with tf.control_dependencies([self.update_batch_value]):
self.update_batch_entropy = entropy_optimizer.minimize(
self.entropy_loss, var_list=self.log_ent_coef
)
def print_all_vars(self, variables):
for _var in variables:
logger.debug(_var)
@timed
def update(self, batch: AgentBuffer, num_sequences: int) -> Dict[str, float]:
"""
Updates model using buffer.
:param num_sequences: Number of trajectories in batch.
:param batch: Experience mini-batch.
:param update_target: Whether or not to update target value network
:param reward_signal_batches: Minibatches to use for updating the reward signals,
indexed by name. If none, don't update the reward signals.
:return: Output from update process.
"""
feed_dict = self._construct_feed_dict(self.policy, batch, num_sequences)
stats_needed = self.stats_name_to_update_name
update_stats: Dict[str, float] = {}
update_vals = self._execute_model(feed_dict, self.update_dict)
for stat_name, update_name in stats_needed.items():
update_stats[stat_name] = update_vals[update_name]
# Update target network. By default, target update happens at every policy update.
self.sess.run(self.target_update_op)
return update_stats
def update_reward_signals(
self, reward_signal_minibatches: Mapping[str, AgentBuffer], num_sequences: int
) -> Dict[str, float]:
"""
Only update the reward signals.
:param reward_signal_batches: Minibatches to use for updating the reward signals,
indexed by name. If none, don't update the reward signals.
"""
# Collect feed dicts for all reward signals.
feed_dict: Dict[tf.Tensor, Any] = {}
update_dict: Dict[str, tf.Tensor] = {}
update_stats: Dict[str, float] = {}
stats_needed: Dict[str, str] = {}
if reward_signal_minibatches:
self.add_reward_signal_dicts(
feed_dict,
update_dict,
stats_needed,
reward_signal_minibatches,
num_sequences,
)
update_vals = self._execute_model(feed_dict, update_dict)
for stat_name, update_name in stats_needed.items():
update_stats[stat_name] = update_vals[update_name]
return update_stats
def add_reward_signal_dicts(
self,
feed_dict: Dict[tf.Tensor, Any],
update_dict: Dict[str, tf.Tensor],
stats_needed: Dict[str, str],
reward_signal_minibatches: Mapping[str, AgentBuffer],
num_sequences: int,
) -> None:
"""
Adds the items needed for reward signal updates to the feed_dict and stats_needed dict.
:param feed_dict: Feed dict needed update
:param update_dit: Update dict that needs update
:param stats_needed: Stats needed to get from the update.
:param reward_signal_minibatches: Minibatches to use for updating the reward signals,
indexed by name.
"""
for name, r_batch in reward_signal_minibatches.items():
feed_dict.update(
self.reward_signals[name].prepare_update(
self.policy, r_batch, num_sequences
)
)
update_dict.update(self.reward_signals[name].update_dict)
stats_needed.update(self.reward_signals[name].stats_name_to_update_name)
def _construct_feed_dict(
self, policy: TFPolicy, batch: AgentBuffer, num_sequences: int
) -> Dict[tf.Tensor, Any]:
"""
Builds the feed dict for updating the SAC model.
:param model: The model to update. May be different when, e.g. using multi-GPU.
:param batch: Mini-batch to use to update.
:param num_sequences: Number of LSTM sequences in batch.
"""
# Do an optional burn-in for memories
num_burn_in = int(self.burn_in_ratio * self.policy.sequence_length)
burn_in_mask = np.ones((self.policy.sequence_length), dtype=np.float32)
burn_in_mask[range(0, num_burn_in)] = 0
burn_in_mask = np.tile(burn_in_mask, num_sequences)
feed_dict = {
policy.batch_size_ph: num_sequences,
policy.sequence_length_ph: self.policy.sequence_length,
self.next_sequence_length_ph: self.policy.sequence_length,
self.policy.mask_input: batch["masks"] * burn_in_mask,
}
for name in self.reward_signals:
feed_dict[self.rewards_holders[name]] = batch[f"{name}_rewards"]
if self.policy.use_continuous_act:
feed_dict[self.policy_network.external_action_in] = batch["actions"]
else:
feed_dict[policy.output] = batch["actions"]
if self.policy.use_recurrent:
feed_dict[policy.prev_action] = batch["prev_action"]
feed_dict[policy.action_masks] = batch["action_mask"]
if self.policy.use_vec_obs:
feed_dict[policy.vector_in] = batch["vector_obs"]
feed_dict[self.next_vector_in] = batch["next_vector_in"]
if self.policy.vis_obs_size > 0:
for i, _ in enumerate(policy.visual_in):
_obs = batch["visual_obs%d" % i]
feed_dict[policy.visual_in[i]] = _obs
for i, _ in enumerate(self.next_visual_in):
_obs = batch["next_visual_obs%d" % i]
feed_dict[self.next_visual_in[i]] = _obs
if self.policy.use_recurrent:
feed_dict[policy.memory_in] = [
batch["memory"][i]
for i in range(0, len(batch["memory"]), self.policy.sequence_length)
]
feed_dict[self.policy_network.memory_in] = self._make_zero_mem(
self.m_size, batch.num_experiences
)
feed_dict[self.target_network.memory_in] = self._make_zero_mem(
self.m_size // 3, batch.num_experiences
)
feed_dict[self.dones_holder] = batch["done"]
return feed_dict

/ml-agents/mlagents/tf_utils/globals.py → /ml-agents/mlagents/torch_utils/globals.py

正在加载...
取消
保存