Unity 机器学习代理工具包 (ML-Agents) 是一个开源项目,它使游戏和模拟能够作为训练智能代理的环境。
您最多选择25个主题 主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 
 

517 行
21 KiB

from typing import List, Optional, Tuple, NamedTuple, Dict
from mlagents.torch_utils import torch, nn
import numpy as np
from mlagents.trainers.torch.encoders import (
SimpleVisualEncoder,
ResNetVisualEncoder,
NatureVisualEncoder,
SmallVisualEncoder,
VectorInput,
)
from mlagents.trainers.settings import EncoderType, ScheduleType
from mlagents.trainers.exception import UnityTrainerException
from mlagents_envs.base_env import ActionSpec
from mlagents.trainers.torch.distributions import DistInstance, DiscreteDistInstance
class AgentAction(NamedTuple):
"""
A NamedTuple containing the tensor for continuous actions and list of tensors for
discrete actions. Utility functions provide numpy <=> tensor conversions to be
sent as actions to the environment manager as well as used by the optimizers.
:param continuous_tensor: Torch tensor corresponding to continuous actions
:param discrete_list: List of Torch tensors each corresponding to discrete actions
"""
continuous_tensor: torch.Tensor
discrete_list: List[torch.Tensor]
@property
def discrete_tensor(self):
"""
Returns the discrete action list as a stacked tensor
"""
return torch.stack(self.discrete_list, dim=-1)
def to_numpy_dict(self) -> Dict[str, np.ndarray]:
"""
Returns a Dict of np arrays with an entry correspinding to the continuous action
and an entry corresponding to the discrete action. "continuous_action" and
"discrete_action" are added to the agents buffer individually to maintain a flat buffer.
"""
array_dict: Dict[str, np.ndarray] = {}
if self.continuous_tensor is not None:
array_dict["continuous_action"] = ModelUtils.to_numpy(
self.continuous_tensor
)
if self.discrete_list is not None:
array_dict["discrete_action"] = ModelUtils.to_numpy(
self.discrete_tensor[:, 0, :]
)
return array_dict
def to_tensor_list(self) -> List[torch.Tensor]:
"""
Returns the tensors in the AgentAction as a flat List of torch Tensors. This will be removed
when the ActionModel is merged.
"""
tensor_list: List[torch.Tensor] = []
if self.continuous_tensor is not None:
tensor_list.append(self.continuous_tensor)
if self.discrete_list is not None:
tensor_list += (
self.discrete_list
) # Note this is different for ActionLogProbs
return tensor_list
@staticmethod
def create(
tensor_list: List[torch.Tensor], action_spec: ActionSpec
) -> "AgentAction":
"""
A static method that converts a list of torch Tensors into an AgentAction using the ActionSpec.
This will change (and may be removed) in the ActionModel.
"""
continuous: torch.Tensor = None
discrete: List[torch.Tensor] = None # type: ignore
_offset = 0
if action_spec.continuous_size > 0:
continuous = tensor_list[0]
_offset = 1
if action_spec.discrete_size > 0:
discrete = tensor_list[_offset:]
return AgentAction(continuous, discrete)
@staticmethod
def from_dict(buff: Dict[str, np.ndarray]) -> "AgentAction":
"""
A static method that accesses continuous and discrete action fields in an AgentBuffer
and constructs the corresponding AgentAction from the retrieved np arrays.
"""
continuous: torch.Tensor = None
discrete: List[torch.Tensor] = None # type: ignore
if "continuous_action" in buff:
continuous = ModelUtils.list_to_tensor(buff["continuous_action"])
if "discrete_action" in buff:
discrete_tensor = ModelUtils.list_to_tensor(
buff["discrete_action"], dtype=torch.long
)
discrete = [
discrete_tensor[..., i] for i in range(discrete_tensor.shape[-1])
]
return AgentAction(continuous, discrete)
class ActionLogProbs(NamedTuple):
"""
A NamedTuple containing the tensor for continuous log probs and list of tensors for
discrete log probs of individual actions as well as all the log probs for an entire branch.
Utility functions provide numpy <=> tensor conversions to be used by the optimizers.
:param continuous_tensor: Torch tensor corresponding to log probs of continuous actions
:param discrete_list: List of Torch tensors each corresponding to log probs of the discrete actions that were
sampled.
:param all_discrete_list: List of Torch tensors each corresponding to all log probs of
a discrete action branch, even the discrete actions that were not sampled. all_discrete_list is a list of Tensors,
each Tensor corresponds to one discrete branch log probabilities.
"""
continuous_tensor: torch.Tensor
discrete_list: List[torch.Tensor]
all_discrete_list: Optional[List[torch.Tensor]]
@property
def discrete_tensor(self):
"""
Returns the discrete log probs list as a stacked tensor
"""
return torch.stack(self.discrete_list, dim=-1)
@property
def all_discrete_tensor(self):
"""
Returns the discrete log probs of each branch as a tensor
"""
return torch.cat(self.all_discrete_list, dim=1)
def to_numpy_dict(self) -> Dict[str, np.ndarray]:
"""
Returns a Dict of np arrays with an entry correspinding to the continuous log probs
and an entry corresponding to the discrete log probs. "continuous_log_probs" and
"discrete_log_probs" are added to the agents buffer individually to maintain a flat buffer.
"""
array_dict: Dict[str, np.ndarray] = {}
if self.continuous_tensor is not None:
array_dict["continuous_log_probs"] = ModelUtils.to_numpy(
self.continuous_tensor
)
if self.discrete_list is not None:
array_dict["discrete_log_probs"] = ModelUtils.to_numpy(self.discrete_tensor)
return array_dict
def _to_tensor_list(self) -> List[torch.Tensor]:
"""
Returns the tensors in the ActionLogProbs as a flat List of torch Tensors. This
is private and serves as a utility for self.flatten()
"""
tensor_list: List[torch.Tensor] = []
if self.continuous_tensor is not None:
tensor_list.append(self.continuous_tensor)
if self.discrete_list is not None:
tensor_list.append(
self.discrete_tensor
) # Note this is different for AgentActions
return tensor_list
def flatten(self) -> torch.Tensor:
"""
A utility method that returns all log probs in ActionLogProbs as a flattened tensor.
This is useful for algorithms like PPO which can treat all log probs in the same way.
"""
return torch.cat(self._to_tensor_list(), dim=1)
@staticmethod
def create(
log_prob_list: List[torch.Tensor],
action_spec: ActionSpec,
all_log_prob_list: List[torch.Tensor] = None,
) -> "ActionLogProbs":
"""
A static method that converts a list of torch Tensors into an ActionLogProbs using the ActionSpec.
This will change (and may be removed) in the ActionModel.
"""
continuous: torch.Tensor = None
discrete: List[torch.Tensor] = None # type: ignore
_offset = 0
if action_spec.continuous_size > 0:
continuous = log_prob_list[0]
_offset = 1
if action_spec.discrete_size > 0:
discrete = log_prob_list[_offset:]
return ActionLogProbs(continuous, discrete, all_log_prob_list)
@staticmethod
def from_dict(buff: Dict[str, np.ndarray]) -> "ActionLogProbs":
"""
A static method that accesses continuous and discrete log probs fields in an AgentBuffer
and constructs the corresponding ActionLogProbs from the retrieved np arrays.
"""
continuous: torch.Tensor = None
discrete: List[torch.Tensor] = None # type: ignore
if "continuous_log_probs" in buff:
continuous = ModelUtils.list_to_tensor(buff["continuous_log_probs"])
if "discrete_log_probs" in buff:
discrete_tensor = ModelUtils.list_to_tensor(buff["discrete_log_probs"])
discrete = [
discrete_tensor[..., i] for i in range(discrete_tensor.shape[-1])
]
return ActionLogProbs(continuous, discrete, None)
class ModelUtils:
# Minimum supported side for each encoder type. If refactoring an encoder, please
# adjust these also.
MIN_RESOLUTION_FOR_ENCODER = {
EncoderType.MATCH3: 5,
EncoderType.SIMPLE: 20,
EncoderType.NATURE_CNN: 36,
EncoderType.RESNET: 15,
}
class ActionFlattener:
def __init__(self, action_spec: ActionSpec):
self._specs = action_spec
@property
def flattened_size(self) -> int:
if self._specs.is_continuous():
return self._specs.continuous_size
else:
return sum(self._specs.discrete_branches)
def forward(self, action: AgentAction) -> torch.Tensor:
if self._specs.is_continuous():
return action.continuous_tensor
else:
return torch.cat(
ModelUtils.actions_to_onehot(
torch.as_tensor(action.discrete_tensor, dtype=torch.long),
self._specs.discrete_branches,
),
dim=1,
)
@staticmethod
def update_learning_rate(optim: torch.optim.Optimizer, lr: float) -> None:
"""
Apply a learning rate to a torch optimizer.
:param optim: Optimizer
:param lr: Learning rate
"""
for param_group in optim.param_groups:
param_group["lr"] = lr
class DecayedValue:
def __init__(
self,
schedule: ScheduleType,
initial_value: float,
min_value: float,
max_step: int,
):
"""
Object that represnets value of a parameter that should be decayed, assuming it is a function of
global_step.
:param schedule: Type of learning rate schedule.
:param initial_value: Initial value before decay.
:param min_value: Decay value to this value by max_step.
:param max_step: The final step count where the return value should equal min_value.
:param global_step: The current step count.
:return: The value.
"""
self.schedule = schedule
self.initial_value = initial_value
self.min_value = min_value
self.max_step = max_step
def get_value(self, global_step: int) -> float:
"""
Get the value at a given global step.
:param global_step: Step count.
:returns: Decayed value at this global step.
"""
if self.schedule == ScheduleType.CONSTANT:
return self.initial_value
elif self.schedule == ScheduleType.LINEAR:
return ModelUtils.polynomial_decay(
self.initial_value, self.min_value, self.max_step, global_step
)
else:
raise UnityTrainerException(f"The schedule {self.schedule} is invalid.")
@staticmethod
def polynomial_decay(
initial_value: float,
min_value: float,
max_step: int,
global_step: int,
power: float = 1.0,
) -> float:
"""
Get a decayed value based on a polynomial schedule, with respect to the current global step.
:param initial_value: Initial value before decay.
:param min_value: Decay value to this value by max_step.
:param max_step: The final step count where the return value should equal min_value.
:param global_step: The current step count.
:param power: Power of polynomial decay. 1.0 (default) is a linear decay.
:return: The current decayed value.
"""
global_step = min(global_step, max_step)
decayed_value = (initial_value - min_value) * (
1 - float(global_step) / max_step
) ** (power) + min_value
return decayed_value
@staticmethod
def get_encoder_for_type(encoder_type: EncoderType) -> nn.Module:
ENCODER_FUNCTION_BY_TYPE = {
EncoderType.SIMPLE: SimpleVisualEncoder,
EncoderType.NATURE_CNN: NatureVisualEncoder,
EncoderType.RESNET: ResNetVisualEncoder,
EncoderType.MATCH3: SmallVisualEncoder,
}
return ENCODER_FUNCTION_BY_TYPE.get(encoder_type)
@staticmethod
def _check_resolution_for_encoder(
height: int, width: int, vis_encoder_type: EncoderType
) -> None:
min_res = ModelUtils.MIN_RESOLUTION_FOR_ENCODER[vis_encoder_type]
if height < min_res or width < min_res:
raise UnityTrainerException(
f"Visual observation resolution ({width}x{height}) is too small for"
f"the provided EncoderType ({vis_encoder_type.value}). The min dimension is {min_res}"
)
@staticmethod
def create_input_processors(
observation_shapes: List[Tuple[int, ...]],
h_size: int,
vis_encode_type: EncoderType,
normalize: bool = False,
) -> Tuple[nn.ModuleList, nn.ModuleList, int]:
"""
Creates visual and vector encoders, along with their normalizers.
:param observation_shapes: List of Tuples that represent the action dimensions.
:param action_size: Number of additional un-normalized inputs to each vector encoder. Used for
conditioining network on other values (e.g. actions for a Q function)
:param h_size: Number of hidden units per layer.
:param vis_encode_type: Type of visual encoder to use.
:param unnormalized_inputs: Vector inputs that should not be normalized, and added to the vector
obs.
:param normalize: Normalize all vector inputs.
:return: Tuple of visual encoders and vector encoders each as a list.
"""
visual_encoders: List[nn.Module] = []
vector_encoders: List[nn.Module] = []
visual_encoder_class = ModelUtils.get_encoder_for_type(vis_encode_type)
vector_size = 0
visual_output_size = 0
for i, dimension in enumerate(observation_shapes):
if len(dimension) == 3:
ModelUtils._check_resolution_for_encoder(
dimension[0], dimension[1], vis_encode_type
)
visual_encoders.append(
visual_encoder_class(
dimension[0], dimension[1], dimension[2], h_size
)
)
visual_output_size += h_size
elif len(dimension) == 1:
vector_size += dimension[0]
else:
raise UnityTrainerException(
f"Unsupported shape of {dimension} for observation {i}"
)
if vector_size > 0:
vector_encoders.append(VectorInput(vector_size, normalize))
# Total output size for all inputs + CNNs
total_processed_size = vector_size + visual_output_size
return (
nn.ModuleList(visual_encoders),
nn.ModuleList(vector_encoders),
total_processed_size,
)
@staticmethod
def list_to_tensor(
ndarray_list: List[np.ndarray], dtype: Optional[torch.dtype] = torch.float32
) -> torch.Tensor:
"""
Converts a list of numpy arrays into a tensor. MUCH faster than
calling as_tensor on the list directly.
"""
return torch.as_tensor(np.asanyarray(ndarray_list), dtype=dtype)
@staticmethod
def to_numpy(tensor: torch.Tensor) -> np.ndarray:
"""
Converts a Torch Tensor to a numpy array. If the Tensor is on the GPU, it will
be brought to the CPU.
"""
return tensor.detach().cpu().numpy()
@staticmethod
def break_into_branches(
concatenated_logits: torch.Tensor, action_size: List[int]
) -> List[torch.Tensor]:
"""
Takes a concatenated set of logits that represent multiple discrete action branches
and breaks it up into one Tensor per branch.
:param concatenated_logits: Tensor that represents the concatenated action branches
:param action_size: List of ints containing the number of possible actions for each branch.
:return: A List of Tensors containing one tensor per branch.
"""
action_idx = [0] + list(np.cumsum(action_size))
branched_logits = [
concatenated_logits[:, action_idx[i] : action_idx[i + 1]]
for i in range(len(action_size))
]
return branched_logits
@staticmethod
def actions_to_onehot(
discrete_actions: torch.Tensor, action_size: List[int]
) -> List[torch.Tensor]:
"""
Takes a tensor of discrete actions and turns it into a List of onehot encoding for each
action.
:param discrete_actions: Actions in integer form.
:param action_size: List of branch sizes. Should be of same size as discrete_actions'
last dimension.
:return: List of one-hot tensors, one representing each branch.
"""
onehot_branches = [
torch.nn.functional.one_hot(_act.T, action_size[i]).float()
for i, _act in enumerate(discrete_actions.long().T)
]
return onehot_branches
@staticmethod
def dynamic_partition(
data: torch.Tensor, partitions: torch.Tensor, num_partitions: int
) -> List[torch.Tensor]:
"""
Torch implementation of dynamic_partition :
https://www.tensorflow.org/api_docs/python/tf/dynamic_partition
Splits the data Tensor input into num_partitions Tensors according to the indices in
partitions.
:param data: The Tensor data that will be split into partitions.
:param partitions: An indices tensor that determines in which partition each element
of data will be in.
:param num_partitions: The number of partitions to output. Corresponds to the
maximum possible index in the partitions argument.
:return: A list of Tensor partitions (Their indices correspond to their partition index).
"""
res: List[torch.Tensor] = []
for i in range(num_partitions):
res += [data[(partitions == i).nonzero().squeeze(1)]]
return res
@staticmethod
def get_probs_and_entropy(
action_list: List[torch.Tensor], dists: List[DistInstance]
) -> Tuple[List[torch.Tensor], torch.Tensor, Optional[torch.Tensor]]:
log_probs_list = []
all_probs_list = []
entropies_list = []
for action, action_dist in zip(action_list, dists):
log_prob = action_dist.log_prob(action)
log_probs_list.append(log_prob)
entropies_list.append(action_dist.entropy())
if isinstance(action_dist, DiscreteDistInstance):
all_probs_list.append(action_dist.all_log_prob())
entropies = torch.stack(entropies_list, dim=-1)
if not all_probs_list:
entropies = entropies.squeeze(-1)
return log_probs_list, entropies, all_probs_list
@staticmethod
def masked_mean(tensor: torch.Tensor, masks: torch.Tensor) -> torch.Tensor:
"""
Returns the mean of the tensor but ignoring the values specified by masks.
Used for masking out loss functions.
:param tensor: Tensor which needs mean computation.
:param masks: Boolean tensor of masks with same dimension as tensor.
"""
return (tensor.T * masks).sum() / torch.clamp(
(torch.ones_like(tensor.T) * masks).float().sum(), min=1.0
)
@staticmethod
def soft_update(source: nn.Module, target: nn.Module, tau: float) -> None:
"""
Performs an in-place polyak update of the target module based on the source,
by a ratio of tau. Note that source and target modules must have the same
parameters, where:
target = tau * source + (1-tau) * target
:param source: Source module whose parameters will be used.
:param target: Target module whose parameters will be updated.
:param tau: Percentage of source parameters to use in average. Setting tau to
1 will copy the source parameters to the target.
"""
with torch.no_grad():
for source_param, target_param in zip(
source.parameters(), target.parameters()
):
target_param.data.mul_(1.0 - tau)
torch.add(
target_param.data,
source_param.data,
alpha=tau,
out=target_param.data,
)