Unity 机器学习代理工具包 (ML-Agents) 是一个开源项目,它使游戏和模拟能够作为训练智能代理的环境。
您最多选择25个主题 主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 
 

310 行
12 KiB

from typing import Optional, Tuple
from mlagents.tf_utils import tf
from mlagents.trainers.policy.tf_policy import TFPolicy
from mlagents.trainers.tf.models import ModelUtils
EPSILON = 1e-7
class GAILModel:
def __init__(
self,
policy: TFPolicy,
h_size: int = 128,
learning_rate: float = 3e-4,
encoding_size: int = 64,
use_actions: bool = False,
use_vail: bool = False,
gradient_penalty_weight: float = 10.0,
):
"""
The initializer for the GAIL reward generator.
https://arxiv.org/abs/1606.03476
:param policy_model: The policy of the learning algorithm
:param h_size: Size of the hidden layer for the discriminator
:param learning_rate: The learning Rate for the discriminator
:param encoding_size: The encoding size for the encoder
:param use_actions: Whether or not to use actions to discriminate
:param use_vail: Whether or not to use a variational bottleneck for the
discriminator. See https://arxiv.org/abs/1810.00821.
"""
self.h_size = h_size
self.z_size = 128
self.alpha = 0.0005
self.mutual_information = 0.5
self.policy = policy
self.encoding_size = encoding_size
self.gradient_penalty_weight = gradient_penalty_weight
self.use_vail = use_vail
self.use_actions = use_actions # True # Not using actions
self.noise: Optional[tf.Tensor] = None
self.z: Optional[tf.Tensor] = None
self.make_inputs()
self.create_network()
self.create_loss(learning_rate)
if self.use_vail:
self.make_beta_update()
def make_beta_update(self) -> None:
"""
Creates the beta parameter and its updater for GAIL
"""
new_beta = tf.maximum(
self.beta + self.alpha * (self.kl_loss - self.mutual_information), EPSILON
)
with tf.control_dependencies([self.update_batch]):
self.update_beta = tf.assign(self.beta, new_beta)
def make_inputs(self) -> None:
"""
Creates the input layers for the discriminator
"""
self.done_expert_holder = tf.placeholder(shape=[None], dtype=tf.float32)
self.done_policy_holder = tf.placeholder(shape=[None], dtype=tf.float32)
self.done_expert = tf.expand_dims(self.done_expert_holder, -1)
self.done_policy = tf.expand_dims(self.done_policy_holder, -1)
if self.policy.behavior_spec.action_spec.is_continuous():
action_length = self.policy.act_size[0]
self.action_in_expert = tf.placeholder(
shape=[None, action_length], dtype=tf.float32
)
self.expert_action = tf.identity(self.action_in_expert)
else:
action_length = len(self.policy.act_size)
self.action_in_expert = tf.placeholder(
shape=[None, action_length], dtype=tf.int32
)
self.expert_action = tf.concat(
[
tf.one_hot(self.action_in_expert[:, i], act_size)
for i, act_size in enumerate(self.policy.act_size)
],
axis=1,
)
encoded_policy_list = []
encoded_expert_list = []
(
self.obs_in_expert,
self.expert_visual_in,
) = ModelUtils.create_input_placeholders(
self.policy.behavior_spec.observation_shapes, "gail_"
)
if self.policy.vec_obs_size > 0:
if self.policy.normalize:
encoded_expert_list.append(
ModelUtils.normalize_vector_obs(
self.obs_in_expert,
self.policy.running_mean,
self.policy.running_variance,
self.policy.normalization_steps,
)
)
encoded_policy_list.append(self.policy.processed_vector_in)
else:
encoded_expert_list.append(self.obs_in_expert)
encoded_policy_list.append(self.policy.vector_in)
if self.expert_visual_in:
visual_policy_encoders = []
visual_expert_encoders = []
for i, (vis_in, exp_vis_in) in enumerate(
zip(self.policy.visual_in, self.expert_visual_in)
):
encoded_policy_visual = ModelUtils.create_visual_observation_encoder(
vis_in,
self.encoding_size,
ModelUtils.swish,
1,
f"gail_stream_{i}_visual_obs_encoder",
False,
)
encoded_expert_visual = ModelUtils.create_visual_observation_encoder(
exp_vis_in,
self.encoding_size,
ModelUtils.swish,
1,
f"gail_stream_{i}_visual_obs_encoder",
True,
)
visual_policy_encoders.append(encoded_policy_visual)
visual_expert_encoders.append(encoded_expert_visual)
hidden_policy_visual = tf.concat(visual_policy_encoders, axis=1)
hidden_expert_visual = tf.concat(visual_expert_encoders, axis=1)
encoded_policy_list.append(hidden_policy_visual)
encoded_expert_list.append(hidden_expert_visual)
self.encoded_expert = tf.concat(encoded_expert_list, axis=1)
self.encoded_policy = tf.concat(encoded_policy_list, axis=1)
def create_encoder(
self, state_in: tf.Tensor, action_in: tf.Tensor, done_in: tf.Tensor, reuse: bool
) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
"""
Creates the encoder for the discriminator
:param state_in: The encoded observation input
:param action_in: The action input
:param done_in: The done flags input
:param reuse: If true, the weights will be shared with the previous encoder created
"""
with tf.variable_scope("GAIL_model"):
if self.use_actions:
concat_input = tf.concat([state_in, action_in, done_in], axis=1)
else:
concat_input = state_in
hidden_1 = tf.layers.dense(
concat_input,
self.h_size,
activation=ModelUtils.swish,
name="gail_d_hidden_1",
reuse=reuse,
)
hidden_2 = tf.layers.dense(
hidden_1,
self.h_size,
activation=ModelUtils.swish,
name="gail_d_hidden_2",
reuse=reuse,
)
z_mean = None
if self.use_vail:
# Latent representation
z_mean = tf.layers.dense(
hidden_2,
self.z_size,
reuse=reuse,
name="gail_z_mean",
kernel_initializer=ModelUtils.scaled_init(0.01),
)
self.noise = tf.random_normal(tf.shape(z_mean), dtype=tf.float32)
# Sampled latent code
self.z = z_mean + self.z_sigma * self.noise * self.use_noise
estimate_input = self.z
else:
estimate_input = hidden_2
estimate = tf.layers.dense(
estimate_input,
1,
activation=tf.nn.sigmoid,
name="gail_d_estimate",
reuse=reuse,
)
return estimate, z_mean, concat_input
def create_network(self) -> None:
"""
Helper for creating the intrinsic reward nodes
"""
if self.use_vail:
self.z_sigma = tf.get_variable(
"gail_sigma_vail",
self.z_size,
dtype=tf.float32,
initializer=tf.ones_initializer(),
)
self.z_sigma_sq = self.z_sigma * self.z_sigma
self.z_log_sigma_sq = tf.log(self.z_sigma_sq + EPSILON)
self.use_noise = tf.placeholder(
shape=[1], dtype=tf.float32, name="gail_NoiseLevel"
)
self.expert_estimate, self.z_mean_expert, _ = self.create_encoder(
self.encoded_expert, self.expert_action, self.done_expert, reuse=False
)
self.policy_estimate, self.z_mean_policy, _ = self.create_encoder(
self.encoded_policy,
self.policy.selected_actions,
self.done_policy,
reuse=True,
)
self.mean_policy_estimate = tf.reduce_mean(self.policy_estimate)
self.mean_expert_estimate = tf.reduce_mean(self.expert_estimate)
self.discriminator_score = tf.reshape(
self.policy_estimate, [-1], name="gail_reward"
)
self.intrinsic_reward = -tf.log(1.0 - self.discriminator_score + EPSILON)
def create_gradient_magnitude(self) -> tf.Tensor:
"""
Gradient penalty from https://arxiv.org/pdf/1704.00028. Adds stability esp.
for off-policy. Compute gradients w.r.t randomly interpolated input.
"""
expert = [self.encoded_expert, self.expert_action, self.done_expert]
policy = [self.encoded_policy, self.policy.selected_actions, self.done_policy]
interp = []
for _expert_in, _policy_in in zip(expert, policy):
alpha = tf.random_uniform(tf.shape(_expert_in))
interp.append(alpha * _expert_in + (1 - alpha) * _policy_in)
grad_estimate, _, grad_input = self.create_encoder(
interp[0], interp[1], interp[2], reuse=True
)
grad = tf.gradients(grad_estimate, [grad_input])[0]
# Norm's gradient could be NaN at 0. Use our own safe_norm
safe_norm = tf.sqrt(tf.reduce_sum(grad ** 2, axis=-1) + EPSILON)
gradient_mag = tf.reduce_mean(tf.pow(safe_norm - 1, 2))
return gradient_mag
def create_loss(self, learning_rate: float) -> None:
"""
Creates the loss and update nodes for the GAIL reward generator
:param learning_rate: The learning rate for the optimizer
"""
self.mean_expert_estimate = tf.reduce_mean(self.expert_estimate)
self.mean_policy_estimate = tf.reduce_mean(self.policy_estimate)
if self.use_vail:
self.beta = tf.get_variable(
"gail_beta",
[],
trainable=False,
dtype=tf.float32,
initializer=tf.ones_initializer(),
)
self.discriminator_loss = -tf.reduce_mean(
tf.log(self.expert_estimate + EPSILON)
+ tf.log(1.0 - self.policy_estimate + EPSILON)
)
if self.use_vail:
# KL divergence loss (encourage latent representation to be normal)
self.kl_loss = tf.reduce_mean(
-tf.reduce_sum(
1
+ self.z_log_sigma_sq
- 0.5 * tf.square(self.z_mean_expert)
- 0.5 * tf.square(self.z_mean_policy)
- tf.exp(self.z_log_sigma_sq),
1,
)
)
self.loss = (
self.beta * (self.kl_loss - self.mutual_information)
+ self.discriminator_loss
)
else:
self.loss = self.discriminator_loss
if self.gradient_penalty_weight > 0.0:
self.loss += self.gradient_penalty_weight * self.create_gradient_magnitude()
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
self.update_batch = optimizer.minimize(self.loss)