|
|
|
|
|
|
BehaviorName, |
|
|
|
AgentId, |
|
|
|
ActionType, |
|
|
|
BehaviorMapping, |
|
|
|
from typing import List, Tuple, Union, Optional |
|
|
|
from typing import Tuple, Union, Optional |
|
|
|
|
|
|
|
import numpy as np |
|
|
|
|
|
|
|
|
|
|
if isinstance(self._gym_env.action_space, gym.spaces.Box): |
|
|
|
action_type = ActionType.CONTINUOUS |
|
|
|
action_shape = np.prod(self._gym_env.action_space.shape) |
|
|
|
self.act_ratio = np.maximum( |
|
|
|
self._act_ratio = np.maximum( |
|
|
|
self.act_ratio[self.act_ratio > 1e38] = 1 |
|
|
|
self._act_ratio[self._act_ratio > 1e38] = 1 |
|
|
|
elif isinstance(self._gym_env.action_space, gym.spaces.Discrete): |
|
|
|
action_shape = (self._gym_env.action_space.n,) |
|
|
|
action_type = ActionType.DISCRETE |
|
|
|
|
|
|
raise UnityObservationException( |
|
|
|
f"Unknown observation type {self._gym_env.observation_space}" |
|
|
|
) |
|
|
|
self.obs_ratio = np.maximum( |
|
|
|
self._obs_ratio = np.maximum( |
|
|
|
self.obs_ratio[self.obs_ratio > 1e38] = 1 |
|
|
|
self._obs_ratio[self._obs_ratio > 1e38] = 1 |
|
|
|
self._behavior_specs = BehaviorSpec( |
|
|
|
observation_shapes=[self._gym_env.observation_space.shape], |
|
|
|
action_type=action_type, |
|
|
|
|
|
|
TerminalSteps.empty(self._behavior_specs), |
|
|
|
) |
|
|
|
|
|
|
|
@property
def behavior_specs(self) -> BehaviorMapping:
    """Expose this environment's behaviors as a name -> spec mapping.

    A wrapped gym environment hosts exactly one behavior, so the mapping
    always has a single entry. `_behavior_name` and `_behavior_specs` are
    presumably set during construction (not visible in this chunk).
    """
    specs_by_name = {self._behavior_name: self._behavior_specs}
    return BehaviorMapping(specs_by_name)
|
|
|
|
|
|
|
def step(self) -> None: |
|
|
|
if self._first_message: |
|
|
|
self.reset() |
|
|
|
|
|
|
self._current_steps = ( |
|
|
|
DecisionSteps( |
|
|
|
obs=[np.expand_dims(obs / self.obs_ratio, axis=0)], |
|
|
|
obs=[np.expand_dims(obs / self._obs_ratio, axis=0)], |
|
|
|
reward=np.array([rew], dtype=np.float32), |
|
|
|
agent_id=np.array([self._AGENT_ID], dtype=np.int32), |
|
|
|
action_mask=None, |
|
|
|
|
|
|
self._current_steps = ( |
|
|
|
DecisionSteps.empty(self._behavior_specs), |
|
|
|
TerminalSteps( |
|
|
|
obs=[np.expand_dims(obs / self.obs_ratio, axis=0)], |
|
|
|
obs=[np.expand_dims(obs / self._obs_ratio, axis=0)], |
|
|
|
reward=np.array([rew], dtype=np.float32), |
|
|
|
interrupted=np.array( |
|
|
|
[info.get("TimeLimit.truncated", False)], dtype=np.bool |
|
|
|
|
|
|
obs = self._gym_env.reset() |
|
|
|
self._current_steps = ( |
|
|
|
DecisionSteps( |
|
|
|
obs=[np.expand_dims(obs / self.obs_ratio, axis=0)], |
|
|
|
obs=[np.expand_dims(obs / self._obs_ratio, axis=0)], |
|
|
|
reward=np.array([0], dtype=np.float32), |
|
|
|
agent_id=np.array([self._AGENT_ID], dtype=np.int32), |
|
|
|
action_mask=None, |
|
|
|
|
|
|
|
|
|
|
def close(self) -> None:
    """Shut down the underlying gym environment and release its resources."""
    self._gym_env.close()
|
|
|
|
|
|
|
def get_behavior_names(self) -> List[BehaviorName]:
    """Return the names of all behaviors in this environment.

    The wrapper manages a single gym environment, so exactly one
    behavior name is ever reported.
    """
    names = [self._behavior_name]
    return names
|
|
|
|
|
|
|
def set_actions(self, behavior_name: BehaviorName, action: np.ndarray) -> None: |
|
|
|
assert behavior_name == self._behavior_name |
|
|
|
|
|
|
if isinstance(self._gym_env.action_space, gym.spaces.Discrete): |
|
|
|
self._g_action = int(action[0, 0]) |
|
|
|
elif isinstance(self._gym_env.action_space, gym.spaces.Box): |
|
|
|
self._g_action = action[0] / self.act_ratio |
|
|
|
self._g_action = action[0] / self._act_ratio |
|
|
|
else: |
|
|
|
raise UnityActionException( |
|
|
|
f"Unknown action type {self._gym_env.action_space}" |
|
|
|
|
|
|
if isinstance(self._gym_env.action_space, gym.spaces.Discrete): |
|
|
|
self._g_action = int(action[0]) |
|
|
|
elif isinstance(self._gym_env.action_space, gym.spaces.Box): |
|
|
|
self._g_action = action / self.act_ratio |
|
|
|
self._g_action = action / self._act_ratio |
|
|
|
else: |
|
|
|
raise UnityActionException( |
|
|
|
f"Unknown action type {self._gym_env.action_space}" |
|
|
|
|
|
|
) -> Tuple[DecisionSteps, TerminalSteps]: |
|
|
|
assert behavior_name == self._behavior_name |
|
|
|
return self._current_steps |
|
|
|
|
|
|
|
def get_behavior_spec(self, behavior_name: BehaviorName) -> BehaviorSpec:
    """Look up the spec for *behavior_name*.

    Only the single wrapped behavior is known; any other name trips the
    assertion below (matching the original's assert-based contract —
    note asserts are stripped under ``python -O``).
    """
    expected = self._behavior_name
    assert behavior_name == expected
    return self._behavior_specs