|
|
|
|
|
|
from collections import defaultdict
from typing import List, Dict

import numpy as np

from mlagents.envs.brain import BrainInfo
from mlagents.envs.action_info import ActionInfoOutputs

# UnityException and Policy import paths below are assumed from the mlagents.envs
# package layout used above.
from mlagents.envs.exception import UnityException
from mlagents.envs.policy import Policy
|
|
|
|
|
|
|
class AgentProcessorException(UnityException):
    """
    Related to errors with the AgentProcessor.
    """

    pass
|
|
|
|
|
|
|
class AgentProcessor:
    """
    AgentProcessor contains a ProcessingBuffer of experiences, indexed by agent_id,
    which is filled each step by add_experiences().
    """

    def __init__(self, policy: Policy):
        self.processing_buffer = ProcessingBuffer()
        self.stats: Dict[str, List] = defaultdict(list)
        self.episode_steps: Dict[str, int] = {}
        # Note: this is needed until we switch to AgentExperiences as the data input type.
        # We still need some info from the policy (memories, previous actions).
        self.policy = policy
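        # Expected layout of self.processing_buffer, based on the fields appended in
        # add_experiences / add_policy_outputs below (one per-agent buffer per agent_id):
        #   self.processing_buffer[agent_id]["vector_obs"]   : per-step vector observations
        #   self.processing_buffer[agent_id]["visual_obs%d"] : per-step visual observations
        #   self.processing_buffer[agent_id]["actions"]      : actions taken by the policy
        #   self.processing_buffer[agent_id]["masks"], ["done"], ["environment_rewards"], ...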
|
|
|
|
|
|
return "local_buffers :\n{0}".format( |
|
|
|
"\n".join( |
|
|
|
[ |
|
|
|
"\tagent {0} :{1}".format(k, str(self.agent_buffers[k])) |
|
|
|
for k in self.agent_buffers.keys() |
|
|
|
"\tagent {0} :{1}".format(k, str(self.processing_buffer[k])) |
|
|
|
for k in self.processing_buffer.keys() |
|
|
|
] |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
    def reset_local_buffers(self) -> None:
        """
        Resets all the local_buffers.
        """
        agent_ids = list(self.processing_buffer.keys())
        for k in agent_ids:
            self.processing_buffer[k].reset_agent()
|
|
|
|
|
|
|
    def add_experiences(
        self,
        curr_info: BrainInfo,
        next_info: BrainInfo,
        take_action_outputs: ActionInfoOutputs,
    ) -> None:
        """
        Adds experiences to each agent's experience history.
        :param curr_info: current BrainInfo.
        :param next_info: next BrainInfo.
        :param take_action_outputs: The outputs of the Policy's get_action method.
        """
        if take_action_outputs:
            self.stats["Policy/Entropy"].append(take_action_outputs["entropy"].mean())
            self.stats["Policy/Learning Rate"].append(
                take_action_outputs["learning_rate"]
            )
            for name, values in take_action_outputs["value_heads"].items():
                self.stats[name].append(np.mean(values))
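        # take_action_outputs is expected to carry at least "entropy", "learning_rate",
        # "value_heads", "action", and "log_probs" (plus "pre_action" and
        # "random_normal_epsilon" for continuous-action policies); see add_policy_outputs.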
|
|
|
|
|
|
|
        for agent_id in curr_info.agents:
            self.processing_buffer[agent_id].last_brain_info = curr_info
            self.processing_buffer[
                agent_id
            ].last_take_action_outputs = take_action_outputs

        # Store the environment reward
        tmp_environment = np.array(next_info.rewards)
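        # The loop below pairs the BrainInfo cached when the agent last acted (usually
        # curr_info, set just above) with next_info, so each buffer entry holds the
        # observation, action, next observation, and reward for a single step.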
|
|
|
|
|
|
|
        for agent_id in next_info.agents:
            stored_info = self.processing_buffer[agent_id].last_brain_info
            stored_take_action_outputs = self.processing_buffer[
                agent_id
            ].last_take_action_outputs
            if stored_info is not None:
                idx = stored_info.agents.index(agent_id)
                next_idx = next_info.agents.index(agent_id)
                if not stored_info.local_done[idx]:
                    for i, _ in enumerate(stored_info.visual_observations):
                        self.processing_buffer[agent_id]["visual_obs%d" % i].append(
                            stored_info.visual_observations[i][idx]
                        )
                        self.processing_buffer[agent_id][
                            "next_visual_obs%d" % i
                        ].append(next_info.visual_observations[i][next_idx])
                    if self.policy.use_vec_obs:
                        self.processing_buffer[agent_id]["vector_obs"].append(
                            stored_info.vector_observations[idx]
                        )
                        self.processing_buffer[agent_id]["next_vector_in"].append(
                            next_info.vector_observations[next_idx]
                        )
                    if self.policy.use_recurrent:
                        self.processing_buffer[agent_id]["memory"].append(
                            self.policy.retrieve_memories([agent_id])[0, :]
                        )

                    self.processing_buffer[agent_id]["masks"].append(1.0)
                    self.processing_buffer[agent_id]["done"].append(
                        next_info.local_done[next_idx]
                    )
                    # Add the outputs of the last eval
                    self.add_policy_outputs(stored_take_action_outputs, agent_id, idx)
|
|
|
                    # Store action masks if necessary. Eventually these will be
                    # None for continuous actions
                    if stored_info.action_masks[idx] is not None:
                        self.processing_buffer[agent_id]["action_mask"].append(
                            stored_info.action_masks[idx], padding_value=1
                        )
                    # TODO: This should be done by the env_manager, and put it in
                    # the AgentExperience
                    self.processing_buffer[agent_id]["prev_action"].append(
                        self.policy.retrieve_previous_action([agent_id])[0, :]
                    )
|
|
|
|
|
|
|
                    # value_heads maps each reward signal name to that head's value estimate
                    values = stored_take_action_outputs["value_heads"]

                    # Store this agent's environment reward for the step
                    self.processing_buffer[agent_id]["environment_rewards"].append(
                        tmp_environment[next_idx]
                    )

                    # Add the value outputs if needed
                    for name, value in values.items():
                        self.processing_buffer[agent_id][
                            "{}_value_estimates".format(name)
                        ].append(value[idx][0])
|
|
|
|
|
|
|
                if not next_info.local_done[next_idx]:
                    if agent_id not in self.episode_steps:
                        self.episode_steps[agent_id] = 0
                    self.episode_steps[agent_id] += 1
        self.policy.save_previous_action(
            curr_info.agents, take_action_outputs["action"]
        )
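        # save_previous_action keeps the policy's record of each agent's last action up to
        # date, so that retrieve_previous_action([agent_id]) above returns it next step.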
|
|
|
|
|
|
|
    def add_policy_outputs(
        self, take_action_outputs: ActionInfoOutputs, agent_id: str, agent_idx: int
    ) -> None:
        """
        Takes the output of the last action and stores it into the training buffer.
        """
        actions = take_action_outputs["action"]
        if self.policy.use_continuous_act:
            actions_pre = take_action_outputs["pre_action"]
            self.processing_buffer[agent_id]["actions_pre"].append(
                actions_pre[agent_idx]
            )
            epsilons = take_action_outputs["random_normal_epsilon"]
            self.processing_buffer[agent_id]["random_normal_epsilon"].append(
                epsilons[agent_idx]
            )
        a_dist = take_action_outputs["log_probs"]
        self.processing_buffer[agent_id]["actions"].append(actions[agent_idx])
        self.processing_buffer[agent_id]["action_probs"].append(a_dist[agent_idx])
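        # Fields written here ("actions", "action_probs", and for continuous policies
        # "actions_pre" and "random_normal_epsilon") line up index-for-index with the
        # observation and reward entries appended in add_experiences.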
|
|
|
|
|
|
|
    def process_experiences(self):
        pass
|
|
|
|
|
|
|
|
|
|
|
class ProcessingBuffer(dict): |
|
|
|