import math
import tempfile
import pytest
import numpy as np
import attr
from typing import Dict

from mlagents.trainers.tests.simple_test_envs import (
    SimpleEnvironment,
    MemoryEnvironment,
    RecordEnvironment,
)
from mlagents.trainers.trainer_controller import TrainerController
from mlagents.trainers.trainer import TrainerFactory
from mlagents.trainers.simple_env_manager import SimpleEnvManager
from mlagents.trainers.demo_loader import write_demo
from mlagents.trainers.stats import StatsReporter, StatsWriter, StatsSummary
from mlagents.trainers.settings import (
    NetworkSettings,
    SelfPlaySettings,
    BehavioralCloningSettings,
    GAILSettings,
    RewardSignalType,
    EncoderType,
    FrameworkType,
)
from mlagents.trainers.environment_parameter_manager import EnvironmentParameterManager
from mlagents_envs.side_channel.environment_parameters_channel import (
    EnvironmentParametersChannel,
)
from mlagents_envs.communicator_objects.demonstration_meta_pb2 import (
    DemonstrationMetaProto,
)
from mlagents_envs.communicator_objects.brain_parameters_pb2 import BrainParametersProto
from mlagents_envs.communicator_objects.space_type_pb2 import discrete, continuous

from mlagents.trainers.tests.dummy_config import ppo_dummy_config, sac_dummy_config

# Baseline trainer configurations pinned to the TensorFlow framework; each test
# derives its own variant from these with attr.evolve.
PPO_TF_CONFIG = attr.evolve(ppo_dummy_config(), framework=FrameworkType.TENSORFLOW)
SAC_TF_CONFIG = attr.evolve(sac_dummy_config(), framework=FrameworkType.TENSORFLOW)

# Behavior name used for the environments and as the key of the trainer config dicts.
BRAIN_NAME = "1D"


# The reward processor is passed as an argument to _check_environment_trains.
# It is applied to the list of all final rewards for each brain individually.
# This is so that we can process all final rewards in different ways for different algorithms.
# Custom reward processors should be built within the test function and passed to _check_environment_trains
# Default is average over the last 5 final rewards
def default_reward_processor(rewards, last_n_rewards=5):
    """
    Average the trailing final rewards of a single brain.

    :param rewards: Sequence of final rewards collected for one brain.
    :param last_n_rewards: Number of trailing entries of ``rewards`` to average.
    :return: Mean of ``rewards[-last_n_rewards:]`` computed as float32.
    """
    rewards_to_use = rewards[-last_n_rewards:]
    # For debugging tests
    print(f"Last {last_n_rewards} rewards:", rewards_to_use)
    return np.array(rewards_to_use, dtype=np.float32).mean()


class DebugWriter(StatsWriter):
    """
    Print to stdout so stats can be viewed in pytest
    """

    def __init__(self):
        # Most recent mean cumulative reward written for each category.
        self._last_reward_summary: Dict[str, float] = {}

    def get_last_rewards(self):
        """Return the mapping from category to its last written mean reward."""
        return self._last_reward_summary

    def write_stats(
        self, category: str, values: Dict[str, StatsSummary], step: int
    ) -> None:
        """
        Print and remember the mean cumulative reward, ignoring all other stats.

        :param category: Key under which the mean reward is stored.
        :param values: Stat name to summary mapping for this write.
        :param step: Step number, only used in the printed line.
        """
        reward_key = "Environment/Cumulative Reward"
        stats_summary = values.get(reward_key)
        if stats_summary is not None:
            print(step, reward_key, stats_summary.mean)
            self._last_reward_summary[category] = stats_summary.mean


