from typing import List, NamedTuple

import attr
import numpy as np

from mlagents.trainers.buffer import AgentBuffer
from mlagents_envs.base_env import ActionTuple
from mlagents.trainers.torch.action_log_probs import LogProbsTuple


@attr.s(auto_attribs=True)
class TeammateStatus:
    """
    Stores data related to an agent's teammate.
    """

    obs: List[np.ndarray]
    reward: float
    action: ActionTuple
    done: bool
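
# A minimal sketch of constructing a TeammateStatus (values and shapes are
# hypothetical; real ones come from the environment's behavior spec):
#
#   status = TeammateStatus(
#       obs=[np.zeros(8, dtype=np.float32)],
#       reward=0.1,
#       action=ActionTuple(
#           continuous=np.zeros((1, 2), dtype=np.float32),
#           discrete=np.zeros((1, 1), dtype=np.int32),
#       ),
#       done=False,
#   )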


@attr.s(auto_attribs=True)
class AgentExperience:
    """
    Stores the data of one agent step, including the status of the agent's
    teammates at that step.
    """

    obs: List[np.ndarray]
    collab_obs: List[List[np.ndarray]]
    teammate_status: List[TeammateStatus]
    reward: float
    done: bool
    action: ActionTuple
    action_probs: LogProbsTuple


class TeamObsUtil:
    @staticmethod
    def get_name_at(index: int) -> str:
        """
        Returns the name of the team observation given the index of the observation.
        """
        return f"team_obs_{index}"

    @staticmethod
    def get_name_at_next(index: int) -> str:
        """
        Returns the name of the next team observation given the index of the observation.
        """
        return f"team_obs_next_{index}"
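
    # Minimal sketch of the transpose helper used by from_buffer and
    # from_buffer_next below: flips a List(num_obs) of Lists(num_agents) into a
    # List(num_agents) of Lists(num_obs), e.g. [[a, b], [c, d]] -> [[a, c], [b, d]].
    @staticmethod
    def _transpose_list_of_lists(
        list_list: List[List[np.ndarray]],
    ) -> List[List[np.ndarray]]:
        return list(map(list, zip(*list_list)))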

    @staticmethod
    def _padded_time_to_batch(
        agent_buffer_field: AgentBuffer.AgentBufferField,
    ) -> List[np.ndarray]:
        """
        Converts an AgentBufferField of Lists of obs, where one dimension is time
        and the other is a (possibly variable) number of teammates, into a List
        over teammates of obs arrays with time in the batch dimension. Missing
        teammates are padded with NaN.
        """
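        # A minimal sketch of the padding behavior described above, assuming
        # each entry of agent_buffer_field is the list of per-teammate obs for
        # one timestep; the original implementation may differ in details.
        if len(agent_buffer_field) == 0:
            return []
        max_num_agents = max(len(entry) for entry in agent_buffer_field)
        # Use the first observation to infer the padding shape and dtype.
        first_obs = agent_buffer_field[0][0]
        padded_steps = [
            list(entry)
            + [np.full_like(first_obs, np.nan)] * (max_num_agents - len(entry))
            for entry in agent_buffer_field
        ]
        # Stack over time so each teammate ends up with one array whose
        # leading (batch) dimension is time.
        return [
            np.stack([step[agent_idx] for step in padded_steps])
            for agent_idx in range(max_num_agents)
        ]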

    @staticmethod
    def from_buffer(batch: AgentBuffer, num_obs: int) -> List[np.array]:
        """
        Creates the list of team observations from an AgentBuffer.
        """
        separated_obs: List[np.array] = []
        for i in range(num_obs):
            separated_obs.append(
                TeamObsUtil._padded_time_to_batch(batch[TeamObsUtil.get_name_at(i)])
            )
        # separated_obs contains a List(num_obs) of Lists(num_agents), we want to flip
        # that and get a List(num_agents) of Lists(num_obs)
        result = TeamObsUtil._transpose_list_of_lists(separated_obs)
        return result

    @staticmethod
    def from_buffer_next(batch: AgentBuffer, num_obs: int) -> List[np.array]:
        """
        Creates the list of next team observations from an AgentBuffer.
        """
        separated_obs: List[np.array] = []
        for i in range(num_obs):
            separated_obs.append(
                TeamObsUtil._padded_time_to_batch(
                    batch[TeamObsUtil.get_name_at_next(i)]
                )
            )
        # separated_obs contains a List(num_obs) of Lists(num_agents), we want to flip
        # that and get a List(num_agents) of Lists(num_obs)
        result = TeamObsUtil._transpose_list_of_lists(separated_obs)
        return result
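
    # Layout sketch (sizes are hypothetical): with num_obs = 2 and 3 teammates
    # recorded in the buffer, from_buffer_next returns a List of 3 per-teammate
    # entries, each a List of 2 obs arrays whose leading (batch) dimension is
    # time:
    #
    #   next_team_obs = TeamObsUtil.from_buffer_next(batch, num_obs=2)
    #   next_team_obs[agent_idx][obs_idx].shape  # (trajectory_len, *obs_shape)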


class Trajectory(NamedTuple):
    steps: List[AgentExperience]
    next_obs: List[np.ndarray]  # Obs following the last step, for bootstrapping
    next_collab_obs: List[List[np.ndarray]]  # Teammate obs following the last step

    def to_agentbuffer(self) -> AgentBuffer:
        """
        Converts a Trajectory to an AgentBuffer.
        """
        agent_buffer_trajectory = AgentBuffer()
        obs = self.steps[0].obs
        for step, exp in enumerate(self.steps):
            is_last_step = step == len(self.steps) - 1
            if not is_last_step:
                next_obs = self.steps[step + 1].obs
            else:
                next_obs = self.next_obs

            num_obs = len(obs)
            for i in range(num_obs):
                agent_buffer_trajectory[ObsUtil.get_name_at(i)].append(obs[i])
                agent_buffer_trajectory[ObsUtil.get_name_at_next(i)].append(next_obs[i])

            # Take care of teammate obs and actions
            teammate_continuous_actions, teammate_discrete_actions, teammate_rewards = (
                [],
                [],
                [],
            )
            for teammate_status in exp.teammate_status:
                teammate_rewards.append(teammate_status.reward)
                teammate_continuous_actions.append(teammate_status.action.continuous)
                teammate_discrete_actions.append(teammate_status.action.discrete)

            # Team actions
            agent_buffer_trajectory["team_continuous_action"].append(
                teammate_continuous_actions
            )
            agent_buffer_trajectory["team_discrete_action"].append(
                teammate_discrete_actions
            )
            agent_buffer_trajectory["team_rewards"].append(teammate_rewards)
            # Average the agent's own reward with its teammates' rewards
            team_reward = teammate_rewards + [exp.reward]
            agent_buffer_trajectory["average_team_reward"].append(
                sum(team_reward) / len(team_reward)
            )
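
            # Worked example (hypothetical values): teammate_rewards = [1.0, 0.0]
            # and exp.reward = 0.5 give
            # average_team_reward = (1.0 + 0.0 + 0.5) / 3 = 0.5.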

            # Next actions of teammates; on the last step, fall back to the
            # teammates' current actions, since no next step exists.
            teammate_cont_next_actions = []
            teammate_disc_next_actions = []
            if not is_last_step:
                next_exp = self.steps[step + 1]
                for teammate_status in next_exp.teammate_status:
                    teammate_cont_next_actions.append(teammate_status.action.continuous)
                    teammate_disc_next_actions.append(teammate_status.action.discrete)
            else:
                for teammate_status in exp.teammate_status:
                    teammate_cont_next_actions.append(teammate_status.action.continuous)
                    teammate_disc_next_actions.append(teammate_status.action.discrete)

            agent_buffer_trajectory["team_next_continuous_action"].append(
                teammate_cont_next_actions
            )
            agent_buffer_trajectory["team_next_discrete_action"].append(
                teammate_disc_next_actions
            )

            # Gather teammate observations per observation index.
            for i in range(num_obs):
                ith_team_obs = []
                for _teammate_status in exp.teammate_status:
                    # Assume teammates have same obs space
                    ith_team_obs.append(_teammate_status.obs[i])
                agent_buffer_trajectory[TeamObsUtil.get_name_at(i)].append(ith_team_obs)

                ith_team_obs_next = []
                if is_last_step:
                    for _obs in self.next_collab_obs:
                        ith_team_obs_next.append(_obs[i])
                else:
                    next_teammate_status = self.steps[step + 1].teammate_status
                    for _teammate_status in next_teammate_status:
                        # Assume teammates have same obs space
                        ith_team_obs_next.append(_teammate_status.obs[i])
                agent_buffer_trajectory[TeamObsUtil.get_name_at_next(i)].append(
                    ith_team_obs_next
                )

            agent_buffer_trajectory["team_dones"].append(
                [_status.done for _status in exp.teammate_status]
            )

            # The agent's own next actions: zeros on the last step, where no
            # next action exists.
            cont_next_actions = np.zeros_like(exp.action.continuous)
            disc_next_actions = np.zeros_like(exp.action.discrete)
            if not is_last_step:
                next_action = self.steps[step + 1].action
                cont_next_actions = next_action.continuous
                disc_next_actions = next_action.discrete

            agent_buffer_trajectory["next_continuous_action"].append(cont_next_actions)
            agent_buffer_trajectory["next_discrete_action"].append(disc_next_actions)

            # Log probabilities of the actions that were actually taken
            agent_buffer_trajectory["continuous_log_probs"].append(
                exp.action_probs.continuous
            )
            agent_buffer_trajectory["discrete_log_probs"].append(
                exp.action_probs.discrete
            )

            obs = next_obs

        return agent_buffer_trajectory

    @property
    def done_reached(self) -> bool:
        """
        Returns true if trajectory is terminated with a Done.
        """
        return self.steps[-1].done

    @property
    def teammate_dones_reached(self) -> bool:
        """
        Returns true if all teammates are done at the end of the trajectory.
        Combine with done_reached to check if the whole team is done.
        """
        return all(_status.done for _status in self.steps[-1].teammate_status)

    @property
    def interrupted(self) -> bool: