import itertools
import numpy as np
from typing import Any, Dict, List, Optional, Tuple, Union

import gym
from gym import error, spaces

from mlagents_envs.environment import UnityEnvironment
from mlagents_envs.base_env import BatchedStepResult
from mlagents_envs import logging_util


class UnityGymException(error.Error):
    """
    Any error related to the gym wrapper of ml-agents.
    """

    pass


logger = logging_util.get_logger(__name__)
logging_util.set_log_level(logging_util.INFO)

GymSingleStepResult = Tuple[np.ndarray, float, bool, Dict]
GymMultiStepResult = Tuple[List[np.ndarray], List[float], List[bool], Dict]
GymStepResult = Union[GymSingleStepResult, GymMultiStepResult]


class UnityEnv(gym.Env):
    """
    Provides Gym wrapper for Unity Learning Environments.
    Multi-agent environments use lists for object types, as done here:
    https://github.com/openai/multiagent-particle-envs
    """

    def __init__(
        self,
        environment_filename: str,
        worker_id: int = 0,
        use_visual: bool = False,
        uint8_visual: bool = False,
        multiagent: bool = False,
        flatten_branched: bool = False,
        no_graphics: bool = False,
        allow_multiple_visual_obs: bool = False,
    ):
        """
        Environment initialization
        :param environment_filename: The UnityEnvironment path or file to be wrapped in the gym.
        :param worker_id: Worker number for environment.
        :param use_visual: Whether to use visual observation or vector observation.
        :param uint8_visual: Return visual observations as uint8 (0-255) matrices instead of float (0.0-1.0).
        :param multiagent: Whether to run in multi-agent mode (lists of obs, reward, done).
        :param flatten_branched: If True, turn branched discrete action spaces into a Discrete space rather than
            MultiDiscrete.
        :param no_graphics: Whether to run the Unity simulator in no-graphics mode.
        :param allow_multiple_visual_obs: If True, return a list of visual observations instead of only one.
        """
        base_port = UnityEnvironment.BASE_ENVIRONMENT_PORT
        if environment_filename is None:
            base_port = UnityEnvironment.DEFAULT_EDITOR_PORT

        self._env = UnityEnvironment(
            environment_filename,
            worker_id,
            base_port=base_port,
            no_graphics=no_graphics,
        )

        # Take a single step so that the brain information will be sent over
        if not self._env.get_agent_groups():
            self._env.step()

        self.visual_obs = None
        self._n_agents = -1

        self.agent_mapper = AgentIdIndexMapper()

        # Save the step result from the last time all Agents requested decisions.
        self._previous_step_result: BatchedStepResult = None
        self._multiagent = multiagent
        self._flattener = None
        # Hidden flag used by Atari environments to determine if the game is over
        self.game_over = False
        self._allow_multiple_visual_obs = allow_multiple_visual_obs

        # Check brain configuration
        if len(self._env.get_agent_groups()) != 1:
            raise UnityGymException(
                "There can only be one brain in a UnityEnvironment "
                "if it is wrapped in a gym."
            )

        self.brain_name = self._env.get_agent_groups()[0]
        self.name = self.brain_name
        self.group_spec = self._env.get_agent_group_spec(self.brain_name)

        if use_visual and self._get_n_vis_obs() == 0:
            raise UnityGymException(
                "`use_visual` was set to True, however there are no"
                " visual observations as part of this environment."
            )
        self.use_visual = self._get_n_vis_obs() >= 1 and use_visual

        # Always record the flag so the attribute exists even when it has no effect.
        self.uint8_visual = uint8_visual
        if not use_visual and uint8_visual:
            logger.warning(
                "`uint8_visual` was set to True, but visual observations are not in use. "
                "This setting will not have any effect."
            )

        if self._get_n_vis_obs() > 1 and not self._allow_multiple_visual_obs:
            logger.warning(
                "The environment contains more than one visual observation. "
                "You must set allow_multiple_visual_obs=True to receive them all. "
                "Otherwise, please note that only the first will be provided in the observation."
            )

        # Check for number of agents in scene.
        self._env.reset()
        step_result = self._env.get_step_result(self.brain_name)
        self._check_agents(step_result.n_agents())
        self._previous_step_result = step_result
        self.agent_mapper.set_initial_agents(list(self._previous_step_result.agent_id))

        # Set observation and action spaces
        if self.group_spec.is_action_discrete():
            branches = self.group_spec.discrete_action_branches
            if self.group_spec.action_shape == 1:
                self._action_space = spaces.Discrete(branches[0])
            else:
                if flatten_branched:
                    self._flattener = ActionFlattener(branches)
                    self._action_space = self._flattener.action_space
                else:
                    self._action_space = spaces.MultiDiscrete(branches)
        else:
            if flatten_branched:
                logger.warning(
                    "The environment has a non-discrete action space. It will "
                    "not be flattened."
                )
            high = np.array([1] * self.group_spec.action_shape)
            self._action_space = spaces.Box(-high, high, dtype=np.float32)

        high = np.array([np.inf] * self._get_vec_obs_size())
        if self.use_visual:
            shape = self._get_vis_obs_shape()
            if uint8_visual:
                self._observation_space = spaces.Box(
                    0, 255, dtype=np.uint8, shape=shape
                )
            else:
                self._observation_space = spaces.Box(
                    0, 1, dtype=np.float32, shape=shape
                )
        else:
            self._observation_space = spaces.Box(-high, high, dtype=np.float32)

    def reset(self) -> Union[List[np.ndarray], np.ndarray]:
        """Resets the state of the environment and returns an initial observation.
        In the case of multi-agent environments, this is a list.
        Returns: observation (object/list): the initial observation of the
        space.
        """
        step_result = self._step(True)
        n_agents = step_result.n_agents()
        self._check_agents(n_agents)
        self.game_over = False

        if not self._multiagent:
            res: GymStepResult = self._single_step(step_result)
        else:
            res = self._multi_step(step_result)
        return res[0]

    def step(self, action: List[Any]) -> GymStepResult:
        """Run one timestep of the environment's dynamics. When end of
        episode is reached, you are responsible for calling `reset()`
        to reset this environment's state.
        Accepts an action and returns a tuple (observation, reward, done, info).
        In the case of multi-agent environments, these are lists.
        Args:
            action (object/list): an action provided by the agent
        Returns:
            observation (object/list): agent's observation of the current environment
            reward (float/list) : amount of reward returned after previous action
            done (boolean/list): whether the episode has ended.
            info (dict): contains auxiliary diagnostic information, including BatchedStepResult.
        """
        # Validate the action format and, if needed, unflatten it before sending it to Unity.
        if self._multiagent:
            if not isinstance(action, list):
                raise UnityGymException(
                    "The environment was expecting `action` to be a list."
                )
            if len(action) != self._n_agents:
                raise UnityGymException(
                    "The environment was expecting a list of {} actions.".format(
                        self._n_agents
                    )
                )
            else:
                if self._flattener is not None:
                    # Action space is discrete and flattened - we expect a list of scalars
                    action = [self._flattener.lookup_action(_act) for _act in action]
                action = np.array(action)
        else:
            if self._flattener is not None:
                # Translate action into list
                action = self._flattener.lookup_action(action)

        spec = self.group_spec
        action = np.array(action).reshape((self._n_agents, spec.action_size))
        action = self._sanitize_action(action)
        self._env.set_actions(self.brain_name, action)

        step_result = self._step()

        n_agents = step_result.n_agents()
        self._check_agents(n_agents)

        if not self._multiagent:
            single_res = self._single_step(step_result)
            self.game_over = single_res[2]
            return single_res
        else:
            multi_res = self._multi_step(step_result)
            self.game_over = all(multi_res[2])
            return multi_res

    def _single_step(self, info: BatchedStepResult) -> GymSingleStepResult:
        if self.use_visual:
            visual_obs = self._get_vis_obs_list(info)

            if self._allow_multiple_visual_obs:
                visual_obs_list = []
                for obs in visual_obs:
                    visual_obs_list.append(self._preprocess_single(obs[0]))
                self.visual_obs = visual_obs_list
            else:
                self.visual_obs = self._preprocess_single(visual_obs[0][0])

            default_observation = self.visual_obs
        elif self._get_vec_obs_size() > 0:
            default_observation = self._get_vector_obs(info)[0, :]
        else:
            raise UnityGymException(
                "The Agent does not have vector observations and the environment was not setup "
                + "to use visual observations."
            )

        return (
            default_observation,
            info.reward[0],
            info.done[0],
            {"batched_step_result": info},
        )

    def _preprocess_single(self, single_visual_obs: np.ndarray) -> np.ndarray:
        if self.uint8_visual:
            return (255.0 * single_visual_obs).astype(np.uint8)
        else:
            return single_visual_obs

    def _multi_step(self, info: BatchedStepResult) -> GymMultiStepResult:
        if self.use_visual:
            self.visual_obs = self._preprocess_multi(self._get_vis_obs_list(info))
            default_observation = self.visual_obs
        else:
            default_observation = self._get_vector_obs(info)
        return (
            list(default_observation),
            list(info.reward),
            list(info.done),
            {"batched_step_result": info},
        )

    def _get_n_vis_obs(self) -> int:
        result = 0
        for shape in self.group_spec.observation_shapes:
            if len(shape) == 3:
                result += 1
        return result

    def _get_vis_obs_shape(self) -> Optional[Tuple]:
        for shape in self.group_spec.observation_shapes:
            if len(shape) == 3:
                return shape
        return None

    def _get_vis_obs_list(self, step_result: BatchedStepResult) -> List[np.ndarray]:
        result: List[np.ndarray] = []
        for obs in step_result.obs:
            if len(obs.shape) == 4:
                result.append(obs)
        return result

    def _get_vector_obs(self, step_result: BatchedStepResult) -> np.ndarray:
        result: List[np.ndarray] = []
        for obs in step_result.obs:
            if len(obs.shape) == 2:
                result.append(obs)
        return np.concatenate(result, axis=1)

    def _get_vec_obs_size(self) -> int:
        result = 0
        for shape in self.group_spec.observation_shapes:
            if len(shape) == 1:
                result += shape[0]
        return result

    def _preprocess_multi(
        self, multiple_visual_obs: List[np.ndarray]
    ) -> List[np.ndarray]:
        if self.uint8_visual:
            return [
                (255.0 * _visual_obs).astype(np.uint8)
                for _visual_obs in multiple_visual_obs
            ]
        else:
            return multiple_visual_obs

def render(self, mode="rgb_array"):
|
|
return self.visual_obs
|
|
|
|
def close(self) -> None:
|
|
"""Override _close in your subclass to perform any necessary cleanup.
|
|
Environments will automatically close() themselves when
|
|
garbage collected or when the program exits.
|
|
"""
|
|
self._env.close()
|
|
|
|
def seed(self, seed: Any = None) -> None:
|
|
"""Sets the seed for this env's random number generator(s).
|
|
Currently not implemented.
|
|
"""
|
|
logger.warning("Could not seed environment %s", self.name)
|
|
return
|
|
|
|
    def _check_agents(self, n_agents: int) -> None:
        if not self._multiagent and n_agents > 1:
            raise UnityGymException(
                "The environment was launched as a single-agent environment, however "
                "there is more than one agent in the scene."
            )
        elif self._multiagent and n_agents <= 1:
            raise UnityGymException(
                "The environment was launched as a multi-agent environment, however "
                "there is only one agent in the scene."
            )
        if self._n_agents == -1:
            self._n_agents = n_agents
            logger.info("{} agents within environment.".format(n_agents))
        elif self._n_agents != n_agents:
            raise UnityGymException(
                "The number of agents in the environment has changed since "
                "initialization. This is not supported."
            )

    def _sanitize_info(self, step_result: BatchedStepResult) -> BatchedStepResult:
        n_extra_agents = step_result.n_agents() - self._n_agents
        if n_extra_agents < 0:
            # In this case, some Agents did not request a decision when expected
            raise UnityGymException(
                "The number of agents in the scene does not match the expected number."
            )

        if step_result.n_agents() - sum(step_result.done) != self._n_agents:
            raise UnityGymException(
                "The number of agents in the scene does not match the expected number."
            )

        for index, agent_id in enumerate(step_result.agent_id):
            if step_result.done[index]:
                self.agent_mapper.mark_agent_done(agent_id, step_result.reward[index])

        # Set the new AgentDone flags to True
        # Note that the corresponding agent_id that gets marked done will be different
        # than the original agent that was done, but this is OK since the gym interface
        # only cares about the ordering.
        for index, agent_id in enumerate(step_result.agent_id):
            if not self._previous_step_result.contains_agent(agent_id):
                if step_result.done[index]:
                    # If the Agent is already done (e.g. it ended its episode twice in one step)
                    # Don't try to register it here.
                    continue
                # Register this agent, and get the reward of the previous agent that
                # was in its index, so that we can return it to the gym.
                last_reward = self.agent_mapper.register_new_agent_id(agent_id)
                step_result.done[index] = True
                step_result.reward[index] = last_reward

        self._previous_step_result = step_result  # store the new original

        # Get a permutation of the agent IDs so that a given ID stays in the same
        # index as where it was first seen.
        new_id_order = self.agent_mapper.get_id_permutation(list(step_result.agent_id))

        _mask: Optional[List[np.ndarray]] = None
        if step_result.action_mask is not None:
            _mask = []
            for mask_index in range(len(step_result.action_mask)):
                _mask.append(step_result.action_mask[mask_index][new_id_order])
        new_obs: List[np.ndarray] = []
        for obs_index in range(len(step_result.obs)):
            new_obs.append(step_result.obs[obs_index][new_id_order])
        return BatchedStepResult(
            obs=new_obs,
            reward=step_result.reward[new_id_order],
            done=step_result.done[new_id_order],
            max_step=step_result.max_step[new_id_order],
            agent_id=step_result.agent_id[new_id_order],
            action_mask=_mask,
        )

    def _sanitize_action(self, action: np.ndarray) -> np.ndarray:
        sanitized_action = np.zeros(
            (self._previous_step_result.n_agents(), self.group_spec.action_size)
        )
        for index, agent_id in enumerate(self._previous_step_result.agent_id):
            if not self._previous_step_result.done[index]:
                array_index = self.agent_mapper.get_gym_index(agent_id)
                sanitized_action[index, :] = action[array_index, :]
        return sanitized_action

    def _step(self, needs_reset: bool = False) -> BatchedStepResult:
        if needs_reset:
            self._env.reset()
        else:
            self._env.step()
        info = self._env.get_step_result(self.brain_name)
        # Two possible cases here:
        # 1) all agents requested decisions (some of which might be done)
        # 2) some Agents were marked Done in between steps.
        # In case 2, we re-request decisions until all agents request a real decision.
        while info.n_agents() - sum(info.done) < self._n_agents:
            if not info.done.all():
                raise UnityGymException(
                    "The environment does not have the expected amount of agents. "
                    + "Some agents did not request decisions at the same time."
                )
            for agent_id, reward in zip(info.agent_id, info.reward):
                self.agent_mapper.mark_agent_done(agent_id, reward)

            self._env.step()
            info = self._env.get_step_result(self.brain_name)
        return self._sanitize_info(info)

    @property
    def metadata(self):
        return {"render.modes": ["rgb_array"]}

    @property
    def reward_range(self) -> Tuple[float, float]:
        return -float("inf"), float("inf")

    @property
    def spec(self):
        return None

    @property
    def action_space(self):
        return self._action_space

    @property
    def observation_space(self):
        return self._observation_space

    @property
    def number_agents(self):
        return self._n_agents


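# Illustrative sketch, not part of the original wrapper: a minimal single-agent loop
# using UnityEnv. The build path "path/to/UnityBuild" is a placeholder and must point
# to an actual Unity executable; the function is defined for documentation and never called.
def _example_unity_env_usage() -> None:
    env = UnityEnv("path/to/UnityBuild", worker_id=0, use_visual=False)
    try:
        obs = env.reset()
        for _ in range(10):
            # Sample a random action from the wrapped gym action space.
            obs, reward, done, info = env.step(env.action_space.sample())
            if done:
                obs = env.reset()
    finally:
        env.close()

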
class ActionFlattener:
    """
    Flattens branched discrete action spaces into single-branch discrete action spaces.
    """

    def __init__(self, branched_action_space):
        """
        Initialize the flattener.
        :param branched_action_space: A List containing the sizes of each branch of the action
            space, e.g. [2,3,3] for three branches with size 2, 3, and 3 respectively.
        """
        self._action_shape = branched_action_space
        self.action_lookup = self._create_lookup(self._action_shape)
        self.action_space = spaces.Discrete(len(self.action_lookup))

    @classmethod
    def _create_lookup(cls, branched_action_space):
        """
        Creates a Dict that maps discrete actions (scalars) to branched actions (lists).
        Each key in the Dict maps to one unique set of branched actions, and each value
        contains the List of branched actions.
        """
        possible_vals = [range(_num) for _num in branched_action_space]
        all_actions = [list(_action) for _action in itertools.product(*possible_vals)]
        # Dict should be faster than List for large action spaces
        action_lookup = {
            _scalar: _action for (_scalar, _action) in enumerate(all_actions)
        }
        return action_lookup

    def lookup_action(self, action):
        """
        Convert a scalar discrete action into a unique set of branched actions.
        :param action: A scalar value representing one of the discrete actions.
        :return: The List containing the branched actions.
        """
        return self.action_lookup[action]


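# Illustrative sketch, not part of the original module: how ActionFlattener maps a
# flattened scalar action back to branched actions. The branch sizes [2, 3] are an
# assumed example; the function is defined for documentation and never called.
def _example_action_flattener() -> None:
    flattener = ActionFlattener([2, 3])
    # Two branches of sizes 2 and 3 give 2 * 3 = 6 flattened discrete actions.
    assert flattener.action_space.n == 6
    # Scalar action 4 unflattens to branch values [1, 1].
    assert flattener.lookup_action(4) == [1, 1]

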
class AgentIdIndexMapper:
    def __init__(self) -> None:
        self._agent_id_to_gym_index: Dict[int, int] = {}
        self._done_agents_index_to_last_reward: Dict[int, float] = {}

    def set_initial_agents(self, agent_ids: List[int]) -> None:
        """
        Provide the initial list of agent ids for the mapper
        """
        for idx, agent_id in enumerate(agent_ids):
            self._agent_id_to_gym_index[agent_id] = idx

    def mark_agent_done(self, agent_id: int, reward: float) -> None:
        """
        Declare the agent done with the corresponding final reward.
        """
        if agent_id in self._agent_id_to_gym_index:
            gym_index = self._agent_id_to_gym_index.pop(agent_id)
            self._done_agents_index_to_last_reward[gym_index] = reward
        else:
            # Agent was never registered in the first place (e.g. EndEpisode called multiple times)
            pass

    def register_new_agent_id(self, agent_id: int) -> float:
        """
        Adds the new agent ID and returns the reward to use for the previous agent in this index
        """
        # Any free index is OK here.
        free_index, last_reward = self._done_agents_index_to_last_reward.popitem()
        self._agent_id_to_gym_index[agent_id] = free_index
        return last_reward

    def get_id_permutation(self, agent_ids: List[int]) -> List[int]:
        """
        Get the permutation from new agent ids to the order that preserves the positions of previous agents.
        The result is a list with each integer from 0 to len(_agent_id_to_gym_index)-1
        appearing exactly once.
        """
        # Map the new agent ids to their index
        new_agent_ids_to_index = {
            agent_id: idx for idx, agent_id in enumerate(agent_ids)
        }

        # Make the output list. We don't write to it sequentially, so start with dummy values.
        new_permutation = [-1] * len(self._agent_id_to_gym_index)

        # For each agent ID, find the new index of the agent, and write it in the original index.
        for agent_id, original_index in self._agent_id_to_gym_index.items():
            new_permutation[original_index] = new_agent_ids_to_index[agent_id]
        return new_permutation

    def get_gym_index(self, agent_id: int) -> int:
        """
        Get the gym index for the current agent.
        """
        return self._agent_id_to_gym_index[agent_id]


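# Illustrative sketch, not part of the original module: when an agent finishes and a new
# one appears, the new agent inherits the finished agent's gym index and the final reward
# is handed back to the caller. Agent ids 10, 11 and 12 are assumed example values.
def _example_agent_id_index_mapper() -> None:
    mapper = AgentIdIndexMapper()
    mapper.set_initial_agents([10, 11])
    mapper.mark_agent_done(10, reward=1.0)
    # The new agent 12 takes over agent 10's slot and returns agent 10's final reward.
    assert mapper.register_new_agent_id(12) == 1.0
    assert mapper.get_gym_index(12) == 0

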
class AgentIdIndexMapperSlow:
    """
    Reference implementation of AgentIdIndexMapper.
    The operations are O(N^2) so it shouldn't be used for large numbers of agents.
    See AgentIdIndexMapper for method descriptions.
    """

    def __init__(self) -> None:
        self._gym_id_order: List[int] = []
        self._done_agents_index_to_last_reward: Dict[int, float] = {}

    def set_initial_agents(self, agent_ids: List[int]) -> None:
        self._gym_id_order = list(agent_ids)

    def mark_agent_done(self, agent_id: int, reward: float) -> None:
        try:
            gym_index = self._gym_id_order.index(agent_id)
            self._done_agents_index_to_last_reward[gym_index] = reward
            self._gym_id_order[gym_index] = -1
        except ValueError:
            # Agent was never registered in the first place (e.g. EndEpisode called multiple times)
            pass

    def register_new_agent_id(self, agent_id: int) -> float:
        original_index = self._gym_id_order.index(-1)
        self._gym_id_order[original_index] = agent_id
        reward = self._done_agents_index_to_last_reward.pop(original_index)
        return reward

    def get_id_permutation(self, agent_ids):
        new_id_order = []
        for agent_id in self._gym_id_order:
            new_id_order.append(agent_ids.index(agent_id))
        return new_id_order

    def get_gym_index(self, agent_id: int) -> int:
        return self._gym_id_order.index(agent_id)
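

# Illustrative sketch, not part of the original module: the fast mapper and the slow
# reference mapper should agree for the same sequence of events. The agent ids below
# are assumed example values; the function is defined for documentation and never called.
def _example_mapper_equivalence() -> None:
    for mapper in (AgentIdIndexMapper(), AgentIdIndexMapperSlow()):
        mapper.set_initial_agents([10, 11])
        mapper.mark_agent_done(10, reward=1.0)
        assert mapper.register_new_agent_id(12) == 1.0
        assert mapper.get_gym_index(12) == 0
        assert mapper.get_id_permutation([11, 12]) == [1, 0]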