
Add environment manager for parallel environments (#2209)

Previously in v0.8 we added parallel environments via the
SubprocessUnityEnvironment, which exposed the same abstraction as
UnityEnvironment while actually wrapping many parallel environments
via subprocesses.

Wrapping many environments with the same interface as a single
environment had some downsides, however:
* Ordering needed to be preserved for agents across different envs,
  complicating the SubprocessUnityEnvironment logic
* Asynchronous environments, where steps are taken out of sync with the
  trainer, aren't viable with the Environment abstraction

This PR introduces a new EnvManager abstraction, which exposes a reduced
subset of the UnityEnvironment interface, and a SubprocessEnvManager
implementation that replaces the SubprocessUnityEnvironment.
Branch: /develop-generalizationTraining-TrainerController
GitHub, 5 years ago
Current commit: b05c9ac1
29 files changed, 537 insertions(+), 519 deletions(-)
  1. 4    ml-agents-envs/mlagents/envs/__init__.py
  2. 13   ml-agents-envs/mlagents/envs/brain.py
  3. 3    ml-agents-envs/mlagents/envs/environment.py
  4. 3    ml-agents/mlagents/trainers/__init__.py
  5. 4    ml-agents/mlagents/trainers/bc/policy.py
  6. 4    ml-agents/mlagents/trainers/components/reward_signals/curiosity/signal.py
  7. 4    ml-agents/mlagents/trainers/components/reward_signals/extrinsic/signal.py
  8. 4    ml-agents/mlagents/trainers/components/reward_signals/reward_signal.py
  9. 4    ml-agents/mlagents/trainers/components/reward_signals/reward_signal_factory.py
  10. 2   ml-agents/mlagents/trainers/demo_loader.py
  11. 6   ml-agents/mlagents/trainers/learn.py
  12. 11  ml-agents/mlagents/trainers/models.py
  13. 4   ml-agents/mlagents/trainers/ppo/policy.py
  14. 13  ml-agents/mlagents/trainers/ppo/trainer.py
  15. 5   ml-agents/mlagents/trainers/tests/test_learn.py
  16. 8   ml-agents/mlagents/trainers/tests/test_policy.py
  17. 45  ml-agents/mlagents/trainers/tests/test_ppo.py
  18. 47  ml-agents/mlagents/trainers/tests/test_trainer_controller.py
  19. 27  ml-agents/mlagents/trainers/trainer.py
  20. 145 ml-agents/mlagents/trainers/trainer_controller.py
  21. 14  ml-agents/mlagents/trainers/tf_policy.py
  22. 38  ml-agents-envs/mlagents/envs/env_manager.py
  23. 10  ml-agents-envs/mlagents/envs/policy.py
  24. 180 ml-agents-envs/mlagents/envs/subprocess_env_manager.py
  25. 110 ml-agents-envs/mlagents/envs/tests/test_subprocess_env_manager.py
  26. 124 ml-agents-envs/mlagents/envs/tests/test_subprocess_unity_environment.py
  27. 224 ml-agents-envs/mlagents/envs/subprocess_environment.py
  28. 0   /ml-agents/mlagents/trainers/tf_policy.py
  29. 0   /ml-agents-envs/mlagents/envs/action_info.py

4
ml-agents-envs/mlagents/envs/__init__.py


from .brain import *
from .brain import AllBrainInfo, BrainInfo, BrainParameters
from .action_info import ActionInfo, ActionInfoOutputs
from .policy import Policy
from .environment import *
from .exception import *

13
ml-agents-envs/mlagents/envs/brain.py


return np.append(m1, m2, axis=0)
@staticmethod
def process_pixels(image_bytes, gray_scale):
def process_pixels(image_bytes: bytes, gray_scale: bool) -> np.ndarray:
"""
Converts byte array observation image into numpy array, re-sizes it,
and optionally converts it to grey scale

"""
s = bytearray(image_bytes)
image = Image.open(io.BytesIO(s))
image_bytearray = bytearray(image_bytes)
image = Image.open(io.BytesIO(image_bytearray))
s = np.array(image) / 255.0
if gray_scale:
s = np.mean(s, axis=2)

@staticmethod
def from_agent_proto(agent_info_list, brain_params):
def from_agent_proto(worker_id: int, agent_info_list, brain_params):
vis_obs = []
vis_obs: List[np.ndarray] = []
for i in range(brain_params.number_visual_observations):
obs = [
BrainInfo.process_pixels(

vector_obs = np.nan_to_num(
np.array([x.stacked_vector_observation for x in agent_info_list])
)
agents = [f"${worker_id}-{x.id}" for x in agent_info_list]
brain_info = BrainInfo(
visual_observation=vis_obs,
vector_observation=vector_obs,

agents=[x.id for x in agent_info_list],
agents=agents,
local_done=[x.done for x in agent_info_list],
vector_action=np.array([x.stored_vector_actions for x in agent_info_list]),
text_action=[list(x.stored_text_actions) for x in agent_info_list],

3
ml-agents-envs/mlagents/envs/environment.py


None
) # The process that is started. If None, no process was started
self.communicator = self.get_communicator(worker_id, base_port, timeout_wait)
self.worker_id = worker_id
# If the environment name is None, a new environment will not be launched
# and the communicator will directly try to connect to an existing unity environment.

for brain_name in output.agentInfos:
agent_info_list = output.agentInfos[brain_name].value
_data[brain_name] = BrainInfo.from_agent_proto(
agent_info_list, self.brains[brain_name]
self.worker_id, agent_info_list, self.brains[brain_name]
)
return _data, global_done

3
ml-agents/mlagents/trainers/__init__.py


from .action_info import *
from .buffer import *
from .curriculum import *
from .meta_curriculum import *

from .policy import *
from .tf_policy import *
from .trainer_controller import *
from .bc.models import *
from .bc.offline_trainer import *

4
ml-agents/mlagents/trainers/bc/policy.py


import numpy as np
from mlagents.trainers.bc.models import BehavioralCloningModel
from mlagents.trainers.policy import Policy
from mlagents.trainers.tf_policy import TFPolicy
class BCPolicy(Policy):
class BCPolicy(TFPolicy):
def __init__(self, seed, brain, trainer_parameters, load):
"""
:param seed: Random seed.

4
ml-agents/mlagents/trainers/components/reward_signals/curiosity/signal.py


import numpy as np
from mlagents.trainers.components.reward_signals import RewardSignal, RewardSignalResult
from mlagents.trainers.components.reward_signals.curiosity.model import CuriosityModel
from mlagents.trainers.policy import Policy
from mlagents.trainers.tf_policy import TFPolicy
policy: Policy,
policy: TFPolicy,
strength: float,
gamma: float,
encoding_size: int = 128,

4
ml-agents/mlagents/trainers/components/reward_signals/extrinsic/signal.py


import numpy as np
from mlagents.trainers.components.reward_signals import RewardSignal, RewardSignalResult
from mlagents.trainers.policy import Policy
from mlagents.trainers.tf_policy import TFPolicy
def __init__(self, policy: Policy, strength: float, gamma: float):
def __init__(self, policy: TFPolicy, strength: float, gamma: float):
"""
The extrinsic reward generator. Returns the reward received by the environment
:param policy: The Policy object (e.g. PPOPolicy) that this Reward Signal will apply to.

4
ml-agents/mlagents/trainers/components/reward_signals/reward_signal.py


import logging
from mlagents.trainers.trainer import UnityTrainerException
from mlagents.trainers.policy import Policy
from mlagents.trainers.tf_policy import TFPolicy
from collections import namedtuple
import numpy as np
import abc

class RewardSignal(abc.ABC):
def __init__(self, policy: Policy, strength: float, gamma: float):
def __init__(self, policy: TFPolicy, strength: float, gamma: float):
"""
Initializes a reward signal. At minimum, you must pass in the policy it is being applied to,
the reward strength, and the gamma (discount factor.)

4
ml-agents/mlagents/trainers/components/reward_signals/reward_signal_factory.py


from mlagents.trainers.components.reward_signals.curiosity.signal import (
CuriosityRewardSignal,
)
from mlagents.trainers.policy import Policy
from mlagents.trainers.tf_policy import TFPolicy
logger = logging.getLogger("mlagents.trainers")

def create_reward_signal(
policy: Policy, name: str, config_entry: Dict[str, Any]
policy: TFPolicy, name: str, config_entry: Dict[str, Any]
) -> RewardSignal:
"""
Creates a reward signal class based on the name and config entry provided as a dict.

2
ml-agents/mlagents/trainers/demo_loader.py


if obs_decoded > 1:
agent_info = AgentInfoProto()
agent_info.ParseFromString(data[pos : pos + next_pos])
brain_info = BrainInfo.from_agent_proto([agent_info], brain_params)
brain_info = BrainInfo.from_agent_proto(0, [agent_info], brain_params)
brain_infos.append(brain_info)
if len(brain_infos) == total_expected:
break

6
ml-agents/mlagents/trainers/learn.py


from mlagents.envs import UnityEnvironment
from mlagents.envs.exception import UnityEnvironmentException
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs.subprocess_environment import SubprocessUnityEnvironment
from mlagents.envs.subprocess_env_manager import SubprocessEnvManager
def run_training(

run_seed,
base_port + (sub_id * num_envs),
)
env = SubprocessUnityEnvironment(env_factory, num_envs)
env = SubprocessEnvManager(env_factory, num_envs)
maybe_meta_curriculum = try_create_meta_curriculum(curriculum_folder, env)
# Create controller and begin training.

def try_create_meta_curriculum(
curriculum_folder: Optional[str], env: BaseUnityEnvironment
curriculum_folder: Optional[str], env: SubprocessEnvManager
) -> Optional[MetaCurriculum]:
if curriculum_folder is None:
return None

11
ml-agents/mlagents/trainers/models.py


tf.set_random_seed(seed)
self.brain = brain
self.vector_in = None
self.global_step, self.increment_step = self.create_global_steps()
self.global_step, self.increment_step, self.steps_to_increment = (
self.create_global_steps()
)
self.visual_in = []
self.batch_size = tf.placeholder(shape=None, dtype=tf.int32, name="batch_size")
self.sequence_length = tf.placeholder(

global_step = tf.Variable(
0, name="global_step", trainable=False, dtype=tf.int32
)
increment_step = tf.assign(global_step, tf.add(global_step, 1))
return global_step, increment_step
steps_to_increment = tf.placeholder(
shape=[], dtype=tf.int32, name="steps_to_increment"
)
increment_step = tf.assign(global_step, tf.add(global_step, steps_to_increment))
return global_step, increment_step, steps_to_increment
@staticmethod
def scaled_init(scale):

4
ml-agents/mlagents/trainers/ppo/policy.py


from mlagents.trainers import BrainInfo, ActionInfo
from mlagents.trainers.ppo.models import PPOModel
from mlagents.trainers.policy import Policy
from mlagents.trainers.tf_policy import TFPolicy
from mlagents.trainers.components.reward_signals.reward_signal_factory import (
create_reward_signal,
)

class PPOPolicy(Policy):
class PPOPolicy(TFPolicy):
def __init__(self, seed, brain, trainer_params, is_training, load):
"""
Policy for Proximal Policy Optimization Networks.

13
ml-agents/mlagents/trainers/ppo/trainer.py


import logging
from collections import deque, defaultdict
from typing import Any, List
from typing import List, Any
import tensorflow as tf
from mlagents.trainers.action_info import ActionInfoOutputs
from mlagents.envs.action_info import ActionInfoOutputs
logger = logging.getLogger("mlagents.trainers")

"""
Responsible for collecting experiences and training PPO model.
:param trainer_parameters: The parameters for the trainer (dictionary).
:param reward_buff_cap: Max reward history to track in the reward buffer
:param training: Whether the trainer is set for training.
:param load: Whether the model should be loaded.
:param seed: The seed the model will be initialized with

"""
return self._reward_buffer
def increment_step(self):
def increment_step(self, n_steps: int) -> None:
:param n_steps: number of steps to increment the step count by
self.policy.increment_step()
self.step = self.policy.get_current_step()
self.step = self.policy.increment_step(n_steps)
def construct_curr_info(self, next_info: BrainInfo) -> BrainInfo:
"""

5
ml-agents/mlagents/trainers/tests/test_learn.py


import unittest.mock as mock
import pytest
from unittest.mock import *
from mlagents.trainers import learn, TrainerController

}
@patch("mlagents.trainers.learn.SubprocessUnityEnvironment")
@patch("mlagents.trainers.learn.SubprocessEnvManager")
@patch("mlagents.trainers.learn.create_environment_factory")
@patch("mlagents.trainers.learn.load_config")
def test_run_training(load_config, create_environment_factory, subproc_env_mock):

)
@patch("mlagents.trainers.learn.SubprocessUnityEnvironment")
@patch("mlagents.trainers.learn.SubprocessEnvManager")
@patch("mlagents.trainers.learn.create_environment_factory")
@patch("mlagents.trainers.learn.load_config")
def test_docker_target_path(load_config, create_environment_factory, subproc_env_mock):

8
ml-agents/mlagents/trainers/tests/test_policy.py


from mlagents.trainers.policy import *
from mlagents.trainers.tf_policy import *
from unittest.mock import MagicMock

def test_take_action_returns_empty_with_no_agents():
test_seed = 3
policy = Policy(test_seed, basic_mock_brain(), basic_params())
policy = TFPolicy(test_seed, basic_mock_brain(), basic_params())
no_agent_brain_info = BrainInfo([], [], [], agents=[])
result = policy.get_action(no_agent_brain_info)
assert result == ActionInfo([], [], [], None, None)

test_seed = 3
policy = Policy(test_seed, basic_mock_brain(), basic_params())
policy = TFPolicy(test_seed, basic_mock_brain(), basic_params())
policy.evaluate = MagicMock(return_value={})
brain_info_with_agents = BrainInfo([], [], [], agents=["an-agent-id"])
result = policy.get_action(brain_info_with_agents)

def test_take_action_returns_action_info_when_available():
test_seed = 3
policy = Policy(test_seed, basic_mock_brain(), basic_params())
policy = TFPolicy(test_seed, basic_mock_brain(), basic_params())
policy_eval_out = {
"action": np.array([1.0]),
"memory_out": np.array([2.5]),

45
ml-agents/mlagents/trainers/tests/test_ppo.py


import yaml
from mlagents.trainers.ppo.models import PPOModel
from mlagents.trainers.ppo.trainer import discount_rewards
from mlagents.trainers.ppo.trainer import PPOTrainer, discount_rewards
from mlagents.envs import UnityEnvironment
from mlagents.envs import UnityEnvironment, BrainParameters
from mlagents.envs.mock_communicator import MockCommunicator

gamma = 0.9
returns = discount_rewards(rewards, gamma, 0.0)
np.testing.assert_array_almost_equal(returns, np.array([0.729, 0.81, 0.9, 1.0]))
def test_trainer_increment_step():
trainer_params = {
"trainer": "ppo",
"batch_size": 2048,
"beta": 0.005,
"buffer_size": 20480,
"epsilon": 0.2,
"gamma": 0.995,
"hidden_units": 512,
"lambd": 0.95,
"learning_rate": 0.0003,
"max_steps": "2e6",
"memory_size": 256,
"normalize": True,
"num_epoch": 3,
"num_layers": 3,
"time_horizon": 1000,
"sequence_length": 64,
"summary_freq": 3000,
"use_recurrent": False,
"use_curiosity": False,
"curiosity_strength": 0.01,
"curiosity_enc_size": 128,
"summary_path": "./summaries/test_trainer_summary",
"model_path": "./models/test_trainer_models/TestModel",
"keep_checkpoints": 5,
"reward_signals": {"extrinsic": {"strength": 1.0, "gamma": 0.99}},
}
brain_params = BrainParameters("test_brain", 1, 1, [], [2], [], 0)
trainer = PPOTrainer(brain_params, 0, trainer_params, True, False, 0, "0")
policy_mock = mock.Mock()
step_count = 10
policy_mock.increment_step = mock.Mock(return_value=step_count)
trainer.policy = policy_mock
trainer.increment_step(5)
policy_mock.increment_step.assert_called_with(5)
assert trainer.step == 10
if __name__ == "__main__":

47
ml-agents/mlagents/trainers/tests/test_trainer_controller.py


import json
import os
from unittest.mock import *

from mlagents.trainers.ppo.trainer import PPOTrainer
from mlagents.trainers.bc.offline_trainer import OfflineBCTrainer
from mlagents.trainers.bc.online_trainer import OnlineBCTrainer
from mlagents.envs.subprocess_env_manager import StepInfo
from mlagents.envs.exception import UnityEnvironmentException

tc = basic_trainer_controller(brain_info_mock)
tc.initialize_trainers = MagicMock()
tc.trainers = {"testbrain": trainer_mock}
tc.take_step = MagicMock()
tc.advance = MagicMock()
tc.trainers["testbrain"].get_step = 0
def take_step_sideeffect(env, curr_info):
def take_step_sideeffect(env):
return 1
tc.take_step.side_effect = take_step_sideeffect
tc.advance.side_effect = take_step_sideeffect
tc._export_graph = MagicMock()
tc._save_model = MagicMock()

tf_reset_graph.assert_called_once()
tc.initialize_trainers.assert_called_once_with(trainer_config)
env_mock.reset.assert_called_once()
assert tc.take_step.call_count == 11
assert tc.advance.call_count == 11
tc._export_graph.assert_not_called()
tc._save_model.assert_not_called()
env_mock.close.assert_called_once()

tf_reset_graph.assert_called_once()
tc.initialize_trainers.assert_called_once_with(trainer_config)
env_mock.reset.assert_called_once()
assert tc.take_step.call_count == trainer_mock.get_max_steps + 1
assert tc.advance.call_count == trainer_mock.get_max_steps + 1
env_mock.close.assert_called_once()
tc._save_model.assert_called_once_with(steps=6)

def test_take_step_adds_experiences_to_trainer_and_trains():
tc, trainer_mock = trainer_controller_with_take_step_mocks()
curr_info_mock = MagicMock()
brain_info_mock = MagicMock()
curr_info_mock.__getitem__ = MagicMock(return_value=brain_info_mock)
old_step_info = StepInfo(Mock(), Mock(), MagicMock())
new_step_info = StepInfo(Mock(), Mock(), MagicMock())
env_step_output_mock = MagicMock()
env_mock.step = MagicMock(return_value=env_step_output_mock)
env_mock.close = MagicMock()
env_mock.reset = MagicMock(return_value=curr_info_mock)
env_mock.step.return_value = [new_step_info]
env_mock.reset.return_value = [old_step_info]
action_output_mock = ActionInfo(
"action", "memory", "actiontext", "value", {"some": "output"}
)
trainer_mock.get_action = MagicMock(return_value=action_output_mock)
tc.take_step(env_mock, curr_info_mock)
tc.advance(env_mock)
trainer_mock.get_action.assert_called_once_with(brain_info_mock)
env_mock.step.assert_called_once_with(
vector_action={"testbrain": action_output_mock.action},
memory={"testbrain": action_output_mock.memory},
text_action={"testbrain": action_output_mock.text},
value={"testbrain": action_output_mock.value},
)
env_mock.step.assert_called_once()
curr_info_mock, env_step_output_mock, action_output_mock.outputs
new_step_info.previous_all_brain_info,
new_step_info.current_all_brain_info,
new_step_info.brain_name_to_action_info["testbrain"].outputs,
curr_info_mock, env_step_output_mock
new_step_info.previous_all_brain_info, new_step_info.current_all_brain_info
trainer_mock.write_summary.assert_called_once()
trainer_mock.increment_step.assert_called_once()

27
ml-agents/mlagents/trainers/trainer.py


import tensorflow as tf
import numpy as np
from mlagents.envs import UnityException, AllBrainInfo, BrainInfo
from mlagents.trainers import ActionInfo, ActionInfoOutputs
from mlagents.envs import UnityException, AllBrainInfo, ActionInfoOutputs
from mlagents.trainers import TrainerMetrics
LOGGER = logging.getLogger("mlagents.trainers")

"""
raise UnityTrainerException("The get_step property was not implemented.")
def increment_step(self):
def increment_step(self, n_steps: int) -> None:
def get_action(self, curr_info: BrainInfo) -> ActionInfo:
"""
Get an action using this trainer's current policy.
:param curr_info: Current BrainInfo.
:return: The ActionInfo given by the policy given the BrainInfo.
"""
self.trainer_metrics.start_experience_collection_timer()
action = self.policy.get_action(curr_info)
self.trainer_metrics.end_experience_collection_timer()
return action
def add_experiences(
self,
curr_info: AllBrainInfo,

if self.is_training and self.get_step <= self.get_max_steps
else "Not Training."
)
step = min(self.get_step, self.get_max_steps)
if len(self.stats["Environment/Cumulative Reward"]) > 0:
mean_reward = np.mean(self.stats["Environment/Cumulative Reward"])
LOGGER.info(

"Reward: {"
":0.3f}. Std of Reward: {:0.3f}. {}".format(
"Reward: {:0.3f}"
". Std of Reward: {:0.3f}. {}".format(
min(self.get_step, self.get_max_steps),
step,
delta_train_start,
mean_reward,
np.std(self.stats["Environment/Cumulative Reward"]),

else:
LOGGER.info(
" {}: {}: Step: {}. No episode was completed since last summary. {}".format(
self.run_id, self.brain_name, self.get_step, is_training
self.run_id, self.brain_name, step, is_training
)
)
summary = tf.Summary()

summary.value.add(tag="{}".format(key), simple_value=stat_mean)
self.stats[key] = []
summary.value.add(tag="Environment/Lesson", simple_value=lesson_num)
self.summary_writer.add_summary(summary, self.get_step)
self.summary_writer.add_summary(summary, step)
self.summary_writer.flush()
def write_tensorboard_text(self, key, input_dict):

145
ml-agents/mlagents/trainers/trainer_controller.py


import os
import logging
import shutil
import sys
from typing import *
import numpy as np

from mlagents.envs import AllBrainInfo, BrainParameters
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs import BrainParameters
from mlagents.envs.env_manager import StepInfo
from mlagents.envs.subprocess_env_manager import SubprocessEnvManager
from mlagents.envs.exception import UnityEnvironmentException
from mlagents.trainers import Trainer, TrainerMetrics
from mlagents.trainers.ppo.trainer import PPOTrainer

self.keep_checkpoints = keep_checkpoints
self.trainers: Dict[str, Trainer] = {}
self.trainer_metrics: Dict[str, TrainerMetrics] = {}
self.global_step = 0
self.meta_curriculum = meta_curriculum
self.seed = training_seed
self.training_start_time = time()

"permissions are set correctly.".format(model_path)
)
def _reset_env(self, env: BaseUnityEnvironment) -> AllBrainInfo:
def _reset_env(self, env: SubprocessEnvManager) -> List[StepInfo]:
"""Resets the environment.
Returns:

else:
return env.reset(train_mode=self.fast_simulation)
def _should_save_model(self, global_step: int) -> bool:
return (
global_step % self.save_freq == 0 and global_step != 0 and self.train_model
)
def _not_done_training(self) -> bool:
return (
any([t.get_step <= t.get_max_steps for k, t in self.trainers.items()])
or not self.train_model
)
def write_to_tensorboard(self, global_step: int) -> None:
for brain_name, trainer in self.trainers.items():
# Write training statistics to Tensorboard.
delta_train_start = time() - self.training_start_time
if self.meta_curriculum is not None:
trainer.write_summary(
global_step,
delta_train_start,
lesson_num=self.meta_curriculum.brains_to_curriculums[
brain_name
].lesson_num,
)
else:
trainer.write_summary(global_step, delta_train_start)
self, env: BaseUnityEnvironment, trainer_config: Dict[str, Any]
self, env_manager: SubprocessEnvManager, trainer_config: Dict[str, Any]
) -> None:
# TODO: Should be able to start learning at different lesson numbers
# for each curriculum.

for _, t in self.trainers.items():
self.logger.info(t)
global_step = 0
curr_info = self._reset_env(env)
while (
any([t.get_step <= t.get_max_steps for k, t in self.trainers.items()])
or not self.train_model
):
new_info = self.take_step(env, curr_info)
self.global_step += 1
if (
self.global_step % self.save_freq == 0
and self.global_step != 0
and self.train_model
):
# Save Tensorflow model
self._save_model(steps=self.global_step)
curr_info = new_info
for brain_name, trainer in self.trainers.items():
env_manager.set_policy(brain_name, trainer.policy)
self._reset_env(env_manager)
while self._not_done_training():
n_steps = self.advance(env_manager)
for i in range(n_steps):
global_step += 1
if self._should_save_model(global_step):
# Save Tensorflow model
self._save_model(steps=global_step)
self.write_to_tensorboard(global_step)
if self.global_step != 0 and self.train_model:
self._save_model(steps=self.global_step)
if global_step != 0 and self.train_model:
self._save_model(steps=global_step)
self._save_model_when_interrupted(steps=self.global_step)
self._save_model_when_interrupted(steps=global_step)
env.close()
env_manager.close()
def take_step(
self, env: BaseUnityEnvironment, curr_info: AllBrainInfo
) -> AllBrainInfo:
def advance(self, env: SubprocessEnvManager) -> int:
if self.meta_curriculum:
# Get the sizes of the reward buffers.
reward_buff_sizes = {

# If any lessons were incremented or the environment is
# ready to be reset
if self.meta_curriculum and any(lessons_incremented.values()):
curr_info = self._reset_env(env)
self._reset_env(env)
for brain_name, trainer in self.trainers.items():
trainer.end_episode()
for brain_name, changed in lessons_incremented.items():

# Decide and take an action
take_action_vector = {}
take_action_memories = {}
take_action_text = {}
take_action_value = {}
take_action_outputs = {}
for brain_name, trainer in self.trainers.items():
action_info = trainer.get_action(curr_info[brain_name])
take_action_vector[brain_name] = action_info.action
take_action_memories[brain_name] = action_info.memory
take_action_text[brain_name] = action_info.text
take_action_value[brain_name] = action_info.value
take_action_outputs[brain_name] = action_info.outputs
new_info = env.step(
vector_action=take_action_vector,
memory=take_action_memories,
text_action=take_action_text,
value=take_action_value,
)
new_step_infos = env.step()
for step_info in new_step_infos:
for brain_name, trainer in self.trainers.items():
if brain_name in self.trainer_metrics:
self.trainer_metrics[brain_name].add_delta_step(delta_time_step)
trainer.add_experiences(
step_info.previous_all_brain_info,
step_info.current_all_brain_info,
step_info.brain_name_to_action_info[brain_name].outputs,
)
trainer.process_experiences(
step_info.previous_all_brain_info, step_info.current_all_brain_info
)
trainer.add_experiences(
curr_info, new_info, take_action_outputs[brain_name]
)
trainer.process_experiences(curr_info, new_info)
if (
trainer.is_ready_update()
and self.train_model
and trainer.get_step <= trainer.get_max_steps
):
# Perform gradient descent with experience buffer
trainer.update_policy()
# Write training statistics to Tensorboard.
delta_train_start = time() - self.training_start_time
if self.meta_curriculum is not None:
trainer.write_summary(
self.global_step,
delta_train_start,
lesson_num=self.meta_curriculum.brains_to_curriculums[
brain_name
].lesson_num,
)
else:
trainer.write_summary(self.global_step, delta_train_start)
trainer.increment_step()
return new_info
trainer.increment_step(len(new_step_infos))
if trainer.is_ready_update():
# Perform gradient descent with experience buffer
trainer.update_policy()
env.set_policy(brain_name, trainer.policy)
return len(new_step_infos)

14
ml-agents/mlagents/trainers/tf_policy.py


import numpy as np
import tensorflow as tf
from mlagents.trainers import ActionInfo, UnityException
from mlagents.trainers import UnityException
from mlagents.envs import Policy, ActionInfo
from tensorflow.python.tools import freeze_graph
from mlagents.trainers import tensorflow_to_barracuda as tf2bc
from mlagents.envs import BrainInfo

pass
class Policy(object):
class TFPolicy(Policy):
"""
Contains a learning model, and the necessary
functions to interact with it to perform evaluate and updating.

step = self.sess.run(self.model.global_step)
return step
def increment_step(self):
def increment_step(self, n_steps):
self.sess.run(self.model.increment_step)
out_dict = {
"global_step": self.model.global_step,
"increment_step": self.model.increment_step,
}
feed_dict = {self.model.steps_to_increment: n_steps}
return self.sess.run(out_dict, feed_dict=feed_dict)["global_step"]
def get_inference_vars(self):
"""

38
ml-agents-envs/mlagents/envs/env_manager.py


from abc import ABC, abstractmethod
from typing import List, Dict, NamedTuple, Optional
from mlagents.envs import AllBrainInfo, BrainParameters, Policy, ActionInfo
class StepInfo(NamedTuple):
previous_all_brain_info: Optional[AllBrainInfo]
current_all_brain_info: AllBrainInfo
brain_name_to_action_info: Optional[Dict[str, ActionInfo]]
class EnvManager(ABC):
def __init__(self):
self.policies: Dict[str, Policy] = {}
def set_policy(self, brain_name: str, policy: Policy) -> None:
self.policies[brain_name] = policy
@abstractmethod
def step(self) -> List[StepInfo]:
pass
@abstractmethod
def reset(self, config=None, train_mode=True) -> List[StepInfo]:
pass
@property
@abstractmethod
def external_brains(self) -> Dict[str, BrainParameters]:
pass
@property
@abstractmethod
def reset_parameters(self) -> Dict[str, float]:
pass
@abstractmethod
def close(self):
pass

10
ml-agents-envs/mlagents/envs/policy.py


from abc import ABC, abstractmethod
from mlagents.envs import BrainInfo
from mlagents.envs import ActionInfo
class Policy(ABC):
@abstractmethod
def get_action(self, brain_info: BrainInfo) -> ActionInfo:
pass

180
ml-agents-envs/mlagents/envs/subprocess_env_manager.py


from typing import *
import cloudpickle
from mlagents.envs import UnityEnvironment
from multiprocessing import Process, Pipe
from multiprocessing.connection import Connection
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs.env_manager import EnvManager, StepInfo
from mlagents.envs import AllBrainInfo, BrainParameters, ActionInfo
class EnvironmentCommand(NamedTuple):
name: str
payload: Any = None
class EnvironmentResponse(NamedTuple):
name: str
worker_id: int
payload: Any
class UnityEnvWorker:
def __init__(self, process: Process, worker_id: int, conn: Connection):
self.process = process
self.worker_id = worker_id
self.conn = conn
self.previous_step: StepInfo = StepInfo(None, {}, None)
self.previous_all_action_info: Dict[str, ActionInfo] = {}
def send(self, name: str, payload=None):
try:
cmd = EnvironmentCommand(name, payload)
self.conn.send(cmd)
except (BrokenPipeError, EOFError):
raise KeyboardInterrupt
def recv(self) -> EnvironmentResponse:
try:
response: EnvironmentResponse = self.conn.recv()
return response
except (BrokenPipeError, EOFError):
raise KeyboardInterrupt
def close(self):
try:
self.conn.send(EnvironmentCommand("close"))
except (BrokenPipeError, EOFError):
pass
self.process.join()
def worker(parent_conn: Connection, pickled_env_factory: str, worker_id: int):
env_factory: Callable[[int], UnityEnvironment] = cloudpickle.loads(
pickled_env_factory
)
env = env_factory(worker_id)
def _send_response(cmd_name, payload):
parent_conn.send(EnvironmentResponse(cmd_name, worker_id, payload))
try:
while True:
cmd: EnvironmentCommand = parent_conn.recv()
if cmd.name == "step":
all_action_info = cmd.payload
if env.global_done:
all_brain_info = env.reset()
else:
actions = {}
memories = {}
texts = {}
values = {}
for brain_name, action_info in all_action_info.items():
actions[brain_name] = action_info.action
memories[brain_name] = action_info.memory
texts[brain_name] = action_info.text
values[brain_name] = action_info.value
all_brain_info = env.step(actions, memories, texts, values)
_send_response("step", all_brain_info)
elif cmd.name == "external_brains":
_send_response("external_brains", env.external_brains)
elif cmd.name == "reset_parameters":
_send_response("reset_parameters", env.reset_parameters)
elif cmd.name == "reset":
all_brain_info = env.reset(cmd.payload[0], cmd.payload[1])
_send_response("reset", all_brain_info)
elif cmd.name == "global_done":
_send_response("global_done", env.global_done)
elif cmd.name == "close":
break
except KeyboardInterrupt:
print("UnityEnvironment worker: keyboard interrupt")
finally:
env.close()
class SubprocessEnvManager(EnvManager):
def __init__(
self, env_factory: Callable[[int], BaseUnityEnvironment], n_env: int = 1
):
super().__init__()
self.env_workers: List[UnityEnvWorker] = []
for worker_idx in range(n_env):
self.env_workers.append(self.create_worker(worker_idx, env_factory))
def get_last_steps(self):
return [ew.previous_step for ew in self.env_workers]
@staticmethod
def create_worker(
worker_id: int, env_factory: Callable[[int], BaseUnityEnvironment]
) -> UnityEnvWorker:
parent_conn, child_conn = Pipe()
# Need to use cloudpickle for the env factory function since function objects aren't picklable
# on Windows as of Python 3.6.
pickled_env_factory = cloudpickle.dumps(env_factory)
child_process = Process(
target=worker, args=(child_conn, pickled_env_factory, worker_id)
)
child_process.start()
return UnityEnvWorker(child_process, worker_id, parent_conn)
def step(self) -> List[StepInfo]:
for env_worker in self.env_workers:
all_action_info = self._take_step(env_worker.previous_step)
env_worker.previous_all_action_info = all_action_info
env_worker.send("step", all_action_info)
step_brain_infos: List[AllBrainInfo] = [
self.env_workers[i].recv().payload for i in range(len(self.env_workers))
]
steps = []
for i in range(len(step_brain_infos)):
env_worker = self.env_workers[i]
step_info = StepInfo(
env_worker.previous_step.current_all_brain_info,
step_brain_infos[i],
env_worker.previous_all_action_info,
)
env_worker.previous_step = step_info
steps.append(step_info)
return steps
def reset(self, config=None, train_mode=True) -> List[StepInfo]:
self._broadcast_message("reset", (config, train_mode))
reset_results = [
self.env_workers[i].recv().payload for i in range(len(self.env_workers))
]
for i in range(len(reset_results)):
env_worker = self.env_workers[i]
env_worker.previous_step = StepInfo(None, reset_results[i], None)
return list(map(lambda ew: ew.previous_step, self.env_workers))
@property
def external_brains(self) -> Dict[str, BrainParameters]:
self.env_workers[0].send("external_brains")
return self.env_workers[0].recv().payload
@property
def reset_parameters(self) -> Dict[str, float]:
self.env_workers[0].send("reset_parameters")
return self.env_workers[0].recv().payload
def close(self):
for env in self.env_workers:
env.close()
def _broadcast_message(self, name: str, payload=None):
for env in self.env_workers:
env.send(name, payload)
def _take_step(self, last_step: StepInfo) -> Dict[str, ActionInfo]:
all_action_info: Dict[str, ActionInfo] = {}
for brain_name, brain_info in last_step.current_all_brain_info.items():
all_action_info[brain_name] = self.policies[brain_name].get_action(
brain_info
)
return all_action_info

110
ml-agents-envs/mlagents/envs/tests/test_subprocess_env_manager.py


import unittest.mock as mock
from unittest.mock import Mock, MagicMock
import unittest
import cloudpickle
from mlagents.envs.subprocess_env_manager import StepInfo
from mlagents.envs.subprocess_env_manager import (
SubprocessEnvManager,
EnvironmentResponse,
EnvironmentCommand,
worker,
)
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
def mock_env_factory(worker_id: int):
return mock.create_autospec(spec=BaseUnityEnvironment)
class MockEnvWorker:
def __init__(self, worker_id, resp=None):
self.worker_id = worker_id
self.process = None
self.conn = None
self.send = Mock()
self.recv = Mock(return_value=resp)
class SubprocessEnvManagerTest(unittest.TestCase):
def test_environments_are_created(self):
SubprocessEnvManager.create_worker = MagicMock()
env = SubprocessEnvManager(mock_env_factory, 2)
# Creates two processes
env.create_worker.assert_has_calls(
[mock.call(0, mock_env_factory), mock.call(1, mock_env_factory)]
)
self.assertEqual(len(env.env_workers), 2)
def test_worker_step_resets_on_global_done(self):
env_mock = Mock()
env_mock.reset = Mock(return_value="reset_data")
env_mock.global_done = True
def mock_global_done_env_factory(worker_id: int):
return env_mock
mock_parent_connection = Mock()
step_command = EnvironmentCommand("step", (None, None, None, None))
close_command = EnvironmentCommand("close")
mock_parent_connection.recv.side_effect = [step_command, close_command]
mock_parent_connection.send = Mock()
worker(
mock_parent_connection, cloudpickle.dumps(mock_global_done_env_factory), 0
)
# recv called twice to get step and close command
self.assertEqual(mock_parent_connection.recv.call_count, 2)
# worker returns the data from the reset
mock_parent_connection.send.assert_called_with(
EnvironmentResponse("step", 0, "reset_data")
)
def test_reset_passes_reset_params(self):
manager = SubprocessEnvManager(mock_env_factory, 1)
params = {"test": "params"}
manager.reset(params, False)
manager.env_workers[0].send.assert_called_with("reset", (params, False))
def test_reset_collects_results_from_all_envs(self):
SubprocessEnvManager.create_worker = lambda em, worker_id, env_factory: MockEnvWorker(
worker_id, EnvironmentResponse("reset", worker_id, worker_id)
)
manager = SubprocessEnvManager(mock_env_factory, 4)
params = {"test": "params"}
res = manager.reset(params)
for i, env in enumerate(manager.env_workers):
env.send.assert_called_with("reset", (params, True))
env.recv.assert_called()
# Check that the "last steps" are set to the value returned for each step
self.assertEqual(
manager.env_workers[i].previous_step.current_all_brain_info, i
)
assert res == list(map(lambda ew: ew.previous_step, manager.env_workers))
def test_step_takes_steps_for_all_envs(self):
SubprocessEnvManager.create_worker = lambda em, worker_id, env_factory: MockEnvWorker(
worker_id, EnvironmentResponse("step", worker_id, worker_id)
)
manager = SubprocessEnvManager(mock_env_factory, 2)
step_mock = Mock()
last_steps = [Mock(), Mock()]
manager.env_workers[0].previous_step = last_steps[0]
manager.env_workers[1].previous_step = last_steps[1]
manager._take_step = Mock(return_value=step_mock)
res = manager.step()
for i, env in enumerate(manager.env_workers):
env.send.assert_called_with("step", step_mock)
env.recv.assert_called()
# Check that the "last steps" are set to the value returned for each step
self.assertEqual(
manager.env_workers[i].previous_step.current_all_brain_info, i
)
self.assertEqual(
manager.env_workers[i].previous_step.previous_all_brain_info,
last_steps[i].current_all_brain_info,
)
assert res == list(map(lambda ew: ew.previous_step, manager.env_workers))

124
ml-agents-envs/mlagents/envs/tests/test_subprocess_unity_environment.py


import unittest.mock as mock
from unittest.mock import Mock, MagicMock
import unittest
from mlagents.envs.subprocess_environment import *
from mlagents.envs import UnityEnvironmentException, BrainInfo
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
def mock_env_factory(worker_id: int):
return mock.create_autospec(spec=BaseUnityEnvironment)
class MockEnvWorker:
def __init__(self, worker_id):
self.worker_id = worker_id
self.process = None
self.conn = None
self.send = MagicMock()
self.recv = MagicMock()
class SubprocessEnvironmentTest(unittest.TestCase):
def test_environments_are_created(self):
SubprocessUnityEnvironment.create_worker = MagicMock()
env = SubprocessUnityEnvironment(mock_env_factory, 2)
# Creates two processes
self.assertEqual(
env.create_worker.call_args_list,
[mock.call(0, mock_env_factory), mock.call(1, mock_env_factory)],
)
self.assertEqual(len(env.envs), 2)
def test_step_async_fails_when_waiting(self):
env = SubprocessUnityEnvironment(mock_env_factory, 0)
env.waiting = True
with self.assertRaises(UnityEnvironmentException):
env.step_async(vector_action=[])
@staticmethod
def test_step_async_splits_input_by_agent_count():
env = SubprocessUnityEnvironment(mock_env_factory, 0)
env.env_agent_counts = {"MockBrain": [1, 3, 5]}
env.envs = [MockEnvWorker(0), MockEnvWorker(1), MockEnvWorker(2)]
env_0_actions = [[1.0, 2.0]]
env_1_actions = [[3.0, 4.0]] * 3
env_2_actions = [[5.0, 6.0]] * 5
vector_action = {"MockBrain": env_0_actions + env_1_actions + env_2_actions}
env.step_async(vector_action=vector_action)
env.envs[0].send.assert_called_with(
"step", ({"MockBrain": env_0_actions}, {}, {}, {})
)
env.envs[1].send.assert_called_with(
"step", ({"MockBrain": env_1_actions}, {}, {}, {})
)
env.envs[2].send.assert_called_with(
"step", ({"MockBrain": env_2_actions}, {}, {}, {})
)
def test_step_async_sets_waiting(self):
env = SubprocessUnityEnvironment(mock_env_factory, 0)
env.step_async(vector_action=[])
self.assertTrue(env.waiting)
def test_step_await_fails_if_not_waiting(self):
env = SubprocessUnityEnvironment(mock_env_factory, 0)
with self.assertRaises(UnityEnvironmentException):
env.step_await()
def test_step_await_combines_brain_info(self):
all_brain_info_env0 = {
"MockBrain": BrainInfo(
[], [[1.0, 2.0], [1.0, 2.0]], [], agents=[1, 2], memory=np.zeros((0, 0))
)
}
all_brain_info_env1 = {
"MockBrain": BrainInfo(
[], [[3.0, 4.0]], [], agents=[3], memory=np.zeros((0, 0))
)
}
env_worker_0 = MockEnvWorker(0)
env_worker_0.recv.return_value = EnvironmentResponse(
"step", 0, all_brain_info_env0
)
env_worker_1 = MockEnvWorker(1)
env_worker_1.recv.return_value = EnvironmentResponse(
"step", 1, all_brain_info_env1
)
env = SubprocessUnityEnvironment(mock_env_factory, 0)
env.envs = [env_worker_0, env_worker_1]
env.waiting = True
combined_braininfo = env.step_await()["MockBrain"]
self.assertEqual(
combined_braininfo.vector_observations.tolist(),
[[1.0, 2.0], [1.0, 2.0], [3.0, 4.0]],
)
self.assertEqual(combined_braininfo.agents, ["0-1", "0-2", "1-3"])
def test_step_resets_on_global_done(self):
env_mock = Mock()
env_mock.reset = Mock(return_value="reset_data")
env_mock.global_done = True
def mock_global_done_env_factory(worker_id: int):
return env_mock
mock_parent_connection = Mock()
step_command = EnvironmentCommand("step", (None, None, None, None))
close_command = EnvironmentCommand("close")
mock_parent_connection.recv = Mock()
mock_parent_connection.recv.side_effect = [step_command, close_command]
mock_parent_connection.send = Mock()
worker(
mock_parent_connection, cloudpickle.dumps(mock_global_done_env_factory), 0
)
# recv called twice to get step and close command
self.assertEqual(mock_parent_connection.recv.call_count, 2)
# worker returns the data from the reset
mock_parent_connection.send.assert_called_with(
EnvironmentResponse("step", 0, "reset_data")
)

224
ml-agents-envs/mlagents/envs/subprocess_environment.py


from typing import *
import copy
import numpy as np
import cloudpickle
from mlagents.envs import UnityEnvironment
from multiprocessing import Process, Pipe
from multiprocessing.connection import Connection
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs import AllBrainInfo, UnityEnvironmentException
class EnvironmentCommand(NamedTuple):
name: str
payload: Any = None
class EnvironmentResponse(NamedTuple):
name: str
worker_id: int
payload: Any
class UnityEnvWorker(NamedTuple):
process: Process
worker_id: int
conn: Connection
def send(self, name: str, payload=None):
try:
cmd = EnvironmentCommand(name, payload)
self.conn.send(cmd)
except (BrokenPipeError, EOFError):
raise KeyboardInterrupt
def recv(self) -> EnvironmentResponse:
try:
response: EnvironmentResponse = self.conn.recv()
return response
except (BrokenPipeError, EOFError):
raise KeyboardInterrupt
def close(self):
try:
self.conn.send(EnvironmentCommand("close"))
except (BrokenPipeError, EOFError):
pass
self.process.join()
def worker(parent_conn: Connection, pickled_env_factory: str, worker_id: int):
env_factory: Callable[[int], UnityEnvironment] = cloudpickle.loads(
pickled_env_factory
)
env = env_factory(worker_id)
def _send_response(cmd_name, payload):
parent_conn.send(EnvironmentResponse(cmd_name, worker_id, payload))
try:
while True:
cmd: EnvironmentCommand = parent_conn.recv()
if cmd.name == "step":
vector_action, memory, text_action, value = cmd.payload
if env.global_done:
all_brain_info = env.reset()
else:
all_brain_info = env.step(vector_action, memory, text_action, value)
_send_response("step", all_brain_info)
elif cmd.name == "external_brains":
_send_response("external_brains", env.external_brains)
elif cmd.name == "reset_parameters":
_send_response("reset_parameters", env.reset_parameters)
elif cmd.name == "reset":
all_brain_info = env.reset(cmd.payload[0], cmd.payload[1])
_send_response("reset", all_brain_info)
elif cmd.name == "global_done":
_send_response("global_done", env.global_done)
elif cmd.name == "close":
break
except KeyboardInterrupt:
print("UnityEnvironment worker: keyboard interrupt")
finally:
env.close()
class SubprocessUnityEnvironment(BaseUnityEnvironment):
def __init__(
self, env_factory: Callable[[int], BaseUnityEnvironment], n_env: int = 1
):
self.envs: List[UnityEnvWorker] = []
self.env_agent_counts: Dict[str, List[int]] = {}
self.waiting = False
for worker_id in range(n_env):
self.envs.append(self.create_worker(worker_id, env_factory))
@staticmethod
def create_worker(
worker_id: int, env_factory: Callable[[int], BaseUnityEnvironment]
) -> UnityEnvWorker:
parent_conn, child_conn = Pipe()
# Need to use cloudpickle for the env factory function since function objects aren't picklable
# on Windows as of Python 3.6.
pickled_env_factory = cloudpickle.dumps(env_factory)
child_process = Process(
target=worker, args=(child_conn, pickled_env_factory, worker_id)
)
child_process.start()
return UnityEnvWorker(child_process, worker_id, parent_conn)
def step_async(