
Kill the ProcessingBuffer

/develop-newnormalization
Ervin Teng, 5 years ago
Current commit: 336ca456
5 files changed, 93 insertions and 209 deletions
  1. ml-agents/mlagents/trainers/agent_processor.py (123 changes)
  2. ml-agents/mlagents/trainers/demo_loader.py (32 changes)
  3. ml-agents/mlagents/trainers/tests/mock_brain.py (43 changes)
  4. ml-agents/mlagents/trainers/tests/test_buffer.py (95 changes)
  5. ml-agents/mlagents/trainers/tests/test_rl_trainer.py (9 changes)

ml-agents/mlagents/trainers/agent_processor.py (123 changes)


from collections import defaultdict
import numpy as np
from mlagents.trainers.buffer import AgentBuffer, BufferException
from mlagents.trainers.trainer import Trainer
from mlagents.trainers.trajectory import (
    Trajectory,

        self.policy.save_previous_action(
            curr_info.agents, take_action_outputs["action"]
        )
-class ProcessingBuffer(dict):
-    """
-    ProcessingBuffer contains a dictionary of AgentBuffer. The AgentBuffers are indexed by agent_id.
-    TODO: Remove.
-    """
-    def __str__(self):
-        return "local_buffers :\n{0}".format(
-            "\n".join(["\tagent {0} :{1}".format(k, str(self[k])) for k in self.keys()])
-        )
-    def __getitem__(self, key):
-        if key not in self.keys():
-            self[key] = AgentBuffer()
-        return super().__getitem__(key)
-    def reset_local_buffers(self) -> None:
-        """
-        Resets all the local AgentBuffers.
-        """
-        for buf in self.values():
-            buf.reset_agent()
-    def append_to_update_buffer(
-        self,
-        update_buffer: AgentBuffer,
-        agent_id: str,
-        key_list: List[str] = None,
-        batch_size: int = None,
-        training_length: int = None,
-    ) -> None:
-        """
-        Appends the buffer of an agent to the update buffer.
-        :param update_buffer: A reference to an AgentBuffer to append the agent's buffer to
-        :param agent_id: The id of the agent which data will be appended
-        :param key_list: The fields that must be added. If None: all fields will be appended.
-        :param batch_size: The number of elements that must be appended. If None: All of them will be.
-        :param training_length: The length of the samples that must be appended. If None: only takes one element.
-        """
-        if key_list is None:
-            key_list = self[agent_id].keys()
-        if not self[agent_id].check_length(key_list):
-            raise BufferException(
-                "The length of the fields {0} for agent {1} were not of same length".format(
-                    key_list, agent_id
-                )
-            )
-        for field_key in key_list:
-            update_buffer[field_key].extend(
-                self[agent_id][field_key].get_batch(
-                    batch_size=batch_size, training_length=training_length
-                )
-            )
-    def agent_to_trajectory(
-        self,
-        agent_id: str,
-        key_list: List[str] = None,
-        batch_size: int = None,
-        training_length: int = None,
-    ) -> Trajectory:
-        """
-        Creates a Trajectory containing the AgentExperiences belonging to agent agent_id.
-        :param agent_id: The id of the agent which data will be appended
-        :param key_list: The fields that must be added. If None: all fields will be appended.
-        :param batch_size: The number of elements that must be appended. If None: All of them will be.
-        :param training_length: The length of the samples that must be appended. If None: only takes one element.
-        """
-        if key_list is None:
-            key_list = self[agent_id].keys()
-        if not self[agent_id].check_length(key_list):
-            raise BufferException(
-                "The length of the fields {0} for agent {1} were not of same length".format(
-                    key_list, agent_id
-                )
-            )
-        # trajectory = Trajectory()
-        trajectory_list: List[AgentExperience] = []
-        for _exp in range(self[agent_id].num_experiences):
-            obs = []
-            if "vector_obs" in key_list:
-                obs.append(self[agent_id]["vector_obs"][_exp])
-            memory = self[agent_id]["memory"][_exp] if "memory" in key_list else None
-            # Assemble AgentExperience
-            experience = AgentExperience(
-                obs=obs,
-                reward=self[agent_id]["environment_rewards"][_exp],
-                done=self[agent_id]["done"][_exp],
-                action=self[agent_id]["actions"][_exp],
-                action_probs=self[agent_id]["action_probs"][_exp],
-                action_pre=self[agent_id]["actions_pre"][_exp],
-                action_mask=self[agent_id]["action_mask"][_exp],
-                prev_action=self[agent_id]["prev_action"][_exp],
-                agent_id=agent_id,
-                memory=memory,
-                epsilon=self[agent_id]["random_normal_epsilon"][_exp],
-            )
-            bootstrap_step = BootstrapExperience(obs=obs, agent_id=agent_id)
-            trajectory_list.append(experience)
-        trajectory = Trajectory(steps=trajectory_list, bootstrap_step=bootstrap_step)
-        return trajectory
-    def append_all_agent_batch_to_update_buffer(
-        self,
-        update_buffer: AgentBuffer,
-        key_list: List[str] = None,
-        batch_size: int = None,
-        training_length: int = None,
-    ) -> None:
-        """
-        Appends the buffer of all agents to the update buffer.
-        :param key_list: The fields that must be added. If None: all fields will be appended.
-        :param batch_size: The number of elements that must be appended. If None: All of them will be.
-        :param training_length: The length of the samples that must be appended. If None: only takes one element.
-        """
-        for agent_id in self.keys():
-            self.append_to_update_buffer(
-                update_buffer, agent_id, key_list, batch_size, training_length
-            )
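
For reference, the class removed above is essentially a dict that lazily creates one AgentBuffer per agent_id the first time that id is indexed. A minimal, self-contained sketch of that lookup pattern, using a simplified stand-in for AgentBuffer rather than the real mlagents class (FakeAgentBuffer and BufferDict are hypothetical names for illustration only):

from collections import defaultdict


class FakeAgentBuffer(defaultdict):
    """Simplified stand-in for AgentBuffer: maps field name -> list of values."""

    def __init__(self):
        super().__init__(list)

    @property
    def num_experiences(self):
        # All fields are expected to share the same length; report the first one.
        return len(next(iter(self.values()), []))


class BufferDict(dict):
    """The auto-creating lookup that ProcessingBuffer implemented via __getitem__."""

    def __getitem__(self, agent_id):
        if agent_id not in self:
            self[agent_id] = FakeAgentBuffer()
        return super().__getitem__(agent_id)


buffers = BufferDict()
buffers["agent-0"]["rewards"].append(1.0)   # no KeyError: the buffer is created lazily
print(buffers["agent-0"].num_experiences)   # 1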

ml-agents/mlagents/trainers/demo_loader.py (32 changes)


from typing import List, Tuple
import numpy as np
from mlagents.trainers.buffer import AgentBuffer
-from mlagents.trainers.agent_processor import ProcessingBuffer
from mlagents.envs.brain import BrainParameters, BrainInfo
from mlagents.envs.communicator_objects.agent_info_action_pair_pb2 import (
    AgentInfoActionPairProto,

    sequence_length: int,
) -> AgentBuffer:
    # Create and populate buffer using experiences
-    demo_process_buffer = ProcessingBuffer()
-    demo_buffer = AgentBuffer()
+    demo_raw_buffer = AgentBuffer()
+    demo_processed_buffer = AgentBuffer()
    for idx, experience in enumerate(pair_infos):
        if idx > len(pair_infos) - 2:
            break

        previous_action = np.array(
            pair_infos[idx - 1].action_info.vector_actions, dtype=np.float32
        )
-        demo_process_buffer[0].last_brain_info = current_brain_info
-        demo_process_buffer[0]["done"].append(next_brain_info.local_done[0])
-        demo_process_buffer[0]["rewards"].append(next_brain_info.rewards[0])
+        demo_raw_buffer["done"].append(next_brain_info.local_done[0])
+        demo_raw_buffer["rewards"].append(next_brain_info.rewards[0])
-            demo_process_buffer[0]["visual_obs%d" % i].append(
+            demo_raw_buffer["visual_obs%d" % i].append(
-            demo_process_buffer[0]["vector_obs"].append(
+            demo_raw_buffer["vector_obs"].append(
-        demo_process_buffer[0]["actions"].append(
-            current_pair_info.action_info.vector_actions
-        )
-        demo_process_buffer[0]["prev_action"].append(previous_action)
+        demo_raw_buffer["actions"].append(current_pair_info.action_info.vector_actions)
+        demo_raw_buffer["prev_action"].append(previous_action)
-            demo_process_buffer.append_to_update_buffer(
-                demo_buffer, 0, batch_size=None, training_length=sequence_length
+            demo_raw_buffer.resequence_and_append(
+                demo_processed_buffer, batch_size=None, training_length=sequence_length
-            demo_process_buffer.reset_local_buffers()
-    demo_process_buffer.append_to_update_buffer(
-        demo_buffer, 0, batch_size=None, training_length=sequence_length
+            demo_raw_buffer.reset_agent()
+    demo_raw_buffer.resequence_and_append(
+        demo_processed_buffer, batch_size=None, training_length=sequence_length
-    return demo_buffer
+    return demo_processed_buffer
@timed
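
A compressed sketch of the new demo-loading flow (the episode loop, field names, and values here are illustrative; the AgentBuffer calls mirror the diff above and assume the mlagents package from this branch is importable): experiences for the current episode accumulate in a plain AgentBuffer, and resequence_and_append() plus reset_agent() replace the old ProcessingBuffer.append_to_update_buffer() / reset_local_buffers() pair.

from mlagents.trainers.buffer import AgentBuffer

demo_raw_buffer = AgentBuffer()        # raw experiences for the episode in progress
demo_processed_buffer = AgentBuffer()  # the buffer the loader now returns

for step in range(6):                  # stand-in for iterating over pair_infos
    done = step == 5
    demo_raw_buffer["vector_obs"].append([float(step)] * 3)
    demo_raw_buffer["actions"].append([0.0, 1.0])
    demo_raw_buffer["rewards"].append(1.0)
    demo_raw_buffer["done"].append(done)
    if done:
        # Re-chunk the finished episode into length-2 sequences (standing in for
        # sequence_length), append to the processed buffer, then clear the raw buffer.
        demo_raw_buffer.resequence_and_append(
            demo_processed_buffer, batch_size=None, training_length=2
        )
        demo_raw_buffer.reset_agent()

print(demo_processed_buffer.num_experiences)  # 6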

ml-agents/mlagents/trainers/tests/mock_brain.py (43 changes)


from mlagents.envs.brain import CameraResolution, BrainParameters
from mlagents.trainers.buffer import AgentBuffer
-from mlagents.trainers.agent_processor import ProcessingBuffer
def create_mock_brainparams(

def create_buffer(brain_infos, brain_params, sequence_length, memory_size=8):
-    buffer = ProcessingBuffer()
+    buffer = AgentBuffer()
    update_buffer = AgentBuffer()
    # Make a buffer
    for idx, experience in enumerate(brain_infos):

        next_brain_info = brain_infos[idx + 1]
-        buffer[0].last_brain_info = current_brain_info
-        buffer[0]["done"].append(next_brain_info.local_done[0])
-        buffer[0]["rewards"].append(next_brain_info.rewards[0])
+        buffer.last_brain_info = current_brain_info
+        buffer["done"].append(next_brain_info.local_done[0])
+        buffer["rewards"].append(next_brain_info.rewards[0])
-            buffer[0]["visual_obs%d" % i].append(
+            buffer["visual_obs%d" % i].append(
-            buffer[0]["next_visual_obs%d" % i].append(
+            buffer["next_visual_obs%d" % i].append(
-        buffer[0]["vector_obs"].append(current_brain_info.vector_observations[0])
-        buffer[0]["next_vector_in"].append(
-            current_brain_info.vector_observations[0]
-        )
+        buffer["vector_obs"].append(current_brain_info.vector_observations[0])
+        buffer["next_vector_in"].append(current_brain_info.vector_observations[0])
-        buffer[0]["actions"].append(np.zeros(fake_action_size, dtype=np.float32))
-        buffer[0]["prev_action"].append(np.zeros(fake_action_size, dtype=np.float32))
-        buffer[0]["masks"].append(1.0)
-        buffer[0]["advantages"].append(1.0)
+        buffer["actions"].append(np.zeros(fake_action_size, dtype=np.float32))
+        buffer["prev_action"].append(np.zeros(fake_action_size, dtype=np.float32))
+        buffer["masks"].append(1.0)
+        buffer["advantages"].append(1.0)
-            buffer[0]["action_probs"].append(
+            buffer["action_probs"].append(
-            buffer[0]["action_probs"].append(
+            buffer["action_probs"].append(
-        buffer[0]["actions_pre"].append(
-            np.ones(buffer[0]["actions"][0].shape, dtype=np.float32)
+        buffer["actions_pre"].append(
+            np.ones(buffer["actions"][0].shape, dtype=np.float32)
-            buffer[0]["action_mask"].append(
+            buffer["action_mask"].append(
-        buffer[0]["memory"].append(np.ones(memory_size, dtype=np.float32))
+        buffer["memory"].append(np.ones(memory_size, dtype=np.float32))
-    buffer.append_to_update_buffer(
-        update_buffer, 0, batch_size=None, training_length=sequence_length
+    buffer.resequence_and_append(
+        update_buffer, batch_size=None, training_length=sequence_length
    )
    return update_buffer
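
A short sketch of the pattern create_buffer follows after this change (values and field sizes are made up; the calls mirror the diff above and assume the same AgentBuffer API): each step writes the current and next observation plus the action-related fields into one flat AgentBuffer per agent, which is then resequenced into the shared update buffer.

import numpy as np
from mlagents.trainers.buffer import AgentBuffer

buffer = AgentBuffer()
update_buffer = AgentBuffer()
fake_action_size = 2

for step in range(4):
    obs = np.full(3, float(step), dtype=np.float32)
    next_obs = np.full(3, float(step + 1), dtype=np.float32)
    buffer["vector_obs"].append(obs)
    buffer["next_vector_in"].append(next_obs)
    buffer["actions"].append(np.zeros(fake_action_size, dtype=np.float32))
    buffer["masks"].append(1.0)
    buffer["done"].append(step == 3)

buffer.resequence_and_append(update_buffer, batch_size=None, training_length=1)
print(np.array(update_buffer["actions"]).shape)  # (4, 2)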

ml-agents/mlagents/trainers/tests/test_buffer.py (95 changes)


import numpy as np
from mlagents.trainers.buffer import AgentBuffer
-from mlagents.trainers.agent_processor import ProcessingBuffer
def assert_array(a, b):

        assert la[i] == lb[i]

-def construct_fake_processing_buffer():
-    b = ProcessingBuffer()
-    for fake_agent_id in range(4):
-        for step in range(9):
-            b[fake_agent_id]["vector_observation"].append(
-                [
-                    100 * fake_agent_id + 10 * step + 1,
-                    100 * fake_agent_id + 10 * step + 2,
-                    100 * fake_agent_id + 10 * step + 3,
-                ]
-            )
-            b[fake_agent_id]["action"].append(
-                [
-                    100 * fake_agent_id + 10 * step + 4,
-                    100 * fake_agent_id + 10 * step + 5,
-                ]
-            )
+def construct_fake_buffer(fake_agent_id):
+    b = AgentBuffer()
+    for step in range(9):
+        b["vector_observation"].append(
+            [
+                100 * fake_agent_id + 10 * step + 1,
+                100 * fake_agent_id + 10 * step + 2,
+                100 * fake_agent_id + 10 * step + 3,
+            ]
+        )
+        b["action"].append(
+            [100 * fake_agent_id + 10 * step + 4, 100 * fake_agent_id + 10 * step + 5]
+        )

-    b = construct_fake_processing_buffer()
-    a = b[1]["vector_observation"].get_batch(
+    agent_1_buffer = construct_fake_buffer(1)
+    agent_2_buffer = construct_fake_buffer(2)
+    agent_3_buffer = construct_fake_buffer(3)
+    a = agent_1_buffer["vector_observation"].get_batch(
-    a = b[2]["vector_observation"].get_batch(
+    a = agent_2_buffer["vector_observation"].get_batch(
        batch_size=2, training_length=3, sequential=True
    )
    assert_array(

            ]
        ),
    )
-    a = b[2]["vector_observation"].get_batch(
+    a = agent_2_buffer["vector_observation"].get_batch(
        batch_size=2, training_length=3, sequential=False
    )
    assert_array(

            ]
        ),
    )
-    b[4].reset_agent()
-    assert len(b[4]) == 0
+    agent_1_buffer.reset_agent()
+    assert agent_1_buffer.num_experiences == 0
-    b.append_to_update_buffer(update_buffer, 3, batch_size=None, training_length=2)
-    b.append_to_update_buffer(update_buffer, 2, batch_size=None, training_length=2)
+    agent_2_buffer.resequence_and_append(
+        update_buffer, batch_size=None, training_length=2
+    )
+    agent_3_buffer.resequence_and_append(
+        update_buffer, batch_size=None, training_length=2
+    )
    assert len(update_buffer["action"]) == 20
    assert np.array(update_buffer["action"]).shape == (20, 2)
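
Why the expected length is 20: each construct_fake_buffer() holds 9 experiences, and resequencing with training_length=2 pads an agent's data up to the next multiple of 2 (10), so the two appended agents contribute 10 + 10 = 20 entries. A tiny sketch of that padding behaviour (assuming the same AgentBuffer API used above):

from mlagents.trainers.buffer import AgentBuffer

src, dst = AgentBuffer(), AgentBuffer()
for i in range(9):
    src["action"].append([float(i), float(i)])

src.resequence_and_append(dst, batch_size=None, training_length=2)
print(dst.num_experiences)  # 10: nine steps padded to a whole number of length-2 sequences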

def test_buffer_sample():
-    b = construct_fake_processing_buffer()
+    agent_1_buffer = construct_fake_buffer(1)
+    agent_2_buffer = construct_fake_buffer(2)
-    b.append_to_update_buffer(update_buffer, 3, batch_size=None, training_length=2)
-    b.append_to_update_buffer(update_buffer, 2, batch_size=None, training_length=2)
+    agent_1_buffer.resequence_and_append(
+        update_buffer, batch_size=None, training_length=2
+    )
+    agent_2_buffer.resequence_and_append(
+        update_buffer, batch_size=None, training_length=2
+    )
    # Test non-LSTM
    mb = update_buffer.sample_mini_batch(batch_size=4, sequence_length=1)
    assert mb.keys() == update_buffer.keys()
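
For reference, sample_mini_batch in the non-LSTM case is expected to return a buffer with the same keys, each field holding batch_size entries; the length claim is from the library's usual behaviour rather than this diff. A small sketch with illustrative data, same AgentBuffer API as above:

from mlagents.trainers.buffer import AgentBuffer

update_buffer = AgentBuffer()
for i in range(12):
    update_buffer["vector_observation"].append([float(i)] * 3)
    update_buffer["action"].append([0.0, 1.0])

mb = update_buffer.sample_mini_batch(batch_size=4, sequence_length=1)
assert mb.keys() == update_buffer.keys()
assert len(mb["action"]) == 4  # expected: batch_size entries per field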

def test_num_experiences():
-    b = construct_fake_processing_buffer()
+    agent_1_buffer = construct_fake_buffer(1)
+    agent_2_buffer = construct_fake_buffer(2)
-    b.append_to_update_buffer(update_buffer, 3, batch_size=None, training_length=2)
-    b.append_to_update_buffer(update_buffer, 2, batch_size=None, training_length=2)
+    agent_1_buffer.resequence_and_append(
+        update_buffer, batch_size=None, training_length=2
+    )
+    agent_2_buffer.resequence_and_append(
+        update_buffer, batch_size=None, training_length=2
+    )
    assert len(update_buffer["action"]) == 20
    assert update_buffer.num_experiences == 20

-    b = construct_fake_processing_buffer()
+    agent_1_buffer = construct_fake_buffer(1)
+    agent_2_buffer = construct_fake_buffer(2)
-    b.append_to_update_buffer(update_buffer, 3, batch_size=None, training_length=2)
-    b.append_to_update_buffer(update_buffer, 2, batch_size=None, training_length=2)
+    agent_1_buffer.resequence_and_append(
+        update_buffer, batch_size=None, training_length=2
+    )
+    agent_2_buffer.resequence_and_append(
+        update_buffer, batch_size=None, training_length=2
+    )
-    b.append_to_update_buffer(update_buffer, 3, batch_size=None, training_length=2)
-    b.append_to_update_buffer(update_buffer, 2, batch_size=None, training_length=2)
+    agent_1_buffer.resequence_and_append(
+        update_buffer, batch_size=None, training_length=2
+    )
+    agent_2_buffer.resequence_and_append(
+        update_buffer, batch_size=None, training_length=2
+    )
    # Test LSTM, truncate should be some multiple of sequence_length
    update_buffer.truncate(4, sequence_length=3)
    assert update_buffer.num_experiences == 3
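
The truncate() call above trims the update buffer down to at most max_length experiences, rounded down to a whole number of LSTM sequences, which is why a limit of 4 with sequence_length=3 leaves 3. A minimal sketch under the same assumptions as the other examples:

from mlagents.trainers.buffer import AgentBuffer

update_buffer = AgentBuffer()
for i in range(8):
    update_buffer["action"].append([float(i)])

update_buffer.truncate(4, sequence_length=3)
print(update_buffer.num_experiences)  # 3: the largest multiple of 3 that is <= 4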

ml-agents/mlagents/trainers/tests/test_rl_trainer.py (9 changes)


import mlagents.trainers.tests.mock_brain as mb
import numpy as np
from mlagents.trainers.rl_trainer import RLTrainer
-from mlagents.trainers.tests.test_buffer import construct_fake_processing_buffer
-from mlagents.trainers.buffer import AgentBuffer
+from mlagents.trainers.tests.test_buffer import construct_fake_buffer
@pytest.fixture

def test_clear_update_buffer():
    trainer = create_rl_trainer()
-    trainer.processing_buffer = construct_fake_processing_buffer()
-    trainer.update_buffer = AgentBuffer()
-    trainer.processing_buffer.append_to_update_buffer(
-        trainer.update_buffer, 2, batch_size=None, training_length=2
-    )
+    trainer.update_buffer = construct_fake_buffer(0)
    trainer.clear_update_buffer()
    for _, arr in trainer.update_buffer.items():
        assert len(arr) == 0
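
What the updated test checks, in a self-contained sketch: after clearing, every field in the update buffer is empty. Here reset_agent() stands in for whatever clear_update_buffer() does internally; that equivalence is an assumption, not something shown in this diff.

from mlagents.trainers.buffer import AgentBuffer

update_buffer = AgentBuffer()
for i in range(5):
    update_buffer["vector_observation"].append([float(i)] * 3)
    update_buffer["action"].append([0.0, 1.0])

update_buffer.reset_agent()  # assumed stand-in for trainer.clear_update_buffer()
for _, arr in update_buffer.items():
    assert len(arr) == 0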