浏览代码

Merge branch 'master' into soccer-2v1

/asymm-envs
Andrew Cohen 5 年前
当前提交
9bec75ee
共有 49 个文件被更改,包括 1079 次插入1118 次删除
  1. 2
      .yamato/gym-interface-test.yml
  2. 2
      .yamato/python-ll-api-test.yml
  3. 1
      .yamato/standalone-build-test.yml
  4. 3
      Project/Assets/ML-Agents/Examples/SharedAssets/Scripts/SensorBase.cs
  5. 2
      com.unity.ml-agents/CHANGELOG.md
  6. 11
      com.unity.ml-agents/Runtime/Agent.cs
  7. 5
      com.unity.ml-agents/Runtime/Communicator/RpcCommunicator.cs
  8. 3
      com.unity.ml-agents/Runtime/Sensors/CameraSensor.cs
  9. 6
      com.unity.ml-agents/Runtime/Sensors/ISensor.cs
  10. 3
      com.unity.ml-agents/Runtime/Sensors/RayPerceptionSensor.cs
  11. 3
      com.unity.ml-agents/Runtime/Sensors/RenderTextureSensor.cs
  12. 13
      com.unity.ml-agents/Runtime/Sensors/StackingSensor.cs
  13. 6
      com.unity.ml-agents/Runtime/Sensors/VectorSensor.cs
  14. 9
      com.unity.ml-agents/Tests/Editor/MLAgentsEditModeTest.cs
  15. 1
      com.unity.ml-agents/Tests/Editor/ParameterLoaderTest.cs
  16. 1
      com.unity.ml-agents/Tests/Editor/Sensor/FloatVisualSensorTests.cs
  17. 1
      com.unity.ml-agents/Tests/Editor/Sensor/SensorShapeValidatorTests.cs
  18. 18
      com.unity.ml-agents/Tests/Editor/Sensor/StackingSensorTests.cs
  19. 2
      docs/Migrating.md
  20. 141
      docs/Python-API.md
  21. 15
      gym-unity/README.md
  22. 342
      gym-unity/gym_unity/envs/__init__.py
  23. 198
      gym-unity/gym_unity/tests/test_gym.py
  24. 249
      ml-agents-envs/mlagents_envs/base_env.py
  25. 102
      ml-agents-envs/mlagents_envs/environment.py
  26. 99
      ml-agents-envs/mlagents_envs/rpc_utils.py
  27. 43
      ml-agents-envs/mlagents_envs/tests/test_envs.py
  28. 170
      ml-agents-envs/mlagents_envs/tests/test_rpc_utils.py
  29. 179
      ml-agents/mlagents/trainers/agent_processor.py
  30. 16
      ml-agents/mlagents/trainers/brain_conversion_utils.py
  31. 59
      ml-agents/mlagents/trainers/demo_loader.py
  32. 36
      ml-agents/mlagents/trainers/env_manager.py
  33. 10
      ml-agents/mlagents/trainers/policy/nn_policy.py
  34. 4
      ml-agents/mlagents/trainers/policy/policy.py
  35. 25
      ml-agents/mlagents/trainers/policy/tf_policy.py
  36. 26
      ml-agents/mlagents/trainers/simple_env_manager.py
  37. 24
      ml-agents/mlagents/trainers/subprocess_env_manager.py
  38. 42
      ml-agents/mlagents/trainers/tests/mock_brain.py
  39. 137
      ml-agents/mlagents/trainers/tests/simple_test_envs.py
  40. 54
      ml-agents/mlagents/trainers/tests/test_agent_processor.py
  41. 8
      ml-agents/mlagents/trainers/tests/test_demo_loader.py
  42. 12
      ml-agents/mlagents/trainers/tests/test_nn_policy.py
  43. 24
      ml-agents/mlagents/trainers/tests/test_policy.py
  44. 4
      ml-agents/mlagents/trainers/tests/test_simple_rl.py
  45. 5
      ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py
  46. 27
      ml-agents/tests/yamato/scripts/run_gym.py
  47. 29
      ml-agents/tests/yamato/scripts/run_llapi.py
  48. 18
      ml-agents/tests/yamato/standalone_build_tests.py
  49. 7
      ml-agents/tests/yamato/yamato_utils.py

2
.yamato/gym-interface-test.yml


commands:
- pip install pyyaml
- python -u -m ml-agents.tests.yamato.setup_venv
- ./venv/bin/python ml-agents/tests/yamato/scripts/run_gym.py
- ./venv/bin/python ml-agents/tests/yamato/scripts/run_gym.py --env=Project/testPlayer-Basic
dependencies:
- .yamato/standalone-build-test.yml#test_mac_standalone_{{ editor.version }}
triggers:

2
.yamato/python-ll-api-test.yml


- python -u -m ml-agents.tests.yamato.setup_venv
- ./venv/bin/python ml-agents/tests/yamato/scripts/run_llapi.py
dependencies:
- .yamato/standalone-build-test.yml#test_mac_standalone_{{ editor.version }}
- .yamato/standalone-build-test.yml#test_mac_standalone_{{ editor.version }} --env=Project/testPlayer
triggers:
cancel_old_ci: true
changes:

1
.yamato/standalone-build-test.yml


commands:
- pip install pyyaml
- python -u -m ml-agents.tests.yamato.standalone_build_tests
- python -u -m ml-agents.tests.yamato.standalone_build_tests --scene=Assets/ML-Agents/Examples/Basic/Scenes/Basic.unity
triggers:
cancel_old_ci: true
changes:

3
Project/Assets/ML-Agents/Examples/SharedAssets/Scripts/SensorBase.cs


public void Update() {}
/// <inheritdoc/>
public void Reset() { }
/// <inheritdoc/>
public virtual byte[] GetCompressedObservation()
{
return null;

2
com.unity.ml-agents/CHANGELOG.md


- The Jupyter notebooks have been removed from the repository.
- Introduced the `SideChannelUtils` to register, unregister and access side channels.
- `Academy.FloatProperties` was removed, please use `SideChannelUtils.GetSideChannel<FloatPropertiesChannel>()` instead.
- Removed the multi-agent gym option from the gym wrapper. For multi-agent scenarios, use the [Low Level Python API](Python-API.md).
- The low level Python API has changed. You can look at the document [Low Level Python API documentation](Python-API.md) for more information. If you use `mlagents-learn` for training, this should be a transparent change.
- Added ability to start training (initialize model weights) from a previous run ID. (#3710)
### Minor Changes

11
com.unity.ml-agents/Runtime/Agent.cs


// Request the last decision with no callbacks
// We request a decision so Python knows the Agent is done immediately
m_Brain?.RequestDecision(m_Info, sensors);
ResetSensors();
// We also have to write any to any DemonstationStores so that they get the "done" flag.
foreach (var demoWriter in DemonstrationWriters)

UpdateRewardStats();
}
// The Agent is done, so we give it a new episode Id
m_EpisodeId = EpisodeIdCounter.GetEpisodeId();
m_Reward = 0f;
m_CumulativeReward = 0f;
m_RequestAction = false;

foreach (var sensor in sensors)
{
sensor.Update();
}
}
void ResetSensors()
{
foreach (var sensor in sensors)
{
sensor.Reset();
}
}

