import logging
import itertools
import numpy as np
from typing import Any, Dict, List, Optional, Tuple, Union, Set

import gym
from gym import error, spaces

from mlagents_envs.environment import UnityEnvironment
from mlagents_envs.base_env import BatchedStepResult
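# BatchedStepResult bundles the per-agent observations, rewards, done flags, ids and
# action masks that Unity returns for a single agent group after each step.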
        :param no_graphics: Whether to run the Unity simulator in no-graphics mode
        :param allow_multiple_visual_obs: If True, return a list of visual observations instead of only one.
        """
        base_port = 5005
        if environment_filename is None:
            base_port = UnityEnvironment.DEFAULT_EDITOR_PORT

        self._env = UnityEnvironment(
            environment_filename,
            worker_id,
            base_port=base_port,
            no_graphics=no_graphics,
        )
        # Take a single step so that the brain information will be sent over
        if not self._env.get_agent_groups():
            self._env.step()

        self.visual_obs = None
        self._n_agents = -1
        self._done_agents: Set[int] = set()
        # Save the step result from the last time all Agents requested decisions.
        self._previous_step_result: BatchedStepResult = None
        self._multiagent = multiagent
        self._flattener = None
        # Hidden flag used by Atari environments to determine if the game is over
        self.game_over = False
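        # Check the number of agents in the scene and cache the first step result.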
        self._env.reset()
        step_result = self._env.get_step_result(self.brain_name)
        self._check_agents(step_result.n_agents())
        self._previous_step_result = step_result

        # Set observation and action spaces
        if self.group_spec.is_action_discrete():

    def reset(self) -> Union[List[np.ndarray], np.ndarray]:
        """Resets the state of the environment and returns an initial observation.
        Returns: observation (object/list): the initial observation of the
        space.
        """
        step_result = self._step(True)
        n_agents = step_result.n_agents()
        self._check_agents(n_agents)
        self.game_over = False

        if not self._multiagent:
            res: GymStepResult = self._single_step(step_result)
        else:
            res = self._multi_step(step_result)
        return res[0]
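    # step() runs one simulation timestep: the incoming action is reshaped to
    # (n_agents, action_size), padded for agents that finished last step, sent to
    # Unity, and the result is returned as a gym-style (observation, reward, done,
    # info) tuple (lists of these in multi-agent mode). A minimal usage sketch,
    # assuming this wrapper class is UnityEnv and a built player at the
    # hypothetical path "./UnityBuild":
    #
    #   env = UnityEnv("./UnityBuild", worker_id=0)
    #   obs = env.reset()
    #   obs, reward, done, info = env.step(env.action_space.sample())
    #   env.close()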
    def step(self, action: List[Any]) -> GymStepResult:

        spec = self.group_spec
        action = np.array(action).reshape((self._n_agents, spec.action_size))
        action = self._sanitize_action(action)
        self._env.set_actions(self.brain_name, action)

        step_result = self._step()

        n_agents = step_result.n_agents()
        self._check_agents(n_agents)

        if not self._multiagent:
            single_res = self._single_step(step_result)
            self.game_over = single_res[2]
            return single_res
        else:
            multi_res = self._multi_step(step_result)
            self.game_over = all(multi_res[2])
            return multi_res
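    # In _single_step, the default observation handed to gym is the preprocessed
    # first visual observation when visual observations are enabled, otherwise the
    # agent's vector observation.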
                self.visual_obs = self._preprocess_single(visual_obs[0][0])

            default_observation = self.visual_obs
        elif self._get_vec_obs_size() > 0:
            default_observation = self._get_vector_obs(info)[0, :]
        else:
            raise UnityGymException(
                "The Agent does not have vector observations and the environment was not setup "
                + "to use visual observations."
            )

        return (
            default_observation,
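    # _check_agents validates that the agent count in the scene matches the
    # single-/multi-agent mode the wrapper was launched with and, on first call,
    # records the expected number of agents.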
                "The environment was launched as a multi-agent environment, however "
                "there is only one agent in the scene."
            )
        if self._n_agents == -1:
            self._n_agents = n_agents
            logger.info("{} agents within environment.".format(n_agents))
        elif self._n_agents != n_agents:
            raise UnityGymException(
                "The number of agents in the environment has changed since "
                "initialization. This is not supported."
            )
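    # _sanitize_info keeps the gym-facing agent count constant: agents that were
    # already done are dropped from the batched result, while agents that newly
    # appeared or finished between steps are reported as done in their place.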
    def _sanitize_info(self, step_result: BatchedStepResult) -> BatchedStepResult:
        n_extra_agents = step_result.n_agents() - self._n_agents
        if n_extra_agents < 0 or n_extra_agents > self._n_agents:
            # In this case, some Agents did not request a decision when expected
            # or too many requested a decision
            raise UnityGymException(
                "The number of agents in the scene does not match the expected number."
            )

        # remove the done Agents
        indices_to_keep: List[int] = []
        for index, is_done in enumerate(step_result.done):
            if not is_done:
                indices_to_keep.append(index)

        # Set the new AgentDone flags to True
        # Note that the corresponding agent_id that gets marked done will be different
        # than the original agent that was done, but this is OK since the gym interface
        # only cares about the ordering.
        for index, agent_id in enumerate(step_result.agent_id):
            if not self._previous_step_result.contains_agent(agent_id):
                step_result.done[index] = True
            if agent_id in self._done_agents:
                step_result.done[index] = True
        self._done_agents = set()
        self._previous_step_result = step_result  # store the new original
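        # Rebuild the batched result for the surviving indices only, slicing each
        # observation array, the per-branch action masks, and the
        # reward/done/max_step/agent_id arrays with the same index list.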
        _mask: Optional[List[np.array]] = None
        if step_result.action_mask is not None:
            _mask = []
            for mask_index in range(len(step_result.action_mask)):
                _mask.append(step_result.action_mask[mask_index][indices_to_keep])
        new_obs: List[np.array] = []
        for obs_index in range(len(step_result.obs)):
            new_obs.append(step_result.obs[obs_index][indices_to_keep])
        return BatchedStepResult(
            obs=new_obs,
            reward=step_result.reward[indices_to_keep],
            done=step_result.done[indices_to_keep],
            max_step=step_result.max_step[indices_to_keep],
            agent_id=step_result.agent_id[indices_to_keep],
            action_mask=_mask,
        )
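    # _sanitize_action expands the gym-sized action array back to the number of
    # agents Unity expects, inserting zero actions for agents that were done in the
    # previous step.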
    def _sanitize_action(self, action: np.array) -> np.array:
        if self._previous_step_result.n_agents() == self._n_agents:
            return action
        sanitized_action = np.zeros(
            (self._previous_step_result.n_agents(), self.group_spec.action_size)
        )
        input_index = 0
        for index in range(self._previous_step_result.n_agents()):
            if not self._previous_step_result.done[index]:
                sanitized_action[index, :] = action[input_index, :]
                input_index = input_index + 1
        return sanitized_action
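    # _step advances (or resets) the simulation and, if some agents terminated
    # between steps, keeps stepping until every expected agent has requested a
    # decision, recording the terminated agent ids for _sanitize_info.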
    def _step(self, needs_reset: bool = False) -> BatchedStepResult:
        if needs_reset:
            self._env.reset()
        else:
            self._env.step()
        info = self._env.get_step_result(self.brain_name)
        # Two possible cases here:
        # 1) all agents requested decisions (some of which might be done)
        # 2) some Agents were marked Done in between steps.
        # In case 2, we re-request decisions until all agents request a real decision.
        while info.n_agents() - sum(info.done) < self._n_agents:
            if not info.done.all():
                raise UnityGymException(
                    "The environment does not have the expected amount of agents. "
                    + "Some agents did not request decisions at the same time."
                )
            self._done_agents.update(list(info.agent_id))
            self._env.step()
            info = self._env.get_step_result(self.brain_name)
        return self._sanitize_info(info)
    @property
    def metadata(self):