import unittest.mock as mock import pytest import yaml import numpy as np from mlagents.tf_utils import tf from mlagents.trainers.sac.models import SACModel from mlagents.trainers.sac.policy import SACPolicy from mlagents.trainers.sac.trainer import SACTrainer from mlagents.trainers.tests import mock_brain as mb from mlagents.trainers.tests.mock_brain import make_brain_parameters @pytest.fixture def dummy_config(): return yaml.safe_load( """ trainer: sac batch_size: 32 buffer_size: 10240 buffer_init_steps: 0 hidden_units: 32 init_entcoef: 0.1 learning_rate: 3.0e-4 max_steps: 1024 memory_size: 8 normalize: false num_update: 1 train_interval: 1 num_layers: 1 time_horizon: 64 sequence_length: 16 summary_freq: 1000 tau: 0.005 use_recurrent: false curiosity_enc_size: 128 demo_path: None vis_encode_type: simple reward_signals: extrinsic: strength: 1.0 gamma: 0.99 """ ) VECTOR_ACTION_SPACE = [2] VECTOR_OBS_SPACE = 8 DISCRETE_ACTION_SPACE = [3, 3, 3, 2] BUFFER_INIT_SAMPLES = 32 NUM_AGENTS = 12 def create_sac_policy_mock(mock_env, dummy_config, use_rnn, use_discrete, use_visual): env, mock_brain, _ = mb.setup_mock_env_and_brains( mock_env, use_discrete, use_visual, num_agents=NUM_AGENTS, vector_action_space=VECTOR_ACTION_SPACE, vector_obs_space=VECTOR_OBS_SPACE, discrete_action_space=DISCRETE_ACTION_SPACE, ) trainer_parameters = dummy_config model_path = env.external_brain_names[0] trainer_parameters["model_path"] = model_path trainer_parameters["keep_checkpoints"] = 3 trainer_parameters["use_recurrent"] = use_rnn policy = SACPolicy(0, mock_brain, trainer_parameters, False, False) return env, policy @mock.patch("mlagents.envs.environment.UnityEnvironment") def test_sac_cc_policy(mock_env, dummy_config): # Test evaluate tf.reset_default_graph() env, policy = create_sac_policy_mock( mock_env, dummy_config, use_rnn=False, use_discrete=False, use_visual=False ) brain_infos = env.reset() brain_info = brain_infos[env.external_brain_names[0]] run_out = policy.evaluate(brain_info) assert run_out["action"].shape == (NUM_AGENTS, VECTOR_ACTION_SPACE[0]) # Test update buffer = mb.simulate_rollout(env, policy, BUFFER_INIT_SAMPLES) # Mock out reward signal eval buffer.update_buffer["extrinsic_rewards"] = buffer.update_buffer["rewards"] policy.update( buffer.update_buffer, num_sequences=len(buffer.update_buffer["actions"]) ) env.close() @pytest.mark.parametrize("discrete", [True, False], ids=["discrete", "continuous"]) @mock.patch("mlagents.envs.environment.UnityEnvironment") def test_sac_update_reward_signals(mock_env, dummy_config, discrete): # Test evaluate tf.reset_default_graph() # Add a Curiosity module dummy_config["reward_signals"]["curiosity"] = {} dummy_config["reward_signals"]["curiosity"]["strength"] = 1.0 dummy_config["reward_signals"]["curiosity"]["gamma"] = 0.99 dummy_config["reward_signals"]["curiosity"]["encoding_size"] = 128 env, policy = create_sac_policy_mock( mock_env, dummy_config, use_rnn=False, use_discrete=discrete, use_visual=False ) # Test update, while removing PPO-specific buffer elements. buffer = mb.simulate_rollout( env, policy, BUFFER_INIT_SAMPLES, exclude_key_list=["advantages", "actions_pre", "random_normal_epsilon"], ) # Mock out reward signal eval buffer.update_buffer["extrinsic_rewards"] = buffer.update_buffer["rewards"] buffer.update_buffer["curiosity_rewards"] = buffer.update_buffer["rewards"] policy.update_reward_signals( {"curiosity": buffer.update_buffer}, num_sequences=len(buffer.update_buffer["actions"]), ) env.close() @mock.patch("mlagents.envs.environment.UnityEnvironment") def test_sac_dc_policy(mock_env, dummy_config): # Test evaluate tf.reset_default_graph() env, policy = create_sac_policy_mock( mock_env, dummy_config, use_rnn=False, use_discrete=True, use_visual=False ) brain_infos = env.reset() brain_info = brain_infos[env.external_brain_names[0]] run_out = policy.evaluate(brain_info) assert run_out["action"].shape == (NUM_AGENTS, len(DISCRETE_ACTION_SPACE)) # Test update buffer = mb.simulate_rollout(env, policy, BUFFER_INIT_SAMPLES) # Mock out reward signal eval buffer.update_buffer["extrinsic_rewards"] = buffer.update_buffer["rewards"] policy.update( buffer.update_buffer, num_sequences=len(buffer.update_buffer["actions"]) ) env.close() @mock.patch("mlagents.envs.environment.UnityEnvironment") def test_sac_visual_policy(mock_env, dummy_config): # Test evaluate tf.reset_default_graph() env, policy = create_sac_policy_mock( mock_env, dummy_config, use_rnn=False, use_discrete=True, use_visual=True ) brain_infos = env.reset() brain_info = brain_infos[env.external_brain_names[0]] run_out = policy.evaluate(brain_info) assert run_out["action"].shape == (NUM_AGENTS, len(DISCRETE_ACTION_SPACE)) # Test update buffer = mb.simulate_rollout(env, policy, BUFFER_INIT_SAMPLES) # Mock out reward signal eval buffer.update_buffer["extrinsic_rewards"] = buffer.update_buffer["rewards"] run_out = policy.update( buffer.update_buffer, num_sequences=len(buffer.update_buffer["actions"]) ) assert type(run_out) is dict @mock.patch("mlagents.envs.environment.UnityEnvironment") def test_sac_rnn_policy(mock_env, dummy_config): # Test evaluate tf.reset_default_graph() env, policy = create_sac_policy_mock( mock_env, dummy_config, use_rnn=True, use_discrete=True, use_visual=False ) brain_infos = env.reset() brain_info = brain_infos[env.external_brain_names[0]] run_out = policy.evaluate(brain_info) assert run_out["action"].shape == (NUM_AGENTS, len(DISCRETE_ACTION_SPACE)) # Test update buffer = mb.simulate_rollout(env, policy, BUFFER_INIT_SAMPLES) # Mock out reward signal eval buffer.update_buffer["extrinsic_rewards"] = buffer.update_buffer["rewards"] policy.update(buffer.update_buffer, num_sequences=2) env.close() def test_sac_model_cc_vector(): tf.reset_default_graph() with tf.Session() as sess: with tf.variable_scope("FakeGraphScope"): model = SACModel( make_brain_parameters(discrete_action=False, visual_inputs=0) ) init = tf.global_variables_initializer() sess.run(init) run_list = [model.output, model.value, model.entropy, model.learning_rate] feed_dict = { model.batch_size: 2, model.sequence_length: 1, model.vector_in: np.array([[1, 2, 3, 1, 2, 3], [3, 4, 5, 3, 4, 5]]), } sess.run(run_list, feed_dict=feed_dict) def test_sac_model_cc_visual(): tf.reset_default_graph() with tf.Session() as sess: with tf.variable_scope("FakeGraphScope"): model = SACModel( make_brain_parameters(discrete_action=False, visual_inputs=2) ) init = tf.global_variables_initializer() sess.run(init) run_list = [model.output, model.value, model.entropy, model.learning_rate] feed_dict = { model.batch_size: 2, model.sequence_length: 1, model.vector_in: np.array([[1, 2, 3, 1, 2, 3], [3, 4, 5, 3, 4, 5]]), model.visual_in[0]: np.ones([2, 40, 30, 3]), model.visual_in[1]: np.ones([2, 40, 30, 3]), } sess.run(run_list, feed_dict=feed_dict) def test_sac_model_dc_visual(): tf.reset_default_graph() with tf.Session() as sess: with tf.variable_scope("FakeGraphScope"): model = SACModel( make_brain_parameters(discrete_action=True, visual_inputs=2) ) init = tf.global_variables_initializer() sess.run(init) run_list = [model.output, model.value, model.entropy, model.learning_rate] feed_dict = { model.batch_size: 2, model.sequence_length: 1, model.vector_in: np.array([[1, 2, 3, 1, 2, 3], [3, 4, 5, 3, 4, 5]]), model.visual_in[0]: np.ones([2, 40, 30, 3]), model.visual_in[1]: np.ones([2, 40, 30, 3]), model.action_masks: np.ones([2, 2]), } sess.run(run_list, feed_dict=feed_dict) def test_sac_model_dc_vector(): tf.reset_default_graph() with tf.Session() as sess: with tf.variable_scope("FakeGraphScope"): model = SACModel( make_brain_parameters(discrete_action=True, visual_inputs=0) ) init = tf.global_variables_initializer() sess.run(init) run_list = [model.output, model.value, model.entropy, model.learning_rate] feed_dict = { model.batch_size: 2, model.sequence_length: 1, model.vector_in: np.array([[1, 2, 3, 1, 2, 3], [3, 4, 5, 3, 4, 5]]), model.action_masks: np.ones([2, 2]), } sess.run(run_list, feed_dict=feed_dict) def test_sac_model_dc_vector_rnn(): tf.reset_default_graph() with tf.Session() as sess: with tf.variable_scope("FakeGraphScope"): memory_size = 128 model = SACModel( make_brain_parameters(discrete_action=True, visual_inputs=0), use_recurrent=True, m_size=memory_size, ) init = tf.global_variables_initializer() sess.run(init) run_list = [ model.output, model.all_log_probs, model.value, model.entropy, model.learning_rate, model.memory_out, ] feed_dict = { model.batch_size: 1, model.sequence_length: 2, model.prev_action: [[0], [0]], model.memory_in: np.zeros((1, memory_size)), model.vector_in: np.array([[1, 2, 3, 1, 2, 3], [3, 4, 5, 3, 4, 5]]), model.action_masks: np.ones([1, 2]), } sess.run(run_list, feed_dict=feed_dict) def test_sac_model_cc_vector_rnn(): tf.reset_default_graph() with tf.Session() as sess: with tf.variable_scope("FakeGraphScope"): memory_size = 128 model = SACModel( make_brain_parameters(discrete_action=False, visual_inputs=0), use_recurrent=True, m_size=memory_size, ) init = tf.global_variables_initializer() sess.run(init) run_list = [ model.output, model.all_log_probs, model.value, model.entropy, model.learning_rate, model.memory_out, ] feed_dict = { model.batch_size: 1, model.sequence_length: 2, model.memory_in: np.zeros((1, memory_size)), model.vector_in: np.array([[1, 2, 3, 1, 2, 3], [3, 4, 5, 3, 4, 5]]), } sess.run(run_list, feed_dict=feed_dict) def test_sac_save_load_buffer(tmpdir): env, mock_brain, _ = mb.setup_mock_env_and_brains( mock.Mock(), False, False, num_agents=NUM_AGENTS, vector_action_space=VECTOR_ACTION_SPACE, vector_obs_space=VECTOR_OBS_SPACE, discrete_action_space=DISCRETE_ACTION_SPACE, ) trainer_params = dummy_config() trainer_params["summary_path"] = str(tmpdir) trainer_params["model_path"] = str(tmpdir) trainer_params["save_replay_buffer"] = True trainer = SACTrainer(mock_brain, 1, trainer_params, True, False, 0, 0) trainer.training_buffer = mb.simulate_rollout( env, trainer.policy, BUFFER_INIT_SAMPLES ) buffer_len = len(trainer.training_buffer.update_buffer["actions"]) trainer.save_model() # Wipe Trainer and try to load trainer2 = SACTrainer(mock_brain, 1, trainer_params, True, True, 0, 0) assert len(trainer2.training_buffer.update_buffer["actions"]) == buffer_len if __name__ == "__main__": pytest.main()