|
|
|
|
|
|
from typing import Dict, List, NamedTuple, Optional

import numpy as np
import torch

# ActionBuffers is assumed to live alongside ActionSpec in this WIP branch.
from mlagents_envs.base_env import ActionSpec, ActionBuffers
from mlagents.trainers.settings import EncoderType
from mlagents.trainers.torch.distributions import DistInstance, DiscreteDistInstance
|
|
|
|
|
|
|
|
|
|
|
class ActionSpaceTuple(NamedTuple):
    """
    A NamedTuple pairing the continuous and discrete parts of a hybrid action
    space: a single tensor for the continuous values and a list of tensors,
    one per branch, for the discrete values. Each discrete branch tensor is
    assumed to be 1-D with shape (batch,). Serves as the base for AgentAction
    and ActionLogProbs, which add behavior but no fields.
    """

    continuous: Optional[torch.Tensor]
    discrete: Optional[List[torch.Tensor]]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
    @property
    def discrete_tensor(self):
        """
        Returns the per-branch discrete tensors concatenated into a single
        tensor of shape (batch, num_branches).
        """
        return torch.cat([_disc.unsqueeze(-1) for _disc in self.discrete], dim=1)

    def to_tensor_list(self) -> List[torch.Tensor]:
        """
        Returns all components as a flat list of 2-D tensors: the continuous
        tensor (if any) followed by each discrete branch tensor unsqueezed to
        (batch, 1) so that everything can be concatenated along dim 1.
        """
        tensor_list: List[torch.Tensor] = []
        if self.continuous is not None:
            tensor_list.append(self.continuous)
        if self.discrete is not None:
            for _disc in self.discrete:
                tensor_list.append(_disc.unsqueeze(-1))
        return tensor_list

    def flatten(self) -> torch.Tensor:
        """
        Concatenates all components into a single (batch, total_size) tensor.
        """
        return torch.cat(self.to_tensor_list(), dim=1)
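
# Usage sketch (illustrative only, not part of the trainer API). The numbers
# are made up: a batch of 4 agents, 2 continuous dimensions, and two discrete
# branches represented as 1-D (batch,) tensors.
#
#   space = ActionSpaceTuple(torch.randn(4, 2), [torch.zeros(4), torch.ones(4)])
#   space.discrete_tensor.shape   # torch.Size([4, 2]) -- (batch, num_branches)
#   space.flatten().shape         # torch.Size([4, 4]) -- 2 continuous + 2 branches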
|
|
|
|
|
|
|
class AgentAction(ActionSpaceTuple):
    """
    The action an agent takes, stored as an ActionSpaceTuple. Provides
    conversions to and from the numpy arrays kept in the AgentBuffer.
    """

    def to_numpy_dict(self) -> Dict[str, np.ndarray]:
        """
        Returns a Dict of numpy arrays keyed by "continuous_action" and
        "discrete_action", suitable for storage in an AgentBuffer.
        """
        array_dict: Dict[str, np.ndarray] = {}
        if self.continuous is not None:
            array_dict["continuous_action"] = ModelUtils.to_numpy(self.continuous)
        if self.discrete is not None:
            array_dict["discrete_action"] = ModelUtils.to_numpy(self.discrete_tensor)
        return array_dict

    @staticmethod
    def extract(buff: Dict[str, np.ndarray]) -> "AgentAction":
        """
        Builds an AgentAction from buffer arrays; the inverse of to_numpy_dict.
        """
        continuous: Optional[torch.Tensor] = None
        discrete: Optional[List[torch.Tensor]] = None
        if "continuous_action" in buff:
            continuous = ModelUtils.list_to_tensor(buff["continuous_action"])
        if "discrete_action" in buff:
            discrete_tensor = ModelUtils.list_to_tensor(buff["discrete_action"])
            discrete = [
                discrete_tensor[..., i] for i in range(discrete_tensor.shape[-1])
            ]
        return AgentAction(continuous, discrete)

    @staticmethod
    def create(
        tensor_list: List[torch.Tensor], action_spec: ActionSpec
    ) -> "AgentAction":
        """
        Builds an AgentAction from a flat list of tensors: the continuous
        tensor first (if action_spec has continuous actions), followed by one
        tensor per discrete branch.
        """
        continuous: Optional[torch.Tensor] = None
        discrete: Optional[List[torch.Tensor]] = None
        _offset = 0
        if action_spec.continuous_size > 0:
            continuous = tensor_list[0]
            _offset = 1
        if action_spec.discrete_size > 0:
            discrete = tensor_list[_offset:]
        return AgentAction(continuous, discrete)
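
# Usage sketch (illustrative only): round-tripping an AgentAction through the
# numpy dict used by the AgentBuffer. The ActionSpec values are made up for
# the example.
#
#   spec = ActionSpec(continuous_size=2, discrete_branches=(3, 2))
#   tensors = [torch.randn(4, 2), torch.randint(0, 3, (4,)), torch.randint(0, 2, (4,))]
#   action = AgentAction.create(tensors, spec)
#   buff = action.to_numpy_dict()          # {"continuous_action": ..., "discrete_action": ...}
#   restored = AgentAction.extract(buff)   # same values, back as torch tensors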
|
|
|
|
|
|
|
|
|
|
|
class ActionLogProbs(ActionSpaceTuple):
    """
    The log probabilities of an agent's actions, stored with the same
    continuous/discrete layout as AgentAction. Provides conversions to and
    from the numpy arrays kept in the AgentBuffer.
    """
|
|
|
|
|
|
|
    def to_numpy_dict(self) -> Dict[str, np.ndarray]:
        """
        Returns a Dict of numpy arrays keyed by "continuous_log_probs" and
        "discrete_log_probs", suitable for storage in an AgentBuffer.
        """
        array_dict: Dict[str, np.ndarray] = {}
        if self.continuous is not None:
            array_dict["continuous_log_probs"] = ModelUtils.to_numpy(self.continuous)
        if self.discrete is not None:
            array_dict["discrete_log_probs"] = ModelUtils.to_numpy(self.discrete_tensor)
        return array_dict

    @staticmethod
    def extract(buff: Dict[str, np.ndarray]) -> "ActionLogProbs":
        """
        Builds an ActionLogProbs from buffer arrays; the inverse of
        to_numpy_dict.
        """
        continuous: Optional[torch.Tensor] = None
        discrete: Optional[List[torch.Tensor]] = None
        if "continuous_log_probs" in buff:
            continuous = ModelUtils.list_to_tensor(buff["continuous_log_probs"])
        if "discrete_log_probs" in buff:
            discrete_tensor = ModelUtils.list_to_tensor(buff["discrete_log_probs"])
            discrete = [
                discrete_tensor[..., i] for i in range(discrete_tensor.shape[-1])
            ]
        return ActionLogProbs(continuous, discrete)
|
|
|
|
|
|
|
    @staticmethod
    def create(
        tensor_list: List[torch.Tensor], action_spec: ActionSpec
    ) -> "ActionLogProbs":
        """
        Builds an ActionLogProbs from a flat list of log-prob tensors: the
        continuous tensor first (if action_spec has continuous actions),
        followed by one tensor per discrete branch.
        """
        continuous: Optional[torch.Tensor] = None
        discrete: Optional[List[torch.Tensor]] = None
        _offset = 0
        if action_spec.continuous_size > 0:
            continuous = tensor_list[0]
            _offset = 1
        if action_spec.discrete_size > 0:
            discrete = tensor_list[_offset:]
        return ActionLogProbs(continuous, discrete)
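
# Usage sketch (illustrative only): flatten() lets a loss treat continuous and
# per-branch discrete log probs uniformly, e.g. summing over dim 1 for the
# joint log prob of a hybrid action.
#
#   log_probs = ActionLogProbs(torch.randn(4, 2), [torch.randn(4), torch.randn(4)])
#   joint = log_probs.flatten().sum(dim=1)   # shape (4,)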
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ModelUtils:
    # Minimum supported side for each encoder type. If refactoring an encoder, please
    # adjust these also.
    MIN_RESOLUTION_FOR_ENCODER = {
        EncoderType.SIMPLE: 20,
        EncoderType.NATURE_CNN: 36,
        EncoderType.RESNET: 15,
    }

    @staticmethod
    def agent_action_to_action_buffers(agent_actions: AgentAction) -> ActionBuffers:
        """
        Converts AgentAction fields into an ActionBuffers of numpy arrays to be
        sent to the environment. The discrete field of ActionBuffers is assumed
        to hold one array per branch, mirroring AgentAction.discrete.
        """
        continuous_np = agent_actions.continuous.detach().cpu().numpy()
        discrete_np = [
            _disc.detach().cpu().numpy() for _disc in agent_actions.discrete
        ]
        return ActionBuffers(continuous_np, discrete_np)
|
|
|
|
|
|
|
    @staticmethod
    def action_buffers_to_agent_action(
        action_buffers: ActionBuffers, dtype: Optional[torch.dtype] = None
    ) -> AgentAction:
        """
        Converts ActionBuffers fields into AgentAction fields; the inverse of
        agent_action_to_action_buffers.
        """
        return AgentAction(
            torch.as_tensor(np.asanyarray(action_buffers.continuous), dtype=dtype),
            [
                torch.as_tensor(np.asanyarray(_disc), dtype=dtype)
                for _disc in action_buffers.discrete
            ],
        )
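
    # Usage sketch (illustrative only): the two static methods above form a
    # round trip between the torch-side AgentAction and the numpy-side
    # ActionBuffers handed to the environment.
    #
    #   buffers = ModelUtils.agent_action_to_action_buffers(action)
    #   action2 = ModelUtils.action_buffers_to_agent_action(buffers, torch.float32)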
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def list_to_tensor( |
|
|
|