import itertools
from typing import List

import numpy as np

from mlagents.torch_utils import torch

from mlagents.trainers.buffer import AgentBuffer, BufferKey, AgentBufferField
from mlagents.trainers.torch.utils import ModelUtils
from mlagents_envs.base_env import ActionTuple


        return AgentAction(continuous, discrete)
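
    # Note (assumption for clarity; the enclosing method is not shown in this
    # excerpt): `continuous` is expected to be a float tensor of shape
    # (batch, continuous_size) and `discrete` a list of long tensors, one per
    # discrete action branch.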

    @staticmethod
    def _padded_time_to_batch(
        agent_buffer_field: AgentBufferField, dtype: torch.dtype = torch.float32
    ) -> List[torch.Tensor]:
        """
        Pad actions and convert to tensor. Pad the data with 0s where there is no
        data. 0 is used instead of NaN because NaN is not a valid entry for integer
        tensors, as used for discrete actions.
        """
        action_shape = None
        for _action in agent_buffer_field:
            # _action could be an empty list if there are no group agents in this
            # step. Find the first non-empty list and use that shape.
            if _action:
                action_shape = _action[0].shape
                break
        # If there were no groupmate agents in the entire batch, return an empty List.
        if action_shape is None:
            return []

        # Convert to tensor while padding with 0's
        new_list = list(
            map(
                lambda x: ModelUtils.list_to_tensor(x, dtype=dtype),
                itertools.zip_longest(
                    *agent_buffer_field, fillvalue=np.full(action_shape, 0)
                ),
            )
        )
        return new_list
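
    # Illustrative sketch (an assumption added for clarity, not part of the original
    # file): for a field holding a varying number of groupmate actions per step, e.g.
    #   step 0: [np.array([1.0, 2.0]), np.array([3.0, 4.0])]  # two groupmates
    #   step 1: [np.array([5.0, 6.0])]                        # one groupmate
    # zip_longest transposes the steps into per-groupmate sequences and fills the
    # missing entry with np.full((2,), 0), so _padded_time_to_batch would return two
    # tensors: tensor([[1., 2.], [5., 6.]]) and tensor([[3., 4.], [0., 0.]])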

    @staticmethod
    def _group_agent_action_from_buffer(
        buff: AgentBuffer, cont_action_key: BufferKey, disc_action_key: BufferKey
    ) -> List["AgentAction"]:
        """
        Extracts continuous and discrete groupmate actions, as specified by BufferKey,
        and returns a List of AgentActions that correspond to the groupmates' actions.
        The List will be of length equal to the maximum number of groupmates in the
        buffer; any spots where there are fewer agents than the maximum are padded
        with 0's.
        """
        continuous_tensors: List[torch.Tensor] = []
        discrete_tensors: List[torch.Tensor] = []
        if cont_action_key in buff:
            continuous_tensors = AgentAction._padded_time_to_batch(
                buff[cont_action_key]
            )
        if disc_action_key in buff:
            discrete_tensors = AgentAction._padded_time_to_batch(
                buff[disc_action_key], dtype=torch.long
            )

        actions_list = []
        for _cont, _disc in itertools.zip_longest(