import tensorflow as tf
import tensorflow.contrib.layers as c_layers


class LearningModel(object):
    def __init__(self, m_size, normalize, use_recurrent, brain):
        self.brain = brain
        self.vector_in = None
        self.visual_in = []
        self.batch_size = tf.placeholder(shape=None, dtype=tf.int32, name='batch_size')
        self.sequence_length = tf.placeholder(shape=None, dtype=tf.int32, name='sequence_length')
        self.m_size = m_size
        self.normalize = normalize
        self.use_recurrent = use_recurrent
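
    # self.swish is used below as an activation choice; a minimal sketch of the standard
    # Swish activation (x * sigmoid(x)) is assumed here.
    @staticmethod
    def swish(input_activation):
        """Swish activation: the input multiplied by its sigmoid."""
        return tf.multiply(input_activation, tf.nn.sigmoid(input_activation))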

    def create_visual_input(self, camera_parameters, name):
        """Creates a placeholder for a single visual (camera) observation."""
        o_size_h = camera_parameters['height']
        o_size_w = camera_parameters['width']
        bw = camera_parameters['blackAndWhite']
        if bw:
            c_channels = 1
        else:
            c_channels = 3

        visual_in = tf.placeholder(shape=[None, o_size_h, o_size_w, c_channels], dtype=tf.float32, name=name)
        return visual_in
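
    # Hedged usage sketch for the helper above (the dict keys mirror the lookups in the
    # method body; the 84x84 resolution is illustrative):
    #   model.create_visual_input({'height': 84, 'width': 84, 'blackAndWhite': False},
    #                             name="visual_observation_0")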

    def create_vector_input(self, s_size):
        """Creates the placeholder (and optional normalization ops) for the vector observation."""
        if self.brain.vector_observation_space_type == "continuous":
            self.vector_in = tf.placeholder(shape=[None, s_size], dtype=tf.float32, name='vector_observation')
            if self.normalize:
                self.running_mean = tf.get_variable("running_mean", [s_size], trainable=False, dtype=tf.float32,
                                                    initializer=tf.zeros_initializer())
                self.running_variance = tf.get_variable("running_variance", [s_size], trainable=False,
                                                        dtype=tf.float32, initializer=tf.ones_initializer())
                # New statistics are computed outside the graph and fed in through these placeholders.
                self.new_mean = tf.placeholder(shape=[s_size], dtype=tf.float32, name='new_mean')
                self.new_variance = tf.placeholder(shape=[s_size], dtype=tf.float32, name='new_variance')
                self.update_mean = tf.assign(self.running_mean, self.new_mean)
                self.update_variance = tf.assign(self.running_variance, self.new_variance)

                # Normalize with the running statistics and clip (the [-5, 5] range is assumed).
                self.normalized_state = tf.clip_by_value(
                    (self.vector_in - self.running_mean) / tf.sqrt(self.running_variance),
                    -5, 5, name='normalized_state')
            else:
                self.normalized_state = self.vector_in
        else:
            # Discrete vector observations come in as integer indices.
            self.vector_in = tf.placeholder(shape=[None, 1], dtype=tf.int32, name='vector_observation')
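
    # Hedged usage sketch for the normalizer ops above: the trainer computes fresh
    # statistics outside the graph and pushes them in. 'sess' and 'batch_obs' are
    # illustrative names, not part of this class.
    #   sess.run([model.update_mean, model.update_variance],
    #            feed_dict={model.new_mean: batch_obs.mean(axis=0),
    #                       model.new_variance: batch_obs.var(axis=0)})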

    def create_continuous_state_encoder(self, h_size, activation, num_layers):
        """
        Builds hidden encoders from the (normalized) vector observation.
        :param num_layers: number of hidden layers to create.
        :return: List of hidden layer tensors.
        """
        hidden = self.normalized_state
        for j in range(num_layers):
            hidden = tf.layers.dense(hidden, h_size, use_bias=False, activation=activation)
        return hidden

    def create_visual_encoder(self, h_size, activation, num_layers):
        """Builds a convolutional encoder over the most recently added visual observation."""
        conv1 = tf.layers.conv2d(self.visual_in[-1], 16, kernel_size=[8, 8], strides=[4, 4],
                                 activation=tf.nn.elu)
        conv2 = tf.layers.conv2d(conv1, 32, kernel_size=[4, 4], strides=[2, 2],
                                 activation=tf.nn.elu)
        hidden = c_layers.flatten(conv2)
        for j in range(num_layers):
            hidden = tf.layers.dense(hidden, h_size, use_bias=False, activation=activation)
        return hidden

    def create_discrete_state_encoder(self, s_size, h_size, activation, num_layers):
        """
        Builds hidden encoders from a discrete vector observation via one-hot encoding.
        :param num_layers: number of hidden layers to create.
        :return: List of hidden layer tensors.
        """
        vector_in = tf.reshape(self.vector_in, [-1])
        state_onehot = c_layers.one_hot_encoding(vector_in, s_size)
        hidden = state_onehot
        for j in range(num_layers):
            hidden = tf.layers.dense(hidden, h_size, use_bias=False, activation=activation)
        return hidden
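
    # Worked example for the discrete path above: with s_size = 4 and a batch of index
    # observations [[1], [3]], the reshape yields [1, 3] and the one-hot encoding yields
    # [[0, 1, 0, 0], [0, 0, 0, 1]] before the dense layers are applied.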

    def create_new_obs(self, num_streams, h_size, num_layers):
        """Creates the observation placeholders and prepares the encoding streams."""
        brain = self.brain
        s_size = brain.vector_observation_space_size * brain.num_stacked_vector_observations
        if brain.vector_action_space_type == "continuous":
            activation_fn = tf.nn.tanh
        else:
            activation_fn = self.swish

        self.visual_in = []
        for i in range(brain.number_visual_observations):
            visual_input = self.create_visual_input(brain.camera_resolutions[i],
                                                    name="visual_observation_" + str(i))
            self.visual_in.append(visual_input)
        self.create_vector_input(s_size)

        final_hiddens = []
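
    # Hedged sketch of how the encoder helpers could populate final_hiddens (illustrative,
    # not necessarily the original stream construction):
    #   for _ in range(num_streams):
    #       streams = []
    #       if brain.number_visual_observations > 0:
    #           streams.append(self.create_visual_encoder(h_size, activation_fn, num_layers))
    #       streams.append(self.create_continuous_state_encoder(h_size, activation_fn, num_layers))
    #       final_hiddens.append(tf.concat(streams, axis=1) if len(streams) > 1 else streams[0])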

    def create_recurrent_encoder(self, input_state, memory_in, name='lstm'):
        """Builds an LSTM encoder over input_state, threading the flat memory tensor through it."""
        s_size = input_state.get_shape().as_list()[1]
        m_size = memory_in.get_shape().as_list()[1]
        lstm_input_state = tf.reshape(input_state, shape=[-1, self.sequence_length, s_size])
        _half_point = int(m_size / 2)
        with tf.variable_scope(name):
            rnn_cell = tf.contrib.rnn.BasicLSTMCell(_half_point)
            # The flat memory holds the cell state in its first half and the hidden state in its second.
            lstm_vector_in = tf.contrib.rnn.LSTMStateTuple(memory_in[:, :_half_point],
                                                           memory_in[:, _half_point:])
            recurrent_state, lstm_state_out = tf.nn.dynamic_rnn(rnn_cell, lstm_input_state,
                                                                initial_state=lstm_vector_in,
                                                                time_major=False,
                                                                dtype=tf.float32)

        recurrent_state = tf.reshape(recurrent_state, shape=[-1, _half_point])
        return recurrent_state, tf.concat([lstm_state_out.c, lstm_state_out.h], axis=1)
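
    # Hedged usage sketch for the recurrent encoder above ('hidden' and 'memory_in' are
    # illustrative tensors of shape [batch, features] and [batch, m_size]):
    #   hidden, memory_out = model.create_recurrent_encoder(hidden, memory_in, name='lstm')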