|
|
|
|
|
|
from typing import Tuple, Optional, Union |
|
|
|
|
|
|
|
from mlagents.trainers.exception import UnityTrainerException |
|
|
|
from mlagents.trainers.torch.layers import linear_layer, Initialization, Swish |
|
|
|
from mlagents.trainers.torch.layers import Swish |
|
|
|
|
|
|
|
import torch |
|
|
|
from torch import nn |
|
|
|
|
|
|
return height, width |
|
|
|
|
|
|
|
|
|
|
|
class VectorEncoder(nn.Module):
    """
    Multi-layer perceptron encoder for vector observations: a stack of
    linear layers with Swish activations, with optional input normalization.

    NOTE(review): this region of the file was a corrupted line-interleaving of
    two class definitions (VectorEncoder and VectorInput). They are
    disentangled here; verify against version-control history.
    """

    def __init__(
        self,
        input_size: int,
        hidden_size: int,
        num_layers: int,
        normalize: bool = False,
    ):
        """
        :param input_size: Dimensionality of the flat vector input.
        :param hidden_size: Width of every hidden linear layer.
        :param num_layers: Total number of linear+Swish layers (at least 1).
        :param normalize: If True, apply running-stats normalization to the
            inputs before the MLP.
        """
        super().__init__()
        # Created only when requested; forward() checks for None.
        self.normalizer = None
        if normalize:
            self.normalizer = Normalizer(input_size)
        # First layer maps input -> hidden. Kaiming-He init is paired with the
        # Swish nonlinearity throughout (kernel_gain=1.0).
        self.layers = [
            linear_layer(
                input_size,
                hidden_size,
                kernel_init=Initialization.KaimingHeNormal,
                kernel_gain=1.0,
            ),
            Swish(),
        ]
        # Remaining (num_layers - 1) hidden -> hidden layers.
        for _ in range(num_layers - 1):
            self.layers.append(
                linear_layer(
                    hidden_size,
                    hidden_size,
                    kernel_init=Initialization.KaimingHeNormal,
                    kernel_gain=1.0,
                )
            )
            self.layers.append(Swish())
        self.seq_layers = nn.Sequential(*self.layers)

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        # Normalize first (when enabled), then run the MLP stack.
        if self.normalizer is not None:
            inputs = self.normalizer(inputs)
        return self.seq_layers(inputs)

    def copy_normalization(self, other_encoder: "VectorEncoder") -> None:
        # Copy running normalization statistics from another encoder (e.g.
        # when syncing networks); no-op if either side is unnormalized.
        if self.normalizer is not None and other_encoder.normalizer is not None:
            self.normalizer.copy_from(other_encoder.normalizer)


class VectorInput(nn.Module):
    """
    Thin input module for vector observations: optionally normalizes the
    input and passes it through unchanged (no hidden layers).
    """

    def __init__(self, input_size: int, normalize: bool = False):
        """
        :param input_size: Dimensionality of the flat vector input; also the
            module's output size.
        :param normalize: If True, apply running-stats normalization.
        """
        super().__init__()
        self._output_size = input_size
        # Created only when requested; forward() checks for None.
        self.normalizer = None
        if normalize:
            self.normalizer = Normalizer(input_size)

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        if self.normalizer is not None:
            inputs = self.normalizer(inputs)
        return inputs

    def copy_normalization(self, other_encoder: "VectorInput") -> None:
        # No-op unless both sides carry a normalizer.
        if self.normalizer is not None and other_encoder.normalizer is not None:
            self.normalizer.copy_from(other_encoder.normalizer)

    def output_size(self) -> int:
        # NOTE(review): the source fragments do not show whether this was a
        # @property upstream; kept as a plain method here — confirm callers.
        return self._output_size
