
Adds SubprocessUnityEnvironment for parallel envs (#1751)

This commit adds support for running Unity environments in parallel.
An abstract base class, BaseUnityEnvironment, was created for
UnityEnvironment, and the new SubprocessUnityEnvironment inherits from it.

SubprocessUnityEnvironment communicates with its worker processes through
pipes, sending them commands that are executed in parallel.

A few significant changes were needed as a side effect:
* UnityEnvironments are created via a factory method (a closure)
  rather than being directly created by the main process.
* In mlagents-learn "worker-id" has been replaced by "base-port"
  and "num-envs", and worker_ids are automatically assigned across runs.
* BrainInfo objects now convert all fields to numpy arrays or lists to
  avoid serialization issues.
Branch: /develop-generalizationTraining-TrainerController
GitHub · 6 years ago
Current commit 93760bc4
13 files changed, 518 insertions and 64 deletions
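The central mechanism is the factory closure mentioned above. Below is a minimal sketch, assuming only the launch arguments shown (the real create_environment_factory in learn.py later in this diff also handles Docker, seeds, and graphics flags); make_env_factory is a hypothetical name:

from mlagents.envs import UnityEnvironment

def make_env_factory(env_path: str, base_port: int):
    # The closure captures the launch settings; each worker process calls it
    # with its own worker_id, so the UnityEnvironment is constructed inside
    # the child process rather than pickled across the pipe.
    def create_unity_environment(worker_id: int) -> UnityEnvironment:
        return UnityEnvironment(file_name=env_path,
                                worker_id=worker_id,
                                base_port=base_port)
    return create_unity_environment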
1. ml-agents-envs/mlagents/envs/__init__.py (2 changes)
2. ml-agents-envs/mlagents/envs/brain.py (96 changes)
3. ml-agents-envs/mlagents/envs/environment.py (43 changes)
4. ml-agents-envs/mlagents/envs/tests/test_envs.py (9 changes)
5. ml-agents/mlagents/trainers/learn.py (65 changes)
6. ml-agents/mlagents/trainers/ppo/trainer.py (6 changes)
7. ml-agents/mlagents/trainers/tests/test_learn.py (19 changes)
8. ml-agents/mlagents/trainers/trainer_controller.py (27 changes)
9. ml-agents-envs/mlagents/envs/base_unity_environment.py (33 changes)
10. ml-agents-envs/mlagents/envs/subprocess_environment.py (192 changes)
11. ml-agents/tests/envs/test_subprocess_unity_environment.py (90 changes)
12. ml-agents-envs/__init__.py (0 changes)

ml-agents-envs/mlagents/envs/__init__.py (2 changes)


from .environment import *
from .exception import *

ml-agents-envs/mlagents/envs/brain.py (96 changes)


import numpy as np
import io
from typing import Dict
from typing import Dict, List, Optional
from PIL import Image
logger = logging.getLogger("mlagents.envs")

self.action_masks = action_mask
self.custom_observations = custom_observations
    def merge(self, other):
        for i in range(len(self.visual_observations)):
            self.visual_observations[i].extend(other.visual_observations[i])
        self.vector_observations = np.append(self.vector_observations, other.vector_observations, axis=0)
        self.text_observations.extend(other.text_observations)
        self.memories = self.merge_memories(self.memories, other.memories, self.agents, other.agents)
        self.rewards = safe_concat_lists(self.rewards, other.rewards)
        self.local_done = safe_concat_lists(self.local_done, other.local_done)
        self.max_reached = safe_concat_lists(self.max_reached, other.max_reached)
        self.agents = safe_concat_lists(self.agents, other.agents)
        self.previous_vector_actions = safe_concat_np_ndarray(
            self.previous_vector_actions, other.previous_vector_actions
        )
        self.previous_text_actions = safe_concat_lists(
            self.previous_text_actions, other.previous_text_actions
        )
        self.action_masks = safe_concat_np_ndarray(self.action_masks, other.action_masks)
        self.custom_observations = safe_concat_lists(self.custom_observations, other.custom_observations)
    @staticmethod
    def merge_memories(m1, m2, agents1, agents2):
        if len(m1) == 0 and len(m2) != 0:
            m1 = np.zeros((len(agents1), m2.shape[1]))
        elif len(m2) == 0 and len(m1) != 0:
            m2 = np.zeros((len(agents2), m1.shape[1]))
        elif m2.shape[1] > m1.shape[1]:
            new_m1 = np.zeros((m1.shape[0], m2.shape[1]))
            new_m1[0:m1.shape[0], 0:m1.shape[1]] = m1
            return np.append(new_m1, m2, axis=0)
        elif m1.shape[1] > m2.shape[1]:
            new_m2 = np.zeros((m2.shape[0], m1.shape[1]))
            new_m2[0:m2.shape[0], 0:m2.shape[1]] = m2
            return np.append(m1, new_m2, axis=0)
        return np.append(m1, m2, axis=0)
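A hypothetical example of the padding behavior above: when the two memory matrices differ in width, the narrower one is zero-padded before the rows are concatenated.

m1 = np.zeros((2, 2))   # two agents with memory size 2
m2 = np.ones((1, 4))    # one agent with memory size 4
merged = BrainInfo.merge_memories(m1, m2, agents1=[0, 1], agents2=[2])
assert merged.shape == (3, 4)  # m1 rows were zero-padded to width 4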
@staticmethod
def process_pixels(image_bytes, gray_scale):
"""

obs = [BrainInfo.process_pixels(x.visual_observations[i],
brain_params.camera_resolutions[i]['blackAndWhite'])
for x in agent_info_list]
vis_obs += [np.array(obs)]
vis_obs += [obs]
if len(agent_info_list) == 0:
memory_size = 0
else:

else:
[x.memories.extend([0] * (memory_size - len(x.memories))) for x in agent_info_list]
memory = np.array([x.memories for x in agent_info_list])
memory = np.array([list(x.memories) for x in agent_info_list])
total_num_actions = sum(brain_params.vector_action_space_size)
mask_actions = np.ones((len(agent_info_list), total_num_actions))
for agent_index, agent_info in enumerate(agent_info_list):

logger.warning("An agent had a NaN reward for brain " + brain_params.brain_name)
if any([np.isnan(x.stacked_vector_observation).any() for x in agent_info_list]):
logger.warning("An agent had a NaN observation for brain " + brain_params.brain_name)
if len(agent_info_list) == 0:
vector_obs = np.zeros(
(0, brain_params.vector_observation_space_size * brain_params.num_stacked_vector_observations)
)
else:
vector_obs = np.nan_to_num(
np.array([x.stacked_vector_observation for x in agent_info_list])
)
vector_observation=np.nan_to_num(
np.array([x.stacked_vector_observation for x in agent_info_list])),
vector_observation=vector_obs,
text_observations=[x.text_observation for x in agent_info_list],
memory=memory,
reward=[x.reward if not np.isnan(x.reward) else 0 for x in agent_info_list],

text_action=[x.stored_text_actions for x in agent_info_list],
text_action=[list(x.stored_text_actions) for x in agent_info_list],
max_reached=[x.max_step_reached for x in agent_info_list],
custom_observations=[x.custom_observation for x in agent_info_list],
action_mask=mask_actions

def safe_concat_lists(l1: Optional[List], l2: Optional[List]):
    if l1 is None and l2 is None:
        return None
    if l1 is None and l2 is not None:
        return l2.copy()
    if l1 is not None and l2 is None:
        return l1.copy()
    else:
        copy = l1.copy()
        copy.extend(l2)
        return copy


def safe_concat_np_ndarray(a1: Optional[np.ndarray], a2: Optional[np.ndarray]):
    if a1 is not None and a1.size != 0:
        if a2 is not None and a2.size != 0:
            return np.append(a1, a2, axis=0)
        else:
            return a1.copy()
    elif a2 is not None and a2.size != 0:
        return a2.copy()
    return None
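Illustrative (hypothetical) uses of the two helpers above; None and empty inputs are treated as absent:

assert safe_concat_lists(None, None) is None
assert safe_concat_lists([1], None) == [1]
assert safe_concat_lists([1], [2, 3]) == [1, 2, 3]
assert safe_concat_np_ndarray(np.zeros((0, 2)), None) is None   # empty arrays count as absent
assert safe_concat_np_ndarray(np.ones((1, 2)), np.ones((1, 2))).shape == (2, 2)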
def __init__(self, brain_name, vector_observation_space_size, num_stacked_vector_observations,
camera_resolutions, vector_action_space_size,
vector_action_descriptions, vector_action_space_type):
def __init__(self,
brain_name: str,
vector_observation_space_size: int,
num_stacked_vector_observations: int,
camera_resolutions: List[Dict],
vector_action_space_size: List[int],
vector_action_descriptions: List[str],
vector_action_space_type: int):
"""
Contains all brain-specific parameters.
"""

brain_param_proto.vector_observation_size,
brain_param_proto.num_stacked_vector_observations,
resolution,
brain_param_proto.vector_action_size,
brain_param_proto.vector_action_descriptions,
list(brain_param_proto.vector_action_size),
list(brain_param_proto.vector_action_descriptions),
return brain_params

ml-agents-envs/mlagents/envs/environment.py (43 changes)


import numpy as np
import os
import subprocess
from typing import *
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from .brain import AllBrainInfo, BrainInfo, BrainParameters
from .exception import UnityEnvironmentException, UnityActionException, UnityTimeOutException

logger = logging.getLogger("mlagents.envs")
class UnityEnvironment(object):
class UnityEnvironment(BaseUnityEnvironment):
def __init__(self, file_name=None, worker_id=0,
base_port=5005, seed=0,
docker_training=False, no_graphics=False,
timeout_wait=30):
def __init__(self,
file_name: Optional[str] = None,
worker_id: int = 0,
base_port: int = 5005,
seed: int = 0,
docker_training: bool = False,
no_graphics: bool = False,
timeout_wait: int = 30,
train_mode: bool = True):
"""
Starts a new unity environment and establishes a connection with the environment.
Notice: Currently communication between Unity and Python takes place over an open socket without authentication.

:bool docker_training: Informs this class whether the process is being run within a container.
:bool no_graphics: Whether to run the Unity simulator in no-graphics mode
:int timeout_wait: Time (in seconds) to wait for connection from environment.
:bool train_mode: Whether to run in training mode, speeding up the simulation, by default.
"""
atexit.register(self._close)

self._loaded = False # If true, this means the environment was successfully loaded
self.proc1 = None # The process that is started. If None, no process was started
self.communicator = self.get_communicator(worker_id, base_port, timeout_wait)
self._train_mode = train_mode
# If the environment name is None, a new environment will not be launched
# and the communicator will directly try to connect to an existing unity environment.

# TODO : think of a better way to expose the academyParameters
self._unity_version = aca_params.version
if self._unity_version != self._version_:
self._close()
raise UnityEnvironmentException(
"The API number is not compatible between Unity and python. Python API : {0}, Unity API : "
"{1}.\nPlease go to https://github.com/Unity-Technologies/ml-agents to download the latest version "

def get_communicator(worker_id, base_port, timeout_wait):
return RpcCommunicator(worker_id, base_port, timeout_wait)
    @property
    def external_brains(self):
        external_brains = {}
        for brain_name in self.external_brain_names:
            external_brains[brain_name] = self.brains[brain_name]
        return external_brains

    @property
    def reset_parameters(self):
        return self._resetParameters
def executable_launcher(self, file_name, docker_training, no_graphics):
cwd = os.getcwd()
file_name = (file_name.strip()

for k in self._resetParameters])) + '\n' + \
'\n'.join([str(self._brains[b]) for b in self._brains])
def reset(self, config=None, train_mode=True, custom_reset_parameters=None) -> AllBrainInfo:
def reset(self, config=None, train_mode=None, custom_reset_parameters=None) -> AllBrainInfo:
"""
Sends a signal to reset the unity environment.
:return: AllBrainInfo : A data structure corresponding to the initial reset state of the environment.

else:
raise UnityEnvironmentException(
"The parameter '{0}' is not a valid parameter.".format(k))
if train_mode is None:
train_mode = self._train_mode
else:
self._train_mode = train_mode
if self._loaded:
outputs = self.communicator.exchange(

self.proc1.kill()
@classmethod
def _flatten(cls, arr):
def _flatten(cls, arr) -> List[float]:
"""
Converts arrays to list.
:param arr: numpy vector.

inputs.rl_initialization_input.CopyFrom(init_parameters)
return self.communicator.initialize(inputs).rl_initialization_output
def wrap_unity_input(self, rl_input: UnityRLInput) -> UnityOutput:
@staticmethod
def wrap_unity_input(rl_input: UnityRLInput) -> UnityOutput:
result = UnityInput()
result.rl_input.CopyFrom(rl_input)
return result

ml-agents-envs/mlagents/envs/tests/test_envs.py (9 changes)


BrainInfo
from mlagents.envs.mock_communicator import MockCommunicator
@mock.patch('mlagents.envs.UnityEnvironment.get_communicator')
def test_handles_bad_filename(get_communicator):
with pytest.raises(UnityEnvironmentException):

assert isinstance(brain_info['RealFakeBrain'].visual_observations, list)
assert isinstance(brain_info['RealFakeBrain'].vector_observations, np.ndarray)
assert len(brain_info['RealFakeBrain'].visual_observations) == brain.number_visual_observations
assert brain_info['RealFakeBrain'].vector_observations.shape[0] == \
assert len(brain_info['RealFakeBrain'].vector_observations) == \
assert brain_info['RealFakeBrain'].vector_observations.shape[1] == \
assert len(brain_info['RealFakeBrain'].vector_observations[0]) == \
brain.vector_observation_space_size * brain.num_stacked_vector_observations

assert isinstance(brain_info['RealFakeBrain'].visual_observations, list)
assert isinstance(brain_info['RealFakeBrain'].vector_observations, np.ndarray)
assert len(brain_info['RealFakeBrain'].visual_observations) == brain.number_visual_observations
assert brain_info['RealFakeBrain'].vector_observations.shape[0] == \
assert len(brain_info['RealFakeBrain'].vector_observations) == \
assert brain_info['RealFakeBrain'].vector_observations.shape[1] == \
assert len(brain_info['RealFakeBrain'].vector_observations[0]) == \
brain.vector_observation_space_size * brain.num_stacked_vector_observations
print("\n\n\n\n\n\n\n" + str(brain_info['RealFakeBrain'].local_done))

ml-agents/mlagents/trainers/learn.py (65 changes)


import numpy as np
import yaml
from docopt import docopt
from typing import Optional
from typing import Optional, Callable
from mlagents.trainers.trainer_controller import TrainerController

from mlagents.envs.exception import UnityEnvironmentException
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs.subprocess_environment import SubprocessUnityEnvironment
def run_training(sub_id: int, run_seed: int, run_options, process_queue):

train_model = run_options['--train']
save_freq = int(run_options['--save-freq'])
keep_checkpoints = int(run_options['--keep-checkpoints'])
worker_id = int(run_options['--worker-id'])
base_port = int(run_options['--base-port'])
num_envs = int(run_options['--num-envs'])
curriculum_folder = (run_options['--curriculum']
if run_options['--curriculum'] != 'None' else None)
lesson = int(run_options['--lesson'])

docker_target_name=docker_target_name)
trainer_config = load_config(trainer_config_path)
env = init_environment(env_path, docker_target_name, no_graphics, worker_id + sub_id, fast_simulation, run_seed)
env_factory = create_environment_factory(
env_path,
docker_target_name,
no_graphics,
run_seed,
base_port + (sub_id * num_envs),
fast_simulation
)
env = SubprocessUnityEnvironment(env_factory, num_envs)
external_brains = {}
for brain_name in env.external_brain_names:
external_brains[brain_name] = env.brains[brain_name]
keep_checkpoints, lesson, external_brains,
keep_checkpoints, lesson, env.external_brains,
run_seed)
# Signal that environment has been launched.

tc.start_learning(env, trainer_config)
def try_create_meta_curriculum(curriculum_folder: Optional[str], env: UnityEnvironment) -> Optional[MetaCurriculum]:
def try_create_meta_curriculum(curriculum_folder: Optional[str], env: BaseUnityEnvironment) -> Optional[MetaCurriculum]:
meta_curriculum = MetaCurriculum(curriculum_folder, env._resetParameters)
meta_curriculum = MetaCurriculum(curriculum_folder, env.reset_parameters)
if brain_name not in env.external_brain_names:
if brain_name not in env.external_brains.keys():
raise MetaCurriculumError('One of the curricula '
'defined in ' +
curriculum_folder + ' '

.format(trainer_config_path))
def init_environment(env_path, docker_target_name, no_graphics, worker_id, fast_simulation, seed):
def create_environment_factory(
env_path: str,
docker_target_name: str,
no_graphics: bool,
seed: Optional[int],
start_port: int,
fast_simulation: bool
) -> Callable[[int], BaseUnityEnvironment]:
if env_path is not None:
# Strip out executable extensions if passed
env_path = (env_path.strip()

# Navigate in docker path and find env_path and copy it.
env_path = prepare_for_docker_run(docker_target_name,
env_path)
return UnityEnvironment(
file_name=env_path,
worker_id=worker_id,
seed=seed,
docker_training=docker_training,
no_graphics=no_graphics
)
    seed_count = 10000
    seed_pool = [np.random.randint(0, seed_count) for _ in range(seed_count)]

    def create_unity_environment(worker_id: int) -> UnityEnvironment:
        env_seed = seed
        if not env_seed:
            env_seed = seed_pool[worker_id % len(seed_pool)]
        return UnityEnvironment(
            file_name=env_path,
            worker_id=worker_id,
            seed=env_seed,
            docker_training=docker_training,
            no_graphics=no_graphics,
            base_port=start_port,
            train_mode=(not fast_simulation)
        )
    return create_unity_environment
def main():

--seed=<n> Random seed used for training [default: -1].
--slow Whether to run the game at training speed [default: False].
--train Whether to train model, or only run inference [default: False].
--worker-id=<n> Number to add to communication port (5005) [default: 0].
--base-port=<n> Base port for environment communication [default: 5005].
--num-envs=<n> Number of parallel environments to use for training [default: 1]
--docker-target-name=<dt> Docker volume to store training-specific files [default: None].
--no-graphics Whether to run the environment in no-graphics mode [default: False].
--debug Whether to run ML-Agents in debug mode with detailed logging [default: False].
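For example, a hypothetical invocation (the config file and environment names are placeholders) that trains four environments in parallel, occupying ports 5005 through 5008:

mlagents-learn trainer_config.yaml --env=3DBall --num-envs=4 --base-port=5005 --train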

ml-agents/mlagents/trainers/ppo/trainer.py (6 changes)


agents = []
prev_vector_actions = []
prev_text_actions = []
action_masks = []
for agent_id in next_info.agents:
agent_brain_info = self.training_buffer[agent_id].last_brain_info
if agent_brain_info is None:

vector_observations.append(agent_brain_info.vector_observations[agent_index])
text_observations.append(agent_brain_info.text_observations[agent_index])
if self.policy.use_recurrent:
if len(agent_brain_info.memories > 0):
if len(agent_brain_info.memories) > 0:
memories.append(agent_brain_info.memories[agent_index])
else:
memories.append(self.policy.make_empty_memory(1))

agents.append(agent_brain_info.agents[agent_index])
prev_vector_actions.append(agent_brain_info.previous_vector_actions[agent_index])
prev_text_actions.append(agent_brain_info.previous_text_actions[agent_index])
action_masks.append(agent_brain_info.action_masks[agent_index])
prev_text_actions, max_reacheds)
prev_text_actions, max_reacheds, action_masks)
return curr_info
def add_experiences(self, curr_all_info: AllBrainInfo, next_all_info: AllBrainInfo, take_action_outputs):

ml-agents/mlagents/trainers/tests/test_learn.py (19 changes)


'--train': False,
'--save-freq': '50000',
'--keep-checkpoints': '5',
'--worker-id': '0',
'--base-port': '5005',
'--num-envs': '1',
'--curriculum': 'None',
'--lesson': '0',
'--slow': False,

}
@patch('mlagents.trainers.learn.init_environment')
@patch('mlagents.trainers.learn.SubprocessUnityEnvironment')
@patch('mlagents.trainers.learn.create_environment_factory')
def test_run_training(load_config, init_environment):
def test_run_training(load_config, create_environment_factory, subproc_env_mock):
init_environment.return_value = mock_env
create_environment_factory.return_value = mock_env
trainer_config_mock = MagicMock()
load_config.return_value = trainer_config_mock

False,
5,
0,
{},
subproc_env_mock.return_value.external_brains,
@patch('mlagents.trainers.learn.init_environment')
@patch('mlagents.trainers.learn.SubprocessUnityEnvironment')
@patch('mlagents.trainers.learn.create_environment_factory')
def test_docker_target_path(load_config, init_environment):
def test_docker_target_path(load_config, create_environment_factory, subproc_env_mock):
init_environment.return_value = mock_env
create_environment_factory.return_value = mock_env
trainer_config_mock = MagicMock()
load_config.return_value = trainer_config_mock

ml-agents/mlagents/trainers/trainer_controller.py (27 changes)


import tensorflow as tf
from time import time
from mlagents.envs import AllBrainInfo, BrainInfo
from mlagents.envs import AllBrainInfo, BrainParameters
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs.exception import UnityEnvironmentException
from mlagents.trainers import Trainer, Policy
from mlagents.trainers.ppo.trainer import PPOTrainer

class TrainerController(object):
def __init__(self, model_path: str, summaries_dir: str,
run_id: str, save_freq: int, meta_curriculum: Optional[MetaCurriculum],
load: bool, train: bool, keep_checkpoints: int, lesson: Optional[int],
external_brains: Dict[str, BrainInfo], training_seed: int):
def __init__(self,
model_path: str,
summaries_dir: str,
run_id: str,
save_freq: int,
meta_curriculum: Optional[MetaCurriculum],
load: bool,
train: bool,
keep_checkpoints: int,
lesson: Optional[int],
external_brains: Dict[str, BrainParameters],
training_seed: int):
"""
:param model_path: Path to save the model.
:param summaries_dir: Folder to save training summaries.

for brain_name in self.trainers.keys():
self.trainers[brain_name].export_model()
def initialize_trainers(self, trainer_config):
def initialize_trainers(self, trainer_config: Dict[str, Dict[str, str]]):
"""
Initialization of the trainers
:param trainer_config: The configurations of the trainers

'permissions are set correctly.'
.format(model_path))
def _reset_env(self, env):
def _reset_env(self, env: BaseUnityEnvironment):
"""Resets the environment.
Returns:

else:
return env.reset()
def start_learning(self, env, trainer_config):
def start_learning(self, env: BaseUnityEnvironment, trainer_config):
# TODO: Should be able to start learning at different lesson numbers
# for each curriculum.
if self.meta_curriculum is not None:

self._write_training_metrics()
self._export_graph()
def take_step(self, env, curr_info: AllBrainInfo):
def take_step(self, env: BaseUnityEnvironment, curr_info: AllBrainInfo):
if self.meta_curriculum:
# Get the sizes of the reward buffers.
reward_buff_sizes = {k: len(t.reward_buffer)

ml-agents-envs/mlagents/envs/base_unity_environment.py (33 changes)


from abc import ABC, abstractmethod
from typing import Dict
from mlagents.envs import AllBrainInfo, BrainParameters


class BaseUnityEnvironment(ABC):
    @abstractmethod
    def step(self, vector_action=None, memory=None, text_action=None, value=None) -> AllBrainInfo:
        pass

    @abstractmethod
    def reset(self, config=None, train_mode=True) -> AllBrainInfo:
        pass

    @property
    @abstractmethod
    def global_done(self):
        pass

    @property
    @abstractmethod
    def external_brains(self) -> Dict[str, BrainParameters]:
        pass

    @property
    @abstractmethod
    def reset_parameters(self) -> Dict[str, str]:
        pass

    @abstractmethod
    def close(self):
        pass
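A minimal sketch of a hypothetical stub that satisfies this contract (useful mainly in tests); every abstract member must be overridden before the class can be instantiated:

class StubUnityEnvironment(BaseUnityEnvironment):
    def step(self, vector_action=None, memory=None, text_action=None, value=None) -> AllBrainInfo:
        return {}  # no brains, no agents

    def reset(self, config=None, train_mode=True) -> AllBrainInfo:
        return {}

    @property
    def global_done(self):
        return False

    @property
    def external_brains(self) -> Dict[str, BrainParameters]:
        return {}

    @property
    def reset_parameters(self) -> Dict[str, str]:
        return {}

    def close(self):
        pass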

ml-agents-envs/mlagents/envs/subprocess_environment.py (192 changes)


from typing import *
import copy
import numpy as np
from mlagents.envs import UnityEnvironment
from multiprocessing import Process, Pipe
from multiprocessing.connection import Connection
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs import AllBrainInfo, UnityEnvironmentException
class EnvironmentCommand(NamedTuple):
    name: str
    payload: Any = None


class EnvironmentResponse(NamedTuple):
    name: str
    worker_id: int
    payload: Any


class UnityEnvWorker(NamedTuple):
    process: Process
    worker_id: int
    conn: Connection

    def send(self, name: str, payload=None):
        cmd = EnvironmentCommand(name, payload)
        self.conn.send(cmd)

    def recv(self) -> EnvironmentResponse:
        response: EnvironmentResponse = self.conn.recv()
        return response

    def close(self):
        # Signal the worker loop to exit before joining; without this the
        # join would block on a worker still waiting for commands.
        self.conn.send(EnvironmentCommand('close'))
        self.process.join()
def worker(parent_conn: Connection, env_factory: Callable[[int], UnityEnvironment], worker_id: int):
    env = env_factory(worker_id)

    def _send_response(cmd_name, payload):
        parent_conn.send(
            EnvironmentResponse(cmd_name, worker_id, payload)
        )

    try:
        while True:
            cmd: EnvironmentCommand = parent_conn.recv()
            if cmd.name == 'step':
                vector_action, memory, text_action, value = cmd.payload
                all_brain_info = env.step(vector_action, memory, text_action, value)
                _send_response('step', all_brain_info)
            elif cmd.name == 'external_brains':
                _send_response('external_brains', env.external_brains)
            elif cmd.name == 'reset_parameters':
                _send_response('reset_parameters', env.reset_parameters)
            elif cmd.name == 'reset':
                all_brain_info = env.reset(cmd.payload[0], cmd.payload[1])
                _send_response('reset', all_brain_info)
            elif cmd.name == 'global_done':
                _send_response('global_done', env.global_done)
            elif cmd.name == 'close':
                env.close()
                break
    except KeyboardInterrupt:
        print('UnityEnvironment worker: keyboard interrupt')
    finally:
        env.close()
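A toy round trip through this loop, assuming a fake (mocked) environment factory; the parent sends an EnvironmentCommand over its end of the pipe and receives an EnvironmentResponse tagged with the worker's id:

from unittest import mock

def _fake_factory(worker_id: int):
    fake = mock.MagicMock()
    fake.reset_parameters = {'gravity': '9.8'}  # assumed sample payload
    return fake

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    proc = Process(target=worker, args=(child_conn, _fake_factory, 0))
    proc.start()
    parent_conn.send(EnvironmentCommand('reset_parameters'))
    print(parent_conn.recv())  # EnvironmentResponse(name='reset_parameters', worker_id=0, payload={'gravity': '9.8'})
    parent_conn.send(EnvironmentCommand('close'))
    proc.join()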
class SubprocessUnityEnvironment(BaseUnityEnvironment):
    def __init__(self,
                 env_factory: Callable[[int], BaseUnityEnvironment],
                 n_env: int = 1):
        self.envs = []
        self.env_agent_counts = {}
        self.waiting = False
        for worker_id in range(n_env):
            self.envs.append(self.create_worker(worker_id, env_factory))

    @staticmethod
    def create_worker(
            worker_id: int,
            env_factory: Callable[[int], BaseUnityEnvironment]
    ) -> UnityEnvWorker:
        parent_conn, child_conn = Pipe()
        child_process = Process(target=worker, args=(child_conn, env_factory, worker_id))
        child_process.start()
        return UnityEnvWorker(child_process, worker_id, parent_conn)

    def step_async(self, vector_action, memory=None, text_action=None, value=None) -> None:
        if self.waiting:
            raise UnityEnvironmentException(
                'Tried to take an environment step before the previous step has completed.'
            )
        agent_counts_cum = {}
        for brain_name in self.env_agent_counts.keys():
            agent_counts_cum[brain_name] = np.cumsum(self.env_agent_counts[brain_name])

        # Split the actions provided by the previous set of agent counts, and send the step
        # commands to the workers.
        for worker_id, env in enumerate(self.envs):
            env_actions = {}
            env_memory = {}
            env_text_action = {}
            env_value = {}
            for brain_name in self.env_agent_counts.keys():
                start_ind = 0
                if worker_id > 0:
                    start_ind = agent_counts_cum[brain_name][worker_id - 1]
                end_ind = agent_counts_cum[brain_name][worker_id]
                if vector_action.get(brain_name) is not None:
                    env_actions[brain_name] = vector_action[brain_name][start_ind:end_ind]
                if memory and memory.get(brain_name) is not None:
                    env_memory[brain_name] = memory[brain_name][start_ind:end_ind]
                if text_action and text_action.get(brain_name) is not None:
                    env_text_action[brain_name] = text_action[brain_name][start_ind:end_ind]
                if value and value.get(brain_name) is not None:
                    env_value[brain_name] = value[brain_name][start_ind:end_ind]
            env.send('step', (env_actions, env_memory, env_text_action, env_value))
        self.waiting = True

    def step_await(self) -> AllBrainInfo:
        if not self.waiting:
            raise UnityEnvironmentException('Tried to await an environment step, but no async step was taken.')
        steps = [self.envs[i].recv() for i in range(len(self.envs))]
        self._get_agent_counts(map(lambda s: s.payload, steps))
        combined_brain_info = self._merge_step_info(steps)
        self.waiting = False
        return combined_brain_info

    def step(self, vector_action=None, memory=None, text_action=None, value=None) -> AllBrainInfo:
        self.step_async(vector_action, memory, text_action, value)
        return self.step_await()

    def reset(self, config=None, train_mode=True) -> AllBrainInfo:
        self._broadcast_message('reset', (config, train_mode))
        reset_results = [self.envs[i].recv() for i in range(len(self.envs))]
        self._get_agent_counts(map(lambda r: r.payload, reset_results))
        return self._merge_step_info(reset_results)

    @property
    def global_done(self):
        self._broadcast_message('global_done')
        dones: List[bool] = [
            self.envs[i].recv().payload for i in range(len(self.envs))
        ]
        return all(dones)

    @property
    def external_brains(self):
        self.envs[0].send('external_brains')
        return self.envs[0].recv().payload

    @property
    def reset_parameters(self):
        self.envs[0].send('reset_parameters')
        return self.envs[0].recv().payload

    def close(self):
        for env in self.envs:
            env.close()

    def _get_agent_counts(self, step_list: Iterable[AllBrainInfo]):
        for i, step in enumerate(step_list):
            for brain_name, brain_info in step.items():
                if brain_name not in self.env_agent_counts.keys():
                    self.env_agent_counts[brain_name] = [0] * len(self.envs)
                self.env_agent_counts[brain_name][i] = len(brain_info.agents)

    @staticmethod
    def _merge_step_info(env_steps: List[EnvironmentResponse]) -> AllBrainInfo:
        accumulated_brain_info: AllBrainInfo = None
        for env_step in env_steps:
            all_brain_info: AllBrainInfo = env_step.payload
            for brain_name, brain_info in all_brain_info.items():
                for i in range(len(brain_info.agents)):
                    brain_info.agents[i] = str(env_step.worker_id) + '-' + str(brain_info.agents[i])
                if accumulated_brain_info:
                    accumulated_brain_info[brain_name].merge(brain_info)
            if not accumulated_brain_info:
                accumulated_brain_info = copy.deepcopy(all_brain_info)
        return accumulated_brain_info

    def _broadcast_message(self, name: str, payload=None):
        for env in self.envs:
            env.send(name, payload)
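A hedged usage sketch (the executable name and port are placeholders): build a factory, fan out two workers, then drive them in lock-step. reset and step each return a single merged AllBrainInfo in which agent ids are prefixed with the worker id.

def factory(worker_id: int) -> UnityEnvironment:
    return UnityEnvironment(file_name='3DBall', worker_id=worker_id, base_port=5005)

if __name__ == '__main__':
    env = SubprocessUnityEnvironment(factory, n_env=2)
    try:
        first_info = env.reset(train_mode=True)  # one merged AllBrainInfo for both workers
        # env.step(...) splits each brain's actions by per-worker agent counts,
        # steps all workers concurrently, and re-merges the results.
    finally:
        env.close()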

ml-agents/tests/envs/test_subprocess_unity_environment.py (90 changes)


import unittest.mock as mock
from unittest.mock import MagicMock
import unittest

from mlagents.envs.subprocess_environment import *
from mlagents.envs import UnityEnvironmentException, BrainInfo


def mock_env_factory(worker_id: int):
    return mock.create_autospec(spec=BaseUnityEnvironment)


class MockEnvWorker:
    def __init__(self, worker_id):
        self.worker_id = worker_id
        self.process = None
        self.conn = None
        self.send = MagicMock()
        self.recv = MagicMock()


class SubprocessEnvironmentTest(unittest.TestCase):
    def test_environments_are_created(self):
        SubprocessUnityEnvironment.create_worker = MagicMock()
        env = SubprocessUnityEnvironment(mock_env_factory, 2)
        # Creates two processes
        self.assertEqual(env.create_worker.call_args_list, [
            mock.call(0, mock_env_factory),
            mock.call(1, mock_env_factory)
        ])
        self.assertEqual(len(env.envs), 2)

    def test_step_async_fails_when_waiting(self):
        env = SubprocessUnityEnvironment(mock_env_factory, 0)
        env.waiting = True
        with self.assertRaises(UnityEnvironmentException):
            env.step_async(vector_action=[])

    @staticmethod
    def test_step_async_splits_input_by_agent_count():
        env = SubprocessUnityEnvironment(mock_env_factory, 0)
        env.env_agent_counts = {
            'MockBrain': [1, 3, 5]
        }
        env.envs = [
            MockEnvWorker(0),
            MockEnvWorker(1),
            MockEnvWorker(2),
        ]
        env_0_actions = [[1.0, 2.0]]
        env_1_actions = ([[3.0, 4.0]] * 3)
        env_2_actions = ([[5.0, 6.0]] * 5)
        vector_action = {
            'MockBrain': env_0_actions + env_1_actions + env_2_actions
        }
        env.step_async(vector_action=vector_action)
        env.envs[0].send.assert_called_with('step', ({'MockBrain': env_0_actions}, {}, {}, {}))
        env.envs[1].send.assert_called_with('step', ({'MockBrain': env_1_actions}, {}, {}, {}))
        env.envs[2].send.assert_called_with('step', ({'MockBrain': env_2_actions}, {}, {}, {}))

    def test_step_async_sets_waiting(self):
        env = SubprocessUnityEnvironment(mock_env_factory, 0)
        env.step_async(vector_action=[])
        self.assertTrue(env.waiting)

    def test_step_await_fails_if_not_waiting(self):
        env = SubprocessUnityEnvironment(mock_env_factory, 0)
        with self.assertRaises(UnityEnvironmentException):
            env.step_await()

    def test_step_await_combines_brain_info(self):
        all_brain_info_env0 = {
            'MockBrain': BrainInfo([], [[1.0, 2.0], [1.0, 2.0]], [], agents=[1, 2], memory=np.zeros((0, 0)))
        }
        all_brain_info_env1 = {
            'MockBrain': BrainInfo([], [[3.0, 4.0]], [], agents=[3], memory=np.zeros((0, 0)))
        }
        env_worker_0 = MockEnvWorker(0)
        env_worker_0.recv.return_value = EnvironmentResponse('step', 0, all_brain_info_env0)
        env_worker_1 = MockEnvWorker(1)
        env_worker_1.recv.return_value = EnvironmentResponse('step', 1, all_brain_info_env1)
        env = SubprocessUnityEnvironment(mock_env_factory, 0)
        env.envs = [env_worker_0, env_worker_1]
        env.waiting = True
        combined_braininfo = env.step_await()['MockBrain']
        self.assertEqual(
            combined_braininfo.vector_observations.tolist(),
            [[1.0, 2.0], [1.0, 2.0], [3.0, 4.0]]
        )
        self.assertEqual(combined_braininfo.agents, ['0-1', '0-2', '1-3'])
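These unittest-style tests also run under pytest; a hypothetical invocation from the repository root:

pytest ml-agents/tests/envs/test_subprocess_unity_environment.py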

ml-agents-envs/__init__.py (0 changes)
