# from celery_worker import celery # from core.database import SessionLocal # from models.analysis_job import AnalysisJob # from tools.data_tools import get_stock_data # from tools.news_tools import get_combined_news_and_sentiment # from tools.analyst_tools import get_llm_analysis # from uuid import UUID # import json # @celery.task # def run_full_analysis(job_id: str, ticker: str): # print(f"\n--- [START] Full Analysis for Job ID: {job_id} ---") # # --- Stage 1: Data Fetching --- # try: # with SessionLocal() as db: # job = db.query(AnalysisJob).filter(AnalysisJob.id == UUID(job_id)).first() # if not job: raise ValueError("Job not found") # job.status = "DATA_FETCHING" # db.commit() # print("[LOG] STATUS UPDATE: DATA_FETCHING") # data_result = get_stock_data(ticker) # if "error" in data_result: raise ValueError(data_result['error']) # company_name = data_result.get("company_name", ticker) # with SessionLocal() as db: # job = db.query(AnalysisJob).filter(AnalysisJob.id == UUID(job_id)).first() # job.result = data_result # db.commit() # db.refresh(job) # Force reload from DB # print(f"[LOG] DB SAVE 1 (Data): Result keys are now: {list(job.result.keys())}") # except Exception as e: # print(f"!!! [FAILURE] Stage 1 (Data): {e}") # # ... error handling ... 
# return # # --- Stage 2: Intelligence Gathering --- # try: # with SessionLocal() as db: # job = db.query(AnalysisJob).filter(AnalysisJob.id == UUID(job_id)).first() # job.status = "INTELLIGENCE_GATHERING" # db.commit() # print("[LOG] STATUS UPDATE: INTELLIGENCE_GATHERING") # intelligence_result = get_combined_news_and_sentiment(ticker, company_name) # with SessionLocal() as db: # job = db.query(AnalysisJob).filter(AnalysisJob.id == UUID(job_id)).first() # current_result = dict(job.result) # current_result['intelligence_briefing'] = intelligence_result # job.result = current_result # db.commit() # db.refresh(job) # Force reload # print(f"[LOG] DB SAVE 2 (Intelligence): Result keys are now: {list(job.result.keys())}") # except Exception as e: # print(f"!!! [FAILURE] Stage 2 (Intelligence): {e}") # # ... error handling ... # return # # --- Stage 3: LLM Analysis --- # try: # with SessionLocal() as db: # job = db.query(AnalysisJob).filter(AnalysisJob.id == UUID(job_id)).first() # job.status = "ANALYZING" # db.commit() # print("[LOG] STATUS UPDATE: ANALYZING") # with SessionLocal() as db: # job = db.query(AnalysisJob).filter(AnalysisJob.id == UUID(job_id)).first() # data_for_llm = job.result # llm_result = get_llm_analysis(ticker, company_name, data_for_llm.get("intelligence_briefing", {})) # if "error" in llm_result: raise ValueError(llm_result['error']) # # --- Final Assembly and Save --- # print("[LOG] Finalizing results...") # with SessionLocal() as db: # job = db.query(AnalysisJob).filter(AnalysisJob.id == UUID(job_id)).first() # final_result_data = dict(job.result) # final_result_data['llm_analysis'] = llm_result # job.result = final_result_data # job.status = "SUCCESS" # db.commit() # db.refresh(job) # print(f"[LOG] DB SAVE 3 (Final): Result keys are now: {list(job.result.keys())}") # print(f"--- [SUCCESS] Full analysis for {job_id} complete. ---") # except Exception as e: # print(f"!!! 
# [FAILURE] Stage 3 (LLM): {e}")
#         with SessionLocal() as db:
#             job = db.query(AnalysisJob).filter(AnalysisJob.id == UUID(job_id)).first()
#             if job:
#                 job.status = "FAILED"
#                 error_data = job.result if job.result else {}
#                 error_data['error'] = str(e)
#                 job.result = error_data
#                 db.commit()

import json
from uuid import UUID

from celery_worker import celery
from core.database import SessionLocal
from models.analysis_job import AnalysisJob
from tools.data_tools import get_stock_data
from tools.news_tools import get_combined_news_and_sentiment
from tools.analyst_tools import get_llm_analysis