def _check_environment_trains(
    env,
    trainer_config,
    reward_processor=default_reward_processor,
    env_parameter_manager=None,
    success_threshold=0.9,
    env_manager=None,
):
    """
    Run a full training session on ``env`` and assert on the final rewards.

    :param env: Environment to train on; must expose ``final_rewards``, a mapping
        whose values are the per-brain lists of final rewards.
    :param trainer_config: Mapping from behavior name to trainer settings.
    :param reward_processor: Callable reducing one brain's reward list to a scalar.
    :param env_parameter_manager: Parameter manager to use; a default
        EnvironmentParameterManager is created when None.
    :param success_threshold: Every processed reward must be strictly greater than
        this value. Pass None to skip the reward assertions entirely.
    :param env_manager: Environment manager to use; a SimpleEnvManager wrapping
        ``env`` is created when None.
    """
    if env_parameter_manager is None:
        env_parameter_manager = EnvironmentParameterManager()
    # Create controller and begin training.
    # (Named output_dir rather than dir to avoid shadowing the builtin.)
    with tempfile.TemporaryDirectory() as output_dir:
        run_id = "id"
        seed = 1337
        StatsReporter.writers.clear()  # Clear StatsReporters so we don't write to file
        debug_writer = DebugWriter()
        StatsReporter.add_writer(debug_writer)
        if env_manager is None:
            env_manager = SimpleEnvManager(env, EnvironmentParametersChannel())
        trainer_factory = TrainerFactory(
            trainer_config=trainer_config,
            output_path=output_dir,
            train_model=True,
            load_model=False,
            seed=seed,
            param_manager=env_parameter_manager,
            multi_gpu=False,
        )

        tc = TrainerController(
            trainer_factory=trainer_factory,
            output_path=output_dir,
            run_id=run_id,
            param_manager=env_parameter_manager,
            train=True,
            training_seed=seed,
        )

        # Begin training
        tc.start_learning(env_manager)
        if (
            success_threshold is not None
        ):  # For tests where we are just checking setup and not reward
            processed_rewards = [
                reward_processor(rewards) for rewards in env.final_rewards.values()
            ]
            assert all(not math.isnan(reward) for reward in processed_rewards)
            assert all(reward > success_threshold for reward in processed_rewards)


@pytest.mark.parametrize("action_sizes", [(0, 1), (1, 0)])
def test_simple_ppo(action_sizes):
    """PPO on the 1D SimpleEnvironment must beat the default reward threshold."""
    env = SimpleEnvironment([BRAIN_NAME], action_sizes=action_sizes)
    config = attr.evolve(PPO_TF_CONFIG, framework=FrameworkType.TENSORFLOW)
    _check_environment_trains(env, {BRAIN_NAME: config})


@pytest.mark.parametrize("action_sizes", [(0, 2), (2, 0)])
def test_2d_ppo(action_sizes):
    """PPO with two-dimensional actions and a larger batch/buffer must train."""
    env = SimpleEnvironment([BRAIN_NAME], action_sizes=action_sizes, step_size=0.8)
    new_hyperparams = attr.evolve(
        PPO_TF_CONFIG.hyperparameters, batch_size=64, buffer_size=640
    )
    config = attr.evolve(
        PPO_TF_CONFIG,
        hyperparameters=new_hyperparams,
        max_steps=10000,
        framework=FrameworkType.TENSORFLOW,
    )
    _check_environment_trains(env, {BRAIN_NAME: config})


@pytest.mark.parametrize("action_sizes", [(0, 1), (1, 0)])
@pytest.mark.parametrize("num_visual", [1, 2])
def test_visual_ppo(num_visual, action_sizes):
    """PPO must train from visual observations only (no vector observations)."""
    env = SimpleEnvironment(
        [BRAIN_NAME],
        action_sizes=action_sizes,
        num_visual=num_visual,
        num_vector=0,
        step_size=0.2,
    )
    new_hyperparams = attr.evolve(PPO_TF_CONFIG.hyperparameters, learning_rate=3.0e-4)
    config = attr.evolve(
        PPO_TF_CONFIG,
        hyperparameters=new_hyperparams,
        framework=FrameworkType.TENSORFLOW,
    )
    _check_environment_trains(env, {BRAIN_NAME: config})


