import logging
from typing import Any, Dict, Optional

import numpy as np
import tensorflow as tf

# NOTE: the mlagents import paths below assume the standard mlagents.trainers
# package layout this module ships in.
from mlagents.trainers.brain import BrainParameters
from mlagents.trainers.models import EncoderType, LearningModel
from mlagents.trainers.tf_policy import TFPolicy
from mlagents.trainers.components.bc.module import BCModule

logger = logging.getLogger("mlagents.trainers")

# Bounds used to clip the learned log standard deviation of the Gaussian policy.
LOG_STD_MAX = 2
LOG_STD_MIN = -20

EPSILON = 1e-6  # Small value to avoid divide by zero


class NNPolicy(TFPolicy):
    def __init__(
        self,
        seed: int,
        brain: BrainParameters,
        trainer_params: Dict[str, Any],
        is_training: bool,
        load: bool,
        tanh_squash: bool = False,
    ):
        """
        Policy for Proximal Policy Optimization Networks.
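        :param seed: Random seed.
        :param brain: Assigned BrainParameters object (observation and action spaces).
        :param trainer_params: Defined training parameters.
        :param is_training: Whether the model should be trained.
        :param load: Whether a pre-trained model will be loaded or a new one created.
        :param tanh_squash: Whether continuous actions should be squashed with tanh
            rather than clipped.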
        """

        with self.graph.as_default():
            if self.use_continuous_act:
                self.create_cc_actor(
                    h_size, num_layers, vis_encode_type, tanh_squash
                )
            else:
                self.create_dc_actor(h_size, num_layers, vis_encode_type)

        self.bc_module: Optional[BCModule] = None


        return run_out

    def create_cc_actor(
        self,
        h_size: int,
        num_layers: int,
        vis_encode_type: EncoderType,
        tanh_squash: bool = False,
    ) -> None:
        """
        Creates Continuous control actor-critic model.
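        :param h_size: Size of the hidden layers.
        :param num_layers: Number of hidden layers.
        :param vis_encode_type: Type of visual encoder to use.
        :param tanh_squash: Whether to squash continuous actions with tanh (with the
            corresponding log-probability correction) instead of clipping them.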
        """

        # Mean of the Gaussian action distribution, computed from hidden_policy,
        # the encoded observation stream built earlier in this method.
        mu = tf.layers.dense(
            hidden_policy,
            self.act_size[0],
            activation=None,
            kernel_initializer=LearningModel.scaled_init(0.01),
            reuse=tf.AUTO_REUSE,
        )

        # Policy-dependent log_sigma: the log standard deviation is produced by a
        # dense layer on the same hidden stream as mu, then clipped so that sigma
        # stays in a numerically safe range.
        log_sigma = tf.layers.dense(
            hidden_policy,
            self.act_size[0],
            activation=None,
            name="log_std",
            kernel_initializer=LearningModel.scaled_init(0.01),
        )
        self.log_sigma = tf.clip_by_value(log_sigma, LOG_STD_MIN, LOG_STD_MAX)
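
        # Sample an action with the reparameterization trick: self.epsilon holds
        # standard-normal noise, so policy_ below is a sample from N(mu, sigma^2).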
        sigma = tf.exp(self.log_sigma)
        policy_ = mu + sigma * self.epsilon
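
        # Log-probability of the sampled action under N(mu, sigma^2): the squared
        # z-score plus normalization terms, summed over action dimensions.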
        _gauss_pre = -0.5 * (
            ((tf.stop_gradient(policy_) - mu) / (sigma + EPSILON)) ** 2
            + 2 * self.log_sigma
            + np.log(2 * np.pi)
        )
        all_probs = tf.reduce_sum(_gauss_pre, axis=1, keepdims=True)

        if tanh_squash:
            self.output_pre = tf.tanh(policy_)

            # Squash correction: account for the tanh change of variables,
            # log pi(a) = log pi(u) - sum_i log(1 - tanh(u_i)^2).
            all_probs -= tf.reduce_sum(
                tf.log(1 - self.output_pre ** 2 + EPSILON), axis=1, keepdims=True
            )
            self.output = tf.identity(self.output_pre, name="action")
        else:
            self.output_pre = policy_
            # Clip the raw action to [-3, 3] and rescale into [-1, 1].
            output_post = tf.clip_by_value(self.output_pre, -3, 3) / 3
            self.output = tf.identity(output_post, name="action")

        self.selected_actions = tf.stop_gradient(self.output)
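
        # Entropy of a diagonal Gaussian: 0.5 * log(2 * pi * e * sigma^2) per
        # dimension, i.e. 0.5 * (log(2 * pi * e) + 2 * log(sigma)), averaged
        # over action dimensions.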
        single_dim_entropy = 0.5 * tf.reduce_mean(
            tf.log(2 * np.pi * np.e) + 2 * self.log_sigma
        )
        # Make entropy the right shape
        self.entropy = tf.ones_like(tf.reshape(mu[:, 0], [-1])) * single_dim_entropy