5
com.unity.ml-agents/Runtime/Communicator/RpcCommunicator.cs


{
m_OrderedAgentsRequestingDecisions[behaviorName] = new List<int>();
}
m_OrderedAgentsRequestingDecisions[behaviorName].Add(info.episodeId);
if (!info.done)
{
m_OrderedAgentsRequestingDecisions[behaviorName].Add(info.episodeId);
}
if (!m_LastActionsReceived.ContainsKey(behaviorName))
{
m_LastActionsReceived[behaviorName] = new Dictionary<int, float[]>();

3
com.unity.ml-agents/Runtime/Sensors/CameraSensor.cs


public void Update() {}
/// <inheritdoc/>
public void Reset() { }
/// <inheritdoc/>
public SensorCompressionType GetCompressionType()
{
return m_CompressionType;

6
com.unity.ml-agents/Runtime/Sensors/ISensor.cs


void Update();
/// <summary>
/// Resets the internal states of the sensor. This is called at the end of an Agent's episode.
/// Most implementations can leave this empty.
/// </summary>
void Reset();
/// <summary>
/// Return the compression type being used. If no compression is used, return
/// <see cref="SensorCompressionType.None"/>.
/// </summary>

3
com.unity.ml-agents/Runtime/Sensors/RayPerceptionSensor.cs


}
/// <inheritdoc/>
public void Reset() { }
/// <inheritdoc/>
public int[] GetObservationShape()
{
return m_Shape;

3
com.unity.ml-agents/Runtime/Sensors/RenderTextureSensor.cs


public void Update() {}
/// <inheritdoc/>
public void Reset() { }
/// <inheritdoc/>
public SensorCompressionType GetCompressionType()
{
return m_CompressionType;

13
com.unity.ml-agents/Runtime/Sensors/StackingSensor.cs


using System;
namespace MLAgents.Sensors
{
/// <summary>

{
m_WrappedSensor.Update();
m_CurrentIndex = (m_CurrentIndex + 1) % m_NumStackedObservations;
}
/// <inheritdoc/>
public void Reset()
{
m_WrappedSensor.Reset();
// Zero out the buffer.
for (var i = 0; i < m_NumStackedObservations; i++)
{
Array.Clear(m_StackedObservations[i], 0, m_StackedObservations[i].Length);
}
}
/// <inheritdoc/>

6
com.unity.ml-agents/Runtime/Sensors/VectorSensor.cs


}
/// <inheritdoc/>
public void Reset()
{
Clear();
}
/// <inheritdoc/>
public int[] GetObservationShape()
{
return m_Shape;

9
com.unity.ml-agents/Tests/Editor/MLAgentsEditModeTest.cs


public string sensorName;
public int numWriteCalls;
public int numCompressedCalls;
public int numResetCalls;
public SensorCompressionType compressionType = SensorCompressionType.None;
public TestSensor(string n)

}
public void Update() {}
public void Reset()
{
numResetCalls++;
}
}
[TestFixture]

var expectedAgentActionForEpisode = 0;
var expectedCollectObsCalls = 0;
var expectedCollectObsCallsForEpisode = 0;
var expectedSensorResetCalls = 0;
for (var i = 0; i < 15; i++)
{

expectedAgentActionForEpisode = 0;
expectedCollectObsCallsForEpisode = 0;
expectedAgentStepCount = 0;
expectedSensorResetCalls++;
}
aca.EnvironmentStep();

Assert.AreEqual(expectedAgentActionForEpisode, agent1.agentActionCallsForEpisode);
Assert.AreEqual(expectedCollectObsCalls, agent1.collectObservationsCalls);
Assert.AreEqual(expectedCollectObsCallsForEpisode, agent1.collectObservationsCallsForEpisode);
Assert.AreEqual(expectedSensorResetCalls, agent1.sensor1.numResetCalls);
}
}

1
com.unity.ml-agents/Tests/Editor/ParameterLoaderTest.cs


}
public void Update() {}
public void Reset() { }
public SensorCompressionType GetCompressionType()
{

1
com.unity.ml-agents/Tests/Editor/Sensor/FloatVisualSensorTests.cs


}
public void Update() {}
public void Reset() { }
public SensorCompressionType GetCompressionType()
{

1
com.unity.ml-agents/Tests/Editor/Sensor/SensorShapeValidatorTests.cs


}
public void Update() { }
public void Reset() { }
public SensorCompressionType GetCompressionType()
{

18
com.unity.ml-agents/Tests/Editor/Sensor/StackingSensorTests.cs


// Check that if we don't call Update(), the same observations are produced
SensorTestHelper.CompareObservation(sensor, new[] {5f, 6f, 7f, 8f, 9f, 10f});
}
[Test]
public void TestStackingReset()
{
VectorSensor wrapped = new VectorSensor(2);
ISensor sensor = new StackingSensor(wrapped, 3);
wrapped.AddObservation(new[] {1f, 2f});
SensorTestHelper.CompareObservation(sensor, new[] {0f, 0f, 0f, 0f, 1f, 2f});
sensor.Update();
wrapped.AddObservation(new[] {3f, 4f});
SensorTestHelper.CompareObservation(sensor, new[] {0f, 0f, 1f, 2f, 3f, 4f});
sensor.Reset();
wrapped.AddObservation(new[] {5f, 6f});
SensorTestHelper.CompareObservation(sensor, new[] {0f, 0f, 0f, 0f, 5f, 6f});
}
}
}

2
docs/Migrating.md


* The `--load` and `--train` command-line flags have been deprecated and replaced with `--resume` and `--inference`.
* Running with the same `--run-id` twice will now throw an error.
* The `play_against_current_self_ratio` self-play trainer hyperparameter has been renamed to `play_against_latest_model_ratio`
* Removed the multi-agent gym option from the gym wrapper. For multi-agent scenarios, use the [Low Level Python API](Python-API.md).
* The low level Python API has changed. You can look at the document [Low Level Python API documentation](Python-API.md) for more information. If you use `mlagents-learn` for training, this should be a transparent change.
### Steps to Migrate
* Replace the `--load` flag with `--resume` when calling `mlagents-learn`, and don't use the `--train` flag as training

141
docs/Python-API.md


