
Temporarily remove multi-GPU

/develop/nopreviousactions
Ervin Teng, 4 years ago
Commit 00017bab
9 files changed, with 15 insertions and 246 deletions
1. docs/Training-ML-Agents.md (1 change)
2. ml-agents/mlagents/trainers/common/tf_optimizer.py (1 change)
3. ml-agents/mlagents/trainers/learn.py (6 changes)
4. ml-agents/mlagents/trainers/ppo/trainer.py (30 changes)
5. ml-agents/mlagents/trainers/tests/test_ghost.py (8 changes)
6. ml-agents/mlagents/trainers/tests/test_learn.py (1 change)
7. ml-agents/mlagents/trainers/tests/test_ppo.py (10 changes)
8. ml-agents/mlagents/trainers/trainer_util.py (2 changes)
9. ml-agents/mlagents/trainers/ppo/multi_gpu_policy.py (202 changes)

docs/Training-ML-Agents.md (1 change)


  [here](https://docs.unity3d.com/Manual/CommandLineArguments.html) for more
  details.
  * `--debug`: Specify this option to enable debug-level logging for some parts of the code.
- * `--multi-gpu`: Setting this flag enables the use of multiple GPU's (if available) during training.
  * `--cpu`: Forces training using CPU only.
  * Engine Configuration:
  * `--width`: The width of the executable window of the environment(s) in pixels
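A hedged aside on the `--cpu` flag documented above: one common way such a flag is honored is to hide CUDA devices before TensorFlow initializes. The helper below is an illustration with an assumed name, not code from this commit.

```python
# Illustration only (assumed helper, not ml-agents code): forcing CPU-only
# TensorFlow by hiding all CUDA devices before the first TF call.
import os

def force_cpu_only() -> None:
    # With CUDA_VISIBLE_DEVICES set to -1, TensorFlow enumerates no GPUs.
    os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
```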

ml-agents/mlagents/trainers/common/tf_optimizer.py (1 change)


class TFOptimizer(Optimizer):  # pylint: disable=W0223
    def __init__(self, policy: TFPolicy, trainer_params: Dict[str, Any]):
        super().__init__(policy)
        self.sess = policy.sess
        self.policy = policy
        self.update_dict: Dict[str, tf.Tensor] = {}

ml-agents/mlagents/trainers/learn.py (6 changes)


help="Whether to run ML-Agents in debug mode with detailed logging",
)
argparser.add_argument(
"--multi-gpu",
default=False,
action="store_true",
help="Setting this flag enables the use of multiple GPU's (if available) during training",
)
argparser.add_argument(
"--env-args",
default=None,
nargs=argparse.REMAINDER,

ml-agents/mlagents/trainers/ppo/trainer.py (30 changes)


  import numpy as np
  from mlagents.trainers.common.nn_policy import NNPolicy
- from mlagents.trainers.ppo.multi_gpu_policy import MultiGpuNNPolicy, get_devices
  from mlagents.trainers.rl_trainer import RLTrainer
  from mlagents.trainers.brain import BrainParameters
  from mlagents.trainers.tf_policy import TFPolicy

          load: bool,
          seed: int,
          run_id: str,
-         multi_gpu: bool,
      ):
          """
          Responsible for collecting experiences and training PPO model.
          :param load: Whether the model should be loaded.
          :param seed: The seed the model will be initialized with
          :param run_id: The identifier of the current run
-         :param multi_gpu: Boolean for multi-gpu policy model
          """
          super(PPOTrainer, self).__init__(
              brain_name, trainer_parameters, training, run_id, reward_buff_cap

          ]
          self._check_param_keys()
          self.load = load
-         self.multi_gpu = multi_gpu
          self.seed = seed
          self.policy: NNPolicy = None  # type: ignore

          :param brain_parameters: specifications for policy construction
          :return policy
          """
-         if self.multi_gpu and len(get_devices()) > 1:
-             policy: NNPolicy = MultiGpuNNPolicy(
-                 self.seed,
-                 brain_parameters,
-                 self.trainer_parameters,
-                 self.is_training,
-                 self.load,
-             )
-         else:
-             policy = NNPolicy(
-                 self.seed,
-                 brain_parameters,
-                 self.trainer_parameters,
-                 self.is_training,
-                 self.load,
-                 create_tf_graph=False,  # We will create the TF graph in the Optimizer
-             )
+         policy = NNPolicy(
+             self.seed,
+             brain_parameters,
+             self.trainer_parameters,
+             self.is_training,
+             self.load,
+             create_tf_graph=False,  # We will create the TF graph in the Optimizer
+         )
          return policy

ml-agents/mlagents/trainers/tests/test_ghost.py (8 changes)


      )
      trainer_params = dummy_config
-     trainer = PPOTrainer(
-         mock_brain.brain_name, 0, trainer_params, True, False, 0, "0", False
-     )
+     trainer = PPOTrainer(mock_brain.brain_name, 0, trainer_params, True, False, 0, "0")
      trainer.seed = 1
      policy = trainer.create_policy(mock_brain)
      policy.create_tf_graph()

      )
      dummy_config["summary_path"] = "./summaries/test_trainer_summary"
      dummy_config["model_path"] = "./models/test_trainer_models/TestModel"
-     ppo_trainer = PPOTrainer(brain_name, 0, dummy_config, True, False, 0, "0", False)
+     ppo_trainer = PPOTrainer(brain_name, 0, dummy_config, True, False, 0, "0")
      trainer = GhostTrainer(ppo_trainer, brain_name, 0, dummy_config, True, "0")
      # first policy encountered becomes policy trained by wrapped PPO

      )
      dummy_config["summary_path"] = "./summaries/test_trainer_summary"
      dummy_config["model_path"] = "./models/test_trainer_models/TestModel"
-     ppo_trainer = PPOTrainer(brain_name, 0, dummy_config, True, False, 0, "0", False)
+     ppo_trainer = PPOTrainer(brain_name, 0, dummy_config, True, False, 0, "0")
      trainer = GhostTrainer(ppo_trainer, brain_name, 0, dummy_config, True, "0")
      # First policy encountered becomes policy trained by wrapped PPO

ml-agents/mlagents/trainers/tests/test_learn.py (1 change)


"--docker-target-name=mydockertarget",
"--no-graphics",
"--debug",
"--multi-gpu",
]
opt = parse_command_line(full_args)

ml-agents/mlagents/trainers/tests/test_ppo.py (10 changes)


      )
      trainer = PPOTrainer(
-         brain_params.brain_name, 0, trainer_params, True, False, 0, "0", False
+         brain_params.brain_name, 0, trainer_params, True, False, 0, "0"
      )
      policy_mock = mock.Mock(spec=NNPolicy)
      policy_mock.get_current_step.return_value = 0

      trainer_params["reward_signals"]["curiosity"]["gamma"] = 0.99
      trainer_params["reward_signals"]["curiosity"]["encoding_size"] = 128
-     trainer = PPOTrainer(
-         mock_brain.brain_name, 0, trainer_params, True, False, 0, "0", False
-     )
+     trainer = PPOTrainer(mock_brain.brain_name, 0, trainer_params, True, False, 0, "0")
      policy = trainer.create_policy(mock_brain)
      trainer.add_policy(mock_brain.brain_name, policy)
      # Test update with sequence length smaller than batch size

      )
      dummy_config["summary_path"] = "./summaries/test_trainer_summary"
      dummy_config["model_path"] = "./models/test_trainer_models/TestModel"
-     trainer = PPOTrainer(brain_params, 0, dummy_config, True, False, 0, "0", False)
+     trainer = PPOTrainer(brain_params, 0, dummy_config, True, False, 0, "0")
      policy = trainer.create_policy(brain_params)
      trainer.add_policy(brain_params.brain_name, policy)
      trajectory_queue = AgentManagerQueue("testbrain")

      dummy_config["summary_path"] = "./summaries/test_trainer_summary"
      dummy_config["model_path"] = "./models/test_trainer_models/TestModel"
-     trainer = PPOTrainer(brain_params, 0, dummy_config, True, False, 0, "0", False)
+     trainer = PPOTrainer(brain_params, 0, dummy_config, True, False, 0, "0")
      policy = mock.Mock(spec=NNPolicy)
      policy.get_current_step.return_value = 2000

ml-agents/mlagents/trainers/trainer_util.py (2 changes)


      :param load_model: Whether to load the model or randomly initialize
      :param seed: The random seed to use
      :param meta_curriculum: Optional meta_curriculum, used to determine a reward buffer length for PPOTrainer
-     :param multi_gpu: Whether to use multi-GPU training
      :return:
      """
      if "default" not in trainer_config and brain_name not in trainer_config:

              load_model,
              seed,
              run_id,
-             multi_gpu,
          )
      elif trainer_type == "sac":
          trainer = SACTrainer(

ml-agents/mlagents/trainers/ppo/multi_gpu_policy.py (202 changes)


import logging
from typing import Any, Dict, List, Optional

from mlagents.tf_utils import tf
from tensorflow.python.client import device_lib

from mlagents.trainers.brain import BrainParameters
from mlagents_envs.timers import timed
from mlagents.trainers.common.nn_policy import NNPolicy
from mlagents.trainers.components.reward_signals import RewardSignal
from mlagents.trainers.components.reward_signals.reward_signal_factory import (
    create_reward_signal,
)

# Variable scope in which created variables will be placed under
TOWER_SCOPE_NAME = "tower"

logger = logging.getLogger("mlagents.trainers")


class MultiGpuNNPolicy(NNPolicy):
    def __init__(
        self,
        seed: int,
        brain: BrainParameters,
        trainer_params: Dict[str, Any],
        is_training: bool,
        load: bool,
    ):
        self.towers: List[NNPolicy] = []
        self.devices: List[str] = []
        self.model: Optional[NNPolicy] = None
        self.total_policy_loss: Optional[tf.Tensor] = None
        self.reward_signal_towers: List[Dict[str, RewardSignal]] = []
        self.reward_signals: Dict[str, RewardSignal] = {}
        super().__init__(seed, brain, trainer_params, is_training, load)

    def create_model(
        self, brain, trainer_params, reward_signal_configs, is_training, load, seed
    ):
        """
        Create PPO models, one on each device
        :param brain: Assigned Brain object.
        :param trainer_params: Defined training parameters.
        :param reward_signal_configs: Reward signal config
        :param seed: Random seed.
        """
        self.devices = get_devices()

        with self.graph.as_default():
            with tf.variable_scope("", reuse=tf.AUTO_REUSE):
                for device in self.devices:
                    with tf.device(device):
                        self.towers.append(
                            NNPolicy(
                                seed=seed,
                                brain=brain,
                                trainer_params=trainer_params,
                                is_training=is_training,
                                load=load,
                            )
                        )
                        self.towers[-1].create_ppo_optimizer()
            self.model = self.towers[0]
            avg_grads = self.average_gradients([t.grads for t in self.towers])
            update_batch = self.model.optimizer.apply_gradients(avg_grads)
            avg_value_loss = tf.reduce_mean(
                tf.stack([model.value_loss for model in self.towers]), 0
            )
            avg_policy_loss = tf.reduce_mean(
                tf.stack([model.policy_loss for model in self.towers]), 0
            )

        self.inference_dict.update(
            {
                "action": self.model.output,
                "log_probs": self.model.all_log_probs,
                "value_heads": self.model.value_heads,
                "value": self.model.value,
                "entropy": self.model.entropy,
                "learning_rate": self.model.learning_rate,
            }
        )
        if self.use_continuous_act:
            self.inference_dict["pre_action"] = self.model.output_pre
        if self.use_recurrent:
            self.inference_dict["memory_out"] = self.model.memory_out
        if (
            is_training
            and self.use_vec_obs
            and trainer_params["normalize"]
            and not load
        ):
            self.inference_dict["update_mean"] = self.model.update_normalization
        self.total_policy_loss = self.model.abs_policy_loss
        self.update_dict.update(
            {
                "value_loss": avg_value_loss,
                "policy_loss": avg_policy_loss,
                "update_batch": update_batch,
            }
        )
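For readers skimming the deleted create_model() above, here is a self-contained sketch of the same data-parallel idea: build one tower per device under a shared variable scope, average the per-tower gradients, and apply them once. Everything in it (the toy linear model, `build_tower`, the CPU stand-in device list) is an assumption for illustration; it is written against `tf.compat.v1` so it runs on current TensorFlow and is not ml-agents code.

```python
# Standalone sketch of the "towers + averaged gradients" pattern (assumed names,
# not ml-agents code). Real code would feed each tower its own batch slice and
# use get_devices() instead of the CPU stand-ins below.
import tensorflow as tf

tf1 = tf.compat.v1
tf1.disable_eager_execution()

def build_tower(x, y):
    # Tiny linear model; AUTO_REUSE shares the variable "w" across towers.
    with tf1.variable_scope("model", reuse=tf1.AUTO_REUSE):
        w = tf1.get_variable("w", shape=[1], initializer=tf1.zeros_initializer())
        loss = tf1.reduce_mean(tf.square(y - w * x))
    optimizer = tf1.train.GradientDescentOptimizer(0.1)
    return loss, optimizer.compute_gradients(loss), optimizer

def average_gradients(tower_grads):
    # Same logic as MultiGpuNNPolicy.average_gradients: mean grad per variable.
    averaged = []
    for grad_and_vars in zip(*tower_grads):
        grads = [g for g, _ in grad_and_vars if g is not None]
        if not grads:
            continue
        averaged.append((tf1.reduce_mean(tf1.stack(grads), 0), grad_and_vars[0][1]))
    return averaged

devices = ["/cpu:0", "/cpu:0"]  # stand-ins for the GPU list
x = tf1.placeholder(tf.float32, [None], name="x")
y = tf1.placeholder(tf.float32, [None], name="y")

tower_losses, tower_grads = [], []
for device in devices:
    with tf1.device(device):
        loss, grads_and_vars, optimizer = build_tower(x, y)
        tower_losses.append(loss)
        tower_grads.append(grads_and_vars)

update_batch = optimizer.apply_gradients(average_gradients(tower_grads))
mean_loss = tf1.reduce_mean(tf1.stack(tower_losses))

with tf1.Session() as sess:
    sess.run(tf1.global_variables_initializer())
    print(sess.run([mean_loss, update_batch], feed_dict={x: [1.0, 2.0], y: [2.0, 4.0]}))
```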
    def create_reward_signals(self, reward_signal_configs):
        """
        Create reward signals
        :param reward_signal_configs: Reward signal config.
        """
        with self.graph.as_default():
            with tf.variable_scope(TOWER_SCOPE_NAME, reuse=tf.AUTO_REUSE):
                for device_id, device in enumerate(self.devices):
                    with tf.device(device):
                        reward_tower = {}
                        for reward_signal, config in reward_signal_configs.items():
                            reward_tower[reward_signal] = create_reward_signal(
                                self.towers[device_id], reward_signal, config
                            )
                            for k, v in reward_tower[reward_signal].update_dict.items():
                                self.update_dict[k + "_" + str(device_id)] = v
                        self.reward_signal_towers.append(reward_tower)

            for _, reward_tower in self.reward_signal_towers[0].items():
                for _, update_key in reward_tower.stats_name_to_update_name.items():
                    all_reward_signal_stats = tf.stack(
                        [
                            self.update_dict[update_key + "_" + str(i)]
                            for i in range(len(self.towers))
                        ]
                    )
                    mean_reward_signal_stats = tf.reduce_mean(
                        all_reward_signal_stats, 0
                    )
                    self.update_dict.update({update_key: mean_reward_signal_stats})

        self.reward_signals = self.reward_signal_towers[0]
    @timed
    def update(self, mini_batch, num_sequences):
        """
        Updates model using buffer.
        :param num_sequences: Number of trajectories in batch.
        :param mini_batch: Experience batch.
        :return: Output from update process.
        """
        feed_dict = {}
        stats_needed = self.stats_name_to_update_name
        device_batch_size = num_sequences // len(self.devices)
        device_batches = []
        for i in range(len(self.devices)):
            device_batches.append(
                {
                    k: v[
                        i * device_batch_size : i * device_batch_size
                        + device_batch_size
                    ]
                    for (k, v) in mini_batch.items()
                }
            )

        for batch, tower, reward_tower in zip(
            device_batches, self.towers, self.reward_signal_towers
        ):
            # feed_dict.update(self.construct_feed_dict(tower, batch, num_sequences)) TODO: Fix multi-GPU optimizer
            stats_needed.update(self.stats_name_to_update_name)
            for _, reward_signal in reward_tower.items():
                feed_dict.update(
                    reward_signal.prepare_update(tower, batch, num_sequences)
                )
                stats_needed.update(reward_signal.stats_name_to_update_name)

        update_vals = self._execute_model(feed_dict, self.update_dict)
        update_stats = {}
        for stat_name, update_name in stats_needed.items():
            update_stats[stat_name] = update_vals[update_name]
        return update_stats
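The slicing that update() performs above is easier to see on a toy batch. The sketch below uses made-up keys and device names purely for illustration, and for simplicity treats num_sequences as the number of rows in each array.

```python
# Toy illustration (assumed keys/devices) of the per-device minibatch split in
# update(): each device receives a contiguous slice of every array in the batch.
mini_batch = {"obs": list(range(8)), "actions": list(range(8))}
devices = ["/gpu:0", "/gpu:1"]  # stand-in for self.devices
num_sequences = 8
device_batch_size = num_sequences // len(devices)

device_batches = [
    {k: v[i * device_batch_size : (i + 1) * device_batch_size] for k, v in mini_batch.items()}
    for i in range(len(devices))
]

print(device_batches[0]["obs"])  # [0, 1, 2, 3]
print(device_batches[1]["obs"])  # [4, 5, 6, 7]
```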
    def average_gradients(self, tower_grads):
        """
        Average gradients from all towers
        :param tower_grads: Gradients from all towers
        """
        average_grads = []
        for grad_and_vars in zip(*tower_grads):
            grads = [g for g, _ in grad_and_vars if g is not None]
            if not grads:
                continue
            avg_grad = tf.reduce_mean(tf.stack(grads), 0)
            var = grad_and_vars[0][1]
            average_grads.append((avg_grad, var))
        return average_grads
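As a quick numeric check of the averaging step (illustrative only): if two towers report gradients [1.0, 3.0] and [3.0, 5.0] for the same variable, the applied gradient is their element-wise mean.

```python
# NumPy check of the element-wise mean that tf.reduce_mean(tf.stack(grads), 0)
# computes over the tower axis (illustration, not ml-agents code).
import numpy as np

tower_grads = [np.array([1.0, 3.0]), np.array([3.0, 5.0])]
print(np.mean(np.stack(tower_grads), axis=0))  # [2. 4.]
```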
def get_devices() -> List[str]:
    """
    Get all available GPU devices
    """
    local_device_protos = device_lib.list_local_devices()
    devices = [x.name for x in local_device_protos if x.device_type == "GPU"]
    return devices