Source code for evolutionary_feature_selection._evolutionary_feature_selection

"""
Select Features by Evolutionary Algorithm
"""
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.feature_selection import SelectorMixin
from sklearn.utils.validation import check_X_y, check_is_fitted, check_random_state, check_scalar
from sklearn.metrics import r2_score
from sklearn.linear_model import LinearRegression
from joblib import Parallel, delayed


[docs]class EvolutionaryFeatureSelection(BaseEstimator, SelectorMixin): """ A feature selection transformer that selects a set of features of given size by evolutionary optimization The optimization is performed in a procreation-mutation-selection cycle with the fitness calculated as the validation score of the predictor fitted using a particular selection of features. Parameters ---------- predictor : predictor predictor to train on the reduced feature set scoring : callable scoring function to use as fitness function for specimen selection n_features : int number of features to select per specimen population_size : int number of specimens in the population n_breeders : int number of specimens from the population from which breeders are selected. must be < population_size mutation_rate : float 0 <= mutation_rate <= 1 fraction of offspring per generation that are mutated n_mutation_features : int >0, <n_features, default = 0.1 * n_features number of features to swap upon mutation. generations : int > 0 number of generations initial_population : array-like shape(N, n_features) initial feature selections. If N < population_size, random specimens will be generated to reach population_size, if N > population_size, the first population_size specimens from initial_population will be used. Useful for continuing previous evolutionary feature selection runs. random_state : numpy.random.random_state random state for repeatability in testing fitness_trace: bool, default False if True, store trace of fitness values during fit. Use with care, may lead to large memory consumption. population_trace : bool, default False if True, store trace of populations during fit. Use with care, may lead to large memory consumption. n_jobs : int, default=1 number of parallel jobs to use for compute heavy part of fit Attributes ---------- random_state_ : Numpy random state For testing and consistent parallel processing population_ : Ndarray shape(population_size, n_features_in) The full population of feature masks in the current iteration/generation fitness_values_ : Ndarray, shape(population_size) The fitness/score values of the population current_specimen_ : Ndarray, shape(n_features_in) The specimen with the highest fitness value, same as population_[0] fitness_history_ : Ndarray, shape(generations, population_size) Trace of the population fitness values over along all generations for fit debugging and quality assessment population_history_ : Ndarray, shape(generations, populations_size, n_features_in) """
[docs] def __init__(self, predictor=None, scoring=None, n_features=1, population_size=10, n_breeders=5, mutation_rate=0.5, n_mutation_features=1, generations=10, initial_population=None, random_state=None, population_trace=False, fitness_trace=False, n_jobs=1): self.predictor = predictor self.scoring = scoring self.n_features = check_scalar(n_features, "n_features", (int, np.int64, np.int32), min_val=1) self.population_size = check_scalar(population_size, "population_size", (int, np.int64, np.int32), min_val=1) self.n_breeders = check_scalar(n_breeders, "n_breeders", (int, np.int64, np.int32), min_val=1) self.mutation_rate = check_scalar(mutation_rate, "mutation_rate", (float, np.float64, np.float32), min_val=0, max_val=1) self.n_mutation_features = check_scalar(n_mutation_features, "n_mutation_features", (int, np.int64, np.int32), min_val=0, max_val=self.n_features) self.generations = check_scalar(generations, "generations", (int, np.int64, np.int32), min_val=1) self.initial_population = initial_population self.random_state = random_state self.population_trace = population_trace self.fitness_trace = fitness_trace self.n_jobs = check_scalar(n_jobs, "n_jobs", (int, np.int64, np.int32))
def _mutate(self, specimen): """ Mutates a specimen by replacing a number of feature indices from the self._features_seen The method assures that the removed features are replaced by features not present in the original specimen Parameters ---------- specimen : array-like of feature indices or column names Returns ------- modified array-like of feature indices or column names """ self.random_state_ = check_random_state(self.random_state_) new_specimen = np.array(specimen) for i in range(self.n_mutation_features): new_specimen[self.random_state_.choice(np.argwhere(specimen).ravel())] = False new_specimen[self.random_state_.choice(np.argwhere(np.bitwise_not(specimen)).ravel())] = True return new_specimen def _procreate(self, father, mother): """ Create a new specimen by randomly combining features from parent specimens Parameters ---------- father : first parent mask array mother : second parent mask array Returns ------- mask array """ self.random_state_ = check_random_state(self.random_state_) offspring = father + mother while np.count_nonzero(offspring) > self.n_features: offspring[self.random_state_.choice(np.argwhere(offspring).ravel())] = False return offspring def _populate(self, n_features_in): """ Creates the population from initial population if specified, plus the required number of random specimen Parameters ---------- n_features_in : int number of features in the input data set """ self.random_state_ = check_random_state(self.random_state_) self.population_ = np.zeros((self.population_size, n_features_in), dtype=bool) if self.initial_population is not None: row = min(self.initial_population.shape[0], self.population_size) self.population_[0:row] = self.initial_population[0:row] else: row = 0 while row < self.population_size: self.population_[row, self.random_state_.randint(0, n_features_in, self.n_features)] = True row += 1 def _calculate_fitness_values(self, X, y): """ Calculate Fitnesses for the population Parameters ---------- y : array-line true values Returns ------- array of float fitness values """ for specimen_index in range(self.population_.shape[0]): self.fitness_values_[specimen_index] = self._calculate_fitness_specimen(X, y, self.population_[specimen_index]) return self.fitness_values_ def _calculate_fitness_specimen(self, X, y, specimen): local_X = X[:, specimen] y_pred = self.predictor_.fit(local_X, y).predict(local_X) fitness = self.scoring_(y, y_pred) return fitness def _get_support_mask(self): check_is_fitted(self, ("population_", "current_specimen_", "fitness_values_")) return self.current_specimen_ def fit(self, X, y): self._validate_data(X=X, y=y, reset=True) X, y = check_X_y(X, y, ensure_min_features=2) if self.predictor is None: self.predictor_ = LinearRegression() else: self.predictor_ = self.predictor() if self.scoring is None: self.scoring_ = r2_score else: self.scoring_ = self.scoring self.random_state_ = check_random_state(self.random_state) self._populate(X.shape[1]) # set initial fitness values and sort initial specimens by fitness with Parallel(n_jobs=self.n_jobs) as parallel: self.fitness_values_ = np.zeros(self.population_size, dtype=float) self._calculate_fitness_values(X, y) fitness_order = np.argsort(- self.fitness_values_) self.population_ = self.population_[fitness_order] self.fitness_values_ = self.fitness_values_[fitness_order] self.current_specimen_ = self.population_[0] # iterate generations if self.fitness_trace: self.fitness_history_ = np.zeros((self.generations + 1, self.population_size)) self.fitness_history_[0] = self.fitness_values_ if self.population_trace: self.population_history_ = np.zeros((self.generations + 1, self.population_size, self.n_features_in_)) self.population_history_[0] = self.population_ temp_pop_size = (2 * self.population_size) - self.n_breeders converged = False generation = 0 while not converged: temp_pop = np.zeros((temp_pop_size, self.n_features_in_), dtype=bool) temp_pop[0:self.population_size] = self.population_ temp_fitness_values = np.zeros(temp_pop_size) temp_fitness_values[0:self.population_size] = self.fitness_values_ for offspring_idx in range(self.population_size, temp_pop_size): dad, mom = self.random_state_.randint(self.n_breeders, size=2) temp_pop[offspring_idx] = self._procreate(self.population_[dad], self.population_[mom]) if self.random_state_.rand() <= self.mutation_rate: temp_pop[offspring_idx] = self._mutate(temp_pop[offspring_idx]) temp_fitness_values[self.population_size:] = parallel( delayed(self._calculate_fitness_specimen)(X, y, temp_pop[i]) for i in range(self.population_size, temp_pop_size)) fitness_order = np.argsort(-temp_fitness_values)[0:self.population_size] self.population_ = temp_pop[fitness_order] self.fitness_values_ = temp_fitness_values[fitness_order] self.current_specimen_ = self.population_[0] generation += 1 if self.fitness_trace: self.fitness_history_[generation] = self.fitness_values_ if self.population_trace: self.population_history_[generation] = self.population_ if generation >= self.generations: converged = True return self