import logging
import itertools
from typing import Any, Dict, List, Optional, Tuple, Union, Set

import numpy as np

import gym
from gym import error, spaces

# BatchedStepResult is used in annotations below; the import path is assumed
# from the surrounding ml-agents project.
from mlagents_envs.base_env import BatchedStepResult


        # (fragment: inside the gym wrapper's __init__; the method header and
        # earlier setup are elided)
        self.visual_obs = None
        self._n_agents = -1
        self._done_agents: Set[int] = set()

        self.agent_mapper = AgentIdIndexMapper()

        # Save the step result from the last time all Agents requested decisions.
        self._previous_step_result: Optional[BatchedStepResult] = None
        self._multiagent = multiagent

        step_result = self._env.get_step_result(self.brain_name)
        self._check_agents(step_result.n_agents())
        self._previous_step_result = step_result
        self.agent_mapper.set_initial_agents(list(self._previous_step_result.agent_id))

        # Set observation and action spaces
        if self.group_spec.is_action_discrete():
            ...  # (discrete action-space setup elided in this fragment)
"The number of agents in the scene does not match the expected number." |
|
|
|
) |
|
|
|
|
|
|
|

    def _sanitize_info(self, step_result: BatchedStepResult) -> BatchedStepResult:
        # (the def line was reconstructed; _step below calls self._sanitize_info)
        if step_result.n_agents() - sum(step_result.done) != self._n_agents:
            raise UnityGymException(
                "The number of agents in the scene does not match the expected number."
            )

        # Mark the agents that finished this step as done, recording their final reward.
        for index, agent_id in enumerate(step_result.agent_id):
            if step_result.done[index]:
                self.agent_mapper.mark_agent_done(agent_id, step_result.reward[index])

        # Set the new AgentDone flags to True.
        # Note that the corresponding agent_id that gets marked done will be different
        # from the agent that actually finished; this is fine, since the gym interface
        # only cares about the ordering of the slots.
        for index, agent_id in enumerate(step_result.agent_id):
            if not self._previous_step_result.contains_agent(agent_id):
                if agent_id in self._done_agents:
                    step_result.done[index] = True
                    # Register this agent, and get the reward of the previous agent that
                    # was in its index, so that we can return it to the gym.
                    last_reward = self.agent_mapper.register_new_agent_id(agent_id)
                    step_result.reward[index] = last_reward
        self._done_agents = set()

        # Get a permutation of the agent IDs so that a given ID stays in the same
        # index as where it was first seen.
        new_id_order = self.agent_mapper.get_id_permutation(list(step_result.agent_id))
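
        # For illustration (hypothetical values): if agents [10, 20, 30] started at
        # gym indices [0, 1, 2] and agent 40 has replaced agent 20, an incoming id
        # order of [10, 30, 40] yields new_id_order == [0, 2, 1], so indexing an
        # array with new_id_order places agent 40's entries at gym index 1.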

        # (the list setup, the loops, and the constructor call were reconstructed
        # around the fragment's surviving indexing lines)
        _mask: Optional[List[np.ndarray]] = None
        if step_result.action_mask is not None:
            _mask = []
            for mask_index in range(len(step_result.action_mask)):
                _mask.append(step_result.action_mask[mask_index][new_id_order])
        new_obs: List[np.ndarray] = []
        for obs_index in range(len(step_result.obs)):
            new_obs.append(step_result.obs[obs_index][new_id_order])
        return BatchedStepResult(
            obs=new_obs,
            reward=step_result.reward[new_id_order],
            done=step_result.done[new_id_order],
            max_step=step_result.max_step[new_id_order],
            agent_id=step_result.agent_id[new_id_order],
            action_mask=_mask,
        )

    def _sanitize_action(self, action: np.ndarray) -> np.ndarray:
        # (the def line, the output-array allocation, and the done-agent guard were
        # reconstructed around the fragment)
        if self._previous_step_result.n_agents() == self._n_agents:
            return action
        sanitized_action = np.zeros(
            (self._previous_step_result.n_agents(), action.shape[1])
        )
        for index, agent_id in enumerate(self._previous_step_result.agent_id):
            if not self._previous_step_result.done[index]:
                array_index = self.agent_mapper.get_gym_index(agent_id)
                sanitized_action[index, :] = action[array_index, :]
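
        # For illustration (hypothetical values): if Unity reports agents in the
        # order [10, 30, 40] while their gym indices are {10: 0, 30: 2, 40: 1},
        # Unity row 1 (agent 30) receives gym action row 2, Unity row 2 (agent 40)
        # receives gym row 1, and rows of done agents stay zero.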
        return sanitized_action

    def _step(self, needs_reset: bool = False) -> BatchedStepResult:
        # (the reset/step dispatch and the retry loop were reconstructed around
        # the surviving raise, bookkeeping, and return)
        if needs_reset:
            self._env.reset()
        else:
            self._env.step()
        info = self._env.get_step_result(self.brain_name)
        # Keep stepping while only done agents reported; a step that mixes done and
        # not-done agents means some agents did not request decisions together.
        while info.n_agents() - sum(info.done) < self._n_agents:
            if not info.done.all():
                raise UnityGymException(
                    "The environment does not have the expected amount of agents. "
                    + "Some agents did not request decisions at the same time."
                )
            for agent_id, reward in zip(info.agent_id, info.reward):
                self.agent_mapper.mark_agent_done(agent_id, reward)
            self._done_agents.update(list(info.agent_id))
            self._env.step()
            info = self._env.get_step_result(self.brain_name)
        return self._sanitize_info(info)

        :return: The list containing the branched actions.
        """
        return self.action_lookup[action]
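

# A minimal sketch (assumed helper, not part of the original file) of how a
# discrete-action lookup like `action_lookup` above is typically built: enumerate
# every combination of branch values with itertools.product and key each
# combination by a single scalar action.
def _build_action_lookup(branched_action_space: List[int]) -> Dict[int, List[int]]:
    possible_vals = [range(_num) for _num in branched_action_space]
    all_actions = [list(_action) for _action in itertools.product(*possible_vals)]
    # e.g. branched_action_space == [2, 3] maps scalars 0..5 to
    # [0, 0], [0, 1], [0, 2], [1, 0], [1, 1], [1, 2]
    return {scalar: action for scalar, action in enumerate(all_actions)}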


class AgentIdIndexMapper:
    """
    Maps Unity agent ids to gym indices, keeping each agent at a stable index
    even as agents finish and new agents are spawned in their place.
    """

    def __init__(self) -> None:
        self._agent_id_to_gym_index: Dict[int, int] = {}
        self._done_agents_index_to_last_reward: Dict[int, float] = {}

    def set_initial_agents(self, agent_ids: List[int]) -> None:
        """
        Provide the initial list of agent ids for the mapper.
        """
        for idx, agent_id in enumerate(agent_ids):
            self._agent_id_to_gym_index[agent_id] = idx

    def mark_agent_done(self, agent_id: int, reward: float) -> None:
        """
        Declare the agent done with the corresponding final reward.
        """
        gym_index = self._agent_id_to_gym_index.pop(agent_id)
        self._done_agents_index_to_last_reward[gym_index] = reward

    def register_new_agent_id(self, agent_id: int) -> float:
        """
        Adds the new agent ID and returns the reward to use for the previous agent in this index.
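
        Illustrative example (hypothetical agent ids):

        >>> mapper = AgentIdIndexMapper()
        >>> mapper.set_initial_agents([10, 20])
        >>> mapper.mark_agent_done(20, 3.5)
        >>> mapper.register_new_agent_id(40)  # 40 takes over agent 20's index
        3.5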
        """
        # Any free index is OK here.
        free_index, last_reward = self._done_agents_index_to_last_reward.popitem()
        self._agent_id_to_gym_index[agent_id] = free_index
        return last_reward

    def get_id_permutation(self, agent_ids: List[int]) -> List[int]:
        """
        Get the permutation from new agent ids to the order that preserves the positions of previous agents.
        The result is a list with each integer from 0 to len(agent_ids) - 1 appearing exactly once.
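
        Illustrative example (hypothetical agent ids): agents 10, 20, 30 held
        indices 0, 1, 2, agent 20 finished, and agent 40 took over its index.

        >>> mapper = AgentIdIndexMapper()
        >>> mapper.set_initial_agents([10, 20, 30])
        >>> mapper.mark_agent_done(20, 1.0)
        >>> mapper.register_new_agent_id(40)
        1.0
        >>> mapper.get_id_permutation([10, 30, 40])
        [0, 2, 1]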
        """
        # Map each new agent id to its index in the incoming list.
        new_agent_ids_to_index = {
            agent_id: idx for idx, agent_id in enumerate(agent_ids)
        }

        # Make the output list. We don't write to it sequentially, so start with dummy values.
        new_permutation = [-1] * len(agent_ids)

        # For each agent id, look up its index in the incoming list and write it
        # at the agent's original gym index.
        for agent_id, original_index in self._agent_id_to_gym_index.items():
            new_permutation[original_index] = new_agent_ids_to_index[agent_id]
        return new_permutation

    def get_gym_index(self, agent_id: int) -> int:
        """
        Get the gym index for the given agent id.
        """
        return self._agent_id_to_gym_index[agent_id]


class AgentIdIndexMapperSlow:
    """
    Reference implementation of AgentIdIndexMapper.
    The operations are O(N^2), so it shouldn't be used for large numbers of agents.
    See AgentIdIndexMapper for method descriptions.
    """

    def __init__(self) -> None:
        self._gym_id_order: List[int] = []
        self._done_agents_index_to_last_reward: Dict[int, float] = {}

    def set_initial_agents(self, agent_ids: List[int]) -> None:
        self._gym_id_order = list(agent_ids)

    def mark_agent_done(self, agent_id: int, reward: float) -> None:
        gym_index = self._gym_id_order.index(agent_id)
        self._done_agents_index_to_last_reward[gym_index] = reward
        # Use -1 as a sentinel for an index whose agent is done.
        self._gym_id_order[gym_index] = -1

    def register_new_agent_id(self, agent_id: int) -> float:
        original_index = self._gym_id_order.index(-1)
        self._gym_id_order[original_index] = agent_id
        reward = self._done_agents_index_to_last_reward.pop(original_index)
        return reward

    def get_id_permutation(self, agent_ids: List[int]) -> List[int]:
        new_id_order = []
        for agent_id in self._gym_id_order:
            new_id_order.append(agent_ids.index(agent_id))
        return new_id_order

    def get_gym_index(self, agent_id: int) -> int:
        return self._gym_id_order.index(agent_id)
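

# A minimal sketch (not part of the original file) checking that the fast mapper
# and the slow reference implementation agree on a small, made-up scenario.
def _check_mappers_agree() -> None:
    fast, slow = AgentIdIndexMapper(), AgentIdIndexMapperSlow()
    for mapper in (fast, slow):
        mapper.set_initial_agents([10, 20, 30])
        mapper.mark_agent_done(20, 1.0)
        assert mapper.register_new_agent_id(40) == 1.0
    assert fast.get_id_permutation([10, 30, 40]) == slow.get_id_permutation([10, 30, 40])
    assert fast.get_gym_index(40) == slow.get_gym_index(40) == 1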