File size: 21,794 Bytes
0821095
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b0bf659
0821095
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
"""
Leaderboard processing module for the leaderboard parser.
This module contains the main functions for processing leaderboards.
"""
import json
import os
import datetime
import logging
import time
import argparse
from typing import Dict, Any, List, Tuple, Optional

# Import functions from other modules
from src.file_utils import save_results, format_datetime, clean_output_files, update_leaderboard_result
from src.file_utils import create_category_slug, split_combined_id, create_combined_id
from src.file_utils import load_and_validate_results, validate_leaderboard_result
from src.hub_utils import upload_to_hub, download_from_hub
from src.leaderboard_processor import process_single_leaderboard
from src.agents.parser_agent import get_default_model
from src.agents.browser import cleanup_browser

# Configure logger
logger = logging.getLogger("leaderboard-parser")

# Update state variables in server module
def update_server_status(status, error=None):
    """
    Updates the server status.
    
    Args:
        status: The new status ('idle', 'running', 'completed', 'failed')
        error: The error message in case of failure
    """
    try:
        from src.server import processing_status, processing_error
        
        # Update global variables in server.py
        globals()['processing_status'] = status
        globals()['processing_error'] = error
        
        # Update server module variables
        import src.server
        src.server.processing_status = status
        src.server.processing_error = error
    except ImportError:
        # In non-server mode, these variables don't exist
        pass