- **UnityEnvironment** — the main interface between the Unity application and
your code. Use UnityEnvironment to start and control a simulation or training
session.
- **BatchedStepResult** — contains the data from Agents belonging to the same
"AgentGroup" in the simulation, such as observations and rewards.
- **AgentGroupSpec** — describes the shape of the data inside a BatchedStepResult.
For example, provides the dimensions of the observations of a group.
- **BehaviorName** - is a string that identifies a behavior in the simulation.
- **AgentId** - is an `int` that serves as unique identifier for Agents in the
simulation.
- **DecisionSteps** — contains the data from Agents belonging to the same
"Behavior" in the simulation, such as observations and rewards. Only Agents
that requested a decision since the last call to `env.step()` are in the
DecisionSteps object.
- **TerminalSteps** — contains the data from Agents belonging to the same
"Behavior" in the simulation, such as observations and rewards. Only Agents
whose episode ended since the last call to `env.step()` are in the
TerminalSteps object.
- **BehaviorSpec** — describes the shape of the observation data inside
DecisionSteps and TerminalSteps as well as the expected action shapes.
An Agent Group is a group of Agents identified by a string name that share the same
observations and action types. You can think about Agent Group as a group of agents
that will share the same policy or behavior. All Agents in a group have the same goal
and reward signals.
An Agent "Behavior" is a group of Agents identified by a `BehaviorName` that share the same
observations and action types (described in their `BehaviorSpec`). You can think about Agent
Behavior as a group of agents that will share the same policy. All Agents with the same
behavior have the same goal and reward signals.
__Note__: The `Behavior Name` corresponds to the Agent Group name on Python.
_Notice: Currently communication between Unity and Python takes place over an
open socket without authentication. As such, please make sure that the network

move forward until an Agent in the simulation needs a input from Python to act.
- **Close : `env.close()`** Sends a shutdown signal to the environment and terminates
the communication.
- **Get Agent Group Names : `env.get_agent_groups()`** Returns a list of agent group ids.
- **Get Behavior Names : `env.get_behavior_names()`** Returns a list of `BehaviorName`.
agent groups are created in the simulation.
- **Get Agent Group Spec : `env.get_agent_group_spec(agent_group: str)`** Returns
the `AgentGroupSpec` corresponding to the agent_group given as input. An
`AgentGroupSpec` contains information such as the observation shapes, the action
type (multi-discrete or continuous) and the action shape. Note that the `AgentGroupSpec`
Agent behaviors are created in the simulation.
- **Get Behavior Spec : `env.get_behavior_spec(behavior_name: str)`** Returns
the `BehaviorSpec` corresponding to the behavior_name given as input. A
`BehaviorSpec` contains information such as the observation shapes, the action
type (multi-discrete or continuous) and the action shape. Note that the `BehaviorSpec`
- **Get Batched Step Result for Agent Group : `env.get_step_result(agent_group: str)`**
Returns a `BatchedStepResult` corresponding to the agent_group given as input.
A `BatchedStepResult` contains information about the state of the agents in a group
such as the observations, the rewards, the done flags and the agent identifiers. The
data is in `np.array` of which the first dimension is always the number of agents which
requested a decision in the simulation since the last call to `env.step()` note that the
number of agents is not guaranteed to remain constant during the simulation.
- **Set Actions for Agent Group :`env.set_actions(agent_group: str, action: np.array)`**
- **Get Steps : `env.get_steps(behavior_name: str)`**
Returns a tuple `DecisionSteps, TerminalSteps` corresponding to the behavior_name
given as input.
The `DecisionSteps` contains information about the state of the agents
**that need an action this step** and have the behavior behavior_name.
The `TerminalSteps` contains information about the state of the agents
**whose episode ended** and have the behavior behavior_name.
Both `DecisionSteps` and `TerminalSteps` contain information such as
the observations, the rewards and the agent identifiers.
`DecisionSteps` also contains action masks for the next action while `TerminalSteps`
contains the reason for termination (did the Agent reach its maximum step and was
interrupted). The data is in `np.array` of which the first dimension is always the
number of agents note that the number of agents is not guaranteed to remain constant
during the simulation and it is not unusual to have either `DecisionSteps` or `TerminalSteps`
contain no Agents at all.
- **Set Actions :`env.set_actions(behavior_name: str, action: np.array)`**
Sets the actions for a whole agent group. `action` is a 2D `np.array` of `dtype=np.int32`
in the discrete action case and `dtype=np.float32` in the continuous action case.
The first dimension of `action` is the number of agents that requested a decision

__Note:__ If no action is provided for an agent group between two calls to `env.step()` then
the default action will be all zeros (in either discrete or continuous action space)
#### BathedStepResult and StepResult
#### DecisionSteps and DecisionStep
`DecisionSteps` (with `s`) contains information about a whole batch of Agents while
`DecisionStep` (no `s`) only contains information about a single Agent.
A `BatchedStepResult` has the following fields :
A `DecisionSteps` has the following fields :
- `obs` is a list of numpy arrays observations collected by the group of
agent. The first dimension of the array corresponds to the batch size of

rewards collected by each agent since the last simulation step.
- `done` is an array of booleans of length batch size. Is true if the
associated Agent was terminated during the last simulation step.
- `max_step` is an array of booleans of length batch size. Is true if the
associated Agent reached its maximum number of steps during the last
simulation step.
- `agent_id` is an int vector of length batch size containing unique
identifier for the corresponding Agent. This is used to track Agents
across simulation steps.

It also has the two following methods:
- `n_agents()` Returns the number of agents requesting a decision since
the last call to `env.step()`
- `get_agent_step_result(agent_id: int)` Returns a `StepResult`
- `len(DecisionSteps)` Returns the number of agents requesting a decision since
the last call to `env.step()`.
- `DecisionSteps[agent_id]` Returns a `DecisionStep`
A `StepResult` has the following fields:
A `DecisionStep` has the following fields:
- `obs` is a list of numpy arrays observations collected by the group of
agent. (Each array has one less dimension than the arrays in `BatchedStepResult`)
- `obs` is a list of numpy arrays observations collected by the agent.
(Each array has one less dimension than the arrays in `DecisionSteps`)
- `max_step` is a bool. Is true if the Agent reached its maximum number of
steps during the last simulation step.
- `agent_id` is an int and an unique identifier for the corresponding Agent.
- `action_mask` is an optional list of one dimensional array of booleans.
Only available in multi-discrete action space type.

