from unittest import mock
from typing import List

import numpy as np

# BrainParameters and AgentBuffer are used below; these import paths assume the
# mlagents.trainers package layout that matches the mlagents_envs import.
from mlagents.trainers.brain import BrainParameters
from mlagents.trainers.buffer import AgentBuffer
from mlagents.trainers.trajectory import Trajectory, AgentExperience
from mlagents_envs.base_env import BatchedStepResult


def create_mock_brainparams(
    number_visual_observations=0,
    vector_action_space_type="continuous",
    vector_observation_space_size=3,
    vector_action_space_size=None,
):
    """
    Creates a mock BrainParameters object with the given parameters.
    """
    # NOTE: the parameter list and attributes set here are reconstructed from how the
    # mock is consumed elsewhere in this module (brain_name, observation sizes, and
    # action space size/type); adjust if your BrainParameters exposes different fields.
    # Avoid using a mutable object as a default parameter.
    if vector_action_space_size is None:
        vector_action_space_size = [2]
    mock_brain = mock.Mock()
    mock_brain.return_value.number_visual_observations = number_visual_observations
    mock_brain.return_value.vector_action_space_type = vector_action_space_type
    mock_brain.return_value.vector_observation_space_size = (
        vector_observation_space_size
    )
    mock_brain.return_value.vector_action_space_size = vector_action_space_size
    mock_brain.return_value.brain_name = "MockBrain"
    return mock_brain()


def create_mock_braininfo(
    num_agents=1,
    num_vector_observations=0,
    num_vis_observations=0,
    num_vector_acts=2,
    discrete=False,
    num_discrete_branches=1,
):
    """
    Creates a mock BrainInfo with observations. Imitates constant
    vector and visual observations, rewards, dones, and agents.

    :int num_agents: Number of "agents" to imitate in your BrainInfo values.
    """
    mock_braininfo = mock.Mock()

    mock_braininfo.return_value.visual_observations = num_vis_observations * [
        np.ones((num_agents, 84, 84, 3), dtype=np.float32)
    ]
    mock_braininfo.return_value.vector_observations = np.array(
        num_agents * [num_vector_observations * [1]], dtype=np.float32
    )
    if discrete:
        mock_braininfo.return_value.previous_vector_actions = np.array(
            num_agents * [num_discrete_branches * [0.5]], dtype=np.float32
        )
        mock_braininfo.return_value.action_masks = np.array(
            num_agents * [num_vector_acts * [1.0]], dtype=np.float32
        )
    else:
        mock_braininfo.return_value.previous_vector_actions = np.array(
            num_agents * [num_vector_acts * [0.5]], dtype=np.float32
        )
    mock_braininfo.return_value.memories = np.ones((num_agents, 8), dtype=np.float32)
    mock_braininfo.return_value.rewards = num_agents * [1.0]
    mock_braininfo.return_value.local_done = num_agents * [False]
    mock_braininfo.return_value.max_reached = num_agents * [100]
    mock_braininfo.return_value.action_masks = num_agents * [num_vector_acts * [1.0]]
    mock_braininfo.return_value.agents = range(0, num_agents)
    return mock_braininfo()


def create_mock_batchedstep(
    num_agents: int = 1,
    num_vector_observations: int = 0,
    num_vis_observations: int = 0,
    action_shape: List[int] = None,
    discrete: bool = False,
) -> BatchedStepResult:
    """
    Creates a mock BatchedStepResult with observations. Imitates constant
    vector and visual observations, rewards, dones, and agents.

    :int num_agents: Number of "agents" to imitate.
    """
    if action_shape is None:
        action_shape = [2]

    obs_list = []
    for _ in range(num_vis_observations):
        obs_list.append(np.ones((num_agents, 84, 84, 3), dtype=np.float32))
    if num_vector_observations > 1:
        obs_list.append(
            np.array(num_agents * [num_vector_observations * [1]], dtype=np.float32)
        )
    action_mask = None
    if discrete:
        action_mask = [
            np.array(num_agents * [action_size * [False]])
            for action_size in action_shape
        ]

    reward = np.array(num_agents * [1.0], dtype=np.float32)
    done = np.array(num_agents * [False], dtype=np.bool)
    max_step = np.array(num_agents * [False], dtype=np.bool)
    agent_id = np.arange(num_agents, dtype=np.int32)

    return BatchedStepResult(obs_list, reward, done, max_step, agent_id, action_mask)
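
# Illustrative usage sketch (hypothetical values, not used by any test in this module):
# build a two-agent, discrete-action step result and check the fields passed to the
# BatchedStepResult constructor above.
#
#   step_result = create_mock_batchedstep(
#       num_agents=2, num_vector_observations=8, discrete=True, action_shape=[3, 2]
#   )
#   assert len(step_result.agent_id) == 2
#   assert len(step_result.action_mask) == 2  # one mask array per discrete branch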
|
|
|
|
|
|
|
|
|
|
|


def create_batchedstep_from_brainparams(
    brain_params: BrainParameters, num_agents: int = 1
) -> BatchedStepResult:
    return create_mock_batchedstep(
        num_agents=num_agents,
        num_vector_observations=brain_params.vector_observation_space_size,
        num_vis_observations=brain_params.number_visual_observations,
        action_shape=brain_params.vector_action_space_size,
        discrete=brain_params.vector_action_space_type == "discrete",
    )
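
# Illustrative sketch: the same kind of step result can be derived directly from a
# (mock) BrainParameters object, e.g. one produced by create_mock_brainparams above.
#
#   brain = create_mock_brainparams(vector_observation_space_size=8)
#   step_result = create_batchedstep_from_brainparams(brain, num_agents=3)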
|
|
|


def make_fake_trajectory(
    length: int,
    max_step_complete: bool = False,
    vec_obs_size: int = 1,
    num_vis_obs: int = 1,
    action_space: List[int] = None,
    memory_size: int = 10,
    is_discrete: bool = True,
) -> Trajectory:
    """
    Makes a fake trajectory of length `length`. If max_step_complete,
    the trajectory is terminated by a max step rather than a done.
    """
    if action_space is None:
        action_space = [2]
    steps_list = []
    # All steps except the last one are non-terminal.
    for _i in range(length - 1):
        obs = []
        for _j in range(num_vis_obs):
            obs.append(np.ones((84, 84, 3), dtype=np.float32))
        obs.append(np.ones(vec_obs_size, dtype=np.float32))
        reward = 1.0
        done = False
        if is_discrete:
            action_size = len(action_space)
        else:
            action_size = action_space[0]
        action = np.zeros(action_size, dtype=np.float32)
        action_probs = np.ones(action_size, dtype=np.float32)
        action_pre = np.zeros(action_size, dtype=np.float32)
        action_mask = (
            [[False for _ in range(branch)] for branch in action_space]
            if is_discrete
            else None
        )
        prev_action = np.ones(action_size, dtype=np.float32)
        max_step = False
        memory = np.ones(memory_size, dtype=np.float32)
        agent_id = "test_agent"
        behavior_id = "test_brain"
        experience = AgentExperience(
            obs=obs,
            reward=reward,
            done=done,
            action=action,
            action_probs=action_probs,
            action_pre=action_pre,
            action_mask=action_mask,
            prev_action=prev_action,
            max_step=max_step,
            memory=memory,
        )
        steps_list.append(experience)
    # The last step terminates the trajectory, either by a done or by a max step.
    last_experience = AgentExperience(
        obs=obs,
        reward=reward,
        done=not max_step_complete,
        action=action,
        action_probs=action_probs,
        action_pre=action_pre,
        action_mask=action_mask,
        prev_action=prev_action,
        max_step=max_step_complete,
        memory=memory,
    )
    steps_list.append(last_experience)
    return Trajectory(
        steps=steps_list, agent_id=agent_id, behavior_id=behavior_id, next_obs=obs
    )
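
# Illustrative sketch: a ten-step continuous-action trajectory that terminates by
# hitting the max step limit, converted to an AgentBuffer the same way
# simulate_rollout does below.
#
#   traj = make_fake_trajectory(length=10, max_step_complete=True, is_discrete=False)
#   buf = traj.to_agentbuffer()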
|
|
|


def setup_mock_unityenvironment(mock_env, mock_brain, mock_braininfo):
    """
    Takes a mock UnityEnvironment and adds the appropriate properties, defined by the mock
    BrainParameters and BrainInfo.

    :Mock mock_env: A mock UnityEnvironment, usually empty.
    :Mock mock_brain: A mock Brain object that specifies the params of this environment.
    :Mock mock_braininfo: A mock BrainInfo object that will be returned at each step and reset.
    """
    brain_name = mock_brain.brain_name
    mock_env.return_value.academy_name = "MockAcademy"
    mock_env.return_value.brains = {brain_name: mock_brain}
    mock_env.return_value.external_brain_names = [brain_name]
    mock_env.return_value.reset.return_value = {brain_name: mock_braininfo}
    mock_env.return_value.step.return_value = {brain_name: mock_braininfo}
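
# Illustrative sketch of wiring the mocks together in a test. The patch target below is
# an assumption; patch whichever UnityEnvironment symbol the code under test imports.
#
#   with mock.patch("mlagents_envs.environment.UnityEnvironment") as mock_env:
#       brain = create_mock_brainparams()
#       braininfo = create_mock_braininfo(num_agents=2, num_vector_observations=8)
#       setup_mock_unityenvironment(mock_env, brain, braininfo)
#       env = mock_env()
#       assert env.external_brain_names == ["MockBrain"]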
|
|
|


def simulate_rollout(
    length: int,
    brain_params: BrainParameters,
    memory_size: int = 10,
    exclude_key_list: List[str] = None,
) -> AgentBuffer:
    vec_obs_size = brain_params.vector_observation_space_size
    num_vis_obs = brain_params.number_visual_observations
    action_space = brain_params.vector_action_space_size
    is_discrete = brain_params.vector_action_space_type == "discrete"

    trajectory = make_fake_trajectory(
        length,
        vec_obs_size=vec_obs_size,
        num_vis_obs=num_vis_obs,
        action_space=action_space,
        memory_size=memory_size,
        is_discrete=is_discrete,
    )
    buffer = trajectory.to_agentbuffer()
    # If a key_list was given, remove those keys
    if exclude_key_list:
        for key in exclude_key_list:
            if key in buffer:
                buffer.pop(key)
    return buffer


def simulate_rollout_from_env(env, policy, buffer_init_samples, exclude_key_list=None):
    # Legacy BrainInfo-based rollout helper; the name is chosen here so it does not
    # shadow the trajectory-based simulate_rollout above.
    brain_info_list = []
    for _ in range(buffer_init_samples):
        brain_info_list.append(env.step()[env.external_brain_names[0]])
    buffer = create_buffer(brain_info_list, policy.brain, policy.sequence_length)
    # If a key_list was given, remove those keys
    if exclude_key_list:
        for key in exclude_key_list:
            if key in buffer:
                buffer.pop(key)
    return buffer
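
# Illustrative sketch: simulate a 16-step rollout for a continuous-action brain and
# get back the flattened AgentBuffer produced by simulate_rollout.
#
#   brain = create_mock_brainparams(vector_observation_space_size=8)
#   buffer = simulate_rollout(16, brain)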
|
|
|
|
|
|
|
|
|
|
|
|
|
|


def create_buffer(brain_infos, brain_params, sequence_length, memory_size=8):
    buffer = AgentBuffer()
    update_buffer = AgentBuffer()
    # Make a buffer
    for idx, experience in enumerate(brain_infos):
        if idx > len(brain_infos) - 2:
            break
        current_brain_info = experience
        next_brain_info = brain_infos[idx + 1]
        buffer.last_brain_info = current_brain_info
        buffer["done"].append(next_brain_info.local_done[0])
        buffer["rewards"].append(next_brain_info.rewards[0])
        for i in range(brain_params.number_visual_observations):
            buffer["visual_obs%d" % i].append(
                current_brain_info.visual_observations[i][0]
            )
            buffer["next_visual_obs%d" % i].append(
                current_brain_info.visual_observations[i][0]
            )
        if brain_params.vector_observation_space_size > 0:
            buffer["vector_obs"].append(current_brain_info.vector_observations[0])
            buffer["next_vector_in"].append(current_brain_info.vector_observations[0])
        fake_action_size = len(brain_params.vector_action_space_size)
        if brain_params.vector_action_space_type == "continuous":
            fake_action_size = brain_params.vector_action_space_size[0]
        buffer["actions"].append(np.zeros(fake_action_size, dtype=np.float32))
        buffer["prev_action"].append(np.zeros(fake_action_size, dtype=np.float32))
        buffer["masks"].append(1.0)
        buffer["advantages"].append(1.0)
        if brain_params.vector_action_space_type == "discrete":
            buffer["action_probs"].append(
                np.ones(sum(brain_params.vector_action_space_size), dtype=np.float32)
            )
        else:
            buffer["action_probs"].append(
                np.ones(buffer["actions"][0].shape, dtype=np.float32)
            )