Unity 机器学习代理工具包 (ML-Agents) 是一个开源项目,它使游戏和模拟能够作为训练智能代理的环境。
您最多选择25个主题 主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 
 

232 行
7.6 KiB

import pytest
import os
from typing import Dict, Any
import numpy as np
from mlagents.tf_utils import tf
import yaml
from mlagents.trainers.policy.nn_policy import NNPolicy
from mlagents.trainers.models import EncoderType, ModelUtils
from mlagents.trainers.exception import UnityTrainerException
from mlagents.trainers.brain import BrainParameters, CameraResolution
from mlagents.trainers.tests import mock_brain as mb
from mlagents.trainers.tests.test_trajectory import make_fake_trajectory
@pytest.fixture
def dummy_config():
    """Provide a PPO trainer configuration dict parsed from inline YAML.

    The document is parsed with ``yaml.safe_load`` on every fixture request.
    ``reward_signals`` holds a single ``extrinsic`` entry carrying ``strength``
    and ``gamma``.
    """
    # Every key is listed exactly once. `normalize` used to appear twice; YAML
    # loaders silently keep only the last occurrence, which hides typos.
    return yaml.safe_load(
        """
        trainer: ppo
        batch_size: 32
        beta: 5.0e-3
        buffer_size: 512
        epsilon: 0.2
        hidden_units: 128
        lambd: 0.95
        learning_rate: 3.0e-4
        max_steps: 5.0e4
        normalize: true
        num_epoch: 5
        num_layers: 2
        time_horizon: 64
        sequence_length: 64
        summary_freq: 1000
        use_recurrent: false
        memory_size: 8
        curiosity_strength: 0.0
        curiosity_enc_size: 1
        output_path: test
        reward_signals:
          extrinsic:
            strength: 1.0
            gamma: 0.99
        """
    )
# Passed to mb.setup_mock_brain as vector_action_space; its first element is
# the action width expected in the continuous case of test_policy_evaluate.
VECTOR_ACTION_SPACE = [2]
# Passed to mb.setup_mock_brain as vector_obs_space.
VECTOR_OBS_SPACE = 8
# Passed to mb.setup_mock_brain as discrete_action_space; its length is
# compared against the action width in the discrete case of
# test_policy_evaluate.
DISCRETE_ACTION_SPACE = [3, 3, 3, 2]
# NOTE(review): not referenced anywhere in the visible part of this file.
BUFFER_INIT_SAMPLES = 32
# Number of agents requested when building steps in test_policy_evaluate.
NUM_AGENTS = 12
def create_policy_mock(
    dummy_config: Dict[str, Any],
    use_rnn: bool = False,
    use_discrete: bool = True,
    use_visual: bool = False,
    load: bool = False,
    seed: int = 0,
) -> NNPolicy:
    """Build an NNPolicy backed by a mock brain.

    :param dummy_config: Base trainer settings. It is copied, not modified.
    :param use_rnn: Value stored under the ``use_recurrent`` setting.
    :param use_discrete: Forwarded to ``mb.setup_mock_brain``.
    :param use_visual: Forwarded to ``mb.setup_mock_brain``.
    :param load: Forwarded as the last positional argument of ``NNPolicy``.
    :param seed: Forwarded as the first positional argument of ``NNPolicy``.
    :return: The constructed policy.
    """
    mock_brain = mb.setup_mock_brain(
        use_discrete,
        use_visual,
        vector_action_space=VECTOR_ACTION_SPACE,
        vector_obs_space=VECTOR_OBS_SPACE,
        discrete_action_space=DISCRETE_ACTION_SPACE,
    )
    # Shallow-copy so the overrides below do not leak into the caller's dict
    # (nested values such as reward_signals are still shared, but never written).
    trainer_parameters = dict(dummy_config)
    trainer_parameters["keep_checkpoints"] = 3
    trainer_parameters["use_recurrent"] = use_rnn
    return NNPolicy(seed, mock_brain, trainer_parameters, False, load)
def test_load_save(dummy_config, tmp_path):
    """Save a policy, then check that loading and initializing from it match.

    Three policies are built with different seeds: the first is saved under
    ``runid1``, the second loads from that same output path, and the third
    writes to ``runid2`` but initializes from ``runid1``. Each pair is compared
    with ``_compare_two_policies``.
    """
    path1 = os.path.join(tmp_path, "runid1")
    path2 = os.path.join(tmp_path, "runid2")
    trainer_params = dummy_config
    trainer_params["output_path"] = path1
    policy = create_policy_mock(trainer_params)
    policy.initialize_or_load()
    policy.save_model(2000)

    # Saving must have produced something under the temporary directory.
    assert len(os.listdir(tmp_path)) > 0

    # Try load from this path
    policy2 = create_policy_mock(trainer_params, load=True, seed=1)
    policy2.initialize_or_load()
    _compare_two_policies(policy, policy2)

    # Try initialize from path 1
    # "output_path" is the key used throughout this file; the former
    # "model_path" key left the third policy's output path pointing at path1.
    trainer_params["output_path"] = path2
    trainer_params["init_path"] = path1
    policy3 = create_policy_mock(trainer_params, load=False, seed=2)
    policy3.initialize_or_load()

    _compare_two_policies(policy2, policy3)
def _compare_two_policies(policy1: NNPolicy, policy2: NNPolicy) -> None:
    """
    Check that both policies yield identical "log_probs" for one shared input.

    A single-agent decision step is generated from the first policy's brain
    and evaluated by each policy in turn.
    """
    steps, _ = mb.create_steps_from_brainparams(policy1.brain, num_agents=1)
    first_out = policy1.evaluate(steps, list(steps.agent_id))
    second_out = policy2.evaluate(steps, list(steps.agent_id))
    expected = first_out["log_probs"]
    actual = second_out["log_probs"]
    np.testing.assert_array_equal(actual, expected)