#### AgentGroupSpec
#### TerminalSteps and TerminalStep
Similarly to `DecisionSteps` and `DecisionStep`,
`TerminalSteps` (with `s`) contains information about a whole batch of Agents while
`TerminalStep` (no `s`) only contains information about a single Agent.
An Agent group can either have discrete or continuous actions. To check which type
A `TerminalSteps` has the following fields :
- `obs` is a list of numpy arrays observations collected by the group of
agent. The first dimension of the array corresponds to the batch size of
the group (number of agents requesting a decision since the last call to
`env.step()`).
- `reward` is a float vector of length batch size. Corresponds to the
rewards collected by each agent since the last simulation step.
- `done` is an array of booleans of length batch size. Is true if the
associated Agent was terminated during the last simulation step.
- `agent_id` is an int vector of length batch size containing unique
identifier for the corresponding Agent. This is used to track Agents
across simulation steps.
- `max_step` is an array of booleans of length batch size. Is true if the
associated Agent reached its maximum number of steps during the last
simulation step.
It also has the two following methods:
- `len(TerminalSteps)` Returns the number of agents requesting a decision since
the last call to `env.step()`.
- `TerminalSteps[agent_id]` Returns a `TerminalStep`
for the Agent with the `agent_id` unique identifier.
A `TerminalStep` has the following fields:
- `obs` is a list of numpy arrays observations collected by the agent.
(Each array has one less dimension than the arrays in `TerminalSteps`)
- `reward` is a float. Corresponds to the rewards collected by the agent
since the last simulation step.
- `done` is a bool. Is true if the Agent was terminated during the last
simulation step.
- `agent_id` is an int and an unique identifier for the corresponding Agent.
- `max_step` is a bool. Is true if the Agent reached its maximum number of
steps during the last simulation step.
#### BehaviorSpec
An Agent behavior can either have discrete or continuous actions. To check which type
An `AgentGroupSpec` has the following fields :
A `BehaviorSpec` has the following fields :
BatchedStepResult and StepResult.
DecisionSteps, DecisionStep, TerminalSteps and TerminalStep.
- `action_type` is the type of data of the action. it can be discrete or
continuous. If discrete, the action tensors are expected to be `np.int32`. If
continuous, the actions are expected to be `np.float32`.

### Communicating additional information with the Environment
In addition to the means of communicating between Unity and python described above,
we also provide methods for sharing agent-agnostic information. These
additional methods are referred to as side channels. ML-Agents includes two ready-made

15
gym-unity/README.md


information on the gym interface, see [here](https://github.com/openai/gym).
We provide a gym wrapper and instructions for using it with existing machine
learning algorithms which utilize gyms. Both wrappers provide interfaces on top
learning algorithms which utilize gym. Our wrapper provides interfaces on top
of our `UnityEnvironment` class, which is the default way of interfacing with a
Unity environment via Python.

or by running the following from the `/gym-unity` directory of the repository:
```sh
pip install .
pip install -e .
```
## Using the Gym Wrapper

```python
from gym_unity.envs import UnityEnv
env = UnityEnv(environment_filename, worker_id, use_visual, uint8_visual, multiagent)
env = UnityEnv(environment_filename, worker_id, use_visual, uint8_visual)
```
* `environment_filename` refers to the path to the Unity environment.

(0-255). Many common Gym environments (e.g. Atari) do this. By default they
will be floats (0.0-1.0). Defaults to `False`.
* `multiagent` refers to whether you intent to launch an environment which
contains more than one agent. Defaults to `False`.
* `flatten_branched` will flatten a branched discrete action space into a Gym Discrete.
Otherwise, it will be converted into a MultiDiscrete. Defaults to `False`.

## Limitations
* It is only possible to use an environment with a single Agent.
* It is only possible to use an environment with a **single** Agent.
* The `BatchedStepResult` output from the environment can still be accessed from the
`info` provided by `env.step(action)`.
* The `TerminalSteps` or `DecisionSteps` output from the environment can still be
accessed from the `info` provided by `env.step(action)`.
* Stacked vector observations are not supported.
* Environment registration for use with `gym.make()` is currently not supported.

342
gym-unity/gym_unity/envs/__init__.py


from gym import error, spaces
from mlagents_envs.environment import UnityEnvironment
from mlagents_envs.base_env import BatchedStepResult
from mlagents_envs.base_env import DecisionSteps, TerminalSteps
from mlagents_envs import logging_util

logger = logging_util.get_logger(__name__)
logging_util.set_log_level(logging_util.INFO)
GymSingleStepResult = Tuple[np.ndarray, float, bool, Dict]
GymMultiStepResult = Tuple[List[np.ndarray], List[float], List[bool], Dict]
GymStepResult = Union[GymSingleStepResult, GymMultiStepResult]
GymStepResult = Tuple[np.ndarray, float, bool, Dict]
Multi-agent environments use lists for object types, as done here:
https://github.com/openai/multiagent-particle-envs
"""
def __init__(

use_visual: bool = False,
uint8_visual: bool = False,
multiagent: bool = False,
flatten_branched: bool = False,
no_graphics: bool = False,
allow_multiple_visual_obs: bool = False,

:param worker_id: Worker number for environment.
:param use_visual: Whether to use visual observation or vector observation.
:param uint8_visual: Return visual observations as uint8 (0-255) matrices instead of float (0.0-1.0).
:param multiagent: Whether to run in multi-agent mode (lists of obs, reward, done).
:param flatten_branched: If True, turn branched discrete action spaces into a Discrete space rather than
MultiDiscrete.
:param no_graphics: Whether to run the Unity simulator in no-graphics mode

)
# Take a single step so that the brain information will be sent over
if not self._env.get_agent_groups():
if not self._env.get_behavior_names():
self.agent_mapper = AgentIdIndexMapper()
self._previous_step_result: BatchedStepResult = None
self._multiagent = multiagent
self._previous_decision_step: DecisionSteps = None
self._flattener = None
# Hidden flag used by Atari environments to determine if the game is over
self.game_over = False

if len(self._env.get_agent_groups()) != 1:
if len(self._env.get_behavior_names()) != 1:
"There can only be one brain in a UnityEnvironment "
"There can only be one behavior in a UnityEnvironment "
self.brain_name = self._env.get_agent_groups()[0]
self.name = self.brain_name
self.group_spec = self._env.get_agent_group_spec(self.brain_name)
self.name = self._env.get_behavior_names()[0]
self.group_spec = self._env.get_behavior_spec(self.name)
if use_visual and self._get_n_vis_obs() == 0:
raise UnityGymException(

# Check for number of agents in scene.
self._env.reset()
step_result = self._env.get_step_result(self.brain_name)
self._check_agents(step_result.n_agents())
self._previous_step_result = step_result
self.agent_mapper.set_initial_agents(list(self._previous_step_result.agent_id))
decision_steps, _ = self._env.get_steps(self.name)
self._check_agents(len(decision_steps))
self._previous_decision_step = decision_steps
# Set observation and action spaces
if self.group_spec.is_action_discrete():

def reset(self) -> Union[List[np.ndarray], np.ndarray]:
"""Resets the state of the environment and returns an initial observation.
In the case of multi-agent environments, this is a list.
step_result = self._step(True)
n_agents = step_result.n_agents()
self._env.reset()
decision_step, _ = self._env.get_steps(self.name)
n_agents = len(decision_step)
if not self._multiagent:
res: GymStepResult = self._single_step(step_result)
else:
res = self._multi_step(step_result)
res: GymStepResult = self._single_step(decision_step)
return res[0]
def step(self, action: List[Any]) -> GymStepResult:

Accepts an action and returns a tuple (observation, reward, done, info).
In the case of multi-agent environments, these are lists.
Args:
action (object/list): an action provided by the environment
Returns:

info (dict): contains auxiliary diagnostic information, including BatchedStepResult.
info (dict): contains auxiliary diagnostic information.
# Use random actions for all other agents in environment.
if self._multiagent:
if not isinstance(action, list):
raise UnityGymException(
"The environment was expecting `action` to be a list."
)
if len(action) != self._n_agents:
raise UnityGymException(
"The environment was expecting a list of {} actions.".format(
self._n_agents
)
)
else:
if self._flattener is not None:
# Action space is discrete and flattened - we expect a list of scalars
action = [self._flattener.lookup_action(_act) for _act in action]
action = np.array(action)
else:
if self._flattener is not None:
# Translate action into list
action = self._flattener.lookup_action(action)
if self._flattener is not None:
# Translate action into list
action = self._flattener.lookup_action(action)
action = np.array(action).reshape((self._n_agents, spec.action_size))
action = self._sanitize_action(action)
self._env.set_actions(self.brain_name, action)
action = np.array(action).reshape((1, spec.action_size))
self._env.set_actions(self.name, action)
step_result = self._step()
n_agents = step_result.n_agents()
self._check_agents(n_agents)
if not self._multiagent:
single_res = self._single_step(step_result)
self.game_over = single_res[2]
return single_res
self._env.step()
decision_step, terminal_step = self._env.get_steps(self.name)
if len(terminal_step) != 0:
# The agent is done
self.game_over = True
return self._single_step(terminal_step)
multi_res = self._multi_step(step_result)
self.game_over = all(multi_res[2])
return multi_res
return self._single_step(decision_step)
def _single_step(self, info: BatchedStepResult) -> GymSingleStepResult:
def _single_step(self, info: Union[DecisionSteps, TerminalSteps]) -> GymStepResult:
if self.use_visual:
visual_obs = self._get_vis_obs_list(info)

"The Agent does not have vector observations and the environment was not setup "
+ "to use visual observations."
)
done = isinstance(info, TerminalSteps)
return (
default_observation,
info.reward[0],
info.done[0],
{"batched_step_result": info},
)
return (default_observation, info.reward[0], done, {"step": info})
def _preprocess_single(self, single_visual_obs: np.ndarray) -> np.ndarray:
if self.uint8_visual:

