""" Leaderboard processing module for the leaderboard parser. This module contains the main functions for processing leaderboards. """ import json import os import datetime import logging import time import argparse from typing import Dict, Any, List, Tuple, Optional # Import functions from other modules from src.file_utils import save_results, format_datetime, clean_output_files, update_leaderboard_result from src.file_utils import create_category_slug, split_combined_id, create_combined_id from src.file_utils import load_and_validate_results, validate_leaderboard_result from src.hub_utils import upload_to_hub, download_from_hub from src.leaderboard_processor import process_single_leaderboard from src.agents.parser_agent import get_default_model from src.agents.browser import cleanup_browser # Configure logger logger = logging.getLogger("leaderboard-parser") # Update state variables in server module def update_server_status(status, error=None): """ Updates the server status. Args: status: The new status ('idle', 'running', 'completed', 'failed') error: The error message in case of failure """ try: from src.server import processing_status, processing_error, last_run_time # Update global variables in server.py globals()['processing_status'] = status globals()['processing_error'] = error # Update server module variables import src.server src.server.processing_status = status src.server.processing_error = error # Update last run time when processing completes if status == "completed": now = datetime.datetime.now() src.server.last_run_time = now logger.info(f"Updated last run time to {now.isoformat()}") except ImportError: # In non-server mode, these variables don't exist pass def process_leaderboards(args_dict=None) -> Tuple[bool, str]: """ Process leaderboards with the given arguments. 


def process_leaderboards(args_dict=None) -> Tuple[bool, str]:
    """
    Process leaderboards with the given arguments.

    Returns a tuple of (success, message).
    """
    # Update status
    update_server_status("running")

    # Set default arguments if none were provided
    if args_dict is None:
        args_dict = {"local_only": False}

    # Create an argparse.Namespace object from the dictionary
    args = argparse.Namespace(**args_dict)

    try:
        # Ensure we're in the correct directory
        script_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        os.chdir(script_dir)

        # Verify that the HF token is set
        if not os.environ.get("HUGGING_FACE_HUB_TOKEN") and not args.local_only:
            raise ValueError("HUGGING_FACE_HUB_TOKEN environment variable is not set!")

        # Use the default paths for the category list and leaderboards
        category_list_file = "data/best_model_for_category_list.json"
        leaderboards_file = "data/final_leaderboards.json"
        results_file = "data/best_model_for_results.json"

        # Clean if requested
        if getattr(args, "clean", False):
            clean_output_files(results_file)

        # Check if we're only uploading
        if getattr(args, "upload_only", False):
            upload_to_hub(to_parse_file=category_list_file, results_file=results_file)
            update_server_status("completed")
            return True, "Upload completed successfully"

        # Download data from the Hub if not in local-only mode
        if not getattr(args, "local_only", False):
            download_from_hub()

        logger.info("Starting leaderboard processing")

        # Load the category list and the leaderboards data
        try:
            with open(category_list_file, "r", encoding="utf-8") as f:
                category_list = json.load(f)
            with open(leaderboards_file, "r", encoding="utf-8") as f:
                leaderboards = json.load(f)

            # Create a UID -> HOST mapping for all leaderboards
            uid_to_host = {lb["uid"]: lb["host"] for lb in leaderboards if "uid" in lb and "host" in lb}
            logger.info(f"Loaded {len(uid_to_host)} UID -> HOST mappings from {leaderboards_file}")
        except FileNotFoundError as e:
            update_server_status("failed", str(e))
            return False, f"File not found: {e}"

        # Load the existing results, if any
        try:
            logger.info(f"Loading and validating results from {results_file}")
            results_data = load_and_validate_results(results_file)
            all_results = results_data
            logger.info(f"Loaded and validated {len(all_results)} existing results")
        except Exception as e:
            logger.warning(f"Error loading results: {str(e)}")
            results_data = []
            all_results = []

        # Map combined UIDs to their complete data (used to check the parsing date)
        processed_results_map = {}
        for result in results_data:
            if "uid" in result:
                processed_results_map[result["uid"]] = result

        # Reprocessing interval from the environment (in hours); default: 24 hours
        reprocess_interval_hours = int(os.getenv("LEADERBOARD_REPROCESS_INTERVAL_HOURS", "24"))
        # Maximum age without an update (in seconds)
        max_age_seconds = reprocess_interval_hours * 60 * 60
        logger.info(f"Leaderboard reprocessing interval: {reprocess_interval_hours} hours")
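        # For example, deploying with LEADERBOARD_REPROCESS_INTERVAL_HOURS=6
        # (a hypothetical setting) would treat any result older than
        # 6 * 3600 = 21600 seconds as stale and queue it for reprocessing.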
        # Current date and time
        now = datetime.datetime.now()
        print(f"Current system date: {now.isoformat()} - Readable format: {format_datetime(now.isoformat())}")

        # Get the default agent model
        model = get_default_model()

        # Collect all leaderboards to process
        leaderboards_to_process = []
        force_retry_leaderboards = []

        # Debug logs
        logger.info(f"Available categories: {len(category_list)}")
        logger.info(f"Available leaderboards: {len(uid_to_host)}")
        logger.info(f"Sample of available UIDs: {list(uid_to_host.keys())[:5]}")

        # Check whether a specific category or UID is requested
        target_category = getattr(args, "force_retry_category", None)
        target_uid = getattr(args, "force_retry_uid", None)

        # Exclusive mode (only process the specified leaderboards)
        exclusive_mode = target_category is not None or target_uid is not None
        if target_category:
            logger.info(f"Force retry category mode enabled (exclusive): {target_category}")
        if target_uid:
            logger.info(f"Force retry UID mode enabled (exclusive): {target_uid}")

        # Process the leaderboards
        for category in category_list:
            category_name = category["category"]
            normalized_category = create_category_slug(category_name)

            # In specific-category mode, skip any category that is not the target
            if target_category and target_category != normalized_category:
                logger.info(f"Category {category_name} (normalized: {normalized_category}) ignored - does not match target category {target_category}")
                continue

            # ADDITIONAL SAFETY: reload the data from file before each new category
            # to make sure there is no contamination between categories
            try:
                logger.info(f"Reloading data from file before processing category: {category_name}")
                all_results = load_and_validate_results(results_file)
                logger.info(f"Data reloaded successfully: {len(all_results)} results available")
            except Exception as e:
                logger.warning(f"Unable to reload data before category {category_name}: {str(e)}")
                # On error, keep the existing data if possible
                if not isinstance(all_results, list):
                    all_results = []

            # Check that the category has leaderboards
            if "leaderboards" not in category or not isinstance(category["leaderboards"], list):
                logger.warning(f"Category '{category_name}' has no leaderboards or uses an incorrect format.")
                continue

            # Process each leaderboard in the category
            for leaderboard in category["leaderboards"]:
                if "uid" not in leaderboard:
                    logger.warning(f"Leaderboard in category '{category_name}' has no UID.")
                    continue

                leaderboard_uid = leaderboard["uid"]

                # In specific-UID mode, ignore every other leaderboard
                if target_uid and target_uid != leaderboard_uid:
                    logger.info(f"Leaderboard {leaderboard_uid} ignored - does not match target UID {target_uid}")
                    continue

                # Get additional rules if available (note: the key is spelled
                # "additionnal_agent_rules" in the source data)
                additional_rules = leaderboard.get("additionnal_agent_rules", None)

                # Check whether processing of this leaderboard should be forced,
                # using the new distinct options
                force_retry_uid = getattr(args, "force_retry_uid", None) == leaderboard_uid
                force_retry_category = getattr(args, "force_retry_category", None) == normalized_category

                # Support for the old option, kept for backward compatibility (to be removed later)
                legacy_force_retry = False
                if getattr(args, "force_retry", None) is not None:
                    legacy_force_retry = (
                        getattr(args, "force_retry", None) == leaderboard_uid
                        or getattr(args, "force_retry", None) == normalized_category
                    )
                    if legacy_force_retry:
                        logger.warning("The --force-retry option is obsolete. Use --force-retry-uid or --force-retry-category instead.")
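                # Example with hypothetical values: --force-retry-uid open-llm
                # forces that single leaderboard, while
                # --force-retry-category text-generation forces every leaderboard
                # whose normalized category slug is "text-generation".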
                # Combine the different sources of force_retry
                force_retry = force_retry_uid or force_retry_category or legacy_force_retry

                # Log the reason for the force retry explicitly
                if force_retry:
                    if force_retry_uid:
                        logger.info(f"Force retry enabled for leaderboard UID: {leaderboard_uid}")
                    elif force_retry_category:
                        logger.info(f"Force retry enabled for all leaderboards in category: {normalized_category}")
                    elif legacy_force_retry:
                        logger.info(f"Force retry enabled via the old --force-retry option for: {getattr(args, 'force_retry', None)}")

                # Look up the leaderboard URL in uid_to_host (direct dictionary lookup)
                host = uid_to_host.get(leaderboard_uid)
                if not host:
                    logger.warning(f"UID '{leaderboard_uid}' (category: {normalized_category}) not found in leaderboards.")
                    # Extra information for debugging
                    logger.debug(f"Total number of UIDs available: {len(uid_to_host)}")
                    continue

                # Create the combined identifier (category_uid);
                # the category is already normalized by create_category_slug
                combined_uid = create_combined_id(normalized_category, leaderboard_uid)

                # If force_retry is enabled, process the leaderboard without
                # checking the time since the last processing
                if force_retry:
                    logger.info(f"Force retry enabled for {combined_uid} - processing forced regardless of the last processing date.")
                    leaderboards_to_process.append({
                        "uid": leaderboard_uid,
                        "host": host,
                        "category": normalized_category,
                        "additional_rules": additional_rules,
                        "force_retry": force_retry
                    })
                    continue  # Skip directly to the next leaderboard
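                # Example with a hypothetical stored entry:
                #   {"uid": "text-generation_open-llm", "parsed_at": "2024-01-01T12:00:00"}
                # The entry is reparsed only once now - parsed_at exceeds
                # max_age_seconds; otherwise the leaderboard is skipped below.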
                # Check whether the leaderboard has already been processed recently
                needs_reprocessing = True
                if combined_uid in processed_results_map:
                    # Check if the leaderboard was processed within the interval
                    result = processed_results_map[combined_uid]

                    # If --ignore-cooldown is active, force reprocessing regardless of status
                    if getattr(args, "ignore_cooldown", False):
                        logger.info(f"Leaderboard {combined_uid} forced reprocessing with --ignore-cooldown, ignoring cooldown period.")
                    elif "parsed_at" in result:
                        try:
                            # Convert the parsing date to a datetime object
                            parsed_at = datetime.datetime.fromisoformat(result["parsed_at"])
                            # Compute the time elapsed since the last parsing
                            time_diff = now - parsed_at

                            # Debug logs for the date checks
                            logger.info(f"DEBUG: Current date: {now.isoformat()}")
                            logger.info(f"DEBUG: Last parsing date: {parsed_at.isoformat()}")
                            logger.info(f"DEBUG: Time difference in seconds: {time_diff.total_seconds()}")
                            logger.info(f"DEBUG: Reprocessing threshold (seconds): {max_age_seconds}")

                            # Strictly check whether the elapsed duration exceeds the threshold
                            time_seconds = time_diff.total_seconds()
                            if time_seconds > max_age_seconds:
                                needs_reprocessing = True
                                print(f"\n\nLeaderboard {combined_uid} - {host} parsed more than {reprocess_interval_hours} hours ago ({format_datetime(result['parsed_at'])}), reprocessing necessary.")
                            else:
                                print(f"\n\nLeaderboard {combined_uid} - {host} already processed recently ({format_datetime(result['parsed_at'])}), moving to next. Age: {time_seconds} seconds (threshold: {max_age_seconds})")
                                continue
                        except (ValueError, TypeError):
                            # If the date is invalid, reprocess as a precaution
                            logger.info(f"Leaderboard {combined_uid} has an invalid processing date, reprocessing necessary.")
                    else:
                        # If the parsing date is missing, reprocess as a precaution
                        logger.info(f"Leaderboard {combined_uid} has no processing date, reprocessing necessary.")
                else:
                    # The leaderboard has never been processed; process it
                    logger.info(f"New leaderboard {combined_uid} to process.")

                if needs_reprocessing or force_retry:
                    leaderboards_to_process.append({
                        "uid": leaderboard_uid,
                        "host": host,
                        "category": normalized_category,
                        "additional_rules": additional_rules,
                        "force_retry": force_retry
                    })

        # Number of leaderboards to process
        logger.info(f"Total number of leaderboards to process: {len(leaderboards_to_process)}")

        # Process each leaderboard
        for index, leaderboard_info in enumerate(leaderboards_to_process):
            leaderboard_uid = leaderboard_info["uid"]
            host = leaderboard_info["host"]
            category_name = leaderboard_info["category"]
            additional_rules = leaderboard_info["additional_rules"]
            force_retry = leaderboard_info["force_retry"]

            logger.info(f"Processing leaderboard {index+1}/{len(leaderboards_to_process)}: {leaderboard_uid} (category: {category_name})")

            try:
                # Restart the browser every 2 leaderboards to avoid memory leaks
                if index > 0 and index % 2 == 0:
                    logger.info(f"Periodic browser cleanup after {index} leaderboards to avoid memory leaks")
                    cleanup_browser()
                    # Force garbage collection
                    import gc
                    gc.collect()
                    # Short pause to let the system clean up
                    time.sleep(3)

                # Process the leaderboard
                all_results = process_single_leaderboard(
                    leaderboard_uid, host, model, index, all_results, additional_rules, category_name
                )

                # Detailed logs for diagnosing problems
                logger.info(f"Results after processing: {len(all_results)} elements")
                # Look for results matching the processed leaderboard
                for idx, res in enumerate(all_results):
                    if res.get("original_uid") == leaderboard_uid:
                        logger.info(f"Found result {idx}: uid={res.get('uid')}, original_uid={res.get('original_uid')}, category={res.get('category')}")

                # Clean up after each processing run
                cleanup_browser()

                # Verify that the leaderboard exists with the exact normalized category.
                # MODIFICATION: strict search by original_uid AND category
                normalized_category_name = create_category_slug(category_name)
                current_result = None
                for result in all_results:
                    # Always compare normalized categories to avoid format issues
                    result_category = result.get("category", "")
                    if result.get("original_uid") == leaderboard_uid and create_category_slug(result_category) == normalized_category_name:
                        current_result = result
                        logger.info(f"Found result for {leaderboard_uid}, category: {result.get('category')}")
                        break

                # REMOVAL: no longer fall back to a search by original_uid alone.
                # If no result is found, it is most likely an error or the processing failed.
                if not current_result:
                    logger.error(f"RESULT NOT FOUND for {leaderboard_uid}, normalized_category: {normalized_category_name}")
                    logger.error("Listing all results matching this UID:")
                    for res in all_results:
                        if res.get("original_uid") == leaderboard_uid:
                            logger.error(f"  - Result with category={res.get('category')}, uid={res.get('uid')}")
                    logger.error(f"Leaderboard {leaderboard_uid} (category: {category_name}) not updated because the result was not found")
                    continue
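                # Example of a result entry the strict match above accepts
                # (hypothetical values):
                #   {"uid": "text-generation_open-llm", "original_uid": "open-llm",
                #    "category": "text-generation", ...}
                # Requiring both original_uid and the normalized category prevents
                # the same leaderboard parsed under another category from being
                # picked up by mistake.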
                # Update only this specific leaderboard in the results file
                logger.info(f"Updating leaderboard {leaderboard_uid} (category: {category_name}) in file")
                updated_results = update_leaderboard_result(current_result, results_file)

                # CRITICAL FIX: update the all_results list from the file data to
                # avoid desynchronization between the file and the in-memory list
                all_results = updated_results
                # Update the global results for the next leaderboard
                results_data = updated_results

                logger.info(f"Leaderboard {leaderboard_uid} (category: {category_name}) saved")

                # Upload to the HF Hub after each leaderboard if not in local-only mode
                if not getattr(args, "local_only", False):
                    logger.info(f"Uploading results to HF Hub after processing leaderboard {leaderboard_uid}")
                    try:
                        upload_to_hub(to_parse_file=category_list_file, results_file=results_file)
                        logger.info(f"Upload successful to HF Hub for leaderboard {leaderboard_uid}")
                    except Exception as upload_err:
                        logger.warning(f"Upload to HF Hub failed after processing leaderboard {leaderboard_uid}: {str(upload_err)}")

            except Exception as e:
                logger.error(f"Error processing leaderboard {leaderboard_uid} (category: {category_name}): {str(e)}")
                continue

        # No final save needed: each leaderboard has already been updated individually
        logger.info("Leaderboard processing completed")
        update_server_status("completed")
        return True, "Processing completed successfully"

    except Exception as e:
        update_server_status("failed", str(e))
        logger.exception("Error processing leaderboards")
        return False, f"Error processing leaderboards: {str(e)}"
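

# A minimal sketch of a CLI entry point for this module, assuming the flag
# names that the getattr() lookups above expect (local_only, clean,
# upload_only, force_retry_uid, force_retry_category, ignore_cooldown).
# The project's real entry point may live elsewhere.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Parse configured leaderboards")
    parser.add_argument("--local-only", dest="local_only", action="store_true")
    parser.add_argument("--clean", action="store_true")
    parser.add_argument("--upload-only", dest="upload_only", action="store_true")
    parser.add_argument("--force-retry-uid", dest="force_retry_uid", default=None)
    parser.add_argument("--force-retry-category", dest="force_retry_category", default=None)
    parser.add_argument("--ignore-cooldown", dest="ignore_cooldown", action="store_true")
    cli_args = parser.parse_args()

    logging.basicConfig(level=logging.INFO)
    success, message = process_leaderboards(vars(cli_args))
    logger.info(message)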