def process_leaderboards(args_dict=None) -> Tuple[bool, str]:
    """
    Process leaderboards with the given arguments.
    Returns a tuple of (success, message)
    """
    # Update status
    update_server_status("running")
    
    # Set default arguments if none provided
    if args_dict is None:
        args_dict = {"local_only": False}
    
    # Create an argparse.Namespace object from the dictionary
    args = argparse.Namespace(**args_dict)
    
    try:
        # Ensure we're in the correct directory
        script_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        os.chdir(script_dir)
        
        # Verify that the HF token is set
        if not os.environ.get("HUGGING_FACE_HUB_TOKEN") and not args.local_only:
            raise ValueError("HUGGING_FACE_HUB_TOKEN environment variable is not set!")
        
        # Use default paths for category list and leaderboards
        category_list_file = "data/best_model_for_category_list.json"
        leaderboards_file = "data/final_leaderboards.json"
        results_file = "data/best_model_for_results.json"
        
        # Clean if requested
        if getattr(args, "clean", False):
            clean_output_files(results_file)
        
        # Check if we're just uploading
        if getattr(args, "upload_only", False):
            upload_to_hub(to_parse_file=category_list_file, results_file=results_file)
            update_server_status("completed")
            return True, "Upload completed successfully"
        
        # Download data from the Hub if not in local-only mode
        if not getattr(args, "local_only", False):
            download_from_hub()
        
        # Just before the line that generates the error
        logger.info(f"Starting leaderboard processing")
        
        # Load the category list and leaderboards data
        try:
            with open(category_list_file, "r", encoding="utf-8") as f:
                category_list = json.load(f)
            
            with open(leaderboards_file, "r", encoding="utf-8") as f:
                leaderboards = json.load(f)
                
            # Create a mapping UID -> HOST for all leaderboards
            uid_to_host = {lb["uid"]: lb["host"] for lb in leaderboards if "uid" in lb and "host" in lb}
            logger.info(f"Loaded {len(uid_to_host)} UID -> HOST mappings from {leaderboards_file}")
        except FileNotFoundError as e:
            update_server_status("failed", str(e))
            return False, f"File not found: {e}"
        
        # Load existing results if any
        try:
            logger.info(f"Loading and validating results from {results_file}")
            results_data = load_and_validate_results(results_file)
            all_results = results_data
            logger.info(f"Loaded and validated {len(all_results)} existing results")
        except Exception as e:
            logger.warning(f"Error loading results: {str(e)}")
            results_data = []
            all_results = []
        
        # Create a map of combined UIDs to their complete data (for checking parsing date)
        processed_results_map = {}
        for result in results_data:
            if "uid" in result:
                processed_results_map[result["uid"]] = result
        
        # Get reprocessing interval from environment variable (in hours)
        # Default value: 24 hours
        reprocess_interval_hours = int(os.getenv("LEADERBOARD_REPROCESS_INTERVAL_HOURS", "24"))
        
        # Maximum age without update (in seconds)
        max_age_seconds = reprocess_interval_hours * 60 * 60
        logger.info(f"Leaderboard reprocessing interval: {reprocess_interval_hours} hours")
        
        # Current date and time
        now = datetime.datetime.now()
        print(f"Current system date: {now.isoformat()} - Readable format: {format_datetime(now.isoformat())}")
        
        # Get the default agent
        model = get_default_model()
        
        # Collect all leaderboards to process
        leaderboards_to_process = []
        force_retry_leaderboards = []
        
        # Add logs for debugging
        logger.info(f"Available categories: {len(category_list)}")
        logger.info(f"Available leaderboards: {len(uid_to_host)}")
        logger.info(f"Sample of available UIDs: {list(uid_to_host.keys())[:5]}")
        
        # Check if a specific category is requested
        target_category = getattr(args, "force_retry_category", None)
        target_uid = getattr(args, "force_retry_uid", None)
        
        # Exclusive mode (only process specified leaderboards)
        exclusive_mode = target_category is not None or target_uid is not None
        
        if target_category:
            logger.info(f"Force retry category mode enabled (exclusive): {target_category}")
            
        if target_uid:
            logger.info(f"Force retry UID mode enabled (exclusive): {target_uid}")
        
        # Process leaderboards
        for category in category_list:
            category_name = category["category"]
            normalized_category = create_category_slug(category_name)
            
            # If in specific category mode and this is not the target category, skip to the next
            if target_category and target_category != normalized_category:
                logger.info(f"Category {category_name} (normalized: {normalized_category}) ignored - Does not match target category {target_category}")
                continue
                
            # ADDITIONAL SAFETY: Reload data from file before each new category
            # This ensures there is no contamination between categories
            try:
                logger.info(f"Reloading data from file before processing category: {category_name}")
                all_results = load_and_validate_results(results_file)
                logger.info(f"Data reloaded successfully: {len(all_results)} results available")
            except Exception as e:
                logger.warning(f"Unable to reload data before category {category_name}: {str(e)}")
                # In case of error, keep existing data if possible
                if not isinstance(all_results, list):
                    all_results = []
            
            # Check if category has leaderboards
            if "leaderboards" not in category or not isinstance(category["leaderboards"], list):
                logger.warning(f"Category '{category_name}' has no leaderboards or incorrect format.")
                continue
            
            # Process each leaderboard in the category
            for leaderboard in category["leaderboards"]:
                if "uid" not in leaderboard:
                    logger.warning(f"Leaderboard in category '{category_name}' has no UID.")
                    continue
                
                leaderboard_uid = leaderboard["uid"]
                
                # In specific UID mode, ignore all other leaderboards
                if target_uid and target_uid != leaderboard_uid:
                    logger.info(f"Leaderboard {leaderboard_uid} ignored - Does not match target UID {target_uid}")
                    continue
                
                # Get additional rules if available
                additional_rules = leaderboard.get("additionnal_agent_rules", None)
                
                # Check if we should force processing this leaderboard
                # Using the new distinct options
                force_retry_uid = getattr(args, "force_retry_uid", None) == leaderboard_uid
                force_retry_category = getattr(args, "force_retry_category", None) == normalized_category
                
                # Support for the old option for backward compatibility (to be removed later)
                legacy_force_retry = False
                if hasattr(args, "force_retry") and getattr(args, "force_retry", None) is not None:
                    legacy_force_retry = (
                        getattr(args, "force_retry", None) == leaderboard_uid or
                        getattr(args, "force_retry", None) == normalized_category
                    )
                    if legacy_force_retry:
                        logger.warning("The --force-retry option is obsolete. Use --force-retry-uid or --force-retry-category instead.")
                
                # Combine different sources of force_retry
                force_retry = force_retry_uid or force_retry_category or legacy_force_retry
                
                # Add explicit logs about the reason for force retry
                if force_retry:
                    if force_retry_uid:
                        logger.info(f"Force retry enabled for leaderboard UID: {leaderboard_uid}")
                    elif force_retry_category:
                        logger.info(f"Force retry enabled for all leaderboards in category: {normalized_category}")
                    elif legacy_force_retry:
                        logger.info(f"Force retry enabled via the old --force-retry option for: {getattr(args, 'force_retry', None)}")
                
                # Search for the leaderboard URL in uid_to_host (direct dictionary lookup)
                host = uid_to_host.get(leaderboard_uid)
                
                if not host:
                    logger.warning(f"UID '{leaderboard_uid}' (category: {normalized_category}) not found in leaderboards.")
                    # Show more information for debugging
                    logger.debug(f"Total number of UIDs available: {len(uid_to_host)}")
                    continue
                
                # Create combined identifier (category_uid)
                # The category is already normalized by create_category_slug
                combined_uid = create_combined_id(normalized_category, leaderboard_uid)
                
                # If force_retry is enabled, process the leaderboard without checking the time since last processing
                if force_retry:
                    logger.info(f"Force retry enabled for {combined_uid} - Processing forced independently of last processing date.")
                    leaderboards_to_process.append({
                        "uid": leaderboard_uid,
                        "host": host,
                        "category": normalized_category,
                        "additional_rules": additional_rules,
                        "force_retry": force_retry
                    })
                    continue  # Skip directly to the next leaderboard
                
                # Check if the leaderboard has already been processed recently
                needs_reprocessing = True
                if combined_uid in processed_results_map:
                    # Check if the leaderboard has been processed within the interval
                    result = processed_results_map[combined_uid]
                    
                    # If the --retry-rejected option is active and the status is "rejected", force reprocessing
                    if getattr(args, "retry_rejected", False) and result.get("parsing_status") == "rejected":
                        logger.info(f"Leaderboard {combined_uid} previously rejected, forced reprocessing with --retry-rejected.")
                    elif "parsed_at" in result:
                        try:
                            # Convert parsing date to datetime object
                            parsed_at = datetime.datetime.fromisoformat(result["parsed_at"])
                            
                            # Calculate time elapsed since last parsing
                            time_diff = now - parsed_at
                            
                            # Add logs for debugging date checks
                            logger.info(f"DEBUG: Current date: {now.isoformat()}")
                            logger.info(f"DEBUG: Last parsing date: {parsed_at.isoformat()}")
                            logger.info(f"DEBUG: Time difference in seconds: {time_diff.total_seconds()}")
                            logger.info(f"DEBUG: Reprocessing threshold (seconds): {max_age_seconds}")
                            
                            # Strictly check if the duration in seconds is greater than the threshold
                            time_seconds = time_diff.total_seconds()
                            
                            # If time elapsed is greater than max_age_seconds, reparse
                            if time_seconds > max_age_seconds:
                                needs_reprocessing = True
                                print(f"\n\nLeaderboard {combined_uid} - {host} parsed more than {reprocess_interval_hours} hours ago ({format_datetime(result['parsed_at'])}), reprocessing necessary.")
                            else:
                                print(f"\n\nLeaderboard {combined_uid} - {host} already processed recently ({format_datetime(result['parsed_at'])}), moving to next. Age: {time_seconds} seconds (threshold: {max_age_seconds})")
                                continue
                        except (ValueError, TypeError):
                            # If date is invalid, reprocess by precaution
                            logger.info(f"Leaderboard {combined_uid} has an invalid processing date, reprocessing necessary.")
                    else:
                        # If parsing date is missing, reprocess by precaution
                        logger.info(f"Leaderboard {combined_uid} has no processing date, reprocessing necessary.")
                else:
                    # If the leaderboard has never been processed, process it
                    logger.info(f"New leaderboard {combined_uid} to process.")
                
                if needs_reprocessing or force_retry:
                    leaderboards_to_process.append({
                        "uid": leaderboard_uid,
                        "host": host,
                        "category": normalized_category,
                        "additional_rules": additional_rules,
                        "force_retry": force_retry
                    })
        
        # Information on the number of leaderboards to process
        logger.info(f"Total number of leaderboards to process: {len(leaderboards_to_process)}")
        
        # Process each leaderboard
        for index, leaderboard_info in enumerate(leaderboards_to_process):
            leaderboard_uid = leaderboard_info["uid"]
            host = leaderboard_info["host"]
            category_name = leaderboard_info["category"]
            additional_rules = leaderboard_info["additional_rules"]
            force_retry = leaderboard_info["force_retry"]
            
            # Process this leaderboard
            logger.info(f"Processing leaderboard {index+1}/{len(leaderboards_to_process)}: {leaderboard_uid} (category: {category_name})")
            
            try:
                # Force restart of browser every 2 leaderboards to avoid memory leaks
                if index > 0 and index % 2 == 0:
                    logger.info(f"Periodic browser cleanup after {index} leaderboards to avoid memory leaks")
                    cleanup_browser()
                    # Force garbage collection
                    import gc
                    gc.collect()
                    # Small pause to let the system clean up
                    time.sleep(3)
                
                # Process the leaderboard
                all_results = process_single_leaderboard(
                    leaderboard_uid, 
                    host, 
                    model, 
                    index, 
                    all_results, 
                    additional_rules, 
                    category_name
                )
                
                # Add detailed logs for diagnosing problems
                logger.info(f"Results after processing: {len(all_results)} elements")
                # Search for results corresponding to the processed leaderboard
                for idx, res in enumerate(all_results):
                    if res.get("original_uid") == leaderboard_uid:
                        logger.info(f"Found result {idx}: uid={res.get('uid')}, original_uid={res.get('original_uid')}, category={res.get('category')}")
                
                # Clean up after each processing
                cleanup_browser()
                
                # Verify if the leaderboard exists with the exact normalized category
                # MODIFICATION: Strict search by original_uid AND category
                normalized_category_name = create_category_slug(category_name)
                current_result = None
                for result in all_results:
                    # Always compare normalized categories to avoid format issues
                    result_category = result.get("category", "")
                    if result.get("original_uid") == leaderboard_uid and create_category_slug(result_category) == normalized_category_name:
                        current_result = result
                        logger.info(f"Found result for {leaderboard_uid}, category: {result.get('category')}")
                        break
                
                # SUPPRESSION: No longer search for alternatives with only original_uid
                # If result is not found, it's probably an error or processing failed
                if not current_result:
                    logger.error(f"RESULT NOT FOUND for {leaderboard_uid}, normalized_category: {normalized_category_name}")
                    logger.error(f"Search for all results corresponding to this UID:")
                    for res in all_results:
                        if res.get("original_uid") == leaderboard_uid:
                            logger.error(f"  - Result with category={res.get('category')}, uid={res.get('uid')}")
                    logger.error(f"Leaderboard {leaderboard_uid} (category: {category_name}) not updated because result not found")
                    continue
                
                # Update only this specific leaderboard in the results file
                logger.info(f"Updating leaderboard {leaderboard_uid} (category: {category_name}) in file")
                updated_results = update_leaderboard_result(current_result, results_file)
                
                # CORRECTION CRITIQUE: Update all_results list with file data
                # to avoid desynchronization between file and in-memory list
                all_results = updated_results
                
                # Update global result for next leaderboard
                results_data = updated_results
                
                logger.info(f"Leaderboard {leaderboard_uid} (category: {category_name}) saved")
                
                # Upload to HF Hub after each leaderboard if not in local-only mode
                if not getattr(args, "local_only", False):
                    logger.info(f"Uploading results to HF Hub after processing leaderboard {leaderboard_uid}")
                    try:
                        upload_to_hub(to_parse_file=category_list_file, results_file=results_file)
                        logger.info(f"Upload successful to HF Hub for leaderboard {leaderboard_uid}")
                    except Exception as upload_err:
                        logger.warning(f"Upload to HF Hub failed after processing leaderboard {leaderboard_uid}: {str(upload_err)}")
            except Exception as e:
                logger.error(f"Error processing leaderboard {leaderboard_uid} (category: {category_name}): {str(e)}")
                continue
        
        # Final save not necessary as all leaderboards have already been updated individually
        logger.info("Leaderboard processing completed")
        
        update_server_status("completed")
        return True, "Processing completed successfully"
    
    except Exception as e:
        update_server_status("failed", str(e))
        logger.exception("Error processing leaderboards")
        return False, f"Error processing leaderboards: {str(e)}"