from typing import List, Dict, NamedTuple, Iterable

import numpy as np

from mlagents.envs.exception import UnityException
from mlagents.envs.policy import Policy
from mlagents.envs.trainer import Trainer

class AgentExperience(NamedTuple):
    """
    A single step of experience for one agent: its observations, the action
    taken and its associated data (probabilities, previous action, sampling
    epsilon, recurrent memory), and the reward and done signal returned by the
    environment.
    """

    obs: List[np.ndarray]
    reward: float
    done: bool
    action: np.ndarray
    action_probs: np.ndarray
    prev_action: np.ndarray
    epsilon: float
    memory: np.ndarray
    agent_id: str

|
|
|
class Trajectory(NamedTuple):
    steps: Iterable[AgentExperience]
    # The step immediately after the trajectory. Used to bootstrap the value
    # estimate for GAE when time_horizon is reached before the episode ends.
    next_step: AgentExperience
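
# Illustrative sketch (not from the original source): how a consumer of
# Trajectory might use `next_step` to bootstrap discounted returns when the
# trajectory was cut off by time_horizon rather than by a terminal state.
# `estimate_value` is a hypothetical stand-in for the policy's value head and
# `gamma` is an assumed discount factor; neither is defined in this file.
def bootstrapped_discounted_returns(
    trajectory: Trajectory, gamma: float, estimate_value
) -> List[float]:
    steps = list(trajectory.steps)
    if not steps:
        return []
    if steps[-1].done:
        running_return = 0.0  # terminal state: nothing to bootstrap
    else:
        # Non-terminal cut-off: bootstrap from the value of the following step.
        running_return = estimate_value(trajectory.next_step.obs)
    returns: List[float] = []
    for exp in reversed(steps):
        running_return = exp.reward + gamma * running_return
        returns.insert(0, running_return)
    return returns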
|
|
|
class AgentProcessorException(UnityException):
    """
    Related to errors with the AgentProcessor.
    """

    pass


class AgentProcessor:
    """
    AgentProcessor contains a dictionary of AgentBuffer, indexed by agent_id.
    Buffer also contains an update_buffer that corresponds to the buffer used when updating the model.
    """

    # (A simplified sketch of this local-buffer/update-buffer flow appears below.)

    def __init__(self, trainer: Trainer):
        self.policy: Policy = trainer.policy
        self.time_horizon: int = trainer.parameters["time_horizon"]
        self.trainer = trainer
|
|
|
|
|
|
|
    def __str__(self):
        return "local_buffers :\n{0}".format(
            # ...
        )

    # ...
                self.processing_buffer[agent_id][
                    "{}_value_estimates".format(name)
                ].append(value[idx][0])
|
|
|
|
|
|
|
            agent_actions = self.processing_buffer[agent_id]["actions"]
            if (
                next_info.local_done[next_idx]
                or len(agent_actions) > self.time_horizon
            ) and len(agent_actions) > 0:
                # Episode ended or time_horizon exceeded: convert this agent's
                # local buffer into a Trajectory and hand it to the trainer.
                trajectory = self.processing_buffer.agent_to_trajectory(
                    agent_id, training_length=self.policy.sequence_length
                )
                self.trainer.process_trajectory(trajectory)
            elif not next_info.local_done[next_idx]:
                if agent_id not in self.episode_steps:
                    self.episode_steps[agent_id] = 0
                self.episode_steps[agent_id] += 1
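
# Illustrative sketch (not from the original source): the local-buffer /
# update-buffer relationship described in the AgentProcessor docstring, with a
# plain dict and list standing in for the actual buffer classes. Each agent
# accumulates steps in its own local buffer; when its episode ends, that batch
# is appended to the shared update buffer the trainer samples from when
# updating the model.
_local_buffers: Dict[str, List[AgentExperience]] = {}
_update_buffer: List[AgentExperience] = []


def _flush_local_buffer(agent_id: str) -> None:
    # Move the agent's locally collected steps into the shared update buffer
    # and reset the local buffer for the agent's next episode.
    _update_buffer.extend(_local_buffers.get(agent_id, []))
    _local_buffers[agent_id] = []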
|
|
|
|
|
|
    # ...
                    batch_size=batch_size, training_length=training_length
                )
            )
|
|
|
|
|
|
|
    def agent_to_trajectory(
        self,
        agent_id: str,
        key_list: List[str] = None,
        batch_size: int = None,
        training_length: int = None,
    ) -> Trajectory:
        """
        Creates a Trajectory containing the AgentExperiences belonging to agent agent_id.
        :param agent_id: The id of the agent whose data will be converted.
        :param key_list: The fields to include. If None, all fields are included.
        :param batch_size: The number of elements to include. If None, all of them are included.
        :param training_length: The length of the samples to include. If None, only one element is taken.
        (An illustrative chunking sketch follows this method.)
        """
        if key_list is None:
            key_list = self[agent_id].keys()
        if not self[agent_id].check_length(key_list):
            raise BufferException(
                "The length of the fields {0} for agent {1} were not of the same length".format(
                    key_list, agent_id
                )
            )
        trajectory_list: List[AgentExperience] = []
        for _exp in range(self[agent_id].num_experiences):
            obs = []
            if "vector_obs" in key_list:
                obs.append(self[agent_id]["vector_obs"][_exp])
            memory = self[agent_id]["memory"][_exp] if "memory" in key_list else None
            # Assemble the AgentExperience for this step.
            experience = AgentExperience(
                obs=obs,
                reward=self[agent_id]["environment_rewards"][_exp],
                done=self[agent_id]["done"][_exp],
                action=self[agent_id]["actions"][_exp],
                action_probs=self[agent_id]["action_probs"][_exp],
                prev_action=self[agent_id]["prev_action"][_exp],
                agent_id=agent_id,
                memory=memory,
                epsilon=self[agent_id]["random_normal_epsilon"][_exp],
            )
            trajectory_list.append(experience)
        trajectory = Trajectory(steps=trajectory_list, next_step=experience)
        return trajectory
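
# Illustrative sketch (not from the original source): one way the
# `training_length` notion could apply to a returned Trajectory, chunking its
# steps into fixed-length sequences (e.g. for a recurrent policy). The
# chunking scheme is an assumption for illustration, not the buffer's actual
# behavior.
def chunk_trajectory(
    trajectory: Trajectory, training_length: int
) -> List[List[AgentExperience]]:
    steps = list(trajectory.steps)
    # Split into consecutive, non-overlapping chunks of at most training_length steps.
    return [
        steps[i : i + training_length]
        for i in range(0, len(steps), training_length)
    ]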
|
|
|
|
|
|
|
    def append_all_agent_batch_to_update_buffer(
        self,