@pytest.mark.parametrize("num_visual", [1, 2])
@pytest.mark.parametrize("vis_encode_type", ["resnet", "nature_cnn", "match3"])
def test_visual_advanced_ppo(vis_encode_type, num_visual):
    """PPO must train with each of the non-default visual encoder types."""
    env = SimpleEnvironment(
        [BRAIN_NAME],
        action_sizes=(0, 1),
        num_visual=num_visual,
        num_vector=0,
        step_size=0.5,
        vis_obs_size=(5, 5, 5) if vis_encode_type == "match3" else (36, 36, 3),
    )
    # Derive the network settings from the PPO config, not the SAC one, so the
    # test only depends on the configuration of the trainer it exercises.
    new_networksettings = attr.evolve(
        PPO_TF_CONFIG.network_settings, vis_encode_type=EncoderType(vis_encode_type)
    )
    new_hyperparams = attr.evolve(PPO_TF_CONFIG.hyperparameters, learning_rate=3.0e-4)
    config = attr.evolve(
        PPO_TF_CONFIG,
        hyperparameters=new_hyperparams,
        network_settings=new_networksettings,
        max_steps=300,
        summary_freq=100,
        framework=FrameworkType.TENSORFLOW,
    )
    # The number of steps is pretty small for these encoders
    _check_environment_trains(env, {BRAIN_NAME: config}, success_threshold=0.5)


@pytest.mark.parametrize("action_sizes", [(0, 1), (1, 0)])
def test_recurrent_ppo(action_sizes):
    """PPO with a memory of size 16 must train on the MemoryEnvironment."""
    env = MemoryEnvironment([BRAIN_NAME], action_sizes=action_sizes)
    new_network_settings = attr.evolve(
        PPO_TF_CONFIG.network_settings,
        memory=NetworkSettings.MemorySettings(memory_size=16),
    )
    new_hyperparams = attr.evolve(
        PPO_TF_CONFIG.hyperparameters,
        learning_rate=1.0e-3,
        batch_size=64,
        buffer_size=128,
    )
    config = attr.evolve(
        PPO_TF_CONFIG,
        hyperparameters=new_hyperparams,
        network_settings=new_network_settings,
        max_steps=5000,
        framework=FrameworkType.TENSORFLOW,
    )
    _check_environment_trains(env, {BRAIN_NAME: config}, success_threshold=0.9)


@pytest.mark.parametrize("action_sizes", [(0, 1), (1, 0)])
def test_simple_sac(action_sizes):
    """SAC on the 1D SimpleEnvironment must beat the default reward threshold."""
    env = SimpleEnvironment([BRAIN_NAME], action_sizes=action_sizes)
    config = attr.evolve(SAC_TF_CONFIG, framework=FrameworkType.TENSORFLOW)
    _check_environment_trains(env, {BRAIN_NAME: config})


@pytest.mark.parametrize("action_sizes", [(0, 2), (2, 0)])
def test_2d_sac(action_sizes):
    """SAC with two-dimensional actions must reach a reward above 0.8."""
    # NOTE(review): both action_sizes and action_size=2 are passed here, while
    # test_2d_ppo only passes action_sizes. action_size looks like a leftover;
    # confirm against SimpleEnvironment's signature before removing it.
    env = SimpleEnvironment(
        [BRAIN_NAME], action_sizes=action_sizes, action_size=2, step_size=0.8
    )
    new_hyperparams = attr.evolve(SAC_TF_CONFIG.hyperparameters, buffer_init_steps=2000)
    config = attr.evolve(
        SAC_TF_CONFIG,
        hyperparameters=new_hyperparams,
        max_steps=10000,
        framework=FrameworkType.TENSORFLOW,
    )
    _check_environment_trains(env, {BRAIN_NAME: config}, success_threshold=0.8)


@pytest.mark.parametrize("action_sizes", [(0, 1), (1, 0)])
@pytest.mark.parametrize("num_visual", [1, 2])
def test_visual_sac(num_visual, action_sizes):
    """SAC must train from visual observations only (no vector observations)."""
    env = SimpleEnvironment(
        [BRAIN_NAME],
        action_sizes=action_sizes,
        num_visual=num_visual,
        num_vector=0,
        step_size=0.2,
    )
    new_hyperparams = attr.evolve(
        SAC_TF_CONFIG.hyperparameters, batch_size=16, learning_rate=3e-4
    )
    config = attr.evolve(
        SAC_TF_CONFIG,
        hyperparameters=new_hyperparams,
        framework=FrameworkType.TENSORFLOW,
    )
    _check_environment_trains(env, {BRAIN_NAME: config})


