"""
Leaderboard processing module for the leaderboard parser.
This module contains the main functions for processing leaderboards.
"""
import json
import os
import datetime
import logging
import time
import gc
import argparse
from typing import Dict, Any, List, Tuple, Optional
# Import functions from other modules
from src.file_utils import save_results, format_datetime, clean_output_files, update_leaderboard_result
from src.file_utils import create_category_slug, split_combined_id, create_combined_id
from src.file_utils import load_and_validate_results, validate_leaderboard_result
from src.hub_utils import upload_to_hub, download_from_hub
from src.leaderboard_processor import process_single_leaderboard
from src.agents.parser_agent import get_default_model
from src.agents.browser import cleanup_browser
# Configure logger
logger = logging.getLogger("leaderboard-parser")
# Update state variables in server module
def update_server_status(status, error=None):
"""
Updates the server status.
Args:
status: The new status ('idle', 'running', 'completed', 'failed')
error: The error message in case of failure
"""
    try:
        # Assign via the module object: rebinding names imported with
        # "from src.server import ..." would only update this module's
        # copies, not the server's state.
        import src.server
        src.server.processing_status = status
        src.server.processing_error = error
    except ImportError:
        # In non-server mode, the server module is not available
        pass
def process_leaderboards(args_dict=None) -> Tuple[bool, str]:
"""
Process leaderboards with the given arguments.
Returns a tuple of (success, message)
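    Recognized keys in args_dict (all optional, read via getattr below):
    local_only, clean, upload_only, force_retry_uid, force_retry_category,
    force_retry (deprecated), retry_rejected.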
"""
# Update status
update_server_status("running")
# Set default arguments if none provided
if args_dict is None:
args_dict = {"local_only": False}
# Create an argparse.Namespace object from the dictionary
args = argparse.Namespace(**args_dict)
try:
# Ensure we're in the correct directory
script_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
os.chdir(script_dir)
# Verify that the HF token is set
if not os.environ.get("HUGGING_FACE_HUB_TOKEN") and not args.local_only:
raise ValueError("HUGGING_FACE_HUB_TOKEN environment variable is not set!")
# Use default paths for category list and leaderboards
category_list_file = "data/best_model_for_category_list.json"
leaderboards_file = "data/final_leaderboards.json"
results_file = "data/best_model_for_results.json"
# Clean if requested
if getattr(args, "clean", False):
clean_output_files(results_file)
# Check if we're just uploading
if getattr(args, "upload_only", False):
upload_to_hub(to_parse_file=category_list_file, results_file=results_file)
update_server_status("completed")
return True, "Upload completed successfully"
# Download data from the Hub if not in local-only mode
if not getattr(args, "local_only", False):
download_from_hub()
        logger.info("Starting leaderboard processing")
# Load the category list and leaderboards data
try:
with open(category_list_file, "r", encoding="utf-8") as f:
category_list = json.load(f)
with open(leaderboards_file, "r", encoding="utf-8") as f:
leaderboards = json.load(f)
# Create a mapping UID -> HOST for all leaderboards
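            # (each entry is assumed to provide at least "uid" and "host";
            #  entries missing either key are skipped)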
uid_to_host = {lb["uid"]: lb["host"] for lb in leaderboards if "uid" in lb and "host" in lb}
logger.info(f"Loaded {len(uid_to_host)} UID -> HOST mappings from {leaderboards_file}")
except FileNotFoundError as e:
update_server_status("failed", str(e))
return False, f"File not found: {e}"
# Load existing results if any
try:
logger.info(f"Loading and validating results from {results_file}")
results_data = load_and_validate_results(results_file)
all_results = results_data
logger.info(f"Loaded and validated {len(all_results)} existing results")
except Exception as e:
logger.warning(f"Error loading results: {str(e)}")
results_data = []
all_results = []
# Create a map of combined UIDs to their complete data (for checking parsing date)
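        # A stored result is expected to look roughly like this (field names
        # inferred from how they are read below; the exact schema is validated
        # in src.file_utils):
        #     {"uid": "<combined id>", "original_uid": "<leaderboard uid>",
        #      "category": "<category>", "parsed_at": "<ISO 8601 timestamp>",
        #      "parsing_status": "rejected" | ...}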
processed_results_map = {}
for result in results_data:
if "uid" in result:
processed_results_map[result["uid"]] = result
# Get reprocessing interval from environment variable (in hours)
# Default value: 24 hours
reprocess_interval_hours = int(os.getenv("LEADERBOARD_REPROCESS_INTERVAL_HOURS", "24"))
# Maximum age without update (in seconds)
max_age_seconds = reprocess_interval_hours * 60 * 60
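        # e.g. with the 24-hour default: 24 * 60 * 60 = 86,400 seconds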
logger.info(f"Leaderboard reprocessing interval: {reprocess_interval_hours} hours")
# Current date and time
now = datetime.datetime.now()
print(f"Current system date: {now.isoformat()} - Readable format: {format_datetime(now.isoformat())}")
# Get the default agent
model = get_default_model()
# Collect all leaderboards to process
leaderboards_to_process = []
force_retry_leaderboards = []
# Add logs for debugging
logger.info(f"Available categories: {len(category_list)}")
logger.info(f"Available leaderboards: {len(uid_to_host)}")
logger.info(f"Sample of available UIDs: {list(uid_to_host.keys())[:5]}")
# Check if a specific category is requested
target_category = getattr(args, "force_retry_category", None)
target_uid = getattr(args, "force_retry_uid", None)
# Exclusive mode (only process specified leaderboards)
exclusive_mode = target_category is not None or target_uid is not None
if target_category:
logger.info(f"Force retry category mode enabled (exclusive): {target_category}")
if target_uid:
logger.info(f"Force retry UID mode enabled (exclusive): {target_uid}")
# Process leaderboards
for category in category_list:
category_name = category["category"]
normalized_category = create_category_slug(category_name)
# If in specific category mode and this is not the target category, skip to the next
if target_category and target_category != normalized_category:
logger.info(f"Category {category_name} (normalized: {normalized_category}) ignored - Does not match target category {target_category}")
continue
# ADDITIONAL SAFETY: Reload data from file before each new category
# This ensures there is no contamination between categories
try:
logger.info(f"Reloading data from file before processing category: {category_name}")
all_results = load_and_validate_results(results_file)
logger.info(f"Data reloaded successfully: {len(all_results)} results available")
except Exception as e:
logger.warning(f"Unable to reload data before category {category_name}: {str(e)}")
# In case of error, keep existing data if possible
if not isinstance(all_results, list):
all_results = []
# Check if category has leaderboards
if "leaderboards" not in category or not isinstance(category["leaderboards"], list):
logger.warning(f"Category '{category_name}' has no leaderboards or incorrect format.")
continue
# Process each leaderboard in the category
for leaderboard in category["leaderboards"]:
if "uid" not in leaderboard:
logger.warning(f"Leaderboard in category '{category_name}' has no UID.")
continue
leaderboard_uid = leaderboard["uid"]
# In specific UID mode, ignore all other leaderboards
if target_uid and target_uid != leaderboard_uid:
logger.info(f"Leaderboard {leaderboard_uid} ignored - Does not match target UID {target_uid}")
continue
                # Get additional rules if available (NB: the key's spelling
                # matches the source data as-is)
                additional_rules = leaderboard.get("additionnal_agent_rules", None)
# Check if we should force processing this leaderboard
# Using the new distinct options
force_retry_uid = getattr(args, "force_retry_uid", None) == leaderboard_uid
force_retry_category = getattr(args, "force_retry_category", None) == normalized_category
                # Support for the old --force-retry option (kept for backward
                # compatibility; to be removed later)
                legacy_value = getattr(args, "force_retry", None)
                legacy_force_retry = legacy_value is not None and legacy_value in (leaderboard_uid, normalized_category)
                if legacy_force_retry:
                    logger.warning("The --force-retry option is deprecated. Use --force-retry-uid or --force-retry-category instead.")
# Combine different sources of force_retry
force_retry = force_retry_uid or force_retry_category or legacy_force_retry
# Add explicit logs about the reason for force retry
if force_retry:
if force_retry_uid:
logger.info(f"Force retry enabled for leaderboard UID: {leaderboard_uid}")
elif force_retry_category:
logger.info(f"Force retry enabled for all leaderboards in category: {normalized_category}")
elif legacy_force_retry:
logger.info(f"Force retry enabled via the old --force-retry option for: {getattr(args, 'force_retry', None)}")
# Search for the leaderboard URL in uid_to_host (direct dictionary lookup)
host = uid_to_host.get(leaderboard_uid)
if not host:
logger.warning(f"UID '{leaderboard_uid}' (category: {normalized_category}) not found in leaderboards.")
# Show more information for debugging
logger.debug(f"Total number of UIDs available: {len(uid_to_host)}")
continue
# Create combined identifier (category_uid)
# The category is already normalized by create_category_slug
combined_uid = create_combined_id(normalized_category, leaderboard_uid)
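                # (presumably something like "<category-slug>_<uid>"; the exact
                #  join format is owned by create_combined_id / split_combined_id)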
# If force_retry is enabled, process the leaderboard without checking the time since last processing
if force_retry:
logger.info(f"Force retry enabled for {combined_uid} - Processing forced independently of last processing date.")
leaderboards_to_process.append({
"uid": leaderboard_uid,
"host": host,
"category": normalized_category,
"additional_rules": additional_rules,
"force_retry": force_retry
})
continue # Skip directly to the next leaderboard
# Check if the leaderboard has already been processed recently
needs_reprocessing = True
if combined_uid in processed_results_map:
# Check if the leaderboard has been processed within the interval
result = processed_results_map[combined_uid]
# If the --retry-rejected option is active and the status is "rejected", force reprocessing
if getattr(args, "retry_rejected", False) and result.get("parsing_status") == "rejected":
logger.info(f"Leaderboard {combined_uid} previously rejected, forced reprocessing with --retry-rejected.")
elif "parsed_at" in result:
try:
# Convert parsing date to datetime object
parsed_at = datetime.datetime.fromisoformat(result["parsed_at"])
# Calculate time elapsed since last parsing
time_diff = now - parsed_at
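                            # e.g. parsed_at = 2024-05-01T10:00 and now = 2024-05-02T12:00
                            # give a time_diff of 26 h = 93,600 s, which exceeds the
                            # default threshold of 86,400 s (dates illustrative only)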
                            # Debug logs for the date check
                            logger.debug(f"Current date: {now.isoformat()}")
                            logger.debug(f"Last parsing date: {parsed_at.isoformat()}")
                            logger.debug(f"Time difference in seconds: {time_diff.total_seconds()}")
                            logger.debug(f"Reprocessing threshold (seconds): {max_age_seconds}")
                            # Reparse only if the elapsed time strictly exceeds the threshold
                            time_seconds = time_diff.total_seconds()
                            if time_seconds > max_age_seconds:
                                needs_reprocessing = True
                                logger.info(f"Leaderboard {combined_uid} - {host} parsed more than {reprocess_interval_hours} hours ago ({format_datetime(result['parsed_at'])}), reprocessing necessary.")
                            else:
                                logger.info(f"Leaderboard {combined_uid} - {host} already processed recently ({format_datetime(result['parsed_at'])}), moving to the next one. Age: {time_seconds:.0f} s (threshold: {max_age_seconds} s)")
continue
except (ValueError, TypeError):
                            # If the date is invalid, reprocess as a precaution
logger.info(f"Leaderboard {combined_uid} has an invalid processing date, reprocessing necessary.")
else:
                        # If the parsing date is missing, reprocess as a precaution
logger.info(f"Leaderboard {combined_uid} has no processing date, reprocessing necessary.")
else:
# If the leaderboard has never been processed, process it
logger.info(f"New leaderboard {combined_uid} to process.")
if needs_reprocessing or force_retry:
leaderboards_to_process.append({
"uid": leaderboard_uid,
"host": host,
"category": normalized_category,
"additional_rules": additional_rules,
"force_retry": force_retry
})
# Information on the number of leaderboards to process
logger.info(f"Total number of leaderboards to process: {len(leaderboards_to_process)}")
# Process each leaderboard
for index, leaderboard_info in enumerate(leaderboards_to_process):
leaderboard_uid = leaderboard_info["uid"]
host = leaderboard_info["host"]
category_name = leaderboard_info["category"]
additional_rules = leaderboard_info["additional_rules"]
force_retry = leaderboard_info["force_retry"]
# Process this leaderboard
logger.info(f"Processing leaderboard {index+1}/{len(leaderboards_to_process)}: {leaderboard_uid} (category: {category_name})")
try:
# Force restart of browser every 2 leaderboards to avoid memory leaks
if index > 0 and index % 2 == 0:
logger.info(f"Periodic browser cleanup after {index} leaderboards to avoid memory leaks")
cleanup_browser()
                    # Force garbage collection (gc is imported at module level)
                    gc.collect()
# Small pause to let the system clean up
time.sleep(3)
# Process the leaderboard
all_results = process_single_leaderboard(
leaderboard_uid,
host,
model,
index,
all_results,
additional_rules,
category_name
)
# Add detailed logs for diagnosing problems
logger.info(f"Results after processing: {len(all_results)} elements")
# Search for results corresponding to the processed leaderboard
for idx, res in enumerate(all_results):
if res.get("original_uid") == leaderboard_uid:
logger.info(f"Found result {idx}: uid={res.get('uid')}, original_uid={res.get('original_uid')}, category={res.get('category')}")
# Clean up after each processing
cleanup_browser()
                # Verify that the leaderboard exists with the exact normalized
                # category: strict match on original_uid AND category
normalized_category_name = create_category_slug(category_name)
current_result = None
for result in all_results:
# Always compare normalized categories to avoid format issues
result_category = result.get("category", "")
if result.get("original_uid") == leaderboard_uid and create_category_slug(result_category) == normalized_category_name:
current_result = result
logger.info(f"Found result for {leaderboard_uid}, category: {result.get('category')}")
break
                # Deliberately no fallback search on original_uid alone:
                # if no exact match is found, processing most likely failed
if not current_result:
logger.error(f"RESULT NOT FOUND for {leaderboard_uid}, normalized_category: {normalized_category_name}")
logger.error(f"Search for all results corresponding to this UID:")
for res in all_results:
if res.get("original_uid") == leaderboard_uid:
logger.error(f" - Result with category={res.get('category')}, uid={res.get('uid')}")
logger.error(f"Leaderboard {leaderboard_uid} (category: {category_name}) not updated because result not found")
continue
# Update only this specific leaderboard in the results file
logger.info(f"Updating leaderboard {leaderboard_uid} (category: {category_name}) in file")
updated_results = update_leaderboard_result(current_result, results_file)
                # CRITICAL FIX: refresh all_results from the file data to avoid
                # desynchronization between the file and the in-memory list
all_results = updated_results
# Update global result for next leaderboard
results_data = updated_results
logger.info(f"Leaderboard {leaderboard_uid} (category: {category_name}) saved")
# Upload to HF Hub after each leaderboard if not in local-only mode
if not getattr(args, "local_only", False):
logger.info(f"Uploading results to HF Hub after processing leaderboard {leaderboard_uid}")
try:
upload_to_hub(to_parse_file=category_list_file, results_file=results_file)
logger.info(f"Upload successful to HF Hub for leaderboard {leaderboard_uid}")
except Exception as upload_err:
logger.warning(f"Upload to HF Hub failed after processing leaderboard {leaderboard_uid}: {str(upload_err)}")
except Exception as e:
logger.error(f"Error processing leaderboard {leaderboard_uid} (category: {category_name}): {str(e)}")
continue
        # No final save needed: each leaderboard was already saved individually
logger.info("Leaderboard processing completed")
update_server_status("completed")
return True, "Processing completed successfully"
except Exception as e:
update_server_status("failed", str(e))
logger.exception("Error processing leaderboards")
return False, f"Error processing leaderboards: {str(e)}"