@pytest.mark.parametrize("discrete", [True, False], ids=["discrete", "continuous"])
@pytest.mark.parametrize("visual", [True, False], ids=["visual", "vector"])
@pytest.mark.parametrize("rnn", [True, False], ids=["rnn", "no_rnn"])
def test_policy_evaluate(dummy_config, rnn, visual, discrete):
    """Check the shape of the action returned by ``policy.evaluate``.

    Runs for every combination of recurrent/non-recurrent, visual/vector and
    discrete/continuous settings.
    """
    # Test evaluate
    tf.reset_default_graph()
    policy = create_policy_mock(
        dummy_config, use_rnn=rnn, use_discrete=discrete, use_visual=visual
    )
    decision_step, _ = mb.create_steps_from_brainparams(
        policy.brain, num_agents=NUM_AGENTS
    )

    run_out = policy.evaluate(decision_step, list(decision_step.agent_id))
    if discrete:
        # This comparison used to be a bare expression with no `assert`, so the
        # discrete case verified nothing.
        assert run_out["action"].shape == (NUM_AGENTS, len(DISCRETE_ACTION_SPACE))
    else:
        assert run_out["action"].shape == (NUM_AGENTS, VECTOR_ACTION_SPACE[0])
def test_normalization(dummy_config):
    """Check the running step count, mean and variance of the normalizer.

    The policy is updated with two fake trajectories (6 steps, then 10 steps)
    and the normalization variables are read back from the session after each.
    """
    brain_params = BrainParameters(
        brain_name="test_brain",
        vector_observation_space_size=1,
        camera_resolutions=[],
        vector_action_space_size=[2],
        vector_action_descriptions=[],
        vector_action_space_type=0,
    )
    dummy_config["output_path"] = "./results/test_trainer_models/TestModel"

    time_horizon = 6
    trajectory = make_fake_trajectory(
        length=time_horizon,
        max_step_complete=True,
        vec_obs_size=1,
        num_vis_obs=0,
        action_space=[2],
    )
    # Change half of the obs to 0
    for i in range(time_horizon // 2):
        trajectory.steps[i].obs[0] = np.zeros(1, dtype=np.float32)
    policy = NNPolicy(0, brain_params, dummy_config, False, False)

    trajectory_buffer = trajectory.to_agentbuffer()
    policy.update_normalization(trajectory_buffer["vector_obs"])

    # Check that the running mean and variance is correct
    steps, mean, variance = policy.sess.run(
        [policy.normalization_steps, policy.running_mean, policy.running_variance]
    )

    assert steps == 6
    # Floating-point results are compared with a tolerance, not bit-for-bit.
    assert mean[0] == pytest.approx(0.5)
    # Note: variance is divided by number of steps, and initialized to 1 to avoid
    # divide by 0. The right answer is 0.25
    assert (variance[0] - 1) / steps == pytest.approx(0.25)

    # Make another update, this time with all 1's
    time_horizon = 10
    trajectory = make_fake_trajectory(
        length=time_horizon,
        max_step_complete=True,
        vec_obs_size=1,
        num_vis_obs=0,
        action_space=[2],
    )
    trajectory_buffer = trajectory.to_agentbuffer()
    policy.update_normalization(trajectory_buffer["vector_obs"])

    # Check that the running mean and variance is correct
    steps, mean, variance = policy.sess.run(
        [policy.normalization_steps, policy.running_mean, policy.running_variance]
    )

    assert steps == 16
    assert mean[0] == pytest.approx(0.8125)
    assert (variance[0] - 1) / steps == pytest.approx(0.152, abs=0.01)
def test_min_visual_size():
    """Check the minimum visual resolution registered for every encoder type.

    For each encoder, building at exactly the minimum size must succeed, and
    building one pixel smaller must fail, with the resolution check itself
    raising ``UnityTrainerException``.
    """
    # Make sure each EncoderType has an entry in MIN_RESOLUTION_FOR_ENCODER
    assert set(ModelUtils.MIN_RESOLUTION_FOR_ENCODER.keys()) == set(EncoderType)

    for encoder_type in EncoderType:
        # Smallest supported resolution: everything should build cleanly.
        with tf.Graph().as_default():
            min_size = ModelUtils.MIN_RESOLUTION_FOR_ENCODER[encoder_type]
            ok_resolution = CameraResolution(
                width=min_size, height=min_size, num_channels=3
            )
            ok_input = ModelUtils.create_visual_input(
                ok_resolution, "test_min_visual_size"
            )
            ModelUtils._check_resolution_for_encoder(ok_input, encoder_type)
            build_encoder = ModelUtils.get_encoder_for_type(encoder_type)
            build_encoder(ok_input, 32, ModelUtils.swish, 1, "test", False)

        # Anything under the min size should raise an exception. If not, decrease the min size!
        with pytest.raises(Exception):
            with tf.Graph().as_default():
                small_size = ModelUtils.MIN_RESOLUTION_FOR_ENCODER[encoder_type] - 1
                small_resolution = CameraResolution(
                    width=small_size, height=small_size, num_channels=3
                )
                small_input = ModelUtils.create_visual_input(
                    small_resolution, "test_min_visual_size"
                )

                with pytest.raises(UnityTrainerException):
                    # Make sure we'd hit a friendly error during model setup time.
                    ModelUtils._check_resolution_for_encoder(
                        small_input, encoder_type
                    )

                # Building the encoder itself is what the outer raises() covers.
                build_encoder = ModelUtils.get_encoder_for_type(encoder_type)
                build_encoder(small_input, 32, ModelUtils.swish, 1, "test", False)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    pytest.main()