|
|
|
|
|
|
from unittest import mock |
|
|
|
from typing import List, Tuple |
|
|
|
from typing import List, Tuple, Union |
|
|
|
from collections.abc import Iterable |
|
|
|
from mlagents.trainers.brain import CameraResolution, BrainParameters |
|
|
|
from mlagents.trainers.buffer import AgentBuffer |
|
|
|
from mlagents.trainers.trajectory import Trajectory, AgentExperience |
|
|
|
from mlagents_envs.base_env import ( |
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
def create_mock_brainparams( |
|
|
|
number_visual_observations=0, |
|
|
|
vector_action_space_type="continuous", |
|
|
|
vector_observation_space_size=3, |
|
|
|
vector_action_space_size=None, |
|
|
|
): |
|
|
|
""" |
|
|
|
Creates a mock BrainParameters object with parameters. |
|
|
|
""" |
|
|
|
# Avoid using mutable object as default param |
|
|
|
if vector_action_space_size is None: |
|
|
|
vector_action_space_size = [2] |
|
|
|
mock_brain = mock.Mock() |
|
|
|
mock_brain.return_value.number_visual_observations = number_visual_observations |
|
|
|
mock_brain.return_value.vector_action_space_type = vector_action_space_type |
|
|
|
mock_brain.return_value.vector_observation_space_size = ( |
|
|
|
vector_observation_space_size |
|
|
|
) |
|
|
|
camrez = CameraResolution(height=84, width=84, num_channels=3) |
|
|
|
mock_brain.return_value.camera_resolutions = [camrez] * number_visual_observations |
|
|
|
mock_brain.return_value.vector_action_space_size = vector_action_space_size |
|
|
|
mock_brain.return_value.brain_name = "MockBrain" |
|
|
|
return mock_brain() |
|
|
|
|
|
|
|
|
|
|
|
num_agents: int = 1, |
|
|
|
num_vector_observations: int = 0, |
|
|
|
num_vis_observations: int = 0, |
|
|
|
action_shape: List[int] = None, |
|
|
|
num_agents: int, |
|
|
|
observation_shapes: List[Tuple], |
|
|
|
action_shape: Union[int, Tuple[int]] = None, |
|
|
|
discrete: bool = False, |
|
|
|
done: bool = False, |
|
|
|
) -> Tuple[DecisionSteps, TerminalSteps]: |
|
|
|
|
|
|
|
|
|
|
:int num_agents: Number of "agents" to imitate. |
|
|
|
:int num_vector_observations: Number of "observations" in your observation space |
|
|
|
:int num_vis_observations: Number of "observations" in your observation space |
|
|
|
:List observation_shapes: A List of the observation spaces in your steps |
|
|
|
action_shape = [2] |
|
|
|
action_shape = 2 |
|
|
|
for _ in range(num_vis_observations): |
|
|
|
obs_list.append(np.ones((num_agents, 84, 84, 3), dtype=np.float32)) |
|
|
|
if num_vector_observations > 1: |
|
|
|
obs_list.append( |
|
|
|
np.array(num_agents * [num_vector_observations * [1]], dtype=np.float32) |
|
|
|
) |
|
|
|
for _shape in observation_shapes: |
|
|
|
obs_list.append(np.ones((num_agents,) + _shape, dtype=np.float32)) |
|
|
|
if discrete: |
|
|
|
if discrete and isinstance(action_shape, Iterable): |
|
|
|
for action_size in action_shape |
|
|
|
] |
|
|
|
for action_size in action_shape # type: ignore |
|
|
|
] # type: ignore |
|
|
|
[(84, 84, 3)] * num_vis_observations + [(num_vector_observations, 0, 0)], |
|
|
|
observation_shapes, |
|
|
|
action_shape if discrete else action_shape[0], |
|
|
|
action_shape, |
|
|
|
) |
|
|
|
if done: |
|
|
|
return ( |
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
def create_steps_from_brainparams( |
|
|
|
brain_params: BrainParameters, num_agents: int = 1 |
|
|
|
def create_steps_from_behavior_spec( |
|
|
|
behavior_spec: BehaviorSpec, num_agents: int = 1 |
|
|
|
num_vector_observations=brain_params.vector_observation_space_size, |
|
|
|
num_vis_observations=brain_params.number_visual_observations, |
|
|
|
action_shape=brain_params.vector_action_space_size, |
|
|
|
discrete=brain_params.vector_action_space_type == "discrete", |
|
|
|
observation_shapes=behavior_spec.observation_shapes, |
|
|
|
action_shape=behavior_spec.action_shape, |
|
|
|
discrete=behavior_spec.is_action_discrete(), |
|
|
|
observation_shapes: List[Tuple], |
|
|
|
vec_obs_size: int = 1, |
|
|
|
num_vis_obs: int = 1, |
|
|
|
action_space: List[int] = None, |
|
|
|
action_space: Union[int, Tuple[int]] = 2, |
|
|
|
memory_size: int = 10, |
|
|
|
is_discrete: bool = True, |
|
|
|
) -> Trajectory: |
|
|
|
|
|
|
""" |
|
|
|
if action_space is None: |
|
|
|
action_space = [2] |
|
|
|
for _j in range(num_vis_obs): |
|
|
|
obs.append(np.ones((84, 84, 3), dtype=np.float32)) |
|
|
|
obs.append(np.ones(vec_obs_size, dtype=np.float32)) |
|
|
|
for _shape in observation_shapes: |
|
|
|
obs.append(np.ones(_shape, dtype=np.float32)) |
|
|
|
action_size = len(action_space) |
|
|
|
action_size = len(action_space) # type: ignore |
|
|
|
action_size = action_space[0] |
|
|
|
action_size = int(action_space) # type: ignore |
|
|
|
[[False for _ in range(branch)] for branch in action_space] |
|
|
|
[[False for _ in range(branch)] for branch in action_space] # type: ignore |
|
|
|
if is_discrete |
|
|
|
else None |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
def simulate_rollout( |
|
|
|
length: int, |
|
|
|
brain_params: BrainParameters, |
|
|
|
behavior_spec: BehaviorSpec, |
|
|
|
vec_obs_size = brain_params.vector_observation_space_size |
|
|
|
num_vis_obs = brain_params.number_visual_observations |
|
|
|
action_space = brain_params.vector_action_space_size |
|
|
|
is_discrete = brain_params.vector_action_space_type == "discrete" |
|
|
|
action_space = behavior_spec.action_shape |
|
|
|
is_discrete = behavior_spec.is_action_discrete() |
|
|
|
vec_obs_size=vec_obs_size, |
|
|
|
num_vis_obs=num_vis_obs, |
|
|
|
behavior_spec.observation_shapes, |
|
|
|
action_space=action_space, |
|
|
|
memory_size=memory_size, |
|
|
|
is_discrete=is_discrete, |
|
|
|
|
|
|
return buffer |
|
|
|
|
|
|
|
|
|
|
|
def setup_mock_brain( |
|
|
|
use_discrete, |
|
|
|
use_visual, |
|
|
|
discrete_action_space=None, |
|
|
|
vector_action_space=None, |
|
|
|
vector_obs_space=8, |
|
|
|
def setup_test_behavior_specs( |
|
|
|
use_discrete=True, use_visual=False, vector_action_space=2, vector_obs_space=8 |
|
|
|
# defaults |
|
|
|
discrete_action_space = ( |
|
|
|
[3, 3, 3, 2] if discrete_action_space is None else discrete_action_space |
|
|
|
behavior_spec = BehaviorSpec( |
|
|
|
[(84, 84, 3)] * int(use_visual) + [(vector_obs_space,)], |
|
|
|
ActionType.DISCRETE if use_discrete else ActionType.CONTINUOUS, |
|
|
|
tuple(vector_action_space) if use_discrete else vector_action_space, |
|
|
|
vector_action_space = [2] if vector_action_space is None else vector_action_space |
|
|
|
return behavior_spec |
|
|
|
if not use_visual: |
|
|
|
mock_brain = create_mock_brainparams( |
|
|
|
vector_action_space_type="discrete" if use_discrete else "continuous", |
|
|
|
vector_action_space_size=discrete_action_space |
|
|
|
if use_discrete |
|
|
|
else vector_action_space, |
|
|
|
vector_observation_space_size=vector_obs_space, |
|
|
|
) |
|
|
|
else: |
|
|
|
mock_brain = create_mock_brainparams( |
|
|
|
vector_action_space_type="discrete" if use_discrete else "continuous", |
|
|
|
vector_action_space_size=discrete_action_space |
|
|
|
if use_discrete |
|
|
|
else vector_action_space, |
|
|
|
vector_observation_space_size=0, |
|
|
|
number_visual_observations=1, |
|
|
|
) |
|
|
|
return mock_brain |
|
|
|
|
|
|
|
def create_mock_3dball_brain(): |
|
|
|
mock_brain = create_mock_brainparams( |
|
|
|
vector_action_space_type="continuous", |
|
|
|
vector_action_space_size=[2], |
|
|
|
vector_observation_space_size=8, |
|
|
|
def create_mock_3dball_behavior_specs(): |
|
|
|
return setup_test_behavior_specs( |
|
|
|
False, False, vector_action_space=2, vector_obs_space=8 |
|
|
|
mock_brain.brain_name = "Ball3DBrain" |
|
|
|
return mock_brain |
|
|
|
def create_mock_pushblock_brain(): |
|
|
|
mock_brain = create_mock_brainparams( |
|
|
|
vector_action_space_type="discrete", |
|
|
|
vector_action_space_size=[7], |
|
|
|
vector_observation_space_size=70, |
|
|
|
) |
|
|
|
mock_brain.brain_name = "PushblockLearning" |
|
|
|
return mock_brain |
|
|
|
|
|
|
|
|
|
|
|
def create_mock_banana_brain(): |
|
|
|
mock_brain = create_mock_brainparams( |
|
|
|
number_visual_observations=1, |
|
|
|
vector_action_space_type="discrete", |
|
|
|
vector_action_space_size=[3, 3, 3, 2], |
|
|
|
vector_observation_space_size=0, |
|
|
|
def create_mock_pushblock_behavior_specs(): |
|
|
|
return setup_test_behavior_specs( |
|
|
|
True, False, vector_action_space=7, vector_obs_space=70 |
|
|
|
return mock_brain |
|
|
|
def make_brain_parameters( |
|
|
|
discrete_action: bool = False, |
|
|
|
visual_inputs: int = 0, |
|
|
|
brain_name: str = "RealFakeBrain", |
|
|
|
vec_obs_size: int = 6, |
|
|
|
) -> BrainParameters: |
|
|
|
resolutions = [ |
|
|
|
CameraResolution(width=30, height=40, num_channels=3) |
|
|
|
for _ in range(visual_inputs) |
|
|
|
] |
|
|
|
|
|
|
|
return BrainParameters( |
|
|
|
vector_observation_space_size=vec_obs_size, |
|
|
|
camera_resolutions=resolutions, |
|
|
|
vector_action_space_size=[2], |
|
|
|
vector_action_descriptions=["", ""], |
|
|
|
vector_action_space_type=int(not discrete_action), |
|
|
|
brain_name=brain_name, |
|
|
|
def create_mock_banana_behavior_specs(): |
|
|
|
return setup_test_behavior_specs( |
|
|
|
True, True, vector_action_space=[3, 3, 3, 2], vector_obs_space=0 |
|
|
|
) |