
Use new trajectory

/develop/centralizedcritic/nonego
Ervin Teng, 4 years ago
Current commit: 92fc78a5
3 files changed: 249 insertions(+), 22 deletions(-)
  1. ml-agents/mlagents/trainers/agent_processor.py (46 changes)
  2. ml-agents/mlagents/trainers/torch/agent_action.py (100 changes)
  3. ml-agents/mlagents/trainers/trajectory.py (125 changes)

ml-agents/mlagents/trainers/agent_processor.py (46 changes)


StatsAggregationMethod,
EnvironmentStats,
)
from mlagents.trainers.trajectory import Trajectory, AgentExperience
from mlagents.trainers.trajectory import TeammateStatus, Trajectory, AgentExperience
from mlagents.trainers.policy import Policy
from mlagents.trainers.action_info import ActionInfo, ActionInfoOutputs
from mlagents.trainers.torch.action_log_probs import LogProbsTuple

self.experience_buffers: Dict[str, List[AgentExperience]] = defaultdict(list)
self.last_step_result: Dict[str, Tuple[DecisionStep, int]] = {}
# current_group_obs is used to collect the last seen obs of all the agents in the same group,
# and assemble the next_collab_obs.
# and assemble the collab_obs.
self.last_group_obs: Dict[str, Dict[str, List[np.ndarray]]] = defaultdict(
lambda: defaultdict(list)
self.teammate_status: Dict[str, Dict[str, TeammateStatus]] = defaultdict(
lambda: defaultdict(None)
)
# last_take_action_outputs stores the action a_t taken before the current observation s_(t+1), while
# grabbing previous_action from the policy grabs the action PRIOR to that, a_(t-1).
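A tiny self-contained sketch of the one-step offset described in the comment above (hypothetical values, not part of this change):

# Hypothetical illustration: once observation s_(t+1) arrives, the cached
# take-action outputs still hold a_t, while the policy's stored previous action is a_(t-1).
actions_taken = ["a_0", "a_1", "a_2"]               # a_t chosen after each observation s_t
t = 2                                               # we have just received s_(t+1)
last_take_action_output = actions_taken[t]          # "a_2": the action that led to s_(t+1)
previous_action_from_policy = actions_taken[t - 1]  # "a_1": the action prior to that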

self, step: Union[TerminalStep, DecisionStep], global_id: str
) -> None:
stored_decision_step, idx = self.last_step_result.get(global_id, (None, None))
if stored_decision_step is not None:
stored_take_action_outputs = self.last_take_action_outputs.get(global_id, None)
if stored_decision_step is not None and stored_take_action_outputs is not None:
self.last_group_obs[step.team_manager_id][
global_id
] = stored_decision_step.obs
stored_actions = stored_take_action_outputs["action"]
action_tuple = ActionTuple(
continuous=stored_actions.continuous[idx],
discrete=stored_actions.discrete[idx],
)
teammate_status = TeammateStatus(
obs=stored_decision_step.obs,
reward=step.reward,
action=action_tuple,
done=isinstance(step, TerminalStep),
)
self.teammate_status[step.team_manager_id][global_id] = teammate_status
for _manager_id, _team_group in self.current_group_obs.items():
self._safe_delete(_team_group, global_id)
if not _team_group: # if dict is empty
self._safe_delete(_team_group, _manager_id)
for _manager_id, _team_group in self.last_group_obs.items():
self._safe_delete(_team_group, global_id)
self._delete_in_nested_dict(self.current_group_obs, global_id)
self._delete_in_nested_dict(self.teammate_status, global_id)
def _delete_in_nested_dict(self, nested_dict, key):
for _manager_id, _team_group in nested_dict.items():
self._safe_delete(_team_group, key)
if not _team_group: # if dict is empty
self._safe_delete(_team_group, _manager_id)

prev_action = self.policy.retrieve_previous_action([global_id])[0, :]
# Assemble teammate_obs. If none saved, then it will be an empty list.
collab_obs = []
for _id, _obs in self.last_group_obs[step.team_manager_id].items():
teammate_statuses = []
for _id, _obs in self.teammate_status[step.team_manager_id].items():
collab_obs.append(_obs)
teammate_statuses.append(_obs)
collab_obs=collab_obs,
teammate_status=teammate_statuses,
reward=step.reward,
done=done,
action=action_tuple,

ml-agents/mlagents/trainers/torch/agent_action.py (100 changes)


from typing import List, Optional, NamedTuple, Dict
import itertools
from mlagents.trainers.buffer import AgentBuffer
class AgentAction(NamedTuple):

return action_tuple
@staticmethod
def _padded_time_to_batch(
agent_buffer_field: AgentBuffer.AgentBufferField,
dtype: torch.dtype = torch.float32,
) -> List[torch.Tensor]:
"""
Pad actions and convert to tensor. Note that data is padded by 0's, not NaNs
as the observations are.
"""
action_shape = None
for _action in agent_buffer_field:
if _action:
action_shape = _action[0].shape
break
# If there were no actions at all in this field
if action_shape is None:
return []
new_list = list(
map(
lambda x: ModelUtils.list_to_tensor(x, dtype=dtype),
itertools.zip_longest(
*agent_buffer_field, fillvalue=np.full(action_shape, 0)
),
)
)
return new_list
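A minimal, self-contained sketch (made-up data) of the padding and transposition this helper performs: each buffer entry is the list of per-teammate actions for one step, and steps with fewer teammates are filled with zeros so every teammate ends up with a tensor over the full time range.

import itertools
import numpy as np

steps = [
    [np.array([0.1, 0.2]), np.array([0.3, 0.4])],  # step 0: two teammates
    [np.array([0.5, 0.6])],                        # step 1: only one teammate present
]
action_shape = steps[0][0].shape
per_teammate = list(
    itertools.zip_longest(*steps, fillvalue=np.full(action_shape, 0))
)
# per_teammate[0] -> teammate 0's actions over both steps
# per_teammate[1] -> (array([0.3, 0.4]), array([0., 0.])), zero-padded at step 1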
@staticmethod
def from_dict(buff: Dict[str, np.ndarray]) -> "AgentAction":
"""
A static method that accesses continuous and discrete action fields in an AgentBuffer

discrete_tensor[..., i] for i in range(discrete_tensor.shape[-1])
]
return AgentAction(continuous, discrete)
@staticmethod
def from_dict_next(buff: Dict[str, np.ndarray]) -> "AgentAction":
"""
A static method that accesses continuous and discrete action fields in an AgentBuffer
and constructs the corresponding AgentAction from the retrieved np arrays.
"""
continuous: torch.Tensor = None
discrete: List[torch.Tensor] = None # type: ignore
if "next_continuous_action" in buff:
continuous = ModelUtils.list_to_tensor(buff["continuous_action"])
if "next_discrete_action" in buff:
discrete_tensor = ModelUtils.list_to_tensor(
buff["discrete_action"], dtype=torch.long
)
discrete = [
discrete_tensor[..., i] for i in range(discrete_tensor.shape[-1])
]
return AgentAction(continuous, discrete)
@staticmethod
def _from_team_dict(
buff: Dict[str, np.ndarray], cont_action_key: str, disc_action_key: str
) -> List["AgentAction"]:
continuous_tensors: List[torch.Tensor] = []
discrete_tensors: List[torch.Tensor] = [] # type: ignore
if cont_action_key in buff:
continuous_tensors = AgentAction._padded_time_to_batch(
buff[cont_action_key]
)
if disc_action_key in buff:
discrete_tensors = AgentAction._padded_time_to_batch(
buff[disc_action_key], dtype=torch.long
)
actions_list = []
for _cont, _disc in itertools.zip_longest(
continuous_tensors, discrete_tensors, fillvalue=None
):
if _disc is not None:
_disc = [_disc[..., i] for i in range(_disc.shape[-1])]
actions_list.append(AgentAction(_cont, _disc))
return actions_list
@staticmethod
def from_team_dict(buff: Dict[str, np.ndarray]) -> List["AgentAction"]:
"""
A static method that accesses continuous and discrete action fields in an AgentBuffer
and constructs the corresponding AgentAction from the retrieved np arrays.
"""
return AgentAction._from_team_dict(
buff, "team_continuous_action", "team_discrete_action"
)
@staticmethod
def from_team_dict_next(buff: Dict[str, np.ndarray]) -> List["AgentAction"]:
"""
A static method that accesses next continuous and discrete action fields in an AgentBuffer
and constructs the corresponding AgentAction from the retrieved np arrays.
"""
return AgentAction._from_team_dict(
buff, "team_next_continuous_action", "team_next_discrete_action"
)
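A hypothetical usage sketch of the team-dict helpers above, with made-up buffer contents (the keys and class come from this file; the data is illustrative only):

import numpy as np
from mlagents.trainers.torch.agent_action import AgentAction

# Two steps, two teammates, 2-dimensional continuous actions (hypothetical data).
fake_buffer = {
    "team_continuous_action": [
        [np.array([0.1, 0.2]), np.array([0.3, 0.4])],
        [np.array([0.5, 0.6]), np.array([0.7, 0.8])],
    ]
}
team_actions = AgentAction.from_team_dict(fake_buffer)
# team_actions is a List[AgentAction] with one entry per teammate; each entry
# stacks that teammate's continuous actions over time, shape (2, 2) here.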
def to_flat(self, discrete_branches: List[int]) -> torch.Tensor:
discrete_oh = ModelUtils.actions_to_onehot(
self.discrete_tensor, discrete_branches
)
discrete_oh = torch.cat(discrete_oh, dim=1)
return torch.cat([self.continuous_tensor, discrete_oh], dim=-1)
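A small hedged example of to_flat (made-up values): the continuous part is passed through unchanged and each discrete branch is one-hot encoded before concatenation.

import torch
from mlagents.trainers.torch.agent_action import AgentAction

# Batch of 1: one continuous dimension and a single discrete branch of size 3.
action = AgentAction(torch.tensor([[0.5]]), [torch.tensor([2])])
flat = action.to_flat(discrete_branches=[3])
# flat == tensor([[0.5, 0., 0., 1.]]): the continuous values, then the one-hot
# encoding of discrete action index 2 within its branch of size 3.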

ml-agents/mlagents/trainers/trajectory.py (125 changes)


@attr.s(auto_attribs=True)
class TeammateStatus:
"""
Stores data related to an agent's teammate.
"""
obs: List[np.ndarray]
reward: float
action: ActionTuple
done: bool
@attr.s(auto_attribs=True)
collab_obs: List[List[np.ndarray]]
teammate_status: List[TeammateStatus]
reward: float
done: bool
action: ActionTuple

return f"team_obs_{index}"
@staticmethod
def get_name_at_next(index: int) -> str:
"""
returns the name of the next team observation given the index of the observation
"""
return f"team_obs_next_{index}"
@staticmethod
def _padded_time_to_batch(
agent_buffer_field: AgentBuffer.AgentBufferField,
) -> List[np.ndarray]:

result = TeamObsUtil._transpose_list_of_lists(separated_obs)
return result
@staticmethod
def from_buffer_next(batch: AgentBuffer, num_obs: int) -> List[np.array]:
"""
Creates the list of observations from an AgentBuffer
"""
separated_obs: List[np.array] = []
for i in range(num_obs):
separated_obs.append(
TeamObsUtil._padded_time_to_batch(
batch[TeamObsUtil.get_name_at_next(i)]
)
)
# separated_obs contains a List(num_obs) of Lists(num_agents), we want to flip
# that and get a List(num_agents) of Lists(num_obs)
result = TeamObsUtil._transpose_list_of_lists(separated_obs)
return result
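A minimal sketch (made-up values) of the flip described in the comment above: the obs-major nesting is transposed into an agent-major one, which the hypothetical zip below reproduces for num_obs=2 and num_agents=3.

obs_major = [["o0_a0", "o0_a1", "o0_a2"],   # obs 0 for agents 0..2
             ["o1_a0", "o1_a1", "o1_a2"]]   # obs 1 for agents 0..2
agent_major = [list(per_agent) for per_agent in zip(*obs_major)]
# agent_major == [["o0_a0", "o1_a0"], ["o0_a1", "o1_a1"], ["o0_a2", "o1_a2"]]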
class Trajectory(NamedTuple):
steps: List[AgentExperience]

agent_buffer_trajectory = AgentBuffer()
obs = self.steps[0].obs
for step, exp in enumerate(self.steps):
if step < len(self.steps) - 1:
is_last_step = step == len(self.steps) - 1
if not is_last_step:
next_obs = self.steps[step + 1].obs
else:
next_obs = self.next_obs

agent_buffer_trajectory[ObsUtil.get_name_at(i)].append(obs[i])
agent_buffer_trajectory[ObsUtil.get_name_at_next(i)].append(next_obs[i])
# Take care of teammate obs and actions
teammate_continuous_actions, teammate_discrete_actions, teammate_rewards = (
[],
[],
[],
)
for teammate_status in exp.teammate_status:
teammate_rewards.append(teammate_status.reward)
teammate_continuous_actions.append(teammate_status.action.continuous)
teammate_discrete_actions.append(teammate_status.action.discrete)
# Team actions
agent_buffer_trajectory["team_continuous_action"].append(
teammate_continuous_actions
)
agent_buffer_trajectory["team_discrete_action"].append(
teammate_discrete_actions
)
agent_buffer_trajectory["team_rewards"].append(teammate_rewards)
team_reward = teammate_rewards + [exp.reward]
agent_buffer_trajectory["average_team_reward"].append(
sum(team_reward) / len(team_reward)
)
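A worked toy example of the value appended above (made-up numbers):

# With teammate rewards [1.0, 0.0] and the agent's own reward 2.0:
teammate_rewards = [1.0, 0.0]
team_reward = teammate_rewards + [2.0]
average_team_reward = sum(team_reward) / len(team_reward)  # (1.0 + 0.0 + 2.0) / 3 == 1.0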
# Next actions
teammate_cont_next_actions = []
teammate_disc_next_actions = []
if not is_last_step:
next_exp = self.steps[step + 1]
for teammate_status in next_exp.teammate_status:
teammate_cont_next_actions.append(teammate_status.action.continuous)
teammate_disc_next_actions.append(teammate_status.action.discrete)
else:
for teammate_status in exp.teammate_status:
teammate_cont_next_actions.append(teammate_status.action.continuous)
teammate_disc_next_actions.append(teammate_status.action.discrete)
agent_buffer_trajectory["team_next_continuous_action"].append(
teammate_cont_next_actions
)
agent_buffer_trajectory["team_next_discrete_action"].append(
teammate_disc_next_actions
)
for _team_obs in exp.collab_obs:
for _teammate_status in exp.teammate_status:
ith_team_obs.append(_team_obs[i])
ith_team_obs.append(_teammate_status.obs[i])
ith_team_obs_next = []
if is_last_step:
for _obs in self.next_collab_obs:
ith_team_obs_next.append(_obs[i])
else:
next_teammate_status = self.steps[step + 1].teammate_status
for _teammate_status in next_teammate_status:
# Assume teammates have same obs space
ith_team_obs_next.append(_teammate_status.obs[i])
agent_buffer_trajectory[TeamObsUtil.get_name_at_next(i)].append(
ith_team_obs_next
)
agent_buffer_trajectory["team_dones"].append(
[_status.done for _status in exp.teammate_status]
)
cont_next_actions = np.zeros_like(exp.action.continuous)
disc_next_actions = np.zeros_like(exp.action.discrete)
if not is_last_step:
next_action = self.steps[step + 1].action
cont_next_actions = next_action.continuous
disc_next_actions = next_action.discrete
agent_buffer_trajectory["next_continuous_action"].append(cont_next_actions)
agent_buffer_trajectory["next_discrete_action"].append(disc_next_actions)
agent_buffer_trajectory["continuous_log_probs"].append(
exp.action_probs.continuous
)

Returns true if trajectory is terminated with a Done.
"""
return self.steps[-1].done
@property
def teammate_dones_reached(self) -> bool:
"""
Returns true if all teammates are done at the end of the trajectory.
Combine with done_reached to check if the whole team is done.
"""
return all(_status.done for _status in self.steps[-1].teammate_status)
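As the docstring suggests, the two properties can be combined; a hypothetical usage sketch (trajectory is any Trajectory instance):

# Hypothetical check that this agent and all of its teammates finished the episode.
whole_team_done = trajectory.done_reached and trajectory.teammate_dones_reached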
@property
def interrupted(self) -> bool:
