import random import time import traceback from api.db.db_models import close_connection from api.db.services.task_service import TaskService from rag.settings import cron_logger from rag.utils.minio_conn import MINIO from rag.utils.redis_conn import REDIS_CONN def collect(): doc_locations = TaskService.get_ongoing_doc_name() print(doc_locations) if len(doc_locations) == 0: time.sleep(1) return return doc_locations def main(): locations = collect() if not locations:return print("TASKS:", len(locations)) for kb_id, loc in locations: try: if REDIS_CONN.is_alive(): try: key = "{}/{}".format(kb_id, loc) if REDIS_CONN.exist(key):continue file_bin = MINIO.get(kb_id, loc) REDIS_CONN.transaction(key, file_bin, 12 * 60) cron_logger.info("CACHE: {}".format(loc)) except Exception as e: traceback.print_stack(e) except Exception as e: traceback.print_stack(e) if __name__ == "__main__": while True: main() close_connection() time.sleep(1)