|
|
|
|
|
|
from collections import defaultdict
from typing import List, Dict, DefaultDict

import numpy as np

from mlagents.trainers.buffer import AgentBuffer  # assumed location of AgentBuffer
from mlagents.envs.policy import Policy
from mlagents.envs.exception import UnityException
from mlagents.envs.brain import BrainInfo
from mlagents.envs.action_info import ActionInfoOutputs
|
|
|
|
|
|
class AgentProcessor:
    """
    AgentProcessor contains a dictionary of AgentBuffer. The AgentBuffers are indexed by agent_id.
    Buffer also contains an update_buffer that corresponds to the buffer used when updating the model.
    """
|
|
|
|
|
|
|
    def __init__(self, policy: Policy):
        self.agent_buffers: DefaultDict[str, AgentBuffer] = defaultdict(AgentBuffer)
        self.stats: Dict[str, List] = defaultdict(list)
        # Note: this is needed until we switch to AgentExperiences as the data input type.
        # We still need some info from the policy (memories, previous actions)
        # that really should be gathered by the env-manager.
        self.policy = policy
        self.episode_steps: Dict[str, int] = {}
|
|
|
|
|
|
|
    def __str__(self):
        return "local_buffers :\n{0}".format(
            "\n".join(
                [
                    "\tagent {0} :{1}".format(k, str(self.agent_buffers[k]))
                    for k in self.agent_buffers.keys()
                ]
            )
        )
|
|
|
|
|
|
    def add_experiences(
        self,
        curr_info: BrainInfo,
        next_info: BrainInfo,
        take_action_outputs: ActionInfoOutputs,
    ) -> None:
        """
        Adds experiences to each agent's experience history.
        :param curr_info: current BrainInfo.
        :param next_info: next BrainInfo.
        :param take_action_outputs: The outputs of the Policy's get_action method.
        """
|
|
|
        for name, values in take_action_outputs["value_heads"].items():
            self.stats[name].append(np.mean(values))
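        # The per-head means recorded above are later reported by the trainer as
        # value-estimate statistics (e.g., via its summary writer).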
|
|
|
|
|
|
|
        for agent_id in curr_info.agents:
            self.agent_buffers[agent_id].last_brain_info = curr_info
            self.agent_buffers[agent_id].last_take_action_outputs = (
                take_action_outputs
            )

        for agent_id in next_info.agents:
            stored_info = self.agent_buffers[agent_id].last_brain_info
            stored_take_action_outputs = self.agent_buffers[
                agent_id
            ].last_take_action_outputs
            if stored_info is not None:
                idx = stored_info.agents.index(agent_id)
|
|
|
                # Add the outputs of the last eval
                self.add_policy_outputs(stored_take_action_outputs, agent_id, idx)
                # Store action masks if necessary. Eventually these will be
                # None for continuous actions
                if stored_info.action_masks[idx] is not None:
                    self.agent_buffers[agent_id]["action_mask"].append(
                        stored_info.action_masks[idx], padding_value=1
                    )
                # TODO: This should be done by the env_manager, and put it in
                # the AgentExperience
                self.agent_buffers[agent_id]["prev_action"].append(
                    self.policy.retrieve_previous_action([agent_id])[0, :]
                )
|
|
|
|
|
|
        self.policy.save_previous_action(
            curr_info.agents, take_action_outputs["action"]
        )
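        # Saving the actions just taken is what lets retrieve_previous_action()
        # above return the previous step's actions on the next call.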
|
|
|
|
|
|
|
    def add_policy_outputs(
        self, take_action_outputs: ActionInfoOutputs, agent_id: str, agent_idx: int
    ) -> None:
        """
        Takes the output of the last action and stores it in the training buffer.
        """
        actions = take_action_outputs["action"]
        if self.policy.use_continuous_act:
            # Continuous control: also record the pre-clip action and the sampled
            # noise, which the policy's update step uses to reconstruct the
            # action distribution.
            actions_pre = take_action_outputs["pre_action"]
            self.agent_buffers[agent_id]["actions_pre"].append(actions_pre[agent_idx])
            epsilons = take_action_outputs["random_normal_epsilon"]
            self.agent_buffers[agent_id]["random_normal_epsilon"].append(
                epsilons[agent_idx]
            )
        a_dist = take_action_outputs["log_probs"]
        # value is a dictionary from name of reward to value estimate of the value head
        self.agent_buffers[agent_id]["actions"].append(actions[agent_idx])
        self.agent_buffers[agent_id]["action_probs"].append(a_dist[agent_idx])
|
|
|
|
|
|
|
    def process_experiences(self):
        pass
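
# Sketch of the intended call pattern (illustrative only; `env`, `policy`,
# `brain_name`, and the surrounding step loop are assumed to exist elsewhere
# and are not part of this module):
#
#     processor = AgentProcessor(policy)
#     action_info = policy.get_action(curr_info)              # ActionInfo
#     next_info = env.step(action_info.action)[brain_name]    # BrainInfo
#     processor.add_experiences(curr_info, next_info, action_info.outputs)
#     curr_info = next_info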