@pytest.mark.parametrize("num_visual", [1, 2])
@pytest.mark.parametrize("vis_encode_type", ["resnet", "nature_cnn", "match3"])
def test_visual_advanced_sac(vis_encode_type, num_visual):
    """SAC must train with each of the non-default visual encoder types."""
    env = SimpleEnvironment(
        [BRAIN_NAME],
        action_sizes=(0, 1),
        num_visual=num_visual,
        num_vector=0,
        step_size=0.5,
        vis_obs_size=(5, 5, 5) if vis_encode_type == "match3" else (36, 36, 3),
    )
    new_networksettings = attr.evolve(
        SAC_TF_CONFIG.network_settings, vis_encode_type=EncoderType(vis_encode_type)
    )
    new_hyperparams = attr.evolve(
        SAC_TF_CONFIG.hyperparameters,
        batch_size=16,
        learning_rate=3e-4,
        buffer_init_steps=0,
    )
    config = attr.evolve(
        SAC_TF_CONFIG,
        hyperparameters=new_hyperparams,
        network_settings=new_networksettings,
        max_steps=100,
        framework=FrameworkType.TENSORFLOW,
    )
    # The number of steps is pretty small for these encoders
    _check_environment_trains(env, {BRAIN_NAME: config}, success_threshold=0.5)


@pytest.mark.parametrize("action_sizes", [(0, 1), (1, 0)])
def test_recurrent_sac(action_sizes):
    """SAC with a memory of size 16 must train on the MemoryEnvironment."""
    # action_sizes is always a non-empty (truthy) tuple, so testing the tuple
    # itself would always pick 0.2. Branch on the discrete component instead:
    # 0.2 for discrete actions, 0.5 for continuous ones.
    _, discrete_size = action_sizes
    step_size = 0.2 if discrete_size > 0 else 0.5
    env = MemoryEnvironment(
        [BRAIN_NAME], action_sizes=action_sizes, step_size=step_size
    )
    new_networksettings = attr.evolve(
        SAC_TF_CONFIG.network_settings,
        memory=NetworkSettings.MemorySettings(memory_size=16),
    )
    new_hyperparams = attr.evolve(
        SAC_TF_CONFIG.hyperparameters,
        batch_size=128,
        learning_rate=1e-3,
        buffer_init_steps=1000,
        steps_per_update=2,
    )
    config = attr.evolve(
        SAC_TF_CONFIG,
        hyperparameters=new_hyperparams,
        network_settings=new_networksettings,
        max_steps=4000,
        framework=FrameworkType.TENSORFLOW,
    )
    _check_environment_trains(env, {BRAIN_NAME: config})


@pytest.mark.parametrize("action_sizes", [(0, 1), (1, 0)])
def test_simple_ghost(action_sizes):
    """Self-play PPO with two teams of the same behavior must train both teams."""
    env = SimpleEnvironment(
        [BRAIN_NAME + "?team=0", BRAIN_NAME + "?team=1"], action_sizes=action_sizes
    )
    self_play_settings = SelfPlaySettings(
        play_against_latest_model_ratio=1.0, save_steps=2000, swap_steps=2000
    )
    config = attr.evolve(
        PPO_TF_CONFIG,
        self_play=self_play_settings,
        max_steps=2500,
        framework=FrameworkType.TENSORFLOW,
    )
    _check_environment_trains(env, {BRAIN_NAME: config})


