
type annotations on gym interface, return dict from step (#3136)

/asymm-envs
GitHub, 4 years ago
Current commit
ebefb735
3 files changed, with 59 additions and 33 deletions
  1. gym-unity/gym_unity/envs/__init__.py (81 changes)
  2. gym-unity/gym_unity/tests/test_gym.py (3 changes)
  3. ml-agents-envs/mlagents_envs/rpc_utils.py (8 changes)

gym-unity/gym_unity/envs/__init__.py (81 changes)


import logging
import itertools
import numpy as np
from typing import Any, Dict, List, Optional, Tuple, Union
from gym import error, spaces
from mlagents_envs.base_env import BatchedStepResult
class UnityGymException(error.Error):

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("gym_unity")
GymSingleStepResult = Tuple[np.ndarray, float, bool, Dict]
GymMultiStepResult = Tuple[List[np.ndarray], List[float], List[bool], Dict]
GymStepResult = Union[GymSingleStepResult, GymMultiStepResult]
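These aliases are used in the annotations below. As a rough illustration (not part of the diff), both variants are 4-tuples that unpack the same way; only the element types differ between single- and multi-agent mode. The describe helper here is purely hypothetical.

# Hypothetical usage sketch: both alias variants are 4-tuples, so callers can
# unpack them identically regardless of single- vs. multi-agent mode.
from typing import Dict, List, Tuple, Union
import numpy as np

GymSingleStepResult = Tuple[np.ndarray, float, bool, Dict]
GymMultiStepResult = Tuple[List[np.ndarray], List[float], List[bool], Dict]
GymStepResult = Union[GymSingleStepResult, GymMultiStepResult]

def describe(result: GymStepResult) -> str:
    obs, reward, done, info = result
    mode = "multi-agent" if isinstance(obs, list) else "single-agent"
    return f"{mode}: reward={reward}, done={done}, info keys={sorted(info)}"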
class UnityEnv(gym.Env):

else:
self._observation_space = spaces.Box(-high, high, dtype=np.float32)
-def reset(self):
+def reset(self) -> Union[List[np.ndarray], np.ndarray]:
"""Resets the state of the environment and returns an initial observation.
In the case of multi-agent environments, this is a list.
Returns: observation (object/list): the initial observation of the

self.game_over = False
if not self._multiagent:
-obs, reward, done, info = self._single_step(info)
+res: GymStepResult = self._single_step(info)
-obs, reward, done, info = self._multi_step(info)
-return obs
+res = self._multi_step(info)
+return res[0]
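A condensed, illustrative sketch of the new reset flow (names simplified, not the actual method body): both helpers return a 4-tuple, and reset() surfaces only the observation at index 0, matching gym's reset contract.

# Sketch only: _single_step/_multi_step each return (obs, reward, done, info);
# reset() hands back just the observation.
def reset_sketch(multiagent, single_step, multi_step, info):
    if not multiagent:
        res = single_step(info)
    else:
        res = multi_step(info)
    return res[0]  # observation only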
-def step(self, action):
+def step(self, action: List[Any]) -> GymStepResult:
"""Run one timestep of the environment's dynamics. When end of
episode is reached, you are responsible for calling `reset()`
to reset this environment's state.

self._current_state = info
if not self._multiagent:
-obs, reward, done, info = self._single_step(info)
-self.game_over = done
+single_res = self._single_step(info)
+self.game_over = single_res[2]
+return single_res
-obs, reward, done, info = self._multi_step(info)
-self.game_over = all(done)
-return obs, reward, done, info
+multi_res = self._multi_step(info)
+self.game_over = all(multi_res[2])
+return multi_res
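Because the helpers now return tuples instead of already-unpacked values, done is read from index 2 of the result. An illustrative sketch with made-up values:

import numpy as np

# Illustrative values; done sits at index 2 of the (obs, reward, done, info) tuple.
single_res = (np.zeros(3), 0.0, False, {})
game_over = single_res[2]            # single-agent: a plain bool

multi_res = ([np.zeros(3), np.zeros(3)], [0.0, 1.0], [True, False], {})
game_over = all(multi_res[2])        # multi-agent: True only when every agent is done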
-def _single_step(self, info):
+def _single_step(self, info: BatchedStepResult) -> GymSingleStepResult:
if self.use_visual:
visual_obs = self._get_vis_obs_list(info)

else:
default_observation = self._get_vector_obs(info)[0, :]
-return (default_observation, info.reward[0], info.done[0], info)
+return (
+    default_observation,
+    info.reward[0],
+    info.done[0],
+    {"batched_step_result": info},
+)
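The fourth element of the gym tuple is now a plain dict wrapping the BatchedStepResult under the "batched_step_result" key, rather than the result object itself. A minimal, self-contained sketch of the access pattern (the stand-in info dict below is fabricated for illustration):

from typing import Any, Dict

def extract_batched_step_result(info: Dict[str, Any]) -> Any:
    # The key name comes from the diff above; returning Any because building a
    # real BatchedStepResult needs a live environment.
    return info["batched_step_result"]

# Stand-in info dict, just to show the lookup.
fake_info = {"batched_step_result": object()}
assert extract_batched_step_result(fake_info) is fake_info["batched_step_result"]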
-def _preprocess_single(self, single_visual_obs):
+def _preprocess_single(self, single_visual_obs: np.ndarray) -> np.ndarray:
-def _multi_step(self, info):
+def _multi_step(self, info: BatchedStepResult) -> GymMultiStepResult:
-return (list(default_observation), list(info.reward), list(info.done), info)
+return (
+    list(default_observation),
+    list(info.reward),
+    list(info.done),
+    {"batched_step_result": info},
+)
def _get_n_vis_obs(self) -> int:
result = 0

return result
-def _get_vis_obs_shape(self):
+def _get_vis_obs_shape(self) -> Optional[Tuple]:
return None
-def _get_vis_obs_list(self, step_result):
-result = []
+def _get_vis_obs_list(self, step_result: BatchedStepResult) -> List[np.ndarray]:
+result: List[np.ndarray] = []
-result += [obs]
+result.append(obs)
-def _get_vector_obs(self, step_result):
-result = []
+def _get_vector_obs(self, step_result: BatchedStepResult) -> np.ndarray:
+result: List[np.ndarray] = []
-result += [obs]
+result.append(obs)
return np.concatenate(result, axis=1)
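As a rough illustration of what this helper does: assuming each vector sensor yields an array of shape (n_agents, obs_size), concatenating along axis=1 produces one combined vector observation per agent. The sensor arrays below are invented for the example.

import numpy as np

# Two hypothetical vector sensors observed by 3 agents: sizes 4 and 2.
sensor_a = np.zeros((3, 4), dtype=np.float32)
sensor_b = np.ones((3, 2), dtype=np.float32)

combined = np.concatenate([sensor_a, sensor_b], axis=1)
print(combined.shape)  # (3, 6): one concatenated vector observation per agent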
def _get_vec_obs_size(self) -> int:

result += shape[0]
return result
-def _preprocess_multi(self, multiple_visual_obs):
+def _preprocess_multi(
+    self, multiple_visual_obs: List[np.ndarray]
+) -> List[np.ndarray]:
if self.uint8_visual:
return [
(255.0 * _visual_obs).astype(np.uint8)

def render(self, mode="rgb_array"):
return self.visual_obs
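For context, the uint8_visual path rescales float observations in [0, 1] to byte images before they are returned or rendered. A small sketch with a synthetic observation (shape and values are illustrative):

import numpy as np

# Synthetic visual observation in [0, 1], shape (height, width, channels).
float_obs = np.random.default_rng(0).random((84, 84, 3)).astype(np.float32)

uint8_obs = (255.0 * float_obs).astype(np.uint8)  # same conversion as the diff
print(uint8_obs.dtype, uint8_obs.min(), uint8_obs.max())  # uint8, values in 0..255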
-def close(self):
+def close(self) -> None:
"""Override _close in your subclass to perform any necessary cleanup.
Environments will automatically close() themselves when
garbage collected or when the program exits.

-def seed(self, seed=None):
+def seed(self, seed: Any = None) -> None:
"""Sets the seed for this env's random number generator(s).
Currently not implemented.
"""

-def _check_agents(self, n_agents):
+def _check_agents(self, n_agents: int) -> None:
if not self._multiagent and n_agents > 1:
raise UnityGymException(
"The environment was launched as a single-agent environment, however"

return {"render.modes": ["rgb_array"]}
@property
-def reward_range(self):
+def reward_range(self) -> Tuple[float, float]:
return -float("inf"), float("inf")
@property

gym-unity/gym_unity/tests/test_gym.py (3 changes)


assert isinstance(obs, np.ndarray)
assert isinstance(rew, float)
assert isinstance(done, (bool, np.bool_))
assert isinstance(info, dict)
@mock.patch("gym_unity.envs.UnityEnvironment")

assert isinstance(obs, list)
assert isinstance(rew, list)
assert isinstance(done, list)
assert isinstance(info, dict)
@mock.patch("gym_unity.envs.UnityEnvironment")

assert isinstance(obs, np.ndarray)
assert isinstance(rew, float)
assert isinstance(done, (bool, np.bool_))
assert isinstance(info, dict)
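A hedged sketch of the kind of assertion the new info payload allows; this is not the actual test code, and obtaining the step outputs still requires the mocked UnityEnvironment setup used elsewhere in this module.

# Illustrative assertion only: the raw step result is tucked into info.
def check_step_info(info: dict) -> None:
    assert isinstance(info, dict)
    assert "batched_step_result" in info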
# Helper methods

ml-agents-envs/mlagents_envs/rpc_utils.py (8 changes)


is_visual = len(obs_shape) == 3
if is_visual:
obs_shape = cast(Tuple[int, int, int], obs_shape)
-obs_list += [
+obs_list.append(
-]
+)
-obs_list += [
+obs_list.append(
-]
+)
rewards = np.array(
[agent_info.reward for agent_info in agent_info_list], dtype=np.float32
)
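For clarity, a tiny self-contained sketch contrasting the two list-building styles touched in this hunk, plus the float32 rewards array; FakeAgentInfo is a stand-in for the protobuf agent info, not a real mlagents_envs type.

from dataclasses import dataclass
from typing import List
import numpy as np

@dataclass
class FakeAgentInfo:          # stand-in for the protobuf AgentInfo used in rpc_utils
    reward: float

agent_info_list: List[FakeAgentInfo] = [FakeAgentInfo(1.0), FakeAgentInfo(-0.5)]

obs_list: List[np.ndarray] = []
obs_list.append(np.zeros(4))  # equivalent to obs_list += [np.zeros(4)], but clearer

rewards = np.array([a.reward for a in agent_info_list], dtype=np.float32)
print(obs_list[0].shape, rewards)  # (4,) [ 1.  -0.5]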
