
Fixing Gym for single and multi-agent following reset changes on C# (#3417)

* """""Fixing""""""

* Update gym-unity/gym_unity/envs/__init__.py

Co-Authored-By: Chris Elion <chris.elion@unity3d.com>

* Update gym-unity/gym_unity/envs/__init__.py

Co-Authored-By: Chris Elion <chris.elion@unity3d.com>

* addressing comments

* Update gym-unity/gym_unity/envs/__init__.py

Co-Authored-By: Chris Elion <chris.elion@unity3d.com>

* Update gym-unity/gym_unity/envs/__init__.py

Co-Authored-By: Chris Elion <chris.elion@unity3d.com>

* Update gym-unity/gym_unity/envs/__init__.py

Co-Authored-By: Chris Elion <chris.elion@unity3d.com>

* bug fix

* Fixing the test

* gym multiagent comments (#3421)

* rename and comments

* enumerate

Co-authored-by: Chris Elion <celion@gmail.com>
/release-0.14.0
GitHub, 5 years ago
Current commit: c3ff7993
2 files changed, 111 insertions(+), 18 deletions(-)
1. gym-unity/gym_unity/envs/__init__.py (127 changes)
2. gym-unity/gym_unity/tests/test_gym.py (2 changes)

gym-unity/gym_unity/envs/__init__.py (127 changes)


import logging
import itertools
import numpy as np
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Tuple, Union, Set
import gym
from gym import error, spaces

:param no_graphics: Whether to run the Unity simulator in no-graphics mode
:param allow_multiple_visual_obs: If True, return a list of visual observations instead of only one.
"""
base_port = 5005
if environment_filename is None:
base_port = UnityEnvironment.DEFAULT_EDITOR_PORT
 self._env = UnityEnvironment(
-    environment_filename, worker_id, no_graphics=no_graphics
+    environment_filename,
+    worker_id,
+    base_port=base_port,
+    no_graphics=no_graphics,
)
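For context, this is roughly how the wrapper gets constructed after the change. A minimal sketch, assuming a local environment build named "3DBall" (the path is hypothetical; UnityEnv and the keyword arguments are from this version of gym-unity):

from gym_unity.envs import UnityEnv

# Passing None as the filename connects to the Editor on DEFAULT_EDITOR_PORT;
# a file path uses base_port 5005, as set up in the hunk above.
env = UnityEnv("./3DBall", worker_id=0, no_graphics=True, multiagent=True)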
# Take a single step so that the brain information will be sent over

 self.visual_obs = None
 self._current_state = None
-self._n_agents = None
+self._n_agents = -1
+self._done_agents: Set[int] = set()
+# Save the step result from the last time all Agents requested decisions.
+self._previous_step_result: BatchedStepResult = None
self._multiagent = multiagent
self._flattener = None
# Hidden flag used by Atari environments to determine if the game is over

+self._env.reset()
 step_result = self._env.get_step_result(self.brain_name)
 self._check_agents(step_result.n_agents())
+self._previous_step_result = step_result
# Set observation and action spaces
if self.group_spec.is_action_discrete():

Returns: observation (object/list): the initial observation of the
space.
"""
-self._env.reset()
-info = self._env.get_step_result(self.brain_name)
-n_agents = info.n_agents()
+step_result = self._step(True)
+n_agents = step_result.n_agents()
-res: GymStepResult = self._single_step(info)
+res: GymStepResult = self._single_step(step_result)
-res = self._multi_step(info)
+res = self._multi_step(step_result)
return res[0]
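The reset path now funnels through _step(True), so the re-request bookkeeping added further down applies to resets as well; callers still see only the observation, since res[0] drops the reward/done/info entries. A usage sketch (env as constructed above):

# reset() returns just the initial observation: an np.ndarray in
# single-agent mode, a list of per-agent observations in multiagent mode.
initial_obs = env.reset()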
def step(self, action: List[Any]) -> GymStepResult:

 spec = self.group_spec
 action = np.array(action).reshape((self._n_agents, spec.action_size))
+action = self._sanitize_action(action)
-self._env.step()
-info = self._env.get_step_result(self.brain_name)
-n_agents = info.n_agents()
+step_result = self._step()
+n_agents = step_result.n_agents()
-self._current_state = info
-single_res = self._single_step(info)
+single_res = self._single_step(step_result)
-multi_res = self._multi_step(info)
+multi_res = self._multi_step(step_result)
self.game_over = all(multi_res[2])
return multi_res
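In multi-agent mode the step contract implied above is one action per live agent in, batched lists out, with game_over set only once every agent reports done. A sketch, assuming the wrapper's number_agents property and random placeholder actions:

# One action per agent; _sanitize_action (below) re-expands this batch if
# some agents were done on the previous step.
actions = [env.action_space.sample() for _ in range(env.number_agents)]
obs_list, rewards, dones, info = env.step(actions)
assert env.game_over == all(dones)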

    self.visual_obs = self._preprocess_single(visual_obs[0][0])
    default_observation = self.visual_obs
elif self._get_vec_obs_size() > 0:
    default_observation = self._get_vector_obs(info)[0, :]
else:
    raise UnityGymException(
        "The Agent does not have vector observations and the environment was not setup "
        + "to use visual observations."
    )
return (
    default_observation,

"The environment was launched as a mutli-agent environment, however"
"there is only one agent in the scene."
)
-if self._n_agents is None:
+if self._n_agents == -1:
self._n_agents = n_agents
logger.info("{} agents within environment.".format(n_agents))
elif self._n_agents != n_agents:

)
def _sanitize_info(self, step_result: BatchedStepResult) -> BatchedStepResult:
    n_extra_agents = step_result.n_agents() - self._n_agents
    if n_extra_agents < 0 or n_extra_agents > self._n_agents:
        # In this case, some Agents did not request a decision when expected
        # or too many requested a decision
        raise UnityGymException(
            "The number of agents in the scene does not match the expected number."
        )
    # remove the done Agents
    indices_to_keep: List[int] = []
    for index, is_done in enumerate(step_result.done):
        if not is_done:
            indices_to_keep.append(index)
    # Set the new AgentDone flags to True
    # Note that the corresponding agent_id that gets marked done will be different
    # than the original agent that was done, but this is OK since the gym interface
    # only cares about the ordering.
    for index, agent_id in enumerate(step_result.agent_id):
        if not self._previous_step_result.contains_agent(agent_id):
            step_result.done[index] = True
        if agent_id in self._done_agents:
            step_result.done[index] = True
    self._done_agents = set()
    self._previous_step_result = step_result  # store the new original
    _mask: Optional[List[np.array]] = None
    if step_result.action_mask is not None:
        _mask = []
        for mask_index in range(len(step_result.action_mask)):
            _mask.append(step_result.action_mask[mask_index][indices_to_keep])
    new_obs: List[np.array] = []
    for obs_index in range(len(step_result.obs)):
        new_obs.append(step_result.obs[obs_index][indices_to_keep])
    return BatchedStepResult(
        obs=new_obs,
        reward=step_result.reward[indices_to_keep],
        done=step_result.done[indices_to_keep],
        max_step=step_result.max_step[indices_to_keep],
        agent_id=step_result.agent_id[indices_to_keep],
        action_mask=_mask,
    )
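The filtering in _sanitize_info is plain NumPy fancy indexing; a self-contained toy version with invented values, showing how the rows of done agents are dropped:

import numpy as np

# Agents 0 and 2 are still alive; agent 1 is done and its row is removed.
done = np.array([False, True, False])
reward = np.array([1.0, -1.0, 0.5])
indices_to_keep = [index for index, is_done in enumerate(done) if not is_done]
print(reward[indices_to_keep])  # [1.0, 0.5] -- one entry per live agent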
def _sanitize_action(self, action: np.array) -> np.array:
    if self._previous_step_result.n_agents() == self._n_agents:
        return action
    sanitized_action = np.zeros(
        (self._previous_step_result.n_agents(), self.group_spec.action_size)
    )
    input_index = 0
    for index in range(self._previous_step_result.n_agents()):
        if not self._previous_step_result.done[index]:
            sanitized_action[index, :] = action[input_index, :]
            input_index = input_index + 1
    return sanitized_action
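_sanitize_action goes the opposite direction: the caller provides actions only for live agents, and zero rows are inserted at the slots of agents that were done in the previous step, so the array lines up with what the C# side expects. A toy run with invented values:

import numpy as np

# Previous step had 3 agents and agent 1 was done, so only 2 actions come in.
prev_done = np.array([False, True, False])
action = np.array([[0.1, 0.2], [0.3, 0.4]])  # one row per live agent
sanitized = np.zeros((len(prev_done), action.shape[1]))
input_index = 0
for index in range(len(prev_done)):
    if not prev_done[index]:
        sanitized[index, :] = action[input_index, :]
        input_index += 1
# sanitized is [[0.1, 0.2], [0.0, 0.0], [0.3, 0.4]]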
def _step(self, needs_reset: bool = False) -> BatchedStepResult:
    if needs_reset:
        self._env.reset()
    else:
        self._env.step()
    info = self._env.get_step_result(self.brain_name)
    # Two possible cases here:
    # 1) all agents requested decisions (some of which might be done)
    # 2) some Agents were marked Done in between steps.
    # In case 2, we re-request decisions until all agents request a real decision.
    while info.n_agents() - sum(info.done) < self._n_agents:
        if not info.done.all():
            raise UnityGymException(
                "The environment does not have the expected number of agents. "
                + "Some agents did not request decisions at the same time."
            )
        self._done_agents.update(list(info.agent_id))
        self._env.step()
        info = self._env.get_step_result(self.brain_name)
    return self._sanitize_info(info)
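The while condition counts live decision requests: when a batch holds only terminal records (case 2), their agent ids are banked in _done_agents and the simulation is stepped again until the expected number of agents ask for a real decision. A schematic of that loop with invented values:

# 2 agents expected, but the first poll contains only a terminal record.
n_expected = 2
n_agents, done = 1, [True]
while n_agents - sum(done) < n_expected:
    # real code: stash the done agent ids, call self._env.step(), poll again
    n_agents, done = 2, [False, False]  # next poll: both agents request decisions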
@property
def metadata(self):

gym-unity/gym_unity/tests/test_gym.py (2 changes)


:int num_agents: Number of "agents" to imitate in your BatchedStepResult values.
"""
-obs = [np.array([num_agents * [1, 2, 3]])]
+obs = [np.array([num_agents * [1, 2, 3]]).reshape(num_agents, 3)]
if number_visual_observations:
obs += [np.zeros(shape=(num_agents, 8, 8, 3), dtype=np.float32)]
rewards = np.array(num_agents * [1.0])
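The one-line test fix matters because np.array([num_agents * [1, 2, 3]]) builds a single row of length 3 * num_agents, not num_agents rows of 3; the reshape restores the (n_agents, obs_size) layout the rest of the fake step result uses:

import numpy as np

num_agents = 2
old = np.array([num_agents * [1, 2, 3]])
new = np.array([num_agents * [1, 2, 3]]).reshape(num_agents, 3)
print(old.shape)  # (1, 6): one row with both observations concatenated
print(new.shape)  # (2, 3): one 3-dimensional observation per agent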
