Source code for cadl.wavenet

"""WaveNet Autoencoder and conditional WaveNet.
"""
"""
WaveNet Training code and utilities are licensed under APL from the

Google Magenta project
----------------------
https://github.com/tensorflow/magenta/blob/master/magenta/models/nsynth/wavenet

Copyright 2017 Parag K. Mital.  See also NOTICE.md.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import numpy as np
import tensorflow as tf
from cadl import librispeech, vctk
from cadl import wavenet_utils as wnu
from cadl.utils import sample_categorical
from scipy.io import wavfile


def get_sequence_length(n_stages, n_layers_per_stage):
    """Return the number of samples per input sequence for a WaveNet stack.

    The value is ``2**n_layers_per_stage * 2 * n_stages``, the same formula
    the network builders in this module use to size their input placeholder.

    Parameters
    ----------
    n_stages : int
        Number of dilation stages in the network.
    n_layers_per_stage : int
        Number of dilated layers inside each stage.

    Returns
    -------
    int
        Sequence length, in samples.
    """
    samples_per_stage = 2 * 2**n_layers_per_stage
    return samples_per_stage * n_stages
def condition(x, encoding):
    """Add a shorter conditioning sequence onto a feature map.

    ``x`` is viewed as ``enc_length`` groups of consecutive time steps and
    each group has the corresponding frame of ``encoding`` added to it (the
    encoding is broadcast over the steps within a group).

    Parameters
    ----------
    x : tf.Tensor
        Rank-3 tensor ``[batch, length, channels]``; its static shape is read
        and used directly in the reshapes.
    encoding : tf.Tensor
        Rank-3 tensor ``[batch, enc_length, channels]`` whose batch and
        channel sizes must equal those of ``x``.

    Returns
    -------
    tf.Tensor
        Tensor with the same ``[batch, length, channels]`` shape as ``x``.

    Raises
    ------
    AssertionError
        If the batch or channel dimensions of the two inputs differ.
    """
    x_batch, x_length, x_channels = x.get_shape().as_list()
    enc_batch, enc_length, enc_channels = encoding.get_shape().as_list()
    assert enc_batch == x_batch
    assert enc_channels == x_channels

    out_shape = [x_batch, x_length, x_channels]
    # A singleton axis on the encoding lets it broadcast over every time step
    # that falls inside the same encoding frame.
    enc_frames = tf.reshape(encoding, [x_batch, enc_length, 1, x_channels])
    x_frames = tf.reshape(x, [x_batch, enc_length, -1, x_channels])
    conditioned = tf.reshape(x_frames + enc_frames, out_shape)
    conditioned.set_shape(out_shape)
    return conditioned
def create_wavenet_autoencoder(n_stages, n_layers_per_stage, n_hidden,
                               batch_size, n_skip, filter_length,
                               bottleneck_width, hop_length, n_quantization,
                               sample_rate):
    """Build the graph of a WaveNet autoencoder.

    A non-causal dilated-convolution encoder compresses the mu-law encoded
    input into a pooled bottleneck encoding.  A WaveNet decoder, fed the input
    shifted right and conditioned on that encoding at every layer, predicts a
    categorical distribution over the quantized value of each sample.

    Parameters
    ----------
    n_stages : int
        Number of dilation stages; the dilation restarts at 1 in each stage.
    n_layers_per_stage : int
        Number of layers per stage; layer ``k`` of a stage uses dilation
        ``2**k``.
    n_hidden : int
        Number of filters of the encoder convolutions and of the decoder
        residual stream (the gated decoder convolutions use ``2 * n_hidden``).
    batch_size : int
        Static batch size of the input placeholder.
    n_skip : int
        Number of filters of the decoder skip connections.
    filter_length : int
        Filter width of the start and dilated convolutions.
    bottleneck_width : int
        Number of channels of the encoding.
    hop_length : int
        Passed to ``wnu.pool1d`` for the average pooling of the encoding
        (presumably the pooling window/stride -- confirm in wavenet_utils).
    n_quantization : int
        Number of mu-law quantization levels, i.e. output classes.
    sample_rate : int
        Sample rate attached to the audio summary.

    Returns
    -------
    dict
        ``'X'`` (input placeholder, ``[batch_size, sequence_length]``),
        ``'quantized'`` (mu-law encoded input), ``'encoding'`` (pooled
        bottleneck), ``'probs'`` (softmax, ``[-1, n_quantization]``),
        ``'synthesis'`` (arg-max decoded audio, ``[-1, sequence_length]``),
        ``'summaries'`` (result of ``tf.summary.merge_all()``) and ``'loss'``
        (mean cross entropy).
    """
    offset = n_quantization / 2.0
    sequence_length = get_sequence_length(n_stages, n_layers_per_stage)
    n_layers = n_stages * n_layers_per_stage

    # Encode the source with Mu-Law (8-bit when n_quantization is 256).
    X = tf.placeholder(
        name='X', shape=[batch_size, sequence_length], dtype=tf.float32)
    X_quantized = wnu.mu_law(X, n_quantization)
    X_scaled = tf.cast(X_quantized / offset, tf.float32)
    X_scaled = tf.expand_dims(X_scaled, 2)

    # The Non-Causal Temporal Encoder.
    en = wnu.conv1d(
        X=X_scaled,
        causal=False,
        num_filters=n_hidden,
        filter_length=filter_length,
        name='ae_startconv')

    # Encoder residual blocks.
    for i in range(n_layers):
        dilation = 2**(i % n_layers_per_stage)
        d = tf.nn.relu(en)
        d = wnu.conv1d(
            d,
            causal=False,
            num_filters=n_hidden,
            filter_length=filter_length,
            dilation=dilation,
            name='ae_dilatedconv_%d' % (i + 1))
        d = tf.nn.relu(d)
        en += wnu.conv1d(
            d,
            num_filters=n_hidden,
            filter_length=1,
            name='ae_res_%d' % (i + 1))

    en = wnu.conv1d(
        en,
        num_filters=bottleneck_width,
        filter_length=1,
        name='ae_bottleneck')
    en = wnu.pool1d(en, hop_length, name='ae_pool', mode='avg')
    encoding = en

    # The WaveNet Decoder.
    l = wnu.shift_right(X_scaled)
    l = wnu.conv1d(
        l, num_filters=n_hidden, filter_length=filter_length, name='startconv')

    # Set up skip connections.
    s = wnu.conv1d(l, num_filters=n_skip, filter_length=1, name='skip_start')

    # Decoder residual blocks with skip connections.
    for i in range(n_layers):
        dilation = 2**(i % n_layers_per_stage)
        d = wnu.conv1d(
            l,
            num_filters=2 * n_hidden,
            filter_length=filter_length,
            dilation=dilation,
            name='dilatedconv_%d' % (i + 1))
        # Condition the pre-activation on the (channel-mapped) encoding.
        d = condition(d,
                      wnu.conv1d(
                          en,
                          num_filters=2 * n_hidden,
                          filter_length=1,
                          name='cond_map_%d' % (i + 1)))

        # Gated activation: one half of the channels gates the other half.
        assert d.get_shape().as_list()[2] % 2 == 0
        m = d.get_shape().as_list()[2] // 2
        d_sigmoid = tf.sigmoid(d[:, :, :m])
        d_tanh = tf.tanh(d[:, :, m:])
        d = d_sigmoid * d_tanh

        l += wnu.conv1d(
            d, num_filters=n_hidden, filter_length=1, name='res_%d' % (i + 1))
        s += wnu.conv1d(
            d, num_filters=n_skip, filter_length=1, name='skip_%d' % (i + 1))

    s = tf.nn.relu(s)
    s = wnu.conv1d(s, num_filters=n_skip, filter_length=1, name='out1')
    s = condition(s,
                  wnu.conv1d(
                      en,
                      num_filters=n_skip,
                      filter_length=1,
                      name='cond_map_out1'))
    s = tf.nn.relu(s)

    # Compute the logits and get the loss.
    logits = wnu.conv1d(
        s, num_filters=n_quantization, filter_length=1, name='logits')
    logits = tf.reshape(logits, [-1, n_quantization])
    probs = tf.nn.softmax(logits, name='softmax')
    synthesis = tf.reshape(
        wnu.inv_mu_law(
            tf.cast(tf.argmax(probs, 1), tf.float32) - offset,
            n_quantization), [-1, sequence_length])
    # Shift the quantized values by `offset` to obtain class indices.
    labels = tf.cast(tf.reshape(X_quantized, [-1]), tf.int32) + int(offset)
    loss = tf.reduce_mean(
        tf.nn.sparse_softmax_cross_entropy_with_logits(
            logits=logits, labels=labels, name='nll'),
        0,
        name='loss')

    tf.summary.audio("synthesis", synthesis, sample_rate=sample_rate)
    tf.summary.histogram("probs", probs)
    tf.summary.histogram("input_quantized", X_quantized)
    tf.summary.histogram("logits", logits)
    tf.summary.histogram("labels", labels)
    tf.summary.histogram("synthesis", synthesis)
    tf.summary.scalar("loss", loss)
    summaries = tf.summary.merge_all()

    return {
        'X': X,
        'quantized': X_quantized,
        'encoding': encoding,
        'probs': probs,
        'synthesis': synthesis,
        'summaries': summaries,
        'loss': loss
    }
def create_wavenet(n_stages=10,
                   n_layers_per_stage=9,
                   n_hidden=200,
                   batch_size=32,
                   n_skip=100,
                   filter_length=2,
                   shift=True,
                   n_quantization=256,
                   sample_rate=16000):
    """Build the graph of an unconditional WaveNet.

    The input is mu-law encoded, optionally shifted right by one step, and
    passed through a stack of gated dilated convolutions with residual and
    skip connections.  The output is a categorical distribution over the
    quantized value of each sample.

    Parameters
    ----------
    n_stages : int, optional
        Number of dilation stages; the dilation restarts at 1 in each stage.
    n_layers_per_stage : int, optional
        Number of layers per stage; layer ``k`` of a stage uses dilation
        ``2**k``.
    n_hidden : int, optional
        Number of filters of the residual stream (the gated convolutions use
        ``2 * n_hidden``).
    batch_size : int, optional
        Static batch size of the input placeholder.
    n_skip : int, optional
        Number of filters of the skip connections.
    filter_length : int, optional
        Filter width of the start and dilated convolutions.
    shift : bool, optional
        Whether to apply ``wnu.shift_right`` to the network input.
    n_quantization : int, optional
        Number of mu-law quantization levels, i.e. output classes.
    sample_rate : int, optional
        Sample rate attached to the audio summary.

    Returns
    -------
    dict
        ``'X'`` (input placeholder, ``[batch_size, sequence_length]``),
        ``'quantized'`` (mu-law encoded input), ``'probs'`` (softmax,
        ``[-1, n_quantization]``), ``'synthesis'`` (arg-max decoded audio,
        ``[-1, sequence_length]``), ``'summaries'`` (result of
        ``tf.summary.merge_all()``) and ``'loss'`` (mean cross entropy).
    """
    offset = n_quantization / 2.0
    # Use the shared helper so the placeholder always matches the length the
    # training / synthesis functions batch with.
    sequence_length = get_sequence_length(n_stages, n_layers_per_stage)

    # Encode the source with Mu-Law (8-bit when n_quantization is 256).
    X = tf.placeholder(
        name='X', shape=[batch_size, sequence_length], dtype=tf.float32)
    X_quantized = wnu.mu_law(X, n_quantization)
    # The network consumes the quantized values as a single channel; no
    # one-hot encoding is applied.
    net_input = tf.expand_dims(X_quantized, 2)
    if shift:
        net_input = wnu.shift_right(net_input)

    h = wnu.conv1d(
        X=net_input,
        num_filters=n_hidden,
        filter_length=filter_length,
        name='startconv')

    # Set up skip connections.
    s = wnu.conv1d(X=h, num_filters=n_skip, filter_length=1, name='skip_start')

    # Residual blocks with skip connections.
    for i in range(n_stages * n_layers_per_stage):
        dilation = 2**(i % n_layers_per_stage)

        # dilated masked cnn
        d = wnu.conv1d(
            X=h,
            num_filters=2 * n_hidden,
            filter_length=filter_length,
            dilation=dilation,
            name='dilatedconv_%d' % (i + 1))

        # gated cnn: one half of the channels gates the other half
        assert d.get_shape().as_list()[2] % 2 == 0
        m = d.get_shape().as_list()[2] // 2
        d = tf.sigmoid(d[:, :, :m]) * tf.tanh(d[:, :, m:])

        # residuals
        h += wnu.conv1d(
            X=d, num_filters=n_hidden, filter_length=1, name='res_%d' % (i + 1))

        # skips
        s += wnu.conv1d(
            X=d, num_filters=n_skip, filter_length=1, name='skip_%d' % (i + 1))

    s = tf.nn.relu(s)
    s = wnu.conv1d(X=s, num_filters=n_skip, filter_length=1, name='out1')
    s = tf.nn.relu(s)

    # NOTE(review): the per-class logits are shifted by `offset` and clipped
    # to [0, n_quantization - 1], which looks like sample-value handling
    # applied to logits.  Kept as-is because existing checkpoints were
    # presumably trained with it -- confirm before changing.
    logits = tf.clip_by_value(
        wnu.conv1d(
            X=s,
            num_filters=n_quantization,
            filter_length=1,
            name='logits_preclip') + offset,
        0.0,
        n_quantization - 1.0,
        name='logits')
    logits = tf.reshape(logits, [-1, n_quantization])
    # Shift the quantized values by `offset` to obtain class indices.
    labels = tf.cast(tf.reshape(X_quantized + offset, [-1]), tf.int32)
    loss = tf.reduce_mean(
        tf.nn.sparse_softmax_cross_entropy_with_logits(
            logits=logits, labels=labels, name='nll'),
        0,
        name='loss')

    probs = tf.nn.softmax(logits, name='softmax')
    synthesis = tf.reshape(
        wnu.inv_mu_law(
            tf.cast(tf.argmax(probs, 1), tf.float32) - offset,
            n_quantization), [-1, sequence_length])

    tf.summary.audio("synthesis", synthesis, sample_rate=sample_rate)
    tf.summary.histogram("probs", probs)
    tf.summary.histogram("input_quantized", X_quantized)
    tf.summary.histogram("logits", logits)
    tf.summary.histogram("labels", labels)
    tf.summary.histogram("synthesis", synthesis)
    tf.summary.scalar("loss", loss)
    summaries = tf.summary.merge_all()

    return {
        'X': X,
        'quantized': X_quantized,
        'probs': probs,
        'synthesis': synthesis,
        'summaries': summaries,
        'loss': loss
    }
def train_vctk():
    """Train the unconditional WaveNet on the VCTK dataset.

    Builds the network in a fresh graph, restores the latest checkpoint from
    the run directory when one exists, then optimizes the loss with Adam.
    Every 100 iterations a summary is written and a checkpoint is saved.

    Returns
    -------
    float or None
        Loss of the last processed batch, or None when the batch generator
        produced no batches.
    """
    batch_size = 24
    filter_length = 2
    n_stages = 7
    n_layers_per_stage = 9
    n_hidden = 48
    n_skip = 384
    dataset = vctk.get_dataset()
    it_i = 0
    n_epochs = 1000
    sequence_length = get_sequence_length(n_stages, n_layers_per_stage)
    ckpt_path = 'vctk-wavenet/wavenet_filterlen{}_batchsize{}_sequencelen{}_stages{}_layers{}_hidden{}_skips{}'.format(
        filter_length, batch_size, sequence_length, n_stages,
        n_layers_per_stage, n_hidden, n_skip)
    # Defined up front so the final `return` is valid even with no batches.
    loss = None

    with tf.Graph().as_default(), tf.Session() as sess:
        net = create_wavenet(
            batch_size=batch_size,
            filter_length=filter_length,
            n_hidden=n_hidden,
            n_skip=n_skip,
            n_stages=n_stages,
            n_layers_per_stage=n_layers_per_stage)

        # The saver is created before the optimizer exists, so checkpoints
        # cover the model variables only.
        saver = tf.train.Saver()
        init_op = tf.group(tf.global_variables_initializer(),
                           tf.local_variables_initializer())
        sess.run(init_op)
        latest_ckpt = tf.train.latest_checkpoint(ckpt_path)
        if latest_ckpt is not None:
            saver.restore(sess, latest_ckpt)

        batch = vctk.batch_generator

        with tf.variable_scope('optimizer'):
            opt = tf.train.AdamOptimizer(
                learning_rate=0.0002).minimize(net['loss'])
        # Initialize only the optimizer's variables so restored model weights
        # are not overwritten.
        var_list = [
            v for v in tf.global_variables()
            if v.name.startswith('optimizer')
        ]
        sess.run(tf.variables_initializer(var_list))

        writer = tf.summary.FileWriter(ckpt_path)
        try:
            for epoch_i in range(n_epochs):
                for batch_xs in batch(dataset, batch_size, sequence_length):
                    loss, _ = sess.run(
                        [net['loss'], opt], feed_dict={net['X']: batch_xs})
                    print(loss)
                    if it_i % 100 == 0:
                        summary = sess.run(
                            net['summaries'], feed_dict={net['X']: batch_xs})
                        writer.add_summary(summary, it_i)
                        # save
                        saver.save(
                            sess,
                            os.path.join(ckpt_path, 'model.ckpt'),
                            global_step=it_i)
                    it_i += 1
        finally:
            # Flush pending events even if training is interrupted.
            writer.close()

    return loss
def test_librispeech():
    """Synthesize audio with a trained WaveNet primed on LibriSpeech.

    One batch of LibriSpeech audio primes the synthesis buffer.  The network
    is restored from the latest checkpoint in the run directory (a message is
    printed and synthesis continues with freshly initialized weights when
    none is found).  The buffer is then extended one sample at a time by
    sampling from the predicted distribution of the last step of a sliding
    window.  Each item of the batch is written to ``synthesis-<i>.wav`` in
    the current working directory.
    """
    batch_size = 24
    filter_length = 2
    n_stages = 7
    n_layers_per_stage = 9
    n_hidden = 48
    n_skip = 384
    n_quantization = 256
    sample_rate = 16000
    # Class indices are shifted by half the number of levels to get back the
    # signed quantized values expected by the inverse mu-law transform.
    offset = n_quantization // 2
    total_length = 16000
    sequence_length = get_sequence_length(n_stages, n_layers_per_stage)
    prime_length = sequence_length
    ckpt_path = 'wavenet/wavenet_filterlen{}_batchsize{}_sequencelen{}_stages{}_layers{}_hidden{}_skips{}/'.format(
        filter_length, batch_size, sequence_length, n_stages,
        n_layers_per_stage, n_hidden, n_skip)

    dataset = librispeech.get_dataset()
    batch = next(
        librispeech.batch_generator(dataset, batch_size, prime_length))[0]

    synth = np.zeros([batch_size, total_length], dtype=np.float32)
    synth[:, :prime_length] = batch

    # A dedicated graph and a context-managed session keep repeated calls
    # from colliding and release the session when synthesis is done.
    with tf.Graph().as_default(), tf.Session() as sess:
        net = create_wavenet(
            batch_size=batch_size,
            filter_length=filter_length,
            n_hidden=n_hidden,
            n_skip=n_skip,
            n_layers_per_stage=n_layers_per_stage,
            n_stages=n_stages,
            shift=False,
            n_quantization=n_quantization,
            sample_rate=sample_rate)
        init_op = tf.group(tf.global_variables_initializer(),
                           tf.local_variables_initializer())
        sess.run(init_op)
        saver = tf.train.Saver()
        latest_ckpt = tf.train.latest_checkpoint(ckpt_path)
        if latest_ckpt is not None:
            saver.restore(sess, latest_ckpt)
        else:
            print('Could not find checkpoint')

        print('Synthesize...')
        for sample_i in range(0, total_length - prime_length):
            print('{}/{}/{}'.format(sample_i, prime_length, total_length),
                  end='\r')
            window = synth[:, sample_i:sample_i + sequence_length]
            probs = sess.run(net["probs"], feed_dict={net["X"]: window})
            idxs = sample_categorical(probs)
            idxs = idxs.reshape((batch_size, sequence_length))
            if sample_i == 0:
                # First pass: replace the whole primer with the sampled
                # output for every step of the window.
                audio = wnu.inv_mu_law_numpy(idxs - offset)
                synth[:, :prime_length] = audio
            else:
                # Later passes: keep only the sample drawn for the last step
                # of the window and append it to the buffer.
                audio = wnu.inv_mu_law_numpy(idxs[:, -1] - offset)
                synth[:, prime_length + sample_i] = audio

    for i in range(batch_size):
        wavfile.write('synthesis-{}.wav'.format(i), sample_rate, synth[i])