""" Genetic Algorithm for feature selection and hyperparameter optimization """ import numpy as np import random import time import warnings from typing import Dict, List, Callable, Optional, Tuple from joblib import Parallel, delayed from xgboost import XGBClassifier from lightgbm import LGBMClassifier from sklearn.ensemble import GradientBoostingClassifier, AdaBoostClassifier from sklearn.metrics import accuracy_score import config warnings.filterwarnings( 'ignore', message='X does not have valid feature names') warnings.filterwarnings('ignore', category=UserWarning, module='sklearn') class GeneticAlgorithm: """GA for optimizing features + hyperparameters + ensemble weights""" def __init__(self, X: np.ndarray, y: np.ndarray, n_features_to_select: int = 80, skip_feature_selection: bool = False, selected_models: List[str] = None): """ Initialize GA Args: X: Training data y: Training labels n_features_to_select: Number of features to select skip_feature_selection: If True, use all features (only optimize hyperparams) selected_models: List of models to train ['xgboost', 'lightgbm', 'gradientboosting', 'adaboost'] """ self.X = X self.y = y self.n_features = X.shape[1] self.skip_feature_selection = skip_feature_selection # Model selection if selected_models is None or len(selected_models) == 0: self.selected_models = ['xgboost', 'lightgbm', 'gradientboosting', 'adaboost'] else: self.selected_models = selected_models self.n_models = len(self.selected_models) if skip_feature_selection: self.n_select = self.n_features print( f"✅ GA will optimize: HYPERPARAMETERS ONLY (using all {self.n_features} features)") else: if n_features_to_select > self.n_features: print( f"⚠️ Adjusted: {n_features_to_select} → {self.n_features} features") self.n_select = self.n_features else: self.n_select = n_features_to_select print( f"✅ GA will optimize: FEATURES ({self.n_select}/{self.n_features}) + HYPERPARAMETERS") print( f"✅ Training models: {', '.join(self.selected_models)} ({self.n_models} models)") self.n_classes = len(np.unique(y)) # GA parameters from config self.population_size = config.GA_CONFIG['population_size'] self.n_generations = config.GA_CONFIG['n_generations'] self.mutation_rate = config.GA_CONFIG['mutation_rate'] self.crossover_rate = config.GA_CONFIG['crossover_rate'] self.elite_size = config.GA_CONFIG['elite_size'] self.early_stopping_patience = config.GA_CONFIG['early_stopping_patience'] self.early_stopping_tolerance = config.GA_CONFIG['early_stopping_tolerance'] self.best_chromosome = None self.best_fitness = 0 self.history = [] self.log_messages = [] def log(self, message: str): """Add log message with timestamp""" timestamp = time.strftime("%H:%M:%S") log_entry = f"[{timestamp}] {message}" self.log_messages.append(log_entry) print(log_entry) def create_chromosome(self) -> Dict: """Create random chromosome""" chromosome = {} # Feature selection (skip if not optimizing features) if self.skip_feature_selection: chromosome['feature_indices'] = np.arange(self.n_features) else: n_to_select = min(self.n_select, self.n_features) chromosome['feature_indices'] = np.sort(np.random.choice( self.n_features, n_to_select, replace=False )) # Add hyperparameters ONLY for selected models for model_name in self.selected_models: model_prefix = self._get_model_prefix(model_name) if model_prefix in config.MODEL_HYPERPARAMS: for param_name, param_values in config.MODEL_HYPERPARAMS[model_prefix].items(): key = f"{model_prefix}_{param_name}" chromosome[key] = random.choice(param_values) # Ensemble weights (for selected models only) chromosome['weights'] = self._random_weights(self.n_models) return chromosome def _get_model_prefix(self, model_name: str) -> str: """Get model prefix for config lookup""" prefix_map = { 'xgboost': 'xgb', 'lightgbm': 'lgbm', 'gradientboosting': 'gb', 'adaboost': 'ada' } return prefix_map.get(model_name, model_name) def _random_weights(self, n: int) -> np.ndarray: """Generate n random weights that sum to 1""" return np.random.dirichlet(np.ones(n)) def fitness(self, chromosome: Dict, X_train: np.ndarray, y_train: np.ndarray, X_val: np.ndarray, y_val: np.ndarray) -> float: """Calculate fitness using validation accuracy""" try: feature_indices = chromosome['feature_indices'] X_train_selected = X_train[:, feature_indices] X_val_selected = X_val[:, feature_indices] models = [] # Train only selected models for model_name in self.selected_models: model = self._train_model( model_name, chromosome, X_train_selected, y_train ) models.append(model) # Ensemble prediction predictions = [model.predict_proba( X_val_selected) for model in models] weights = chromosome['weights'] ensemble_proba = np.average(predictions, axis=0, weights=weights) y_pred = np.argmax(ensemble_proba, axis=1) accuracy = accuracy_score(y_val, y_pred) return accuracy except Exception as e: print(f"⚠️ Error in fitness evaluation: {e}") import traceback traceback.print_exc() return 0.0 def _train_model(self, model_name: str, chromosome: Dict, X_train: np.ndarray, y_train: np.ndarray): """Train a single model based on name and chromosome config""" if model_name == 'xgboost': model = XGBClassifier( n_estimators=chromosome.get('xgb_n_estimators', 100), max_depth=chromosome.get('xgb_max_depth', 6), learning_rate=chromosome.get('xgb_learning_rate', 0.1), subsample=chromosome.get('xgb_subsample', 0.8), colsample_bytree=chromosome.get('xgb_colsample_bytree', 0.8), min_child_weight=chromosome.get('xgb_min_child_weight', 1), gamma=chromosome.get('xgb_gamma', 0), objective='multi:softprob', num_class=self.n_classes, random_state=config.RANDOM_STATE, n_jobs=-1, verbosity=0 ) elif model_name == 'lightgbm': model = LGBMClassifier( n_estimators=chromosome.get('lgbm_n_estimators', 100), num_leaves=chromosome.get('lgbm_num_leaves', 31), learning_rate=chromosome.get('lgbm_learning_rate', 0.1), min_child_samples=chromosome.get('lgbm_min_child_samples', 20), subsample=chromosome.get('lgbm_subsample', 0.8), colsample_bytree=chromosome.get('lgbm_colsample_bytree', 0.8), reg_alpha=chromosome.get('lgbm_reg_alpha', 0), reg_lambda=chromosome.get('lgbm_reg_lambda', 0), objective='multiclass', num_class=self.n_classes, random_state=config.RANDOM_STATE, n_jobs=-1, verbose=-1, force_col_wise=True ) elif model_name == 'gradientboosting': model = GradientBoostingClassifier( n_estimators=chromosome.get('gb_n_estimators', 100), max_depth=chromosome.get('gb_max_depth', 5), learning_rate=chromosome.get('gb_learning_rate', 0.1), subsample=chromosome.get('gb_subsample', 0.8), min_samples_split=chromosome.get('gb_min_samples_split', 2), min_samples_leaf=chromosome.get('gb_min_samples_leaf', 1), random_state=config.RANDOM_STATE ) elif model_name == 'adaboost': model = AdaBoostClassifier( n_estimators=chromosome.get('ada_n_estimators', 100), learning_rate=chromosome.get('ada_learning_rate', 1.0), algorithm=config.ADABOOST_ALGORITHM, random_state=config.RANDOM_STATE ) else: raise ValueError(f"Unknown model: {model_name}") model.fit(X_train, y_train) return model def crossover(self, parent1: Dict, parent2: Dict) -> Tuple[Dict, Dict]: """Crossover operation""" if random.random() > self.crossover_rate: return parent1.copy(), parent2.copy() child1 = {} child2 = {} # Feature crossover if self.skip_feature_selection: child1['feature_indices'] = parent1['feature_indices'].copy() child2['feature_indices'] = parent2['feature_indices'].copy() else: mask = np.random.rand(self.n_select) < 0.5 child1_features = np.where( mask, parent1['feature_indices'], parent2['feature_indices']) child2_features = np.where( mask, parent2['feature_indices'], parent1['feature_indices']) child1_features = np.unique(child1_features) child2_features = np.unique(child2_features) while len(child1_features) < self.n_select: new_feat = random.randint(0, self.n_features - 1) if new_feat not in child1_features: child1_features = np.append(child1_features, new_feat) while len(child2_features) < self.n_select: new_feat = random.randint(0, self.n_features - 1) if new_feat not in child2_features: child2_features = np.append(child2_features, new_feat) child1['feature_indices'] = np.sort( child1_features[:self.n_select]) child2['feature_indices'] = np.sort( child2_features[:self.n_select]) # Hyperparameter crossover for key in parent1.keys(): if key != 'feature_indices': if random.random() < 0.5: child1[key] = parent1[key] child2[key] = parent2[key] else: child1[key] = parent2[key] child2[key] = parent1[key] return child1, child2 def mutate(self, chromosome: Dict) -> Dict: """Mutation operation""" mutated = chromosome.copy() # Feature mutation if not self.skip_feature_selection: if random.random() < self.mutation_rate: n_replace = random.randint(1, min(5, self.n_select)) indices_to_replace = np.random.choice( self.n_select, n_replace, replace=False) for idx in indices_to_replace: new_feat = random.randint(0, self.n_features - 1) while new_feat in mutated['feature_indices']: new_feat = random.randint(0, self.n_features - 1) mutated['feature_indices'][idx] = new_feat mutated['feature_indices'] = np.sort( mutated['feature_indices']) # Hyperparameter mutation if random.random() < self.mutation_rate: param_keys = [k for k in chromosome.keys() if k not in [ 'feature_indices', 'weights']] if param_keys: param_to_mutate = random.choice(param_keys) temp = self.create_chromosome() mutated[param_to_mutate] = temp[param_to_mutate] # Weight mutation if random.random() < self.mutation_rate: mutated['weights'] = self._random_weights(self.n_models) return mutated def evaluate_population_parallel(self, population: List[Dict], X_train: np.ndarray, y_train: np.ndarray, X_val: np.ndarray, y_val: np.ndarray, n_jobs: int = 2) -> List[float]: """Evaluate entire population in parallel""" # Limit n_jobs to prevent resource exhaustion safe_n_jobs = min(n_jobs, 4, len(population) // 2) if safe_n_jobs < 1: safe_n_jobs = 1 self.log( f" Evaluating {len(population)} individuals (n_jobs={safe_n_jobs})...") try: fitness_scores = Parallel( n_jobs=safe_n_jobs, verbose=0, backend='loky', timeout=600 )( delayed(self.fitness)( chromosome, X_train, y_train, X_val, y_val) for chromosome in population ) except Exception as e: self.log(f"⚠️ Parallel evaluation failed: {e}") self.log(" Falling back to sequential evaluation...") fitness_scores = [] for i, chromosome in enumerate(population): if (i + 1) % 5 == 0: self.log( f" Progress: {i+1}/{len(population)} individuals") score = self.fitness(chromosome, X_train, y_train, X_val, y_val) fitness_scores.append(score) return fitness_scores def evolve(self, X_train: np.ndarray, y_train: np.ndarray, X_val: np.ndarray, y_val: np.ndarray, progress_callback: Optional[Callable] = None, n_jobs: int = 2) -> Dict: """Main GA evolution loop""" self.log("="*70) self.log("🧬 GENETIC ALGORITHM OPTIMIZATION") self.log("="*70) self.log(f"Population size: {self.population_size}") self.log(f"Generations: {self.n_generations}") self.log( f"Feature selection: {'DISABLED (hyperparams only)' if self.skip_feature_selection else f'ENABLED ({self.n_select}/{self.n_features})'}") self.log( f"Selected models: {', '.join(self.selected_models)} ({self.n_models} models)") self.log(f"Early stopping patience: {self.early_stopping_patience}") self.log(f"Parallel jobs: {n_jobs}") self.log("="*70) population = [self.create_chromosome() for _ in range(self.population_size)] start_time = time.time() no_improve_count = 0 for generation in range(self.n_generations): try: gen_start = time.time() self.log( f"\n📊 Generation {generation + 1}/{self.n_generations}") # Parallel fitness evaluation fitness_scores = self.evaluate_population_parallel( population, X_train, y_train, X_val, y_val, n_jobs=n_jobs ) # Validation check if len(fitness_scores) != len(population): self.log( f"⚠️ Warning: Got {len(fitness_scores)} scores for {len(population)} individuals") while len(fitness_scores) < len(population): fitness_scores.append(0.0) max_fitness = max(fitness_scores) avg_fitness = np.mean(fitness_scores) std_fitness = np.std(fitness_scores) max_idx = fitness_scores.index(max_fitness) # Track improvement improved = False if max_fitness > self.best_fitness + self.early_stopping_tolerance: prev_best = self.best_fitness self.best_fitness = max_fitness self.best_chromosome = population[max_idx].copy() no_improve_count = 0 improved = True self.log( f" ✨ NEW BEST: {max_fitness:.4f} (+{max_fitness - prev_best:.4f})") else: no_improve_count += 1 self.log( f" → Best: {max_fitness:.4f} (no improvement, count={no_improve_count})") # Log statistics self.log( f" Average: {avg_fitness:.4f} (σ={std_fitness:.4f})") self.log( f" Range: [{min(fitness_scores):.4f}, {max(fitness_scores):.4f}]") gen_time = time.time() - gen_start elapsed = time.time() - start_time avg_gen_time = elapsed / (generation + 1) eta = avg_gen_time * (self.n_generations - generation - 1) self.log( f" Time: {gen_time:.1f}s | Elapsed: {elapsed/60:.1f}min | ETA: {eta/60:.1f}min") self.history.append({ 'generation': generation + 1, 'best_fitness': max_fitness, 'avg_fitness': avg_fitness, 'std_fitness': std_fitness, 'time': gen_time, 'improved': improved }) # Update progress callback if progress_callback: try: progress_callback( (generation + 1) / self.n_generations, desc=f"Gen {generation+1}/{self.n_generations} | Best: {max_fitness:.4f} | Avg: {avg_fitness:.4f} | ETA: {eta/60:.0f}min" ) except Exception as e: self.log(f"⚠️ Progress callback failed: {e}") # Early stopping check if no_improve_count >= self.early_stopping_patience: self.log( f"\n🛑 EARLY STOPPING at generation {generation + 1}") self.log( f" No improvement for {self.early_stopping_patience} consecutive generations") self.log(f" Best fitness: {self.best_fitness:.4f}") break # Explicit flush import sys sys.stdout.flush() # Selection self.log(f" Creating next generation...") selected = [] for _ in range(self.population_size - self.elite_size): tournament = random.sample( list(zip(population, fitness_scores)), 3) winner = max(tournament, key=lambda x: x[1])[0] selected.append(winner) elite_indices = np.argsort(fitness_scores)[-self.elite_size:] elite = [population[i] for i in elite_indices] # Crossover & Mutation offspring = [] for i in range(0, len(selected), 2): if i + 1 < len(selected): child1, child2 = self.crossover( selected[i], selected[i+1]) offspring.append(self.mutate(child1)) offspring.append(self.mutate(child2)) population = elite + \ offspring[:self.population_size - self.elite_size] self.log(f" ✓ Generation {generation + 1} complete") except KeyboardInterrupt: self.log("\n⚠️ Training interrupted by user") break except Exception as e: self.log(f"\n❌ Error in generation {generation + 1}: {e}") import traceback self.log(traceback.format_exc()) if generation == 0: self.log("❌ First generation failed, aborting") return None else: self.log("⚠️ Attempting to continue...") continue total_time = time.time() - start_time self.log("\n" + "="*70) self.log("✅ GA OPTIMIZATION COMPLETE") self.log("="*70) self.log(f"Final best fitness: {self.best_fitness:.4f}") self.log( f"Total generations: {len(self.history)}/{self.n_generations}") self.log(f"Total time: {total_time/60:.1f} minutes") if len(self.history) > 0: self.log( f"Average time per generation: {total_time/len(self.history):.1f}s") self.log("="*70) if self.best_chromosome is None: self.log( "⚠️ Warning: No improvement found, using best from final generation") fitness_scores = self.evaluate_population_parallel( population, X_train, y_train, X_val, y_val, n_jobs=n_jobs ) max_idx = fitness_scores.index(max(fitness_scores)) self.best_chromosome = population[max_idx].copy() self.best_fitness = fitness_scores[max_idx] return self.best_chromosome