
Develop new ll api (#3022)

* initial commit for LL-API

* fixing ml-agents-envs tests

* Implementing action masks

* training is fixed for 3DBall

* Tests all fixed, gym is broken and missing documentation changes

* adding case where no vector obs

* Fixed Gym

* fixing tests of float64

* fixing float64

* reverting some of brain.py

* removing old proto apis

* comment type fixes

* added properties to AgentGroupSpec and edited the notebooks.

* clearing the notebook outputs

* Update gym-unity/gym_unity/tests/test_gym.py

Co-Authored-By: Chris Elion <chris.elion@unity3d.com>

* Update gym-unity/gym_unity/tests/test_gym.py

Co-Authored-By: Chris Elion <chris.elion@unity3d.com>

* Update ml-agents-envs/mlagents/envs/base_env.py

Co-Authored-By: Chris Elion <chris.elion@unity3d.com>

* Update ml-agents-envs/mlagents/envs/base_env.py

Co-Authored-By: Chris Elion <chris.elion@unity3d.com>

* addressing first comments

* NaN checks for r...
/develop
GitHub · 5 years ago
Current commit
a6df9f43
22 files changed, with 1,298 insertions and 657 deletions. Changed-line counts per file are shown in parentheses below.
  1. docs/Migrating.md (1)
  2. docs/Python-API.md (202)
  3. gym-unity/gym_unity/envs/__init__.py (40)
  4. gym-unity/gym_unity/tests/test_gym.py (87)
  5. ml-agents-envs/mlagents/envs/brain.py (46)
  6. ml-agents-envs/mlagents/envs/environment.py (327)
  7. ml-agents-envs/mlagents/envs/simple_env_manager.py (42)
  8. ml-agents-envs/mlagents/envs/subprocess_env_manager.py (44)
  9. ml-agents-envs/mlagents/envs/tests/test_envs.py (82)
  10. ml-agents-envs/mlagents/envs/tests/test_subprocess_env_manager.py (4)
  11. ml-agents/mlagents/trainers/learn.py (4)
  12. ml-agents/mlagents/trainers/tests/test_bc.py (23)
  13. ml-agents/mlagents/trainers/tests/test_ppo.py (36)
  14. ml-agents/mlagents/trainers/tests/test_simple_rl.py (112)
  15. notebooks/getting-started-gym.ipynb (4)
  16. notebooks/getting-started.ipynb (80)
  17. ml-agents-envs/mlagents/envs/base_env.py (301)
  18. ml-agents-envs/mlagents/envs/brain_conversion_utils.py (70)
  19. ml-agents-envs/mlagents/envs/rpc_utils.py (165)
  20. ml-agents-envs/mlagents/envs/tests/test_rpc_utils.py (187)
  21. ml-agents-envs/mlagents/envs/tests/test_brain.py (73)
  22. ml-agents-envs/mlagents/envs/base_unity_environment.py (25)

docs/Migrating.md (1)


## Migrating from master to develop
### Important changes
* The low level Python API has changed. You can look at the document [Low Level Python API documentation](Python-API.md) for more information. This should only affect you if you're writing a custom trainer; if you use `mlagents-learn` for training, this should be a transparent change.
* `CustomResetParameters` are now removed.
* `reset()` on the Low-Level Python API no longer takes a `train_mode` argument. To modify the performance/speed of the engine, you must use an `EngineConfigurationChannel`.
* `reset()` on the Low-Level Python API no longer takes a `config` argument. `UnityEnvironment` no longer has a `reset_parameters` field. To modify float properties in the environment, you must use a `FloatPropertiesChannel`. For more information, refer to the [Low Level Python API documentation](Python-API.md). A minimal migration sketch follows.
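For illustration only, a sketch of what this migration looks like in code; the `FloatPropertiesChannel` import path is an assumption and may differ in your installed package:

```python
from mlagents.envs.environment import UnityEnvironment
from mlagents.envs.side_channel.engine_configuration_channel import EngineConfigurationChannel
from mlagents.envs.side_channel.float_properties_channel import FloatPropertiesChannel  # path assumed

engine_channel = EngineConfigurationChannel()
float_props = FloatPropertiesChannel()

# Side channels passed at construction replace the old reset(train_mode=..., config=...) arguments.
env = UnityEnvironment(file_name="3DBall", side_channels=[engine_channel, float_props])
engine_channel.set_configuration_parameters(time_scale=3.0)  # replaces train_mode
float_props.set_property("parameter_1", 2.0)                 # replaces config
env.reset()
```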

docs/Python-API.md (202)


# Unity ML-Agents Python Interface and Trainers
The `mlagents` Python package is part of the [ML-Agents
Toolkit](https://github.com/Unity-Technologies/ml-agents). `mlagents` provides a
Python API that allows direct interaction with the Unity game engine as well as
a collection of trainers and algorithms to train agents in Unity environments.
# Unity ML-Agents Python Low Level API
The `mlagents` Python package contains two components: a low level API which
allows you to interact directly with a Unity Environment (`mlagents.envs`) and

You can use the Python Low Level API to interact directly with your learning
environment, and use it to develop new learning algorithms.
The ML-Agents Toolkit provides a Python API for controlling the Agent simulation
The ML-Agents Toolkit Low Level API is a Python API for controlling the simulation
loop of an environment or game built with Unity. This API is used by the
training algorithms inside the ML-Agents Toolkit, but you can also write your own
Python programs using this API. Go [here](../notebooks/getting-started.ipynb)

- **UnityEnvironment** — the main interface between the Unity application and
your code. Use UnityEnvironment to start and control a simulation or training
session.
- **BrainInfo** — contains all the data from Agents in the simulation, such as
observations and rewards.
- **BrainParameters** — describes the data elements in a BrainInfo object. For
example, provides the array length of an observation in BrainInfo.
- **BatchedStepResult** — contains the data from Agents belonging to the same
"AgentGroup" in the simulation, such as observations and rewards.
- **AgentGroupSpec** — describes the shape of the data inside a BatchedStepResult.
For example, provides the dimensions of the observations of a group.
These classes are all defined in the [base_env](../ml-agents-envs/mlagents/envs/base_env.py)
script.
These classes are all defined in the `ml-agents/mlagents/envs` folder of
the ML-Agents SDK.
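For reference, a sketch of importing these classes, using the paths that appear elsewhere in this changeset:

```python
from mlagents.envs.base_env import BaseEnv, BatchedStepResult, AgentGroupSpec, ActionType
```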
An Agent Group is a group of Agents identified by a string name that share the same
observations and action types. You can think of an Agent Group as a group of agents
that will share the same policy or behavior. All Agents in a group have the same goal
and reward signals.
Agent must use a LearningBrain.
Your code is expected to return
actions for Agents with LearningBrains.
An Agent in the simulation must have `Behavior Parameters` set to communicate. You
must set the `Behavior Type` to `Default` and give it a `Behavior Name`.
__Note__: The `Behavior Name` corresponds to the Agent Group name on Python.
_Notice: Currently communication between Unity and Python takes place over an
open socket without authentication. As such, please make sure that the network

### Loading a Unity Environment
## Loading a Unity Environment
Python-side communication happens through `UnityEnvironment` which is located in
`ml-agents/mlagents/envs`. To load a Unity environment from a built binary

```python
from mlagents.envs.environment import UnityEnvironment
env = UnityEnvironment(file_name="3DBall", worker_id=0, seed=1)
env = UnityEnvironment(file_name="3DBall", base_port=5005, seed=1, side_channels=[])
```
- `file_name` is the name of the environment binary (located in the root

training process. In environments which do not involve physics calculations,
setting the seed enables reproducible experimentation by ensuring that the
environment and trainers utilize the same random seed.
- `side_channels` provides a way to exchange data with the Unity simulation that
is not related to the reinforcement learning loop. For example: configurations
or properties. More on them in the [Modifying the environment from Python](Python-API.md#modifying-the-environment-from-python) section.
If you want to directly interact with the Editor, you need to use
`file_name=None`, then press the :arrow_forward: button in the Editor when the

### Interacting with a Unity Environment
A BrainInfo object contains the following fields:
#### The BaseEnv interface
- **`visual_observations`** : A list of 4 dimensional numpy arrays. Matrix n of
the list corresponds to the n<sup>th</sup> observation of the Brain.
- **`vector_observations`** : A two dimensional numpy array of dimension `(batch
size, vector observation size)`.
- **`rewards`** : A list as long as the number of Agents using the Brain
containing the rewards they each obtained at the previous step.
- **`local_done`** : A list as long as the number of Agents using the Brain
containing `done` flags (whether or not the Agent is done).
- **`max_reached`** : A list as long as the number of Agents using the Brain
containing true if the Agents reached their max steps.
- **`agents`** : A list of the unique ids of the Agents using the Brain.
A `BaseEnv` has the following methods:
Once loaded, your UnityEnvironment object (referenced by a variable named `env` in
this example) can be used in the following way:
- **Reset : `env.reset()`** Sends a signal to reset the environment. Returns None.
- **Step : `env.step()`** Sends a signal to step the environment. Returns None.
Note that a "step" for Python does not correspond to either Unity `Update` nor
`FixedUpdate`. When `step()` or `reset()` is called, the Unity simulation will
move forward until an Agent in the simulation needs a input from Python to act.
- **Close : `env.close()`** Sends a shutdown signal to the environment and terminates
the communication.
- **Get Agent Group Names : `env.get_agent_groups()`** Returns a list of agent group ids.
Note that the number of groups can change over time if new agent groups are
created during the simulation.
- **Get Agent Group Spec : `env.get_agent_group_spec(agent_group: str)`** Returns
the `AgentGroupSpec` corresponding to the agent_group given as input. An
`AgentGroupSpec` contains information such as the observation shapes, the action
type (multi-discrete or continuous) and the action shape. Note that the `AgentGroupSpec`
for a specific group is fixed throughout the simulation.
- **Get Batched Step Result for Agent Group : `env.get_step_result(agent_group: str)`**
Returns a `BatchedStepResult` corresponding to the agent_group given as input.
A `BatchedStepResult` contains information about the state of the agents in a group
such as the observations, the rewards, the done flags and the agent identifiers. The
data is stored in `np.array`s whose first dimension is always the number of agents that
requested a decision in the simulation since the last call to `env.step()`. Note that the
number of agents is not guaranteed to remain constant during the simulation.
- **Set Actions for Agent Group :`env.set_actions(agent_group: str, action: np.array)`**
Sets the actions for a whole agent group. `action` is a 2D `np.array` of `dtype=np.int32`
in the discrete action case and `dtype=np.float32` in the continuous action case.
The first dimension of `action` is the number of agents that requested a decision
since the last call to `env.step()`. The second dimension is the number of discrete actions
in multi-discrete action type and the number of actions in continuous action type.
- **Set Action for Agent : `env.set_action_for_agent(agent_group: str, agent_id: int, action: np.array)`**
Sets the action for a specific Agent in an agent group. `agent_group` is the name of the
group the Agent belongs to and `agent_id` is the integer identifier of the Agent. Action
is a 1D array of type `dtype=np.int32` and size equal to the number of discrete actions
in multi-discrete action type and of type `dtype=np.float32` and size equal to the number
of actions in continuous action type.
- **Print : `print(str(env))`**
Prints all parameters relevant to the loaded environment and the
Brains.
- **Reset : `env.reset()`**
Send a reset signal to the environment, and provides a dictionary mapping
Brain names to BrainInfo objects.
- **Step : `env.step(action)`**
Sends a step signal to the environment using the actions. For each Brain :
- `action` can be one dimensional arrays or two dimensional arrays if you have
multiple Agents per Brain.
__Note:__ If no action is provided for an agent group between two calls to `env.step()`, then
the default action will be all zeros (in either discrete or continuous action space). A minimal
interaction loop using these methods is sketched below.
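As an illustrative sketch only (the binary name, episode length, and the random policy are assumptions, not part of this change), a minimal interaction loop looks like this:

```python
import numpy as np
from mlagents.envs.environment import UnityEnvironment

env = UnityEnvironment(file_name="3DBall", side_channels=[])
env.reset()
group_name = env.get_agent_groups()[0]   # assumes at least one agent group exists
spec = env.get_agent_group_spec(group_name)

for _ in range(100):
    step_result = env.get_step_result(group_name)
    n_agents = step_result.n_agents()
    if spec.is_action_discrete():
        # One random option per branch; discrete actions are np.int32.
        action = np.column_stack(
            [np.random.randint(0, branch, size=n_agents, dtype=np.int32)
             for branch in spec.discrete_action_branches]
        )
    else:
        # Continuous actions are np.float32 with shape (n_agents, action_size).
        action = np.random.randn(n_agents, spec.action_size).astype(np.float32)
    env.set_actions(group_name, action)
    env.step()
env.close()
```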
#### BatchedStepResult and StepResult
A `BatchedStepResult` has the following fields :
- `obs` is a list of numpy array observations collected by the group of
agents. The first dimension of each array corresponds to the batch size of
the group (number of agents requesting a decision since the last call to
`env.step()`).
- `reward` is a float vector of length batch size. Corresponds to the
rewards collected by each agent since the last simulation step.
- `done` is an array of booleans of length batch size. Is true if the
associated Agent was terminated during the last simulation step.
- `max_step` is an array of booleans of length batch size. Is true if the
associated Agent reached its maximum number of steps during the last
simulation step.
- `agent_id` is an int vector of length batch size containing the unique
identifiers of the corresponding Agents. This is used to track Agents
across simulation steps.
- `action_mask` is an optional list of two-dimensional arrays of booleans.
Only available for the multi-discrete action space type.
Each array corresponds to an action branch. The first dimension of each
array is the batch size and the second contains a mask for each action of
the branch. If true, the action is not available for the agent during
this simulation step.
It also has the two following methods:
- `n_agents()` Returns the number of agents requesting a decision since
the last call to `env.step()`
- `get_agent_step_result(agent_id: int)` Returns a `StepResult`
for the Agent with the `agent_id` unique identifier.
Returns a dictionary mapping Brain names to BrainInfo objects.
A `StepResult` has the following fields:
- `obs` is a list of numpy array observations collected by the agent.
(Each array has one less dimension than the arrays in `BatchedStepResult`.)
- `reward` is a float. Corresponds to the rewards collected by the agent
since the last simulation step.
- `done` is a bool. Is true if the Agent was terminated during the last
simulation step.
- `max_step` is a bool. Is true if the Agent reached its maximum number of
steps during the last simulation step.
- `agent_id` is an int and a unique identifier for the corresponding Agent.
- `action_mask` is an optional list of one-dimensional arrays of booleans.
Only available for the multi-discrete action space type.
Each array corresponds to an action branch. Each array contains a mask
for each action of the branch. If true, the action is not available for
the agent during this simulation step.
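As an illustrative sketch (reusing the `env` and `group_name` names from the loop above), the batched and per-agent data can be read like this:

```python
step_result = env.get_step_result(group_name)
print("agents this step:", step_result.n_agents())
print("first observation batch shape:", step_result.obs[0].shape)

# Look up a single agent by its id, taken here from the batch itself.
first_id = step_result.agent_id[0]
single = step_result.get_agent_step_result(first_id)
print(single.reward, single.done, single.max_step)
```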
For example, to access the BrainInfo belonging to a Brain called
'brain_name', and the BrainInfo field 'vector_observations':
#### AgentGroupSpec
```python
info = env.step()
brainInfo = info['brain_name']
observations = brainInfo.vector_observations
```
An Agent Group can have either discrete or continuous actions. To check which,
use `spec.is_action_discrete()` or `spec.is_action_continuous()`. If discrete,
the action tensors are expected to be `np.int32`. If continuous, the actions
are expected to be `np.float32`.
Note that if you have more than one LearningBrain in the scene, you
must provide dictionaries from Brain names to arrays for `action`, `memory`
and `value`. For example: If you have two Learning Brains named `brain1` and
`brain2` each with one Agent taking two continuous actions, then you can
have:
An `AgentGroupSpec` has the following fields :
```python
action = {'brain1':[1.0, 2.0], 'brain2':[3.0,4.0]}
```
- `observation_shapes` is a List of Tuples of int: each Tuple corresponds
to an observation's dimensions (without the number-of-agents dimension).
The shape tuples are in the same order as the observations in the
BatchedStepResult and StepResult.
- `action_type` is the type of data of the action. It can be discrete or
continuous. If discrete, the action tensors are expected to be `np.int32`. If
continuous, the actions are expected to be `np.float32`.
- `action_size` is an `int` corresponding to the expected dimension of the action
array.
- In continuous action space it is the number of floats that constitute the action.
- In discrete action space (same as multi-discrete) it corresponds to the
number of branches (the number of independent actions)
- `discrete_action_branches` is a Tuple of int, only for the discrete action space. Each int
corresponds to the number of different options for each branch of the action.
For example, in a game with a direction input (no movement, left, right) and a jump input
(no jump, jump), there will be two branches (direction and jump), the first one with 3
options and the second with 2 options (`action_size = 2` and
`discrete_action_branches = (3,2,)`).
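For illustration, a short sketch of inspecting a spec and building a correctly-typed all-zero action (names reused from the loop above):

```python
import numpy as np

spec = env.get_agent_group_spec(group_name)
print("observation shapes:", spec.observation_shapes)
print("action size:", spec.action_size)
if spec.is_action_discrete():
    print("branch sizes:", spec.discrete_action_branches)

# Build an all-zero action with the expected dtype and shape, then queue it.
dtype = np.int32 if spec.is_action_discrete() else np.float32
n_agents = env.get_step_result(group_name).n_agents()
env.set_actions(group_name, np.zeros((n_agents, spec.action_size), dtype=dtype))
```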
Returns a dictionary mapping Brain names to BrainInfo objects.
- **Close : `env.close()`**
Sends a shutdown signal to the environment and closes the communication
socket.
### Modifying the environment from Python
The Environment can be modified by using side channels to send data to the

```csharp
var sharedProperties = academy.FloatProperties;
float property1 = sharedProperties.GetPropertyWithDefault("parameter_1", 0.0f);
```
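On the Python side, the same float property can be set through a `FloatPropertiesChannel` registered with the environment. A minimal sketch; the `FloatPropertiesChannel` import path is an assumption:

```python
from mlagents.envs.environment import UnityEnvironment
from mlagents.envs.side_channel.float_properties_channel import FloatPropertiesChannel  # path assumed

float_props = FloatPropertiesChannel()
env = UnityEnvironment(file_name="3DBall", side_channels=[float_props])

float_props.set_property("parameter_1", 2.0)  # read in C# via GetPropertyWithDefault
env.reset()
print(float_props.list_properties())
```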
## mlagents-learn
For more detailed documentation on using `mlagents-learn`, check out
[Training ML-Agents](Training-ML-Agents.md)

gym-unity/gym_unity/envs/__init__.py (40)


import numpy as np
from mlagents.envs.environment import UnityEnvironment
from gym import error, spaces
from mlagents.envs.brain_conversion_utils import (
step_result_to_brain_info,
group_spec_to_brain_parameters,
)
class UnityGymException(error.Error):

)
# Take a single step so that the brain information will be sent over
if not self._env.brains:
if not self._env.get_agent_groups():
self.name = self._env.academy_name
self.visual_obs = None
self._current_state = None
self._n_agents = None

self._allow_multiple_visual_obs = allow_multiple_visual_obs
# Check brain configuration
if len(self._env.brains) != 1:
if len(self._env.get_agent_groups()) != 1:
if len(self._env.external_brain_names) <= 0:
raise UnityGymException(
"There are not any external brain in the UnityEnvironment"
)
self.brain_name = self._env.external_brain_names[0]
brain = self._env.brains[self.brain_name]
self.brain_name = self._env.get_agent_groups()[0]
self.name = self.brain_name
brain = group_spec_to_brain_parameters(
self.brain_name, self._env.get_agent_group_spec(self.brain_name)
)
if use_visual and brain.number_visual_observations == 0:
raise UnityGymException(

)
# Check for number of agents in scene.
initial_info = self._env.reset()[self.brain_name]
self._env.reset()
initial_info = step_result_to_brain_info(
self._env.get_step_result(self.brain_name),
self._env.get_agent_group_spec(self.brain_name),
)
self._check_agents(len(initial_info.agents))
# Set observation and action spaces

Returns: observation (object/list): the initial observation of the
space.
"""
info = self._env.reset()[self.brain_name]
self._env.reset()
info = step_result_to_brain_info(
self._env.get_step_result(self.brain_name),
self._env.get_agent_group_spec(self.brain_name),
)
n_agents = len(info.agents)
self._check_agents(n_agents)
self.game_over = False

# Translate action into list
action = self._flattener.lookup_action(action)
info = self._env.step(action)[self.brain_name]
spec = self._env.get_agent_group_spec(self.brain_name)
action = np.array(action).reshape((self._n_agents, spec.action_size))
self._env.set_actions(self.brain_name, action)
self._env.step()
info = step_result_to_brain_info(
self._env.get_step_result(self.brain_name), spec
)
n_agents = len(info.agents)
self._check_agents(n_agents)
self._current_state = info

gym-unity/gym_unity/tests/test_gym.py (87)


from gym import spaces
from gym_unity.envs import UnityEnv, UnityGymException
from mlagents.envs.brain import CameraResolution
from mlagents.envs.base_env import AgentGroupSpec, ActionType, BatchedStepResult
mock_brain = create_mock_brainparams()
mock_braininfo = create_mock_vector_braininfo()
mock_brain = create_mock_group_spec()
mock_braininfo = create_mock_vector_step_result()
setup_mock_unityenvironment(mock_env, mock_brain, mock_braininfo)
env = UnityEnv(" ", use_visual=False, multiagent=False)

assert env.observation_space.contains(obs)
assert isinstance(obs, np.ndarray)
assert isinstance(rew, float)
assert isinstance(done, bool)
assert isinstance(done, (bool, np.bool_))
mock_brain = create_mock_brainparams()
mock_braininfo = create_mock_vector_braininfo(num_agents=2)
mock_brain = create_mock_group_spec()
mock_braininfo = create_mock_vector_step_result(num_agents=2)
setup_mock_unityenvironment(mock_env, mock_brain, mock_braininfo)
with pytest.raises(UnityGymException):

@mock.patch("gym_unity.envs.UnityEnvironment")
def test_branched_flatten(mock_env):
mock_brain = create_mock_brainparams(
mock_brain = create_mock_group_spec(
mock_braininfo = create_mock_vector_braininfo(num_agents=1)
mock_braininfo = create_mock_vector_step_result(num_agents=1)
setup_mock_unityenvironment(mock_env, mock_brain, mock_braininfo)
env = UnityEnv(" ", use_visual=False, multiagent=False, flatten_branched=True)

@pytest.mark.parametrize("use_uint8", [True, False], ids=["float", "uint8"])
@mock.patch("gym_unity.envs.UnityEnvironment")
def test_gym_wrapper_visual(mock_env, use_uint8):
mock_brain = create_mock_brainparams(number_visual_observations=1)
mock_braininfo = create_mock_vector_braininfo(number_visual_observations=1)
mock_brain = create_mock_group_spec(number_visual_observations=1)
mock_braininfo = create_mock_vector_step_result(number_visual_observations=1)
setup_mock_unityenvironment(mock_env, mock_brain, mock_braininfo)
env = UnityEnv(" ", use_visual=True, multiagent=False, uint8_visual=use_uint8)

assert env.observation_space.contains(obs)
assert isinstance(obs, np.ndarray)
assert isinstance(rew, float)
assert isinstance(done, bool)
assert isinstance(done, (bool, np.bool_))
assert isinstance(info, dict)

def create_mock_brainparams(
def create_mock_group_spec(
number_visual_observations=0,
vector_action_space_type="continuous",
vector_observation_space_size=3,

Creates a mock BrainParameters object with parameters.
"""
# Avoid using mutable object as default param
if vector_action_space_size is None:
vector_action_space_size = [2]
mock_brain = mock.Mock()
mock_brain.return_value.number_visual_observations = number_visual_observations
if number_visual_observations:
mock_brain.return_value.camera_resolutions = [
CameraResolution(width=8, height=8, num_channels=3)
for _ in range(number_visual_observations)
]
act_type = ActionType.DISCRETE
if vector_action_space_type == "continuous":
act_type = ActionType.CONTINUOUS
if vector_action_space_size is None:
vector_action_space_size = 2
else:
vector_action_space_size = vector_action_space_size[0]
else:
if vector_action_space_size is None:
vector_action_space_size = (2,)
else:
vector_action_space_size = tuple(vector_action_space_size)
obs_shapes = [(vector_observation_space_size,)]
for i in range(number_visual_observations):
obs_shapes += [(8, 8, 3)]
return AgentGroupSpec(obs_shapes, act_type, vector_action_space_size)
mock_brain.return_value.vector_action_space_type = vector_action_space_type
mock_brain.return_value.vector_observation_space_size = (
vector_observation_space_size
)
mock_brain.return_value.vector_action_space_size = vector_action_space_size
return mock_brain()
def create_mock_vector_braininfo(num_agents=1, number_visual_observations=0):
def create_mock_vector_step_result(num_agents=1, number_visual_observations=0):
"""
Creates a mock BrainInfo with vector observations. Imitates constant
vector observations, rewards, dones, and agents.

mock_braininfo = mock.Mock()
mock_braininfo.return_value.vector_observations = np.array([num_agents * [1, 2, 3]])
obs = [np.array([num_agents * [1, 2, 3]])]
mock_braininfo.return_value.visual_observations = [
[np.zeros(shape=(8, 8, 3), dtype=np.float32)]
]
mock_braininfo.return_value.rewards = num_agents * [1.0]
mock_braininfo.return_value.local_done = num_agents * [False]
mock_braininfo.return_value.agents = range(0, num_agents)
return mock_braininfo()
obs += [np.zeros(shape=(num_agents, 8, 8, 3), dtype=np.float32)]
rewards = np.array(num_agents * [1.0])
done = np.array(num_agents * [False])
agents = np.array(range(0, num_agents))
return BatchedStepResult(obs, rewards, done, done, agents, None)
def setup_mock_unityenvironment(mock_env, mock_brain, mock_braininfo):
def setup_mock_unityenvironment(mock_env, mock_spec, mock_result):
:Mock mock_brain: A mock Brain object that specifies the params of this environment.
:Mock mock_braininfo: A mock BrainInfo object that will be returned at each step and reset.
:Mock mock_spec: An AgentGroupSpec object that specifies the params of this environment.
:Mock mock_result: A BatchedStepResult object that will be returned at each step and reset.
mock_env.return_value.academy_name = "MockAcademy"
mock_env.return_value.brains = {"MockBrain": mock_brain}
mock_env.return_value.external_brain_names = ["MockBrain"]
mock_env.return_value.reset.return_value = {"MockBrain": mock_braininfo}
mock_env.return_value.step.return_value = {"MockBrain": mock_braininfo}
mock_env.return_value.get_agent_groups.return_value = ["MockBrain"]
mock_env.return_value.get_agent_group_spec.return_value = mock_spec
mock_env.return_value.get_step_result.return_value = mock_result

ml-agents-envs/mlagents/envs/brain.py (46)


from mlagents.envs.communicator_objects.brain_parameters_pb2 import BrainParametersProto
from mlagents.envs.communicator_objects.observation_pb2 import ObservationProto
from mlagents.envs.timers import hierarchical_timer, timed
from typing import Dict, List, NamedTuple, Optional
from typing import Dict, List, NamedTuple
from PIL import Image
logger = logging.getLogger("mlagents.envs")

self.max_reached = max_reached
self.agents = agents
self.action_masks = action_mask
@staticmethod
def merge_memories(m1, m2, agents1, agents2):
if len(m1) == 0 and len(m2) != 0:
m1 = np.zeros((len(agents1), m2.shape[1]), dtype=np.float32)
elif len(m2) == 0 and len(m1) != 0:
m2 = np.zeros((len(agents2), m1.shape[1]), dtype=np.float32)
elif m2.shape[1] > m1.shape[1]:
new_m1 = np.zeros((m1.shape[0], m2.shape[1]), dtype=np.float32)
new_m1[0 : m1.shape[0], 0 : m1.shape[1]] = m1
return np.append(new_m1, m2, axis=0)
elif m1.shape[1] > m2.shape[1]:
new_m2 = np.zeros((m2.shape[0], m1.shape[1]), dtype=np.float32)
new_m2[0 : m2.shape[0], 0 : m2.shape[1]] = m2
return np.append(m1, new_m2, axis=0)
return np.append(m1, m2, axis=0)
@staticmethod
@timed

f"An agent had a NaN observation for brain {brain_params.brain_name}"
)
return vector_obs
def safe_concat_lists(l1: Optional[List], l2: Optional[List]) -> Optional[List]:
if l1 is None:
if l2 is None:
return None
else:
return l2.copy()
else:
if l2 is None:
return l1.copy()
else:
copy = l1.copy()
copy.extend(l2)
return copy
def safe_concat_np_ndarray(
a1: Optional[np.ndarray], a2: Optional[np.ndarray]
) -> Optional[np.ndarray]:
if a1 is not None and a1.size != 0:
if a2 is not None and a2.size != 0:
return np.append(a1, a2, axis=0)
else:
return a1.copy()
elif a2 is not None and a2.size != 0:
return a2.copy()
return None
# Renaming of dictionary of brain name to BrainInfo for clarity

ml-agents-envs/mlagents/envs/environment.py (327)


from typing import Dict, List, Optional, Any
from mlagents.envs.side_channel.side_channel import SideChannel
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs.base_env import (
BaseEnv,
BatchedStepResult,
AgentGroupSpec,
AgentGroup,
AgentId,
)
from .brain import AllBrainInfo, BrainInfo, BrainParameters
)
from mlagents.envs.rpc_utils import (
agent_group_spec_from_proto,
batched_step_result_from_proto,
)
from mlagents.envs.communicator_objects.unity_rl_input_pb2 import UnityRLInputProto

logger = logging.getLogger("mlagents.envs")
class UnityEnvironment(BaseUnityEnvironment):
class UnityEnvironment(BaseEnv):
SCALAR_ACTION_TYPES = (int, np.int32, np.int64, float, np.float32, np.float64)
SINGLE_BRAIN_ACTION_TYPES = SCALAR_ACTION_TYPES + (list, np.ndarray)
API_VERSION = "API-12"

"{1}.\nPlease go to https://github.com/Unity-Technologies/ml-agents to download the latest version "
"of ML-Agents.".format(self._version_, self._unity_version)
)
self._n_agents: Dict[str, int] = {}
self._env_state: Dict[str, BatchedStepResult] = {}
self._env_specs: Dict[str, AgentGroupSpec] = {}
self._env_actions: Dict[str, np.ndarray] = {}
self._academy_name = aca_params.name
self._log_path = aca_params.log_path
self._brains: Dict[str, BrainParameters] = {}
self._external_brain_names: List[str] = []
self._num_external_brains = 0
self._update_brain_parameters(aca_output)
logger.info(
"\n'{0}' started successfully!\n{1}".format(self._academy_name, str(self))
)
@property
def logfile_path(self):
return self._log_path
@property
def brains(self):
return self._brains
@property
def academy_name(self):
return self._academy_name
@property
def number_external_brains(self):
return self._num_external_brains
@property
def external_brain_names(self):
return self._external_brain_names
self._update_group_specs(aca_output)
@property
def external_brains(self):
external_brains = {}
for brain_name in self.external_brain_names:
external_brains[brain_name] = self.brains[brain_name]
return external_brains
def executable_launcher(self, file_name, docker_training, no_graphics, args):
cwd = os.getcwd()

shell=True,
)
def __str__(self):
return """Unity Academy name: {0}""".format(self._academy_name)
def _update_group_specs(self, output: UnityOutputProto) -> None:
init_output = output.rl_initialization_output
for brain_param in init_output.brain_parameters:
# Each BrainParameter in the rl_initialization_output should have at least one AgentInfo
# Get that agent, because we need some of its observations.
agent_infos = output.rl_output.agentInfos[brain_param.brain_name]
if agent_infos.value:
agent = agent_infos.value[0]
new_spec = agent_group_spec_from_proto(brain_param, agent)
self._env_specs[brain_param.brain_name] = new_spec
logger.info(f"Connected new brain:\n{brain_param.brain_name}")
def reset(self) -> AllBrainInfo:
def _update_state(self, output: UnityRLOutputProto) -> None:
Sends a signal to reset the unity environment.
:return: AllBrainInfo : A data structure corresponding to the initial reset state of the environment.
Collects experience information from all external brains in environment at current step.
for brain_name in self._env_specs.keys():
if brain_name in output.agentInfos:
agent_info_list = output.agentInfos[brain_name].value
self._env_state[brain_name] = batched_step_result_from_proto(
agent_info_list, self._env_specs[brain_name]
)
else:
self._env_state[brain_name] = BatchedStepResult.empty(
self._env_specs[brain_name]
)
self._parse_side_channel_message(self.side_channels, output.side_channel)
def reset(self) -> None:
self._update_brain_parameters(outputs)
self._update_group_specs(outputs)
s = self._get_state(rl_output)
for _b in self._external_brain_names:
self._n_agents[_b] = len(s[_b].agents)
self._update_state(rl_output)
return s
self._env_actions.clear()
def step(
self,
vector_action: Dict[str, np.ndarray] = None,
value: Optional[Dict[str, np.ndarray]] = None,
) -> AllBrainInfo:
"""
Provides the environment with an action, moves the environment dynamics forward accordingly,
and returns observation, state, and reward information to the agent.
:param value: Value estimates provided by agents.
:param vector_action: Agent's vector action. Can be a scalar or vector of int/floats.
:param memory: Vector corresponding to memory used for recurrent policies.
:return: AllBrainInfo : A Data structure corresponding to the new state of the environment.
"""
def step(self) -> None:
vector_action = {} if vector_action is None else vector_action
value = {} if value is None else value
# Check that environment is loaded, and episode is currently running.
else:
if isinstance(vector_action, self.SINGLE_BRAIN_ACTION_TYPES):
if self._num_external_brains == 1:
vector_action = {self._external_brain_names[0]: vector_action}
elif self._num_external_brains > 1:
raise UnityActionException(
"You have {0} brains, you need to feed a dictionary of brain names a keys, "
"and vector_actions as values".format(self._num_external_brains)
)
else:
raise UnityActionException(
"There are no external brains in the environment, "
"step cannot take a vector_action input"
)
# fill the blanks for missing actions
for group_name in self._env_specs:
if group_name not in self._env_actions:
n_agents = 0
if group_name in self._env_state:
n_agents = self._env_state[group_name].n_agents()
self._env_actions[group_name] = self._env_specs[
group_name
].create_empty_action(n_agents)
step_input = self._generate_step_input(self._env_actions)
with hierarchical_timer("communicator.exchange"):
outputs = self.communicator.exchange(step_input)
if outputs is None:
raise UnityCommunicationException("Communicator has stopped.")
self._update_group_specs(outputs)
rl_output = outputs.rl_output
self._update_state(rl_output)
self._env_actions.clear()
if isinstance(value, self.SINGLE_BRAIN_ACTION_TYPES):
if self._num_external_brains == 1:
value = {self._external_brain_names[0]: value}
elif self._num_external_brains > 1:
raise UnityActionException(
"You have {0} brains, you need to feed a dictionary of brain names as keys "
"and state/action value estimates as values".format(
self._num_external_brains
)
)
else:
raise UnityActionException(
"There are no external brains in the environment, "
"step cannot take a value input"
)
def get_agent_groups(self) -> List[AgentGroup]:
return list(self._env_specs.keys())
for brain_name in list(vector_action.keys()):
if brain_name not in self._external_brain_names:
raise UnityActionException(
"The name {0} does not correspond to an external brain "
"in the environment".format(brain_name)
)
def _assert_group_exists(self, agent_group: str) -> None:
if agent_group not in self._env_specs:
raise UnityActionException(
"The group {0} does not correspond to an existing agent group "
"in the environment".format(agent_group)
)
for brain_name in self._external_brain_names:
n_agent = self._n_agents[brain_name]
if brain_name not in vector_action:
if self._brains[brain_name].vector_action_space_type == "discrete":
vector_action[brain_name] = (
[0.0]
* n_agent
* len(self._brains[brain_name].vector_action_space_size)
)
else:
vector_action[brain_name] = (
[0.0]
* n_agent
* self._brains[brain_name].vector_action_space_size[0]
)
else:
vector_action[brain_name] = self._flatten(vector_action[brain_name])
discrete_check = (
self._brains[brain_name].vector_action_space_type == "discrete"
def set_actions(self, agent_group: AgentGroup, action: np.ndarray) -> None:
self._assert_group_exists(agent_group)
if agent_group not in self._env_state:
return
spec = self._env_specs[agent_group]
expected_type = np.float32 if spec.is_action_continuous() else np.int32
expected_shape = (self._env_state[agent_group].n_agents(), spec.action_size)
if action.shape != expected_shape:
raise UnityActionException(
"The group {0} needs an input of dimension {1} but received input of dimension {2}".format(
agent_group, expected_shape, action.shape
)
if action.dtype != expected_type:
action = action.astype(expected_type)
self._env_actions[agent_group] = action
expected_discrete_size = n_agent * len(
self._brains[brain_name].vector_action_space_size
def set_action_for_agent(
self, agent_group: AgentGroup, agent_id: AgentId, action: np.ndarray
) -> None:
self._assert_group_exists(agent_group)
if agent_group not in self._env_state:
return
spec = self._env_specs[agent_group]
expected_shape = (spec.action_size,)
if action.shape != expected_shape:
raise UnityActionException(
"The Agent {0} in group {1} needs an input of dimension {2} but received input of dimension {3}".format(
agent_id, agent_group, expected_shape, action.shape
)
expected_type = np.float32 if spec.is_action_continuous() else np.int32
if action.dtype != expected_type:
action = action.astype(expected_type)
continuous_check = (
self._brains[brain_name].vector_action_space_type == "continuous"
if agent_group not in self._env_actions:
self._env_actions[agent_group] = self._empty_action(
spec, self._env_state[agent_group].n_agents()
)
try:
index = np.where(self._env_state[agent_group].agent_id == agent_id)[0][0]
except IndexError as ie:
raise IndexError(
"agent_id {} did not request a decision at the previous step".format(
agent_id
)
) from ie
self._env_actions[agent_group][index] = action
expected_continuous_size = (
self._brains[brain_name].vector_action_space_size[0] * n_agent
)
def get_step_result(self, agent_group: AgentGroup) -> BatchedStepResult:
self._assert_group_exists(agent_group)
return self._env_state[agent_group]
if not (
(
discrete_check
and len(vector_action[brain_name]) == expected_discrete_size
)
or (
continuous_check
and len(vector_action[brain_name]) == expected_continuous_size
)
):
raise UnityActionException(
"There was a mismatch between the provided action and "
"the environment's expectation: "
"The brain {0} expected {1} {2} action(s), but was provided: {3}".format(
brain_name,
str(expected_discrete_size)
if discrete_check
else str(expected_continuous_size),
self._brains[brain_name].vector_action_space_type,
str(vector_action[brain_name]),
)
)
step_input = self._generate_step_input(vector_action, value)
with hierarchical_timer("communicator.exchange"):
outputs = self.communicator.exchange(step_input)
if outputs is None:
raise UnityCommunicationException("Communicator has stopped.")
self._update_brain_parameters(outputs)
rl_output = outputs.rl_output
state = self._get_state(rl_output)
for _b in self._external_brain_names:
self._n_agents[_b] = len(state[_b].agents)
return state
def get_agent_group_spec(self, agent_group: AgentGroup) -> AgentGroupSpec:
self._assert_group_exists(agent_group)
return self._env_specs[agent_group]
def close(self):
"""

arr = [float(x) for x in arr]
return arr
def _get_state(self, output: UnityRLOutputProto) -> AllBrainInfo:
"""
Collects experience information from all external brains in environment at current step.
:return: a dictionary of BrainInfo objects.
"""
_data = {}
for brain_name in output.agentInfos:
agent_info_list = output.agentInfos[brain_name].value
_data[brain_name] = BrainInfo.from_agent_proto(
self.worker_id, agent_info_list, self.brains[brain_name]
)
self._parse_side_channel_message(self.side_channels, output.side_channel)
return _data
@staticmethod
def _parse_side_channel_message(
side_channels: Dict[int, SideChannel], data: bytearray

channel.message_queue = []
return result
def _update_brain_parameters(self, output: UnityOutputProto) -> None:
init_output = output.rl_initialization_output
for brain_param in init_output.brain_parameters:
# Each BrainParameter in the rl_initialization_output should have at least one AgentInfo
# Get that agent, because we need some of its observations.
agent_infos = output.rl_output.agentInfos[brain_param.brain_name]
if agent_infos.value:
agent = agent_infos.value[0]
new_brain = BrainParameters.from_proto(brain_param, agent)
self._brains[brain_param.brain_name] = new_brain
logger.info(f"Connected new brain:\n{new_brain}")
self._external_brain_names = list(self._brains.keys())
self._num_external_brains = len(self._external_brain_names)
self, vector_action: Dict[str, np.ndarray], value: Dict[str, np.ndarray]
self, vector_action: Dict[str, np.ndarray]
n_agents = self._n_agents[b]
n_agents = self._env_state[b].n_agents()
_a_s = len(vector_action[b]) // n_agents
action = AgentActionProto(
vector_actions=vector_action[b][i * _a_s : (i + 1) * _a_s]
)
if b in value:
if value[b] is not None:
action.value = float(value[b][i])
action = AgentActionProto(vector_actions=vector_action[b][i])
rl_in.agent_actions[b].value.extend([action])
rl_in.command = 0
rl_in.side_channel = bytes(self._generate_side_channel_data(self.side_channels))

ml-agents-envs/mlagents/envs/simple_env_manager.py (42)


from typing import Dict, List
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs.base_env import BaseEnv
from mlagents.envs.brain import BrainParameters
from mlagents.envs.brain import BrainParameters, AllBrainInfo
from mlagents.envs.brain_conversion_utils import (
step_result_to_brain_info,
group_spec_to_brain_parameters,
)
Simple implementation of the EnvManager interface that only handles one BaseUnityEnvironment at a time.
Simple implementation of the EnvManager interface that only handles one BaseEnv at a time.
def __init__(
self, env: BaseUnityEnvironment, float_prop_channel: FloatPropertiesChannel
):
def __init__(self, env: BaseEnv, float_prop_channel: FloatPropertiesChannel):
super().__init__()
self.shared_float_properties = float_prop_channel
self.env = env

def step(self) -> List[EnvironmentStep]:
actions = {}
values = {}
actions[brain_name] = action_info.action
values[brain_name] = action_info.value
all_brain_info = self.env.step(vector_action=actions, value=values)
self.env.set_actions(brain_name, action_info.action)
self.env.step()
all_brain_info = self._generate_all_brain_info()
step_brain_info = all_brain_info
step_info = EnvironmentStep(

if config is not None:
for k, v in config.items():
self.shared_float_properties.set_property(k, v)
all_brain_info = self.env.reset()
self.env.reset()
all_brain_info = self._generate_all_brain_info()
return self.env.external_brains
result = {}
for brain_name in self.env.get_agent_groups():
result[brain_name] = group_spec_to_brain_parameters(
brain_name, self.env.get_agent_group_spec(brain_name)
)
return result
@property
def get_properties(self) -> Dict[str, float]:

brain_info
)
return all_action_info
def _generate_all_brain_info(self) -> AllBrainInfo:
all_brain_info = {}
for brain_name in self.env.get_agent_groups():
all_brain_info[brain_name] = step_result_to_brain_info(
self.env.get_step_result(brain_name),
self.env.get_agent_group_spec(brain_name),
)
return all_brain_info

ml-agents-envs/mlagents/envs/subprocess_env_manager.py (44)


from multiprocessing import Process, Pipe, Queue
from multiprocessing.connection import Connection
from queue import Empty as EmptyQueueException
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs.base_env import BaseEnv
from mlagents.envs.env_manager import EnvManager, EnvironmentStep
from mlagents.envs.timers import (
TimerNode,

EngineConfig,
)
from mlagents.envs.side_channel.side_channel import SideChannel
from mlagents.envs.brain_conversion_utils import (
step_result_to_brain_info,
group_spec_to_brain_parameters,
)
logger = logging.getLogger("mlagents.envs")

shared_float_properties = FloatPropertiesChannel()
engine_configuration_channel = EngineConfigurationChannel()
engine_configuration_channel.set_configuration(engine_configuration)
env: BaseUnityEnvironment = env_factory(
env: BaseEnv = env_factory(
worker_id, [shared_float_properties, engine_configuration_channel]
)

def _generate_all_brain_info() -> AllBrainInfo:
all_brain_info = {}
for brain_name in env.get_agent_groups():
all_brain_info[brain_name] = step_result_to_brain_info(
env.get_step_result(brain_name),
env.get_agent_group_spec(brain_name),
worker_id,
)
return all_brain_info
def external_brains():
result = {}
for brain_name in env.get_agent_groups():
result[brain_name] = group_spec_to_brain_parameters(
brain_name, env.get_agent_group_spec(brain_name)
)
return result
actions = {}
values = {}
actions[brain_name] = action_info.action
values[brain_name] = action_info.value
all_brain_info = env.step(vector_action=actions, value=values)
if len(action_info.action) != 0:
env.set_actions(brain_name, action_info.action)
env.step()
all_brain_info = _generate_all_brain_info()
# The timers in this process are independent from all the processes and the "main" process
# So after we send back the root timer, we can safely clear them.
# Note that we could randomly return timers a fraction of the time if we wanted to reduce

step_queue.put(EnvironmentResponse("step", worker_id, step_response))
reset_timers()
elif cmd.name == "external_brains":
_send_response("external_brains", env.external_brains)
_send_response("external_brains", external_brains())
elif cmd.name == "get_properties":
reset_params = {}
for k in shared_float_properties.list_properties():

elif cmd.name == "reset":
for k, v in cmd.payload.items():
shared_float_properties.set_property(k, v)
all_brain_info = env.reset()
env.reset()
all_brain_info = _generate_all_brain_info()
_send_response("reset", all_brain_info)
elif cmd.name == "close":
break

class SubprocessEnvManager(EnvManager):
def __init__(
self,
env_factory: Callable[[int, List[SideChannel]], BaseUnityEnvironment],
env_factory: Callable[[int, List[SideChannel]], BaseEnv],
engine_configuration: EngineConfig,
n_env: int = 1,
):

def create_worker(
worker_id: int,
step_queue: Queue,
env_factory: Callable[[int, List[SideChannel]], BaseUnityEnvironment],
env_factory: Callable[[int, List[SideChannel]], BaseEnv],
engine_configuration: EngineConfig,
) -> UnityEnvWorker:
parent_conn, child_conn = Pipe()

ml-agents-envs/mlagents/envs/tests/test_envs.py (82)


import numpy as np
from mlagents.envs.environment import UnityEnvironment
from mlagents.envs.base_env import BatchedStepResult
from mlagents.envs.brain import BrainInfo
from mlagents.envs.mock_communicator import MockCommunicator

discrete_action=False, visual_inputs=0
)
env = UnityEnvironment(" ")
assert env.external_brain_names[0] == "RealFakeBrain"
assert env.get_agent_groups() == ["RealFakeBrain"]
env.close()

discrete_action=False, visual_inputs=0
)
env = UnityEnvironment(" ")
brain = env.brains["RealFakeBrain"]
brain_info = env.reset()
spec = env.get_agent_group_spec("RealFakeBrain")
env.reset()
batched_step_result = env.get_step_result("RealFakeBrain")
assert isinstance(brain_info, dict)
assert isinstance(brain_info["RealFakeBrain"], BrainInfo)
assert isinstance(brain_info["RealFakeBrain"].visual_observations, list)
assert isinstance(brain_info["RealFakeBrain"].vector_observations, np.ndarray)
assert (
len(brain_info["RealFakeBrain"].visual_observations)
== brain.number_visual_observations
)
assert len(brain_info["RealFakeBrain"].vector_observations) == len(
brain_info["RealFakeBrain"].agents
)
assert (
len(brain_info["RealFakeBrain"].vector_observations[0])
== brain.vector_observation_space_size
)
assert isinstance(batched_step_result, BatchedStepResult)
assert len(spec.observation_shapes) == len(batched_step_result.obs)
n_agents = batched_step_result.n_agents()
for shape, obs in zip(spec.observation_shapes, batched_step_result.obs):
assert (n_agents,) + shape == obs.shape
@mock.patch("mlagents.envs.environment.UnityEnvironment.executable_launcher")

discrete_action=False, visual_inputs=0
)
env = UnityEnvironment(" ")
brain = env.brains["RealFakeBrain"]
brain_info = env.step()
brain_info = env.step(
[0]
* brain.vector_action_space_size[0]
* len(brain_info["RealFakeBrain"].agents)
spec = env.get_agent_group_spec("RealFakeBrain")
env.step()
batched_step_result = env.get_step_result("RealFakeBrain")
n_agents = batched_step_result.n_agents()
env.set_actions(
"RealFakeBrain", np.zeros((n_agents, spec.action_size), dtype=np.float32)
env.step()
env.step([0])
brain_info = env.step(
[-1]
* brain.vector_action_space_size[0]
* len(brain_info["RealFakeBrain"].agents)
env.set_actions(
"RealFakeBrain",
np.zeros((n_agents - 1, spec.action_size), dtype=np.float32),
)
batched_step_result = env.get_step_result("RealFakeBrain")
n_agents = batched_step_result.n_agents()
env.set_actions(
"RealFakeBrain", -1 * np.ones((n_agents, spec.action_size), dtype=np.float32)
env.step()
assert isinstance(brain_info, dict)
assert isinstance(brain_info["RealFakeBrain"], BrainInfo)
assert isinstance(brain_info["RealFakeBrain"].visual_observations, list)
assert isinstance(brain_info["RealFakeBrain"].vector_observations, np.ndarray)
assert (
len(brain_info["RealFakeBrain"].visual_observations)
== brain.number_visual_observations
)
assert len(brain_info["RealFakeBrain"].vector_observations) == len(
brain_info["RealFakeBrain"].agents
)
assert (
len(brain_info["RealFakeBrain"].vector_observations[0])
== brain.vector_observation_space_size
)
print("\n\n\n\n\n\n\n" + str(brain_info["RealFakeBrain"].local_done))
assert not brain_info["RealFakeBrain"].local_done[0]
assert brain_info["RealFakeBrain"].local_done[2]
assert isinstance(batched_step_result, BatchedStepResult)
assert len(spec.observation_shapes) == len(batched_step_result.obs)
for shape, obs in zip(spec.observation_shapes, batched_step_result.obs):
assert (n_agents,) + shape == obs.shape
assert not batched_step_result.done[0]
assert batched_step_result.done[2]
@mock.patch("mlagents.envs.environment.UnityEnvironment.executable_launcher")

ml-agents-envs/mlagents/envs/tests/test_subprocess_env_manager.py (4)


EnvironmentResponse,
StepResponse,
)
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs.base_env import BaseEnv
return mock.create_autospec(spec=BaseUnityEnvironment)
return mock.create_autospec(spec=BaseEnv)
class MockEnvWorker:

ml-agents/mlagents/trainers/learn.py (4)


from mlagents.envs.environment import UnityEnvironment
from mlagents.envs.sampler_class import SamplerManager
from mlagents.envs.exception import SamplerException
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs.base_env import BaseEnv
from mlagents.envs.subprocess_env_manager import SubprocessEnvManager
from mlagents.envs.side_channel.side_channel import SideChannel
from mlagents.envs.side_channel.engine_configuration_channel import EngineConfig

seed: Optional[int],
start_port: int,
env_args: Optional[List[str]],
) -> Callable[[int, List[SideChannel]], BaseUnityEnvironment]:
) -> Callable[[int, List[SideChannel]], BaseEnv]:
if env_path is not None:
# Strip out executable extensions if passed
env_path = (

ml-agents/mlagents/trainers/tests/test_bc.py (23)


import mlagents.trainers.tests.mock_brain as mb
from mlagents.trainers.bc.policy import BCPolicy
from mlagents.trainers.bc.offline_trainer import BCTrainer
from mlagents.envs.environment import UnityEnvironment
from mlagents.envs.environment import UnityEnvironment
from mlagents.envs.brain_conversion_utils import (
step_result_to_brain_info,
group_spec_to_brain_parameters,
)
@pytest.fixture

discrete_action=False, visual_inputs=0
)
env = UnityEnvironment(" ")
brain_infos = env.reset()
brain_info = brain_infos[env.external_brain_names[0]]
env.reset()
brain_name = env.get_agent_groups()[0]
brain_info = step_result_to_brain_info(
env.get_step_result(brain_name), env.get_agent_group_spec(brain_name)
)
brain_params = group_spec_to_brain_parameters(
brain_name, env.get_agent_group_spec(brain_name)
)
model_path = env.external_brain_names[0]
model_path = brain_name
policy = BCPolicy(
0, env.brains[env.external_brain_names[0]], trainer_parameters, False
)
policy = BCPolicy(0, brain_params, trainer_parameters, False)
run_out = policy.evaluate(brain_info)
assert run_out["action"].shape == (3, 2)

ml-agents/mlagents/trainers/tests/test_ppo.py (36)


from mlagents.envs.mock_communicator import MockCommunicator
from mlagents.trainers.tests import mock_brain as mb
from mlagents.trainers.tests.mock_brain import make_brain_parameters
from mlagents.envs.brain_conversion_utils import (
step_result_to_brain_info,
group_spec_to_brain_parameters,
)
@pytest.fixture

discrete_action=False, visual_inputs=0
)
env = UnityEnvironment(" ")
brain_infos = env.reset()
brain_info = brain_infos[env.external_brain_names[0]]
env.reset()
brain_name = env.get_agent_groups()[0]
brain_info = step_result_to_brain_info(
env.get_step_result(brain_name), env.get_agent_group_spec(brain_name)
)
brain_params = group_spec_to_brain_parameters(
brain_name, env.get_agent_group_spec(brain_name)
)
model_path = env.external_brain_names[0]
model_path = brain_name
policy = PPOPolicy(
0, env.brains[env.external_brain_names[0]], trainer_parameters, False, False
)
policy = PPOPolicy(0, brain_params, trainer_parameters, False, False)
run_out = policy.evaluate(brain_info)
assert run_out["action"].shape == (3, 2)
env.close()

discrete_action=False, visual_inputs=0
)
env = UnityEnvironment(" ")
brain_infos = env.reset()
brain_info = brain_infos[env.external_brain_names[0]]
env.reset()
brain_name = env.get_agent_groups()[0]
brain_info = step_result_to_brain_info(
env.get_step_result(brain_name), env.get_agent_group_spec(brain_name)
)
brain_params = group_spec_to_brain_parameters(
brain_name, env.get_agent_group_spec(brain_name)
)
model_path = env.external_brain_names[0]
model_path = brain_name
policy = PPOPolicy(
0, env.brains[env.external_brain_names[0]], trainer_parameters, False, False
)
policy = PPOPolicy(0, brain_params, trainer_parameters, False, False)
run_out = policy.get_value_estimates(brain_info, 0, done=False)
for key, val in run_out.items():
assert type(key) is str

ml-agents/mlagents/trainers/tests/test_simple_rl.py (112)


import tempfile
import pytest
import yaml
from typing import Any, Dict
from typing import Dict
import numpy as np
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs.brain import BrainInfo, AllBrainInfo, BrainParameters
from mlagents.envs.communicator_objects.agent_info_pb2 import AgentInfoProto
from mlagents.envs.communicator_objects.observation_pb2 import (
ObservationProto,
NONE as COMPRESSION_TYPE_NONE,
from mlagents.envs.base_env import (
BaseEnv,
AgentGroupSpec,
BatchedStepResult,
ActionType,
from mlagents.envs.brain import BrainParameters
BRAIN_NAME = __name__
OBS_SIZE = 1

return max(min_val, min(x, max_val))
class Simple1DEnvironment(BaseUnityEnvironment):
class Simple1DEnvironment(BaseEnv):
"""
Very simple "game" - the agent has a position on [-1, 1], gets a reward of 1 if it reaches 1, and a reward of -1 if
it reaches -1. The position is incremented by the action amount (clamped to [-step_size, step_size]).

super().__init__()
self.discrete = use_discrete
self._brains: Dict[str, BrainParameters] = {}
brain_params = BrainParameters(
brain_name=BRAIN_NAME,
vector_observation_space_size=OBS_SIZE,
camera_resolutions=[],
vector_action_space_size=[2] if use_discrete else [1],
vector_action_descriptions=["moveDirection"],
vector_action_space_type=0 if use_discrete else 1,
action_type = ActionType.DISCRETE if use_discrete else ActionType.CONTINUOUS
self.group_spec = AgentGroupSpec(
[(OBS_SIZE,)], action_type, (2,) if use_discrete else 1
self._brains[BRAIN_NAME] = brain_params
self.random = random.Random(str(brain_params))
self.random = random.Random(str(self.group_spec))
self.action = None
self.step_result = None
def step(
self,
vector_action: Dict[str, Any] = None,
memory: Dict[str, Any] = None,
value: Dict[str, Any] = None,
) -> AllBrainInfo:
assert vector_action is not None
def get_agent_groups(self):
return [BRAIN_NAME]
def get_agent_group_spec(self, name):
return self.group_spec
def set_action_for_agent(self, name, id, data):
pass
def set_actions(self, name, data):
self.action = data
def get_step_result(self, name):
return self.step_result
def step(self) -> None:
assert self.action is not None
act = vector_action[BRAIN_NAME][0][0]
act = self.action[0][0]
delta = vector_action[BRAIN_NAME][0][0]
delta = self.action[0][0]
delta = clamp(delta, -STEP_SIZE, STEP_SIZE)
self.position += delta
self.position = clamp(self.position, -1, 1)

else:
reward = -TIME_PENALTY
vector_obs = [self.goal] * OBS_SIZE
vector_obs_proto = ObservationProto(
float_data=ObservationProto.FloatData(data=vector_obs),
shape=[len(vector_obs)],
compression_type=COMPRESSION_TYPE_NONE,
)
agent_info = AgentInfoProto(
reward=reward, done=bool(done), observations=[vector_obs_proto]
)
m_vector_obs = [np.ones((1, OBS_SIZE), dtype=np.float32) * self.goal]
m_reward = np.array([reward], dtype=np.float32)
m_done = np.array([done], dtype=np.bool)
m_agent_id = np.array([0], dtype=np.int32)
return {
BRAIN_NAME: BrainInfo.from_agent_proto(
0, [agent_info], self._brains[BRAIN_NAME]
)
}
self.step_result = BatchedStepResult(
m_vector_obs, m_reward, m_done, m_done, m_agent_id, None
)
def _reset_agent(self):
self.position = 0.0

def reset(
self,
config: Dict[str, float] = None,
train_mode: bool = True,
custom_reset_parameters: Any = None,
) -> AllBrainInfo: # type: ignore
def reset(self) -> None: # type: ignore
vector_obs = [self.goal] * OBS_SIZE
vector_obs_proto = ObservationProto(
float_data=ObservationProto.FloatData(data=vector_obs),
shape=[len(vector_obs)],
compression_type=COMPRESSION_TYPE_NONE,
)
agent_info = AgentInfoProto(
done=False, max_step_reached=False, observations=[vector_obs_proto]
)
m_vector_obs = [np.ones((1, OBS_SIZE), dtype=np.float32) * self.goal]
m_reward = np.array([0], dtype=np.float32)
m_done = np.array([False], dtype=np.bool)
m_agent_id = np.array([0], dtype=np.int32)
return {
BRAIN_NAME: BrainInfo.from_agent_proto(
0, [agent_info], self._brains[BRAIN_NAME]
)
}
self.step_result = BatchedStepResult(
m_vector_obs, m_reward, m_done, m_done, m_agent_id, None
)
@property
def external_brains(self) -> Dict[str, BrainParameters]:

notebooks/getting-started-gym.ipynb (4)


"metadata": {},
"outputs": [],
"source": [
"env_name = \"../envs/GridWorld\" # Name of the Unity environment binary to launch\n",
"env_name = \"../envs/3DBall\" # Name of the Unity environment binary to launch\n",
"env = UnityEnv(env_name, worker_id=0, use_visual=True)\n",
"\n",
"# Examine environment parameters\n",

"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.6"
"version": "3.6.5"
}
},
"nbformat": 4,

80
notebooks/getting-started.ipynb


"metadata": {},
"outputs": [],
"source": [
"env_name = \"../envs/3DBall\" # Name of the Unity environment binary to launch\n",
"env_name = \"../envs/GridWorld\" # Name of the Unity environment binary to launch\n",
"train_mode = True # Whether to run the environment in training or inference mode"
]
},

"import sys\n",
"\n",
"from mlagents.envs.environment import UnityEnvironment\n",
"from mlagents.envs.side_channel.engine_configuration_channel import EngineConfig, EngineConfigurationChannel\n",
"\n",
"%matplotlib inline\n",
"\n",

"metadata": {},
"outputs": [],
"source": [
"env = UnityEnvironment(file_name=env_name)\n",
"engine_configuration_channel = EngineConfigurationChannel()\n",
"env = UnityEnvironment(base_port = 5006, file_name=env_name, side_channels = [engine_configuration_channel])\n",
"\n",
"#Reset the environment\n",
"env.reset()\n",
"default_brain = env.external_brain_names[0]\n",
"brain = env.brains[default_brain]"
"group_name = env.get_agent_groups()[0]\n",
"group_spec = env.get_agent_group_spec(group_name)\n",
"\n",
"# Set the time scale of the engine\n",
"engine_configuration_channel.set_configuration_parameters(time_scale = 3.0)"
]
},
{

"metadata": {},
"outputs": [],
"source": [
"# Reset the environment\n",
"env_info = env.reset(train_mode=train_mode)[default_brain]\n",
"# Get the state of the agents\n",
"step_result = env.get_step_result(group_name)\n",
"# Examine the state space for the default brain\n",
"print(\"Agent state looks like: \\n{}\".format(env_info.vector_observations[0]))\n",
"# Examine the number of observations per Agent\n",
"print(\"Number of observations : \", len(group_spec.observation_shapes))\n",
"# Examine the observation space for the default brain\n",
"for observation in env_info.visual_observations:\n",
" print(\"Agent observations look like:\")\n",
" if observation.shape[3] == 3:\n",
" plt.imshow(observation[0,:,:,:])\n",
" else:\n",
" plt.imshow(observation[0,:,:,0])"
"# Examine the state space for the first observation for all agents\n",
"print(\"Agent state looks like: \\n{}\".format(step_result.obs[0]))\n",
"\n",
"# Examine the state space for the first observation for the first agent\n",
"print(\"Agent state looks like: \\n{}\".format(step_result.obs[0][0]))\n",
"\n",
"# Is there a visual observation ?\n",
"vis_obs = any([len(shape) == 3 for shape in group_spec.observation_shapes])\n",
"print(\"Is there a visual observation ?\", vis_obs)\n",
"\n",
"# Examine the visual observations\n",
"if vis_obs:\n",
" vis_obs_index = next(i for i,v in enumerate(group_spec.observation_shapes) if len(v) == 3)\n",
" print(\"Agent visual observation look like:\")\n",
" obs = step_result.obs[vis_obs_index]\n",
" plt.imshow(obs[0,:,:,:])\n"
]
},
{

"outputs": [],
"source": [
"for episode in range(10):\n",
" env_info = env.reset(train_mode=train_mode)[default_brain]\n",
" env.reset()\n",
" step_result = env.get_step_result(group_name)\n",
" action_size = brain.vector_action_space_size\n",
" if brain.vector_action_space_type == 'continuous':\n",
" env_info = env.step(np.random.randn(len(env_info.agents), \n",
" action_size[0]))[default_brain]\n",
" else:\n",
" action = np.column_stack([np.random.randint(0, action_size[i], size=(len(env_info.agents))) for i in range(len(action_size))])\n",
" env_info = env.step(action)[default_brain]\n",
" episode_rewards += env_info.rewards[0]\n",
" done = env_info.local_done[0]\n",
" action_size = group_spec.action_size\n",
" if group_spec.is_action_continuous():\n",
" action = np.random.randn(step_result.n_agents(), group_spec.action_size)\n",
" \n",
" if group_spec.is_action_discrete():\n",
" branch_size = group_spec.discrete_action_branches\n",
" action = np.column_stack([np.random.randint(0, branch_size[i], size=(step_result.n_agents())) for i in range(len(branch_size))])\n",
" env.set_actions(group_name, action)\n",
" env.step()\n",
" step_result = env.get_step_result(group_name)\n",
" episode_rewards += step_result.reward[0]\n",
" done = step_result.done[0]\n",
" print(\"Total reward this episode: {}\".format(episode_rewards))"
]
},

"source": [
"env.close()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {

"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.6"
"version": "3.6.5"
}
},
"nbformat": 4,

301
ml-agents-envs/mlagents/envs/base_env.py


"""
Python Environment API for the ML-Agents toolkit
The aim of this API is to expose groups of similar Agents evolving in Unity
to perform reinforcement learning on.
There can be multiple groups of similar Agents (same observation and action
spaces) in the simulation. Each group is identified by an agent_group name that
corresponds to a single group of Agents in the simulation.
For performance reasons, the data of each group of agents is processed in a
batched manner. When retrieving the state of a group of Agents, said state
contains the data for the whole group. Agents in these groups are identified
by a unique int identifier that allows tracking of Agents across simulation
steps. Note that there is no guarantee that the number or order of the Agents
in the state will be consistent across simulation steps.
A simulation step corresponds to moving the simulation forward until at least
one agent in the simulation sends its observations to Python again. Since
Agents can request decisions at different frequencies, a simulation step does
not necessarily correspond to a fixed simulation time increment.
"""
from abc import ABC, abstractmethod
from typing import List, NamedTuple, Tuple, Optional, Union, Dict, NewType
import numpy as np
from enum import Enum
AgentId = NewType("AgentId", int)
AgentGroup = NewType("AgentGroup", str)
class StepResult(NamedTuple):
"""
Contains the data a single Agent collected since the last
simulation step.
- obs is a list of numpy arrays containing the observations collected by
the agent.
- reward is a float. Corresponds to the rewards collected by the agent
since the last simulation step.
- done is a bool. Is true if the Agent was terminated during the last
simulation step.
- max_step is a bool. Is true if the Agent reached its maximum number of
steps during the last simulation step.
- agent_id is an int, a unique identifier for the corresponding Agent.
- action_mask is an optional list of one dimensional array of booleans.
Only available in multi-discrete action space type.
Each array corresponds to an action branch. Each array contains a mask
for each action of the branch. If true, the action is not available for
the agent during this simulation step.
"""
obs: List[np.ndarray]
reward: float
done: bool
max_step: bool
agent_id: AgentId
action_mask: Optional[List[np.ndarray]]
class BatchedStepResult:
"""
Contains the data a group of similar Agents collected since the last
simulation step. Note that not all Agents necessarily have new
information to send at each simulation step. Therefore, the ordering of
agents and the batch size of the BatchedStepResult are not fixed across
simulation steps.
- obs is a list of numpy arrays containing the observations collected by the
group of agents. Each obs has one extra dimension compared to StepResult: the first
dimension of the array corresponds to the batch size of
the group.
- reward is a float vector of length batch size. Corresponds to the
rewards collected by each agent since the last simulation step.
- done is an array of booleans of length batch size. Is true if the
associated Agent was terminated during the last simulation step.
- max_step is an array of booleans of length batch size. Is true if the
associated Agent reached its maximum number of steps during the last
simulation step.
- agent_id is an int vector of length batch size containing unique
identifiers for the corresponding Agents. This is used to track Agents
across simulation steps.
- action_mask is an optional list of two dimensional array of booleans.
Only available in multi-discrete action space type.
Each array corresponds to an action branch. The first dimension of each
array is the batch size and the second contains a mask for each action of
the branch. If true, the action is not available for the agent during
this simulation step.
"""
def __init__(self, obs, reward, done, max_step, agent_id, action_mask):
self.obs: List[np.ndarray] = obs
self.reward: np.ndarray = reward
self.done: np.ndarray = done
self.max_step: np.ndarray = max_step
self.agent_id: np.ndarray = agent_id
self.action_mask: Optional[List[np.ndarray]] = action_mask
self._agent_id_to_index: Optional[Dict[int, int]] = None
def contains_agent(self, agent_id: AgentId) -> bool:
if self._agent_id_to_index is None:
self._agent_id_to_index = {}
for a_idx, a_id in enumerate(self.agent_id):
self._agent_id_to_index[a_id] = a_idx
return agent_id in self._agent_id_to_index
def get_agent_step_result(self, agent_id: AgentId) -> StepResult:
"""
returns the step result for a specific agent.
:param agent_id: The id of the agent
:returns: obs, reward, done, agent_id and optional action mask for a
specific agent
"""
if not self.contains_agent(agent_id):
raise IndexError(
"agent_id {} is not present in the BatchedStepResult".format(agent_id)
)
agent_index = self._agent_id_to_index[agent_id] # type: ignore
agent_obs = []
for batched_obs in self.obs:
agent_obs.append(batched_obs[agent_index])
agent_mask = None
if self.action_mask is not None:
agent_mask = []
for mask in self.action_mask:
agent_mask.append(mask[agent_index])
return StepResult(
obs=agent_obs,
reward=self.reward[agent_index],
done=self.done[agent_index],
max_step=self.max_step[agent_index],
agent_id=agent_id,
action_mask=agent_mask,
)
@staticmethod
def empty(spec: "AgentGroupSpec") -> "BatchedStepResult":
"""
Returns an empty BatchedStepResult.
:param spec: The AgentGroupSpec for the BatchedStepResult
"""
obs: List[np.ndarray] = []
for shape in spec.observation_shapes:
obs += [np.zeros((0,) + shape, dtype=np.float32)]
return BatchedStepResult(
obs=obs,
reward=np.zeros(0, dtype=np.float32),
done=np.zeros(0, dtype=np.bool),
max_step=np.zeros(0, dtype=np.bool),
agent_id=np.zeros(0, dtype=np.int32),
action_mask=None,
)
def n_agents(self) -> int:
return len(self.agent_id)
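# Illustrative sketch (not part of the API surface): pulling a single agent's
# StepResult out of a BatchedStepResult. `env` and `group_name` are assumed to
# come from a BaseEnv implementation elsewhere.
#
#     result = env.get_step_result(group_name)
#     for agent_id in result.agent_id:
#         single = result.get_agent_step_result(agent_id)
#         # single.obs is a list of np.ndarray, one entry per observation shape
#         print(agent_id, single.reward, single.done, single.max_step)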
class ActionType(Enum):
DISCRETE = 0
CONTINUOUS = 1
class AgentGroupSpec(NamedTuple):
"""
A NamedTuple containing information about the observation and action
spaces for a group of Agents.
- observation_shapes is a List of Tuples of int : Each Tuple corresponds
to an observation's dimensions. The shape tuples have the same ordering as
the ordering of the BatchedStepResult and StepResult.
- action_type is the type of data of the action. It can be discrete or
continuous. If discrete, the action tensors are expected to be int32. If
continuous, the actions are expected to be float32.
- action_shape is:
- An int in continuous action space corresponding to the number of
floats that constitute the action.
- A Tuple of int in discrete action space where each int corresponds to
the number of discrete actions available to the agent.
"""
observation_shapes: List[Tuple]
action_type: ActionType
action_shape: Union[int, Tuple[int, ...]]
def is_action_discrete(self) -> bool:
"""
Returns true if the Agent group uses discrete actions
"""
return self.action_type == ActionType.DISCRETE
def is_action_continuous(self) -> bool:
"""
Returns true if the Agent group uses continuous actions
"""
return self.action_type == ActionType.CONTINUOUS
@property
def action_size(self) -> int:
"""
Returns the dimension of the action.
- In the continuous case, will return the number of continuous actions.
- In the (multi-)discrete case, will return the number of action branches.
"""
if self.action_type == ActionType.DISCRETE:
return len(self.action_shape) # type: ignore
else:
return self.action_shape # type: ignore
@property
def discrete_action_branches(self) -> Optional[Tuple[int, ...]]:
"""
Returns a Tuple of int corresponding to the number of possible actions
for each branch (only for discrete actions). Will return None
for continuous actions.
"""
if self.action_type == ActionType.DISCRETE:
return self.action_shape # type: ignore
else:
return None
def create_empty_action(self, n_agents: int) -> np.ndarray:
if self.action_type == ActionType.DISCRETE:
return np.zeros((n_agents, self.action_size), dtype=np.int32)
else:
return np.zeros((n_agents, self.action_size), dtype=np.float32)
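# Illustrative sketch (not part of the API surface): how the helpers above
# behave for a hypothetical discrete spec. The values below are assumptions
# chosen only for the example.
#
#     spec = AgentGroupSpec([(8,)], ActionType.DISCRETE, (3, 2))
#     spec.action_size                   # 2 action branches
#     spec.discrete_action_branches      # (3, 2)
#     spec.create_empty_action(4).shape  # (4, 2), dtype int32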
class BaseEnv(ABC):
@abstractmethod
def step(self) -> None:
"""
Signals the environment that it must move the simulation forward
by one step.
"""
pass
@abstractmethod
def reset(self) -> None:
"""
Signals the environment that it must reset the simulation.
"""
pass
@abstractmethod
def close(self) -> None:
"""
Signals the environment that it must close.
"""
pass
@abstractmethod
def get_agent_groups(self) -> List[AgentGroup]:
"""
Returns the list of the agent group names present in the environment.
Agents grouped under the same group name have the same action and
observation specs, and are expected to behave similarly in the environment.
This list can grow with time as new policies are instantiated.
:return: the list of agent group names.
"""
pass
@abstractmethod
def set_actions(self, agent_group: AgentGroup, action: np.ndarray) -> None:
"""
Sets the action for all of the agents in the simulation for the next
step. The actions must be in the same order as the agents appear in
the last step result.
:param agent_group: The name of the group the agents are part of
:param action: A two dimensional np.ndarray corresponding to the action
(either int or float)
"""
pass
@abstractmethod
def set_action_for_agent(
self, agent_group: AgentGroup, agent_id: AgentId, action: np.ndarray
) -> None:
"""
Sets the action for one of the agents in the simulation for the next
step.
:param agent_group: The name of the group the agent is part of
:param agent_id: The id of the agent the action is set for
:param action: A two dimensional np.ndarray corresponding to the action
(either int or float)
"""
pass
@abstractmethod
def get_step_result(self, agent_group: AgentGroup) -> BatchedStepResult:
"""
Retrieves the observations of the agents that requested a step in the
simulation.
:param agent_group: The name of the group the agents are part of
:return: A BatchedStepResult containing the observations,
the rewards and the done flags for this group of agents.
"""
pass
@abstractmethod
def get_agent_group_spec(self, agent_group: AgentGroup) -> AgentGroupSpec:
"""
Get the AgentGroupSpec corresponding to the agent group name
:param agent_group: The name of the group the agents are part of
:return: An AgentGroupSpec corresponding to that agent group name
"""
pass
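Taken together, the abstract methods above define the interaction loop of the new low-level API. The sketch below shows one way a driver script might exercise a concrete implementation such as UnityEnvironment; the binary path and the step count are assumptions for illustration, and the action sampling mirrors the getting-started notebook above.

import numpy as np
from mlagents.envs.environment import UnityEnvironment

# Hypothetical binary path; any built Unity environment would do.
env = UnityEnvironment(file_name="../envs/3DBall")
env.reset()
group_name = env.get_agent_groups()[0]
spec = env.get_agent_group_spec(group_name)

for _ in range(100):
    result = env.get_step_result(group_name)
    if spec.is_action_continuous():
        # One float per continuous action, per agent that requested a decision.
        action = np.random.randn(result.n_agents(), spec.action_size)
    else:
        # One int per discrete branch, per agent.
        branches = spec.discrete_action_branches
        action = np.column_stack(
            [np.random.randint(0, b, size=result.n_agents()) for b in branches]
        )
    env.set_actions(group_name, action)
    env.step()
env.close()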

70
ml-agents-envs/mlagents/envs/brain_conversion_utils.py


from mlagents.envs.brain import BrainInfo, BrainParameters, CameraResolution
from mlagents.envs.base_env import BatchedStepResult, AgentGroupSpec
from mlagents.envs.exception import UnityEnvironmentException
import numpy as np
from typing import List
def step_result_to_brain_info(
step_result: BatchedStepResult,
group_spec: AgentGroupSpec,
agent_id_prefix: int = None,
) -> BrainInfo:
n_agents = step_result.n_agents()
vis_obs_indices = []
vec_obs_indices = []
for index, observation in enumerate(step_result.obs):
if len(observation.shape) == 2:
vec_obs_indices.append(index)
elif len(observation.shape) == 4:
vis_obs_indices.append(index)
else:
raise UnityEnvironmentException(
"Invalid input received from the environment, the observation should "
"either be a vector of float or a PNG image"
)
if len(vec_obs_indices) == 0:
vec_obs = np.zeros((n_agents, 0), dtype=np.float32)
else:
vec_obs = np.concatenate([step_result.obs[i] for i in vec_obs_indices], axis=1)
vis_obs = [step_result.obs[i] for i in vis_obs_indices]
mask = np.ones((n_agents, np.sum(group_spec.action_size)), dtype=np.float32)
if group_spec.is_action_discrete():
mask = np.ones(
(n_agents, np.sum(group_spec.discrete_action_branches)), dtype=np.float32
)
if step_result.action_mask is not None:
mask = 1 - np.concatenate(step_result.action_mask, axis=1)
if agent_id_prefix is None:
agent_ids = [str(ag_id) for ag_id in list(step_result.agent_id)]
else:
agent_ids = [f"${agent_id_prefix}-{ag_id}" for ag_id in step_result.agent_id]
return BrainInfo(
vis_obs,
vec_obs,
list(step_result.reward),
agent_ids,
list(step_result.done),
list(step_result.max_step),
mask,
)
def group_spec_to_brain_parameters(
name: str, group_spec: AgentGroupSpec
) -> BrainParameters:
vec_size = np.sum(
[shape[0] for shape in group_spec.observation_shapes if len(shape) == 1]
)
vis_sizes = [shape for shape in group_spec.observation_shapes if len(shape) == 3]
cam_res = [CameraResolution(s[0], s[1], s[2]) for s in vis_sizes]
a_size: List[int] = []
if group_spec.is_action_discrete():
a_size += list(group_spec.discrete_action_branches)
vector_action_space_type = 0
else:
a_size += [group_spec.action_size]
vector_action_space_type = 1
return BrainParameters(
name, int(vec_size), cam_res, a_size, [], vector_action_space_type
)
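These two helpers bridge the new AgentGroupSpec/BatchedStepResult data back into the legacy BrainParameters/BrainInfo structures the trainers still consume. A minimal sketch of how they might be combined, assuming a running UnityEnvironment with a single agent group (connecting to the Editor via file_name=None is an assumption for illustration):

from mlagents.envs.environment import UnityEnvironment
from mlagents.envs.brain_conversion_utils import (
    step_result_to_brain_info,
    group_spec_to_brain_parameters,
)

env = UnityEnvironment(file_name=None)  # assumed: an Editor session is listening
env.reset()
name = env.get_agent_groups()[0]
spec = env.get_agent_group_spec(name)

# Legacy structures built from the new API data.
brain_params = group_spec_to_brain_parameters(name, spec)
brain_info = step_result_to_brain_info(env.get_step_result(name), spec)
print(brain_params.vector_observation_space_size, len(brain_info.agents))
env.close()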

165
ml-agents-envs/mlagents/envs/rpc_utils.py


from mlagents.envs.base_env import AgentGroupSpec, ActionType, BatchedStepResult
from mlagents.envs.timers import hierarchical_timer, timed
from mlagents.envs.communicator_objects.agent_info_pb2 import AgentInfoProto
from mlagents.envs.communicator_objects.brain_parameters_pb2 import BrainParametersProto
import logging
import numpy as np
import io
from typing import List, Tuple
from PIL import Image
logger = logging.getLogger("mlagents.envs")
def agent_group_spec_from_proto(
brain_param_proto: BrainParametersProto, agent_info: AgentInfoProto
) -> AgentGroupSpec:
"""
Converts brain parameter and agent info proto to AgentGroupSpec object.
:param brain_param_proto: protobuf object.
:param agent_info: protobuf object.
:return: AgentGroupSpec object.
"""
observation_shape = [tuple(obs.shape) for obs in agent_info.observations]
action_type = (
ActionType.DISCRETE
if brain_param_proto.vector_action_space_type == 0
else ActionType.CONTINUOUS
)
action_shape = None
if action_type == ActionType.CONTINUOUS:
action_shape = brain_param_proto.vector_action_size[0]
else:
action_shape = tuple(brain_param_proto.vector_action_size)
return AgentGroupSpec(observation_shape, action_type, action_shape)
@timed
def process_pixels(image_bytes: bytes, gray_scale: bool) -> np.ndarray:
"""
Converts byte array observation image into numpy array, re-sizes it,
and optionally converts it to grey scale
:param gray_scale: Whether to convert the image to grayscale.
:param image_bytes: input byte array corresponding to image
:return: processed numpy array of observation from environment
"""
with hierarchical_timer("image_decompress"):
image_bytearray = bytearray(image_bytes)
image = Image.open(io.BytesIO(image_bytearray))
# Normally Image loads lazily, this forces it to do loading in the timer scope.
image.load()
s = np.array(image) / 255.0
if gray_scale:
s = np.mean(s, axis=2)
s = np.reshape(s, [s.shape[0], s.shape[1], 1])
return s
@timed
def _process_visual_observation(
obs_index: int, shape: Tuple[int, int, int], agent_info_list: List[AgentInfoProto]
) -> np.ndarray:
if len(agent_info_list) == 0:
return np.zeros((0, shape[0], shape[1], shape[2]), dtype=np.float32)
gray_scale = shape[2] == 1
batched_visual = [
process_pixels(agent_obs.observations[obs_index].compressed_data, gray_scale)
for agent_obs in agent_info_list
]
return np.array(batched_visual, dtype=np.float32)
@timed
def _process_vector_observation(
obs_index: int, shape: Tuple[int, ...], agent_info_list: List[AgentInfoProto]
) -> np.ndarray:
if len(agent_info_list) == 0:
return np.zeros((0, shape[0]), dtype=np.float32)
np_obs = np.array(
[
agent_obs.observations[obs_index].float_data.data
for agent_obs in agent_info_list
],
dtype=np.float32,
)
# Check for NaNs or infs in the observations
# If there's a NaN in the observations, the np.mean() result will be NaN
# If there's an Inf (either sign) then the result will be Inf
# See https://stackoverflow.com/questions/6736590/fast-check-for-nan-in-numpy for background
# Note that a very large value (larger than sqrt(float_max)) will result in an Inf value here
# This is OK though, worst case it results in an unnecessary (but harmless) nan_to_num call.
d = np.mean(np_obs)
has_nan = np.isnan(d)
has_inf = not np.isfinite(d)
# If we have any NaNs or Infs, use np.nan_to_num to replace them with finite values
if has_nan or has_inf:
np_obs = np.nan_to_num(np_obs)
if has_nan:
logger.warning(f"An agent had a NaN observation in the environment")
return np_obs
@timed
def batched_step_result_from_proto(
agent_info_list: List[AgentInfoProto], group_spec: AgentGroupSpec
) -> BatchedStepResult:
obs_list: List[np.ndarray] = []
for obs_index, obs_shape in enumerate(group_spec.observation_shapes):
is_visual = len(obs_shape) == 3
if is_visual:
obs_list += [
_process_visual_observation(obs_index, obs_shape, agent_info_list)
]
else:
obs_list += [
_process_vector_observation(obs_index, obs_shape, agent_info_list)
]
rewards = np.array(
[agent_info.reward for agent_info in agent_info_list], dtype=np.float32
)
d = np.dot(rewards, rewards)
has_nan = np.isnan(d)
has_inf = not np.isfinite(d)
# If we have any NaNs or Infs, use np.nan_to_num to replace them with finite values
if has_nan or has_inf:
rewards = np.nan_to_num(rewards)
if has_nan:
logger.warning(f"An agent had a NaN reward in the environment")
done = np.array([agent_info.done for agent_info in agent_info_list], dtype=np.bool)
max_step = np.array(
[agent_info.max_step_reached for agent_info in agent_info_list], dtype=np.bool
)
agent_id = np.array(
[agent_info.id for agent_info in agent_info_list], dtype=np.int32
)
action_mask = None
if group_spec.is_action_discrete():
if any(agent_info.action_mask is not None for agent_info in agent_info_list):
n_agents = len(agent_info_list)
a_size = np.sum(group_spec.discrete_action_branches)
mask_matrix = np.ones((n_agents, a_size), dtype=np.bool)
for agent_index, agent_info in enumerate(agent_info_list):
if agent_info.action_mask is not None:
if len(agent_info.action_mask) == a_size:
mask_matrix[agent_index, :] = [
False if agent_info.action_mask[k] else True
for k in range(a_size)
]
action_mask = (1 - mask_matrix).astype(np.bool)
indices = _generate_split_indices(group_spec.discrete_action_branches)
action_mask = np.split(action_mask, indices, axis=1)
return BatchedStepResult(obs_list, rewards, done, max_step, agent_id, action_mask)
def _generate_split_indices(dims):
if len(dims) <= 1:
return ()
result = (dims[0],)
for i in range(len(dims) - 2):
result += (dims[i + 1] + result[i],)
return result
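_generate_split_indices computes the cut points that np.split needs in order to slice the flat (n_agents, sum(branches)) mask matrix into one array per action branch. A small worked example with hypothetical branch sizes:

import numpy as np
from mlagents.envs.rpc_utils import _generate_split_indices

branches = (7, 3, 2)                         # hypothetical discrete branches
indices = _generate_split_indices(branches)  # -> (7, 10)
flat_mask = np.zeros((5, sum(branches)), dtype=np.bool)
per_branch = np.split(flat_mask, indices, axis=1)
print([m.shape for m in per_branch])         # [(5, 7), (5, 3), (5, 2)]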

187
ml-agents-envs/mlagents/envs/tests/test_rpc_utils.py


from typing import List, Tuple
from mlagents.envs.communicator_objects.agent_info_pb2 import AgentInfoProto
from mlagents.envs.communicator_objects.observation_pb2 import ObservationProto
from mlagents.envs.communicator_objects.brain_parameters_pb2 import BrainParametersProto
import numpy as np
from mlagents.envs.base_env import AgentGroupSpec, ActionType
import io
from mlagents.envs.rpc_utils import (
agent_group_spec_from_proto,
process_pixels,
_process_visual_observation,
_process_vector_observation,
batched_step_result_from_proto,
)
from PIL import Image
def generate_list_agent_proto(
n_agent: int, shape: List[Tuple[int]]
) -> List[AgentInfoProto]:
result = []
for agent_index in range(n_agent):
ap = AgentInfoProto()
ap.reward = agent_index
ap.done = agent_index % 2 == 0
ap.max_step_reached = agent_index % 2 == 1
ap.id = agent_index
ap.action_mask.extend([True, False] * 5)
obs_proto_list = []
for obs_index in range(len(shape)):
obs_proto = ObservationProto()
obs_proto.shape.extend(list(shape[obs_index]))
obs_proto.compression_type = 0
obs_proto.float_data.data.extend([0.1] * np.prod(shape[obs_index]))
obs_proto_list.append(obs_proto)
ap.observations.extend(obs_proto_list)
result.append(ap)
return result
def generate_compressed_data(in_array: np.ndarray) -> bytes:
image_arr = (in_array * 255).astype(np.uint8)
im = Image.fromarray(image_arr, "RGB")
byteIO = io.BytesIO()
im.save(byteIO, format="PNG")
return byteIO.getvalue()
def generate_compressed_proto_obs(in_array: np.ndarray) -> ObservationProto:
obs_proto = ObservationProto()
obs_proto.compressed_data = generate_compressed_data(in_array)
obs_proto.compression_type = 1
obs_proto.shape.extend(in_array.shape)
return obs_proto
def test_process_pixels():
in_array = np.random.rand(128, 128, 3)
byte_arr = generate_compressed_data(in_array)
out_array = process_pixels(byte_arr, False)
assert out_array.shape == (128, 128, 3)
assert np.sum(in_array - out_array) / np.prod(in_array.shape) < 0.01
assert (in_array - out_array < 0.01).all()
def test_process_pixels_gray():
in_array = np.random.rand(128, 128, 3)
byte_arr = generate_compressed_data(in_array)
out_array = process_pixels(byte_arr, True)
assert out_array.shape == (128, 128, 1)
assert np.mean(in_array.mean(axis=2, keepdims=True) - out_array) < 0.01
assert (in_array.mean(axis=2, keepdims=True) - out_array < 0.01).all()
def test_vector_observation():
n_agents = 10
shapes = [(3,), (4,)]
list_proto = generate_list_agent_proto(n_agents, shapes)
for obs_index, shape in enumerate(shapes):
arr = _process_vector_observation(obs_index, shape, list_proto)
assert list(arr.shape) == ([n_agents] + list(shape))
assert (np.abs(arr - 0.1) < 0.01).all()
def test_process_visual_observation():
in_array_1 = np.random.rand(128, 128, 3)
proto_obs_1 = generate_compressed_proto_obs(in_array_1)
in_array_2 = np.random.rand(128, 128, 3)
proto_obs_2 = generate_compressed_proto_obs(in_array_2)
ap1 = AgentInfoProto()
ap1.observations.extend([proto_obs_1])
ap2 = AgentInfoProto()
ap2.observations.extend([proto_obs_2])
ap_list = [ap1, ap2]
arr = _process_visual_observation(0, (128, 128, 3), ap_list)
assert list(arr.shape) == [2, 128, 128, 3]
assert (arr[0, :, :, :] - in_array_1 < 0.01).all()
assert (arr[1, :, :, :] - in_array_2 < 0.01).all()
def test_batched_step_result_from_proto():
n_agents = 10
shapes = [(3,), (4,)]
group_spec = AgentGroupSpec(shapes, ActionType.CONTINUOUS, 3)
ap_list = generate_list_agent_proto(n_agents, shapes)
result = batched_step_result_from_proto(ap_list, group_spec)
assert list(result.reward) == list(range(n_agents))
assert list(result.agent_id) == list(range(n_agents))
for index in range(n_agents):
assert result.done[index] == (index % 2 == 0)
assert result.max_step[index] == (index % 2 == 1)
assert list(result.obs[0].shape) == [n_agents] + list(shapes[0])
assert list(result.obs[1].shape) == [n_agents] + list(shapes[1])
def test_action_masking_discrete():
n_agents = 10
shapes = [(3,), (4,)]
group_spec = AgentGroupSpec(shapes, ActionType.DISCRETE, (7, 3))
ap_list = generate_list_agent_proto(n_agents, shapes)
result = batched_step_result_from_proto(ap_list, group_spec)
masks = result.action_mask
assert isinstance(masks, list)
assert len(masks) == 2
assert masks[0].shape == (n_agents, 7)
assert masks[1].shape == (n_agents, 3)
assert masks[0][0, 0]
assert not masks[1][0, 0]
assert masks[1][0, 1]
def test_action_masking_discrete_1():
n_agents = 10
shapes = [(3,), (4,)]
group_spec = AgentGroupSpec(shapes, ActionType.DISCRETE, (10,))
ap_list = generate_list_agent_proto(n_agents, shapes)
result = batched_step_result_from_proto(ap_list, group_spec)
masks = result.action_mask
assert isinstance(masks, list)
assert len(masks) == 1
assert masks[0].shape == (n_agents, 10)
assert masks[0][0, 0]
def test_action_masking_discrete_2():
n_agents = 10
shapes = [(3,), (4,)]
group_spec = AgentGroupSpec(shapes, ActionType.DISCRETE, (2, 2, 6))
ap_list = generate_list_agent_proto(n_agents, shapes)
result = batched_step_result_from_proto(ap_list, group_spec)
masks = result.action_mask
assert isinstance(masks, list)
assert len(masks) == 3
assert masks[0].shape == (n_agents, 2)
assert masks[1].shape == (n_agents, 2)
assert masks[2].shape == (n_agents, 6)
assert masks[0][0, 0]
def test_action_masking_continuous():
n_agents = 10
shapes = [(3,), (4,)]
group_spec = AgentGroupSpec(shapes, ActionType.CONTINUOUS, 10)
ap_list = generate_list_agent_proto(n_agents, shapes)
result = batched_step_result_from_proto(ap_list, group_spec)
masks = result.action_mask
assert masks is None
def test_agent_group_spec_from_proto():
agent_proto = generate_list_agent_proto(1, [(3,), (4,)])[0]
bp = BrainParametersProto()
bp.vector_action_size.extend([5, 4])
bp.vector_action_space_type = 0
group_spec = agent_group_spec_from_proto(bp, agent_proto)
assert group_spec.is_action_discrete()
assert not group_spec.is_action_continuous()
assert group_spec.observation_shapes == [(3,), (4,)]
assert group_spec.discrete_action_branches == (5, 4)
assert group_spec.action_size == 2
bp = BrainParametersProto()
bp.vector_action_size.extend([6])
bp.vector_action_space_type = 1
group_spec = agent_group_spec_from_proto(bp, agent_proto)
assert not group_spec.is_action_discrete()
assert group_spec.is_action_continuous()
assert group_spec.action_size == 6

73
ml-agents-envs/mlagents/envs/tests/test_brain.py


from typing import List
import logging
import numpy as np
from unittest import mock
from mlagents.envs.communicator_objects.agent_info_pb2 import AgentInfoProto
from mlagents.envs.communicator_objects.observation_pb2 import (
ObservationProto,
NONE as COMPRESSION_TYPE_NONE,
)
from mlagents.envs.brain import BrainInfo, BrainParameters
test_brain = BrainParameters(
brain_name="test_brain",
vector_observation_space_size=3,
camera_resolutions=[],
vector_action_space_size=[],
vector_action_descriptions=[],
vector_action_space_type=1,
)
def _make_agent_info_proto(vector_obs: List[float]) -> AgentInfoProto:
obs = ObservationProto(
float_data=ObservationProto.FloatData(data=vector_obs),
shape=[len(vector_obs)],
compression_type=COMPRESSION_TYPE_NONE,
)
agent_info_proto = AgentInfoProto(observations=[obs])
return agent_info_proto
@mock.patch.object(np, "nan_to_num", wraps=np.nan_to_num)
@mock.patch.object(logging.Logger, "warning")
def test_from_agent_proto_nan(mock_warning, mock_nan_to_num):
agent_info_proto = _make_agent_info_proto([1.0, 2.0, float("nan")])
brain_info = BrainInfo.from_agent_proto(1, [agent_info_proto], test_brain)
# nan gets set to 0.0
expected = [1.0, 2.0, 0.0]
assert (brain_info.vector_observations == expected).all()
mock_nan_to_num.assert_called()
mock_warning.assert_called()
@mock.patch.object(np, "nan_to_num", wraps=np.nan_to_num)
@mock.patch.object(logging.Logger, "warning")
def test_from_agent_proto_inf(mock_warning, mock_nan_to_num):
agent_info_proto = _make_agent_info_proto([1.0, float("inf"), 0.0])
brain_info = BrainInfo.from_agent_proto(1, [agent_info_proto], test_brain)
# inf should get set to float32_max
float32_max = np.finfo(np.float32).max
expected = [1.0, float32_max, 0.0]
assert (brain_info.vector_observations == expected).all()
mock_nan_to_num.assert_called()
# We don't warn on inf, just NaN
mock_warning.assert_not_called()
@mock.patch.object(np, "nan_to_num", wraps=np.nan_to_num)
@mock.patch.object(logging.Logger, "warning")
def test_from_agent_proto_fast_path(mock_warning, mock_nan_to_num):
"""
Check that all finite values skips the nan_to_num call
"""
agent_info_proto = _make_agent_info_proto([1.0, 2.0, 3.0])
brain_info = BrainInfo.from_agent_proto(1, [agent_info_proto], test_brain)
expected = [1.0, 2.0, 3.0]
assert (brain_info.vector_observations == expected).all()
mock_nan_to_num.assert_not_called()
mock_warning.assert_not_called()

25
ml-agents-envs/mlagents/envs/base_unity_environment.py


from abc import ABC, abstractmethod
from typing import Dict, Optional
from mlagents.envs.brain import AllBrainInfo, BrainParameters
class BaseUnityEnvironment(ABC):
@abstractmethod
def step(
self, vector_action: Optional[Dict] = None, value: Optional[Dict] = None
) -> AllBrainInfo:
pass
@abstractmethod
def reset(self) -> AllBrainInfo:
pass
@property
@abstractmethod
def external_brains(self) -> Dict[str, BrainParameters]:
pass
@abstractmethod
def close(self):
pass