def _multi_step(self, info: BatchedStepResult) -> GymMultiStepResult:
if self.use_visual:
self.visual_obs = self._preprocess_multi(self._get_vis_obs_list(info))
default_observation = self.visual_obs
else:
default_observation = self._get_vector_obs(info)
return (
list(default_observation),
list(info.reward),
list(info.done),
{"batched_step_result": info},
)
def _get_n_vis_obs(self) -> int:
result = 0
for shape in self.group_spec.observation_shapes:

return shape
return None
def _get_vis_obs_list(self, step_result: BatchedStepResult) -> List[np.ndarray]:
def _get_vis_obs_list(
self, step_result: Union[DecisionSteps, TerminalSteps]
) -> List[np.ndarray]:
result: List[np.ndarray] = []
for obs in step_result.obs:
if len(obs.shape) == 4:

def _get_vector_obs(self, step_result: BatchedStepResult) -> np.ndarray:
def _get_vector_obs(
self, step_result: Union[DecisionSteps, TerminalSteps]
) -> np.ndarray:
result: List[np.ndarray] = []
for obs in step_result.obs:
if len(obs.shape) == 2:

result += shape[0]
return result
def _preprocess_multi(
self, multiple_visual_obs: List[np.ndarray]
) -> List[np.ndarray]:
if self.uint8_visual:
return [
(255.0 * _visual_obs).astype(np.uint8)
for _visual_obs in multiple_visual_obs
]
else:
return multiple_visual_obs
def render(self, mode="rgb_array"):
return self.visual_obs

return
def _check_agents(self, n_agents: int) -> None:
if not self._multiagent and n_agents > 1:
raise UnityGymException(
"The environment was launched as a single-agent environment, however "
"there is more than one agent in the scene."
)
elif self._multiagent and n_agents <= 1:
raise UnityGymException(
"The environment was launched as a mutli-agent environment, however "
"there is only one agent in the scene."
)
if self._n_agents == -1:
self._n_agents = n_agents
logger.info("{} agents within environment.".format(n_agents))
elif self._n_agents != n_agents:
if self._n_agents > 1:
"The number of agents in the environment has changed since "
"initialization. This is not supported."
"There can only be one Agent in the environment but {n_agents} were detected."
def _sanitize_info(self, step_result: BatchedStepResult) -> BatchedStepResult:
n_extra_agents = step_result.n_agents() - self._n_agents
if n_extra_agents < 0:
# In this case, some Agents did not request a decision when expected
raise UnityGymException(
"The number of agents in the scene does not match the expected number."
)
if step_result.n_agents() - sum(step_result.done) != self._n_agents:
raise UnityGymException(
"The number of agents in the scene does not match the expected number."
)
for index, agent_id in enumerate(step_result.agent_id):
if step_result.done[index]:
self.agent_mapper.mark_agent_done(agent_id, step_result.reward[index])
# Set the new AgentDone flags to True
# Note that the corresponding agent_id that gets marked done will be different
# than the original agent that was done, but this is OK since the gym interface
# only cares about the ordering.
for index, agent_id in enumerate(step_result.agent_id):
if not self._previous_step_result.contains_agent(agent_id):
if step_result.done[index]:
# If the Agent is already done (e.g. it ended its epsiode twice in one step)
# Don't try to register it here.
continue
# Register this agent, and get the reward of the previous agent that
# was in its index, so that we can return it to the gym.
last_reward = self.agent_mapper.register_new_agent_id(agent_id)
step_result.done[index] = True
step_result.reward[index] = last_reward
self._previous_step_result = step_result # store the new original
# Get a permutation of the agent IDs so that a given ID stays in the same
# index as where it was first seen.
new_id_order = self.agent_mapper.get_id_permutation(list(step_result.agent_id))
_mask: Optional[List[np.array]] = None
if step_result.action_mask is not None:
_mask = []
for mask_index in range(len(step_result.action_mask)):
_mask.append(step_result.action_mask[mask_index][new_id_order])
new_obs: List[np.array] = []
for obs_index in range(len(step_result.obs)):
new_obs.append(step_result.obs[obs_index][new_id_order])
return BatchedStepResult(
obs=new_obs,
reward=step_result.reward[new_id_order],
done=step_result.done[new_id_order],
max_step=step_result.max_step[new_id_order],
agent_id=step_result.agent_id[new_id_order],
action_mask=_mask,
)
def _sanitize_action(self, action: np.array) -> np.array:
sanitized_action = np.zeros(
(self._previous_step_result.n_agents(), self.group_spec.action_size)
)
for index, agent_id in enumerate(self._previous_step_result.agent_id):
if not self._previous_step_result.done[index]:
array_index = self.agent_mapper.get_gym_index(agent_id)
sanitized_action[index, :] = action[array_index, :]
return sanitized_action
def _step(self, needs_reset: bool = False) -> BatchedStepResult:
if needs_reset:
self._env.reset()
else:
self._env.step()
info = self._env.get_step_result(self.brain_name)
# Two possible cases here:
# 1) all agents requested decisions (some of which might be done)
# 2) some Agents were marked Done in between steps.
# In case 2, we re-request decisions until all agents request a real decision.
while info.n_agents() - sum(info.done) < self._n_agents:
if not info.done.all():
raise UnityGymException(
"The environment does not have the expected amount of agents. "
+ "Some agents did not request decisions at the same time."
)
for agent_id, reward in zip(info.agent_id, info.reward):
self.agent_mapper.mark_agent_done(agent_id, reward)
self._env.step()
info = self._env.get_step_result(self.brain_name)
return self._sanitize_info(info)
@property
def metadata(self):
return {"render.modes": ["rgb_array"]}

