|
|
|
|
|
|
import numpy as np |
|
|
|
from mlagents.envs.environment import UnityEnvironment |
|
|
|
from gym import error, spaces |
|
|
|
from mlagents.envs.brain_conversion_utils import ( |
|
|
|
step_result_to_brain_info, |
|
|
|
group_spec_to_brain_parameters, |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
class UnityGymException(error.Error): |
|
|
|
|
|
|
|
|
|
|
self.brain_name = self._env.get_agent_groups()[0] |
|
|
|
self.name = self.brain_name |
|
|
|
brain = group_spec_to_brain_parameters( |
|
|
|
self.brain_name, self._env.get_agent_group_spec(self.brain_name) |
|
|
|
) |
|
|
|
self.group_spec = self._env.get_agent_group_spec(self.brain_name) |
|
|
|
if use_visual and brain.number_visual_observations == 0: |
|
|
|
if use_visual and self._get_n_vis_obs() == 0: |
|
|
|
self.use_visual = brain.number_visual_observations >= 1 and use_visual |
|
|
|
self.use_visual = self._get_n_vis_obs() >= 1 and use_visual |
|
|
|
|
|
|
|
if not use_visual and uint8_visual: |
|
|
|
logger.warning( |
|
|
|
|
|
|
else: |
|
|
|
self.uint8_visual = uint8_visual |
|
|
|
|
|
|
|
if brain.number_visual_observations > 1 and not self._allow_multiple_visual_obs: |
|
|
|
if self._get_n_vis_obs() > 1 and not self._allow_multiple_visual_obs: |
|
|
|
logger.warning( |
|
|
|
"The environment contains more than one visual observation. " |
|
|
|
"You must define allow_multiple_visual_obs=True to received them all. " |
|
|
|
|
|
|
# Check for number of agents in scene. |
|
|
|
self._env.reset() |
|
|
|
initial_info = step_result_to_brain_info( |
|
|
|
self._env.get_step_result(self.brain_name), |
|
|
|
self._env.get_agent_group_spec(self.brain_name), |
|
|
|
) |
|
|
|
self._check_agents(len(initial_info.agents)) |
|
|
|
step_result = self._env.get_step_result(self.brain_name) |
|
|
|
self._check_agents(step_result.n_agents()) |
|
|
|
if brain.vector_action_space_type == "discrete": |
|
|
|
if len(brain.vector_action_space_size) == 1: |
|
|
|
self._action_space = spaces.Discrete(brain.vector_action_space_size[0]) |
|
|
|
if self.group_spec.is_action_discrete(): |
|
|
|
branches = self.group_spec.discrete_action_branches |
|
|
|
if self.group_spec.action_shape == 1: |
|
|
|
self._action_space = spaces.Discrete(branches[0]) |
|
|
|
self._flattener = ActionFlattener(brain.vector_action_space_size) |
|
|
|
self._flattener = ActionFlattener(branches) |
|
|
|
self._action_space = spaces.MultiDiscrete( |
|
|
|
brain.vector_action_space_size |
|
|
|
) |
|
|
|
self._action_space = spaces.MultiDiscrete(branches) |
|
|
|
|
|
|
|
else: |
|
|
|
if flatten_branched: |
|
|
|
|
|
|
) |
|
|
|
high = np.array([1] * brain.vector_action_space_size[0]) |
|
|
|
high = np.array([1] * self.group_spec.action_shape) |
|
|
|
high = np.array([np.inf] * brain.vector_observation_space_size) |
|
|
|
self.action_meanings = brain.vector_action_descriptions |
|
|
|
high = np.array([np.inf] * self._get_vec_obs_size()) |
|
|
|
shape = ( |
|
|
|
brain.camera_resolutions[0].height, |
|
|
|
brain.camera_resolutions[0].width, |
|
|
|
brain.camera_resolutions[0].num_channels, |
|
|
|
) |
|
|
|
shape = self._get_vis_obs_shape() |
|
|
|
if uint8_visual: |
|
|
|
self._observation_space = spaces.Box( |
|
|
|
0, 255, dtype=np.uint8, shape=shape |
|
|
|
|
|
|
space. |
|
|
|
""" |
|
|
|
self._env.reset() |
|
|
|
info = step_result_to_brain_info( |
|
|
|
self._env.get_step_result(self.brain_name), |
|
|
|
self._env.get_agent_group_spec(self.brain_name), |
|
|
|
) |
|
|
|
n_agents = len(info.agents) |
|
|
|
info = self._env.get_step_result(self.brain_name) |
|
|
|
n_agents = info.n_agents() |
|
|
|
self._check_agents(n_agents) |
|
|
|
self.game_over = False |
|
|
|
|
|
|
|
|
|
|
# Translate action into list |
|
|
|
action = self._flattener.lookup_action(action) |
|
|
|
|
|
|
|
spec = self._env.get_agent_group_spec(self.brain_name) |
|
|
|
spec = self.group_spec |
|
|
|
info = step_result_to_brain_info( |
|
|
|
self._env.get_step_result(self.brain_name), spec |
|
|
|
) |
|
|
|
n_agents = len(info.agents) |
|
|
|
info = self._env.get_step_result(self.brain_name) |
|
|
|
n_agents = info.n_agents() |
|
|
|
self._check_agents(n_agents) |
|
|
|
self._current_state = info |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _single_step(self, info):
    # Convert one step of a single-agent environment into the gym
    # (observation, reward, done, info) tuple.
    #
    # NOTE(review): this block looks like merge/diff residue — it contains
    # BOTH the old BrainInfo-based API lines (info.visual_observations,
    # info.rewards, info.local_done) AND the new step-result API lines
    # (self._get_vis_obs_list, info.reward, info.done), often as adjacent
    # duplicate assignments. Only one of each pair should survive; the
    # visual-obs preprocessing that normally sits between them appears to
    # be missing. Reconcile against the upstream gym_unity source before
    # trusting this code path.
    if self.use_visual:
        visual_obs = info.visual_observations  # old API — duplicated by the next line
        visual_obs = self._get_vis_obs_list(info)  # new API — clobbers the previous assignment
        if self._allow_multiple_visual_obs:
            visual_obs_list = []  # NOTE(review): built but never filled/used here — body truncated?
        # presumably self.visual_obs was assigned by preprocessing that is missing
        # from this span — TODO confirm
        default_observation = self.visual_obs
    else:
        default_observation = info.vector_observations[0, :]  # old API — duplicated below
        default_observation = self._get_vector_obs(info)[0, :]  # new API — clobbers the previous line
    # Old-API return: 4th element is a dict wrapping the BrainInfo.
    return (
        default_observation,
        info.rewards[0],
        info.local_done[0],
        {"text_observation": None, "brain_info": info},
    )
    # NOTE(review): unreachable — new-API return left in place by the bad merge.
    return (default_observation, info.reward[0], info.done[0], info)
|
|
|
|
|
|
|
def _preprocess_single(self, single_visual_obs): |
|
|
|
if self.uint8_visual: |
|
|
|
|
|
|
|
|
|
|
def _multi_step(self, info):
    # Convert one step of a multi-agent environment into lists of
    # per-agent (observation, reward, done) values plus the raw info.
    #
    # NOTE(review): merge/diff residue — old BrainInfo-API lines
    # (info.visual_observations, info.rewards, info.local_done) are
    # interleaved with new step-result API lines (self._get_vis_obs_list,
    # info.reward, info.done). The else-branch structure of the upstream
    # function was also lost. Reconcile against upstream before use.
    if self.use_visual:
        self.visual_obs = self._preprocess_multi(info.visual_observations)  # old API — duplicated below
        self.visual_obs = self._preprocess_multi(self._get_vis_obs_list(info))  # new API — clobbers previous
        # NOTE(review): this line belonged to the (missing) else branch upstream;
        # here it overwrites nothing useful and default_observation is never set
        # to self.visual_obs as it should be in the visual path.
        default_observation = info.vector_observations
    # Old-API return: 4th element wraps the BrainInfo in a dict.
    return (
        list(default_observation),
        info.rewards,
        info.local_done,
        {"text_observation": None, "brain_info": info},
    )
    # NOTE(review): unreachable — new-API vector path and return left behind by the merge.
    default_observation = self._get_vector_obs(info)
    return (list(default_observation), list(info.reward), list(info.done), info)
|
|
|
|
|
|
|
def _get_n_vis_obs(self) -> int: |
|
|
|
result = 0 |
|
|
|
for shape in self.group_spec.observation_shapes: |
|
|
|
if len(shape) == 3: |
|
|
|
result += 1 |
|
|
|
return result |
|
|
|
|
|
|
|
def _get_vis_obs_shape(self): |
|
|
|
for shape in self.group_spec.observation_shapes: |
|
|
|
if len(shape) == 3: |
|
|
|
return shape |
|
|
|
|
|
|
|
def _get_vis_obs_list(self, step_result): |
|
|
|
result = [] |
|
|
|
for obs in step_result.obs: |
|
|
|
if len(obs.shape) == 4: |
|
|
|
result += [obs] |
|
|
|
return result |
|
|
|
|
|
|
|
def _get_vector_obs(self, step_result): |
|
|
|
result = [] |
|
|
|
for obs in step_result.obs: |
|
|
|
if len(obs.shape) == 2: |
|
|
|
result += [obs] |
|
|
|
return np.concatenate(result, axis=1) |
|
|
|
|
|
|
|
def _get_vec_obs_size(self) -> int: |
|
|
|
result = 0 |
|
|
|
for shape in self.group_spec.observation_shapes: |
|
|
|
if len(shape) == 1: |
|
|
|
result += shape[0] |
|
|
|
return result |
|
|
|
|
|
|
|
def _preprocess_multi(self, multiple_visual_obs): |
|
|
|
if self.uint8_visual: |
|
|
|
|
|
|
garbage collected or when the program exits. |
|
|
|
""" |
|
|
|
self._env.close() |
|
|
|
|
|
|
|
def get_action_meanings(self):
    """Return the human-readable description of each action, mirroring the
    gym Atari-style ``get_action_meanings`` accessor."""
    meanings = self.action_meanings
    return meanings
|
|
|
|
|
|
|
def seed(self, seed=None): |
|
|
|
"""Sets the seed for this env's random number generator(s). |
|
|
|