|
|
""" |
|
|
Genetic Algorithm for feature selection and hyperparameter optimization |
|
|
""" |
|
|
|
|
|
import random
import sys
import time
import traceback
import warnings
from typing import Callable, Dict, List, Optional, Tuple

import numpy as np
from joblib import Parallel, delayed
from lightgbm import LGBMClassifier
from sklearn.ensemble import AdaBoostClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score
from xgboost import XGBClassifier

import config

warnings.filterwarnings(
    'ignore', message='X does not have valid feature names')
warnings.filterwarnings('ignore', category=UserWarning, module='sklearn')
|
|
|
|
|
|
|
|
class GeneticAlgorithm:
    """GA for optimizing features + hyperparameters + ensemble weights."""

    def __init__(self, X: np.ndarray, y: np.ndarray,
                 n_features_to_select: int = 80,
                 skip_feature_selection: bool = False,
                 selected_models: Optional[List[str]] = None):
        """
        Initialize the GA.

        Args:
            X: Training data
            y: Training labels
            n_features_to_select: Number of features to select
            skip_feature_selection: If True, use all features (only optimize hyperparameters)
            selected_models: Models to train, a subset of
                ['xgboost', 'lightgbm', 'gradientboosting', 'adaboost']
        """
        self.X = X
        self.y = y
        self.n_features = X.shape[1]
        self.skip_feature_selection = skip_feature_selection

        if not selected_models:
            self.selected_models = [
                'xgboost', 'lightgbm', 'gradientboosting', 'adaboost']
        else:
            self.selected_models = selected_models

        self.n_models = len(self.selected_models)

        if skip_feature_selection:
            self.n_select = self.n_features
            print(
                f"✅ GA will optimize: HYPERPARAMETERS ONLY (using all {self.n_features} features)")
        else:
            if n_features_to_select > self.n_features:
                print(
                    f"⚠️ Adjusted: {n_features_to_select} → {self.n_features} features")
                self.n_select = self.n_features
            else:
                self.n_select = n_features_to_select
            print(
                f"✅ GA will optimize: FEATURES ({self.n_select}/{self.n_features}) + HYPERPARAMETERS")

        print(
            f"✅ Training models: {', '.join(self.selected_models)} ({self.n_models} models)")

        self.n_classes = len(np.unique(y))

        # GA hyperparameters come from the config module
        self.population_size = config.GA_CONFIG['population_size']
        self.n_generations = config.GA_CONFIG['n_generations']
        self.mutation_rate = config.GA_CONFIG['mutation_rate']
        self.crossover_rate = config.GA_CONFIG['crossover_rate']
        self.elite_size = config.GA_CONFIG['elite_size']
        self.early_stopping_patience = config.GA_CONFIG['early_stopping_patience']
        self.early_stopping_tolerance = config.GA_CONFIG['early_stopping_tolerance']

        # Best-so-far state and run logs
        self.best_chromosome = None
        self.best_fitness = 0.0
        self.history = []
        self.log_messages = []
|
|
|
|
|
    def log(self, message: str):
        """Add a log message with a timestamp."""
        timestamp = time.strftime("%H:%M:%S")
        log_entry = f"[{timestamp}] {message}"
        self.log_messages.append(log_entry)
        print(log_entry)
|
|
|
|
|
    def create_chromosome(self) -> Dict:
        """Create a random chromosome."""
        chromosome = {}

        # Gene 1: feature subset (fixed to all features when selection is skipped)
        if self.skip_feature_selection:
            chromosome['feature_indices'] = np.arange(self.n_features)
        else:
            n_to_select = min(self.n_select, self.n_features)
            chromosome['feature_indices'] = np.sort(np.random.choice(
                self.n_features, n_to_select, replace=False))

        # Gene 2: one hyperparameter value per model, drawn from config
        for model_name in self.selected_models:
            model_prefix = self._get_model_prefix(model_name)
            if model_prefix in config.MODEL_HYPERPARAMS:
                for param_name, param_values in config.MODEL_HYPERPARAMS[model_prefix].items():
                    key = f"{model_prefix}_{param_name}"
                    chromosome[key] = random.choice(param_values)

        # Gene 3: ensemble weights (non-negative, sum to 1)
        chromosome['weights'] = self._random_weights(self.n_models)

        return chromosome
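
    # Illustrative chromosome layout (values below are hypothetical):
    #   {'feature_indices': array([ 3, 17, 42, ...]),       # sorted, unique
    #    'xgb_n_estimators': 200, 'xgb_max_depth': 6, ...,  # per-model params
    #    'lgbm_num_leaves': 31, ...,
    #    'weights': array([0.31, 0.24, 0.28, 0.17])}        # sums to 1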
|
|
|
|
|
    def _get_model_prefix(self, model_name: str) -> str:
        """Map a model name to its config-lookup prefix."""
        prefix_map = {
            'xgboost': 'xgb',
            'lightgbm': 'lgbm',
            'gradientboosting': 'gb',
            'adaboost': 'ada'
        }
        return prefix_map.get(model_name, model_name)
|
|
|
|
|
    def _random_weights(self, n: int) -> np.ndarray:
        """Generate n random weights that sum to 1."""
        return np.random.dirichlet(np.ones(n))
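
    # Example (hypothetical draw): _random_weights(4) might return
    # array([0.12, 0.41, 0.30, 0.17]). A symmetric Dirichlet(1, ..., 1)
    # draw is uniform over the probability simplex, so the weights are
    # always non-negative and sum to 1, which is exactly what the
    # weighted soft vote in fitness() requires.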
|
|
|
|
|
    def fitness(self, chromosome: Dict, X_train: np.ndarray, y_train: np.ndarray,
                X_val: np.ndarray, y_val: np.ndarray) -> float:
        """Calculate fitness as validation accuracy of the weighted ensemble."""
        try:
            feature_indices = chromosome['feature_indices']
            X_train_selected = X_train[:, feature_indices]
            X_val_selected = X_val[:, feature_indices]

            # Train one model per selected model name
            models = []
            for model_name in self.selected_models:
                model = self._train_model(
                    model_name, chromosome, X_train_selected, y_train)
                models.append(model)

            # Weighted soft vote over the models' class probabilities
            predictions = [model.predict_proba(X_val_selected)
                           for model in models]
            weights = chromosome['weights']
            ensemble_proba = np.average(predictions, axis=0, weights=weights)
            y_pred = np.argmax(ensemble_proba, axis=1)

            return accuracy_score(y_val, y_pred)

        except Exception as e:
            print(f"⚠️ Error in fitness evaluation: {e}")
            traceback.print_exc()
            return 0.0
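
    # The soft vote above computes, for sample i and class c,
    #   P(i, c) = sum_m w_m * proba_m(i, c),   with sum_m w_m = 1,
    # then predicts argmax_c P(i, c). Because every proba_m row sums to 1
    # and the weights sum to 1, each ensemble row is again a valid
    # probability distribution.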
|
|
|
|
|
    def _train_model(self, model_name: str, chromosome: Dict, X_train: np.ndarray, y_train: np.ndarray):
        """Train a single model from the chromosome's hyperparameters."""
        if model_name == 'xgboost':
            model = XGBClassifier(
                n_estimators=chromosome.get('xgb_n_estimators', 100),
                max_depth=chromosome.get('xgb_max_depth', 6),
                learning_rate=chromosome.get('xgb_learning_rate', 0.1),
                subsample=chromosome.get('xgb_subsample', 0.8),
                colsample_bytree=chromosome.get('xgb_colsample_bytree', 0.8),
                min_child_weight=chromosome.get('xgb_min_child_weight', 1),
                gamma=chromosome.get('xgb_gamma', 0),
                objective='multi:softprob',
                num_class=self.n_classes,
                random_state=config.RANDOM_STATE,
                n_jobs=-1,
                verbosity=0
            )
        elif model_name == 'lightgbm':
            model = LGBMClassifier(
                n_estimators=chromosome.get('lgbm_n_estimators', 100),
                num_leaves=chromosome.get('lgbm_num_leaves', 31),
                learning_rate=chromosome.get('lgbm_learning_rate', 0.1),
                min_child_samples=chromosome.get('lgbm_min_child_samples', 20),
                subsample=chromosome.get('lgbm_subsample', 0.8),
                colsample_bytree=chromosome.get('lgbm_colsample_bytree', 0.8),
                reg_alpha=chromosome.get('lgbm_reg_alpha', 0),
                reg_lambda=chromosome.get('lgbm_reg_lambda', 0),
                objective='multiclass',
                num_class=self.n_classes,
                random_state=config.RANDOM_STATE,
                n_jobs=-1,
                verbose=-1,
                force_col_wise=True
            )
        elif model_name == 'gradientboosting':
            model = GradientBoostingClassifier(
                n_estimators=chromosome.get('gb_n_estimators', 100),
                max_depth=chromosome.get('gb_max_depth', 5),
                learning_rate=chromosome.get('gb_learning_rate', 0.1),
                subsample=chromosome.get('gb_subsample', 0.8),
                min_samples_split=chromosome.get('gb_min_samples_split', 2),
                min_samples_leaf=chromosome.get('gb_min_samples_leaf', 1),
                random_state=config.RANDOM_STATE
            )
        elif model_name == 'adaboost':
            model = AdaBoostClassifier(
                n_estimators=chromosome.get('ada_n_estimators', 100),
                learning_rate=chromosome.get('ada_learning_rate', 1.0),
                algorithm=config.ADABOOST_ALGORITHM,
                random_state=config.RANDOM_STATE
            )
        else:
            raise ValueError(f"Unknown model: {model_name}")

        model.fit(X_train, y_train)
        return model
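
    # Chromosome keys map to constructor kwargs by stripping the model
    # prefix, e.g. 'xgb_max_depth' -> XGBClassifier(max_depth=...). The
    # .get(..., default) calls keep training robust when a hyperparameter
    # is absent from config.MODEL_HYPERPARAMS for that model.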
|
|
|
|
|
    def crossover(self, parent1: Dict, parent2: Dict) -> Tuple[Dict, Dict]:
        """Crossover operation."""
        if random.random() > self.crossover_rate:
            return parent1.copy(), parent2.copy()

        child1 = {}
        child2 = {}

        # Uniform crossover on the feature gene
        if self.skip_feature_selection:
            child1['feature_indices'] = parent1['feature_indices'].copy()
            child2['feature_indices'] = parent2['feature_indices'].copy()
        else:
            mask = np.random.rand(self.n_select) < 0.5
            child1_features = np.where(
                mask, parent1['feature_indices'], parent2['feature_indices'])
            child2_features = np.where(
                mask, parent2['feature_indices'], parent1['feature_indices'])

            # Mixing can produce duplicates; deduplicate, then top up with
            # random unused features until each child has n_select again
            child1_features = np.unique(child1_features)
            child2_features = np.unique(child2_features)

            while len(child1_features) < self.n_select:
                new_feat = random.randint(0, self.n_features - 1)
                if new_feat not in child1_features:
                    child1_features = np.append(child1_features, new_feat)

            while len(child2_features) < self.n_select:
                new_feat = random.randint(0, self.n_features - 1)
                if new_feat not in child2_features:
                    child2_features = np.append(child2_features, new_feat)

            child1['feature_indices'] = np.sort(
                child1_features[:self.n_select])
            child2['feature_indices'] = np.sort(
                child2_features[:self.n_select])

        # Uniform crossover on hyperparameters and ensemble weights
        for key in parent1.keys():
            if key != 'feature_indices':
                if random.random() < 0.5:
                    child1[key] = parent1[key]
                    child2[key] = parent2[key]
                else:
                    child1[key] = parent2[key]
                    child2[key] = parent1[key]

        return child1, child2
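
    # Illustrative feature mixing (hypothetical values, n_select=3):
    #   parent1 [2, 5, 9], parent2 [1, 6, 7], mask [T, F, T]
    #   -> child1 = [2, 6, 9], child2 = [1, 5, 7]
    # If the positional mix yields duplicates (e.g. both parents share a
    # feature), np.unique shrinks the set and the while-loops refill it.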
|
|
|
|
|
    def mutate(self, chromosome: Dict) -> Dict:
        """Mutation operation."""
        mutated = chromosome.copy()

        # Feature mutation: swap a few selected features for unused ones.
        # Copy the array first: dict.copy() is shallow, so editing it in
        # place would also corrupt the parent's chromosome.
        if not self.skip_feature_selection:
            if random.random() < self.mutation_rate:
                mutated['feature_indices'] = mutated['feature_indices'].copy()
                n_replace = random.randint(1, min(5, self.n_select))
                indices_to_replace = np.random.choice(
                    self.n_select, n_replace, replace=False)

                for idx in indices_to_replace:
                    new_feat = random.randint(0, self.n_features - 1)
                    while new_feat in mutated['feature_indices']:
                        new_feat = random.randint(0, self.n_features - 1)
                    mutated['feature_indices'][idx] = new_feat

                mutated['feature_indices'] = np.sort(
                    mutated['feature_indices'])

        # Hyperparameter mutation: re-draw one random parameter
        if random.random() < self.mutation_rate:
            param_keys = [k for k in chromosome.keys()
                          if k not in ['feature_indices', 'weights']]
            if param_keys:
                param_to_mutate = random.choice(param_keys)
                temp = self.create_chromosome()
                mutated[param_to_mutate] = temp[param_to_mutate]

        # Weight mutation: re-draw the full weight vector
        if random.random() < self.mutation_rate:
            mutated['weights'] = self._random_weights(self.n_models)

        return mutated
|
|
|
|
|
    def evaluate_population_parallel(self, population: List[Dict],
                                     X_train: np.ndarray, y_train: np.ndarray,
                                     X_val: np.ndarray, y_val: np.ndarray,
                                     n_jobs: int = 2) -> List[float]:
        """Evaluate the entire population in parallel."""
        # Cap the worker count: never more than requested, more than 4,
        # or more than half the population size
        safe_n_jobs = min(n_jobs, 4, len(population) // 2)
        if safe_n_jobs < 1:
            safe_n_jobs = 1

        self.log(
            f" Evaluating {len(population)} individuals (n_jobs={safe_n_jobs})...")

        try:
            fitness_scores = Parallel(
                n_jobs=safe_n_jobs,
                verbose=0,
                backend='loky',
                timeout=600
            )(
                delayed(self.fitness)(
                    chromosome, X_train, y_train, X_val, y_val)
                for chromosome in population
            )
        except Exception as e:
            self.log(f"⚠️ Parallel evaluation failed: {e}")
            self.log(" Falling back to sequential evaluation...")

            fitness_scores = []
            for i, chromosome in enumerate(population):
                if (i + 1) % 5 == 0:
                    self.log(
                        f" Progress: {i+1}/{len(population)} individuals")
                score = self.fitness(
                    chromosome, X_train, y_train, X_val, y_val)
                fitness_scores.append(score)

        return fitness_scores
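
    # Worked example of the worker cap: with a population of 30 and
    # n_jobs=2, safe_n_jobs = min(2, 4, 15) = 2; with a population of 4
    # and n_jobs=8, safe_n_jobs = min(8, 4, 2) = 2.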
|
|
|
|
|
    def evolve(self, X_train: np.ndarray, y_train: np.ndarray,
               X_val: np.ndarray, y_val: np.ndarray,
               progress_callback: Optional[Callable] = None,
               n_jobs: int = 2) -> Optional[Dict]:
        """Main GA evolution loop. Returns the best chromosome, or None
        if the first generation fails."""
        self.log("=" * 70)
        self.log("🧬 GENETIC ALGORITHM OPTIMIZATION")
        self.log("=" * 70)
        self.log(f"Population size: {self.population_size}")
        self.log(f"Generations: {self.n_generations}")
        self.log(
            f"Feature selection: {'DISABLED (hyperparams only)' if self.skip_feature_selection else f'ENABLED ({self.n_select}/{self.n_features})'}")
        self.log(
            f"Selected models: {', '.join(self.selected_models)} ({self.n_models} models)")
        self.log(f"Early stopping patience: {self.early_stopping_patience}")
        self.log(f"Parallel jobs: {n_jobs}")
        self.log("=" * 70)

        population = [self.create_chromosome()
                      for _ in range(self.population_size)]

        start_time = time.time()
        no_improve_count = 0

        for generation in range(self.n_generations):
            try:
                gen_start = time.time()
                self.log(
                    f"\n📊 Generation {generation + 1}/{self.n_generations}")

                fitness_scores = self.evaluate_population_parallel(
                    population, X_train, y_train, X_val, y_val, n_jobs=n_jobs)

                # Guard against a short result list; pad with zeros
                if len(fitness_scores) != len(population):
                    self.log(
                        f"⚠️ Warning: Got {len(fitness_scores)} scores for {len(population)} individuals")
                    while len(fitness_scores) < len(population):
                        fitness_scores.append(0.0)

                max_fitness = max(fitness_scores)
                avg_fitness = np.mean(fitness_scores)
                std_fitness = np.std(fitness_scores)
                max_idx = fitness_scores.index(max_fitness)

                # Track the best chromosome; only count an improvement if
                # it beats the tolerance threshold
                improved = False
                if max_fitness > self.best_fitness + self.early_stopping_tolerance:
                    prev_best = self.best_fitness
                    self.best_fitness = max_fitness
                    self.best_chromosome = population[max_idx].copy()
                    no_improve_count = 0
                    improved = True
                    self.log(
                        f" ✨ NEW BEST: {max_fitness:.4f} (+{max_fitness - prev_best:.4f})")
                else:
                    no_improve_count += 1
                    self.log(
                        f" → Best: {max_fitness:.4f} (no improvement, count={no_improve_count})")

                self.log(f" Average: {avg_fitness:.4f} (σ={std_fitness:.4f})")
                self.log(
                    f" Range: [{min(fitness_scores):.4f}, {max(fitness_scores):.4f}]")

                gen_time = time.time() - gen_start
                elapsed = time.time() - start_time
                avg_gen_time = elapsed / (generation + 1)
                eta = avg_gen_time * (self.n_generations - generation - 1)

                self.log(
                    f" Time: {gen_time:.1f}s | Elapsed: {elapsed/60:.1f}min | ETA: {eta/60:.1f}min")

                self.history.append({
                    'generation': generation + 1,
                    'best_fitness': max_fitness,
                    'avg_fitness': avg_fitness,
                    'std_fitness': std_fitness,
                    'time': gen_time,
                    'improved': improved
                })

                if progress_callback:
                    try:
                        progress_callback(
                            (generation + 1) / self.n_generations,
                            desc=f"Gen {generation+1}/{self.n_generations} | Best: {max_fitness:.4f} | Avg: {avg_fitness:.4f} | ETA: {eta/60:.0f}min"
                        )
                    except Exception as e:
                        self.log(f"⚠️ Progress callback failed: {e}")

                if no_improve_count >= self.early_stopping_patience:
                    self.log(
                        f"\n🛑 EARLY STOPPING at generation {generation + 1}")
                    self.log(
                        f" No improvement for {self.early_stopping_patience} consecutive generations")
                    self.log(f" Best fitness: {self.best_fitness:.4f}")
                    break

                sys.stdout.flush()

                self.log(" Creating next generation...")

                # Tournament selection (size 3) for the non-elite slots
                selected = []
                for _ in range(self.population_size - self.elite_size):
                    tournament = random.sample(
                        list(zip(population, fitness_scores)), 3)
                    winner = max(tournament, key=lambda x: x[1])[0]
                    selected.append(winner)

                # Elitism: carry the top chromosomes over unchanged
                elite_indices = np.argsort(fitness_scores)[-self.elite_size:]
                elite = [population[i] for i in elite_indices]

                # Crossover + mutation on the selected parents
                offspring = []
                for i in range(0, len(selected), 2):
                    if i + 1 < len(selected):
                        child1, child2 = self.crossover(
                            selected[i], selected[i + 1])
                        offspring.append(self.mutate(child1))
                        offspring.append(self.mutate(child2))

                # If the non-elite slot count is odd, the pairing loop
                # leaves one slot unfilled; top it up with a mutated copy
                # of the leftover parent so the population size stays
                # constant across generations
                n_needed = self.population_size - self.elite_size
                if len(offspring) < n_needed and selected:
                    offspring.append(self.mutate(selected[-1]))

                population = elite + offspring[:n_needed]

                self.log(f" ✓ Generation {generation + 1} complete")

            except KeyboardInterrupt:
                self.log("\n⚠️ Training interrupted by user")
                break

            except Exception as e:
                self.log(f"\n❌ Error in generation {generation + 1}: {e}")
                self.log(traceback.format_exc())

                if generation == 0:
                    self.log("❌ First generation failed, aborting")
                    return None
                else:
                    self.log("⚠️ Attempting to continue...")
                    continue

        total_time = time.time() - start_time

        self.log("\n" + "=" * 70)
        self.log("✅ GA OPTIMIZATION COMPLETE")
        self.log("=" * 70)
        self.log(f"Final best fitness: {self.best_fitness:.4f}")
        self.log(
            f"Total generations: {len(self.history)}/{self.n_generations}")
        self.log(f"Total time: {total_time/60:.1f} minutes")
        if len(self.history) > 0:
            self.log(
                f"Average time per generation: {total_time/len(self.history):.1f}s")
        self.log("=" * 70)

        if self.best_chromosome is None:
            self.log(
                "⚠️ Warning: No improvement found, using best from final generation")
            fitness_scores = self.evaluate_population_parallel(
                population, X_train, y_train, X_val, y_val, n_jobs=n_jobs)
            max_idx = fitness_scores.index(max(fitness_scores))
            self.best_chromosome = population[max_idx].copy()
            self.best_fitness = fitness_scores[max_idx]

        return self.best_chromosome
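

# Minimal smoke-test sketch (illustrative only): exercises the GA on
# small synthetic data. Assumes the `config` module referenced above is
# importable; with a large GA_CONFIG this may take a while to run.
if __name__ == "__main__":
    from sklearn.model_selection import train_test_split

    rng = np.random.default_rng(0)
    X_demo = rng.normal(size=(300, 20))
    y_demo = rng.integers(0, 3, size=300)

    X_tr, X_val, y_tr, y_val = train_test_split(
        X_demo, y_demo, test_size=0.25, random_state=0)

    ga = GeneticAlgorithm(X_tr, y_tr, n_features_to_select=10,
                          selected_models=['xgboost'])
    best = ga.evolve(X_tr, y_tr, X_val, y_val, n_jobs=1)
    if best is not None:
        print("Selected features:", best['feature_indices'])
        print("Ensemble weights:", best['weights'])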
|
|
|