# arxiv-rag-mvp / data_ingestion.py
# Page header: author donb-hf, commit 84deff7 ("update services")
# data_ingestion.py
from typing import List, Dict, Any
from pydantic import BaseModel
import logging
import traceback
from config import *
from arxiv_fetcher import fetch_arxiv_metadata
from pdf_processor import process_pdf, split_text
from qdrant_manager import QdrantManager
from huggingface_dataset_manager import HuggingFaceDatasetManager
# Configure the root logger at import time: INFO level, timestamped
# "time - LEVEL - message" lines. basicConfig is a no-op if the root logger
# already has handlers.
# NOTE(review): doing this on import affects any application that imports
# this module; consider moving it under the __main__ guard.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class ArxivQueryRequest(BaseModel):
    """Pydantic model for an arXiv ingestion request: a search query plus a result limit.

    NOTE(review): not used within this file as shown; presumably consumed by an
    API layer that forwards it to run_ingestion_pipeline — confirm.
    """

    # arXiv search query string.
    query: str
    # Maximum number of papers to fetch; the default of 10 matches
    # run_ingestion_pipeline's own default.
    max_results: int = 10
# Module-level service clients shared by every ingestion call, constructed once
# at import time. The settings (QDRANT_API_URL, QDRANT_API_KEY, COLLECTION_NAME,
# DATASET_NAME) are presumably supplied by `from config import *` — confirm.
# NOTE(review): whether constructing these performs network I/O depends on
# QdrantManager / HuggingFaceDatasetManager — verify before importing this
# module in tests.
qdrant_manager = QdrantManager(QDRANT_API_URL, QDRANT_API_KEY, COLLECTION_NAME)
hf_dataset_manager = HuggingFaceDatasetManager(DATASET_NAME)
def _build_chunk_records(metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turn one paper's metadata into embedded text chunks and index them in Qdrant.

    Extracts the text of the PDF at ``metadata["pdf_url"]``, splits it into
    chunks, embeds every chunk and then adds the chunks to Qdrant.

    Args:
        metadata: metadata dict for one paper; must contain ``"pdf_url"``.
            ``"title"`` is optional and only used for logging.

    Returns:
        One ``{"text", "metadata", "embedding"}`` record per chunk. Returns an
        empty list (after logging a warning) when the PDF yields no text or no
        chunks.

    Raises:
        KeyError if ``"pdf_url"`` is missing, plus anything raised by
        ``process_pdf``, ``split_text`` or the Qdrant manager.
    """
    title = metadata.get("title", "<untitled>")
    pdf_text = process_pdf(metadata["pdf_url"])
    if not pdf_text:
        logging.warning(f"No text extracted from PDF: {metadata['pdf_url']}")
        return []
    chunks = split_text(pdf_text)
    if not chunks:
        logging.warning(f"No chunks generated from PDF text for document {title}")
        return []
    # Embed before writing to Qdrant. If embedding fails, neither Qdrant nor the
    # dataset receives this document. The previous order could leave a document
    # in Qdrant whose chunks never reached the HF dataset.
    # NOTE(review): assumes get_embeddings does not rely on add_texts having run.
    records = [
        {
            "text": chunk,
            "metadata": metadata,
            "embedding": qdrant_manager.get_embeddings(chunk),
        }
        for chunk in chunks
    ]
    qdrant_manager.add_texts(chunks, metadata)
    return records


def ingest_documents(metadata_list: List[Dict[str, Any]]) -> str:
    """Chunk, embed and index arXiv papers, then push the chunks to the HF dataset.

    Documents are processed independently. A document that raises, or whose PDF
    yields no text or chunks, is logged and skipped, so one bad paper does not
    abort the batch.

    Args:
        metadata_list: per-paper metadata dicts. ``"pdf_url"`` is needed to
            process a paper; ``"title"`` is used only for logging. Presumably
            produced by fetch_arxiv_metadata — confirm the schema there.

    Returns:
        ``"Ingested X of Y documents, adding Z chunks to the dataset."``, where
        X counts only documents that actually contributed chunks. Returns
        ``"No data to add to the dataset"`` when no document produced chunks.

    Raises:
        Whatever ``hf_dataset_manager.update_dataset`` raises. Per-document
        errors are caught and logged instead.
    """
    total = len(metadata_list)
    logging.info(f"Starting ingestion of {total} documents")
    new_data: List[Dict[str, Any]] = []
    ingested = 0
    for i, metadata in enumerate(metadata_list, start=1):
        # Look the title up once, tolerantly. The old handler re-read
        # metadata['title'], so a record missing it raised KeyError inside the
        # except block and aborted the whole batch.
        title = metadata.get("title", "<untitled>")
        logging.info(f"Processing document {i}/{total}: {title}")
        try:
            records = _build_chunk_records(metadata)
        except Exception as e:
            # Best effort per document: keep the traceback, skip, continue.
            logging.exception(f"Error processing document {i} ({title}): {e}")
            continue
        if not records:
            continue
        new_data.extend(records)
        ingested += 1
        logging.info(f"Processed document {i}/{total} successfully")
    if not new_data:
        logging.error("No data to add to the dataset")
        return "No data to add to the dataset"
    hf_dataset_manager.update_dataset(new_data)
    # Report successful documents, not merely the number attempted.
    result_message = (
        f"Ingested {ingested} of {total} documents, "
        f"adding {len(new_data)} chunks to the dataset."
    )
    logging.info(result_message)
    return result_message
def run_ingestion_pipeline(query: str, max_results: int = 10) -> str:
    """Search arXiv for *query* and ingest up to *max_results* matching papers.

    Args:
        query: arXiv search query forwarded to ``fetch_arxiv_metadata``.
        max_results: upper bound on the number of papers fetched.

    Returns:
        One of the following strings:
        - the summary string produced by ``ingest_documents``;
        - ``"No results found for the given query"`` when the search is empty;
        - an ``"Error in ingestion pipeline: ..."`` message that embeds the full
          traceback.

        Any ``Exception`` raised along the way is caught, logged and turned into
        that returned message rather than propagated.
    """
    try:
        logging.info(
            f"Running ingestion pipeline with query: {query} and max_results: {max_results}"
        )
        papers = fetch_arxiv_metadata(query, max_results)
        if papers:
            return ingest_documents(papers)
        return "No results found for the given query"
    except Exception as exc:
        failure = f"Error in ingestion pipeline: {str(exc)}\n{traceback.format_exc()}"
        logging.error(failure)
        return failure
if __name__ == "__main__":
    # Command-line entry point: run the pipeline for one query and print the
    # resulting summary (or error message) to stdout.
    import argparse

    cli = argparse.ArgumentParser(description="Run the arXiv ingestion pipeline")
    cli.add_argument("query", type=str, help="arXiv search query")
    cli.add_argument(
        "--max_results",
        type=int,
        default=10,
        help="Maximum number of results to fetch",
    )
    cli_args = cli.parse_args()
    print(run_ingestion_pipeline(cli_args.query, cli_args.max_results))