import itertools
import logging
from typing import Any, Dict, List, Optional, Tuple, Union

import gym
import numpy as np
from gym import error, spaces

from mlagents_envs.base_env import BatchedStepResult


class UnityGymException(error.Error):
    """
    Any error related to the gym wrapper of ml-agents.
    """

    pass


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("gym_unity")

GymSingleStepResult = Tuple[np.ndarray, float, bool, Dict]
GymMultiStepResult = Tuple[List[np.ndarray], List[float], List[bool], Dict]
GymStepResult = Union[GymSingleStepResult, GymMultiStepResult]
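
# Illustrative example (not from the original file): a single-agent step
# result looks like (obs, 1.0, False, info) with a scalar reward and done
# flag, while a multi-agent step result holds parallel per-agent lists, e.g.
# ([obs_0, obs_1], [1.0, 0.0], [False, True], info).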


class UnityEnv(gym.Env):
    """
    Provides Gym wrapper for Unity Learning Environments.
    """

        # (fragment of __init__; the surrounding constructor code did not
        # survive extraction) vector observations fall back to a symmetric
        # float32 Box space
        else:
            self._observation_space = spaces.Box(-high, high, dtype=np.float32)

    def reset(self) -> Union[List[np.ndarray], np.ndarray]:
        """Resets the state of the environment and returns an initial observation.
        Returns: observation (object/list): the initial observation of the
        space. In the case of multi-agent environments, this is a list.
        """
        # (elided) reset the underlying Unity environment and fetch the new
        # BatchedStepResult as `info` before the lines below
        self.game_over = False

        if not self._multiagent:
            res: GymStepResult = self._single_step(info)
        else:
            res = self._multi_step(info)
        # reset() returns only the observation element of the step result
        return res[0]

    def step(self, action: List[Any]) -> GymStepResult:
        """Run one timestep of the environment's dynamics. When end of
        episode is reached, you are responsible for calling `reset()`
        to reset this environment's state.
        """
        # (elided) send `action` to the Unity environment and collect the
        # resulting BatchedStepResult as `info` before the lines below
        self._current_state = info

        if not self._multiagent:
            single_res = self._single_step(info)
            self.game_over = single_res[2]
            return single_res
        else:
            multi_res = self._multi_step(info)
            self.game_over = all(multi_res[2])
            return multi_res
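
    # Usage sketch (illustrative; the constructor arguments shown are
    # assumptions, not taken from this file):
    #
    #     env = UnityEnv("path/to/UnityBuild", worker_id=0, use_visual=False)
    #     obs = env.reset()
    #     while not env.game_over:
    #         obs, reward, done, info = env.step(env.action_space.sample())
    #     env.close()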

    def _single_step(self, info: BatchedStepResult) -> GymSingleStepResult:
        if self.use_visual:
            visual_obs = self._get_vis_obs_list(info)
            # cache the first agent's first visual observation for render()
            self.visual_obs = self._preprocess_single(visual_obs[0][0])
            default_observation = self.visual_obs
        else:
            default_observation = self._get_vector_obs(info)[0, :]

        return (
            default_observation,
            info.reward[0],
            info.done[0],
            {"batched_step_result": info},
        )
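
    # Callers can recover the raw BatchedStepResult from the returned info
    # dict via info["batched_step_result"].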

    def _preprocess_single(self, single_visual_obs: np.ndarray) -> np.ndarray:
        if self.uint8_visual:
            return (255.0 * single_visual_obs).astype(np.uint8)
        else:
            return single_visual_obs

    def _multi_step(self, info: BatchedStepResult) -> GymMultiStepResult:
        # (reconstructed to mirror _single_step)
        if self.use_visual:
            self.visual_obs = self._preprocess_multi(self._get_vis_obs_list(info))
            default_observation = self.visual_obs
        else:
            default_observation = self._get_vector_obs(info)

        return (
            list(default_observation),
            list(info.reward),
            list(info.done),
            {"batched_step_result": info},
        )

    def _get_n_vis_obs(self) -> int:
        result = 0
        # count rank-3 (visual) observation shapes; assumes self.group_spec
        # holds this behavior's AgentGroupSpec
        for shape in self.group_spec.observation_shapes:
            if len(shape) == 3:
                result += 1
        return result

    def _get_vis_obs_shape(self) -> Optional[Tuple]:
        # return the first rank-3 (visual) shape, if any
        for shape in self.group_spec.observation_shapes:
            if len(shape) == 3:
                return shape
        return None

    def _get_vis_obs_list(self, step_result: BatchedStepResult) -> List[np.ndarray]:
        result: List[np.ndarray] = []
        for obs in step_result.obs:
            # batched visual observations have rank 4: (n_agents, H, W, C)
            if len(obs.shape) == 4:
                result.append(obs)
        return result

    def _get_vector_obs(self, step_result: BatchedStepResult) -> np.ndarray:
        result: List[np.ndarray] = []
        for obs in step_result.obs:
            # batched vector observations have rank 2: (n_agents, vector_size)
            if len(obs.shape) == 2:
                result.append(obs)
        return np.concatenate(result, axis=1)
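
    # Shape sketch (illustrative): vector obs of shapes (n_agents, 3) and
    # (n_agents, 2) concatenate along axis=1 into one (n_agents, 5) array.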

    def _get_vec_obs_size(self) -> int:
        result = 0
        # sum the sizes of rank-1 (vector) observation shapes
        for shape in self.group_spec.observation_shapes:
            if len(shape) == 1:
                result += shape[0]
        return result

    def _preprocess_multi(
        self, multiple_visual_obs: List[np.ndarray]
    ) -> List[np.ndarray]:
        if self.uint8_visual:
            return [
                (255.0 * _visual_obs).astype(np.uint8)
                for _visual_obs in multiple_visual_obs
            ]
        else:
            return multiple_visual_obs
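
    # Example (illustrative): a float pixel of 0.5 in [0, 1] becomes
    # np.uint8(255.0 * 0.5) == 127 after the cast above.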

    def render(self, mode="rgb_array"):
        return self.visual_obs

    def close(self) -> None:
        """Override _close in your subclass to perform any necessary cleanup.
        Environments will automatically close() themselves when
        garbage collected or when the program exits.
        """
        # close the wrapped Unity environment (assumed stored as self._env)
        self._env.close()

    def seed(self, seed: Any = None) -> None:
        """Sets the seed for this env's random number generator(s).
        Currently not implemented.
        """

    def _check_agents(self, n_agents: int) -> None:
        if not self._multiagent and n_agents > 1:
            raise UnityGymException(
                "The environment was launched as a single-agent environment, however"
                " there is more than one agent in the scene."
            )

    @property
    def metadata(self):
        return {"render.modes": ["rgb_array"]}

    @property
    def reward_range(self) -> Tuple[float, float]:
        return -float("inf"), float("inf")

    @property