Source code for cadl.utils

"""Various utilities including downloading, common layers, etc..
"""
"""
Copyright 2017 Parag K. Mital.  See also NOTICE.md.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import matplotlib.pyplot as plt
import tensorflow as tf
import urllib
import requests
import numpy as np
import zipfile
import os
import sys
from scipy.io import wavfile
import contextlib


@contextlib.contextmanager
[docs]def stdout_redirect(where): """Summary Parameters ---------- where : TYPE Description Yields ------ TYPE Description """ sys.stdout = where try: yield where finally: sys.stdout = sys.__stdout__
[docs]def exists(site): """Summary Parameters ---------- site : TYPE Description Returns ------- TYPE Description """ res = requests.head(site) return res.ok
[docs]def download(path): """Use urllib to download a file. Parameters ---------- path : str Url to download Returns ------- path : str Location of downloaded file. """ fname = path.split('/')[-1] if os.path.exists(fname): return fname print('Downloading ' + path) with open(fname, "wb") as f: response = requests.get(path, stream=True) total_length = response.headers.get('content-length') if total_length is None: f.write(response.content) else: dl = 0 total_length = int(total_length) for data in response.iter_content(chunk_size=4096): dl += len(data) f.write(data) done = int(50 * dl / total_length) sys.stdout.write("\r[%s%s] %.2f of %.2f MB" % ( '=' * done, ' ' * (50-done), dl / 1024.0 / 1024.0, total_length / 1024.0 / 1024.0)) sys.stdout.flush() return fname
[docs]def download_and_extract_tar(path, dst): """Download and extract a tar file. Parameters ---------- path : str Url to tar file to download. dst : str Location to save tar file contents. """ import tarfile filepath = download(path) if not os.path.exists(dst): os.makedirs(dst) tarfile.open(filepath, 'r:gz').extractall(dst)
[docs]def download_and_extract_zip(path, dst): """Download and extract a zip file. Parameters ---------- path : str Url to zip file to download. dst : str Location to save zip file contents. """ import zipfile filepath = download(path) if not os.path.exists(dst): os.makedirs(dst) zf = zipfile.ZipFile(file=filepath) zf.extractall(dst)
[docs]def load_audio(filename, b_normalize=True): """Load the audiofile at the provided filename using scipy.io.wavfile. Optionally normalizes the audio to the maximum value. Parameters ---------- filename : str File to load. b_normalize : bool, optional Normalize to the maximum value. Returns ------- TYPE Description """ sr, s = wavfile.read(filename) if b_normalize: s = s.astype(np.float32) s = (s / np.max(np.abs(s))) s -= np.mean(s) return s
[docs]def corrupt(x): """Take an input tensor and add uniform masking. Parameters ---------- x : Tensor/Placeholder Input to corrupt. Returns ------- x_corrupted : Tensor 50 pct of values corrupted. """ return tf.mul(x, tf.cast(tf.random_uniform(shape=tf.shape(x), minval=0, maxval=2, dtype=tf.int32), tf.float32))
[docs]def interp(l, r, n_samples): """Intepolate between the arrays l and r, n_samples times. Parameters ---------- l : np.ndarray Left edge r : np.ndarray Right edge n_samples : int Number of samples Returns ------- arr : np.ndarray Inteporalted array """ return np.array([ l + step_i / (n_samples - 1) * (r - l) for step_i in range(n_samples)])
[docs]def make_latent_manifold(corners, n_samples): """Create a 2d manifold out of the provided corners: n_samples * n_samples. Parameters ---------- corners : list of np.ndarray The four corners to intepolate. n_samples : int Number of samples to use in interpolation. Returns ------- arr : np.ndarray Stacked array of all 2D interpolated samples """ left = interp(corners[0], corners[1], n_samples) right = interp(corners[2], corners[3], n_samples) embedding = [] for row_i in range(n_samples): embedding.append(interp(left[row_i], right[row_i], n_samples)) return np.vstack(embedding)
[docs]def imcrop_tosquare(img): """Make any image a square image. Parameters ---------- img : np.ndarray Input image to crop, assumed at least 2d. Returns ------- crop : np.ndarray Cropped image. """ size = np.min(img.shape[:2]) extra = img.shape[:2] - size crop = img for i in np.flatnonzero(extra): crop = np.take(crop, extra[i] // 2 + np.r_[:size], axis=i) return crop
[docs]def slice_montage(montage, img_h, img_w, n_imgs): """Slice a montage image into n_img h x w images. Performs the opposite of the montage function. Takes a montage image and slices it back into a N x H x W x C image. Parameters ---------- montage : np.ndarray Montage image to slice. img_h : int Height of sliced image img_w : int Width of sliced image n_imgs : int Number of images to slice Returns ------- sliced : np.ndarray Sliced images as 4d array. """ sliced_ds = [] for i in range(int(np.sqrt(n_imgs))): for j in range(int(np.sqrt(n_imgs))): sliced_ds.append(montage[ 1 + i + i * img_h:1 + i + (i + 1) * img_h, 1 + j + j * img_w:1 + j + (j + 1) * img_w]) return np.array(sliced_ds)
[docs]def montage(images, saveto='montage.png'): """Draw all images as a montage separated by 1 pixel borders. Also saves the file to the destination specified by `saveto`. Parameters ---------- images : numpy.ndarray Input array to create montage of. Array should be: batch x height x width x channels. saveto : str Location to save the resulting montage image. Returns ------- m : numpy.ndarray Montage image. """ if isinstance(images, list): images = np.array(images) img_h = images.shape[1] img_w = images.shape[2] n_plots = int(np.ceil(np.sqrt(images.shape[0]))) if len(images.shape) == 4 and images.shape[3] == 3: m = np.ones( (images.shape[1] * n_plots + n_plots + 1, images.shape[2] * n_plots + n_plots + 1, 3)) * 0.5 else: m = np.ones( (images.shape[1] * n_plots + n_plots + 1, images.shape[2] * n_plots + n_plots + 1)) * 0.5 for i in range(n_plots): for j in range(n_plots): this_filter = i * n_plots + j if this_filter < images.shape[0]: this_img = images[this_filter] m[1 + i + i * img_h:1 + i + (i + 1) * img_h, 1 + j + j * img_w:1 + j + (j + 1) * img_w] = this_img plt.imsave(arr=m, fname=saveto) return m
[docs]def montage_filters(W): """Draws all filters (n_input * n_output filters) as a montage image separated by 1 pixel borders. Parameters ---------- W : Tensor Input tensor to create montage of. Returns ------- m : numpy.ndarray Montage image. """ W = np.reshape(W, [W.shape[0], W.shape[1], 1, W.shape[2] * W.shape[3]]) n_plots = int(np.ceil(np.sqrt(W.shape[-1]))) m = np.ones( (W.shape[0] * n_plots + n_plots + 1, W.shape[1] * n_plots + n_plots + 1)) * 0.5 for i in range(n_plots): for j in range(n_plots): this_filter = i * n_plots + j if this_filter < W.shape[-1]: m[1 + i + i * W.shape[0]:1 + i + (i + 1) * W.shape[0], 1 + j + j * W.shape[1]:1 + j + (j + 1) * W.shape[1]] = ( np.squeeze(W[:, :, :, this_filter])) return m
[docs]def get_celeb_files(dst='img_align_celeba', max_images=100): """Download the first 100 images of the celeb dataset. Files will be placed in a directory 'img_align_celeba' if one doesn't exist. Returns ------- files : list of strings Locations to the first 100 images of the celeb net dataset. Parameters ---------- dst : str, optional Description max_images : int, optional Description """ # Create a directory if not os.path.exists(dst): os.mkdir(dst) # Now perform the following 100 times: for img_i in range(1, max_images + 1): # create a string using the current loop counter f = '000%03d.jpg' % img_i if not os.path.exists(os.path.join(dst, f)): # and get the url with that string appended the end url = 'https://s3.amazonaws.com/cadl/celeb-align/' + f # We'll print this out to the console so we can see how far we've gone print(url, end='\r') # And now download the url to a location inside our new directory urllib.request.urlretrieve(url, os.path.join(dst, f)) files = [os.path.join(dst, file_i) for file_i in os.listdir(dst) if '.jpg' in file_i][:max_images] return files
[docs]def get_celeb_imgs(max_images=100): """Load the first `max_images` images of the celeb dataset. Returns ------- imgs : list of np.ndarray List of the first 100 images from the celeb dataset Parameters ---------- max_images : int, optional Description """ return [plt.imread(f_i) for f_i in get_celeb_files(max_images=max_images)]
[docs]def gauss(mean, stddev, ksize): """Use Tensorflow to compute a Gaussian Kernel. Parameters ---------- mean : float Mean of the Gaussian (e.g. 0.0). stddev : float Standard Deviation of the Gaussian (e.g. 1.0). ksize : int Size of kernel (e.g. 16). Returns ------- kernel : np.ndarray Computed Gaussian Kernel using Tensorflow. """ g = tf.Graph() with tf.Session(graph=g): x = tf.linspace(-3.0, 3.0, ksize) z = (tf.exp(tf.neg(tf.pow(x - mean, 2.0) / (2.0 * tf.pow(stddev, 2.0)))) * (1.0 / (stddev * tf.sqrt(2.0 * 3.1415)))) return z.eval()
[docs]def gauss2d(mean, stddev, ksize): """Use Tensorflow to compute a 2D Gaussian Kernel. Parameters ---------- mean : float Mean of the Gaussian (e.g. 0.0). stddev : float Standard Deviation of the Gaussian (e.g. 1.0). ksize : int Size of kernel (e.g. 16). Returns ------- kernel : np.ndarray Computed 2D Gaussian Kernel using Tensorflow. """ z = gauss(mean, stddev, ksize) g = tf.Graph() with tf.Session(graph=g): z_2d = tf.matmul(tf.reshape(z, [ksize, 1]), tf.reshape(z, [1, ksize])) return z_2d.eval()
[docs]def convolve(img, kernel): """Use Tensorflow to convolve a 4D image with a 4D kernel. Parameters ---------- img : np.ndarray 4-dimensional image shaped N x H x W x C kernel : np.ndarray 4-dimensional image shape K_H, K_W, C_I, C_O corresponding to the kernel's height and width, the number of input channels, and the number of output channels. Note that C_I should = C. Returns ------- result : np.ndarray Convolved result. """ g = tf.Graph() with tf.Session(graph=g): convolved = tf.nn.conv2d(img, kernel, strides=[1, 1, 1, 1], padding='SAME') res = convolved.eval() return res
[docs]def gabor(ksize=32): """Use Tensorflow to compute a 2D Gabor Kernel. Parameters ---------- ksize : int, optional Size of kernel. Returns ------- gabor : np.ndarray Gabor kernel with ksize x ksize dimensions. """ g = tf.Graph() with tf.Session(graph=g): z_2d = gauss2d(0.0, 1.0, ksize) ones = tf.ones((1, ksize)) ys = tf.sin(tf.linspace(-3.0, 3.0, ksize)) ys = tf.reshape(ys, [ksize, 1]) wave = tf.matmul(ys, ones) gabor = tf.mul(wave, z_2d) return gabor.eval()
[docs]def build_submission(filename, file_list, optional_file_list=()): """Helper utility to check homework assignment submissions and package them. Parameters ---------- filename : str Output zip file name file_list : tuple Tuple of files to include optional_file_list : tuple, optional Description """ # check each file exists for part_i, file_i in enumerate(file_list): if not os.path.exists(file_i): print('\nYou are missing the file {}. '.format(file_i) + 'It does not look like you have completed Part {}.'.format( part_i + 1)) def zipdir(path, zf): """Summary Parameters ---------- path : TYPE Description zf : TYPE Description """ for root, dirs, files in os.walk(path): for file in files: # make sure the files are part of the necessary file list if file.endswith(file_list) or file.endswith(optional_file_list): zf.write(os.path.join(root, file)) # create a zip file with the necessary files zipf = zipfile.ZipFile(filename, 'w', zipfile.ZIP_DEFLATED) zipdir('.', zipf) zipf.close() print('Your assignment zip file has been created!') print('Now submit the file:\n{}\nto Kadenze for grading!'.format( os.path.abspath(filename)))
[docs]def normalize(a, s=0.1): '''Normalize the image range for visualization Parameters ---------- a : TYPE Description s : float, optional Description Returns ------- TYPE Description ''' return np.uint8(np.clip( (a - a.mean()) / max(a.std(), 1e-4) * s + 0.5, 0, 1) * 255)
# %%
[docs]def weight_variable(shape, **kwargs): '''Helper function to create a weight variable initialized with a normal distribution Parameters ---------- shape : list Size of weight variable **kwargs Description Returns ------- TYPE Description ''' if isinstance(shape, list): initial = tf.random_normal(tf.stack(shape), mean=0.0, stddev=0.01) initial.set_shape(shape) else: initial = tf.random_normal(shape, mean=0.0, stddev=0.01) return tf.Variable(initial, **kwargs)
# %%
[docs]def bias_variable(shape, **kwargs): '''Helper function to create a bias variable initialized with a constant value. Parameters ---------- shape : list Size of weight variable **kwargs Description Returns ------- TYPE Description ''' if isinstance(shape, list): initial = tf.random_normal(tf.stack(shape), mean=0.0, stddev=0.01) initial.set_shape(shape) else: initial = tf.random_normal(shape, mean=0.0, stddev=0.01) return tf.Variable(initial, **kwargs)
[docs]def binary_cross_entropy(z, x, name=None): """Binary Cross Entropy measures cross entropy of a binary variable. loss(x, z) = - sum_i (x[i] * log(z[i]) + (1 - x[i]) * log(1 - z[i])) Parameters ---------- z : tf.Tensor A `Tensor` of the same type and shape as `x`. x : tf.Tensor A `Tensor` of type `float32` or `float64`. name : None, optional Description Returns ------- TYPE Description """ with tf.variable_scope(name or 'bce'): eps = 1e-12 return (-(x * tf.log(z + eps) + (1. - x) * tf.log(1. - z + eps)))
[docs]def conv2d(x, n_output, k_h=5, k_w=5, d_h=2, d_w=2, padding='SAME', name='conv2d', reuse=None): """Helper for creating a 2d convolution operation. Parameters ---------- x : tf.Tensor Input tensor to convolve. n_output : int Number of filters. k_h : int, optional Kernel height k_w : int, optional Kernel width d_h : int, optional Height stride d_w : int, optional Width stride padding : str, optional Padding type: "SAME" or "VALID" name : str, optional Variable scope reuse : None, optional Description Returns ------- op : tf.Tensor Output of convolution """ with tf.variable_scope(name or 'conv2d', reuse=reuse): W = tf.get_variable( name='W', shape=[k_h, k_w, x.get_shape()[-1], n_output], initializer=tf.contrib.layers.xavier_initializer_conv2d()) conv = tf.nn.conv2d( name='conv', input=x, filter=W, strides=[1, d_h, d_w, 1], padding=padding) b = tf.get_variable( name='b', shape=[n_output], initializer=tf.constant_initializer(0.0)) h = tf.nn.bias_add( name='h', value=conv, bias=b) return h, W
[docs]def deconv2d(x, n_output_h, n_output_w, n_output_ch, n_input_ch=None, k_h=5, k_w=5, d_h=2, d_w=2, padding='SAME', name='deconv2d', reuse=None): """Deconvolution helper. Parameters ---------- x : tf.Tensor Input tensor to convolve. n_output_h : int Height of output n_output_w : int Width of output n_output_ch : int Number of filters. n_input_ch : None, optional Description k_h : int, optional Kernel height k_w : int, optional Kernel width d_h : int, optional Height stride d_w : int, optional Width stride padding : str, optional Padding type: "SAME" or "VALID" name : str, optional Variable scope reuse : None, optional Description Returns ------- op : tf.Tensor Output of deconvolution """ with tf.variable_scope(name or 'deconv2d', reuse=reuse): W = tf.get_variable( name='W', shape=[k_h, k_h, n_output_ch, n_input_ch or x.get_shape()[-1]], initializer=tf.contrib.layers.xavier_initializer_conv2d()) conv = tf.nn.conv2d_transpose( name='conv_t', value=x, filter=W, output_shape=tf.stack( [tf.shape(x)[0], n_output_h, n_output_w, n_output_ch]), strides=[1, d_h, d_w, 1], padding=padding) conv.set_shape([None, n_output_h, n_output_w, n_output_ch]) b = tf.get_variable( name='b', shape=[n_output_ch], initializer=tf.constant_initializer(0.0)) h = tf.nn.bias_add(name='h', value=conv, bias=b) return h, W
[docs]def lrelu(features, leak=0.2): """Leaky rectifier. Parameters ---------- features : tf.Tensor Input to apply leaky rectifier to. leak : float, optional Percentage of leak. Returns ------- op : tf.Tensor Resulting output of applying leaky rectifier activation. """ f1 = 0.5 * (1 + leak) f2 = 0.5 * (1 - leak) return f1 * features + f2 * abs(features)
[docs]def linear(x, n_output, name=None, activation=None, reuse=None): """Fully connected layer. Parameters ---------- x : tf.Tensor Input tensor to connect n_output : int Number of output neurons name : None, optional Scope to apply activation : None, optional Description reuse : None, optional Description Returns ------- h, W : tf.Tensor, tf.Tensor Output of fully connected layer and the weight matrix """ if len(x.get_shape()) != 2: x = flatten(x, reuse=reuse) n_input = x.get_shape().as_list()[1] with tf.variable_scope(name or "fc", reuse=reuse): W = tf.get_variable( name='W', shape=[n_input, n_output], dtype=tf.float32, initializer=tf.contrib.layers.xavier_initializer()) b = tf.get_variable( name='b', shape=[n_output], dtype=tf.float32, initializer=tf.constant_initializer(0.0)) h = tf.nn.bias_add( name='h', value=tf.matmul(x, W), bias=b) if activation: h = activation(h) return h, W
[docs]def flatten(x, name=None, reuse=None): """Flatten Tensor to 2-dimensions. Parameters ---------- x : tf.Tensor Input tensor to flatten. name : None, optional Variable scope for flatten operations reuse : None, optional Description Returns ------- flattened : tf.Tensor Flattened tensor. Raises ------ ValueError Description """ with tf.variable_scope('flatten'): dims = x.get_shape().as_list() if len(dims) == 4: flattened = tf.reshape( x, shape=[-1, dims[1] * dims[2] * dims[3]]) elif len(dims) == 2 or len(dims) == 1: flattened = x else: raise ValueError('Expected n dimensions of 1, 2 or 4. Found:', len(dims)) return flattened
[docs]def to_tensor(x): """Convert 2 dim Tensor to a 4 dim Tensor ready for convolution. Performs the opposite of flatten(x). If the tensor is already 4-D, this returns the same as the input, leaving it unchanged. Parameters ---------- x : tf.Tesnor Input 2-D tensor. If 4-D already, left unchanged. Returns ------- x : tf.Tensor 4-D representation of the input. Raises ------ ValueError If the tensor is not 2D or already 4D. """ if len(x.get_shape()) == 2: n_input = x.get_shape().as_list()[1] x_dim = np.sqrt(n_input) if x_dim == int(x_dim): x_dim = int(x_dim) x_tensor = tf.reshape( x, [-1, x_dim, x_dim, 1], name='reshape') elif np.sqrt(n_input / 3) == int(np.sqrt(n_input / 3)): x_dim = int(np.sqrt(n_input / 3)) x_tensor = tf.reshape( x, [-1, x_dim, x_dim, 3], name='reshape') else: x_tensor = tf.reshape( x, [-1, 1, 1, n_input], name='reshape') elif len(x.get_shape()) == 4: x_tensor = x else: raise ValueError('Unsupported input dimensions') return x_tensor
[docs]def sample_categorical(pmf): """Sample from a categorical distribution. Parameters ---------- pmf Probablity mass function. Output of a softmax over categories. Array of shape [batch_size, number of categories]. Rows sum to 1. Returns ------- idxs Array of size [batch_size, 1]. Integer of category sampled. """ if pmf.ndim == 1: pmf = np.expand_dims(pmf, 0) batch_size = pmf.shape[0] cdf = np.cumsum(pmf, axis=1) rand_vals = np.random.rand(batch_size) idxs = np.zeros([batch_size,]) for i in range(batch_size): idxs[i] = cdf[i].searchsorted(rand_vals[i]) return idxs