@pytest.mark.parametrize("action_sizes", [(0, 1), (1, 0)])
def test_simple_ghost_fails(action_sizes):
    """With a swap that never happens, exactly one side is expected to succeed."""
    env = SimpleEnvironment(
        [BRAIN_NAME + "?team=0", BRAIN_NAME + "?team=1"], action_sizes=action_sizes
    )
    # This config should fail because the ghosted policy is never swapped with a competent policy.
    # Swap occurs after max step is reached.
    self_play_settings = SelfPlaySettings(
        play_against_latest_model_ratio=1.0, save_steps=2000, swap_steps=4000
    )
    config = attr.evolve(
        PPO_TF_CONFIG,
        self_play=self_play_settings,
        max_steps=2500,
        framework=FrameworkType.TENSORFLOW,
    )
    # Skip the built-in reward assertions; the mixed outcome is checked below.
    _check_environment_trains(env, {BRAIN_NAME: config}, success_threshold=None)
    processed_rewards = [
        default_reward_processor(rewards) for rewards in env.final_rewards.values()
    ]
    success_threshold = 0.9
    assert any(reward > success_threshold for reward in processed_rewards) and any(
        reward < success_threshold for reward in processed_rewards
    )


@pytest.mark.parametrize("action_sizes", [(0, 1), (1, 0)])
def test_simple_asymm_ghost(action_sizes):
    """Self-play PPO with two differently named behaviors must train both."""
    # Make opponent for asymmetric case
    brain_name_opp = BRAIN_NAME + "Opp"
    env = SimpleEnvironment(
        [BRAIN_NAME + "?team=0", brain_name_opp + "?team=1"], action_sizes=action_sizes
    )
    self_play_settings = SelfPlaySettings(
        play_against_latest_model_ratio=1.0,
        save_steps=10000,
        swap_steps=10000,
        team_change=400,
    )
    config = attr.evolve(
        PPO_TF_CONFIG,
        self_play=self_play_settings,
        max_steps=4000,
        framework=FrameworkType.TENSORFLOW,
    )
    _check_environment_trains(env, {BRAIN_NAME: config, brain_name_opp: config})


@pytest.mark.parametrize("action_sizes", [(0, 1), (1, 0)])
def test_simple_asymm_ghost_fails(action_sizes):
    """In the asymmetric case with late swaps, exactly one side should succeed."""
    # Make opponent for asymmetric case
    brain_name_opp = BRAIN_NAME + "Opp"
    env = SimpleEnvironment(
        [BRAIN_NAME + "?team=0", brain_name_opp + "?team=1"], action_sizes=action_sizes
    )
    # This config should fail because the team that is not learning when both have reached
    # max step should be executing the initial, untrained policy.
    self_play_settings = SelfPlaySettings(
        play_against_latest_model_ratio=0.0,
        save_steps=5000,
        swap_steps=5000,
        team_change=2000,
    )
    config = attr.evolve(
        PPO_TF_CONFIG,
        self_play=self_play_settings,
        max_steps=3000,
        framework=FrameworkType.TENSORFLOW,
    )
    # Skip the built-in reward assertions; the mixed outcome is checked below.
    _check_environment_trains(
        env, {BRAIN_NAME: config, brain_name_opp: config}, success_threshold=None
    )
    processed_rewards = [
        default_reward_processor(rewards) for rewards in env.final_rewards.values()
    ]
    success_threshold = 0.9
    assert any(reward > success_threshold for reward in processed_rewards) and any(
        reward < success_threshold for reward in processed_rewards
    )


@pytest.fixture(scope="session")
def simple_record(tmpdir_factory):
    """
    Session-scoped fixture returning a function that records a demonstration file.

    The returned ``record_demo(action_sizes, num_visual=0, num_vector=1)`` solves a
    RecordEnvironment, writes its demonstration protos with ``write_demo`` into a
    fresh temporary directory and returns the path of the written file.
    """

    def record_demo(action_sizes, num_visual=0, num_vector=1):
        env = RecordEnvironment(
            [BRAIN_NAME],
            action_sizes=action_sizes,
            num_visual=num_visual,
            num_vector=num_vector,
            n_demos=100,
        )
        # If we want to use true demos, we can solve the env in the usual way
        # Otherwise, we can just call solve to execute the optimal policy
        env.solve()
        # action_sizes is (continuous_size, discrete_size); only the discrete
        # component decides which action space the demo is declared with.
        _, discrete_size = action_sizes
        use_discrete = discrete_size > 0
        agent_info_protos = env.demonstration_protos[BRAIN_NAME]
        meta_data_proto = DemonstrationMetaProto()
        brain_param_proto = BrainParametersProto(
            vector_action_size=[2] if use_discrete else [1],
            vector_action_descriptions=[""],
            vector_action_space_type=discrete if use_discrete else continuous,
            brain_name=BRAIN_NAME,
            is_training=True,
        )
        action_type = "Discrete" if use_discrete else "Continuous"
        demo_path_name = "1DTest" + action_type + ".demo"
        demo_path = str(tmpdir_factory.mktemp("tmp_demo").join(demo_path_name))
        write_demo(demo_path, meta_data_proto, brain_param_proto, agent_info_protos)
        return demo_path

    return record_demo


