The Unity Machine Learning Agents Toolkit (ML-Agents) is an open-source project that enables games and simulations to serve as environments for training intelligent agents.
from typing import Any, Dict
import numpy as np
import torch
from mlagents.trainers.action_info import ActionInfo
from mlagents.trainers.policy import Policy
from mlagents_envs.base_env import DecisionSteps
from mlagents.tf_utils import tf
from mlagents_envs.timers import timed
from mlagents.trainers.policy.policy import UnityPolicyException
from mlagents.trainers.trajectory import SplitObservations
from mlagents.trainers.brain import BrainParameters
from mlagents.trainers.models_torch import ActionType, EncoderType, Actor, Critic
EPSILON = 1e-7  # Small value to avoid divide by zero


class TorchPolicy(Policy):
    def __init__(
        self,
        seed: int,
        brain: BrainParameters,
        trainer_params: Dict[str, Any],
        load: bool,
        tanh_squash: bool = False,
        reparameterize: bool = False,
        condition_sigma_on_obs: bool = True,
    ):
        """
        Policy that uses a multilayer perceptron to map the observations to actions. Could
        also use a CNN to encode visual input prior to the MLP. Supports discrete and
        continuous action spaces, as well as recurrent networks.
        :param seed: Random seed.
        :param brain: Assigned BrainParameters object.
        :param trainer_params: Defined training parameters.
        :param load: Whether a pre-trained model will be loaded or a new one created.
        :param tanh_squash: Whether to use a tanh function on the continuous output,
        or a clipped output.
        :param reparameterize: Whether we are using the resampling trick to update the policy
        in continuous output.
        :param condition_sigma_on_obs: Whether the standard deviation of the Gaussian policy
        is conditioned on the observations or learned as an independent parameter.
        """
        super(TorchPolicy, self).__init__(brain, seed, trainer_params)
        self.grads = None
        num_layers = trainer_params["num_layers"]
        self.h_size = trainer_params["hidden_units"]
        self.seed = seed
        self.brain = brain
        self.global_step = 0
        self.act_size = brain.vector_action_space_size
        self.sequence_length = 1
        if self.use_recurrent:
            self.m_size = trainer_params["memory_size"]
            self.sequence_length = trainer_params["sequence_length"]
            if self.m_size == 0:
                raise UnityPolicyException(
                    "The memory size for brain {0} is 0 even "
                    "though the trainer uses recurrent.".format(brain.brain_name)
                )
            elif self.m_size % 2 != 0:
                raise UnityPolicyException(
                    "The memory size for brain {0} is {1} "
                    "but it must be divisible by 2.".format(
                        brain.brain_name, self.m_size
                    )
                )
        if num_layers < 1:
            num_layers = 1
        self.num_layers = num_layers
        self.vis_encode_type = EncoderType(
            trainer_params.get("vis_encode_type", "simple")
        )
        self.tanh_squash = tanh_squash
        self.reparameterize = reparameterize
        self.condition_sigma_on_obs = condition_sigma_on_obs
        # Non-exposed parameters; these aren't exposed because they don't have a
        # good explanation and usually shouldn't be touched.
        self.log_std_min = -20
        self.log_std_max = 2
        self.inference_dict: Dict[str, tf.Tensor] = {}
        self.update_dict: Dict[str, tf.Tensor] = {}
        # TF defaults to 32-bit, so we use the same here.
        torch.set_default_tensor_type(torch.FloatTensor)
        reward_signal_configs = trainer_params["reward_signals"]
        self.stats_name_to_update_name = {
            "Losses/Value Loss": "value_loss",
            "Losses/Policy Loss": "policy_loss",
        }
        self.actor = Actor(
            h_size=int(trainer_params["hidden_units"]),
            act_type=ActionType.CONTINUOUS,
            vector_sizes=[brain.vector_observation_space_size],
            act_size=sum(brain.vector_action_space_size),
            normalize=trainer_params["normalize"],
            num_layers=int(trainer_params["num_layers"]),
            m_size=trainer_params["memory_size"],
            use_lstm=self.use_recurrent,
            visual_sizes=brain.camera_resolutions,
            vis_encode_type=EncoderType(
                trainer_params.get("vis_encode_type", "simple")
            ),
        )
        self.critic = Critic(
            h_size=int(trainer_params["hidden_units"]),
            vector_sizes=[brain.vector_observation_space_size],
            normalize=trainer_params["normalize"],
            num_layers=int(trainer_params["num_layers"]),
            m_size=trainer_params["memory_size"],
            use_lstm=self.use_recurrent,
            visual_sizes=brain.camera_resolutions,
            stream_names=list(reward_signal_configs.keys()),
            vis_encode_type=EncoderType(
                trainer_params.get("vis_encode_type", "simple")
            ),
        )

    def split_decision_step(self, decision_requests):
        vec_vis_obs = SplitObservations.from_observations(decision_requests.obs)
        mask = None
        if not self.use_continuous_act:
            mask = np.ones(
                (len(decision_requests), np.sum(self.brain.vector_action_space_size)),
                dtype=np.float32,
            )
            if decision_requests.action_mask is not None:
                # Flip so that 1 marks an available action (worked example below).
                mask = 1 - np.concatenate(decision_requests.action_mask, axis=1)
        return vec_vis_obs.vector_observations, vec_vis_obs.visual_observations, mask
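
    # Worked example for the mask above (a hedged illustration, assuming the
    # DecisionSteps convention that action_mask flags *unavailable* actions):
    # with one discrete branch of size 3 and action index 1 disallowed,
    # decision_requests.action_mask is [array([[False, True, False]])], and the
    # flipped mask returned here is [[1., 0., 1.]], i.e. 1 = may be sampled.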

    def update_normalization(self, vector_obs: np.ndarray) -> None:
        """
        If this policy normalizes vector observations, this will update the norm values in the graph.
        :param vector_obs: The vector observations to add to the running estimate of the distribution.
        """
        vector_obs = [vector_obs]
        if self.use_vec_obs and self.normalize:
            self.critic.network_body.update_normalization(vector_obs)
            self.actor.network_body.update_normalization(vector_obs)

    def execute_model(self, vec_obs, vis_obs, masks=None):
        """
        Runs the actor and critic on the given observations: samples an action
        from each of the actor's output distributions and evaluates the critic's
        value heads.
        """
        action_dists = self.actor(vec_obs, vis_obs, masks)
        actions = []
        log_probs = []
        entropies = []
        for action_dist in action_dists:
            action = action_dist.sample()
            actions.append(action)
            log_probs.append(action_dist.log_prob(action))
            entropies.append(action_dist.entropy())
        actions = torch.stack(actions).squeeze(0)
        log_probs = torch.stack(log_probs).squeeze(0)
        entropies = torch.stack(entropies).squeeze(0)
        value_heads, mean_value = self.critic(vec_obs, vis_obs)
        return actions, log_probs, entropies, value_heads

    @timed
    def evaluate(self, decision_requests: DecisionSteps) -> Dict[str, Any]:
        """
        Evaluates policy for the agent experiences provided.
        :param decision_requests: DecisionSteps object containing inputs.
        :return: Dictionary of network outputs (actions, log probabilities,
        entropy, and value estimates).
        """
        vec_obs, vis_obs, masks = self.split_decision_step(decision_requests)
        vec_obs = [torch.Tensor(vec_obs)]
        vis_obs = [torch.Tensor(vis_ob) for vis_ob in vis_obs]
        run_out = {}
        action, log_probs, entropy, value_heads = self.execute_model(
            vec_obs, vis_obs, masks
        )
        run_out["action"] = np.array(action.detach())
        run_out["pre_action"] = np.array(
            action.detach()
        )  # TODO: make pre_action distinct from action
        run_out["log_probs"] = np.array(log_probs.detach())
        run_out["entropy"] = np.array(entropy.detach())
        run_out["value_heads"] = {
            name: np.array(t.detach()) for name, t in value_heads.items()
        }
        run_out["value"] = np.mean(list(run_out["value_heads"].values()), 0)
        run_out["learning_rate"] = 0.0
        self.actor.network_body.update_normalization(vec_obs)
        self.critic.network_body.update_normalization(vec_obs)
        return run_out

    def get_action(
        self, decision_requests: DecisionSteps, worker_id: int = 0
    ) -> ActionInfo:
        """
        Decides actions given the observations in the decision requests.
        :param worker_id: ID of the worker (environment) making the request.
        :param decision_requests: A DecisionSteps object containing the observations
        from the environment.
        :return: an ActionInfo containing action, memories, values and an object
        to be passed to add experiences
        """
        if len(decision_requests) == 0:
            return ActionInfo.empty()
        run_out = self.evaluate(
            decision_requests
        )  # pylint: disable=assignment-from-no-return
        return ActionInfo(
            action=run_out.get("action"),
            value=run_out.get("value"),
            outputs=run_out,
            agent_ids=list(decision_requests.agent_id),
        )

    @property
    def vis_obs_size(self):
        return self.brain.number_visual_observations

    @property
    def vec_obs_size(self):
        return self.brain.vector_observation_space_size

    @property
    def use_vis_obs(self):
        return self.vis_obs_size > 0

    @property
    def use_vec_obs(self):
        return self.vec_obs_size > 0

    def get_current_step(self):
        """
        Gets current model step.
        :return: current model step.
        """
        step = self.global_step
        return step

    def increment_step(self, n_steps):
        """
        Increments model step.
        """
        self.global_step += n_steps
        return self.get_current_step()

    def save_model(self, step):
        pass

    def export_model(self):
        pass
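

# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal, hedged example of how a trainer loop might drive this policy. The
# `brain` (BrainParameters) and `decision_steps` (DecisionSteps) objects are
# assumed to be supplied by the surrounding ML-Agents code, and the
# trainer_params keys mirror the ones read in __init__ above plus flags such as
# "use_recurrent" that are assumed to be consumed by the Policy base class.
#
# trainer_params = {
#     "num_layers": 2,
#     "hidden_units": 128,
#     "normalize": True,
#     "use_recurrent": False,
#     "memory_size": 0,
#     "sequence_length": 1,
#     "vis_encode_type": "simple",
#     "reward_signals": {"extrinsic": {"strength": 1.0, "gamma": 0.99}},
# }
# policy = TorchPolicy(seed=0, brain=brain, trainer_params=trainer_params, load=False)
# action_info = policy.get_action(decision_steps)  # ActionInfo(action, value, outputs, agent_ids)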