Source code for nnogada.Nnogada

from deap import base, creator, tools, algorithms
from scipy.stats import bernoulli
import sys
from bitstring import BitArray
import time
import tensorflow as tf
import torch
from torch import nn
from torchinfo import summary
from torch_optimizer import AdaBound
import torch.nn.functional as F

import numpy as np
import pandas as pd
from nnogada.hyperparameters import *

from tqdm import tqdm

class Nnogada:
    """
    Main class for nnogada.
    """

    def __init__(self, hyp_to_find, X_train, Y_train, X_val, Y_val,
                 regression=True, verbose=False, **kwargs):
        """
        Initialization of the Nnogada class.

        Parameters
        ----------
        hyp_to_find: dict
            Dictionary with the free hyperparameters of the neural net. The names must
            match the names in the hyperparameters.py file.
            Ex: hyperparams = {'deep': [2, 3], 'num_units': [100, 200], 'batch_size': [8, 32]}
        X_train: numpy.ndarray
            Set of attributes, or independent variables, for training.
        Y_train: numpy.ndarray
            Set of labels or dependent variable for training.
        X_val: numpy.ndarray
            Set of attributes, or independent variables, for testing/validation.
        Y_val: numpy.ndarray
            Set of labels or dependent variable for testing/validation.
        regression: bool
            If True, a regression task is assumed; otherwise a classification task.
            This affects the default activation function for the last layer:
            linear for regression, softmax for classification.
        **kwargs: kwargs
            Optional arguments:

            deep: Hyperparameter object
                Number of layers.
            num_units: Hyperparameter object
                Number of nodes per layer.
            batch_size: Hyperparameter object
                Batch size.
            learning_rate: Hyperparameter object
                Learning rate for the Adam optimizer.
            epochs: Hyperparameter object
                Number of epochs for training.
            act_fn: Hyperparameter object
                Activation function for the hidden layers.
            last_act_fn: Hyperparameter object
                Activation function for the last layer.
            loss_fn: Hyperparameter object
                Loss function.
        """
        self.neural_library = kwargs.pop('neural_library', 'keras')
        self.deep = kwargs.pop('deep', deep)
        self.num_units = kwargs.pop('num_units', num_units)
        self.batch_size = kwargs.pop('batch_size', batch_size)
        self.learning_rate = kwargs.pop('learning_rate', learning_rate)
        self.epochs = kwargs.pop('epochs', epochs)
        self.act_fn = kwargs.pop('act_fn', act_fn)
        self.last_act_fn = kwargs.pop('last_act_fn', last_act_fn)
        self.loss_fn = kwargs.pop('loss_fn', loss_fn)

        self.all_hyp_list = [self.deep, self.num_units, self.batch_size, self.learning_rate,
                             self.epochs, self.act_fn, self.last_act_fn, self.loss_fn]
        self.hyp_to_find = hyp_to_find
        self.verbose = verbose

        if regression:
            self.metric = 'mean_squared_error'
        else:  # it is a classification problem
            self.metric = 'accuracy'
            self.last_act_fn.setVal('softmax')

        self.X_train = X_train
        self.Y_train = Y_train
        self.X_val = X_val
        self.Y_val = Y_val

        self.history = []
        self.best = None

    def set_hyperparameters(self):
        """
        This small routine marks as free (varying) the hyperparameters indicated
        in the hyp_to_find dictionary and assigns their candidate values.
        """
        for hyp in self.all_hyp_list:
            if hyp.name in self.hyp_to_find:
                hyp.vary = True
                hyp.setValues(self.hyp_to_find[hyp.name])

    def neural_train_evaluate(self, ga_individual_solution):
        """
        Trains and evaluates a neural network model with the hyperparameter values
        proposed by the genetic algorithm.

        Parameters
        ----------
        ga_individual_solution:
            Individual of the genetic algorithm.

        Returns
        -------
        loss: float
            Last value of the loss function.
        """
        t = time.time()

        # Decode the GA solution (bit string) into values for the free hyperparameters
        hyp_vary_list = []
        self.df_colnames = []
        for i, hyp in enumerate(self.all_hyp_list):
            if hyp.vary:
                if self.verbose:
                    print(hyp.name, hyp.values, len(hyp.values))
                if len(hyp.values) <= 2:
                    nbits = 1
                elif len(hyp.values) <= 4:
                    nbits = 2
                elif len(hyp.values) <= 6:
                    nbits = 3
                elif len(hyp.values) <= 8:
                    nbits = 4
                else:
                    sys.exit("At this moment, please use at most 8 possible values per hyperparameter.")
                hyp.bitarray = BitArray(ga_individual_solution[i * nbits:i * nbits + nbits])
                hyp.setVal(hyp.values[hyp.bitarray.uint])
                hyp_vary_list.append(hyp.val)
                self.df_colnames.append(hyp.name)
                if self.verbose:
                    print(hyp.name + ": {} | ".format(hyp.val), end='')
        if self.verbose:
            print("\n-------------------------------------------------")

        if self.neural_library == 'keras':
            # Train the model and evaluate it on the validation set
            model = tf.keras.Sequential()
            model.add(tf.keras.layers.Dense(self.num_units.val, input_shape=(int(self.X_train.shape[1]),)))
            for i in range(self.deep.val):
                model.add(tf.keras.layers.Dense(self.num_units.val, activation=self.act_fn.val))
                # model.add(tf.keras.layers.Dropout(0.3))
            # model.add(tf.keras.layers.Dense(int(self.Y_train.shape[1]), activation=tf.nn.softmax))
            model.add(tf.keras.layers.Dense(int(self.Y_train.shape[1]), activation=self.last_act_fn.val))

            optimizer = tf.keras.optimizers.Adam(learning_rate=self.learning_rate.val,
                                                 beta_1=0.9, beta_2=0.999, epsilon=1e-3)
            model.compile(optimizer=optimizer, loss=self.loss_fn.val, metrics=[self.metric])
            model.fit(self.X_train, self.Y_train, epochs=self.epochs.val,
                      validation_data=(self.X_val, self.Y_val), callbacks=None,
                      batch_size=self.batch_size.val, shuffle=True, verbose=int(self.verbose))

            loss, score = model.evaluate(self.X_val, self.Y_val, verbose=int(self.verbose))
            t = time.time() - t
            if self.verbose:
                print("Score: {:.5f} Loss: {:.5f} Elapsed time: {:.2f}".format(score, loss, t))
                print("-------------------------------------------------\n")
            self.history.append(hyp_vary_list + [loss, score, t])
            return loss,

        elif self.neural_library == 'torch':
            batch_size = int(self.batch_size.val)

            # Initialize the MLP
            self.model = MLP(int(self.X_train.shape[1]), int(self.Y_train.shape[1]),
                             numneurons=self.num_units.val)  # numlayers=self.deep.val
            self.model.apply(self.model.init_weights)
            self.model.float()

            dataset_train = LoadDataSet(self.X_train, self.Y_train)
            dataset_val = LoadDataSet(self.X_val, self.Y_val)
            trainloader = torch.utils.data.DataLoader(dataset_train, batch_size=batch_size,
                                                      shuffle=True, num_workers=1)
            validloader = torch.utils.data.DataLoader(dataset_val, batch_size=batch_size,
                                                      shuffle=True, num_workers=1)

            # Define the loss function and optimizer
            # loss_function = nn.L1Loss()
            loss_function = nn.MSELoss()
            optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate.val)
            # optimizer = torch.optim.Adadelta(self.model.parameters(), lr=self.learning_rate, weight_decay=1e-5)
            # optimizer = torch.optim.SGD(self.model.parameters(), lr=self.learning_rate, momentum=0.9, weight_decay=1e-5)
            # optimizer = AdaBound(self.model.parameters(), lr=self.learning_rate,
            #                      final_lr=0.01, weight_decay=1e-10, gamma=0.1)
            # optimizer = torch.optim.Adagrad(self.model.parameters(), lr=self.learning_rate,
            #                                 lr_decay=0, weight_decay=0, initial_accumulator_value=0, eps=1e-10)
            # scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.5, patience=5)

            if self.verbose:
                summary(self.model)

            # Run the training loop
            history_train = np.empty((1,))
            history_val = np.empty((1,))
            for epoch in range(0, self.epochs.val):
                # Set current loss value
                current_loss = 0.0

                # Iterate over the DataLoader for training data
                for i, data in enumerate(trainloader, 0):
                    # Get and prepare inputs
                    inputs, targets = data
                    inputs, targets = inputs.float(), targets.float()
                    targets = targets.reshape((targets.shape[0], targets.shape[1]))

                    # Zero the gradients
                    optimizer.zero_grad()

                    # Perform forward pass
                    outputs = self.model(inputs)

                    # Compute loss
                    loss = loss_function(outputs, targets)

                    # Perform backward pass
                    loss.backward()

                    # Perform optimization
                    optimizer.step()

                    # Print statistics
                    current_loss += loss.item()
                    if i % 10 == 0:
                        current_loss = 0.0
                history_train = np.append(history_train, current_loss)

                valid_loss = 0.0
                self.model.eval()  # Optional when not using model-specific layers
                for i, data in enumerate(validloader, 0):
                    # Get and prepare inputs
                    inputs, targets = data
                    inputs, targets = inputs.float(), targets.float()
                    targets = targets.reshape((targets.shape[0], targets.shape[1]))

                    output_val = self.model(inputs)
                    valid_loss = loss_function(output_val, targets)
                    valid_loss += loss.item()
                history_val = np.append(history_val, valid_loss.item())
                if self.verbose:
                    print('Epoch: {}/{} | Training Loss: {:.5f} | Validation Loss: '
                          '{:.5f}'.format(epoch + 1, self.epochs.val, loss.item(), valid_loss.item()), end='\r')

            t = time.time() - t
            if self.verbose:
                print('\nTraining process has finished in {:.2f} minutes.'.format(t / 60))
                print("-------------------------------------------------\n")

            self.loss_val = history_val[-5:]
            self.loss_train = history_train[-5:]
            self.history.append(hyp_vary_list + [float(loss), float(valid_loss), t])
            return self.loss_val[-1],

    def eaSimpleWithElitism(self, population, toolbox, cxpb, mutpb, ngen,
                            stats=None, halloffame=None, pbar=None):
        """
        Method based on https://github.com/PacktPublishing/Hands-On-Genetic-Algorithms-with-Python.
        The individuals contained in the halloffame are directly injected into the next
        generation and are not subject to the genetic operators of selection, crossover
        and mutation.

        Parameters
        ----------
        population : list
            List of individuals.
        toolbox : deap.base.Toolbox object
            Toolbox that contains the genetic operators.
        cxpb : float
            Probability of crossover between two individuals.
        mutpb : float
            Probability of mutation.
        ngen : int
            Number of generations.
        stats : deap.tools.Statistics object
            A Statistics object that is updated in place, optional.
        halloffame : deap.tools.HallOfFame object
            Object that will contain the best individuals, optional.
        pbar : tqdm.tqdm object
            Progress bar from the tqdm library, optional.

        Returns
        -------
        population : list
            List of individuals.
        logbook : deap.tools.Logbook object
            Statistics of the evolution.
        """
        logbook = tools.Logbook()
        logbook.header = ['gen', 'nevals'] + (stats.fields if stats else [])

        # Evaluate the individuals with an invalid fitness
        invalid_ind = [ind for ind in population if not ind.fitness.valid]
        fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)  # nnogada: evaluates all the individuals
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit

        if halloffame is None:
            raise ValueError("halloffame parameter must not be empty!")

        halloffame.update(population)
        hof_size = len(halloffame.items) if halloffame.items else 0

        record = stats.compile(population) if stats else {}
        logbook.record(gen=0, nevals=len(invalid_ind), **record)
        if self.verbose:
            print(logbook.stream)

        # Begin the generational process
        for gen in range(1, ngen + 1):
            if pbar:
                pbar.update(1)

            # Select the next generation individuals
            offspring = toolbox.select(population, len(population) - hof_size)

            # Vary the pool of individuals
            offspring = algorithms.varAnd(offspring, toolbox, cxpb, mutpb)

            # Evaluate the individuals with an invalid fitness
            invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
            fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
            for ind, fit in zip(invalid_ind, fitnesses):
                ind.fitness.values = fit

            # Add the best back to the population:
            offspring.extend(halloffame.items)

            # Update the hall of fame with the generated individuals
            halloffame.update(offspring)

            # Replace the current population with the offspring
            population[:] = offspring

            # Append the current generation statistics to the logbook
            record = stats.compile(population) if stats else {}
            logbook.record(gen=gen, nevals=len(invalid_ind), **record)
            if self.verbose:
                print(logbook.stream)

        return population, logbook

    def ga_with_elitism(self, population_size, max_generations, gene_length, k,
                        pmutation=0.5, pcrossover=0.5, hof=1):
        """
        Simple genetic algorithm with elitism.

        Parameters
        ----------
        population_size : int
            Population size.
        max_generations : int
            Maximum number of generations.
        gene_length : int
            Length of each gene.
        k : int
            Number of best individuals to select from the final population.
        pmutation : float
            Probability of mutation, between 0 and 1.
        pcrossover : float
            Probability of crossover, between 0 and 1.
        hof : int
            Number of individuals to keep in the hall of fame.

        Returns
        -------
        best_population : list
            Individuals in the last population.
        """
        # Genetic algorithm constants:
        P_CROSSOVER = pcrossover    # probability of crossover
        P_MUTATION = pmutation      # probability of mutating an individual
        HALL_OF_FAME_SIZE = hof     # best individuals that pass directly to the next generation

        toolbox = base.Toolbox()

        # We are minimizing the loss, hence the -1.0 weight.
        # To maximize a metric such as accuracy, use 1.0 instead.
        creator.create('FitnessMin', base.Fitness, weights=[-1.0])
        creator.create('Individual', list, fitness=creator.FitnessMin)

        # create the individual operator to fill up an Individual instance:
        toolbox.register('binary', bernoulli.rvs, 0.5)
        toolbox.register('individual', tools.initRepeat, creator.Individual, toolbox.binary, n=gene_length)

        # create the population operator to generate a list of individuals:
        toolbox.register('population', tools.initRepeat, list, toolbox.individual)

        # genetic operators:
        toolbox.register('evaluate', self.neural_train_evaluate)
        toolbox.register('select', tools.selTournament, tournsize=2)
        toolbox.register('mutate', tools.mutFlipBit, indpb=0.11)
        toolbox.register('mate', tools.cxUniform, indpb=0.5)

        # create initial population (generation 0):
        population = toolbox.population(n=population_size)

        # prepare the statistics object:
        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("min", np.min)
        stats.register("avg", np.mean)
        stats.register("max", np.max)

        # define the hall-of-fame object:
        hof = tools.HallOfFame(HALL_OF_FAME_SIZE)

        # Genetic algorithm flow with elitism:
        try:
            pbar = tqdm(total=max_generations)
        except Exception:
            pbar = None
        population, logbook = self.eaSimpleWithElitism(population, toolbox, cxpb=P_CROSSOVER,
                                                       mutpb=P_MUTATION, ngen=max_generations,
                                                       stats=stats, halloffame=hof, pbar=pbar)

        best_population = tools.selBest(population, k=k)

        # convert the history list into a data frame
        self.df_colnames = self.df_colnames + ['loss', 'score', 'time']
        self.history = pd.DataFrame(self.history, columns=self.df_colnames)
        self.history = self.history.sort_values(by='loss', ascending=True, ignore_index=True)
        print("\nBest 5 solutions:\n-----------------\n")
        print(self.history.head(5))
        self.best = self.history.iloc[0]
        return best_population
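
# Illustrative usage sketch (kept as a comment so it never runs on import). The data,
# shapes and GA settings below are assumptions for demonstration only; the workflow
# (instantiate, set_hyperparameters, ga_with_elitism) follows the class defined above.
#
#   hyperparams = {'deep': [2, 3], 'num_units': [100, 200], 'batch_size': [8, 32]}
#   X = np.random.rand(500, 4)                      # 500 samples, 4 attributes (synthetic)
#   Y = np.random.rand(500, 1)                      # 1 target per sample (synthetic)
#   net_finder = Nnogada(hyp_to_find=hyperparams,
#                        X_train=X[:400], Y_train=Y[:400],
#                        X_val=X[400:], Y_val=Y[400:],
#                        regression=True, neural_library='keras')
#   net_finder.set_hyperparameters()                # mark 'deep', 'num_units', 'batch_size' as free
#   best = net_finder.ga_with_elitism(population_size=4, max_generations=2,
#                                     gene_length=5, k=2)
#   print(net_finder.best)                          # best hyperparameter combination found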

# for torch nets
class LoadDataSet:
    def __init__(self, X, y, scale_data=False):
        """
        Prepare the dataset for regression.

        Parameters
        ----------
        X : numpy.ndarray
            Input data for the neural network training.
        y : numpy.ndarray
            Output data for the neural network training.
        scale_data : bool
            Flag to scale the training data.
        """
        if not torch.is_tensor(X) and not torch.is_tensor(y):
            # # Apply scaling if necessary
            # if scale_data:
            #     X = StandardScaler().fit_transform(X)
            self.X = torch.from_numpy(X)
            self.y = torch.from_numpy(y)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, i):
        return self.X[i], self.y[i]

class MLP(nn.Module):
    """
    Multilayer perceptron class for regression.
    """

    def __init__(self, ncols, noutput, numneurons=200, numlayers=3, dropout=0.5):
        """
        Initialization method.

        Parameters
        ----------
        ncols : int
            Number of attributes.
        noutput : int
            Size of the output.
        numneurons : int
            Number of neurons in the hidden layers.
        numlayers : int
            Number of hidden layers.
        dropout : float
            Dropout value.
        """
        super().__init__()
        l_input = nn.Linear(ncols, numneurons)
        a_input = nn.ReLU()
        l_output = nn.Linear(numneurons, noutput)

        layers = [l_input, a_input]
        for _ in range(numlayers):
            # each hidden layer gets its own Linear instance so weights are not shared
            layers.append(nn.Linear(numneurons, numneurons))
            layers.append(nn.ReLU())
        layers.append(l_output)
        self.module_list = nn.ModuleList(layers)

    def forward(self, x):
        """
        Forward pass using the layers and activation functions defined in the
        network architecture.

        Parameters
        ----------
        x : torch.Tensor
            Input tensor.

        Returns
        -------
        x : torch.Tensor
            Output tensor after the forward pass.
        """
        for f in self.module_list:
            x = f(x)
        return x

    def init_weights(self, m):
        """
        Initialization of the ANN weights.

        Parameters
        ----------
        m : torch.nn.Module
            Module of the network; Linear layers get Xavier-normal initialized weights.
        """
        if type(m) == nn.Linear:
            nn.init.xavier_normal_(m.weight)
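
# Minimal self-check sketch for the torch helpers above, using random synthetic data;
# the shapes and sizes are assumptions for illustration. It only runs when this module
# is executed directly, so importing nnogada.Nnogada is unaffected.
if __name__ == "__main__":
    X_demo = np.random.rand(64, 4)              # 64 samples, 4 attributes
    Y_demo = np.random.rand(64, 1)              # one target per sample
    demo_net = MLP(ncols=4, noutput=1, numneurons=32, numlayers=2)
    demo_net.apply(demo_net.init_weights)       # Xavier-normal init of the Linear layers
    demo_data = LoadDataSet(X_demo, Y_demo)
    demo_loader = torch.utils.data.DataLoader(demo_data, batch_size=8, shuffle=True)
    xb, yb = next(iter(demo_loader))
    out = demo_net(xb.float())                  # forward pass through the MLP
    print(out.shape)                            # expected: torch.Size([8, 1])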