@celery.task
def run_full_analysis(job_id: str, ticker: str) -> None:
    """Run the full analysis pipeline for one job, persisting progress as it goes.

    The job row is looked up by ``job_id`` and then driven through three
    stages, with ``job.status`` and ``job.result`` committed after each step:

    1. ``DATA_FETCHING`` - ``get_stock_data(ticker)``; its return value
       becomes the initial ``job.result``.
    2. ``INTELLIGENCE_GATHERING`` - ``get_combined_news_and_sentiment``; stored
       under the ``intelligence_briefing`` key of ``job.result``.
    3. ``ANALYZING`` - ``get_llm_analysis``; stored under ``llm_analysis``,
       after which the status becomes ``SUCCESS``.

    Any exception raised once the job has been found is caught: the status is
    set to ``FAILED`` and a user-facing message is stored under the ``error``
    key of ``job.result``. The exception is not re-raised.

    Args:
        job_id: String form of the job's UUID primary key.
        ticker: Ticker symbol passed to the data, news and LLM tools.

    Raises:
        ValueError: If ``job_id`` is not a valid UUID string (raised by
            ``UUID()`` during the lookup, before any status is written).
    """
    print(f"\n--- [START] Full Analysis for Job ID: {job_id} ---")

    # One session and one job object are used for the whole run. The outer
    # try/finally guarantees the session is closed on every exit path,
    # including a failing lookup.
    db = SessionLocal()
    try:
        job = db.query(AnalysisJob).filter(AnalysisJob.id == UUID(job_id)).first()
        if not job:
            print(f"Job {job_id} not found. Aborting.")
            return

        try:
            # --- Stage 1: Data Fetching ---
            print(f"Stage 1: DATA_FETCHING for job {job_id}")
            job.status = "DATA_FETCHING"
            db.commit()

            data_result = get_stock_data(ticker)
            if "error" in data_result:
                raise ValueError(f"Data fetching failed: {data_result['error']}")

            company_name = data_result.get("company_name", ticker)
            job.result = data_result
            db.commit()
            print("-> Data fetching stage complete.")

            # --- Stage 2: Intelligence Gathering ---
            print(f"Stage 2: INTELLIGENCE_GATHERING for job {job_id}")
            job.status = "INTELLIGENCE_GATHERING"
            db.commit()

            intelligence_result = get_combined_news_and_sentiment(ticker, company_name)

            # Assign a *new* dict rather than mutating in place so the ORM
            # sees the attribute as changed and writes it out.
            current_result = dict(job.result)
            current_result['intelligence_briefing'] = intelligence_result
            job.result = current_result
            db.commit()
            print("-> Intelligence gathering stage complete.")

            # --- Stage 3: LLM Analysis ---
            print(f"Stage 3: ANALYZING for job {job_id}")
            job.status = "ANALYZING"
            db.commit()

            # Reload the row so the LLM is fed the result as stored.
            db.refresh(job)
            data_for_llm = job.result

            llm_result = get_llm_analysis(
                ticker,
                company_name,
                data_for_llm.get("intelligence_briefing", {}),
            )
            if "error" in llm_result:
                raise ValueError(f"LLM analysis failed: {llm_result['error']}")

            # --- Final Assembly and Save ---
            # Fixed: this message was missing its f-prefix and printed the
            # literal text "{job_id}".
            print(f"Finalizing results for job {job_id}")
            final_result_data = dict(job.result)
            final_result_data['llm_analysis'] = llm_result
            job.result = final_result_data
            job.status = "SUCCESS"
            db.commit()

            print(f"--- [SUCCESS] Full analysis for {job_id} complete. ---")

        except Exception as e:
            error_message = str(e)
            print(f"!!! [FAILURE] Full analysis for {job_id} FAILED: {error_message}")

            # If the failure came from a flush/commit, the session cannot be
            # used again until it is rolled back. On the other failure paths
            # everything has already been committed, so this discards nothing.
            db.rollback()

            job.status = "FAILED"
            # Provide a cleaner error message for the user, while keeping
            # the technical details.
            user_friendly_error = (
                f"Analysis failed for ticker '{ticker}'. This stock may not be listed "
                "or there was a problem fetching its data. Please check the ticker "
                f"symbol and try again. (Details: {error_message})"
            )
            # Copy before adding the key: re-assigning the same, in-place
            # mutated object is not detected as a change on a plain JSON
            # column, so the error text would never be saved.
            error_data = dict(job.result) if job.result else {}
            error_data['error'] = user_friendly_error
            job.result = error_data
            db.commit()
    finally:
        db.close()