|
|
|
|
|
|
|
|
|
|
from mlagents.trainers.ppo.trainer import PPOTrainer, discount_rewards |
|
|
|
from mlagents.trainers.common.nn_policy import NNPolicy |
|
|
|
from mlagents.trainers.models import EncoderType, LearningModel |
|
|
|
from mlagents.trainers.exception import UnityTrainerException |
|
|
|
from mlagents.trainers.brain import BrainParameters, CameraResolution |
|
|
|
from mlagents.trainers.brain import BrainParameters |
|
|
|
from mlagents.trainers.agent_processor import AgentManagerQueue |
|
|
|
from mlagents.trainers.tests import mock_brain as mb |
|
|
|
from mlagents.trainers.tests.mock_brain import make_brain_parameters |
|
|
|
|
|
|
policy = mock.Mock() |
|
|
|
with pytest.raises(RuntimeError): |
|
|
|
trainer.add_policy(brain_params, policy) |
|
|
|
|
|
|
|
|
|
|
|
def test_normalization(dummy_config): |
|
|
|
brain_params = BrainParameters( |
|
|
|
brain_name="test_brain", |
|
|
|
vector_observation_space_size=1, |
|
|
|
camera_resolutions=[], |
|
|
|
vector_action_space_size=[2], |
|
|
|
vector_action_descriptions=[], |
|
|
|
vector_action_space_type=0, |
|
|
|
) |
|
|
|
dummy_config["summary_path"] = "./summaries/test_trainer_summary" |
|
|
|
dummy_config["model_path"] = "./models/test_trainer_models/TestModel" |
|
|
|
trainer = PPOTrainer( |
|
|
|
brain_params.brain_name, 0, dummy_config, True, False, 0, "0", False |
|
|
|
) |
|
|
|
time_horizon = 6 |
|
|
|
trajectory = make_fake_trajectory( |
|
|
|
length=time_horizon, |
|
|
|
max_step_complete=True, |
|
|
|
vec_obs_size=1, |
|
|
|
num_vis_obs=0, |
|
|
|
action_space=[2], |
|
|
|
) |
|
|
|
# Change half of the obs to 0 |
|
|
|
for i in range(3): |
|
|
|
trajectory.steps[i].obs[0] = np.zeros(1, dtype=np.float32) |
|
|
|
policy = trainer.create_policy(brain_params) |
|
|
|
trainer.add_policy(brain_params.brain_name, policy) |
|
|
|
|
|
|
|
trainer._process_trajectory(trajectory) |
|
|
|
|
|
|
|
# Check that the running mean and variance is correct |
|
|
|
steps, mean, variance = trainer.policy.sess.run( |
|
|
|
[ |
|
|
|
trainer.policy.normalization_steps, |
|
|
|
trainer.policy.running_mean, |
|
|
|
trainer.policy.running_variance, |
|
|
|
] |
|
|
|
) |
|
|
|
|
|
|
|
assert steps == 6 |
|
|
|
assert mean[0] == 0.5 |
|
|
|
# Note: variance is divided by number of steps, and initialized to 1 to avoid |
|
|
|
# divide by 0. The right answer is 0.25 |
|
|
|
assert (variance[0] - 1) / steps == 0.25 |
|
|
|
|
|
|
|
# Make another update, this time with all 1's |
|
|
|
time_horizon = 10 |
|
|
|
trajectory = make_fake_trajectory( |
|
|
|
length=time_horizon, |
|
|
|
max_step_complete=True, |
|
|
|
vec_obs_size=1, |
|
|
|
num_vis_obs=0, |
|
|
|
action_space=[2], |
|
|
|
) |
|
|
|
trainer._process_trajectory(trajectory) |
|
|
|
|
|
|
|
# Check that the running mean and variance is correct |
|
|
|
steps, mean, variance = trainer.policy.sess.run( |
|
|
|
[ |
|
|
|
trainer.policy.normalization_steps, |
|
|
|
trainer.policy.running_mean, |
|
|
|
trainer.policy.running_variance, |
|
|
|
] |
|
|
|
) |
|
|
|
|
|
|
|
assert steps == 16 |
|
|
|
assert mean[0] == 0.8125 |
|
|
|
assert (variance[0] - 1) / steps == pytest.approx(0.152, abs=0.01) |
|
|
|
|
|
|
|
|
|
|
|
def test_min_visual_size(): |
|
|
|
# Make sure each EncoderType has an entry in MIS_RESOLUTION_FOR_ENCODER |
|
|
|
assert set(LearningModel.MIN_RESOLUTION_FOR_ENCODER.keys()) == set(EncoderType) |
|
|
|
|
|
|
|
for encoder_type in EncoderType: |
|
|
|
with tf.Graph().as_default(): |
|
|
|
good_size = LearningModel.MIN_RESOLUTION_FOR_ENCODER[encoder_type] |
|
|
|
good_res = CameraResolution( |
|
|
|
width=good_size, height=good_size, num_channels=3 |
|
|
|
) |
|
|
|
LearningModel._check_resolution_for_encoder(good_res, encoder_type) |
|
|
|
vis_input = LearningModel.create_visual_input( |
|
|
|
good_res, "test_min_visual_size" |
|
|
|
) |
|
|
|
enc_func = LearningModel.get_encoder_for_type(encoder_type) |
|
|
|
enc_func(vis_input, 32, LearningModel.swish, 1, "test", False) |
|
|
|
|
|
|
|
# Anything under the min size should raise an exception. If not, decrease the min size! |
|
|
|
with pytest.raises(Exception): |
|
|
|
with tf.Graph().as_default(): |
|
|
|
bad_size = LearningModel.MIN_RESOLUTION_FOR_ENCODER[encoder_type] - 1 |
|
|
|
bad_res = CameraResolution( |
|
|
|
width=bad_size, height=bad_size, num_channels=3 |
|
|
|
) |
|
|
|
|
|
|
|
with pytest.raises(UnityTrainerException): |
|
|
|
# Make sure we'd hit a friendly error during model setup time. |
|
|
|
LearningModel._check_resolution_for_encoder(bad_res, encoder_type) |
|
|
|
|
|
|
|
vis_input = LearningModel.create_visual_input( |
|
|
|
bad_res, "test_min_visual_size" |
|
|
|
) |
|
|
|
enc_func = LearningModel.get_encoder_for_type(encoder_type) |
|
|
|
enc_func(vis_input, 32, LearningModel.swish, 1, "test", False) |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |