"""
Scheduling module for the leaderboard parser.

This module contains the scheduling functions used for periodic execution of
leaderboard processing.
"""

import datetime
import threading
import time
import logging
import os

# Initialize logger
logger = logging.getLogger("leaderboard-parser")

# Global variables for the scheduler
stop_thread = False
last_run_time = None

# Reference to the processing function (set by initialize_scheduler)
process_leaderboards_function = None
scheduler_args = None

def initialize_scheduler(process_function, args=None):
    """
    Initialize the scheduler with the leaderboard processing function.

    Args:
        process_function: Function that processes the leaderboards
        args: Arguments passed from the command line to be used in scheduled processing
    """
    global process_leaderboards_function, scheduler_args

    process_leaderboards_function = process_function
    scheduler_args = args if args is not None else {}
    logger.info("Scheduler initialized with processing function and arguments")
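
# Hypothetical wiring sketch (names assumed, not defined in this module): the server
# entry point is expected to call initialize_scheduler() with its real processing
# function before starting the thread, along the lines of:
#
#     initialize_scheduler(process_leaderboards, {"ignore_cooldown": args.ignore_cooldown})
#     start_scheduler()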

def scheduler_thread():
    """Thread that checks when to run the leaderboard processing job"""
    global stop_thread, last_run_time, process_leaderboards_function, scheduler_args

    if not process_leaderboards_function:
        logger.error("Scheduler has not been initialized with a processing function")
        return

    logger.info("Scheduler thread started")

    # Read the reprocess interval from the environment, defaulting to 24 hours
    interval_hours = int(os.environ.get("LEADERBOARD_REPROCESS_INTERVAL_HOURS", 24))
    interval_seconds = interval_hours * 3600
    logger.info(f"Leaderboard reprocess interval set to {interval_hours} hours")

    while not stop_thread:
        now = datetime.datetime.now()

        # Run if there is no previous run or the interval has elapsed since the last one
        if last_run_time is None or (now - last_run_time).total_seconds() >= interval_seconds:
            logger.info(f"{interval_hours} hours have passed since the last run (or no run yet), attempting to execute the job")

            # Re-import to read the latest processing status from the server module
            from src.server import processing_status

            # Only start a new job if one is not already running
            if processing_status != "running":
                last_run_time = now

                # Combine the saved arguments with local_only=False
                args_to_use = {"local_only": False}

                # Propagate ignore_cooldown if it was passed as a command-line argument
                if scheduler_args.get("ignore_cooldown", False):
                    args_to_use["ignore_cooldown"] = True
                    logger.info("Using --ignore-cooldown option in scheduled processing")

                # Run in a separate thread so the scheduler loop is not blocked
                logger.info("Starting processing job in a separate thread")
                processing_thread = threading.Thread(
                    target=lambda: process_leaderboards_function(args_to_use)
                )
                processing_thread.daemon = True
                processing_thread.start()

                # Wait at least 80% of the interval before checking again;
                # this prevents duplicate executions and provides a buffer
                time.sleep(interval_seconds * 0.8)
        else:
            # Calculate the time remaining until the next run
            seconds_until_next_run = interval_seconds - (now - last_run_time).total_seconds()
            hours_until_next_run = seconds_until_next_run / 3600

            # Log progress roughly once per hour (the loop ticks about every 60 seconds)
            if int(seconds_until_next_run) % 3600 < 60:
                logger.info(f"Next scheduled run in {hours_until_next_run:.1f} hours")

        # Sleep for a minute before checking again
        time.sleep(60)
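
# Note: the reprocess interval above is read once, when scheduler_thread() starts.
# A hypothetical way to shorten it for local testing (an assumption, not part of the
# server configuration) is to set the environment variable before starting:
#
#     os.environ["LEADERBOARD_REPROCESS_INTERVAL_HOURS"] = "1"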

def start_scheduler():
    """Start the scheduler thread"""
    global stop_thread

    # Reset the stop flag
    stop_thread = False

    # Start the scheduler thread
    scheduler = threading.Thread(target=scheduler_thread)
    scheduler.daemon = True
    scheduler.start()
    logger.info("Scheduler thread started")

    return scheduler

def stop_scheduler():
    """Request that the scheduler thread stop at its next check"""
    global stop_thread

    stop_thread = True
    logger.info("Scheduler thread stop requested")
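
# A minimal, hypothetical usage sketch for manually exercising the scheduler with a
# dummy processing function. It assumes the module is run inside the project, since
# the scheduler loop imports processing_status from src.server.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    def _dummy_process(args):
        # Stand-in for the real leaderboard processing function
        logger.info(f"Processing leaderboards with args: {args}")

    initialize_scheduler(_dummy_process, {"ignore_cooldown": True})
    scheduler = start_scheduler()

    try:
        # Keep the main thread alive briefly while the daemon scheduler thread runs
        time.sleep(10)
    finally:
        stop_scheduler()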