:return: The List containing the branched actions.
"""
return self.action_lookup[action]
class AgentIdIndexMapper:
def __init__(self) -> None:
self._agent_id_to_gym_index: Dict[int, int] = {}
self._done_agents_index_to_last_reward: Dict[int, float] = {}
def set_initial_agents(self, agent_ids: List[int]) -> None:
"""
Provide the initial list of agent ids for the mapper
"""
for idx, agent_id in enumerate(agent_ids):
self._agent_id_to_gym_index[agent_id] = idx
def mark_agent_done(self, agent_id: int, reward: float) -> None:
"""
Declare the agent done with the corresponding final reward.
"""
if agent_id in self._agent_id_to_gym_index:
gym_index = self._agent_id_to_gym_index.pop(agent_id)
self._done_agents_index_to_last_reward[gym_index] = reward
else:
# Agent was never registered in the first place (e.g. EndEpisode called multiple times)
pass
def register_new_agent_id(self, agent_id: int) -> float:
"""
Adds the new agent ID and returns the reward to use for the previous agent in this index
"""
# Any free index is OK here.
free_index, last_reward = self._done_agents_index_to_last_reward.popitem()
self._agent_id_to_gym_index[agent_id] = free_index
return last_reward
def get_id_permutation(self, agent_ids: List[int]) -> List[int]:
"""
Get the permutation from new agent ids to the order that preserves the positions of previous agents.
The result is a list with each integer from 0 to len(_agent_id_to_gym_index)-1
appearing exactly once.
"""
# Map the new agent ids to the their index
new_agent_ids_to_index = {
agent_id: idx for idx, agent_id in enumerate(agent_ids)
}
# Make the output list. We don't write to it sequentially, so start with dummy values.
new_permutation = [-1] * len(self._agent_id_to_gym_index)
# For each agent ID, find the new index of the agent, and write it in the original index.
for agent_id, original_index in self._agent_id_to_gym_index.items():
new_permutation[original_index] = new_agent_ids_to_index[agent_id]
return new_permutation
def get_gym_index(self, agent_id: int) -> int:
"""
Get the gym index for the current agent.
"""
return self._agent_id_to_gym_index[agent_id]
class AgentIdIndexMapperSlow:
"""
Reference implementation of AgentIdIndexMapper.
The operations are O(N^2) so it shouldn't be used for large numbers of agents.
See AgentIdIndexMapper for method descriptions
"""
def __init__(self) -> None:
self._gym_id_order: List[int] = []
self._done_agents_index_to_last_reward: Dict[int, float] = {}
def set_initial_agents(self, agent_ids: List[int]) -> None:
self._gym_id_order = list(agent_ids)
def mark_agent_done(self, agent_id: int, reward: float) -> None:
try:
gym_index = self._gym_id_order.index(agent_id)
self._done_agents_index_to_last_reward[gym_index] = reward
self._gym_id_order[gym_index] = -1
except ValueError:
# Agent was never registered in the first place (e.g. EndEpisode called multiple times)
pass
def register_new_agent_id(self, agent_id: int) -> float:
original_index = self._gym_id_order.index(-1)
self._gym_id_order[original_index] = agent_id
reward = self._done_agents_index_to_last_reward.pop(original_index)
return reward
def get_id_permutation(self, agent_ids):
new_id_order = []
for agent_id in self._gym_id_order:
new_id_order.append(agent_ids.index(agent_id))
return new_id_order
def get_gym_index(self, agent_id: int) -> int:
return self._gym_id_order.index(agent_id)

198
gym-unity/gym_unity/tests/test_gym.py


import numpy as np
from gym import spaces
from gym_unity.envs import (
UnityEnv,
UnityGymException,
AgentIdIndexMapper,
AgentIdIndexMapperSlow,
from gym_unity.envs import UnityEnv
from mlagents_envs.base_env import (
BehaviorSpec,
ActionType,
DecisionSteps,
TerminalSteps,
from mlagents_envs.base_env import AgentGroupSpec, ActionType, BatchedStepResult
mock_step = create_mock_vector_step_result()
setup_mock_unityenvironment(mock_env, mock_spec, mock_step)
mock_decision_step, mock_terminal_step = create_mock_vector_steps(mock_spec)
setup_mock_unityenvironment(
mock_env, mock_spec, mock_decision_step, mock_terminal_step
)
env = UnityEnv(" ", use_visual=False, multiagent=False)
env = UnityEnv(" ", use_visual=False)
assert isinstance(env, UnityEnv)
assert isinstance(env.reset(), np.ndarray)
actions = env.action_space.sample()

@mock.patch("gym_unity.envs.UnityEnvironment")
def test_multi_agent(mock_env):
mock_spec = create_mock_group_spec()
mock_step = create_mock_vector_step_result(num_agents=2)
setup_mock_unityenvironment(mock_env, mock_spec, mock_step)
with pytest.raises(UnityGymException):
UnityEnv(" ", multiagent=False)
env = UnityEnv(" ", use_visual=False, multiagent=True)
assert isinstance(env.reset(), list)
actions = [env.action_space.sample() for i in range(env.number_agents)]
obs, rew, done, info = env.step(actions)
assert isinstance(obs, list)
assert isinstance(rew, list)
assert isinstance(done, list)
assert isinstance(info, dict)
@mock.patch("gym_unity.envs.UnityEnvironment")
mock_step = create_mock_vector_step_result(num_agents=1)
setup_mock_unityenvironment(mock_env, mock_spec, mock_step)
mock_decision_step, mock_terminal_step = create_mock_vector_steps(
mock_spec, num_agents=1
)
setup_mock_unityenvironment(
mock_env, mock_spec, mock_decision_step, mock_terminal_step
)
env = UnityEnv(" ", use_visual=False, multiagent=False, flatten_branched=True)
env = UnityEnv(" ", use_visual=False, flatten_branched=True)
assert isinstance(env.action_space, spaces.Discrete)
assert env.action_space.n == 12
assert env._flattener.lookup_action(0) == [0, 0, 0]

env = UnityEnv(" ", use_visual=False, multiagent=False, flatten_branched=False)
env = UnityEnv(" ", use_visual=False, flatten_branched=False)
assert isinstance(env.action_space, spaces.MultiDiscrete)

mock_spec = create_mock_group_spec(number_visual_observations=1)
mock_step = create_mock_vector_step_result(number_visual_observations=1)
setup_mock_unityenvironment(mock_env, mock_spec, mock_step)
mock_decision_step, mock_terminal_step = create_mock_vector_steps(
mock_spec, number_visual_observations=1
)
setup_mock_unityenvironment(
mock_env, mock_spec, mock_decision_step, mock_terminal_step
)
env = UnityEnv(" ", use_visual=True, multiagent=False, uint8_visual=use_uint8)
env = UnityEnv(" ", use_visual=True, uint8_visual=use_uint8)
assert isinstance(env, UnityEnv)
assert isinstance(env.reset(), np.ndarray)
actions = env.action_space.sample()

assert isinstance(info, dict)
@mock.patch("gym_unity.envs.UnityEnvironment")
def test_sanitize_action_shuffled_id(mock_env):
mock_spec = create_mock_group_spec(
vector_action_space_type="discrete", vector_action_space_size=[2, 2, 3]
)
mock_step = create_mock_vector_step_result(num_agents=5)
mock_step.agent_id = np.array(range(5))
setup_mock_unityenvironment(mock_env, mock_spec, mock_step)
env = UnityEnv(" ", use_visual=False, multiagent=True)
shuffled_step_result = create_mock_vector_step_result(num_agents=5)
shuffled_order = [4, 2, 3, 1, 0]
shuffled_step_result.reward = np.array(shuffled_order)
shuffled_step_result.agent_id = np.array(shuffled_order)
sanitized_result = env._sanitize_info(shuffled_step_result)
for expected_reward, reward in zip(range(5), sanitized_result.reward):
assert expected_reward == reward
for expected_agent_id, agent_id in zip(range(5), sanitized_result.agent_id):
assert expected_agent_id == agent_id
@mock.patch("gym_unity.envs.UnityEnvironment")
def test_sanitize_action_one_agent_done(mock_env):
mock_spec = create_mock_group_spec(
vector_action_space_type="discrete", vector_action_space_size=[2, 2, 3]
)
mock_step = create_mock_vector_step_result(num_agents=5)
mock_step.agent_id = np.array(range(5))
setup_mock_unityenvironment(mock_env, mock_spec, mock_step)
env = UnityEnv(" ", use_visual=False, multiagent=True)
received_step_result = create_mock_vector_step_result(num_agents=6)
received_step_result.agent_id = np.array(range(6))
# agent #3 (id = 2) is Done
received_step_result.done = np.array([False] * 2 + [True] + [False] * 3)
sanitized_result = env._sanitize_info(received_step_result)
for expected_agent_id, agent_id in zip([0, 1, 5, 3, 4], sanitized_result.agent_id):
assert expected_agent_id == agent_id
@mock.patch("gym_unity.envs.UnityEnvironment")
def test_sanitize_action_new_agent_done(mock_env):
mock_spec = create_mock_group_spec(
vector_action_space_type="discrete", vector_action_space_size=[2, 2, 3]
)
mock_step = create_mock_vector_step_result(num_agents=3)
mock_step.agent_id = np.array(range(5))
setup_mock_unityenvironment(mock_env, mock_spec, mock_step)
env = UnityEnv(" ", use_visual=False, multiagent=True)
received_step_result = create_mock_vector_step_result(num_agents=7)
received_step_result.agent_id = np.array(range(7))
# agent #3 (id = 2) is Done
# so is the "new" agent (id = 5)
done = [False] * 7
done[2] = True
done[5] = True
received_step_result.done = np.array(done)
sanitized_result = env._sanitize_info(received_step_result)
for expected_agent_id, agent_id in zip([0, 1, 6, 3, 4], sanitized_result.agent_id):
assert expected_agent_id == agent_id
@mock.patch("gym_unity.envs.UnityEnvironment")
def test_sanitize_action_single_agent_multiple_done(mock_env):
mock_spec = create_mock_group_spec(
vector_action_space_type="discrete", vector_action_space_size=[2, 2, 3]
)
mock_step = create_mock_vector_step_result(num_agents=1)
mock_step.agent_id = np.array(range(1))
setup_mock_unityenvironment(mock_env, mock_spec, mock_step)
env = UnityEnv(" ", use_visual=False, multiagent=False)
received_step_result = create_mock_vector_step_result(num_agents=3)
received_step_result.agent_id = np.array(range(3))
# original agent (id = 0) is Done
# so is the "new" agent (id = 1)
done = [True, True, False]
received_step_result.done = np.array(done)
sanitized_result = env._sanitize_info(received_step_result)
for expected_agent_id, agent_id in zip([2], sanitized_result.agent_id):
assert expected_agent_id == agent_id
# Helper methods

obs_shapes = [(vector_observation_space_size,)]
for _ in range(number_visual_observations):
obs_shapes += [(8, 8, 3)]
return AgentGroupSpec(obs_shapes, act_type, vector_action_space_size)
return BehaviorSpec(obs_shapes, act_type, vector_action_space_size)
def create_mock_vector_step_result(num_agents=1, number_visual_observations=0):
def create_mock_vector_steps(specs, num_agents=1, number_visual_observations=0):
:BehaviorSpecs specs: The BehaviorSpecs for this mock
:int num_agents: Number of "agents" to imitate in your BatchedStepResult values.
"""
obs = [np.array([num_agents * [1, 2, 3]]).reshape(num_agents, 3)]

