|
|
|
|
|
|
from typing import List, NamedTuple |
|
|
|
import itertools |
|
|
|
import attr |
|
|
|
import numpy as np |
|
|
|
|
|
|
|
|
|
|
return result |
|
|
|
|
|
|
|
|
|
|
|
class TeamObsUtil: |
|
|
|
@staticmethod |
|
|
|
def get_name_at(index: int) -> str: |
|
|
|
""" |
|
|
|
returns the name of the observation given the index of the observation |
|
|
|
""" |
|
|
|
return f"team_obs_{index}" |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def _padded_time_to_batch( |
|
|
|
agent_buffer_field: AgentBuffer.AgentBufferField, |
|
|
|
) -> List[np.ndarray]: |
|
|
|
""" |
|
|
|
Convert an AgentBufferField of List of obs, where one of the dimension is time and the other is number (e.g. |
|
|
|
in the case of a variable number of critic observations) to a List of obs, where time is in the batch dimension |
|
|
|
of the obs, and the List is the variable number of agents. For cases where there are varying number of agents, |
|
|
|
pad the non-existent agents with 0. |
|
|
|
""" |
|
|
|
# Find the first observation. This should be USUALLY O(1) |
|
|
|
obs_shape = None |
|
|
|
for _team_obs in agent_buffer_field: |
|
|
|
if _team_obs: |
|
|
|
obs_shape = _team_obs[0].shape |
|
|
|
break |
|
|
|
# If there were no critic obs at all |
|
|
|
if obs_shape is None: |
|
|
|
return [] |
|
|
|
|
|
|
|
new_list = list( |
|
|
|
map( |
|
|
|
lambda x: np.asanyarray(x), |
|
|
|
itertools.zip_longest( |
|
|
|
*agent_buffer_field, fillvalue=np.zeros(obs_shape) |
|
|
|
), |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
return new_list |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def _transpose_list_of_lists( |
|
|
|
list_list: List[List[np.ndarray]], |
|
|
|
) -> List[List[np.ndarray]]: |
|
|
|
return list(map(list, zip(*list_list))) |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def from_buffer(batch: AgentBuffer, num_obs: int) -> List[np.array]: |
|
|
|
""" |
|
|
|
Creates the list of observations from an AgentBuffer |
|
|
|
""" |
|
|
|
separated_obs: List[np.array] = [] |
|
|
|
for i in range(num_obs): |
|
|
|
separated_obs.append( |
|
|
|
TeamObsUtil._padded_time_to_batch(batch[TeamObsUtil.get_name_at(i)]) |
|
|
|
) |
|
|
|
# separated_obs contains a List(num_obs) of Lists(num_agents), we want to flip |
|
|
|
# that and get a List(num_agents) of Lists(num_obs) |
|
|
|
result = TeamObsUtil._transpose_list_of_lists(separated_obs) |
|
|
|
return result |
|
|
|
|
|
|
|
|
|
|
|
class Trajectory(NamedTuple): |
|
|
|
steps: List[AgentExperience] |
|
|
|
next_obs: List[ |
|
|
|
|
|
|
for i in range(num_obs): |
|
|
|
agent_buffer_trajectory[ObsUtil.get_name_at(i)].append(obs[i]) |
|
|
|
agent_buffer_trajectory[ObsUtil.get_name_at_next(i)].append(next_obs[i]) |
|
|
|
agent_buffer_trajectory["critic_obs"].append(exp.collab_obs) |
|
|
|
|
|
|
|
for i in range(num_obs): |
|
|
|
ith_team_obs = [] |
|
|
|
for _team_obs in exp.collab_obs: |
|
|
|
# Assume teammates have same obs space |
|
|
|
ith_team_obs.append(_team_obs[i]) |
|
|
|
agent_buffer_trajectory[TeamObsUtil.get_name_at(i)].append(ith_team_obs) |
|
|
|
|
|
|
|
if exp.memory is not None: |
|
|
|
agent_buffer_trajectory["memory"].append(exp.memory) |
|
|
|