Source code for dcase_models.data.data_generator

import os
import numpy as np
import inspect
import random

import tensorflow as tf
tensorflow2 = tf.__version__.split('.')[0] == '2'

if tensorflow2:
    from tensorflow.keras.utils import Sequence
else:
    from keras.utils import Sequence

from dcase_models.data.feature_extractor import FeatureExtractor
from dcase_models.data.dataset_base import Dataset
# from .data_augmentation import AugmentedDataset


class DataGenerator():
    """ Includes methods to load features files from DCASE datasets.

    Parameters
    ----------
    dataset : Dataset
        Instance of the Dataset used to load the data. Note that the dataset
        has to be downloaded before initializing the DataGenerator.
        Refer to dcase-models/data/datasets.py for a complete list of
        available datasets.
    inputs : instance of FeatureExtractor or list of FeatureExtractor instances
        Instance(s) of FeatureExtractor. These are the feature extractor(s)
        used to generate the features. For multi-input, pass a list of
        FeatureExtractor instances.
    folds : list of str
        List of folds to be loaded. Each fold has to be in dataset.fold_list.
        Note that since the folds used at each stage of the pipeline
        (training, validation, evaluation) are different, an instance of
        DataGenerator for each stage has to be created.
        e.g. ['fold1', 'fold2', 'fold3', ...]
    outputs : str, FeatureExtractor or list, default='annotations'
        Instance(s) of FeatureExtractor used to generate the outputs.
        To use the annotations obtained from Dataset, use a string.
        For multi-output, use a list of FeatureExtractor and/or strings.
    batch_size : int, default=32
        Number of files loaded when calling get_data_batch().
        Note that the meaning of batch_size here is slightly different from
        the one in machine learning libraries like keras. In these libraries
        batch_size means the number of instances (sequences in DCASE-models)
        used in each training step. Here batch_size is the number of files,
        and therefore, the number of sequences varies in each batch.
    shuffle : bool, default=True
        When training a model, it is typical to shuffle the dataset at the
        end of each epoch. If shuffle is True (default), then the audio file
        list is shuffled when the class is initialized and when the
        shuffle_list() method is called.
    train : bool, default=True
        When training, it is typical to feed the model with a numpy array
        that contains all the data concatenated. For validation and testing
        it is necessary to have the features of each file separate in order
        to do a file-wise evaluation. Therefore, if train is True, the loaded
        data is concatenated and converted to a numpy array. If train is
        False, get_data() and get_data_batch() return a list whose elements
        are the features of each file in the audio_file_list.
    scaler : Scaler or None, default=None
        If not None, the Scaler object is used to scale the data
        after loading.
    scaler_outputs : Scaler or None, default=None
        Same as scaler but for the system outputs.

    Attributes
    ----------
    audio_file_list : list of dict
        List of audio files from which the features will be loaded.
        Each element in the list includes information of the original
        audio file (important to get the annotations) and the subfolder
        where the resampled (and maybe augmented) audio file is.
        e.g.:
        audio_file_list = [
            {'file_original': 'audio/1.wav', 'sub_folder': 'original'},
            {'file_original': 'audio/1.wav', 'sub_folder': 'pitch_shift_1'},
            {'file_original': 'audio/2.wav', 'sub_folder': 'original'},
            ...
        ]

    See Also
    --------
    Dataset : Dataset class
    FeatureExtractor : FeatureExtractor class

    Examples
    --------
    Create instances of Dataset and FeatureExtractor with default parameters

    >>> from dcase_models.data.datasets import UrbanSound8k
    >>> from dcase_models.data.features import MelSpectrogram
    >>> from dcase_models.data.data_generator import DataGenerator
    >>> dataset = UrbanSound8k('../datasets/UrbanSound8k')
    >>> features = MelSpectrogram()

    Assuming that the dataset was downloaded and features were extracted
    already, we can initialize the data generators. This example uses fold1
    and fold2 for training and fold3 for validation.

    >>> data_gen_train = DataGenerator(
            dataset, features, ['fold1', 'fold2'], train=True)
    >>> data_gen_val = DataGenerator(
            dataset, features, ['fold3'], train=False)

    >>> X_train, Y_train = data_gen_train.get_data_batch(0)
    >>> print(X_train.shape, Y_train.shape)
    (212, 43, 64) (212, 10)

    >>> X_val, Y_val = data_gen_val.get_data_batch(0)
    >>> print(len(X_val), len(Y_val))
    32 32
    >>> print(X_val[0].shape, Y_val[0].shape)
    (7, 43, 64) (7, 10)

    >>> X_train, Y_train = data_gen_train.get_data()
    >>> print(X_train.shape, Y_train.shape)
    (11095, 43, 64) (11095, 10)

    >>> X_val, Y_val = data_gen_val.get_data()
    >>> print(len(X_val), len(Y_val))
    925 925
    >>> print(X_val[0].shape, Y_val[0].shape)
    (7, 43, 64) (7, 10)

    """

    def __init__(self, dataset, inputs, folds,
                 outputs='annotations', batch_size=32,
                 shuffle=True, train=True, scaler=None,
                 scaler_outputs=None):
        """ Initialize the DataGenerator.

        Validates the dataset, inputs and outputs, and generates the
        audio_file_list by concatenating all the files from the folds
        passed as an argument.

        Raises
        ------
        AttributeError
            If dataset is not a Dataset, if it was not downloaded, if an
            input is not a FeatureExtractor whose features were extracted,
            or if an output is neither a string nor a FeatureExtractor.

        """
        # General attributes; single inputs/outputs are wrapped in a list so
        # the rest of the class can always iterate over them.
        self.dataset = dataset
        self.inputs = inputs if isinstance(inputs, list) else [inputs]
        self.folds = folds
        self.outputs = outputs if isinstance(outputs, list) else [outputs]
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.train = train
        self.scaler = scaler
        self.scaler_outputs = scaler_outputs

        if not isinstance(dataset, Dataset):
            raise AttributeError(
                'dataset has to be an instance of Dataset or similar'
            )

        # The base Dataset class itself is exempt from the download check.
        if (not dataset.check_if_downloaded() and
                dataset.__class__ is not Dataset):
            raise AttributeError(
                ('The dataset was not downloaded. Please download it '
                 'before using DataGenerator')
            )

        # Without inputs there is no sampling rate to locate the audio
        # folders, so fail early with an explicit message.
        if not self.inputs:
            raise AttributeError(
                'At least one input FeatureExtractor is required'
            )

        for j, inp in enumerate(self.inputs):
            if isinstance(inp, str):
                # TODO: Check if str is available in dataset
                raise AttributeError(
                    'String inputs are not available yet '
                    '(input: %d - %s)' % (j, inp)
                )
            if not isinstance(inp, FeatureExtractor):
                raise AttributeError(('Each input has to be an '
                                      'instance of FeatureExtractor '
                                      'or similar'))
            # The base FeatureExtractor class is exempt from this check.
            if (not inp.check_if_extracted(dataset) and
                    inp.__class__ is not FeatureExtractor):
                raise AttributeError(
                    ('Features were not extracted '
                     'for input: %d - %s' % (j, inp.__class__.__name__))
                )
            # TODO: Check if all inputs share sr
            # The values of the last input are the ones that are kept.
            self.sr = inp.sr
            self.time_resolution = inp.sequence_hop_time

        for output in self.outputs:
            if not isinstance(output, (FeatureExtractor, str)):
                raise AttributeError(
                    'Each output has to be a string or an instance of '
                    'FeatureExtractor or similar'
                )

        self.audio_file_list = []

        # Get audio paths
        self.dataset.generate_file_lists()
        _, subfolders = self.dataset.get_audio_paths(self.sr)

        if not train:
            # If not train, don't use augmentation
            subfolders = [subfolders[0]]

        for fold in folds:
            for subfolder in subfolders:
                subfolder_name = os.path.basename(subfolder)
                for file_audio in self.dataset.file_lists[fold]:
                    self.audio_file_list.append(
                        {'file_original': file_audio,
                         'sub_folder': subfolder_name})

        if shuffle:
            self.shuffle_list()

        self.data = {}

    def _load_features(self, extractor, file_original, sub_folder):
        """ Load the features that extractor saved for one audio file. """
        features_path = extractor.get_features_path(self.dataset)
        file_features = self.convert_audio_path_to_features_path(
            file_original, features_path, subfolder=sub_folder)
        return np.load(file_features)

    def _data_generation(self, list_files):
        """ Returns features and annotations for all files in list_files.

        Parameters
        ----------
        list_files : list of dict
            Elements of audio_file_list, i.e. dicts with the keys
            'file_original' and 'sub_folder'.

        Returns
        -------
        inputs_lists : list of list of ndarray
            For each input, the list of features of each file.
        outputs_lists : list of list of ndarray
            For each output, the list of features or annotations of
            each file.

        Raises
        ------
        AttributeError
            If one of the inputs is a string (not available yet).

        """
        inputs_lists = [[] for _ in self.inputs]
        outputs_lists = [[] for _ in self.outputs]
        for file_dict in list_files:
            file_original = file_dict['file_original']
            sub_folder = file_dict['sub_folder']

            for j, extractor in enumerate(self.inputs):
                if isinstance(extractor, str):
                    # TODO: ADD this option
                    raise AttributeError('Not available')
                inputs_lists[j].append(
                    self._load_features(extractor, file_original, sub_folder)
                )

            for j, output in enumerate(self.outputs):
                if isinstance(output, str):
                    # TODO: Add option to other outputs
                    # TODO: Improve how we pass features array to get_ann..
                    # The features just loaded for the first input are
                    # handed to the dataset together with the time
                    # resolution to build the annotations.
                    y = self.dataset.get_annotations(
                        file_original, inputs_lists[0][-1],
                        self.time_resolution)
                    outputs_lists[j].append(y)
                else:
                    outputs_lists[j].append(
                        self._load_features(output, file_original, sub_folder)
                    )

        return inputs_lists, outputs_lists

    def _pack(self, data_lists, n_items):
        """ Concatenate (train) or copy (not train) the data of each item.

        A single-element result is unwrapped, so single-input and
        single-output generators return the data directly.
        """
        if self.train:
            packed = [np.concatenate(data_lists[j], axis=0)
                      for j in range(n_items)]
        else:
            packed = [data_lists[j].copy() for j in range(n_items)]
        if len(packed) == 1:
            return packed[0]
        return packed

    def _scale_and_pack(self, X_list, Y_list):
        """ Apply the scalers (if any) and pack inputs and outputs. """
        if self.scaler is not None:
            X_list = self.scaler.transform(X_list)
        if self.scaler_outputs is not None:
            Y_list = self.scaler_outputs.transform(Y_list)
        X = self._pack(X_list, len(self.inputs))
        Y = self._pack(Y_list, len(self.outputs))
        return X, Y

    def get_data(self):
        """ Return all data from the selected folds.

        If train was set as True, the output is concatenated and
        converted to a numpy array. Otherwise the outputs are lists whose
        elements are the features of each file.

        Returns
        -------
        X : list or ndarray
            List or array of features for each file.
        Y : list or ndarray
            List or array of annotations for each file.

        """
        X_list, Y_list = self._data_generation(self.audio_file_list)
        return self._scale_and_pack(X_list, Y_list)

    def get_data_batch(self, index):
        """ Return the data from the batch given by argument.

        If train was set as True, the output is concatenated and
        converted to a numpy array. Otherwise the outputs are lists whose
        elements are the features of each file.

        Parameters
        ----------
        index : int
            Index of the batch; the batch covers the files
            index*batch_size to (index+1)*batch_size of audio_file_list.

        Returns
        -------
        X : list or ndarray
            List or array of features for each file.
        Y : list or ndarray
            List or array of annotations for each file.

        """
        first = index * self.batch_size
        list_file_batch = self.audio_file_list[first:first + self.batch_size]

        # Generate data
        X_list, Y_list = self._data_generation(list_file_batch)
        return self._scale_and_pack(X_list, Y_list)

    def get_data_from_file(self, file_index):
        """ Returns the data from the file index given by argument.

        Parameters
        ----------
        file_index : int
            Index of the file in audio_file_list.

        Returns
        -------
        X : ndarray or list of ndarray
            Array of features of the file. For multi-input, a list with
            one array per input.
        Y : ndarray or list of ndarray
            Array of annotations of the file. For multi-output, a list with
            one array per output.

        """
        # Generate data
        X, Y = self._data_generation([self.audio_file_list[file_index]])
        if self.scaler is not None:
            X = self.scaler.transform(X)
        if self.scaler_outputs is not None:
            Y = self.scaler_outputs.transform(Y)

        # Only one file was loaded, so each per-input (per-output) list has
        # a single element. Every input/output is kept, not just the first.
        X = [features[0].copy() for features in X]
        Y = [annotations[0].copy() for annotations in Y]

        if len(X) == 1:
            X = X[0]
        if len(Y) == 1:
            Y = Y[0]

        return X, Y

    @staticmethod
    def _swap_root_and_extension(paths, old_root, new_root, old_ext, new_ext):
        """ Replace old_root by new_root and old_ext by new_ext in path(s).

        Raises TypeError if paths is neither a str nor a list/tuple.
        """
        def convert(path):
            return path.replace(old_root, new_root).replace(old_ext, new_ext)

        if isinstance(paths, str):
            return convert(paths)
        if isinstance(paths, (list, tuple)):
            return [convert(path) for path in paths]
        raise TypeError(
            'Expected a str or a list of str, got %s' % type(paths).__name__
        )

    def convert_features_path_to_audio_path(self, features_file,
                                            features_path, sr=None):
        """ Converts features path(s) to audio path(s).

        Parameters
        ----------
        features_file : str or list of str
            Path(s) to the features file(s).
        features_path : str
            Path to the features folder, replaced by the audio folder.
        sr : int or None, default=None
            Sampling rate passed to dataset.get_audio_paths().

        Returns
        -------
        audio_file : str or list of str
            Path(s) to the audio file(s).

        Raises
        ------
        TypeError
            If features_file is neither a str nor a list of str.

        """
        audio_path, _ = self.dataset.get_audio_paths(sr=sr)
        return self._swap_root_and_extension(
            features_file, features_path, audio_path, '.npy', '.wav'
        )

    def convert_audio_path_to_features_path(self, audio_file,
                                            features_path, subfolder=''):
        """ Converts audio path(s) to features path(s).

        Parameters
        ----------
        audio_file : str or list of str
            Path(s) to the audio file(s).
        features_path : str
            Path to the features folder, which replaces dataset.audio_path.
        subfolder : str, default=''
            If not empty, subfolder appended to features_path.

        Returns
        -------
        features_file : str or list of str
            Path(s) to the features file(s).

        Raises
        ------
        TypeError
            If audio_file is neither a str nor a list of str.

        """
        if subfolder != '':
            features_path = os.path.join(features_path, subfolder)
        return self._swap_root_and_extension(
            audio_file, self.dataset.audio_path, features_path,
            '.wav', '.npy'
        )

    def paths_remove_aug_subfolder(self, path):
        """ Removes the subfolder string related to augmentation from a path.

        Converts DATASET_PATH/audio/original/... into DATASET_PATH/audio/...

        Parameters
        ----------
        path : str or list of str
            Path(s) to be converted.

        Returns
        -------
        new_path : str, None or list
            Converted path, or None if the path does not include any of the
            subfolders returned by dataset.get_audio_paths(). For a list of
            paths, the list of the results for each path.

        """
        audio_path, subfolders = self.dataset.get_audio_paths()

        def strip(single_path):
            # Only the first subfolder found in the path is replaced.
            for subfolder in subfolders:
                if subfolder in single_path:
                    return single_path.replace(subfolder, audio_path)
            return None

        if isinstance(path, (list, tuple)):
            return [strip(single_path) for single_path in path]
        return strip(path)

    def shuffle_list(self):
        """ Shuffles audio_file_list in place.

        Notes
        -----
        Only shuffle the list if shuffle is True.

        """
        if self.shuffle:
            random.shuffle(self.audio_file_list)

    def __len__(self):
        """ Get the number of batches. """
        n_files = len(self.audio_file_list)
        # float() keeps a true division even under Python 2 semantics.
        return int(np.ceil(n_files / float(self.batch_size)))

    def set_scaler(self, scaler):
        """ Set the scaler object used for the inputs. """
        self.scaler = scaler

    def set_scaler_outputs(self, scaler_outputs):
        """ Set the scaler object used for the outputs. """
        self.scaler_outputs = scaler_outputs
class KerasDataGenerator(Sequence):
    """ Wraps a DataGenerator so it can be used as a keras Sequence.

    Each item of the sequence is one batch returned by
    data_generator.get_data_batch(), and the file list of the wrapped
    generator is shuffled (if its shuffle attribute allows it) when the
    wrapper is created and at the end of each epoch.

    Parameters
    ----------
    data_generator : DataGenerator
        Instance of DataGenerator that provides the batches.

    """

    def __init__(self, data_generator):
        """ Store the wrapped generator and shuffle its file list. """
        # Initialize the Sequence base class; recent keras versions expect
        # the constructor of the base class to be called by subclasses.
        super(KerasDataGenerator, self).__init__()
        self.data_gen = data_generator
        self.data_gen.shuffle_list()

    def __len__(self):
        """ Denotes the number of batches per epoch. """
        return len(self.data_gen)

    def __getitem__(self, index):
        """ Generate one batch of data. """
        return self.data_gen.get_data_batch(index)

    def on_epoch_end(self):
        """ Shuffles the file list of the wrapped generator after each epoch. """
        self.data_gen.shuffle_list()