done = np.array(num_agents * [False])
return BatchedStepResult(obs, rewards, done, done, agents, None)
return DecisionSteps(obs, rewards, agents, None), TerminalSteps.empty(specs)
def setup_mock_unityenvironment(mock_env, mock_spec, mock_result):
def setup_mock_unityenvironment(mock_env, mock_spec, mock_decision, mock_termination):
"""
Takes a mock UnityEnvironment and adds the appropriate properties, defined by the mock
GroupSpec and BatchedStepResult.

:Mock mock_result: A BatchedStepResult object that will be returned at each step and reset.
:Mock mock_decision: A DecisionSteps object that will be returned at each step and reset.
:Mock mock_termination: A TerminationSteps object that will be returned at each step and reset.
mock_env.return_value.get_agent_groups.return_value = ["MockBrain"]
mock_env.return_value.get_agent_group_spec.return_value = mock_spec
mock_env.return_value.get_step_result.return_value = mock_result
@pytest.mark.parametrize("mapper_cls", [AgentIdIndexMapper, AgentIdIndexMapperSlow])
def test_agent_id_index_mapper(mapper_cls):
mapper = mapper_cls()
initial_agent_ids = [1001, 1002, 1003, 1004]
mapper.set_initial_agents(initial_agent_ids)
# Mark some agents as done with their last rewards.
mapper.mark_agent_done(1001, 42.0)
mapper.mark_agent_done(1004, 1337.0)
# Make sure we can handle an unknown agent id being marked done.
# This can happen when an agent ends an episode on the same step it starts.
mapper.mark_agent_done(9999, -1.0)
# Now add new agents, and get the rewards of the agent they replaced.
old_reward1 = mapper.register_new_agent_id(2001)
old_reward2 = mapper.register_new_agent_id(2002)
# Order of the rewards don't matter
assert {old_reward1, old_reward2} == {42.0, 1337.0}
new_agent_ids = [1002, 1003, 2001, 2002]
permutation = mapper.get_id_permutation(new_agent_ids)
# Make sure it's actually a permutation - needs to contain 0..N-1 with no repeats.
assert set(permutation) == set(range(0, 4))
# For initial agents that were in the initial group, they need to be in the same slot.
# Agents that were added later can appear in any free slot.
permuted_ids = [new_agent_ids[i] for i in permutation]
for idx, agent_id in enumerate(initial_agent_ids):
if agent_id in permuted_ids:
assert permuted_ids[idx] == agent_id
mock_env.return_value.get_behavior_names.return_value = ["MockBrain"]
mock_env.return_value.get_behavior_spec.return_value = mock_spec
mock_env.return_value.get_steps.return_value = (mock_decision, mock_termination)

249
ml-agents-envs/mlagents_envs/base_env.py


"""
Python Environment API for the ML-Agents toolkit
The aim of this API is to expose groups of similar Agents evolving in Unity
The aim of this API is to expose Agents evolving in a simulation
There can be multiple groups of similar Agents (same observations and actions
spaces) in the simulation. These groups are identified by a agent_group that
corresponds to a single group of Agents in the simulation.
This API supports multi-agent scenarios and groups similar Agents (same
observations, actions spaces and behavior) together. These groups of Agents are
identified by their BehaviorName.
batched manner. When retrieving the state of a group of Agents, said state
contains the data for the whole group. Agents in these groups are identified
by a unique int identifier that allows tracking of Agents across simulation
steps. Note that there is no guarantee that the number or order of the Agents
in the state will be consistent across simulation steps.
batched manner. Agents are identified by a unique AgentId identifier that
allows tracking of Agents across simulation steps. Note that there is no
guarantee that the number or order of the Agents in the state will be
consistent across simulation steps.
A simulation steps corresponds to moving the simulation forward until at least
one agent in the simulation sends its observations to Python again. Since
Agents can request decisions at different frequencies, a simulation step does

