您最多选择25个主题
主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
313 行
13 KiB
313 行
13 KiB
import numpy as np
|
|
import h5py
|
|
from typing import List, BinaryIO, Any
|
|
import itertools
|
|
|
|
from mlagents_envs.exception import UnityException
|
|
|
|
|
|
class BufferException(UnityException):
    """Error raised when an operation on an AgentBuffer or one of its fields cannot be performed."""
|
|
|
|
|
|
class AgentBuffer(dict):
    """
    AgentBuffer contains a dictionary of AgentBufferFields. Each agent has its own AgentBuffer.
    The keys correspond to the name of the field. Example: state, action

    Reading a key that is not present yet creates an empty AgentBufferField for it
    (see __getitem__), so fields can be appended to without being declared first.
    """

    class AgentBufferField(list):
        """
        AgentBufferField is a list of data, usually numpy arrays. When an agent collects a field,
        you can add it to its AgentBufferField with the append method.
        """

        def __init__(self):
            # Factor used by get_batch to build padding entries; overwritten on every append.
            self.padding_value = 0
            super().__init__()

        def __str__(self):
            # Only the shape is printed; the data itself is not.
            return str(np.array(self).shape)

        def append(self, element: Any, padding_value: Any = 0.0) -> None:
            """
            Adds an element to this AgentBufferField. Also lets you change the padding
            value, so that it can be set on append (e.g. action_masks should
            be padded with 1.)
            :param element: The element to append to the list.
            :param padding_value: The value stored as this field's padding_value and
            used when get_batch has to pad. Each call overwrites the previous value.
            """
            super().append(element)
            self.padding_value = padding_value

        def set(self, data: List[Any]) -> None:
            """
            Replaces the whole content of the AgentBufferField with the provided list.
            The elements are stored as given; no dtype conversion is performed.
            :param data: The list to be set.
            """
            # Slice assignment swaps the content in place, so no prior clear is needed.
            self[:] = data

        def get_batch(
            self,
            batch_size: int = None,
            training_length: int = 1,
            sequential: bool = True,
        ) -> List[Any]:
            """
            Retrieve the last batch_size sequences of length training_length
            from the AgentBufferField. The result is a flat list: the sequences
            are laid out one after the other, not nested.
            :param batch_size: The number of sequences to retrieve. If None:
            as many sequences as possible will be retrieved.
            :param training_length: The length of the sequence to be retrieved. If
            None: only takes one element.
            :param sequential: If true: the elements will not repeat in the sequence.
            [a,b,c,d,e] with training_length = 2 and sequential=True gives
            [0,a,b,c,d,e] (i.e. the sequences [0,a],[b,c],[d,e], where 0 is a padding
            entry). If sequential=False gives [a,b,b,c,c,d,d,e]
            (i.e. the sequences [a,b],[b,c],[c,d],[d,e]).
            :return: The selected elements as a flat list.
            :raises BufferException: If batch_size sequences of length training_length
            cannot be built from the stored elements.
            """
            # Callers such as resequence_and_append forward None to mean "one element".
            if training_length is None:
                training_length = 1
            num_elements = len(self)

            if sequential:
                # The sequences will not have overlapping elements (this involves padding).
                # leftover is the number of real elements in the first (padded) sequence.
                leftover = num_elements % training_length
                # The maximum number of non-overlapping sequences, counting the
                # partially filled one when there is a leftover.
                max_batch_size = num_elements // training_length + int(leftover != 0)
                if batch_size is None:
                    batch_size = max_batch_size
                if batch_size > max_batch_size:
                    raise BufferException(
                        "The batch size and training length requested for get_batch where"
                        " too large given the current number of data points."
                    )
                if batch_size * training_length > num_elements:
                    # Only reachable when the partially filled sequence is requested:
                    # pad at the front so that it reaches training_length.
                    # NOTE(review): the padding entry is the last element scaled by
                    # padding_value, not an array filled with padding_value - confirm
                    # this is intended for non-zero padding values.
                    padding = np.array(self[-1], dtype=np.float32) * self.padding_value
                    return [padding] * (training_length - leftover) + self[:]
                return self[num_elements - batch_size * training_length :]

            # The sequences will have overlapping elements.
            # The number of windows of length training_length in the stored elements.
            max_batch_size = num_elements - training_length + 1
            if batch_size is None:
                batch_size = max_batch_size
            if max_batch_size < batch_size:
                raise BufferException(
                    "The batch size and training length requested for get_batch where"
                    " too large given the current number of data points."
                )
            tmp_list: List[np.ndarray] = []
            # Each window ends at `end` (exclusive); take the last batch_size windows.
            for end in range(num_elements - batch_size + 1, num_elements + 1):
                tmp_list += self[end - training_length : end]
            return tmp_list

        def reset_field(self) -> None:
            """
            Resets the AgentBufferField by removing all of its elements.
            """
            self[:] = []

    def __init__(self):
        self.last_brain_info = None
        self.last_take_action_outputs = None
        super().__init__()

    def __str__(self):
        return ", ".join(
            ["'{}' : {}".format(key, str(field)) for key, field in self.items()]
        )

    def reset_agent(self) -> None:
        """
        Resets the AgentBuffer: empties every field (the keys are kept) and
        clears last_brain_info and last_take_action_outputs.
        """
        for field in self.values():
            field.reset_field()
        self.last_brain_info = None
        self.last_take_action_outputs = None

    def __getitem__(self, key):
        # Missing fields are created on first access instead of raising KeyError.
        if key not in self.keys():
            self[key] = self.AgentBufferField()
        return super().__getitem__(key)

    def check_length(self, key_list: List[str]) -> bool:
        """
        Some methods will require that some fields have the same length.
        check_length will return true if the fields in key_list
        have the same length.
        :param key_list: The fields which length will be compared
        :return: True if key_list has fewer than two entries, or if every key is
        present and all the fields have the same length. False otherwise.
        """
        if len(key_list) < 2:
            return True
        length = None
        for key in key_list:
            # Test membership first: indexing a missing key would create the field.
            if key not in self.keys():
                return False
            field_length = len(self[key])
            if (length is not None) and (length != field_length):
                return False
            length = field_length
        return True

    def shuffle(self, sequence_length: int, key_list: List[str] = None) -> None:
        """
        Shuffles the fields in key_list in a consistent way: The reordering will
        be the same across fields. Whole sequences of sequence_length consecutive
        elements are moved together; trailing elements that do not fill a complete
        sequence are dropped from the shuffled fields.
        :param sequence_length: The number of consecutive elements kept together.
        :param key_list: The fields that must be shuffled. If None: all fields.
        :raises BufferException: If the fields do not all have the same length.
        """
        if key_list is None:
            key_list = list(self.keys())
        if not key_list:
            # Nothing to shuffle (e.g. empty buffer).
            return
        if not self.check_length(key_list):
            raise BufferException(
                "Unable to shuffle if the fields are not of same length"
            )
        # One permutation of the sequence indices, shared by every field.
        sequence_order = np.arange(len(self[key_list[0]]) // sequence_length)
        np.random.shuffle(sequence_order)
        for key in key_list:
            field = self[key]
            shuffled: List[np.ndarray] = []
            for i in sequence_order:
                shuffled += field[i * sequence_length : (i + 1) * sequence_length]
            field[:] = shuffled

    def make_mini_batch(self, start: int, end: int) -> "AgentBuffer":
        """
        Creates a mini-batch from buffer.
        :param start: Starting index of buffer.
        :param end: Ending index of buffer (exclusive).
        :return: A new AgentBuffer holding elements start:end of every field.
        """
        mini_batch = AgentBuffer()
        for key in self:
            # Slicing a field yields a plain list; set() stores it in an
            # AgentBufferField so the result behaves like any other AgentBuffer.
            mini_batch[key].set(self[key][start:end])
        return mini_batch

    def sample_mini_batch(
        self, batch_size: int, sequence_length: int = 1
    ) -> "AgentBuffer":
        """
        Creates a mini-batch from randomly chosen sequences of the buffer.
        Sequences are drawn with np.random.randint, so the same sequence can be
        picked more than once.
        :param batch_size: number of elements to withdraw.
        :param sequence_length: Length of sequences to sample.
        Number of sequences to sample will be batch_size/sequence_length.
        :return: A new AgentBuffer holding the sampled sequences for every field.
        """
        num_seq_to_sample = batch_size // sequence_length
        mini_batch = AgentBuffer()
        buff_len = self.num_experiences
        num_sequences_in_buffer = buff_len // sequence_length
        start_idxes = (
            np.random.randint(num_sequences_in_buffer, size=num_seq_to_sample)
            * sequence_length
        )  # Sample random sequence starts
        for key in self:
            field = self[key]
            mb_list = [field[i : i + sequence_length] for i in start_idxes]
            # Flatten the list of sequences into one flat list of elements.
            mini_batch[key].set(list(itertools.chain.from_iterable(mb_list)))
        return mini_batch

    def save_to_file(self, file_object: BinaryIO) -> None:
        """
        Saves the AgentBuffer to a file-like object, as an HDF5 file with one
        gzip-compressed float dataset per field.
        """
        with h5py.File(file_object, "w") as write_file:
            for key, data in self.items():
                write_file.create_dataset(key, data=data, dtype="f", compression="gzip")

    def load_from_file(self, file_object: BinaryIO) -> None:
        """
        Loads the AgentBuffer from a file-like object. Every dataset found in the
        file replaces the field with the same name.
        """
        with h5py.File(file_object, "r") as read_file:
            for key in list(read_file.keys()):
                self[key] = AgentBuffer.AgentBufferField()
                # extend() will convert the numpy array's first dimension into list
                self[key].extend(read_file[key][()])

    def truncate(self, max_length: int, sequence_length: int = 1) -> None:
        """
        Truncates the buffer to a certain length, keeping the most recent elements.

        This can be slow for large buffers. Note that we must truncate to an
        integer number of sequence_lengths, so max_length is rounded down to a
        multiple of sequence_length.
        :param max_length: The length at which to truncate the buffer.
        :param sequence_length: The length of the sequences stored in the buffer.
        """
        current_length = self.num_experiences
        # make max_length an integer number of sequence_lengths
        max_length -= max_length % sequence_length
        if current_length > max_length:
            for field in self.values():
                field[:] = field[current_length - max_length :]

    def resequence_and_append(
        self,
        target_buffer: "AgentBuffer",
        key_list: List[str] = None,
        batch_size: int = None,
        training_length: int = None,
    ) -> None:
        """
        Takes in a batch size and training length (sequence length), and appends this AgentBuffer to target_buffer
        properly padded for LSTM use. Optionally, use key_list to restrict which fields are inserted into the new
        buffer.
        :param target_buffer: The buffer which to append the samples to.
        :param key_list: The fields that must be added. If None: all fields will be appended.
        :param batch_size: The number of elements that must be appended. If None: All of them will be.
        :param training_length: The length of the samples that must be appended. If None: only takes one element.
        :raises BufferException: If the fields in key_list do not all have the same length.
        """
        if key_list is None:
            key_list = list(self.keys())
        if not self.check_length(key_list):
            raise BufferException(
                f"The length of the fields {key_list} were not of same length"
            )
        for field_key in key_list:
            # get_batch treats training_length=None as a sequence length of 1.
            target_buffer[field_key].extend(
                self[field_key].get_batch(
                    batch_size=batch_size, training_length=training_length
                )
            )

    @property
    def num_experiences(self) -> int:
        """
        The number of agent experiences in the AgentBuffer, i.e. the length of the buffer.

        An experience consists of one element across all of the fields of this AgentBuffer.
        Note that these all have to be the same length, otherwise shuffle and
        resequence_and_append will fail. Only the first field is measured here.
        """
        if self.values():
            return len(next(iter(self.values())))
        else:
            return 0

    @staticmethod
    def obs_list_to_obs_batch(obs_list: List[List[np.ndarray]]) -> List[np.ndarray]:
        """
        Converts a List of obs (an obs itself consisting of a List of np.ndarray) to
        a List of np.ndarray, with the observations batchwise.
        """
        # Transpose the List of Lists, then stack each group into a single array.
        return [np.asanyarray(list(grouped)) for grouped in zip(*obs_list)]

    @staticmethod
    def obs_list_list_to_obs_batch(
        obs_list_list: List[List[List[np.ndarray]]]
    ) -> List[List[np.ndarray]]:
        """
        Convert a List of List of obs, where one of the dimension is time and the other is number (e.g. in the
        case of a variable number of critic observations) to a List of obs, where time is in the batch dimension
        of the obs, and the List is the variable number of agents.
        """
        return [
            AgentBuffer.obs_list_to_obs_batch(list(grouped))
            for grouped in zip(*obs_list_list)
        ]
|