"""
Leaderboard processing module for the leaderboard parser.
This module contains the main functions for processing leaderboards.
"""
import argparse
import datetime
import gc
import json
import logging
import os
import time
from typing import Dict, Any, List, Tuple, Optional
# Import functions from other modules
from src.file_utils import save_results, format_datetime, clean_output_files, update_leaderboard_result
from src.file_utils import create_category_slug, split_combined_id, create_combined_id
from src.file_utils import load_and_validate_results, validate_leaderboard_result
from src.hub_utils import upload_to_hub, download_from_hub
from src.leaderboard_processor import process_single_leaderboard
from src.agents.parser_agent import get_default_model
from src.agents.browser import cleanup_browser
# Configure logger
logger = logging.getLogger("leaderboard-parser")
# Update state variables in server module
def update_server_status(status, error=None):
"""
Updates the server status.
Args:
status: The new status ('idle', 'running', 'completed', 'failed')
error: The error message in case of failure
"""
    try:
        # Assign on the module object itself so server.py sees the new values;
        # rebinding names imported into this module would not affect src.server
        import src.server
        src.server.processing_status = status
        src.server.processing_error = error
    except ImportError:
        # In non-server mode, the server module is not available
        pass
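# Illustrative usage (not executed; the status strings mirror the docstring above):
#
#     update_server_status("running")
#     update_server_status("failed", error="timeout while parsing")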
def process_leaderboards(args_dict=None) -> Tuple[bool, str]:
"""
Process leaderboards with the given arguments.
Returns a tuple of (success, message)
"""
# Update status
update_server_status("running")
# Set default arguments if none provided
if args_dict is None:
args_dict = {"local_only": False}
# Create an argparse.Namespace object from the dictionary
args = argparse.Namespace(**args_dict)
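    # Illustrative conversion (hypothetical values):
    #     argparse.Namespace(**{"local_only": True, "clean": False})
    #     -> Namespace(clean=False, local_only=True), so args.local_only is True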
try:
# Ensure we're in the correct directory
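        # (__file__ lives in src/, so two dirname() calls resolve the project
        # root, which makes the relative "data/..." paths below valid)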
script_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
os.chdir(script_dir)
        # Verify that the HF token is set
        if not os.environ.get("HUGGING_FACE_HUB_TOKEN") and not getattr(args, "local_only", False):
            raise ValueError("HUGGING_FACE_HUB_TOKEN environment variable is not set!")
# Use default paths for category list and leaderboards
category_list_file = "data/best_model_for_category_list.json"
leaderboards_file = "data/final_leaderboards.json"
results_file = "data/best_model_for_results.json"
# Clean if requested
if getattr(args, "clean", False):
clean_output_files(results_file)
# Check if we're just uploading
if getattr(args, "upload_only", False):
upload_to_hub(to_parse_file=category_list_file, results_file=results_file)
update_server_status("completed")
return True, "Upload completed successfully"
# Download data from the Hub if not in local-only mode
if not getattr(args, "local_only", False):
download_from_hub()
        logger.info("Starting leaderboard processing")
# Load the category list and leaderboards data
try:
with open(category_list_file, "r", encoding="utf-8") as f:
category_list = json.load(f)
with open(leaderboards_file, "r", encoding="utf-8") as f:
leaderboards = json.load(f)
# Create a mapping UID -> HOST for all leaderboards
uid_to_host = {lb["uid"]: lb["host"] for lb in leaderboards if "uid" in lb and "host" in lb}
logger.info(f"Loaded {len(uid_to_host)} UID -> HOST mappings from {leaderboards_file}")
except FileNotFoundError as e:
update_server_status("failed", str(e))
return False, f"File not found: {e}"
# Load existing results if any
try:
logger.info(f"Loading and validating results from {results_file}")
results_data = load_and_validate_results(results_file)
all_results = results_data
logger.info(f"Loaded and validated {len(all_results)} existing results")
except Exception as e:
logger.warning(f"Error loading results: {str(e)}")
results_data = []
all_results = []
# Create a map of combined UIDs to their complete data (for checking parsing date)
processed_results_map = {}
for result in results_data:
if "uid" in result:
processed_results_map[result["uid"]] = result
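        # Expected shape of the map (hypothetical values), keyed by the combined
        # "category_uid" identifier built by create_combined_id:
        #     {"text-generation_abc123": {"uid": "...", "parsed_at": "..."}, ...}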
# Get reprocessing interval from environment variable (in hours)
# Default value: 24 hours
reprocess_interval_hours = int(os.getenv("LEADERBOARD_REPROCESS_INTERVAL_HOURS", "24"))
# Maximum age without update (in seconds)
max_age_seconds = reprocess_interval_hours * 60 * 60
logger.info(f"Leaderboard reprocessing interval: {reprocess_interval_hours} hours")
        # Current date and time
        now = datetime.datetime.now()
        logger.info(f"Current system date: {now.isoformat()} - Readable format: {format_datetime(now.isoformat())}")
        # Get the default model for the parser agent
        model = get_default_model()
        # Collect all leaderboards to process
        leaderboards_to_process = []
# Add logs for debugging
logger.info(f"Available categories: {len(category_list)}")
logger.info(f"Available leaderboards: {len(uid_to_host)}")
logger.info(f"Sample of available UIDs: {list(uid_to_host.keys())[:5]}")
# Check if a specific category is requested
target_category = getattr(args, "force_retry_category", None)
target_uid = getattr(args, "force_retry_uid", None)
# Exclusive mode (only process specified leaderboards)
exclusive_mode = target_category is not None or target_uid is not None
if target_category:
logger.info(f"Force retry category mode enabled (exclusive): {target_category}")
if target_uid:
logger.info(f"Force retry UID mode enabled (exclusive): {target_uid}")
# Process leaderboards
for category in category_list:
category_name = category["category"]
normalized_category = create_category_slug(category_name)
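            # Illustrative normalization (exact behavior defined in
            # src.file_utils): create_category_slug("Text Generation")
            # presumably yields something like "text-generation"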
# If in specific category mode and this is not the target category, skip to the next
if target_category and target_category != normalized_category:
logger.info(f"Category {category_name} (normalized: {normalized_category}) ignored - Does not match target category {target_category}")
continue
# ADDITIONAL SAFETY: Reload data from file before each new category
# This ensures there is no contamination between categories
try:
logger.info(f"Reloading data from file before processing category: {category_name}")
all_results = load_and_validate_results(results_file)
logger.info(f"Data reloaded successfully: {len(all_results)} results available")
except Exception as e:
logger.warning(f"Unable to reload data before category {category_name}: {str(e)}")
# In case of error, keep existing data if possible
if not isinstance(all_results, list):
all_results = []
# Check if category has leaderboards
if "leaderboards" not in category or not isinstance(category["leaderboards"], list):
logger.warning(f"Category '{category_name}' has no leaderboards or incorrect format.")
continue
# Process each leaderboard in the category
for leaderboard in category["leaderboards"]:
if "uid" not in leaderboard:
logger.warning(f"Leaderboard in category '{category_name}' has no UID.")
continue
leaderboard_uid = leaderboard["uid"]
# In specific UID mode, ignore all other leaderboards
if target_uid and target_uid != leaderboard_uid:
logger.info(f"Leaderboard {leaderboard_uid} ignored - Does not match target UID {target_uid}")
continue
                # Get additional rules if available (the key is spelled
                # "additionnal" in the source data, so keep it as-is here)
                additional_rules = leaderboard.get("additionnal_agent_rules", None)
                # Check if we should force processing this leaderboard,
                # using the new distinct options
                force_retry_uid = target_uid == leaderboard_uid
                force_retry_category = target_category == normalized_category
                # Support for the old option, kept for backward compatibility
                # (to be removed later)
                legacy_value = getattr(args, "force_retry", None)
                legacy_force_retry = legacy_value is not None and (
                    legacy_value == leaderboard_uid or legacy_value == normalized_category
                )
                if legacy_force_retry:
                    logger.warning("The --force-retry option is deprecated. Use --force-retry-uid or --force-retry-category instead.")
# Combine different sources of force_retry
force_retry = force_retry_uid or force_retry_category or legacy_force_retry
# Add explicit logs about the reason for force retry
if force_retry:
if force_retry_uid:
logger.info(f"Force retry enabled for leaderboard UID: {leaderboard_uid}")
elif force_retry_category:
logger.info(f"Force retry enabled for all leaderboards in category: {normalized_category}")
                    elif legacy_force_retry:
                        logger.info(f"Force retry enabled via the deprecated --force-retry option for: {legacy_value}")
# Search for the leaderboard URL in uid_to_host (direct dictionary lookup)
host = uid_to_host.get(leaderboard_uid)
if not host:
logger.warning(f"UID '{leaderboard_uid}' (category: {normalized_category}) not found in leaderboards.")
# Show more information for debugging
logger.debug(f"Total number of UIDs available: {len(uid_to_host)}")
continue
# Create combined identifier (category_uid)
# The category is already normalized by create_category_slug
combined_uid = create_combined_id(normalized_category, leaderboard_uid)
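                # Illustrative result (assuming the helper joins with "_"):
                #     create_combined_id("text-generation", "abc123")
                #     -> "text-generation_abc123"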
# If force_retry is enabled, process the leaderboard without checking the time since last processing
if force_retry:
logger.info(f"Force retry enabled for {combined_uid} - Processing forced independently of last processing date.")
leaderboards_to_process.append({
"uid": leaderboard_uid,
"host": host,
"category": normalized_category,
"additional_rules": additional_rules,
"force_retry": force_retry
})
continue # Skip directly to the next leaderboard
# Check if the leaderboard has already been processed recently
needs_reprocessing = True
if combined_uid in processed_results_map:
# Check if the leaderboard has been processed within the interval
result = processed_results_map[combined_uid]
# If the --retry-rejected option is active and the status is "rejected", force reprocessing
if getattr(args, "retry_rejected", False) and result.get("parsing_status") == "rejected":
logger.info(f"Leaderboard {combined_uid} previously rejected, forced reprocessing with --retry-rejected.")
elif "parsed_at" in result:
try:
# Convert parsing date to datetime object
parsed_at = datetime.datetime.fromisoformat(result["parsed_at"])
# Calculate time elapsed since last parsing
time_diff = now - parsed_at
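                            # Worked example (hypothetical dates): now = 2024-01-02T01:00:00
                            # and parsed_at = 2024-01-01T00:00:00 give
                            # time_diff.total_seconds() == 90_000, which exceeds the
                            # default threshold of 86_400 s, so the board is reprocessed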
                            # Debug logs for the date check
                            logger.debug(f"Current date: {now.isoformat()}")
                            logger.debug(f"Last parsing date: {parsed_at.isoformat()}")
                            logger.debug(f"Time difference in seconds: {time_diff.total_seconds()}")
                            logger.debug(f"Reprocessing threshold (seconds): {max_age_seconds}")
                            # Compare the elapsed time, in seconds, against the threshold;
                            # if it exceeds max_age_seconds, reparse
                            time_seconds = time_diff.total_seconds()
                            if time_seconds > max_age_seconds:
                                logger.info(f"Leaderboard {combined_uid} - {host} parsed more than {reprocess_interval_hours} hours ago ({format_datetime(result['parsed_at'])}), reprocessing necessary.")
                            else:
                                logger.info(f"Leaderboard {combined_uid} - {host} already processed recently ({format_datetime(result['parsed_at'])}), skipping. Age: {time_seconds:.0f} seconds (threshold: {max_age_seconds})")
                                continue
                        except (ValueError, TypeError):
                            # If the date is invalid, reprocess as a precaution
                            logger.info(f"Leaderboard {combined_uid} has an invalid processing date, reprocessing necessary.")
                    else:
                        # If the parsing date is missing, reprocess as a precaution
                        logger.info(f"Leaderboard {combined_uid} has no processing date, reprocessing necessary.")
else:
# If the leaderboard has never been processed, process it
logger.info(f"New leaderboard {combined_uid} to process.")
if needs_reprocessing or force_retry:
leaderboards_to_process.append({
"uid": leaderboard_uid,
"host": host,
"category": normalized_category,
"additional_rules": additional_rules,
"force_retry": force_retry
})
# Information on the number of leaderboards to process
logger.info(f"Total number of leaderboards to process: {len(leaderboards_to_process)}")
# Process each leaderboard
for index, leaderboard_info in enumerate(leaderboards_to_process):
leaderboard_uid = leaderboard_info["uid"]
host = leaderboard_info["host"]
category_name = leaderboard_info["category"]
additional_rules = leaderboard_info["additional_rules"]
force_retry = leaderboard_info["force_retry"]
# Process this leaderboard
logger.info(f"Processing leaderboard {index+1}/{len(leaderboards_to_process)}: {leaderboard_uid} (category: {category_name})")
try:
# Force restart of browser every 2 leaderboards to avoid memory leaks
if index > 0 and index % 2 == 0:
logger.info(f"Periodic browser cleanup after {index} leaderboards to avoid memory leaks")
cleanup_browser()
                    # Force garbage collection (gc is imported at module level)
                    gc.collect()
# Small pause to let the system clean up
time.sleep(3)
# Process the leaderboard
all_results = process_single_leaderboard(
leaderboard_uid,
host,
model,
index,
all_results,
additional_rules,
category_name
)
# Add detailed logs for diagnosing problems
logger.info(f"Results after processing: {len(all_results)} elements")
# Search for results corresponding to the processed leaderboard
for idx, res in enumerate(all_results):
if res.get("original_uid") == leaderboard_uid:
logger.info(f"Found result {idx}: uid={res.get('uid')}, original_uid={res.get('original_uid')}, category={res.get('category')}")
# Clean up after each processing
cleanup_browser()
                # Verify that the leaderboard exists with the exact normalized
                # category: strict match on original_uid AND category
                normalized_category_name = create_category_slug(category_name)
current_result = None
for result in all_results:
# Always compare normalized categories to avoid format issues
result_category = result.get("category", "")
if result.get("original_uid") == leaderboard_uid and create_category_slug(result_category) == normalized_category_name:
current_result = result
logger.info(f"Found result for {leaderboard_uid}, category: {result.get('category')}")
break
                # Do not fall back to matching on original_uid alone:
                # if no result is found, processing most likely failed
if not current_result:
logger.error(f"RESULT NOT FOUND for {leaderboard_uid}, normalized_category: {normalized_category_name}")
logger.error(f"Search for all results corresponding to this UID:")
for res in all_results:
if res.get("original_uid") == leaderboard_uid:
logger.error(f" - Result with category={res.get('category')}, uid={res.get('uid')}")
logger.error(f"Leaderboard {leaderboard_uid} (category: {category_name}) not updated because result not found")
continue
# Update only this specific leaderboard in the results file
logger.info(f"Updating leaderboard {leaderboard_uid} (category: {category_name}) in file")
updated_results = update_leaderboard_result(current_result, results_file)
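                # update_leaderboard_result is expected to return the complete,
                # freshly persisted results list (see src.file_utils), which we
                # adopt as the new in-memory state below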
                # CRITICAL FIX: refresh both in-memory views from the file data
                # to avoid desynchronization between the file and the lists
                all_results = updated_results
                results_data = updated_results
logger.info(f"Leaderboard {leaderboard_uid} (category: {category_name}) saved")
# Upload to HF Hub after each leaderboard if not in local-only mode
if not getattr(args, "local_only", False):
logger.info(f"Uploading results to HF Hub after processing leaderboard {leaderboard_uid}")
try:
upload_to_hub(to_parse_file=category_list_file, results_file=results_file)
logger.info(f"Upload successful to HF Hub for leaderboard {leaderboard_uid}")
except Exception as upload_err:
logger.warning(f"Upload to HF Hub failed after processing leaderboard {leaderboard_uid}: {str(upload_err)}")
except Exception as e:
logger.error(f"Error processing leaderboard {leaderboard_uid} (category: {category_name}): {str(e)}")
continue
# Final save not necessary as all leaderboards have already been updated individually
logger.info("Leaderboard processing completed")
update_server_status("completed")
return True, "Processing completed successfully"
except Exception as e:
update_server_status("failed", str(e))
logger.exception("Error processing leaderboards")
        return False, f"Error processing leaderboards: {str(e)}"