from abc import ABC, abstractmethod
from typing import List, NamedTuple, Tuple, Optional, Union, Dict
from collections.abc import Mapping
from typing import List, NamedTuple, Tuple, Optional, Union, Dict, Iterator, Any
AgentGroup = str
BehaviorName = str
class StepResult(NamedTuple):
class DecisionStep(NamedTuple):
- obs is a list of numpy arrays observations collected by the group of
agent.
- obs is a list of numpy arrays observations collected by the agent.
- done is a bool. Is true if the Agent was terminated during the last
simulation step.
- max_step is a bool. Is true if the Agent reached its maximum number of
steps during the last simulation step.
- agent_id is an int and an unique identifier for the corresponding Agent.
- action_mask is an optional list of one dimensional array of booleans.
Only available in multi-discrete action space type.

obs: List[np.ndarray]
reward: float
done: bool
max_step: bool
class BatchedStepResult:
class DecisionSteps(Mapping):
Contains the data a group of similar Agents collected since the last
Contains the data a batch of similar Agents collected since the last
agents and the batch size of the BatchedStepResult are not fixed across
agents and the batch size of the DecisionSteps are not fixed across
- obs is a list of numpy arrays observations collected by the group of
agent. Each obs has one extra dimension compared to StepResult: the first
dimension of the array corresponds to the batch size of
the group.
- obs is a list of numpy arrays observations collected by the batch of
agent. Each obs has one extra dimension compared to DecisionStep: the
first dimension of the array corresponds to the batch size of the batch.
- done is an array of booleans of length batch size. Is true if the
associated Agent was terminated during the last simulation step.
- max_step is an array of booleans of length batch size. Is true if the
associated Agent reached its maximum number of steps during the last
simulation step.
- agent_id is an int vector of length batch size containing unique
identifier for the corresponding Agent. This is used to track Agents
across simulation steps.

this simulation step.
"""
def __init__(self, obs, reward, done, max_step, agent_id, action_mask):
def __init__(self, obs, reward, agent_id, action_mask):
self.done: np.ndarray = done
self.max_step: np.ndarray = max_step
self.agent_id: np.ndarray = agent_id
self.action_mask: Optional[List[np.ndarray]] = action_mask
self._agent_id_to_index: Optional[Dict[AgentId, int]] = None

"""
:returns: A Dict that maps agent_id to the index of those agents in
this BatchedStepResult.
this DecisionSteps.
"""
if self._agent_id_to_index is None:
self._agent_id_to_index = {}

def contains_agent(self, agent_id: AgentId) -> bool:
return agent_id in self.agent_id_to_index
def __len__(self) -> int:
return len(self.agent_id)
def get_agent_step_result(self, agent_id: AgentId) -> StepResult:
def __getitem__(self, agent_id: AgentId) -> DecisionStep:
returns the step result for a specific agent.
returns the DecisionStep for a specific agent.
:returns: obs, reward, done, agent_id and optional action mask for a
specific agent
:returns: The DecisionStep
if not self.contains_agent(agent_id):
raise IndexError(
"get_agent_step_result failed. agent_id {} is not present in the BatchedStepResult".format(
agent_id
)
if agent_id not in self.agent_id_to_index:
raise KeyError(
"agent_id {} is not present in the DecisionSteps".format(agent_id)
)
agent_index = self._agent_id_to_index[agent_id] # type: ignore
agent_obs = []

agent_mask = []
for mask in self.action_mask:
agent_mask.append(mask[agent_index])
return StepResult(
return DecisionStep(
done=self.done[agent_index],
max_step=self.max_step[agent_index],
def __iter__(self) -> Iterator[Any]:
yield from self.agent_id
def empty(spec: "AgentGroupSpec") -> "BatchedStepResult":
def empty(spec: "BehaviorSpec") -> "DecisionSteps":
Returns an empty BatchedStepResult.
:param spec: The AgentGroupSpec for the BatchedStepResult
Returns an empty DecisionSteps.
:param spec: The BehaviorSpec for the DecisionSteps
return BatchedStepResult(
return DecisionSteps(
done=np.zeros(0, dtype=np.bool),
max_step=np.zeros(0, dtype=np.bool),
def n_agents(self) -> int: