# data_ingestion.py
"""Fetch arXiv papers, chunk their PDFs, and store the chunks in Qdrant and a
Hugging Face dataset.

Run as a script: ``python data_ingestion.py "<query>" --max_results N``.
"""
import logging
import traceback
from typing import Any, Dict, List

from pydantic import BaseModel

# Wildcard import kept for compatibility; this module reads QDRANT_API_URL,
# QDRANT_API_KEY, COLLECTION_NAME and DATASET_NAME from it.
from config import *  # noqa: F401,F403
from arxiv_fetcher import fetch_arxiv_metadata
from pdf_processor import process_pdf, split_text
from qdrant_manager import QdrantManager
from huggingface_dataset_manager import HuggingFaceDatasetManager

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


class ArxivQueryRequest(BaseModel):
    """Pydantic model describing an arXiv ingestion query (search string plus result cap)."""

    query: str
    max_results: int = 10


# Shared clients, constructed once at import time and reused by every call below.
qdrant_manager = QdrantManager(QDRANT_API_URL, QDRANT_API_KEY, COLLECTION_NAME)
hf_dataset_manager = HuggingFaceDatasetManager(DATASET_NAME)


def _document_label(metadata: Dict[str, Any]) -> str:
    """Return the document title for log messages, without raising if it is missing."""
    return str(metadata.get("title", "<untitled>"))


def ingest_documents(metadata_list: List[Dict[str, Any]]) -> str:
    """Extract, chunk, embed and store the PDFs described by *metadata_list*.

    For each document, the PDF at ``metadata["pdf_url"]`` is turned into text
    with ``process_pdf`` and split with ``split_text``. Each chunk is embedded
    with ``qdrant_manager.get_embeddings`` and the chunks are added to Qdrant
    with ``qdrant_manager.add_texts``. All collected records are then pushed
    to the Hugging Face dataset in a single ``update_dataset`` call.

    Documents that yield no text or no chunks, or whose processing raises,
    are logged and skipped; the remaining documents are still ingested.

    Args:
        metadata_list: One metadata dict per paper. Each needs ``"pdf_url"``;
            ``"title"`` is used for log messages when present.

    Returns:
        A human-readable summary. If no chunks were produced at all, the
        message "No data to add to the dataset" is returned and nothing is
        pushed to the Hugging Face dataset.

    Raises:
        Whatever ``hf_dataset_manager.update_dataset`` raises; that call is
        not guarded here.
    """
    total = len(metadata_list)
    logging.info(f"Starting ingestion of {total} documents")

    new_data: List[Dict[str, Any]] = []
    ingested = 0

    for i, metadata in enumerate(metadata_list, start=1):
        # Computed outside the try so the except handler can never raise KeyError
        # (a missing "title" used to abort the entire batch from inside the handler).
        label = _document_label(metadata)
        try:
            logging.info(f"Processing document {i}/{total}: {label}")
            pdf_text = process_pdf(metadata["pdf_url"])
            if not pdf_text:
                logging.warning(f"No text extracted from PDF: {metadata['pdf_url']}")
                continue

            chunks = split_text(pdf_text)
            if not chunks:
                logging.warning(f"No chunks generated from PDF text for document {label}")
                continue

            # Build every record (including its embedding) before writing to Qdrant,
            # so an embedding failure part-way through doesn't leave chunks in
            # Qdrant that never make it into the Hugging Face dataset.
            records = [
                {
                    "text": chunk,
                    "metadata": metadata,
                    "embedding": qdrant_manager.get_embeddings(chunk),
                }
                for chunk in chunks
            ]
            qdrant_manager.add_texts(chunks, metadata)
            new_data.extend(records)
            ingested += 1
            logging.info(f"Processed document {i}/{total} successfully")
        except Exception as e:
            # Best-effort per document: record the failure (with traceback) and continue.
            logging.exception(f"Error processing document {i} ({label}): {str(e)}")

    if not new_data:
        logging.error("No data to add to the dataset")
        return "No data to add to the dataset"

    hf_dataset_manager.update_dataset(new_data)
    # Report the documents that actually contributed chunks, not the number requested.
    result_message = f"Ingested {ingested} documents, adding {len(new_data)} chunks to the dataset."
    logging.info(result_message)
    return result_message


def run_ingestion_pipeline(query: str, max_results: int = 10) -> str:
    """Fetch arXiv metadata for *query* and ingest the resulting documents.

    Args:
        query: arXiv search query passed to ``fetch_arxiv_metadata``.
        max_results: Maximum number of results to fetch.

    Returns:
        The summary from ``ingest_documents``, or "No results found for the
        given query" if the fetch returned nothing. Any ``Exception`` raised
        along the way is logged and returned as an error message that
        includes the traceback, instead of being re-raised.
    """
    try:
        logging.info(f"Running ingestion pipeline with query: {query} and max_results: {max_results}")
        metadata_list = fetch_arxiv_metadata(query, max_results)
        if not metadata_list:
            return "No results found for the given query"
        return ingest_documents(metadata_list)
    except Exception as e:
        error_message = f"Error in ingestion pipeline: {str(e)}\n{traceback.format_exc()}"
        logging.error(error_message)
        return error_message


def _main() -> None:
    """Command-line entry point: parse arguments, run the pipeline, print the result."""
    import argparse  # only needed when run as a script

    parser = argparse.ArgumentParser(description="Run the arXiv ingestion pipeline")
    parser.add_argument("query", type=str, help="arXiv search query")
    parser.add_argument("--max_results", type=int, default=10, help="Maximum number of results to fetch")
    args = parser.parse_args()
    print(run_ingestion_pipeline(args.query, args.max_results))


if __name__ == "__main__":
    _main()