@pytest.mark.parametrize("action_sizes", [(0, 1), (1, 0)])
@pytest.mark.parametrize("trainer_config", [PPO_TF_CONFIG, SAC_TF_CONFIG])
def test_gail(simple_record, action_sizes, trainer_config):
    """PPO and SAC must train with a GAIL reward signal plus behavioral cloning."""
    demo_path = simple_record(action_sizes)
    env = SimpleEnvironment([BRAIN_NAME], action_sizes=action_sizes, step_size=0.2)
    bc_settings = BehavioralCloningSettings(demo_path=demo_path, steps=1000)
    reward_signals = {
        RewardSignalType.GAIL: GAILSettings(encoding_size=32, demo_path=demo_path)
    }
    config = attr.evolve(
        trainer_config,
        reward_signals=reward_signals,
        behavioral_cloning=bc_settings,
        max_steps=500,
        framework=FrameworkType.TENSORFLOW,
    )
    _check_environment_trains(env, {BRAIN_NAME: config}, success_threshold=0.9)


@pytest.mark.parametrize("action_sizes", [(0, 1), (1, 0)])
def test_gail_visual_ppo(simple_record, action_sizes):
    """PPO must train with GAIL and behavioral cloning from visual observations."""
    demo_path = simple_record(action_sizes, num_visual=1, num_vector=0)
    env = SimpleEnvironment(
        [BRAIN_NAME],
        num_visual=1,
        num_vector=0,
        action_sizes=action_sizes,
        step_size=0.2,
    )
    bc_settings = BehavioralCloningSettings(demo_path=demo_path, steps=1500)
    reward_signals = {
        RewardSignalType.GAIL: GAILSettings(encoding_size=32, demo_path=demo_path)
    }
    hyperparams = attr.evolve(PPO_TF_CONFIG.hyperparameters, learning_rate=3e-4)
    config = attr.evolve(
        PPO_TF_CONFIG,
        reward_signals=reward_signals,
        hyperparameters=hyperparams,
        behavioral_cloning=bc_settings,
        max_steps=1000,
        framework=FrameworkType.TENSORFLOW,
    )
    _check_environment_trains(env, {BRAIN_NAME: config}, success_threshold=0.9)


@pytest.mark.parametrize("action_sizes", [(0, 1), (1, 0)])
def test_gail_visual_sac(simple_record, action_sizes):
    """SAC must train with GAIL and behavioral cloning from visual observations."""
    demo_path = simple_record(action_sizes, num_visual=1, num_vector=0)
    env = SimpleEnvironment(
        [BRAIN_NAME],
        num_visual=1,
        num_vector=0,
        action_sizes=action_sizes,
        step_size=0.2,
    )
    bc_settings = BehavioralCloningSettings(demo_path=demo_path, steps=1000)
    reward_signals = {
        RewardSignalType.GAIL: GAILSettings(encoding_size=32, demo_path=demo_path)
    }
    hyperparams = attr.evolve(
        SAC_TF_CONFIG.hyperparameters, learning_rate=3e-4, batch_size=16
    )
    config = attr.evolve(
        SAC_TF_CONFIG,
        reward_signals=reward_signals,
        hyperparameters=hyperparams,
        behavioral_cloning=bc_settings,
        max_steps=500,
        framework=FrameworkType.TENSORFLOW,
    )
    _check_environment_trains(env, {BRAIN_NAME: config}, success_threshold=0.9)