Source code for cadl.vae

"""Convolutional/Variational autoencoder, including demonstration of
training such a network on MNIST, CelebNet, and the film "Sita Sings The Blues"
using an image pipeline.
"""
"""
Copyright 2017 Parag K. Mital.  See also NOTICE.md.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import tensorflow as tf
import numpy as np
import os
from cadl.dataset_utils import create_input_pipeline
from cadl.datasets import CELEB, MNIST
from cadl.batch_norm import batch_norm
from cadl import utils


def VAE(input_shape=[None, 784],
        n_filters=[64, 64, 64],
        filter_sizes=[4, 4, 4],
        n_hidden=32,
        n_code=2,
        activation=tf.nn.tanh,
        dropout=False,
        denoising=False,
        convolutional=False,
        variational=False):
    """(Variational) (Convolutional) (Denoising) Autoencoder.

    Uses tied weights.

    Parameters
    ----------
    input_shape : list, optional
        Shape of the input to the network, e.g. for MNIST: [None, 784].
    n_filters : list, optional
        Number of filters for each layer.  If convolutional=True, this refers
        to the number of output filters to create for each layer, given as a
        list.  If convolutional=False, this refers to the number of neurons
        for each layer in a fully connected network.
    filter_sizes : list, optional
        Only applied when convolutional=True.  This refers to the ksize
        (height and width) of each convolutional layer.
    n_hidden : int, optional
        Only applied when variational=True.  This refers to the first fully
        connected layer prior to the variational embedding, directly after
        the encoding.  After the variational embedding, another fully
        connected layer is created with the same size prior to decoding.
        Set to 0 to not use an additional hidden layer.
    n_code : int, optional
        Only applied when variational=True.  This refers to the number of
        latent Gaussians to sample for creating the innermost encoding.
    activation : function, optional
        Activation function to apply to each layer, e.g. tf.nn.relu
    dropout : bool, optional
        Whether or not to apply dropout.  If using dropout, you must feed a
        value for 'keep_prob', as returned in the dictionary.  1.0 means no
        dropout is used.  0.0 means every connection is dropped.  Sensible
        values are between 0.5-0.8.
    denoising : bool, optional
        Whether or not to apply denoising.  If using denoising, you must feed
        a value for 'corrupt_prob', as returned in the dictionary.  0.0 means
        no corruption is used.  1.0 means every feature is corrupted.
        Sensible values are between 0.5-0.8.
    convolutional : bool, optional
        Whether or not to use a convolutional network; otherwise a fully
        connected network will be created.  This affects the meaning of the
        n_filters parameter.
    variational : bool, optional
        Whether or not to create a variational embedding layer.  This will
        create a fully connected layer after the encoding, if `n_hidden` is
        greater than 0, then a multivariate Gaussian sampling layer, then
        another fully connected layer.  The sizes of the fully connected
        layers are determined by `n_hidden`, and the size of the sampling
        layer is determined by `n_code`.

    Returns
    -------
    model : dict
        {
            'cost': Tensor to optimize.
            'Ws': All weights of the encoder.
            'x': Input Placeholder.
            'z': Innermost encoding Tensor (latent features).
            'y': Reconstruction of the Decoder.
            'keep_prob': Amount to keep when using Dropout.
            'corrupt_prob': Amount to corrupt when using Denoising.
            'train': Set to True when training; applies to Batch
                Normalization.
        }
    """
    # network input / placeholders for train (bn) and dropout
    x = tf.placeholder(tf.float32, input_shape, 'x')
    phase_train = tf.placeholder(tf.bool, name='phase_train')
    keep_prob = tf.placeholder(tf.float32, name='keep_prob')
    corrupt_prob = tf.placeholder(tf.float32, [1])

    # Optionally mix the input with a corrupted version of itself:
    # corrupt_prob of 0.0 leaves the input untouched, 1.0 fully corrupts it.
    if denoising:
        x_corrupted = utils.corrupt(x) * corrupt_prob + x * (1 - corrupt_prob)
    else:
        x_corrupted = x

    # 2d -> 4d if convolution
    x_tensor = utils.to_tensor(x_corrupted) if convolutional else x_corrupted
    current_input = x_tensor

    Ws = []
    shapes = []

    # Build the encoder
    for layer_i, n_output in enumerate(n_filters):
        with tf.variable_scope('encoder/{}'.format(layer_i)):
            shapes.append(current_input.get_shape().as_list())
            if convolutional:
                h, W = utils.conv2d(
                    x=current_input,
                    n_output=n_output,
                    k_h=filter_sizes[layer_i],
                    k_w=filter_sizes[layer_i])
            else:
                h, W = utils.linear(x=current_input, n_output=n_output)
            h = activation(batch_norm(h, phase_train, 'bn' + str(layer_i)))
            if dropout:
                h = tf.nn.dropout(h, keep_prob)
            Ws.append(W)
            current_input = h

    shapes.append(current_input.get_shape().as_list())

    with tf.variable_scope('variational'):
        if variational:
            dims = current_input.get_shape().as_list()
            flattened = utils.flatten(current_input)

            if n_hidden:
                h = utils.linear(flattened, n_hidden, name='W_fc')[0]
                h = activation(batch_norm(h, phase_train, 'fc/bn'))
                if dropout:
                    h = tf.nn.dropout(h, keep_prob)
            else:
                h = flattened

            z_mu = utils.linear(h, n_code, name='mu')[0]
            z_log_sigma = 0.5 * utils.linear(h, n_code, name='log_sigma')[0]

            # Sample from noise distribution p(eps) ~ N(0, 1)
            epsilon = tf.random_normal(tf.stack([tf.shape(x)[0], n_code]))

            # Sample from posterior using the reparameterization trick
            z = z_mu + tf.multiply(epsilon, tf.exp(z_log_sigma))

            if n_hidden:
                h = utils.linear(z, n_hidden, name='fc_t')[0]
                h = activation(batch_norm(h, phase_train, 'fc_t/bn'))
                if dropout:
                    h = tf.nn.dropout(h, keep_prob)
            else:
                h = z

            size = dims[1] * dims[2] * dims[3] if convolutional else dims[1]
            h = utils.linear(h, size, name='fc_t2')[0]
            current_input = activation(batch_norm(h, phase_train, 'fc_t2/bn'))
            if dropout:
                current_input = tf.nn.dropout(current_input, keep_prob)

            if convolutional:
                current_input = tf.reshape(
                    current_input,
                    tf.stack([
                        tf.shape(current_input)[0], dims[1], dims[2], dims[3]
                    ]))
        else:
            z = current_input

    # Copy before reversing so we don't mutate the caller's list (or the
    # shared default argument) across multiple calls.
    n_filters = list(n_filters)
    shapes.reverse()
    n_filters.reverse()
    Ws.reverse()

    n_filters += [input_shape[-1]]

    # %%
    # Decoding layers
    for layer_i, n_output in enumerate(n_filters[1:]):
        with tf.variable_scope('decoder/{}'.format(layer_i)):
            shape = shapes[layer_i + 1]
            if convolutional:
                h, W = utils.deconv2d(
                    x=current_input,
                    n_output_h=shape[1],
                    n_output_w=shape[2],
                    n_output_ch=shape[3],
                    n_input_ch=shapes[layer_i][3],
                    k_h=filter_sizes[layer_i],
                    k_w=filter_sizes[layer_i])
            else:
                h, W = utils.linear(x=current_input, n_output=n_output)
            h = activation(batch_norm(h, phase_train, 'dec/bn' + str(layer_i)))
            if dropout:
                h = tf.nn.dropout(h, keep_prob)
            current_input = h

    y = current_input
    x_flat = utils.flatten(x)
    y_flat = utils.flatten(y)

    # l2 loss
    loss_x = tf.reduce_sum(tf.squared_difference(x_flat, y_flat), 1)

    if variational:
        # variational lower bound: reconstruction loss plus kl-divergence
        loss_z = -0.5 * tf.reduce_sum(
            1.0 + 2.0 * z_log_sigma - tf.square(z_mu) -
            tf.exp(2.0 * z_log_sigma), 1)
        cost = tf.reduce_mean(loss_x + loss_z)
    else:
        # just optimize l2 loss
        cost = tf.reduce_mean(loss_x)

    return {
        'cost': cost,
        'Ws': Ws,
        'x': x,
        'z': z,
        'y': y,
        'keep_prob': keep_prob,
        'corrupt_prob': corrupt_prob,
        'train': phase_train
    }
def train_vae(files,
              input_shape,
              learning_rate=0.0001,
              batch_size=100,
              n_epochs=50,
              n_examples=10,
              crop_shape=[64, 64, 3],
              crop_factor=0.8,
              n_filters=[100, 100, 100, 100],
              n_hidden=256,
              n_code=50,
              convolutional=True,
              variational=True,
              filter_sizes=[3, 3, 3, 3],
              dropout=True,
              keep_prob=0.8,
              activation=tf.nn.relu,
              img_step=100,
              save_step=100,
              ckpt_name="vae.ckpt"):
    """General purpose training of a (Variational) (Convolutional)
    Autoencoder.

    Supply a list of file paths to images, and this will do everything else.

    Parameters
    ----------
    files : list of strings
        List of paths to images.
    input_shape : list
        Must define what the input image's shape is.
    learning_rate : float, optional
        Learning rate.
    batch_size : int, optional
        Batch size.
    n_epochs : int, optional
        Number of epochs.
    n_examples : int, optional
        Number of examples to use while demonstrating the current training
        iteration's reconstruction.  Creates a square montage, so make sure
        int(sqrt(n_examples))**2 = n_examples, e.g. 16, 25, 36, ..., 100.
    crop_shape : list, optional
        Size to centrally crop the image to.
    crop_factor : float, optional
        Resize factor to apply before cropping.
    n_filters : list, optional
        Same as VAE's n_filters.
    n_hidden : int, optional
        Same as VAE's n_hidden.
    n_code : int, optional
        Same as VAE's n_code.
    convolutional : bool, optional
        Use convolution or not.
    variational : bool, optional
        Use variational layer or not.
    filter_sizes : list, optional
        Same as VAE's filter_sizes.
    dropout : bool, optional
        Use dropout or not.
    keep_prob : float, optional
        Percent to keep for dropout.
    activation : function, optional
        Which activation function to use.
    img_step : int, optional
        How often to save training images showing the manifold and
        reconstruction.
    save_step : int, optional
        How often to save checkpoints.
    ckpt_name : str, optional
        Checkpoints will be named as this, e.g. 'model.ckpt'
    """
    batch = create_input_pipeline(
        files=files,
        batch_size=batch_size,
        n_epochs=n_epochs,
        crop_shape=crop_shape,
        crop_factor=crop_factor,
        shape=input_shape)

    ae = VAE(
        input_shape=[None] + crop_shape,
        convolutional=convolutional,
        variational=variational,
        n_filters=n_filters,
        n_hidden=n_hidden,
        n_code=n_code,
        dropout=dropout,
        filter_sizes=filter_sizes,
        activation=activation)

    # Create a manifold of our inner most layer to show
    # example reconstructions.  This is one way to see
    # what the "embedding" or "latent space" of the encoder
    # is capable of encoding, though note that this is just
    # a random hyperplane within the latent space, and does not
    # encompass all possible embeddings.
    zs = np.random.uniform(-1.0, 1.0, [4, n_code]).astype(np.float32)
    zs = utils.make_latent_manifold(zs, n_examples)

    optimizer = tf.train.AdamOptimizer(
        learning_rate=learning_rate).minimize(ae['cost'])

    # We create a session to use the graph
    sess = tf.Session()
    saver = tf.train.Saver()
    init_op = tf.group(tf.global_variables_initializer(),
                       tf.local_variables_initializer())
    sess.run(init_op)

    # This will handle our threaded image pipeline
    coord = tf.train.Coordinator()

    # Ensure no more changes to graph
    tf.get_default_graph().finalize()

    # Start up the queues for handling the image pipeline
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)

    if os.path.exists(ckpt_name + '.index') or os.path.exists(ckpt_name):
        saver.restore(sess, ckpt_name)

    # Fit all training data
    t_i = 0
    batch_i = 0
    epoch_i = 0
    cost = 0
    n_files = len(files)
    test_xs = sess.run(batch) / 255.0
    utils.montage(test_xs, 'test_xs.png')
    try:
        while not coord.should_stop() and epoch_i < n_epochs:
            batch_i += 1
            batch_xs = sess.run(batch) / 255.0
            train_cost = sess.run(
                [ae['cost'], optimizer],
                feed_dict={
                    ae['x']: batch_xs,
                    ae['train']: True,
                    ae['keep_prob']: keep_prob
                })[0]
            print(batch_i, train_cost)
            cost += train_cost
            if batch_i % n_files == 0:
                print('epoch:', epoch_i)
                print('average cost:', cost / batch_i)
                cost = 0
                batch_i = 0
                epoch_i += 1
            if batch_i % img_step == 0:
                # Plot example reconstructions from latent layer
                recon = sess.run(
                    ae['y'],
                    feed_dict={
                        ae['z']: zs,
                        ae['train']: False,
                        ae['keep_prob']: 1.0
                    })
                utils.montage(
                    recon.reshape([-1] + crop_shape),
                    'manifold_%08d.png' % t_i)

                # Plot example reconstructions
                recon = sess.run(
                    ae['y'],
                    feed_dict={
                        ae['x']: test_xs,
                        ae['train']: False,
                        ae['keep_prob']: 1.0
                    })
                print('reconstruction (min, max, mean):', recon.min(),
                      recon.max(), recon.mean())
                utils.montage(
                    recon.reshape([-1] + crop_shape),
                    'reconstruction_%08d.png' % t_i)
                t_i += 1

            if batch_i % save_step == 0:
                # Save the variables to disk.
                saver.save(
                    sess,
                    ckpt_name,
                    global_step=batch_i,
                    write_meta_graph=False)
    except tf.errors.OutOfRangeError:
        print('Done.')
    finally:
        # Tell all the threads to shut down, whether training finished
        # normally or a thread raised an exception.
        coord.request_stop()

    # Wait until all threads have finished.
    coord.join(threads)

    # Clean up the session.
    sess.close()
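
# A minimal usage sketch of train_vae(), assuming only the definitions above.
# The directory name 'my_images' and the 100x100 input shape are hypothetical;
# substitute your own image folder and dimensions.
def _example_train_vae_usage():
    # Gather every JPEG in the (hypothetical) image directory.
    files = [os.path.join('my_images', f)
             for f in os.listdir('my_images')
             if f.endswith('.jpg')]
    train_vae(
        files=files,
        input_shape=[100, 100, 3],
        batch_size=64,
        n_epochs=5,
        crop_shape=[64, 64, 3],
        crop_factor=0.8,
        n_filters=[64, 64, 64],
        filter_sizes=[3, 3, 3],
        n_hidden=128,
        n_code=32,
        convolutional=True,
        variational=True,
        ckpt_name='./example_vae.ckpt')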
# %%
def test_mnist():
    """Train an autoencoder on MNIST.

    This function will train an autoencoder on MNIST and also
    save many image files during the training process, demonstrating
    the latent space of the innermost dimension of the encoder,
    as well as reconstructions of the decoder.
    """
    # load MNIST
    n_code = 2
    mnist = MNIST(split=[0.8, 0.1, 0.1])
    ae = VAE(
        input_shape=[None, 784],
        n_filters=[512, 256],
        n_hidden=64,
        n_code=n_code,
        activation=tf.nn.sigmoid,
        convolutional=False,
        variational=True)

    n_examples = 100
    zs = np.random.uniform(-1.0, 1.0, [4, n_code]).astype(np.float32)
    zs = utils.make_latent_manifold(zs, n_examples)

    learning_rate = 0.02
    optimizer = tf.train.AdamOptimizer(
        learning_rate=learning_rate).minimize(ae['cost'])

    # We create a session to use the graph
    sess = tf.Session()
    init_op = tf.group(tf.global_variables_initializer(),
                       tf.local_variables_initializer())
    sess.run(init_op)

    # Fit all training data
    t_i = 0
    batch_i = 0
    batch_size = 200
    n_epochs = 10
    test_xs = mnist.test.images[:n_examples]
    utils.montage(test_xs.reshape((-1, 28, 28)), 'test_xs.png')
    for epoch_i in range(n_epochs):
        train_i = 0
        train_cost = 0
        for batch_xs, _ in mnist.train.next_batch(batch_size):
            train_cost += sess.run(
                [ae['cost'], optimizer],
                feed_dict={
                    ae['x']: batch_xs,
                    ae['train']: True,
                    ae['keep_prob']: 1.0
                })[0]
            train_i += 1
            if batch_i % 10 == 0:
                # Plot example reconstructions from latent layer
                recon = sess.run(
                    ae['y'],
                    feed_dict={
                        ae['z']: zs,
                        ae['train']: False,
                        ae['keep_prob']: 1.0
                    })
                utils.montage(
                    recon.reshape((-1, 28, 28)), 'manifold_%08d.png' % t_i)

                # Plot example reconstructions
                recon = sess.run(
                    ae['y'],
                    feed_dict={
                        ae['x']: test_xs,
                        ae['train']: False,
                        ae['keep_prob']: 1.0
                    })
                utils.montage(
                    recon.reshape((-1, 28, 28)),
                    'reconstruction_%08d.png' % t_i)
                t_i += 1
            batch_i += 1

        valid_i = 0
        valid_cost = 0
        for batch_xs, _ in mnist.valid.next_batch(batch_size):
            valid_cost += sess.run(
                [ae['cost']],
                feed_dict={
                    ae['x']: batch_xs,
                    ae['train']: False,
                    ae['keep_prob']: 1.0
                })[0]
            valid_i += 1
        print('train:', train_cost / train_i, 'valid:', valid_cost / valid_i)
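
# A sketch of building a 2D grid of latent codes by hand, similar in spirit
# to the utils.make_latent_manifold() calls above (the grid construction here
# is an assumption for illustration, not the library's exact implementation).
# With n_code=2, decoding such a grid visualizes how the decoder organizes
# the latent space, as the manifold montages in test_mnist() do.
def _example_latent_grid(n_side=10):
    # Sweep each of the two latent dimensions over [-1, 1].
    ys, xs = np.meshgrid(np.linspace(-1.0, 1.0, n_side),
                         np.linspace(-1.0, 1.0, n_side))
    zs = np.stack([xs.ravel(), ys.ravel()], axis=1).astype(np.float32)
    # zs has shape (n_side**2, 2); feed it as ae['z'] to decode the grid.
    return zs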
def test_celeb():
    """Train an autoencoder on Celeb Net.
    """
    files = CELEB()
    train_vae(
        files=files,
        input_shape=[218, 178, 3],
        batch_size=100,
        n_epochs=50,
        crop_shape=[64, 64, 3],
        crop_factor=0.8,
        convolutional=True,
        variational=True,
        n_filters=[100, 100, 100],
        n_hidden=250,
        n_code=100,
        dropout=True,
        filter_sizes=[3, 3, 3],
        activation=tf.nn.sigmoid,
        ckpt_name='./celeb.ckpt')
def test_sita():
    """Train an autoencoder on Sita Sings The Blues.
    """
    if not os.path.exists('sita'):
        os.system(
            'wget http://ossguy.com/sita/Sita_Sings_the_Blues_640x360_XviD.avi')
        os.mkdir('sita')
        os.system('ffmpeg -i Sita_Sings_the_Blues_640x360_XviD.avi -r 60 -f' +
                  ' image2 -s 160x90 sita/sita-%08d.jpg')
    files = [os.path.join('sita', f) for f in os.listdir('sita')]
    train_vae(
        files=files,
        input_shape=[90, 160, 3],
        batch_size=100,
        n_epochs=50,
        crop_shape=[90, 160, 3],
        crop_factor=1.0,
        convolutional=True,
        variational=True,
        n_filters=[100, 100, 100],
        n_hidden=250,
        n_code=100,
        dropout=True,
        filter_sizes=[3, 3, 3],
        activation=tf.nn.sigmoid,
        ckpt_name='./sita.ckpt')
if __name__ == '__main__':
    test_celeb()