""" Scheduling module for the leaderboard parser. This module contains scheduling functions for periodic execution of leaderboard processing. """ import datetime import threading import time import logging import os # Initialize logger logger = logging.getLogger("leaderboard-parser") # Global variables for scheduler stop_thread = False last_run_time = None # Reference to the processing function (will be defined by initialize_scheduler) process_leaderboards_function = None def initialize_scheduler(process_function): """ Initialize the scheduler with the leaderboard processing function. Args: process_function: Function that processes leaderboards """ global process_leaderboards_function process_leaderboards_function = process_function logger.info("Scheduler initialized with processing function") def scheduler_thread(): """Thread that checks when to run the leaderboard processing job""" global stop_thread, last_run_time, process_leaderboards_function if not process_leaderboards_function: logger.error("Scheduler has not been initialized with a processing function") return logger.info("Scheduler thread started") # Get the reprocess interval from environment, default to 24 hours interval_hours = int(os.environ.get("LEADERBOARD_REPROCESS_INTERVAL_HOURS", 24)) interval_seconds = interval_hours * 3600 logger.info(f"Leaderboard reprocess interval set to {interval_hours} hours") while not stop_thread: now = datetime.datetime.now() # If no previous run or if the interval has passed since last run if last_run_time is None or (now - last_run_time).total_seconds() >= interval_seconds: logger.info(f"{interval_hours} hours have passed since last run, executing the job") # Get the current status from src.server import processing_status # If we're not already processing if processing_status != "running": # Run the processing job last_run_time = now success, message = process_leaderboards_function({"local_only": False}) logger.info(f"Processing job completed with status: {success}, message: {message}") # Wait at least 80% of the interval before checking again # This prevents multiple executions and provides a buffer time.sleep(interval_seconds * 0.8) else: # Calculate time until next run seconds_until_next_run = interval_seconds - (now - last_run_time).total_seconds() hours_until_next_run = seconds_until_next_run / 3600 # Log progress every hour if int(seconds_until_next_run) % 3600 < 10: # Log within the first 10 seconds of each hour logger.info(f"Next scheduled run in {hours_until_next_run:.1f} hours") # Sleep for a minute before checking again time.sleep(60) def start_scheduler(): """Start the scheduler thread""" global stop_thread # Reset stop flag stop_thread = False # Start the scheduler thread scheduler = threading.Thread(target=scheduler_thread) scheduler.daemon = True scheduler.start() logger.info("Scheduler thread started") return scheduler def stop_scheduler(): """Stop the scheduler thread""" global stop_thread stop_thread = True logger.info("Scheduler thread stop requested")