|
|
|
|
|
|
import numpy as np |
|
|
|
import h5py |
|
|
|
from typing import List, BinaryIO |
|
|
|
from typing import List, BinaryIO, Any |
|
|
|
import itertools |
|
|
|
|
|
|
|
from mlagents_envs.exception import UnityException |
|
|
|
|
|
|
|
|
|
|
class AgentBufferField(list): |
|
|
|
""" |
|
|
|
AgentBufferField is a list of numpy arrays. When an agent collects a field, you can add it to his |
|
|
|
AgentBufferField with the append method. |
|
|
|
AgentBufferField is a list of data, usually numpy arrays. When an agent collects a field, |
|
|
|
you can add it to its AgentBufferField with the append method. |
|
|
|
""" |
|
|
|
|
|
|
|
def __init__(self): |
|
|
|
|
|
|
def __str__(self): |
|
|
|
return str(np.array(self).shape) |
|
|
|
|
|
|
|
def append(self, element: np.ndarray, padding_value: float = 0.0) -> None: |
|
|
|
def append(self, element: Any, padding_value: Any = 0.0) -> None: |
|
|
|
Adds an element to this list. Also lets you change the padding |
|
|
|
Adds an element to this AgentBuffer. Also lets you change the padding |
|
|
|
type, so that it can be set on append (e.g. action_masks should |
|
|
|
be padded with 1.) |
|
|
|
:param element: The element to append to the list. |
|
|
|
|
|
|
self.padding_value = padding_value |
|
|
|
|
|
|
|
def extend(self, data: np.ndarray) -> None: |
|
|
|
""" |
|
|
|
Adds a list of np.arrays to the end of the list of np.arrays. |
|
|
|
:param data: The np.array list to append. |
|
|
|
def set(self, data: List[Any]) -> None: |
|
|
|
self += list(np.array(data, dtype=np.float32)) |
|
|
|
|
|
|
|
def set(self, data): |
|
|
|
""" |
|
|
|
Sets the list of np.array to the input data |
|
|
|
:param data: The np.array list to be set. |
|
|
|
Sets the AgentBuffer to the provided list |
|
|
|
:param data: The list to be set. |
|
|
|
dtype = None |
|
|
|
if data is not None and len(data) and isinstance(data[0], float): |
|
|
|
dtype = np.float32 |
|
|
|
self[:] = list(np.array(data, dtype=dtype)) |
|
|
|
self[:] = data |
|
|
|
|
|
|
|
def get_batch( |
|
|
|
self, |
|
|
|
|
|
|
) -> np.ndarray: |
|
|
|
) -> List[Any]: |
|
|
|
from the list of np.array |
|
|
|
from the AgentBuffer. |
|
|
|
:param batch_size: The number of elements to retrieve. If None: |
|
|
|
All elements will be retrieved. |
|
|
|
:param training_length: The length of the sequence to be retrieved. If |
|
|
|
|
|
|
) |
|
|
|
if batch_size * training_length > len(self): |
|
|
|
padding = np.array(self[-1], dtype=np.float32) * self.padding_value |
|
|
|
return np.array( |
|
|
|
[padding] * (training_length - leftover) + self[:], |
|
|
|
dtype=np.float32, |
|
|
|
) |
|
|
|
return [padding] * (training_length - leftover) + self[:] |
|
|
|
return np.array( |
|
|
|
self[len(self) - batch_size * training_length :], |
|
|
|
dtype=np.float32, |
|
|
|
) |
|
|
|
return self[len(self) - batch_size * training_length :] |
|
|
|
else: |
|
|
|
# The sequences will have overlapping elements |
|
|
|
if batch_size is None: |
|
|
|
|
|
|
tmp_list: List[np.ndarray] = [] |
|
|
|
for end in range(len(self) - batch_size + 1, len(self) + 1): |
|
|
|
tmp_list += self[end - training_length : end] |
|
|
|
return np.array(tmp_list, dtype=np.float32) |
|
|
|
return tmp_list |
|
|
|
|
|
|
|
def reset_field(self) -> None: |
|
|
|
""" |
|
|
|