""" | |
Leaderboard processing module for the leaderboard parser. | |
This module contains the main functions for processing leaderboards. | |
""" | |
import json | |
import os | |
import datetime | |
import logging | |
import time | |
import argparse | |
from typing import Dict, Any, List, Tuple, Optional | |
# Import functions from other modules | |
from src.file_utils import save_results, format_datetime, clean_output_files, update_leaderboard_result | |
from src.file_utils import create_category_slug, split_combined_id, create_combined_id | |
from src.file_utils import load_and_validate_results, validate_leaderboard_result | |
from src.hub_utils import upload_to_hub, download_from_hub | |
from src.leaderboard_processor import process_single_leaderboard | |
from src.agents.parser_agent import get_default_model | |
from src.agents.browser import cleanup_browser | |
# Configure logger | |
logger = logging.getLogger("leaderboard-parser") | |

# Update state variables in the server module
def update_server_status(status, error=None):
    """
    Update the server processing status.

    Args:
        status: The new status ('idle', 'running', 'completed', 'failed')
        error: The error message in case of failure
    """
    try:
        # Set the attributes on the server module itself: assigning through
        # globals() here would only touch this module's namespace, not the
        # state that server.py actually reads.
        import src.server
        src.server.processing_status = status
        src.server.processing_error = error
    except ImportError:
        # In non-server mode the server module doesn't exist; status
        # tracking is a silent no-op.
        pass
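
# Example (illustrative, not from the original code): wrap a long-running job
# so the server always ends up with a terminal status. `do_work` is a
# hypothetical placeholder for any processing step.
#
#     update_server_status("running")
#     try:
#         do_work()
#         update_server_status("completed")
#     except Exception as exc:
#         update_server_status("failed", error=str(exc))
#
# When src.server is not importable (CLI mode), each call is a silent no-op.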

def process_leaderboards(args_dict=None) -> Tuple[bool, str]:
    """
    Process leaderboards with the given arguments.
    Returns a tuple of (success, message).
    """
    # Update status
    update_server_status("running")

    # Set default arguments if none provided
    if args_dict is None:
        args_dict = {"local_only": False}

    # Create an argparse.Namespace object from the dictionary
    args = argparse.Namespace(**args_dict)

    try:
        # Ensure we're in the correct directory
        script_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        os.chdir(script_dir)

        # Verify that the HF token is set
        if not os.environ.get("HUGGING_FACE_HUB_TOKEN") and not args.local_only:
            raise ValueError("HUGGING_FACE_HUB_TOKEN environment variable is not set!")

        # Use default paths for the category list and leaderboards
        category_list_file = "data/best_model_for_category_list.json"
        leaderboards_file = "data/final_leaderboards.json"
        results_file = "data/best_model_for_results.json"

        # Clean if requested
        if getattr(args, "clean", False):
            clean_output_files(results_file)

        # Check if we're just uploading
        if getattr(args, "upload_only", False):
            upload_to_hub(to_parse_file=category_list_file, results_file=results_file)
            update_server_status("completed")
            return True, "Upload completed successfully"

        # Download data from the Hub if not in local-only mode
        if not getattr(args, "local_only", False):
            download_from_hub()

        logger.info("Starting leaderboard processing")

        # Load the category list and leaderboards data
        try:
            with open(category_list_file, "r", encoding="utf-8") as f:
                category_list = json.load(f)
            with open(leaderboards_file, "r", encoding="utf-8") as f:
                leaderboards = json.load(f)
            # Create a UID -> HOST mapping for all leaderboards
            uid_to_host = {lb["uid"]: lb["host"] for lb in leaderboards if "uid" in lb and "host" in lb}
            logger.info(f"Loaded {len(uid_to_host)} UID -> HOST mappings from {leaderboards_file}")
        except FileNotFoundError as e:
            update_server_status("failed", str(e))
            return False, f"File not found: {e}"

        # Load existing results if any
        try:
            logger.info(f"Loading and validating results from {results_file}")
            results_data = load_and_validate_results(results_file)
            all_results = results_data
            logger.info(f"Loaded and validated {len(all_results)} existing results")
        except Exception as e:
            logger.warning(f"Error loading results: {str(e)}")
            results_data = []
            all_results = []

        # Map combined UIDs to their complete data (for checking the parsing date)
        processed_results_map = {result["uid"]: result for result in results_data if "uid" in result}

        # Reprocessing interval from an environment variable (in hours);
        # default value: 24 hours
        reprocess_interval_hours = int(os.getenv("LEADERBOARD_REPROCESS_INTERVAL_HOURS", "24"))
        # Maximum age without update (in seconds)
        max_age_seconds = reprocess_interval_hours * 60 * 60
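        # Worked example (illustrative): with LEADERBOARD_REPROCESS_INTERVAL_HOURS=6,
        # max_age_seconds = 6 * 60 * 60 = 21600, so any leaderboard last parsed
        # more than 21,600 seconds ago is queued for reprocessing.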
logger.info(f"Leaderboard reprocessing interval: {reprocess_interval_hours} hours") | |
# Current date and time | |
now = datetime.datetime.now() | |
print(f"Current system date: {now.isoformat()} - Readable format: {format_datetime(now.isoformat())}") | |
# Get the default agent | |
model = get_default_model() | |
# Collect all leaderboards to process | |
leaderboards_to_process = [] | |
force_retry_leaderboards = [] | |
# Add logs for debugging | |
logger.info(f"Available categories: {len(category_list)}") | |
logger.info(f"Available leaderboards: {len(uid_to_host)}") | |
logger.info(f"Sample of available UIDs: {list(uid_to_host.keys())[:5]}") | |
# Check if a specific category is requested | |
target_category = getattr(args, "force_retry_category", None) | |
target_uid = getattr(args, "force_retry_uid", None) | |
# Exclusive mode (only process specified leaderboards) | |
exclusive_mode = target_category is not None or target_uid is not None | |
if target_category: | |
logger.info(f"Force retry category mode enabled (exclusive): {target_category}") | |
if target_uid: | |
logger.info(f"Force retry UID mode enabled (exclusive): {target_uid}") | |
# Process leaderboards | |
for category in category_list: | |
category_name = category["category"] | |
normalized_category = create_category_slug(category_name) | |
# If in specific category mode and this is not the target category, skip to the next | |
if target_category and target_category != normalized_category: | |
logger.info(f"Category {category_name} (normalized: {normalized_category}) ignored - Does not match target category {target_category}") | |
continue | |
# ADDITIONAL SAFETY: Reload data from file before each new category | |
# This ensures there is no contamination between categories | |
try: | |
logger.info(f"Reloading data from file before processing category: {category_name}") | |
all_results = load_and_validate_results(results_file) | |
logger.info(f"Data reloaded successfully: {len(all_results)} results available") | |
except Exception as e: | |
logger.warning(f"Unable to reload data before category {category_name}: {str(e)}") | |
# In case of error, keep existing data if possible | |
if not isinstance(all_results, list): | |
all_results = [] | |
# Check if category has leaderboards | |
if "leaderboards" not in category or not isinstance(category["leaderboards"], list): | |
logger.warning(f"Category '{category_name}' has no leaderboards or incorrect format.") | |
continue | |
# Process each leaderboard in the category | |
for leaderboard in category["leaderboards"]: | |
if "uid" not in leaderboard: | |
logger.warning(f"Leaderboard in category '{category_name}' has no UID.") | |
continue | |
leaderboard_uid = leaderboard["uid"] | |
# In specific UID mode, ignore all other leaderboards | |
if target_uid and target_uid != leaderboard_uid: | |
logger.info(f"Leaderboard {leaderboard_uid} ignored - Does not match target UID {target_uid}") | |
continue | |
# Get additional rules if available | |
additional_rules = leaderboard.get("additionnal_agent_rules", None) | |
# Check if we should force processing this leaderboard | |
# Using the new distinct options | |
force_retry_uid = getattr(args, "force_retry_uid", None) == leaderboard_uid | |
force_retry_category = getattr(args, "force_retry_category", None) == normalized_category | |
# Support for the old option for backward compatibility (to be removed later) | |
legacy_force_retry = False | |
if hasattr(args, "force_retry") and getattr(args, "force_retry", None) is not None: | |
legacy_force_retry = ( | |
getattr(args, "force_retry", None) == leaderboard_uid or | |
getattr(args, "force_retry", None) == normalized_category | |
) | |
if legacy_force_retry: | |
logger.warning("The --force-retry option is obsolete. Use --force-retry-uid or --force-retry-category instead.") | |
# Combine different sources of force_retry | |
force_retry = force_retry_uid or force_retry_category or legacy_force_retry | |
# Add explicit logs about the reason for force retry | |
if force_retry: | |
if force_retry_uid: | |
logger.info(f"Force retry enabled for leaderboard UID: {leaderboard_uid}") | |
elif force_retry_category: | |
logger.info(f"Force retry enabled for all leaderboards in category: {normalized_category}") | |
elif legacy_force_retry: | |
logger.info(f"Force retry enabled via the old --force-retry option for: {getattr(args, 'force_retry', None)}") | |
# Search for the leaderboard URL in uid_to_host (direct dictionary lookup) | |
host = uid_to_host.get(leaderboard_uid) | |
if not host: | |
logger.warning(f"UID '{leaderboard_uid}' (category: {normalized_category}) not found in leaderboards.") | |
# Show more information for debugging | |
logger.debug(f"Total number of UIDs available: {len(uid_to_host)}") | |
continue | |
# Create combined identifier (category_uid) | |
# The category is already normalized by create_category_slug | |
combined_uid = create_combined_id(normalized_category, leaderboard_uid) | |
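                # Illustrative example (format assumed from the "(category_uid)"
                # convention noted above, not verified against create_combined_id):
                #     create_combined_id("text-generation", "abc123")
                #     -> "text-generation_abc123"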

                # If force_retry is enabled, queue the leaderboard without
                # checking the time since the last processing
                if force_retry:
                    logger.info(f"Force retry enabled for {combined_uid} - processing forced regardless of the last processing date.")
                    leaderboards_to_process.append({
                        "uid": leaderboard_uid,
                        "host": host,
                        "category": normalized_category,
                        "additional_rules": additional_rules,
                        "force_retry": force_retry
                    })
                    continue  # Skip directly to the next leaderboard

                # Check whether the leaderboard has already been processed recently
                needs_reprocessing = True
                if combined_uid in processed_results_map:
                    # Check if the leaderboard was processed within the interval
                    result = processed_results_map[combined_uid]

                    # If the --retry-rejected option is active and the status is
                    # "rejected", force reprocessing
                    if getattr(args, "retry_rejected", False) and result.get("parsing_status") == "rejected":
                        logger.info(f"Leaderboard {combined_uid} previously rejected, forced reprocessing with --retry-rejected.")
                    elif "parsed_at" in result:
                        try:
                            # Convert the parsing date to a datetime object
                            parsed_at = datetime.datetime.fromisoformat(result["parsed_at"])
                            # Time elapsed since the last parsing
                            time_diff = now - parsed_at
                            time_seconds = time_diff.total_seconds()

                            # Debug logs for the date checks
                            logger.debug(f"Current date: {now.isoformat()}")
                            logger.debug(f"Last parsing date: {parsed_at.isoformat()}")
                            logger.debug(f"Time difference in seconds: {time_seconds}")
                            logger.debug(f"Reprocessing threshold (seconds): {max_age_seconds}")

                            # If the elapsed time exceeds max_age_seconds, reparse
                            if time_seconds > max_age_seconds:
                                logger.info(f"Leaderboard {combined_uid} - {host} parsed more than {reprocess_interval_hours} hours ago ({format_datetime(result['parsed_at'])}), reprocessing necessary.")
                            else:
                                logger.info(f"Leaderboard {combined_uid} - {host} already processed recently ({format_datetime(result['parsed_at'])}), moving on. Age: {time_seconds} seconds (threshold: {max_age_seconds})")
                                continue
                        except (ValueError, TypeError):
                            # If the date is invalid, reprocess as a precaution
                            logger.info(f"Leaderboard {combined_uid} has an invalid processing date, reprocessing necessary.")
                    else:
                        # If the parsing date is missing, reprocess as a precaution
                        logger.info(f"Leaderboard {combined_uid} has no processing date, reprocessing necessary.")
                else:
                    # If the leaderboard has never been processed, process it
                    logger.info(f"New leaderboard {combined_uid} to process.")

                if needs_reprocessing:
                    leaderboards_to_process.append({
                        "uid": leaderboard_uid,
                        "host": host,
                        "category": normalized_category,
                        "additional_rules": additional_rules,
                        "force_retry": force_retry
                    })

        # Report the number of leaderboards to process
        logger.info(f"Total number of leaderboards to process: {len(leaderboards_to_process)}")

        # Process each leaderboard
        for index, leaderboard_info in enumerate(leaderboards_to_process):
            leaderboard_uid = leaderboard_info["uid"]
            host = leaderboard_info["host"]
            category_name = leaderboard_info["category"]
            additional_rules = leaderboard_info["additional_rules"]
            force_retry = leaderboard_info["force_retry"]

            # Process this leaderboard
            logger.info(f"Processing leaderboard {index+1}/{len(leaderboards_to_process)}: {leaderboard_uid} (category: {category_name})")
            try:
                # Force a browser restart every 2 leaderboards to avoid memory leaks
                if index > 0 and index % 2 == 0:
                    logger.info(f"Periodic browser cleanup after {index} leaderboards to avoid memory leaks")
                    cleanup_browser()
                    # Force garbage collection
                    gc.collect()
                    # Short pause to let the system clean up
                    time.sleep(3)

                # Process the leaderboard
                all_results = process_single_leaderboard(
                    leaderboard_uid,
                    host,
                    model,
                    index,
                    all_results,
                    additional_rules,
                    category_name
                )

                # Detailed logs for diagnosing problems
                logger.info(f"Results after processing: {len(all_results)} elements")
                # Log the results corresponding to the processed leaderboard
                for idx, res in enumerate(all_results):
                    if res.get("original_uid") == leaderboard_uid:
                        logger.info(f"Found result {idx}: uid={res.get('uid')}, original_uid={res.get('original_uid')}, category={res.get('category')}")

                # Clean up after each processing run
                cleanup_browser()

                # Check whether the leaderboard exists with the exact normalized
                # category: strict search by original_uid AND category
                normalized_category_name = create_category_slug(category_name)
                current_result = None
                for result in all_results:
                    # Always compare normalized categories to avoid format issues
                    result_category = result.get("category", "")
                    if result.get("original_uid") == leaderboard_uid and create_category_slug(result_category) == normalized_category_name:
                        current_result = result
                        logger.info(f"Found result for {leaderboard_uid}, category: {result.get('category')}")
                        break

                # No fallback search on original_uid alone: if the result is not
                # found here, it is most likely an error or the processing failed
                if not current_result:
                    logger.error(f"RESULT NOT FOUND for {leaderboard_uid}, normalized_category: {normalized_category_name}")
                    logger.error("All results matching this UID:")
                    for res in all_results:
                        if res.get("original_uid") == leaderboard_uid:
                            logger.error(f" - Result with category={res.get('category')}, uid={res.get('uid')}")
                    logger.error(f"Leaderboard {leaderboard_uid} (category: {category_name}) not updated because the result was not found")
                    continue

                # Update only this specific leaderboard in the results file
                logger.info(f"Updating leaderboard {leaderboard_uid} (category: {category_name}) in file")
                updated_results = update_leaderboard_result(current_result, results_file)

                # CRITICAL FIX: refresh the in-memory lists from the file data to
                # avoid desynchronization between the file and the in-memory state
                all_results = updated_results
                # Update the global results for the next leaderboard
                results_data = updated_results
                logger.info(f"Leaderboard {leaderboard_uid} (category: {category_name}) saved")

                # Upload to the HF Hub after each leaderboard if not in local-only mode
                if not getattr(args, "local_only", False):
                    logger.info(f"Uploading results to HF Hub after processing leaderboard {leaderboard_uid}")
                    try:
                        upload_to_hub(to_parse_file=category_list_file, results_file=results_file)
                        logger.info(f"Upload successful to HF Hub for leaderboard {leaderboard_uid}")
                    except Exception as upload_err:
                        logger.warning(f"Upload to HF Hub failed after processing leaderboard {leaderboard_uid}: {str(upload_err)}")

            except Exception as e:
                logger.error(f"Error processing leaderboard {leaderboard_uid} (category: {category_name}): {str(e)}")
                continue

        # No final save needed: every leaderboard has already been updated individually
        logger.info("Leaderboard processing completed")
        update_server_status("completed")
        return True, "Processing completed successfully"

    except Exception as e:
        update_server_status("failed", str(e))
        logger.exception("Error processing leaderboards")
        return False, f"Error processing leaderboards: {str(e)}"