
Looks like it's training

/develop-newnormalization
Ervin Teng, 5 years ago
Current commit
9e661f0c
5 files changed, with 330 additions and 105 deletions
1. ml-agents/mlagents/trainers/agent_processor.py (32 changes)
2. ml-agents/mlagents/trainers/ppo/policy.py (59 changes)
3. ml-agents/mlagents/trainers/ppo/trainer.py (269 changes)
4. ml-agents/mlagents/trainers/rl_trainer.py (57 changes)
5. ml-agents/mlagents/trainers/trainer_controller.py (18 changes)

ml-agents/mlagents/trainers/agent_processor.py (32 changes)


import numpy as np
from mlagents.trainers.buffer import AgentBuffer, BufferException
from mlagents.envs.trainer import Trainer
from mlagents.trainers.trainer import Trainer
from mlagents.envs.exception import UnityException
from mlagents.envs.brain import BrainInfo
from mlagents.envs.action_info import ActionInfoOutputs

obs: List[np.array]
obs: List[np.ndarray]
action_probs: np.array
prev_action: np.array
action_probs: np.ndarray
action_pre: np.ndarray # TODO: Remove this
action_mask: np.array
prev_action: np.ndarray
class SplitObservations(NamedTuple):
vector_observations: np.ndarray
visual_observations: List[np.ndarray]
class Trajectory(NamedTuple):

pass
def split_obs(obs: List[np.ndarray]) -> SplitObservations:
vis_obs_indices = []
vec_obs_indices = []
for index, observation in enumerate(obs):
if len(observation.shape) == 1:
vec_obs_indices.append(index)
if len(observation.shape) == 3:
vis_obs_indices.append(index)
vec_obs = np.concatenate([obs[i] for i in vec_obs_indices], axis=0)
vis_obs = [obs[i] for i in vis_obs_indices]
return SplitObservations(vector_observations=vec_obs, visual_observations=vis_obs)
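As a standalone illustration of split_obs (the logic above re-stated so it runs with NumPy alone; the example shapes 8, 84x84x3 and 4 are illustrative only): rank-1 arrays are treated as vector observations and concatenated into one vector, while rank-3 arrays (height x width x channels) are collected as a list of visual observations.

import numpy as np
from typing import List, NamedTuple

class SplitObservations(NamedTuple):
    vector_observations: np.ndarray
    visual_observations: List[np.ndarray]

def split_obs(obs: List[np.ndarray]) -> SplitObservations:
    # Rank-1 arrays are vector observations; rank-3 arrays are visual observations.
    vec_obs = [o for o in obs if len(o.shape) == 1]
    vis_obs = [o for o in obs if len(o.shape) == 3]
    return SplitObservations(
        vector_observations=np.concatenate(vec_obs, axis=0),
        visual_observations=vis_obs,
    )

observations = [np.zeros(8), np.zeros((84, 84, 3)), np.zeros(4)]
result = split_obs(observations)
print(result.vector_observations.shape)  # (12,)
print(len(result.visual_observations))   # 1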
class AgentProcessor:
"""
AgentProcessor contains a dictionary of AgentBuffers, indexed by agent_id.

# Add the value outputs if needed
self.processing_buffer[agent_id]["environment_rewards"].append(
tmp_environment
tmp_environment[next_idx]
)
for name, value in values.items():

done=self[agent_id]["done"][_exp],
action=self[agent_id]["actions"][_exp],
action_probs=self[agent_id]["action_probs"][_exp],
action_pre=self[agent_id]["actions_pre"][_exp],
action_mask=self[agent_id]["action_mask"][_exp],
prev_action=self[agent_id]["prev_action"][_exp],
agent_id=agent_id,
memory=memory,
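The full definitions of AgentExperience and Trajectory are not visible in this diff; the sketch below reconstructs their likely shape from the fields referenced in this chunk and in _trajectory_to_agentbuffer further down. Field order, the exact types, and the reward/epsilon fields are inferred, not confirmed by the diff.

from typing import List, NamedTuple
import numpy as np

class AgentExperience(NamedTuple):
    obs: List[np.ndarray]       # raw per-step observations, later separated by split_obs
    reward: float               # environment reward for this step
    done: bool
    action: np.ndarray
    action_probs: np.ndarray
    action_pre: np.ndarray      # action output before clipping; the TODO in the diff marks it for removal
    action_mask: np.ndarray
    prev_action: np.ndarray
    agent_id: str
    memory: np.ndarray          # recurrent memory, if the policy uses an RNN
    epsilon: np.ndarray         # random normal epsilon used for continuous actions

class Trajectory(NamedTuple):
    steps: List[AgentExperience]
    next_step: AgentExperience  # experience following the trajectory, used for bootstrapping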

ml-agents/mlagents/trainers/ppo/policy.py (59 changes)


from mlagents.envs.timers import timed
from mlagents.envs.brain import BrainInfo, BrainParameters
from mlagents.trainers.models import EncoderType, LearningRateSchedule
from mlagents.trainers.agent_processor import AgentExperience, split_obs
from mlagents.trainers.buffer import AgentBuffer
from mlagents.trainers.components.reward_signals.reward_signal_factory import (
create_reward_signal,
)

value_estimates[k] = 0.0
return value_estimates
def get_batched_value_estimates(self, batch: AgentBuffer) -> Dict[str, np.ndarray]:
feed_dict: Dict[tf.Tensor, Any] = {
self.model.batch_size: batch.num_experiences,
self.model.sequence_length: self.sequence_length,
}
if self.use_vec_obs:
feed_dict[self.model.vector_in] = batch["vector_obs"]
if self.model.vis_obs_size > 0:
for i in range(len(self.model.visual_in)):
_obs = batch["visual_obs%d" % i]
feed_dict[self.model.visual_in[i]] = _obs
if self.use_recurrent:
feed_dict[self.model.memory_in] = batch["memory"]
if not self.use_continuous_act and self.use_recurrent:
feed_dict[self.model.prev_action] = batch["prev_action"]
value_estimates = self.sess.run(self.model.value_heads, feed_dict)
value_estimates = {k: np.squeeze(v) for k, v in value_estimates.items()}
return value_estimates
def get_value_estimates2(self, experience: AgentExperience) -> Dict[str, float]:
"""
Generates value estimates for bootstrapping.
:param experience: AgentExperience to use for bootstrapping. If its done flag is set, value estimates
for reward signals that use terminal states are set to 0.
:return: The value estimate dictionary with key being the name of the reward signal and the value the
corresponding value estimate.
"""
feed_dict: Dict[tf.Tensor, Any] = {
self.model.batch_size: 1,
self.model.sequence_length: 1,
}
vec_vis_obs = split_obs(experience.obs)
for i in range(len(vec_vis_obs.visual_observations)):
feed_dict[self.model.visual_in[i]] = [vec_vis_obs.visual_observations[i]]
if self.use_vec_obs:
feed_dict[self.model.vector_in] = [vec_vis_obs.vector_observations]
if self.use_recurrent:
feed_dict[self.model.memory_in] = [experience.memory]
if not self.use_continuous_act and self.use_recurrent:
feed_dict[self.model.prev_action] = [experience.prev_action]
value_estimates = self.sess.run(self.model.value_heads, feed_dict)
value_estimates = {k: float(v) for k, v in value_estimates.items()}
# If we're done, reassign all of the value estimates that need terminal states.
if experience.done:
for k in value_estimates:
if self.reward_signals[k].use_terminal_states:
value_estimates[k] = 0.0
return value_estimates
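get_value_estimates2 zeroes the bootstrap value for reward signals that use terminal states. A tiny standalone example of why that matters for the returns computed later; the discounted_return helper and the numbers are illustrative only, not part of the diff.

import numpy as np

def discounted_return(rewards, gamma, value_next):
    # Discounted return of the first step, bootstrapped with value_next beyond the last step.
    ret = value_next
    for r in reversed(rewards):
        ret = r + gamma * ret
    return ret

rewards = np.array([1.0, 1.0, 1.0])
gamma = 0.9
print(discounted_return(rewards, gamma, value_next=0.0))  # 2.71  -> episode truly terminated
print(discounted_return(rewards, gamma, value_next=5.0))  # 6.355 -> episode cut off at max_step, bootstrap with V(s_T) = 5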

ml-agents/mlagents/trainers/ppo/trainer.py (269 changes)


self.collected_rewards[_reward_signal] = {}
def process_trajectory(self, trajectory: Trajectory) -> None:
pass
def process_experiences(
self, current_info: BrainInfo, next_info: BrainInfo
) -> None:
Checks agent histories for processing condition, and processes them as necessary.
Takes a trajectory and processes it, putting it into the update buffer.
:param current_info: current BrainInfo.
:param next_info: next BrainInfo.
agent_buffer_trajectory = self._trajectory_to_agentbuffer(trajectory)
# Update the normalization
self.policy.update_normalization(next_info.vector_observations)
for l in range(len(next_info.agents)):
agent_actions = self.processing_buffer[next_info.agents[l]]["actions"]
if (
next_info.local_done[l]
or len(agent_actions) > self.trainer_parameters["time_horizon"]
) and len(agent_actions) > 0:
agent_id = next_info.agents[l]
if next_info.max_reached[l]:
bootstrapping_info = self.processing_buffer[
agent_id
].last_brain_info
idx = bootstrapping_info.agents.index(agent_id)
else:
bootstrapping_info = next_info
idx = l
value_next = self.policy.get_value_estimates(
bootstrapping_info,
idx,
next_info.local_done[l] and not next_info.max_reached[l],
self.policy.update_normalization(agent_buffer_trajectory["vector_obs"])
# Get all value estimates
value_estimates = self.policy.get_batched_value_estimates(
agent_buffer_trajectory
)
for name, v in value_estimates.items():
agent_buffer_trajectory["{}_value_estimates".format(name)].extend(v)
value_next = self.policy.get_value_estimates2(trajectory.next_step)
# Evaluate all reward functions
for name, reward_signal in self.policy.reward_signals.items():
evaluate_result = reward_signal.evaluate_batch(
agent_buffer_trajectory
).scaled_reward
agent_buffer_trajectory["{}_rewards".format(name)].extend(evaluate_result)
# Compute GAE and returns
tmp_advantages = []
tmp_returns = []
for name in self.policy.reward_signals:
bootstrap_value = value_next[name]
local_rewards = agent_buffer_trajectory[
"{}_rewards".format(name)
].get_batch()
local_value_estimates = agent_buffer_trajectory[
"{}_value_estimates".format(name)
].get_batch()
local_advantage = get_gae(
rewards=local_rewards,
value_estimates=local_value_estimates,
value_next=bootstrap_value,
gamma=self.policy.reward_signals[name].gamma,
lambd=self.trainer_parameters["lambd"],
)
local_return = local_advantage + local_value_estimates
# This is later used as the target for the different value estimates
agent_buffer_trajectory["{}_returns".format(name)].set(local_return)
agent_buffer_trajectory["{}_advantage".format(name)].set(local_advantage)
tmp_advantages.append(local_advantage)
tmp_returns.append(local_return)
# Get global advantages
global_advantages = list(
np.mean(np.array(tmp_advantages, dtype=np.float32), axis=0)
)
global_returns = list(np.mean(np.array(tmp_returns, dtype=np.float32), axis=0))
agent_buffer_trajectory["advantages"].set(global_advantages)
agent_buffer_trajectory["discounted_returns"].set(global_returns)
# Append to update buffer
key_list = agent_buffer_trajectory.keys()
for field_key in key_list:
self.update_buffer[field_key].extend(
agent_buffer_trajectory[field_key].get_batch(
batch_size=None, training_length=self.policy.sequence_length
)
tmp_advantages = []
tmp_returns = []
for name in self.policy.reward_signals:
bootstrap_value = value_next[name]
local_rewards = self.processing_buffer[agent_id][
"{}_rewards".format(name)
].get_batch()
local_value_estimates = self.processing_buffer[agent_id][
"{}_value_estimates".format(name)
].get_batch()
local_advantage = get_gae(
rewards=local_rewards,
value_estimates=local_value_estimates,
value_next=bootstrap_value,
gamma=self.policy.reward_signals[name].gamma,
lambd=self.trainer_parameters["lambd"],
if trajectory.steps[-1].done:
agent_id = trajectory.steps[-1].agent_id
self.stats["Environment/Episode Length"].append(
self.episode_steps.get(agent_id, 0)
)
self.episode_steps[agent_id] = 0
for name, rewards in self.collected_rewards.items():
if name == "environment":
self.cumulative_returns_since_policy_update.append(
rewards.get(agent_id, 0)
local_return = local_advantage + local_value_estimates
# This is later used as the target for the different value estimates
self.processing_buffer[agent_id]["{}_returns".format(name)].set(
local_return
self.stats["Environment/Cumulative Reward"].append(
rewards.get(agent_id, 0)
self.processing_buffer[agent_id]["{}_advantage".format(name)].set(
local_advantage
self.reward_buffer.appendleft(rewards.get(agent_id, 0))
rewards[agent_id] = 0
else:
self.stats[self.policy.reward_signals[name].stat_name].append(
rewards.get(agent_id, 0)
tmp_advantages.append(local_advantage)
tmp_returns.append(local_return)
rewards[agent_id] = 0
def process_experiences(
self, current_info: BrainInfo, next_info: BrainInfo
) -> None:
pass
# """
# Checks agent histories for processing condition, and processes them as necessary.
# Processing involves calculating value and advantage targets for model updating step.
# :param current_info: current BrainInfo.
# :param next_info: next BrainInfo.
# """
# if self.is_training:
# self.policy.update_normalization(next_info.vector_observations)
# for l in range(len(next_info.agents)):
# agent_actions = self.processing_buffer[next_info.agents[l]]["actions"]
# if (
# next_info.local_done[l]
# or len(agent_actions) > self.trainer_parameters["time_horizon"]
# ) and len(agent_actions) > 0:
# agent_id = next_info.agents[l]
# if next_info.max_reached[l]:
# bootstrapping_info = self.processing_buffer[
# agent_id
# ].last_brain_info
# idx = bootstrapping_info.agents.index(agent_id)
# else:
# bootstrapping_info = next_info
# idx = l
# value_next = self.policy.get_value_estimates(
# bootstrapping_info,
# idx,
# next_info.local_done[l] and not next_info.max_reached[l],
# )
global_advantages = list(
np.mean(np.array(tmp_advantages, dtype=np.float32), axis=0)
)
global_returns = list(
np.mean(np.array(tmp_returns, dtype=np.float32), axis=0)
)
self.processing_buffer[agent_id]["advantages"].set(global_advantages)
self.processing_buffer[agent_id]["discounted_returns"].set(
global_returns
)
# tmp_advantages = []
# tmp_returns = []
# for name in self.policy.reward_signals:
# bootstrap_value = value_next[name]
self.processing_buffer.append_to_update_buffer(
self.update_buffer,
agent_id,
batch_size=None,
training_length=self.policy.sequence_length,
)
# local_rewards = self.processing_buffer[agent_id][
# "{}_rewards".format(name)
# ].get_batch()
# local_value_estimates = self.processing_buffer[agent_id][
# "{}_value_estimates".format(name)
# ].get_batch()
# local_advantage = get_gae(
# rewards=local_rewards,
# value_estimates=local_value_estimates,
# value_next=bootstrap_value,
# gamma=self.policy.reward_signals[name].gamma,
# lambd=self.trainer_parameters["lambd"],
# )
# local_return = local_advantage + local_value_estimates
# # This is later used as the target for the different value estimates
# self.processing_buffer[agent_id]["{}_returns".format(name)].set(
# local_return
# )
# self.processing_buffer[agent_id]["{}_advantage".format(name)].set(
# local_advantage
# )
# tmp_advantages.append(local_advantage)
# tmp_returns.append(local_return)
self.processing_buffer[agent_id].reset_agent()
if next_info.local_done[l]:
self.stats["Environment/Episode Length"].append(
self.episode_steps.get(agent_id, 0)
)
self.episode_steps[agent_id] = 0
for name, rewards in self.collected_rewards.items():
if name == "environment":
self.cumulative_returns_since_policy_update.append(
rewards.get(agent_id, 0)
)
self.stats["Environment/Cumulative Reward"].append(
rewards.get(agent_id, 0)
)
self.reward_buffer.appendleft(rewards.get(agent_id, 0))
rewards[agent_id] = 0
else:
self.stats[
self.policy.reward_signals[name].stat_name
].append(rewards.get(agent_id, 0))
rewards[agent_id] = 0
# global_advantages = list(
# np.mean(np.array(tmp_advantages, dtype=np.float32), axis=0)
# )
# global_returns = list(
# np.mean(np.array(tmp_returns, dtype=np.float32), axis=0)
# )
# self.processing_buffer[agent_id]["advantages"].set(global_advantages)
# self.processing_buffer[agent_id]["discounted_returns"].set(
# global_returns
# )
# self.processing_buffer.append_to_update_buffer(
# self.update_buffer,
# agent_id,
# batch_size=None,
# training_length=self.policy.sequence_length,
# )
# self.processing_buffer[agent_id].reset_agent()
# if next_info.local_done[l]:
# self.stats["Environment/Episode Length"].append(
# self.episode_steps.get(agent_id, 0)
# )
# self.episode_steps[agent_id] = 0
# for name, rewards in self.collected_rewards.items():
# if name == "environment":
# self.cumulative_returns_since_policy_update.append(
# rewards.get(agent_id, 0)
# )
# self.stats["Environment/Cumulative Reward"].append(
# rewards.get(agent_id, 0)
# )
# self.reward_buffer.appendleft(rewards.get(agent_id, 0))
# rewards[agent_id] = 0
# else:
# self.stats[
# self.policy.reward_signals[name].stat_name
# ].append(rewards.get(agent_id, 0))
# rewards[agent_id] = 0
def add_policy_outputs(
self, take_action_outputs: ActionInfoOutputs, agent_id: str, agent_idx: int

ml-agents/mlagents/trainers/rl_trainer.py (57 changes)


from mlagents.envs.brain import BrainInfo
from mlagents.envs.action_info import ActionInfoOutputs
from mlagents.trainers.buffer import AgentBuffer
from mlagents.trainers.agent_processor import ProcessingBuffer
from mlagents.trainers.agent_processor import ProcessingBuffer, Trajectory, split_obs
from mlagents.trainers.trainer import Trainer, UnityTrainerException
from mlagents.trainers.components.reward_signals import RewardSignalResult

raise UnityTrainerException(
"The add_rewards_outputs method was not implemented."
)
def _trajectory_to_agentbuffer(self, trajectory: Trajectory) -> AgentBuffer:
"""
Converts a Trajectory to an AgentBuffer
:param trajectory: A Trajectory
:returns: AgentBuffer
"""
agent_buffer_trajectory = AgentBuffer()
for step, exp in enumerate(trajectory.steps):
vec_vis_obs = split_obs(exp.obs)
if step < len(trajectory.steps) - 1:
next_vec_vis_obs = split_obs(trajectory.steps[step + 1].obs)
else:
next_vec_vis_obs = split_obs(trajectory.next_step.obs)
for i, _ in enumerate(vec_vis_obs.visual_observations):
agent_buffer_trajectory["visual_obs%d" % i].append(
vec_vis_obs.visual_observations[i]
)
agent_buffer_trajectory["next_visual_obs%d" % i].append(
next_vec_vis_obs.visual_observations[i]
)
if self.policy.use_vec_obs:
agent_buffer_trajectory["vector_obs"].append(
vec_vis_obs.vector_observations
)
agent_buffer_trajectory["next_vector_in"].append(
next_vec_vis_obs.vector_observations
)
if self.policy.use_recurrent:
agent_buffer_trajectory["memory"].append(exp.memory)
agent_buffer_trajectory["masks"].append(1.0)
agent_buffer_trajectory["done"].append(exp.done)
# Add the outputs of the last eval
if self.policy.use_continuous_act:
actions_pre = exp.action_pre
agent_buffer_trajectory["actions_pre"].append(actions_pre)
epsilons = exp.epsilon
agent_buffer_trajectory["random_normal_epsilon"].append(epsilons)
# value is a dictionary from name of reward to value estimate of the value head
agent_buffer_trajectory["actions"].append(exp.action)
agent_buffer_trajectory["action_probs"].append(exp.action_probs)
# Store action masks if necessary. Eventually these will be
# None for continuous actions
if exp.action_mask is not None:
agent_buffer_trajectory["action_mask"].append(
exp.action_mask, padding_value=1
)
agent_buffer_trajectory["prev_action"].append(exp.prev_action)
# Add the value outputs if needed
agent_buffer_trajectory["environment_rewards"].append(exp.reward)
return agent_buffer_trajectory
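A minimal standalone illustration of the one-step offset this conversion creates between the current-observation fields and their "next_" counterparts; plain strings stand in for observations and the variable names are illustrative only.

steps_obs = ["o0", "o1", "o2", "o3"]   # observations of the trajectory's steps
next_step_obs = "o4"                   # observation of trajectory.next_step

vector_obs, next_vector_in = [], []
for t, obs in enumerate(steps_obs):
    vector_obs.append(obs)
    # The last step's "next" observation comes from trajectory.next_step.
    is_last = t == len(steps_obs) - 1
    next_vector_in.append(next_step_obs if is_last else steps_obs[t + 1])

print(vector_obs)      # ['o0', 'o1', 'o2', 'o3']
print(next_vector_in)  # ['o1', 'o2', 'o3', 'o4']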

ml-agents/mlagents/trainers/trainer_controller.py (18 changes)


step_info.current_all_brain_info[brain_name],
step_info.brain_name_to_action_info[brain_name].outputs,
)
trainer.add_experiences(
step_info.previous_all_brain_info[brain_name],
step_info.current_all_brain_info[brain_name],
step_info.brain_name_to_action_info[brain_name].outputs,
)
trainer.process_experiences(
step_info.previous_all_brain_info[brain_name],
step_info.current_all_brain_info[brain_name],
)
# trainer.add_experiences(
# step_info.previous_all_brain_info[brain_name],
# step_info.current_all_brain_info[brain_name],
# step_info.brain_name_to_action_info[brain_name].outputs,
# )
# trainer.process_experiences(
# step_info.previous_all_brain_info[brain_name],
# step_info.current_all_brain_info[brain_name],
# )
for brain_name, trainer in self.trainers.items():
if brain_name in self.trainer_metrics:
self.trainer_metrics[brain_name].add_delta_step(delta_time_step)
