"""Utils for creating datasets.
"""
"""
Copyright 2017 Parag K. Mital. See also NOTICE.md.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import pickle
import numpy as np
import tensorflow as tf
from . import dft
from .utils import download_and_extract_zip, download_and_extract_tar
def gtzan_music_speech_download(dst='gtzan_music_speech'):
    """Fetch the GTZAN music/speech archive and unpack it into ``dst``.

    Parameters
    ----------
    dst : str, optional
        Directory the GTZAN music and speech dataset is extracted into.
    """
    url = 'http://opihi.cs.uvic.ca/sound/music_speech.tar.gz'
    download_and_extract_tar(url, dst)
def gtzan_music_speech_load(dst='gtzan_music_speech'):
    """Load the GTZAN Music and Speech dataset.

    Downloads the dataset if it does not exist into the dst directory.

    Parameters
    ----------
    dst : str, optional
        Location of GTZAN Music and Speech dataset.

    Returns
    -------
    Xs, ys : np.ndarray, np.ndarray
        Array of data, Array of labels (0 = music, 1 = speech).
    """
    from scipy.io import wavfile
    if not os.path.exists(dst):
        gtzan_music_speech_download(dst)
    Xs = []
    ys = []
    # The two categories only differ by directory and label, so load
    # them with one loop instead of duplicated per-category code.
    for subdir, label in [('music_wav', 0), ('speech_wav', 1)]:
        wav_dir = os.path.join(dst, 'music_speech', subdir)
        # sorted() makes the example order deterministic across
        # filesystems (os.listdir order is arbitrary).
        for file_i in sorted(os.listdir(wav_dir)):
            if not file_i.endswith('.wav'):
                continue
            sr, s = wavfile.read(os.path.join(wav_dir, file_i))
            # NOTE(review): assumes 16-bit PCM input; this maps raw
            # samples into roughly [-3, 1], not [-1, 1) — kept as-is to
            # preserve the published preprocessing, but worth confirming.
            s = s / 16384.0 - 1.0
            re, im = dft.dft_np(s)
            mag, phs = dft.ztoc(re, im)
            Xs.append((mag, phs))
            ys.append(label)
    Xs = np.array(Xs)
    # (batch, 2, a, b) -> (batch, a, b, 2): magnitude/phase become the
    # last (channel-like) axis.
    Xs = np.transpose(Xs, [0, 2, 3, 1])
    ys = np.array(ys)
    return Xs, ys
def cifar10_download(dst='cifar10'):
    """Fetch the python-pickled CIFAR10 archive and unpack it into ``dst``.

    Parameters
    ----------
    dst : str, optional
        Directory to download into.
    """
    url = 'http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz'
    download_and_extract_tar(url, dst)
def tiny_imagenet_load(dst='tiny_imagenet'):
    """Loads the paths to every file in the Tiny Imagenet Dataset.

    Downloads the dataset if it does not exist into the dst directory.

    Parameters
    ----------
    dst : str, optional
        Location of Tiny ImageNet dataset.

    Returns
    -------
    all_files, all_labels : list, list
        Paths to every image, and the matching text labels
        ('' for the unlabeled val/test splits).
    """
    if not os.path.exists(dst):
        tiny_imagenet_download(dst)
    all_files = []
    all_labels = []
    words = {}
    # words.txt maps WordNet ids (e.g. 'n01443537') to readable labels,
    # one tab-separated pair per line.
    with open(
            os.path.join(dst, 'tiny-imagenet-200', 'words.txt'),
            'r') as fp:
        for line in fp:
            s = line.split('\t', maxsplit=1)
            words[s[0]] = s[1].strip()
    for ds_type in ['train', 'val', 'test']:
        path = os.path.join(dst, 'tiny-imagenet-200', ds_type)
        for root, dirs, files in os.walk(path):
            for f in files:
                if not f.endswith('JPEG'):
                    continue
                if ds_type == 'train':
                    # Train images live under <wnid>/images/, so the
                    # wnid is the grandparent of each file. Using
                    # os.path instead of splitting on '/' keeps this
                    # working on Windows path separators too.
                    wnid = os.path.basename(os.path.dirname(root))
                    try:
                        label = words[wnid]
                    except KeyError:
                        # Surface which file referenced an unknown wnid
                        # before propagating the failure.
                        print(root, f)
                        raise
                else:
                    # val/test layouts carry no per-directory label.
                    label = ''
                all_files.append(os.path.join(root, f))
                all_labels.append(label)
    return all_files, all_labels
def tiny_imagenet_download(dst='tiny_imagenet'):
    """Fetch the Tiny ImageNet archive and unpack it into ``dst``.

    Parameters
    ----------
    dst : str, optional
        Directory to download into.
    """
    url = 'http://cs231n.stanford.edu/tiny-imagenet-200.zip'
    download_and_extract_zip(url, dst)
def cifar10_load(dst='cifar10'):
    """Load the CIFAR10 dataset.

    Downloads the dataset if it does not exist into the dst directory.

    Parameters
    ----------
    dst : str, optional
        Location of CIFAR10 dataset.

    Returns
    -------
    Xs, ys : np.ndarray, np.ndarray
        Array of data (N x 32 x 32 x 3), Array of labels.
    """
    if not os.path.exists(dst):
        cifar10_download(dst)
    Xs = None
    ys = None
    for f in range(1, 6):
        filename = os.path.join(dst, 'cifar-10-batches-py',
                                'data_batch_%d' % f)
        # `with` closes the file promptly (the original leaked the
        # handle); latin-1 decoding lets Python-2 pickles load cleanly.
        with open(filename, 'rb') as fp:
            cf = pickle.load(fp, encoding='LATIN')
        if Xs is not None:
            Xs = np.r_[Xs, cf['data']]
            ys = np.r_[ys, np.array(cf['labels'])]
        else:
            Xs = cf['data']
            ys = cf['labels']
    # (N, 3072) -> (N, 3, 32, 32) -> (N, 32, 32, 3): channels last.
    Xs = np.swapaxes(np.swapaxes(Xs.reshape(-1, 3, 32, 32), 1, 3), 1, 2)
    return Xs, ys
def dense_to_one_hot(labels, n_classes=2):
    """Convert class labels from scalars to one-hot vectors.

    Parameters
    ----------
    labels : array
        Input labels to convert to one-hot representation.
    n_classes : int, optional
        Number of possible one-hot.

    Returns
    -------
    one_hot : array
        One hot representation of input.
    """
    # Row i of the identity matrix is the one-hot vector for class i,
    # so indexing it with the labels yields the full encoding at once.
    identity = np.eye(n_classes, dtype=np.float32)
    return identity[labels]
class DatasetSplit(object):
    """Utility class for batching data and handling multiple splits.

    Attributes
    ----------
    current_batch_idx : int
        Offset into the current epoch while `next_batch` iterates.
    images : np.ndarray
        Xs of the dataset. Not necessarily images.
    labels : np.ndarray
        ys of the dataset (None for an unlabeled split).
    n_classes : int
        Number of possible labels (0 when the split is unlabeled).
    num_examples : int
        Number of total observations
    """

    def __init__(self, images, labels):
        """Initialize a DatasetSplit object.

        Parameters
        ----------
        images : np.ndarray
            Xs/inputs
        labels : np.ndarray
            ys/outputs
        """
        self.images = np.array(images).astype(np.float32)
        if labels is not None:
            self.labels = np.array(labels).astype(np.int32)
            self.n_classes = len(np.unique(labels))
        else:
            self.labels = None
            # Fix: previously left unset for unlabeled splits, so
            # reading the documented `n_classes` attribute raised
            # AttributeError.
            self.n_classes = 0
        self.num_examples = len(self.images)

    def next_batch(self, batch_size=100):
        """Batch generator with randomization.

        Parameters
        ----------
        batch_size : int, optional
            Size of each minibatch.

        Yields
        ------
        Xs, ys : np.ndarray, np.ndarray
            Next batch of inputs and labels (if no labels, then None).
        """
        # Reshuffle at the start of every epoch.
        current_permutation = np.random.permutation(len(self.images))
        epoch_images = self.images[current_permutation, ...]
        if self.labels is not None:
            epoch_labels = self.labels[current_permutation, ...]
        else:
            # Always bind the name so the loop below never references
            # an undefined variable.
            epoch_labels = None
        # Then iterate over the epoch in contiguous chunks; the final
        # batch may be smaller than batch_size.
        self.current_batch_idx = 0
        while self.current_batch_idx < len(self.images):
            end_idx = min(self.current_batch_idx + batch_size,
                          len(self.images))
            batch_images = epoch_images[self.current_batch_idx:end_idx]
            if epoch_labels is not None:
                batch_labels = epoch_labels[self.current_batch_idx:end_idx]
            else:
                batch_labels = None
            self.current_batch_idx += batch_size
            yield batch_images, batch_labels
class Dataset(object):
    """Create a dataset from data and their labels.

    Allows easy use of train/valid/test splits; Batch generator.

    Attributes
    ----------
    all_idxs : list
        All indexes across all splits.
    all_inputs : np.ndarray
        All inputs across all splits.
    all_labels : np.ndarray or None
        All labels across all splits.
    n_classes : int
        Number of labels.
    split : sequence
        Percentage split of train, valid, test sets.
    test_idxs : sequence
        Indexes of the test split.
    train_idxs : sequence
        Indexes of the train split.
    valid_idxs : sequence
        Indexes of the valid split.
    """

    def __init__(self, Xs, ys=None, split=(1.0, 0.0, 0.0), one_hot=False,
                 n_classes=1):
        """Initialize a Dataset object.

        Parameters
        ----------
        Xs : np.ndarray
            Images/inputs to a network
        ys : np.ndarray
            Labels/outputs to a network
        split : sequence, optional
            Percentage of train, valid, and test sets.
        one_hot : bool, optional
            Whether or not to use one-hot encoding of labels (ys).
        n_classes : int, optional
            Number of classes represented in ys (used for one hot embedding).
        """
        self.all_idxs = []
        self.all_labels = []
        self.all_inputs = []
        self.train_idxs = []
        self.valid_idxs = []
        self.test_idxs = []
        self.n_classes = n_classes
        # Note: default is now an immutable tuple (mutable-default
        # anti-pattern); callers passing lists are unaffected.
        self.split = split
        # np.asarray generalizes the input: plain Python lists for Xs/ys
        # previously crashed on the fancy indexing below.
        self.all_inputs = np.asarray(Xs)
        n_idxs = len(self.all_inputs)
        idxs = range(n_idxs)
        # Shuffle observations once so each split is a random sample.
        rand_idxs = np.random.permutation(idxs)
        self.all_inputs = self.all_inputs[rand_idxs, ...]
        if ys is not None:
            self.all_labels = (dense_to_one_hot(ys, n_classes=n_classes)
                               if one_hot else np.asarray(ys))
            self.all_labels = self.all_labels[rand_idxs, ...]
        else:
            self.all_labels = None
        # Consecutive, non-overlapping index ranges per split.
        n_train = round(split[0] * n_idxs)
        n_valid = round(split[1] * n_idxs)
        n_test = round(split[2] * n_idxs)
        self.train_idxs = idxs[:n_train]
        self.valid_idxs = idxs[n_train:n_train + n_valid]
        self.test_idxs = idxs[n_train + n_valid:n_train + n_valid + n_test]

    def _split_of(self, idxs):
        """Build the DatasetSplit for the given index range (empty if none)."""
        if len(idxs):
            inputs = self.all_inputs[idxs, ...]
            if self.all_labels is not None:
                labels = self.all_labels[idxs, ...]
            else:
                labels = None
        else:
            inputs, labels = [], []
        return DatasetSplit(inputs, labels)

    @property
    def X(self):
        """Inputs/Xs/Images.

        Returns
        -------
        all_inputs : np.ndarray
            Original Inputs/Xs.
        """
        return self.all_inputs

    @property
    def Y(self):
        """Outputs/ys/Labels.

        Returns
        -------
        all_labels : np.ndarray
            Original Outputs/ys.
        """
        return self.all_labels

    @property
    def train(self):
        """Train split.

        Returns
        -------
        split : DatasetSplit
            Split of the train dataset.
        """
        return self._split_of(self.train_idxs)

    @property
    def valid(self):
        """Validation split.

        Returns
        -------
        split : DatasetSplit
            Split of the validation dataset.
        """
        return self._split_of(self.valid_idxs)

    @property
    def test(self):
        """Test split.

        Returns
        -------
        split : DatasetSplit
            Split of the test dataset.
        """
        return self._split_of(self.test_idxs)

    def mean(self):
        """Mean of the inputs/Xs.

        Returns
        -------
        mean : np.ndarray
            Calculates mean across 0th (batch) dimension.
        """
        return np.mean(self.all_inputs, axis=0)

    def std(self):
        """Standard deviation of the inputs/Xs.

        Returns
        -------
        std : np.ndarray
            Calculates std across 0th (batch) dimension.
        """
        return np.std(self.all_inputs, axis=0)