Source code for cadl.wavenet_utils

"""Various utilities for training WaveNet.
"""
"""
WaveNet Training code and utilities are licensed under APL from the

Google Magenta project
----------------------
https://github.com/tensorflow/magenta/blob/master/magenta/models/nsynth/wavenet

Copyright 2017 Parag K. Mital.  See also NOTICE.md.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import tensorflow as tf
import numpy as np


def shift_right(X):
    """Shift the input over by one time step, padding a zero onto the front.

    Parameters
    ----------
    X
        The [mb, time, channels] tensor input.

    Returns
    -------
    x_sliced
        The [mb, time, channels] tensor output.
    """
    shape = X.get_shape().as_list()
    x_padded = tf.pad(X, [[0, 0], [1, 0], [0, 0]])
    x_sliced = tf.slice(x_padded, [0, 0, 0], tf.stack([-1, shape[1], -1]))
    x_sliced.set_shape(shape)
    return x_sliced
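

# Illustrative sketch (not part of the original module): how shift_right makes
# a model causal -- the last time step is dropped and a zero is prepended, so
# the output at time t only sees inputs up to t - 1. Values in the comments
# assume graph-mode TF1 evaluation.
def _example_shift_right():
    X = tf.constant([[[1.], [2.], [3.], [4.]]])  # [mb, time, channels] = [1, 4, 1]
    y = shift_right(X)  # values become [[[0.], [1.], [2.], [3.]]]
    return y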


def mul_or_none(a, b):
    """Return the element-wise product of the inputs.

    If either input is None, we return None.

    Parameters
    ----------
    a
        A tensor input.
    b
        Another tensor input with the same type as a.

    Returns
    -------
    None if either input is None. Otherwise returns a * b.
    """
    if a is None or b is None:
        return None
    return a * b


def time_to_batch(X, block_size):
    """Splits time dimension (i.e. dimension 1) of `X` into batches.

    Within each batch element, the `k*block_size` time steps are transposed,
    so that the `k` time steps in each output batch element are offset by
    `block_size` from each other.

    The number of input time steps must be a multiple of `block_size`.

    Parameters
    ----------
    X
        Tensor of shape [nb, k*block_size, n] for some natural number k.
    block_size
        Number of time steps (i.e. size of dimension 1) in the output tensor.

    Returns
    -------
    Tensor of shape [nb*block_size, k, n]
    """
    shape = X.get_shape().as_list()
    y = tf.reshape(X, [shape[0], shape[1] // block_size, block_size, shape[2]])
    y = tf.transpose(y, [0, 2, 1, 3])
    y = tf.reshape(y, [shape[0] * block_size, shape[1] // block_size, shape[2]])
    y.set_shape([
        mul_or_none(shape[0], block_size),
        mul_or_none(shape[1], 1. / block_size),
        shape[2]
    ])
    return y


def batch_to_time(X, block_size):
    """Inverse of `time_to_batch(X, block_size)`.

    Parameters
    ----------
    X
        Tensor of shape [nb*block_size, k, n] for some natural number k.
    block_size
        Number of time steps (i.e. size of dimension 1) in the output tensor.

    Returns
    -------
    Tensor of shape [nb, k*block_size, n].
    """
    shape = X.get_shape().as_list()
    y = tf.reshape(X, [shape[0] // block_size, block_size, shape[1], shape[2]])
    y = tf.transpose(y, [0, 2, 1, 3])
    y = tf.reshape(y, [shape[0] // block_size, shape[1] * block_size, shape[2]])
    y.set_shape([
        mul_or_none(shape[0], 1. / block_size),
        mul_or_none(shape[1], block_size),
        shape[2]
    ])
    return y
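

# Illustrative sketch (not part of the original module): time_to_batch and
# batch_to_time are inverses, which is how conv1d below applies dilation with
# an ordinary convolution. Shapes here follow the docstrings above.
def _example_time_batch_roundtrip():
    X = tf.zeros([2, 8, 3])                  # [nb, k*block_size, n] with k=4, block_size=2
    ttb = time_to_batch(X, block_size=2)     # -> [nb*block_size, k, n] == [4, 4, 3]
    back = batch_to_time(ttb, block_size=2)  # -> [2, 8, 3], matching X
    return back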


def conv1d(X,
           num_filters,
           filter_length,
           name,
           dilation=1,
           causal=True,
           kernel_initializer=tf.uniform_unit_scaling_initializer(1.0),
           biases_initializer=tf.constant_initializer(0.0)):
    """Fast 1D convolution that supports causal padding and dilation.

    Parameters
    ----------
    X
        The [mb, time, channels] float tensor that we convolve.
    num_filters
        The number of filter maps in the convolution.
    filter_length
        The integer length of the filter.
    name
        The name of the scope for the variables.
    dilation
        The amount of dilation.
    causal
        Whether or not this is a causal convolution.
    kernel_initializer
        The kernel initialization function.
    biases_initializer
        The biases initialization function.

    Returns
    -------
    y
        The output of the 1D convolution.
    """
    batch_size, length, num_input_channels = X.get_shape().as_list()
    assert length % dilation == 0

    kernel_shape = [1, filter_length, num_input_channels, num_filters]
    strides = [1, 1, 1, 1]
    biases_shape = [num_filters]
    padding = 'VALID' if causal else 'SAME'

    with tf.variable_scope(name):
        weights = tf.get_variable(
            'W', shape=kernel_shape, initializer=kernel_initializer)
        biases = tf.get_variable(
            'biases', shape=biases_shape, initializer=biases_initializer)

    x_ttb = time_to_batch(X, dilation)
    if filter_length > 1 and causal:
        x_ttb = tf.pad(x_ttb, [[0, 0], [filter_length - 1, 0], [0, 0]])

    x_ttb_shape = x_ttb.get_shape().as_list()
    x_4d = tf.reshape(
        x_ttb, [x_ttb_shape[0], 1, x_ttb_shape[1], num_input_channels])
    y = tf.nn.conv2d(x_4d, weights, strides, padding=padding)
    y = tf.nn.bias_add(y, biases)
    y_shape = y.get_shape().as_list()
    y = tf.reshape(y, [y_shape[0], y_shape[2], num_filters])
    y = batch_to_time(y, dilation)
    y.set_shape([batch_size, length, num_filters])
    return y
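

# Illustrative sketch (not part of the original module): a causal, dilated
# conv1d over a [mb, time, channels] tensor. The scope name and shapes here
# are made up for the example; `time` must be divisible by `dilation`.
def _example_conv1d():
    X = tf.zeros([1, 16, 8])  # [mb, time, channels]
    y = conv1d(X, num_filters=32, filter_length=3, name='dilated_conv',
               dilation=4, causal=True)
    # y has shape [1, 16, 32]; causal padding keeps output step t independent
    # of inputs after step t.
    return y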


def pool1d(X, window_length, name, mode='avg', stride=None):
    """1D pooling function that supports multiple different modes.

    Parameters
    ----------
    X
        The [mb, time, channels] float tensor that we are going to pool over.
    window_length
        The amount of samples we pool over.
    name
        The name of the scope for the variables.
    mode
        The type of pooling, either avg or max.
    stride
        The stride length.

    Returns
    -------
    pooled
        The [mb, time // stride, channels] float tensor result of pooling.
    """
    if mode == 'avg':
        pool_fn = tf.nn.avg_pool
    elif mode == 'max':
        pool_fn = tf.nn.max_pool

    stride = stride or window_length
    batch_size, length, num_channels = X.get_shape().as_list()
    assert length % window_length == 0
    assert length % stride == 0

    window_shape = [1, 1, window_length, 1]
    strides = [1, 1, stride, 1]
    x_4d = tf.reshape(X, [batch_size, 1, length, num_channels])
    pooled = pool_fn(x_4d, window_shape, strides, padding='SAME', name=name)
    return tf.reshape(pooled, [batch_size, length // stride, num_channels])
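

# Illustrative sketch (not part of the original module): non-overlapping
# average pooling with pool1d. When `stride` is omitted it defaults to
# `window_length`, so the time dimension shrinks by that factor.
def _example_pool1d():
    X = tf.zeros([1, 16, 8])  # [mb, time, channels]
    pooled = pool1d(X, window_length=4, name='avg_pool', mode='avg')
    # pooled has shape [1, 4, 8]
    return pooled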


def mu_law(X, mu=255, int8=False):
    """A TF implementation of Mu-Law encoding.

    Parameters
    ----------
    X
        The audio samples to encode.
    mu
        The Mu to use in our Mu-Law.
    int8
        Use int8 encoding.

    Returns
    -------
    out
        The Mu-Law encoded data.
    """
    out = tf.sign(X) * tf.log(1 + mu * tf.abs(X)) / np.log(1 + mu)
    out = tf.floor(out * 128)
    if int8:
        out = tf.cast(out, tf.int8)
    return out


def mu_law_numpy(X, mu=255, int8=False):
    """A numpy implementation of Mu-Law encoding.

    Parameters
    ----------
    X
        The audio samples to encode.
    mu
        The Mu to use in our Mu-Law.
    int8
        Use int8 encoding.

    Returns
    -------
    out
        The Mu-Law encoded data.
    """
    out = np.sign(X) * np.log(1 + mu * np.abs(X)) / np.log(1 + mu)
    out = np.floor(out * 128)
    if int8:
        return out.astype(np.int8)
    return out


def inv_mu_law(X, mu=255):
    """A TF implementation of inverse Mu-Law.

    Parameters
    ----------
    X
        The Mu-Law samples to decode.
    mu
        The Mu we used to encode these samples.

    Returns
    -------
    out
        The decoded data.
    """
    X = tf.cast(X, tf.float32)
    out = (X + 0.5) * 2. / (mu + 1)
    out = tf.sign(out) / mu * ((1 + mu)**tf.abs(out) - 1)
    out = tf.where(tf.equal(X, 0), X, out)
    return out


def inv_mu_law_numpy(X, mu=255.0):
    """A numpy implementation of inverse Mu-Law.

    Parameters
    ----------
    X
        The Mu-Law samples to decode.
    mu
        The Mu we used to encode these samples.

    Returns
    -------
    out
        The decoded data.
    """
    X = np.array(X).astype(np.float32)
    out = (X + 0.5) * 2. / (mu + 1)
    out = np.sign(out) / mu * ((1 + mu)**np.abs(out) - 1)
    out = np.where(np.equal(X, 0), X, out)
    return out
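

# Illustrative sketch (not part of the original module): a numpy round trip
# through mu-law encoding and decoding for audio samples in [-1, 1]. The
# decoded signal is quantized to 256 levels, so it only approximates the input.
def _example_mu_law_roundtrip():
    x = np.array([-0.5, -0.1, 0.0, 0.1, 0.5], dtype=np.float32)
    encoded = mu_law_numpy(x, mu=255, int8=True)   # int8 codes
    decoded = inv_mu_law_numpy(encoded, mu=255.0)  # approximate reconstruction
    return encoded, decoded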


def causal_linear(X,
                  n_inputs,
                  n_outputs,
                  name,
                  filter_length,
                  rate,
                  batch_size,
                  depth=1):
    """Applies dilated convolution using queues.

    Assumes a filter_length of 2 or 3.

    Parameters
    ----------
    X
        The [mb, time, channels] tensor input.
    n_inputs
        The input number of channels.
    n_outputs
        The output number of channels.
    name
        The variable scope to provide to W and biases.
    filter_length
        The length of the convolution, assumed to be 2 or 3.
    rate
        The rate or dilation.
    batch_size
        Non-symbolic value for batch_size.
    depth : int, optional
        Size of dimension 1 of the per-step input (typically 1).

    Returns
    -------
    y
        The output of the operation.
    (init_1, init_2)
        Initialization operations for the queues.
    (push_1, push_2)
        Push operations for the queues.
    """
    assert filter_length == 2 or filter_length == 3

    # TODO: Make generic... started something like this:
    #
    # # create queue
    # qs = []
    # inits = []
    # states = []
    # pushs = []
    # zeros = tf.zeros((rate, batch_size, depth, n_inputs))
    # for f_i in range(1, filter_length):
    #     q = tf.FIFOQueue(
    #         rate,
    #         dtypes=tf.float32,
    #         shapes=(batch_size, depth, n_inputs))
    #     qs.append(q)
    #     inits.append(q.enqueue_many(zeros))
    #     states.append(q.dequeue())
    #
    # pushs.append(qs[0].enqueue(X))
    # for f_i in range(2, filter_length):
    #     pushs.append(qs[f_i].enqueue(states[f_i - 1]))

    if filter_length == 3:
        # create queue
        q_1 = tf.FIFOQueue(
            rate, dtypes=tf.float32, shapes=(batch_size, depth, n_inputs))
        q_2 = tf.FIFOQueue(
            rate, dtypes=tf.float32, shapes=(batch_size, depth, n_inputs))
        init_1 = q_1.enqueue_many(tf.zeros((rate, batch_size, depth, n_inputs)))
        init_2 = q_2.enqueue_many(tf.zeros((rate, batch_size, depth, n_inputs)))
        state_1 = q_1.dequeue()
        push_1 = q_1.enqueue(X)
        state_2 = q_2.dequeue()
        push_2 = q_2.enqueue(state_1)

        # get pretrained weights
        w = tf.get_variable(
            name=name + '/W',
            shape=[1, filter_length, n_inputs, n_outputs],
            dtype=tf.float32)
        b = tf.get_variable(
            name=name + '/biases', shape=[n_outputs], dtype=tf.float32)
        w_q_2 = tf.slice(w, [0, 0, 0, 0], [-1, 1, -1, -1])
        w_q_1 = tf.slice(w, [0, 1, 0, 0], [-1, 1, -1, -1])
        w_x = tf.slice(w, [0, 2, 0, 0], [-1, 1, -1, -1])

        # perform op w/ cached states
        y = tf.nn.bias_add(
            tf.matmul(state_2[:, 0, :], w_q_2[0][0]) +
            tf.matmul(state_1[:, 0, :], w_q_1[0][0]) +
            tf.matmul(X[:, 0, :], w_x[0][0]), b)
        y = tf.expand_dims(y, 1)
        return y, [init_1, init_2], [push_1, push_2]
    else:
        # create queue
        q = tf.FIFOQueue(
            rate, dtypes=tf.float32, shapes=(batch_size, depth, n_inputs))
        init = q.enqueue_many(tf.zeros((rate, batch_size, depth, n_inputs)))
        state = q.dequeue()
        push = q.enqueue(X)

        # get pretrained weights
        W = tf.get_variable(
            name=name + '/W',
            shape=[1, filter_length, n_inputs, n_outputs],
            dtype=tf.float32)
        b = tf.get_variable(
            name=name + '/biases', shape=[n_outputs], dtype=tf.float32)
        W_q = tf.slice(W, [0, 0, 0, 0], [-1, 1, -1, -1])
        W_x = tf.slice(W, [0, 1, 0, 0], [-1, 1, -1, -1])

        # perform op w/ cached states
        y = tf.nn.bias_add(
            tf.matmul(state[:, 0, :], W_q[0][0]) +
            tf.matmul(X[:, 0, :], W_x[0][0]), b)
        return tf.expand_dims(y, 1), [init], [push]


def linear(X, n_inputs, n_outputs, name):
    """Applies a 1x1 linear (fully-connected) layer over the channel dimension.

    Parameters
    ----------
    X : tf.Tensor
        The [mb, 1, n_inputs] tensor input.
    n_inputs : int
        The input number of channels.
    n_outputs : int
        The output number of channels.
    name : str
        The variable scope to provide to W and biases.

    Returns
    -------
    y : tf.Tensor
        The [mb, 1, n_outputs] tensor output.
    """
    W = tf.get_variable(
        name=name + '/W', shape=[1, 1, n_inputs, n_outputs], dtype=tf.float32)
    b = tf.get_variable(
        name=name + '/biases', shape=[n_outputs], dtype=tf.float32)
    y = tf.nn.bias_add(tf.matmul(X[:, 0, :], W[0][0]), b)
    return tf.expand_dims(y, 1)