The Unity Machine Learning Agents Toolkit (ML-Agents) is an open-source project that enables games and simulations to serve as environments for training intelligent agents.
from typing import List, Dict
from collections import defaultdict
import numpy as np
from mlagents.trainers.buffer import AgentBuffer, BufferException
from mlagents.envs.policy import Policy
from mlagents.envs.exception import UnityException
from mlagents.envs.brain import BrainInfo
from mlagents.envs.action_info import ActionInfoOutputs


class AgentProcessorException(UnityException):
    """
    Related to errors with the AgentProcessor.
    """

    pass


class AgentProcessor:
    """
    AgentProcessor contains a ProcessingBuffer of AgentBuffers, indexed by agent_id.
    The experiences collected here are later appended to an update_buffer, which is the buffer used when updating the model.
    """

    def __init__(self, policy: Policy):
        self.processing_buffer = ProcessingBuffer()
        self.stats: Dict[str, List] = defaultdict(list)
        # Note: this is needed until we switch to AgentExperiences as the data input type.
        # We still need some info from the policy (memories, previous actions)
        # that really should be gathered by the env-manager.
        self.policy = policy
        self.episode_steps: Dict[str, int] = {}

    def __str__(self):
        return "local_buffers :\n{0}".format(
            "\n".join(
                [
                    "\tagent {0} :{1}".format(k, str(self.processing_buffer[k]))
                    for k in self.processing_buffer.keys()
                ]
            )
        )

    def reset_local_buffers(self) -> None:
        """
        Resets all the local AgentBuffers.
        """
        agent_ids = list(self.processing_buffer.keys())
        for k in agent_ids:
            self.processing_buffer[k].reset_agent()

    def add_experiences(
        self,
        curr_info: BrainInfo,
        next_info: BrainInfo,
        take_action_outputs: ActionInfoOutputs,
    ) -> None:
        """
        Adds experiences to each agent's experience history.
        :param curr_info: current BrainInfo.
        :param next_info: next BrainInfo.
        :param take_action_outputs: The outputs of the Policy's get_action method.
        """
        if take_action_outputs:
            self.stats["Policy/Entropy"].append(take_action_outputs["entropy"].mean())
            self.stats["Policy/Learning Rate"].append(
                take_action_outputs["learning_rate"]
            )
            for name, values in take_action_outputs["value_heads"].items():
                self.stats[name].append(np.mean(values))
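        # Remember the most recent BrainInfo and policy outputs for each agent;
        # the loop below pairs this stored step with next_info to form experiences.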
        for agent_id in curr_info.agents:
            self.processing_buffer[agent_id].last_brain_info = curr_info
            self.processing_buffer[
                agent_id
            ].last_take_action_outputs = take_action_outputs

        # Store the environment reward
        tmp_environment = np.array(next_info.rewards)

        for agent_id in next_info.agents:
            stored_info = self.processing_buffer[agent_id].last_brain_info
            stored_take_action_outputs = self.processing_buffer[
                agent_id
            ].last_take_action_outputs
            if stored_info is not None:
                idx = stored_info.agents.index(agent_id)
                next_idx = next_info.agents.index(agent_id)
                if not stored_info.local_done[idx]:
                    for i, _ in enumerate(stored_info.visual_observations):
                        self.processing_buffer[agent_id]["visual_obs%d" % i].append(
                            stored_info.visual_observations[i][idx]
                        )
                        self.processing_buffer[agent_id][
                            "next_visual_obs%d" % i
                        ].append(next_info.visual_observations[i][next_idx])
                    if self.policy.use_vec_obs:
                        self.processing_buffer[agent_id]["vector_obs"].append(
                            stored_info.vector_observations[idx]
                        )
                        self.processing_buffer[agent_id]["next_vector_in"].append(
                            next_info.vector_observations[next_idx]
                        )
                    if self.policy.use_recurrent:
                        self.processing_buffer[agent_id]["memory"].append(
                            self.policy.retrieve_memories([agent_id])[0, :]
                        )
                    self.processing_buffer[agent_id]["masks"].append(1.0)
                    self.processing_buffer[agent_id]["done"].append(
                        next_info.local_done[next_idx]
                    )
                    # Add the outputs of the last eval
                    self.add_policy_outputs(stored_take_action_outputs, agent_id, idx)
                    # Store action masks if necessary. Eventually these will be
                    # None for continuous actions
                    if stored_info.action_masks[idx] is not None:
                        self.processing_buffer[agent_id]["action_mask"].append(
                            stored_info.action_masks[idx], padding_value=1
                        )
                    # TODO: This should be done by the env_manager, and put it in
                    # the AgentExperience
                    self.processing_buffer[agent_id]["prev_action"].append(
                        self.policy.retrieve_previous_action([agent_id])[0, :]
                    )
                    values = stored_take_action_outputs["value_heads"]
                    # Add the value outputs if needed
                    self.processing_buffer[agent_id]["environment_rewards"].append(
                        tmp_environment
                    )
                    for name, value in values.items():
                        self.processing_buffer[agent_id][
                            "{}_value_estimates".format(name)
                        ].append(value[idx][0])
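                # Count the steps this agent has taken in its current episode.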
                if not next_info.local_done[next_idx]:
                    if agent_id not in self.episode_steps:
                        self.episode_steps[agent_id] = 0
                    self.episode_steps[agent_id] += 1
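        # Remember the actions just taken so they can be retrieved as
        # "prev_action" on the next step.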
        self.policy.save_previous_action(
            curr_info.agents, take_action_outputs["action"]
        )

    def add_policy_outputs(
        self, take_action_outputs: ActionInfoOutputs, agent_id: str, agent_idx: int
    ) -> None:
        """
        Takes the output of the last action and stores it into the training buffer.
        """
        actions = take_action_outputs["action"]
        if self.policy.use_continuous_act:
            actions_pre = take_action_outputs["pre_action"]
            self.processing_buffer[agent_id]["actions_pre"].append(
                actions_pre[agent_idx]
            )
            epsilons = take_action_outputs["random_normal_epsilon"]
            self.processing_buffer[agent_id]["random_normal_epsilon"].append(
                epsilons[agent_idx]
            )
        a_dist = take_action_outputs["log_probs"]
        # value is a dictionary from name of reward to value estimate of the value head
        self.processing_buffer[agent_id]["actions"].append(actions[agent_idx])
        self.processing_buffer[agent_id]["action_probs"].append(a_dist[agent_idx])

    def process_experiences(self):
        pass


class ProcessingBuffer(dict):
    """
    ProcessingBuffer is a dictionary of AgentBuffers, indexed by agent_id.
    """

    def __str__(self):
        return "local_buffers :\n{0}".format(
            "\n".join(["\tagent {0} :{1}".format(k, str(self[k])) for k in self.keys()])
        )
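    # Looking up an unknown agent_id lazily creates an empty AgentBuffer for it,
    # so callers can append to self[agent_id] without checking for existence first.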
    def __getitem__(self, key):
        if key not in self.keys():
            self[key] = AgentBuffer()
        return super().__getitem__(key)

    def reset_local_buffers(self) -> None:
        """
        Resets all the local AgentBuffers.
        """
        for buf in self.values():
            buf.reset_agent()

    def append_to_update_buffer(
        self,
        update_buffer: AgentBuffer,
        agent_id: str,
        key_list: List[str] = None,
        batch_size: int = None,
        training_length: int = None,
    ) -> None:
        """
        Appends the buffer of an agent to the update buffer.
        :param update_buffer: A reference to an AgentBuffer to append the agent's buffer to.
        :param agent_id: The id of the agent whose data will be appended.
        :param key_list: The fields that must be added. If None, all fields will be appended.
        :param batch_size: The number of elements that must be appended. If None, all of them will be.
        :param training_length: The length of the samples that must be appended. If None, only takes one element.
        """
        if key_list is None:
            key_list = self[agent_id].keys()
        if not self[agent_id].check_length(key_list):
            raise BufferException(
                "The fields {0} for agent {1} are not all of the same length".format(
                    key_list, agent_id
                )
            )
        for field_key in key_list:
            update_buffer[field_key].extend(
                self[agent_id][field_key].get_batch(
                    batch_size=batch_size, training_length=training_length
                )
            )

    def append_all_agent_batch_to_update_buffer(
        self,
        update_buffer: AgentBuffer,
        key_list: List[str] = None,
        batch_size: int = None,
        training_length: int = None,
    ) -> None:
        """
        Appends the buffers of all agents to the update buffer.
        :param key_list: The fields that must be added. If None, all fields will be appended.
        :param batch_size: The number of elements that must be appended. If None, all of them will be.
        :param training_length: The length of the samples that must be appended. If None, only takes one element.
        """
        for agent_id in self.keys():
            self.append_to_update_buffer(
                update_buffer, agent_id, key_list, batch_size, training_length
            )
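# Example usage (a minimal sketch): it assumes a concrete Policy implementation
# `policy`, and that `policy.get_action(curr_info)` returns an ActionInfo whose
# `.outputs` dict matches what add_experiences expects (per its docstring).
#
#     processor = AgentProcessor(policy)
#     action_info = policy.get_action(curr_info)
#     # ... step the environment to obtain next_info ...
#     processor.add_experiences(curr_info, next_info, action_info.outputs)
#
#     # Once enough experiences are collected, copy them into an update buffer:
#     update_buffer = AgentBuffer()
#     processor.processing_buffer.append_all_agent_batch_to_update_buffer(
#         update_buffer,
#         batch_size=None,
#         training_length=policy.sequence_length,
#     )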