Use ProcessingBuffer in AgentProcessor

Branch: develop-newnormalization
Ervin Teng, 5 years ago
Commit f008dac0
1 changed file with 130 additions and 6 deletions

ml-agents/mlagents/trainers/agent_processor.py (136 changes)
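At a glance, the commit swaps the processor's per-agent container: the deleted lines kept a defaultdict(AgentBuffer) keyed by agent_id, while the added lines route everything through a ProcessingBuffer that is indexed the same way. A minimal sketch of that pattern, using a hypothetical _StubAgentBuffer rather than the real mlagents classes:

from collections import defaultdict


class _StubAgentBuffer:
    """Hypothetical stand-in for mlagents' AgentBuffer, for illustration only."""

    def __init__(self) -> None:
        self.last_brain_info = None

    def reset_agent(self) -> None:
        self.last_brain_info = None


# Old pattern (the deleted lines): a plain defaultdict keyed by agent_id.
agent_buffers = defaultdict(_StubAgentBuffer)
agent_buffers["agent-0"].reset_agent()  # buffer is created lazily on first access


# New pattern: a dict subclass with the same lazy creation, which is roughly
# the behaviour ProcessingBuffer has to provide for the calls in the diff below.
class _SketchProcessingBuffer(dict):
    def __missing__(self, key):
        self[key] = _StubAgentBuffer()
        return self[key]


processing_buffer = _SketchProcessingBuffer()
processing_buffer["agent-0"].reset_agent()  # same indexing behaviour as before

Either container lets add_experiences index self.processing_buffer[agent_id] without first checking whether the agent has been seen; the dict subclass just turns the container into an explicit, named type instead of a bare defaultdict.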


-from typing import List, Dict, DefaultDict
+from typing import List, Dict
 import numpy as np
 from mlagents.envs.brain import BrainInfo
 from mlagents.envs.action_info import ActionInfoOutputs
@@ ... @@
 class AgentProcessorException(UnityException):
@@ ... @@
     """

     def __init__(self, policy: Policy):
-        self.agent_buffers: DefaultDict[str, AgentBuffer] = defaultdict(AgentBuffer)
+        self.processing_buffer = ProcessingBuffer()
         self.stats: Dict[str, List] = defaultdict(list)
         # Note: this is needed until we switch to AgentExperiences as the data input type.
         # We still need some info from the policy (memories, previous actions)
@@ ... @@
         return "local_buffers :\n{0}".format(
             "\n".join(
                 [
-                    "\tagent {0} :{1}".format(k, str(self.agent_buffers[k]))
-                    for k in self.agent_buffers.keys()
+                    "\tagent {0} :{1}".format(k, str(self.processing_buffer[k]))
+                    for k in self.processing_buffer.keys()
                 ]
             )
         )

@@ ... @@
         Resets all the local local_buffers
         """
-        agent_ids = list(self.agent_buffers.keys())
+        agent_ids = list(self.processing_buffer.keys())
         for k in agent_ids:
-            self.agent_buffers[k].reset_agent()
+            self.processing_buffer[k].reset_agent()
+
+    def add_experiences(
+        self,
+        curr_info: BrainInfo,
+        next_info: BrainInfo,
+        take_action_outputs: ActionInfoOutputs,
+    ) -> None:
+        """
+        Adds experiences to each agent's experience history.
+        :param curr_info: current BrainInfo.
+        :param next_info: next BrainInfo.
+        :param take_action_outputs: The outputs of the Policy's get_action method.
+        """
+        if take_action_outputs:
+            self.stats["Policy/Entropy"].append(take_action_outputs["entropy"].mean())
+            self.stats["Policy/Learning Rate"].append(
+                take_action_outputs["learning_rate"]
+            )
+            for name, values in take_action_outputs["value_heads"].items():
+                self.stats[name].append(np.mean(values))
+        for agent_id in curr_info.agents:
+            self.processing_buffer[agent_id].last_brain_info = curr_info
+            self.processing_buffer[
+                agent_id
+            ].last_take_action_outputs = take_action_outputs
+        # Store the environment reward
+        tmp_environment = np.array(next_info.rewards)
+        for agent_id in next_info.agents:
+            stored_info = self.processing_buffer[agent_id].last_brain_info
+            stored_take_action_outputs = self.processing_buffer[
+                agent_id
+            ].last_take_action_outputs
+            if stored_info is not None:
+                idx = stored_info.agents.index(agent_id)
+                next_idx = next_info.agents.index(agent_id)
+                if not stored_info.local_done[idx]:
+                    for i, _ in enumerate(stored_info.visual_observations):
+                        self.processing_buffer[agent_id]["visual_obs%d" % i].append(
+                            stored_info.visual_observations[i][idx]
+                        )
+                        self.processing_buffer[agent_id][
+                            "next_visual_obs%d" % i
+                        ].append(next_info.visual_observations[i][next_idx])
+                    if self.policy.use_vec_obs:
+                        self.processing_buffer[agent_id]["vector_obs"].append(
+                            stored_info.vector_observations[idx]
+                        )
+                        self.processing_buffer[agent_id]["next_vector_in"].append(
+                            next_info.vector_observations[next_idx]
+                        )
+                    if self.policy.use_recurrent:
+                        self.processing_buffer[agent_id]["memory"].append(
+                            self.policy.retrieve_memories([agent_id])[0, :]
+                        )
+                    self.processing_buffer[agent_id]["masks"].append(1.0)
+                    self.processing_buffer[agent_id]["done"].append(
+                        next_info.local_done[next_idx]
+                    )
+                    # Add the outputs of the last eval
+                    self.add_policy_outputs(stored_take_action_outputs, agent_id, idx)
+                    # Store action masks if necessary. Eventually these will be
+                    # None for continuous actions
+                    if stored_info.action_masks[idx] is not None:
+                        self.processing_buffer[agent_id]["action_mask"].append(
+                            stored_info.action_masks[idx], padding_value=1
+                        )
+                    # TODO: This should be done by the env_manager, and put it in
+                    # the AgentExperience
+                    self.processing_buffer[agent_id]["prev_action"].append(
+                        self.policy.retrieve_previous_action([agent_id])[0, :]
+                    )
+                    values = stored_take_action_outputs["value_heads"]
+                    # Add the value outputs if needed
+                    self.processing_buffer[agent_id]["environment_rewards"].append(
+                        tmp_environment
+                    )
+                    for name, value in values.items():
+                        self.processing_buffer[agent_id][
+                            "{}_value_estimates".format(name)
+                        ].append(value[idx][0])
+                if not next_info.local_done[next_idx]:
+                    if agent_id not in self.episode_steps:
+                        self.episode_steps[agent_id] = 0
+                    self.episode_steps[agent_id] += 1
+        self.policy.save_previous_action(
+            curr_info.agents, take_action_outputs["action"]
+        )
+
+    def add_policy_outputs(
+        self, take_action_outputs: ActionInfoOutputs, agent_id: str, agent_idx: int
+    ) -> None:
+        """
+        Takes the output of the last action and store it into the training buffer.
+        """
+        actions = take_action_outputs["action"]
+        if self.policy.use_continuous_act:
+            actions_pre = take_action_outputs["pre_action"]
+            self.processing_buffer[agent_id]["actions_pre"].append(
+                actions_pre[agent_idx]
+            )
+            epsilons = take_action_outputs["random_normal_epsilon"]
+            self.processing_buffer[agent_id]["random_normal_epsilon"].append(
+                epsilons[agent_idx]
+            )
+        a_dist = take_action_outputs["log_probs"]
+        # value is a dictionary from name of reward to value estimate of the value head
+        self.processing_buffer[agent_id]["actions"].append(actions[agent_idx])
+        self.processing_buffer[agent_id]["action_probs"].append(a_dist[agent_idx])
+
+    def process_experiences(self):
+        pass
+
+
+class ProcessingBuffer(dict):
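The capture ends before the body of ProcessingBuffer finishes loading. Judging only from how AgentProcessor uses it above (dict-style indexing by agent_id, .keys(), and values that behave like AgentBuffer with reset_agent and the last_brain_info / last_take_action_outputs attributes), a plausible minimal shape is sketched below; the import path and the reset_all helper are assumptions, not taken from this diff:

from mlagents.trainers.buffer import AgentBuffer  # assumed import path, not shown in this diff


class ProcessingBufferSketch(dict):
    """
    Hedged sketch only, not the shipped class: maps agent_id -> AgentBuffer and
    creates an empty buffer on first access, so the processor above can write
    self.processing_buffer[agent_id][...] without any prior setup.
    """

    def __missing__(self, key: str) -> AgentBuffer:
        self[key] = AgentBuffer()
        return self[key]

    def reset_all(self) -> None:
        # Mirrors the reset path in the diff, which calls reset_agent() on
        # every per-agent buffer the container holds.
        for buf in self.values():
            buf.reset_agent()

The class that actually ships with ml-agents may carry additional helpers (for example, for flushing trajectories into an update buffer); the sketch covers only the behaviour this file exercises.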
