"""
Utilities for file management.
"""
import json
import os
import datetime
import shutil
import time
import random
import tempfile
import logging

from filelock import FileLock

logger = logging.getLogger("leaderboard-parser")


def save_results(results, file_path):
    """
    Save results to a JSON file.

    Args:
        results: The results to save
        file_path: The path to the file
    """
    with open(file_path, "w") as f:
        json.dump(results, f, indent=2)
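
# Illustrative usage (hypothetical path): save_results([], "data/results.json")
# writes an empty JSON array with two-space indentation.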


def create_category_slug(category_name):
    """
    Creates a slug from a category name.
    The slug uses only hyphens as separators (no underscore).

    Args:
        category_name: The category name

    Returns:
        The category slug
    """
    if not category_name:
        return ""
    # Convert to lowercase and replace spaces with hyphens
    # Ensure no underscores are used in the category slug
    return category_name.lower().replace(" ", "-").replace("_", "-")
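
# Illustrative examples (hypothetical category names), following the slug rules above:
#   create_category_slug("Text Generation")  -> "text-generation"
#   create_category_slug("vision_language")  -> "vision-language"
#   create_category_slug("")                 -> ""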


def create_combined_id(category, uid):
    """
    Creates a normalized combined identifier from a category and UID.
    First normalizes the category using create_category_slug.

    Args:
        category: The category name
        uid: The UID of the leaderboard

    Returns:
        The combined identifier in the format category_slug_uid
    """
    normalized_category = create_category_slug(category)
    return f"{normalized_category}_{uid}"
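
# Illustrative example (hypothetical values): the category is slugged first, then joined
# to the uid with a single underscore:
#   create_combined_id("Text Generation", "abc123") -> "text-generation_abc123"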


def validate_leaderboard_result(result):
    """
    Validates a leaderboard result and corrects it if necessary to ensure identifier consistency.

    This function checks:
    1. That 'uid' is present and correctly formatted (category_original_uid)
    2. That 'original_uid' is present
    3. That 'category' is present and normalized
    4. That 'uid' matches the combination of category and original_uid

    Args:
        result: The leaderboard result to validate (dict)

    Returns:
        The validated and corrected result, or None if validation is impossible
    """
    if not isinstance(result, dict):
        logger.error("Validation error: the result is not a dictionary")
        return None

    # Check that the required fields are present
    if "original_uid" not in result:
        logger.error("Validation error: original_uid missing from result")
        return None
    if "category" not in result:
        logger.error("Validation error: category missing from result")
        return None

    original_uid = result["original_uid"]
    category = result["category"]

    # Normalize the category if necessary
    normalized_category = create_category_slug(category)
    if normalized_category != category:
        logger.warning(f"Category not normalized: '{category}' -> '{normalized_category}'")
        result["category"] = normalized_category

    # Recalculate the correct combined uid
    correct_uid = create_combined_id(normalized_category, original_uid)

    # Check whether the existing uid is correct
    if "uid" not in result:
        logger.warning(f"uid missing, adding calculated uid: {correct_uid}")
        result["uid"] = correct_uid
    elif result["uid"] != correct_uid:
        logger.warning(f"uid inconsistent: '{result['uid']}' does not match '{correct_uid}', correction applied")
        result["uid"] = correct_uid

    return result
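
# Illustrative example (hypothetical values): a result with a non-normalized category and
# no uid is corrected in place:
#   validate_leaderboard_result({"original_uid": "abc123", "category": "Text Generation"})
#   -> {"original_uid": "abc123", "category": "text-generation", "uid": "text-generation_abc123"}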


def load_and_validate_results(file_path):
    """
    Loads results from the file without strict validation.

    Args:
        file_path: Path to the results file

    Returns:
        List of results, or empty list in case of error
    """
    try:
        # Load results from the file
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                results_data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError) as e:
            logger.warning(f"Unable to load file {file_path}: {str(e)}")
            return []

        # Convert from dict with "leaderboards" to array if necessary
        if isinstance(results_data, dict) and "leaderboards" in results_data:
            array_results = []
            for uid, item in results_data["leaderboards"].items():
                item_copy = item.copy()
                item_copy["uid"] = uid
                array_results.append(item_copy)
            results_data = array_results

        # Ensure results_data is a list
        if not isinstance(results_data, list):
            logger.warning(f"Invalid data format in {file_path}, initializing empty list")
            return []

        # Sort results
        results_data.sort(key=lambda x: (x.get("category", ""), x.get("original_uid", "")))

        logger.info(f"Load successful: {len(results_data)} results")
        return results_data
    except Exception as e:
        logger.error(f"Error loading results: {str(e)}")
        return []
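
# Note: the loader accepts either a plain list of result dicts or a wrapped dict of the form
# {"leaderboards": {<uid>: {...}, ...}} (hypothetical content shown), which is flattened into
# a list with each mapping key stored as "uid".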


def update_leaderboard_result(leaderboard_result, file_path, max_wait_seconds=30):
    """
    Updates a leaderboard result in the specified file.
    If an entry with the same uid already exists, it is updated.
    Otherwise, a new entry is added.

    Args:
        leaderboard_result: The leaderboard result to update (must contain a uid)
        file_path: Path to the results file
        max_wait_seconds: Maximum wait time for the file lock (in seconds)

    Returns:
        Updated results list, or None in case of error
    """
    if not leaderboard_result or "uid" not in leaderboard_result:
        logger.error("Unable to update: invalid or missing leaderboard result or uid")
        return None

    # Create the parent directory if necessary (guard against an empty dirname
    # when file_path has no directory component)
    parent_dir = os.path.dirname(file_path) or "."
    os.makedirs(parent_dir, exist_ok=True)

    # Use a lock to avoid concurrent writes
    lock_path = f"{file_path}.lock"
    lock = FileLock(lock_path, timeout=max_wait_seconds)

    try:
        with lock:
            # Load existing results
            current_results = load_and_validate_results(file_path)

            # Index by uid for easy update
            results_by_uid = {r.get("uid", ""): r for r in current_results if "uid" in r}

            # Update or add the result
            uid = leaderboard_result["uid"]
            if uid in results_by_uid:
                # Update existing result
                results_by_uid[uid].update(leaderboard_result)
                logger.info(f"Result updated for uid: {uid}")
            else:
                # Add new result
                results_by_uid[uid] = leaderboard_result
                logger.info(f"New result added for uid: {uid}")

            # Convert to a list for writing
            updated_results = list(results_by_uid.values())

            # Sort results
            updated_results.sort(key=lambda x: (x.get("category", ""), x.get("original_uid", "")))

            # Write to a temporary file, then rename for atomicity
            fd, temp_path = tempfile.mkstemp(dir=parent_dir)
            try:
                with os.fdopen(fd, "w", encoding="utf-8") as f:
                    json.dump(updated_results, f, indent=2, ensure_ascii=False)
                # Replace the original file with the temporary file
                shutil.move(temp_path, file_path)
                logger.info(f"File updated successfully: {file_path}")
                return updated_results
            except Exception:
                # Clean up in case of error
                if os.path.exists(temp_path):
                    os.unlink(temp_path)
                raise
    except Exception as e:
        logger.error(f"Error updating file {file_path}: {str(e)}")
        return None
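
# Illustrative usage (hypothetical path and values); the entry is expected to carry the
# combined uid produced by create_combined_id:
#   result = {"uid": "text-generation_abc123", "original_uid": "abc123",
#             "category": "text-generation", "score": 0.87}
#   update_leaderboard_result(result, "data/results.json")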


def split_combined_id(combined_id):
    """
    Splits a combined identifier (category_uid) into its components.
    Only the first underscore "_" is used as the separator.

    Args:
        combined_id: The combined identifier (category_uid)

    Returns:
        A tuple (category, uid), (None, combined_id) if there is no underscore,
        or (None, None) if combined_id is empty
    """
    if not combined_id:
        return None, None
    # Split on the first underscore to separate category and uid
    parts = combined_id.split("_", 1)
    if len(parts) == 2:
        return parts[0], parts[1]
    else:
        # If no underscore, consider it as just a uid without category
        return None, combined_id
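
# Illustrative examples (hypothetical identifiers); because category slugs never contain
# underscores, splitting on the first "_" recovers the two parts:
#   split_combined_id("text-generation_abc123") -> ("text-generation", "abc123")
#   split_combined_id("abc123")                 -> (None, "abc123")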


def format_datetime(dt_str):
    """
    Format a datetime string to a human readable format.

    Args:
        dt_str: The datetime string to format

    Returns:
        A formatted datetime string
    """
    try:
        # Check if input is already a datetime object
        if isinstance(dt_str, datetime.datetime):
            dt = dt_str
        else:
            # Convert ISO format to datetime object
            # Handle different formats of ISO dates including fractional seconds and timezone
            try:
                dt = datetime.datetime.fromisoformat(dt_str)
            except ValueError:
                # Handle other common formats
                formats = [
                    "%Y-%m-%dT%H:%M:%S.%f%z",
                    "%Y-%m-%dT%H:%M:%S.%f",
                    "%Y-%m-%dT%H:%M:%S%z",
                    "%Y-%m-%dT%H:%M:%S",
                    "%Y-%m-%d %H:%M:%S",
                    "%Y-%m-%d"
                ]
                for fmt in formats:
                    try:
                        dt = datetime.datetime.strptime(dt_str, fmt)
                        break
                    except ValueError:
                        continue
                else:
                    # If no format matches
                    return dt_str
        # Format the datetime object
        return dt.strftime("%d/%m/%Y à %H:%M:%S")
    except (ValueError, TypeError) as e:
        print(f"Error formatting date {dt_str}: {e}")
        return dt_str
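
# Illustrative example (hypothetical timestamp): ISO input is rendered with the
# day/month/year pattern used above:
#   format_datetime("2024-05-17T14:30:00") -> "17/05/2024 à 14:30:00"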


def clean_output_files(results_file):
    """
    Clean the output files, but keep a backup of the original.

    Args:
        results_file: The results file to clean
    """
    # If results file exists, make a backup
    if os.path.exists(results_file):
        backup_file = f"{results_file}.backup"
        shutil.copy2(results_file, backup_file)
        print(f"Backup of {results_file} created in {backup_file}")

    # Create an empty results file
    with open(results_file, "w") as f:
        json.dump([], f, indent=2)
    print(f"File {results_file} cleaned")
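

if __name__ == "__main__":
    # Minimal smoke test of the pure helpers, added for illustration only; it is not part
    # of the original module's behavior and uses hypothetical values.
    logging.basicConfig(level=logging.INFO)

    sample_category = "Text Generation"
    sample_uid = "abc123"

    slug = create_category_slug(sample_category)
    combined = create_combined_id(sample_category, sample_uid)
    print(f"slug={slug!r}, combined={combined!r}")

    # Round-trip check: splitting the combined id recovers the slug and the original uid
    assert split_combined_id(combined) == (slug, sample_uid)

    # Validation fills in the normalized category and the combined uid
    validated = validate_leaderboard_result(
        {"original_uid": sample_uid, "category": sample_category}
    )
    print(f"validated={validated!r}")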