|
|
|
class VectorAndUnnormalizedInputEncoder(VectorEncoder):
    """
    Encoder for concatenated vector input (can be normalized) and unnormalized
    vector input. This is used for passing inputs to the network that should
    not be normalized, such as actions and memories.

    NOTE(review): this region of the file was a corrupted line-interleaving of
    two class definitions (VectorAndUnnormalizedInputEncoder and
    VectorAndUnnormalizedInput). They are disentangled here; verify against
    version-control history.
    """

    def __init__(
        self,
        input_size: int,
        hidden_size: int,
        unnormalized_input_size: int,
        num_layers: int,
        normalize: bool = False,
    ):
        """
        :param input_size: Size of the segment that MAY be normalized.
        :param hidden_size: Width of every hidden linear layer.
        :param unnormalized_input_size: Size of the never-normalized segment.
        :param num_layers: Total number of linear+Swish layers.
        :param normalize: If True, normalize only the first `input_size`
            features; the concatenated tail is always left raw.
        """
        # Build the MLP over the concatenated width, but force normalize=False
        # in the base class: only the first segment may be normalized, which
        # is handled locally below.
        super().__init__(
            input_size + unnormalized_input_size,
            hidden_size,
            num_layers,
            normalize=False,
        )
        if normalize:
            # Normalizer covers only the normalizable (first) segment.
            self.normalizer = Normalizer(input_size)
        else:
            self.normalizer = None

    def forward(  # type: ignore
        self, inputs: torch.Tensor, unnormalized_inputs: Optional[torch.Tensor] = None
    ) -> torch.Tensor:  # Fix mypy errors about method parameters.
        # NOTE(review): guard reconstructed from the UnityTrainerException
        # import and the Optional parameter — confirm the exact message text.
        if unnormalized_inputs is None:
            raise UnityTrainerException(
                "Attempted to call an VectorAndUnnormalizedInputEncoder without an unnormalized input."
            )
        if self.normalizer is not None:
            inputs = self.normalizer(inputs)
        return self.seq_layers(torch.cat([inputs, unnormalized_inputs], dim=-1))


class VectorAndUnnormalizedInput(VectorInput):
    """
    Input module for concatenated vector input (can be normalized) and
    unnormalized vector input. Normalizes only the first segment and returns
    the concatenation of both segments (no hidden layers).
    """

    def __init__(
        self, input_size: int, unnormalized_input_size: int, normalize: bool = False
    ):
        # Force normalize=False in the base class; the partial normalizer over
        # only the first `input_size` features is installed below.
        super().__init__(input_size + unnormalized_input_size, normalize=False)
        self._output_size = input_size + unnormalized_input_size
        if normalize:
            self.normalizer = Normalizer(input_size)

    def forward(  # type: ignore
        self, inputs: torch.Tensor, unnormalized_inputs: Optional[torch.Tensor] = None
    ) -> torch.Tensor:  # Fix mypy errors about method parameters.
        # NOTE(review): guard reconstructed — confirm the exact message text.
        if unnormalized_inputs is None:
            raise UnityTrainerException(
                "Attempted to call an VectorAndUnnormalizedInput without an unnormalized input."
            )
        if self.normalizer is not None:
            inputs = self.normalizer(inputs)
        return torch.cat([inputs, unnormalized_inputs], dim=-1)
|
|
|
|
|
|
|
|
|
|
|
# NOTE(review): the block below is a corrupted interleaving of two revisions
# of SimpleVisualEncoder (one whose forward() returns the dense-layer output,
# one that exposes output_size()). The fragments are annotated but left
# byte-identical; restore a single coherent definition from version control.
class SimpleVisualEncoder(nn.Module):

# Fragment: tail of the conv stack. Missing here: the __init__ header, the
# h_size/final_flat setup, and the `self.conv_layers = nn.Sequential(` opener.
nn.Conv2d(initial_channels, 16, [8, 8], [4, 4]),

nn.LeakyReLU(),

nn.Conv2d(16, 32, [4, 4], [2, 2]),

nn.LeakyReLU(),

)

