import pandas as pd
from pathlib import Path
from datetime import datetime
import warnings
import gc
import json
from loguru import logger
from src.create_dataset import process_datasets
from src.preprocessing import Preprocessor
from src.clean_data import DataCleaner
from src.feature_analyzer import FeatureAnalyzer
from src.model_trainer import ModelTrainer


def create_directories():
    """Create all necessary output directories for the pipeline."""
    directories = {
        'combined_data': Path('output_files/combined_data'),
        'preprocessed': Path('output_files/cleaned_preprocessed_data'),
        'feature_analyzer': Path('output_files/feature_analysis'),
        'model_outputs': Path('output_files/model_outputs'),
    }
    for dir_path in directories.values():
        dir_path.mkdir(parents=True, exist_ok=True)
    return directories


def handle_memory():
    """Free unreferenced objects and silence noisy library warnings."""
    gc.collect()
    warnings.filterwarnings('ignore')


def save_pipeline_metrics(metrics: dict, filepath: Path):
    """Save pipeline metrics to a JSON file."""
    with open(filepath, 'w') as f:
        json.dump(metrics, f, indent=4, default=str)


def start_pipelines(train_size=0.25):
    """Run the five Yelp analysis pipelines end to end.

    train_size is the fraction of the cleaned dataset sampled for model training.
    """
    logger.info("STARTING YELP DATA ANALYSIS PIPELINES...")
    handle_memory()
    dirs = create_directories()
    logger.info("Created necessary directories")

    # Pipeline 1: merge the raw Yelp files into a single combined dataset.
    logger.info("Pipeline 1: Creating initial dataset...")
    try:
        filename = "combined_merged_full.csv"
        df = process_datasets(output_path=dirs['combined_data'], filename=filename)
        logger.info(f"Dataset created successfully with shape: {df.shape}")
    except Exception as e:
        logger.error(f"Error in dataset creation: {e}")

    # Pipeline 2: preprocessing and feature engineering on the combined dataset.
    try:
        logger.info("Pipeline 2: Preprocessing and Feature Engineering...")
        output_before_preprocess = dirs['combined_data'] / "combined_merged_full.csv"
        df = pd.read_csv(output_before_preprocess)
        prep = Preprocessor(df)
        feature_engineered_df = prep.run_pipeline()
    except Exception as e:
        logger.error(f"Error in Pipeline 2 (Preprocessing and Feature Engineering): {e}")

    # Pipeline 3: clean the feature-engineered data and persist it to disk.
    try:
        logger.info("Pipeline 3: Cleaning data...")
        filename = "preprocessed_cleaned.csv"
        cleaner = DataCleaner(df=feature_engineered_df, output_path=str(dirs['preprocessed']), filename=filename)
        cleaner.run_pipeline()
        clean_output_file_path = dirs['preprocessed'] / filename
        logger.info(f"Preprocessed and cleaned data saved to {clean_output_file_path}")
    except Exception as e:
        logger.error(f"Error in Pipeline 3 (Cleaning Data): {e}")

    # Pipeline 4: exploratory feature analysis on the cleaned dataset.
    try:
        logger.info("Pipeline 4: Analyzing features...")
        filename = "preprocessed_cleaned.csv"
        preprocessed_clean_output_file = dirs['preprocessed'] / filename
        preprocessed_clean_df = pd.read_csv(preprocessed_clean_output_file)
        analyzer = FeatureAnalyzer(df=preprocessed_clean_df, output_path=str(dirs['feature_analyzer']))
        analyzer.run_pipeline()
    except Exception as e:
        logger.error(f"Error in Feature analysis: {e}")
        raise

    # Pipeline 5: shuffle, subsample to `train_size`, then train and evaluate models.
    try:
        logger.info("Pipeline 5: Training and Evaluating Models...")
        filename = "preprocessed_cleaned.csv"
        preprocessed_clean_output_file = dirs['preprocessed'] / filename
        preprocessed_clean_df = pd.read_csv(preprocessed_clean_output_file)
        preprocessed_clean_df = preprocessed_clean_df.sample(frac=1, random_state=42).reset_index(drop=True)
        size = int(train_size * len(preprocessed_clean_df))
        preprocessed_clean_df = preprocessed_clean_df.iloc[:size, :]
        trainer = ModelTrainer(df=preprocessed_clean_df, output_path=str(dirs['model_outputs']),
                               epochs=50, test_size=0.3)
        trainer.train_and_evaluate()
        logger.info("Model training completed")
    except Exception as e:
        logger.error(f"Error in Model Trainer: {e}")
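

# Entry-point sketch (an assumption: the original does not show how this module is invoked).
# It runs the full pipeline with the default train_size and records a minimal run summary
# via the otherwise-unused save_pipeline_metrics helper; the metrics keys and output path
# below are hypothetical.
if __name__ == "__main__":
    run_started = datetime.now()
    start_pipelines()
    save_pipeline_metrics(
        {"started_at": run_started, "finished_at": datetime.now()},
        Path("output_files/pipeline_metrics.json"),
    )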