Spaces:
Build error
Build error
""" | |
Scheduling module for the leaderboard parser. | |
This module contains scheduling functions for periodic execution of leaderboard processing. | |
""" | |
import datetime | |
import threading | |
import time | |
import logging | |
import os | |
# Initialize logger | |
logger = logging.getLogger("leaderboard-parser") | |
# Global variables for scheduler | |
stop_thread = False | |
last_run_time = None | |
# Reference to the processing function (will be defined by initialize_scheduler) | |
process_leaderboards_function = None | |
scheduler_args = None | |
def initialize_scheduler(process_function, args=None): | |
""" | |
Initialize the scheduler with the leaderboard processing function. | |
Args: | |
process_function: Function that processes leaderboards | |
args: Arguments passed from command line to be used in scheduled processing | |
""" | |
global process_leaderboards_function, scheduler_args | |
process_leaderboards_function = process_function | |
scheduler_args = args if args is not None else {} | |
logger.info("Scheduler initialized with processing function and arguments") | |
def scheduler_thread(): | |
"""Thread that checks when to run the leaderboard processing job""" | |
global stop_thread, last_run_time, process_leaderboards_function, scheduler_args | |
if not process_leaderboards_function: | |
logger.error("Scheduler has not been initialized with a processing function") | |
return | |
logger.info("Scheduler thread started") | |
# Get the reprocess interval from environment, default to 24 hours | |
interval_hours = int(os.environ.get("LEADERBOARD_REPROCESS_INTERVAL_HOURS", 24)) | |
interval_seconds = interval_hours * 3600 | |
logger.info(f"Leaderboard reprocess interval set to {interval_hours} hours") | |
while not stop_thread: | |
now = datetime.datetime.now() | |
# If no previous run or if the interval has passed since last run | |
if last_run_time is None or (now - last_run_time).total_seconds() >= interval_seconds: | |
logger.info(f"{interval_hours} hours have passed since last run, executing the job") | |
# Get the current status | |
from src.server import processing_status | |
# If we're not already processing | |
if processing_status != "running": | |
# Run the processing job | |
last_run_time = now | |
# Combine saved arguments with local_only=False | |
args_to_use = {"local_only": False} | |
# Add ignore_cooldown if it was passed as a command line argument | |
if scheduler_args.get("ignore_cooldown", False): | |
args_to_use["ignore_cooldown"] = True | |
logger.info("Using --ignore-cooldown option in scheduled processing") | |
# Run in a separate thread to avoid blocking the main thread | |
logger.info("Starting processing job in a separate thread") | |
processing_thread = threading.Thread( | |
target=lambda: process_leaderboards_function(args_to_use) | |
) | |
processing_thread.daemon = True | |
processing_thread.start() | |
# Wait at least 80% of the interval before checking again | |
# This prevents multiple executions and provides a buffer | |
time.sleep(interval_seconds * 0.8) | |
else: | |
# Calculate time until next run | |
seconds_until_next_run = interval_seconds - (now - last_run_time).total_seconds() | |
hours_until_next_run = seconds_until_next_run / 3600 | |
# Log progress every hour | |
if int(seconds_until_next_run) % 3600 < 10: # Log within the first 10 seconds of each hour | |
logger.info(f"Next scheduled run in {hours_until_next_run:.1f} hours") | |
# Sleep for a minute before checking again | |
time.sleep(60) | |
def start_scheduler(): | |
"""Start the scheduler thread""" | |
global stop_thread | |
# Reset stop flag | |
stop_thread = False | |
# Start the scheduler thread | |
scheduler = threading.Thread(target=scheduler_thread) | |
scheduler.daemon = True | |
scheduler.start() | |
logger.info("Scheduler thread started") | |
return scheduler | |
def stop_scheduler(): | |
"""Stop the scheduler thread""" | |
global stop_thread | |
stop_thread = True | |
logger.info("Scheduler thread stop requested") |