The Unity Machine Learning Agents Toolkit (ML-Agents) is an open-source project that enables games and simulations to serve as environments for training intelligent agents.

# # Unity ML-Agents Toolkit
# ## ML-Agent Learning (Behavioral Cloning)
# Contains an implementation of the Behavioral Cloning algorithm
import logging

import numpy as np

from mlagents.envs.brain import BrainInfo
from mlagents.envs.action_info import ActionInfoOutputs
from mlagents.trainers.bc.policy import BCPolicy
from mlagents.trainers.buffer import AgentBuffer
from mlagents.trainers.agent_processor import ProcessingBuffer
from mlagents.trainers.trainer import Trainer

logger = logging.getLogger("mlagents.trainers")


class BCTrainer(Trainer):
    """The BCTrainer is an implementation of Behavioral Cloning."""

    def __init__(self, brain, trainer_parameters, training, load, seed, run_id):
        """
        Responsible for collecting experiences and training a BC model.
        :param brain: The brain the trainer is associated with.
        :param trainer_parameters: The parameters for the trainer (dictionary).
        :param training: Whether the trainer is set for training.
        :param load: Whether the model should be loaded.
        :param seed: The seed the model will be initialized with.
        :param run_id: The identifier of the current run.
        """
        super(BCTrainer, self).__init__(brain, trainer_parameters, training, run_id)
        self.policy = BCPolicy(seed, brain, trainer_parameters, load)
        self.n_sequences = 1
        self.cumulative_rewards = {}
        self.episode_steps = {}
        self.stats = {
            "Losses/Cloning Loss": [],
            "Environment/Episode Length": [],
            "Environment/Cumulative Reward": [],
        }
        self.batches_per_epoch = trainer_parameters["batches_per_epoch"]
        self.demonstration_buffer = AgentBuffer()
        self.evaluation_buffer = ProcessingBuffer()
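
    # Note on the two buffers above (inferred from this file, not a definitive
    # statement of the wider API): `demonstration_buffer` holds the expert
    # experiences that `update_policy` trains on and is expected to be filled
    # elsewhere (e.g. by a subclass loading recorded demonstrations), while
    # `evaluation_buffer` only tracks the student's own episodes so that reward
    # and episode-length statistics can be reported.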

    def add_experiences(
        self,
        curr_info: BrainInfo,
        next_info: BrainInfo,
        take_action_outputs: ActionInfoOutputs,
    ) -> None:
        """
        Adds experiences to each agent's experience history.
        :param curr_info: Current BrainInfo
        :param next_info: Next BrainInfo
        :param take_action_outputs: The outputs of the take action method.
        """
        # Used to collect information about student performance.
        for agent_id in curr_info.agents:
            self.evaluation_buffer[agent_id].last_brain_info = curr_info

        for agent_id in next_info.agents:
            stored_info = self.evaluation_buffer[agent_id].last_brain_info
            if stored_info is None:
                # The agent was not seen on the previous step; skip it.
                continue
            next_idx = next_info.agents.index(agent_id)
            if agent_id not in self.cumulative_rewards:
                self.cumulative_rewards[agent_id] = 0
            self.cumulative_rewards[agent_id] += next_info.rewards[next_idx]
            if not next_info.local_done[next_idx]:
                if agent_id not in self.episode_steps:
                    self.episode_steps[agent_id] = 0
                self.episode_steps[agent_id] += 1

    def process_experiences(
        self, current_info: BrainInfo, next_info: BrainInfo
    ) -> None:
        """
        Checks agent histories for processing condition, and processes them as necessary.
        For behavioral cloning this records episode statistics for each agent
        that finished an episode, then resets that agent's counters.
        :param current_info: Current BrainInfo
        :param next_info: Next BrainInfo
        """
        for idx in range(len(next_info.agents)):
            if next_info.local_done[idx]:
                agent_id = next_info.agents[idx]
                self.stats["Environment/Cumulative Reward"].append(
                    self.cumulative_rewards.get(agent_id, 0)
                )
                self.stats["Environment/Episode Length"].append(
                    self.episode_steps.get(agent_id, 0)
                )
                self.reward_buffer.appendleft(self.cumulative_rewards.get(agent_id, 0))
                self.cumulative_rewards[agent_id] = 0
                self.episode_steps[agent_id] = 0

    def end_episode(self):
        """
        A signal that the episode has ended. The buffer must be reset.
        Only called when the academy resets.
        """
        self.evaluation_buffer.reset_local_buffers()
        for agent_id in self.cumulative_rewards:
            self.cumulative_rewards[agent_id] = 0
        for agent_id in self.episode_steps:
            self.episode_steps[agent_id] = 0

    def is_ready_update(self):
        """
        Returns whether or not the trainer has enough experiences to run an update.
        :return: A boolean corresponding to whether or not update_policy() can be run
        """
        # With n_sequences fixed to 1, any more than one recorded demonstration
        # experience is enough to start updating.
        return self.demonstration_buffer.num_experiences > self.n_sequences

    def update_policy(self):
        """
        Updates the policy using mini-batches drawn from the demonstration buffer.
        """
        self.demonstration_buffer.shuffle(self.policy.sequence_length)
        batch_losses = []
        batch_size = self.n_sequences * self.policy.sequence_length
        # We either divide the entire buffer into num_batches batches, or limit
        # the number of batches to batches_per_epoch.
        num_batches = min(
            self.demonstration_buffer.num_experiences // batch_size,
            self.batches_per_epoch,
        )
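        # Worked example (illustrative numbers, not defaults): with
        # sequence_length = 64 and n_sequences = 1, batch_size is 64; a buffer
        # of 10_000 demonstration experiences yields 10_000 // 64 = 156 full
        # batches, which is then capped at batches_per_epoch.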
        for i in range(0, num_batches * batch_size, batch_size):
            mini_batch = self.demonstration_buffer.make_mini_batch(i, i + batch_size)
            run_out = self.policy.update(mini_batch, self.n_sequences)
            loss = run_out["policy_loss"]
            batch_losses.append(loss)
        if batch_losses:
            self.stats["Losses/Cloning Loss"].append(np.mean(batch_losses))
        else:
            self.stats["Losses/Cloning Loss"].append(0)
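

# ---------------------------------------------------------------------------
# Usage sketch (not part of the original file): a minimal, hypothetical view
# of how a training loop drives this trainer. In practice the mlagents-learn
# entry point constructs trainers and steps the environment; `env_step`,
# `brain`, and `trainer_parameters` below are placeholders for that machinery.
#
#     trainer = BCTrainer(brain, trainer_parameters, training=True,
#                         load=False, seed=0, run_id="bc-run")
#     while training:
#         curr_info, next_info, outputs = env_step(trainer.policy)
#         trainer.add_experiences(curr_info, next_info, outputs)
#         trainer.process_experiences(curr_info, next_info)
#         if trainer.is_ready_update():
#             trainer.update_policy()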