from unittest import mock
import pytest
import numpy as np

from gym import spaces
from gym_unity.envs import UnityToGymWrapper
from mlagents_envs.base_env import (
    BehaviorSpec,
    ActionSpec,
    DecisionSteps,
    TerminalSteps,
    BehaviorMapping,
)
from mlagents.trainers.tests.dummy_config import create_observation_specs_with_shapes


def test_gym_wrapper():
    mock_env = mock.MagicMock()
    mock_spec = create_mock_group_spec()
    mock_decision_step, mock_terminal_step = create_mock_vector_steps(mock_spec)
    setup_mock_unityenvironment(
        mock_env, mock_spec, mock_decision_step, mock_terminal_step
    )
    env = UnityToGymWrapper(mock_env)
    assert isinstance(env.reset(), np.ndarray)
    actions = env.action_space.sample()
    assert actions.shape[0] == 2
    obs, rew, done, info = env.step(actions)
    assert env.observation_space.contains(obs)
    assert isinstance(obs, np.ndarray)
    assert isinstance(rew, float)
    assert isinstance(done, (bool, np.bool_))
    assert isinstance(info, dict)


def test_branched_flatten():
    mock_env = mock.MagicMock()
    mock_spec = create_mock_group_spec(
        vector_action_space_type="discrete", vector_action_space_size=[2, 2, 3]
    )
    mock_decision_step, mock_terminal_step = create_mock_vector_steps(
        mock_spec, num_agents=1
    )
    setup_mock_unityenvironment(
        mock_env, mock_spec, mock_decision_step, mock_terminal_step
    )

    env = UnityToGymWrapper(mock_env, flatten_branched=True)
    assert isinstance(env.action_space, spaces.Discrete)
    # Branch sizes [2, 2, 3] flatten to 2 * 2 * 3 = 12 discrete actions
    assert env.action_space.n == 12
    assert env._flattener.lookup_action(0) == [0, 0, 0]
    assert env._flattener.lookup_action(11) == [1, 1, 2]

    # Check that False produces a MultiDiscrete
    env = UnityToGymWrapper(mock_env, flatten_branched=False)
    assert isinstance(env.action_space, spaces.MultiDiscrete)


def test_action_space():
    mock_env = mock.MagicMock()
    mock_spec = create_mock_group_spec(
        vector_action_space_type="discrete", vector_action_space_size=[5]
    )
    mock_decision_step, mock_terminal_step = create_mock_vector_steps(
        mock_spec, num_agents=1
    )
    setup_mock_unityenvironment(
        mock_env, mock_spec, mock_decision_step, mock_terminal_step
    )

    env = UnityToGymWrapper(mock_env, flatten_branched=True)
    assert isinstance(env.action_space, spaces.Discrete)
    assert env.action_space.n == 5

    env = UnityToGymWrapper(mock_env, flatten_branched=False)
    assert isinstance(env.action_space, spaces.Discrete)
    assert env.action_space.n == 5


def test_action_space_seed():
    mock_env = mock.MagicMock()
    mock_spec = create_mock_group_spec()
    mock_decision_step, mock_terminal_step = create_mock_vector_steps(mock_spec)
    setup_mock_unityenvironment(
        mock_env, mock_spec, mock_decision_step, mock_terminal_step
    )
    actions = []
    for _ in range(2):
        env = UnityToGymWrapper(mock_env, action_space_seed=1337)
        env.reset()
        actions.append(env.action_space.sample())
    assert (actions[0] == actions[1]).all()


@pytest.mark.parametrize("use_uint8", [True, False], ids=["uint8", "float"])
def test_gym_wrapper_visual(use_uint8):
    mock_env = mock.MagicMock()
    mock_spec = create_mock_group_spec(
        number_visual_observations=1, vector_observation_space_size=0
    )
    mock_decision_step, mock_terminal_step = create_mock_vector_steps(
        mock_spec, number_visual_observations=1
    )
    setup_mock_unityenvironment(
        mock_env, mock_spec, mock_decision_step, mock_terminal_step
    )

    env = UnityToGymWrapper(mock_env, uint8_visual=use_uint8)
    assert isinstance(env.observation_space, spaces.Box)
    assert isinstance(env.reset(), np.ndarray)
    actions = env.action_space.sample()
    assert actions.shape[0] == 2
    obs, rew, done, info = env.step(actions)
    assert env.observation_space.contains(obs)
    assert isinstance(obs, np.ndarray)
    assert isinstance(rew, float)
    assert isinstance(done, (bool, np.bool_))
    assert isinstance(info, dict)


@pytest.mark.parametrize("use_uint8", [True, False], ids=["uint8", "float"])
def test_gym_wrapper_single_visual_and_vector(use_uint8):
    mock_env = mock.MagicMock()
    mock_spec = create_mock_group_spec(
        number_visual_observations=1,
        vector_observation_space_size=3,
        vector_action_space_size=[2],
    )
    mock_decision_step, mock_terminal_step = create_mock_vector_steps(
        mock_spec, number_visual_observations=1
    )
    setup_mock_unityenvironment(
        mock_env, mock_spec, mock_decision_step, mock_terminal_step
    )

    env = UnityToGymWrapper(mock_env, uint8_visual=use_uint8, allow_multiple_obs=True)
    assert isinstance(env.observation_space, spaces.Tuple)
    assert len(env.observation_space) == 2
    reset_obs = env.reset()
    assert isinstance(reset_obs, list)
    assert len(reset_obs) == 2
    assert all(isinstance(ob, np.ndarray) for ob in reset_obs)
    assert reset_obs[-1].shape == (3,)
    assert len(reset_obs[0].shape) == 3
    actions = env.action_space.sample()
    assert actions.shape == (2,)
    obs, rew, done, info = env.step(actions)
    assert isinstance(obs, list)
    assert len(obs) == 2
    assert all(isinstance(ob, np.ndarray) for ob in obs)
    # Check the observation returned by step(), not the earlier reset()
    assert obs[-1].shape == (3,)
    assert isinstance(rew, float)
    assert isinstance(done, (bool, np.bool_))
    assert isinstance(info, dict)

    # Check behavior for allow_multiple_obs=False
    env = UnityToGymWrapper(mock_env, uint8_visual=use_uint8, allow_multiple_obs=False)
    assert isinstance(env.observation_space, spaces.Box)
    reset_obs = env.reset()
    assert isinstance(reset_obs, np.ndarray)
    assert len(reset_obs.shape) == 3
    actions = env.action_space.sample()
    assert actions.shape == (2,)
    obs, rew, done, info = env.step(actions)
    assert isinstance(obs, np.ndarray)