# Fragment: dense head mapping the flattened conv features (self.final_flat)
# to self.h_size; the trailing activation / closing paren lines are missing.
self.dense = nn.Sequential(

linear_layer(

self.final_flat,

self.h_size,

kernel_init=Initialization.KaimingHeNormal,

kernel_gain=1.0,

),

# Fragment: stale forward() signature — the `-> None` return annotation looks
# wrong for an encoder that returns a tensor; presumably superseded by the
# `-> torch.Tensor` signature further below. TODO confirm which revision wins.
def forward(self, visual_obs: torch.Tensor) -> None:

# Fragment: output_size accessor; assumes self.final_flat is set in __init__.
def output_size(self) -> int:

return self.final_flat

# Fragment: corrected forward() signature plus one body line. The conv /
# reshape steps that should precede self.dense(...) are missing here.
def forward(self, visual_obs: torch.Tensor) -> torch.Tensor:

hidden = self.dense(hidden)
|
|
|
# NOTE(review): the fragments below appear to belong to a second visual
# encoder class whose `class ...` header line was lost in the merge — the
# Conv2d(64, 64, [3, 3], [1, 1]) layer suggests a Nature-CNN-style encoder.
# Two revisions of __init__ / forward are interleaved; left byte-identical.
# Fragment: untyped __init__ signature (older revision).
def __init__(self, height, width, initial_channels, output_size):

# Fragment: typed __init__ signature (newer revision of the same method).
def __init__(

self, height: int, width: int, initial_channels: int, output_size: int

):

super().__init__()

self.h_size = output_size

# First conv output size for an 8x8 kernel with stride 4; later
# conv_output_shape calls and the conv-stack opener are missing here.
conv_1_hw = conv_output_shape((height, width), 8, 4)

# Fragment: tail of the conv stack (third 64->64 3x3 conv).
nn.Conv2d(64, 64, [3, 3], [1, 1]),

nn.LeakyReLU(),

)

# Fragment: dense head mapping flattened conv features to h_size.
self.dense = nn.Sequential(

linear_layer(

self.final_flat,

self.h_size,

kernel_init=Initialization.KaimingHeNormal,

kernel_gain=1.0,

),

nn.LeakyReLU(),

)

# Fragment: output_size accessor (one revision only); assumes final_flat set.
def output_size(self) -> int:

return self.final_flat

# Fragment: stale forward() signature — `-> None` looks wrong for an encoder.
def forward(self, visual_obs: torch.Tensor) -> None:

# Fragment: corrected forward() signature; conv/reshape body lines missing.
def forward(self, visual_obs: torch.Tensor) -> torch.Tensor:

hidden = self.dense(hidden)

return hidden
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# NOTE(review): the block below interleaves two revisions of
# ResNetVisualEncoder — an older one taking `final_hidden` and returning
# relu(self.dense(...)), and a newer one without final_hidden that exposes
# output_size() and returns relu(before_out). Fragments left byte-identical;
# restore one coherent revision from version control. `ResNetBlock` is
# referenced but defined elsewhere in the file.
class ResNetVisualEncoder(nn.Module):

# Fragment: older __init__ signature (with a final dense layer of width
# `final_hidden`).
def __init__(self, height, width, initial_channels, final_hidden):

# Fragment: newer __init__ signature (no dense layer; raw flattened output).
def __init__(self, height: int, width: int, initial_channels: int):

super().__init__()

n_channels = [16, 32, 32]  # channel for each stack

n_blocks = 2  # number of residual blocks

# Fragment: interior of the per-stack layer-building loop (the enclosing
# `for channel in n_channels:` / pooling-conv lines are missing here).
layers.append(ResNetBlock(channel))

last_channel = channel

layers.append(Swish())

# Fragment (older revision): dense head to final_hidden. NOTE(review): the
# in-features term uses the UN-pooled height*width — upstream divides these
# by the pooling factor; verify before trusting this line.
self.dense = linear_layer(

n_channels[-1] * height * width,

final_hidden,

kernel_init=Initialization.KaimingHeNormal,

kernel_gain=1.0,

)

# Fragment (newer revision): flattened output size — same caveat as above.
self._output_size = n_channels[-1] * height * width

# Fragment: untyped forward() signature (older revision).
def forward(self, visual_obs):

def output_size(self) -> int:

return self._output_size

# Fragment: typed forward() signature (newer revision); the body computing
# `before_out` (conv stacks + reshape) is missing here.
def forward(self, visual_obs: torch.Tensor) -> torch.Tensor:

# Older revision's return (through the dense layer)...
return torch.relu(self.dense(before_out))

# ...vs newer revision's return (raw flattened features).
return torch.relu(before_out)