浏览代码

Move padding method to AgentBufferField

/develop/coma2/samenet
Ervin Teng 4 年前
当前提交
be45d8c0
共有 3 个文件被更改,包括 50 次插入74 次删除
  1. 38
      ml-agents/mlagents/trainers/buffer.py
  2. 47
      ml-agents/mlagents/trainers/torch/agent_action.py
  3. 39
      ml-agents/mlagents/trainers/trajectory.py

38
ml-agents/mlagents/trainers/buffer.py


"""
self[:] = []
def padded_to_batch(
    self, pad_value: float = 0, dtype: np.dtype = np.float32
) -> Union[np.ndarray, List[np.ndarray]]:
    """
    Converts this AgentBufferField (which is a List[BufferEntry]) into a numpy array
    with first dimension equal to the length of this AgentBufferField. If this
    AgentBufferField contains a List[List[BufferEntry]] (i.e., in the case of group
    observations), return a List of numpy arrays whose length equals the maximum
    length of an entry. Entries shorter than that maximum are padded with pad_value.
    :param pad_value: Value to pad List AgentBufferFields, when there are less than the
        maximum number of agents present.
    :param dtype: Dtype of output numpy array.
    :return: Numpy array or List of numpy arrays representing this AgentBufferField,
        where the first dimension is equal to the length of the AgentBufferField.
        An empty List is returned when the field is empty or every entry is empty.
    """
    # Flat case: entries are not lists, so the whole field converts directly.
    if len(self) > 0 and not isinstance(self[0], list):
        return np.asanyarray(self, dtype=dtype)

    # An entry may be an empty list if there are no group agents in that step.
    # Use the first non-empty entry to learn the shape of a single element.
    shape = None
    for _entry in self:
        if _entry:
            shape = _entry[0].shape
            break
    # If there were no groupmate agents in the entire batch, return an empty List.
    if shape is None:
        return []

    # zip_longest transposes the entries: the i-th output array stacks the i-th
    # element of every entry, substituting the pad array where an entry is too short.
    padding = np.full(shape, pad_value)
    return [
        np.asanyarray(column, dtype=dtype)
        for column in itertools.zip_longest(*self, fillvalue=padding)
    ]
class AgentBuffer(MutableMapping):
"""

47
ml-agents/mlagents/trainers/torch/agent_action.py


import numpy as np
from mlagents.torch_utils import torch
from mlagents.trainers.buffer import AgentBuffer, BufferKey, AgentBufferField
from mlagents.trainers.buffer import AgentBuffer, BufferKey
from mlagents.trainers.torch.utils import ModelUtils
from mlagents_envs.base_env import ActionTuple

return AgentAction(continuous, discrete)
@staticmethod
def _padded_time_to_batch(
    agent_buffer_field: AgentBufferField, dtype: torch.dtype = torch.float32
) -> List[torch.Tensor]:
    """
    Transpose a field of per-step action lists into a list of padded tensors.

    The i-th returned item is built from the i-th element of every entry in
    agent_buffer_field; entries that have no i-th element contribute a
    zero-filled array instead. Zero is used rather than NaN because NaN is not
    a valid value for the integer tensors used for discrete actions.

    :param agent_buffer_field: Field whose entries are (possibly empty) lists
        of arrays.
    :param dtype: Torch dtype forwarded to ModelUtils.list_to_tensor.
    :return: List of converted items (presumably torch tensors, per the return
        annotation), or an empty List when no entry holds any element.
    """
    # Entries can be empty lists when a step had no group agents; the first
    # populated entry supplies the shape of a single action.
    populated_shapes = (step[0].shape for step in agent_buffer_field if step)
    action_shape = next(populated_shapes, None)
    if action_shape is None:
        # No groupmate agents anywhere in the batch.
        return []

    zero_fill = np.full(action_shape, 0)
    per_slot_columns = itertools.zip_longest(
        *agent_buffer_field, fillvalue=zero_fill
    )
    return [
        ModelUtils.list_to_tensor(column, dtype=dtype)
        for column in per_slot_columns
    ]
@staticmethod
def _group_agent_action_from_buffer(
buff: AgentBuffer, cont_action_key: BufferKey, disc_action_key: BufferKey
) -> List["AgentAction"]:

continuous_tensors: List[torch.Tensor] = []
discrete_tensors: List[torch.Tensor] = []
if cont_action_key in buff:
continuous_tensors = AgentAction._padded_time_to_batch(
buff[cont_action_key]
)
padded_batch = buff[cont_action_key].padded_to_batch()
continuous_tensors = [
ModelUtils.list_to_tensor(arr) for arr in padded_batch
]
discrete_tensors = AgentAction._padded_time_to_batch(
buff[disc_action_key], dtype=torch.long
)
padded_batch = buff[disc_action_key].padded_to_batch(dtype=np.long)
discrete_tensors = [
ModelUtils.list_to_tensor(arr, dtype=torch.long) for arr in padded_batch
]
actions_list = []
for _cont, _disc in itertools.zip_longest(

39
ml-agents/mlagents/trainers/trajectory.py


from typing import List, NamedTuple
import itertools
AgentBufferField,
ObservationKeyPrefix,
AgentBufferKey,
BufferKey,

return ObservationKeyPrefix.NEXT_GROUP_OBSERVATION, index
@staticmethod
def _padded_time_to_batch(
agent_buffer_field: AgentBufferField,
) -> List[np.ndarray]:
"""
Convert an AgentBufferField of List of obs, where one of the dimension is time and the other is number (e.g.
in the case of a variable number of critic observations) to a List of obs, where time is in the batch dimension
of the obs, and the List is the variable number of agents. For cases where there are varying number of agents,
pad the non-existent agents with NaN.
"""
# Find the first observation. This should be USUALLY O(1)
obs_shape = None
for _group_obs in agent_buffer_field:
if _group_obs:
obs_shape = _group_obs[0].shape
break
# If there were no critic obs at all
if obs_shape is None:
return []
new_list = list(
map(
lambda x: np.asanyarray(x),
itertools.zip_longest(
*agent_buffer_field, fillvalue=np.full(obs_shape, np.nan)
),
)
)
return new_list
@staticmethod
def _transpose_list_of_lists(
list_list: List[List[np.ndarray]],
) -> List[List[np.ndarray]]:

separated_obs: List[np.array] = []
for i in range(num_obs):
separated_obs.append(
GroupObsUtil._padded_time_to_batch(batch[GroupObsUtil.get_name_at(i)])
batch[GroupObsUtil.get_name_at(i)].padded_to_batch(pad_value=np.nan)
)
# separated_obs contains a List(num_obs) of Lists(num_agents), we want to flip
# that and get a List(num_agents) of Lists(num_obs)

separated_obs: List[np.array] = []
for i in range(num_obs):
separated_obs.append(
GroupObsUtil._padded_time_to_batch(
batch[GroupObsUtil.get_name_at_next(i)]
batch[GroupObsUtil.get_name_at_next(i)].padded_to_batch(
pad_value=np.nan
)
)
# separated_obs contains a List(num_obs) of Lists(num_agents), we want to flip

正在加载...
取消
保存