@pytest.mark.parametrize("use_uint8", [True, False], ids=["uint8", "float"])
def test_gym_wrapper_multi_visual_and_vector(use_uint8):
    mock_env = mock.MagicMock()
    mock_spec = create_mock_group_spec(
        number_visual_observations=2,
        vector_observation_space_size=3,
        vector_action_space_size=[2],
    )
    mock_decision_step, mock_terminal_step = create_mock_vector_steps(
        mock_spec, number_visual_observations=2
    )
    setup_mock_unityenvironment(
        mock_env, mock_spec, mock_decision_step, mock_terminal_step
    )

    env = UnityToGymWrapper(mock_env, uint8_visual=use_uint8, allow_multiple_obs=True)
    assert isinstance(env.observation_space, spaces.Tuple)
    assert len(env.observation_space) == 3
    reset_obs = env.reset()
    assert isinstance(reset_obs, list)
    assert len(reset_obs) == 3
    assert all(isinstance(ob, np.ndarray) for ob in reset_obs)
    assert reset_obs[-1].shape == (3,)
    actions = env.action_space.sample()
    assert actions.shape == (2,)
    obs, rew, done, info = env.step(actions)
    assert all(isinstance(ob, np.ndarray) for ob in obs)
    assert isinstance(rew, float)
    assert isinstance(done, (bool, np.bool_))
    assert isinstance(info, dict)

    # Check behavior for allow_multiple_obs=False
    env = UnityToGymWrapper(mock_env, uint8_visual=use_uint8, allow_multiple_obs=False)
    assert isinstance(env.observation_space, spaces.Box)
    reset_obs = env.reset()
    assert isinstance(reset_obs, np.ndarray)
    assert len(reset_obs.shape) == 3
    actions = env.action_space.sample()
    assert actions.shape == (2,)
    obs, rew, done, info = env.step(actions)
    assert isinstance(obs, np.ndarray)


# Helper methods


def create_mock_group_spec(
    number_visual_observations=0,
    vector_action_space_type="continuous",
    vector_observation_space_size=3,
    vector_action_space_size=None,
):
    """
    Creates a BehaviorSpec with the given observation and action parameters.
    """
    # vector_action_space_size defaults to None to avoid a mutable default argument
    if vector_action_space_type == "continuous":
        if vector_action_space_size is None:
            vector_action_space_size = 2
        else:
            vector_action_space_size = vector_action_space_size[0]
        action_spec = ActionSpec.create_continuous(vector_action_space_size)
    else:
        if vector_action_space_size is None:
            vector_action_space_size = (2,)
        else:
            vector_action_space_size = tuple(vector_action_space_size)
        action_spec = ActionSpec.create_discrete(vector_action_space_size)
    # The vector observation comes first, followed by any 8x8 RGB visual observations
    obs_shapes = [(vector_observation_space_size,)]
    for _ in range(number_visual_observations):
        obs_shapes += [(8, 8, 3)]
    obs_spec = create_observation_specs_with_shapes(obs_shapes)
    return BehaviorSpec(obs_spec, action_spec)


def create_mock_vector_steps(specs, num_agents=1, number_visual_observations=0):
    """
    Creates a (DecisionSteps, TerminalSteps) pair with vector observations. Imitates
    constant vector observations, rewards, and agent IDs; the TerminalSteps is empty.

    :BehaviorSpec specs: The BehaviorSpec for this mock
    :int num_agents: Number of "agents" to imitate in the DecisionSteps values.
    :int number_visual_observations: Number of 8x8x3 visual observations to append.
    """
    obs = [np.array([num_agents * [1, 2, 3]]).reshape(num_agents, 3)]
    if number_visual_observations:
        obs += [
            np.zeros(shape=(num_agents, 8, 8, 3), dtype=np.float32)
        ] * number_visual_observations
    rewards = np.array(num_agents * [1.0])
    agents = np.array(range(0, num_agents))
    group_id = np.array(num_agents * [0])
    group_rewards = np.array(num_agents * [0.0])
    return (
        DecisionSteps(obs, rewards, agents, None, group_id, group_rewards),
        TerminalSteps.empty(specs),
    )


def setup_mock_unityenvironment(mock_env, mock_spec, mock_decision, mock_termination):
    """
    Takes a mock UnityEnvironment and adds the appropriate properties, defined by the
    given BehaviorSpec, DecisionSteps, and TerminalSteps.

    :Mock mock_env: A mock UnityEnvironment, usually empty.
    :BehaviorSpec mock_spec: A BehaviorSpec object that specifies the params of this environment.
    :DecisionSteps mock_decision: A DecisionSteps object that will be returned at each step and reset.
    :TerminalSteps mock_termination: A TerminalSteps object that will be returned at each step and reset.
    """
    mock_env.behavior_specs = BehaviorMapping({"MockBrain": mock_spec})
    mock_env.get_steps.return_value = (mock_decision, mock_termination)