# Spaces: Paused
# (page-header residue captured together with this source listing; not code)
# data_ingestion.py | |
from typing import List, Dict, Any | |
from pydantic import BaseModel | |
import logging | |
import traceback | |
from config import * | |
from arxiv_fetcher import fetch_arxiv_metadata | |
from pdf_processor import process_pdf, split_text | |
from qdrant_manager import QdrantManager | |
from huggingface_dataset_manager import HuggingFaceDatasetManager | |
# Configure the root logger as soon as this module is imported: records at
# INFO and above, rendered as "<timestamp> - <LEVEL> - <message>".
# NOTE: logging.basicConfig does nothing if the root logger already has
# handlers, so an importing application's own configuration wins.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
class ArxivQueryRequest(BaseModel):
    """Pydantic request model describing an arXiv search.

    Not referenced in this module; presumably consumed by an API/UI layer
    that forwards its fields to ``run_ingestion_pipeline`` — confirm.
    """

    # Search query string (required).
    query: str
    # Maximum number of results requested; 10 when omitted.
    max_results: int = 10
# Module-wide clients shared by every ingestion call below. They are built
# once, at import time, from settings presumably provided by
# `from config import *` (QDRANT_API_URL, QDRANT_API_KEY, COLLECTION_NAME,
# DATASET_NAME).
qdrant_manager = QdrantManager(
    QDRANT_API_URL,
    QDRANT_API_KEY,
    COLLECTION_NAME,
)
hf_dataset_manager = HuggingFaceDatasetManager(
    DATASET_NAME,
)
def _build_document_records(metadata: Dict[str, Any], title: Any) -> List[Dict[str, Any]]:
    """Extract, chunk, embed and index one paper; return its per-chunk records.

    Returns an empty list (after logging a warning) when the PDF yields no
    text or the text yields no chunks; nothing is sent to Qdrant in that
    case. Exceptions raised by ``process_pdf``, ``split_text`` or the
    Qdrant manager propagate to the caller.
    """
    pdf_url = metadata["pdf_url"]
    pdf_text = process_pdf(pdf_url)
    if not pdf_text:
        logging.warning(f"No text extracted from PDF: {pdf_url}")
        return []
    chunks = split_text(pdf_text)
    if not chunks:
        logging.warning(f"No chunks generated from PDF text for document {title}")
        return []
    # Compute every embedding *before* writing to Qdrant: if embedding fails
    # part-way, nothing has been sent to Qdrant yet, so the document is not
    # left indexed there while missing from the dataset.
    records = [
        {
            "text": chunk,
            "metadata": metadata,
            "embedding": qdrant_manager.get_embeddings(chunk),
        }
        for chunk in chunks
    ]
    qdrant_manager.add_texts(chunks, metadata)
    return records


def ingest_documents(metadata_list: List[Dict[str, Any]]) -> str:
    """Ingest papers into Qdrant and push their chunks to the HF dataset.

    Each metadata entry's PDF is processed with ``process_pdf``, split with
    ``split_text``, embedded, and added to Qdrant. One record per chunk
    (``text``, ``metadata``, ``embedding``) is collected. All collected
    records are sent to ``hf_dataset_manager.update_dataset`` in a single
    call at the end.

    A failure on one document is logged (with traceback) and that document
    is skipped; the remaining documents are still processed.

    Args:
        metadata_list: Paper metadata dicts. ``"pdf_url"`` is required. The
            ``"title"`` is used only for log messages and may be absent.

    Returns:
        ``"No data to add to the dataset"`` when no document produced any
        chunk; otherwise a summary giving the number of documents that were
        actually ingested and the number of chunks added.

    Raises:
        Whatever ``hf_dataset_manager.update_dataset`` raises. Per-document
        errors are caught and logged instead.
    """
    total = len(metadata_list)
    logging.info(f"Starting ingestion of {total} documents")
    new_data: List[Dict[str, Any]] = []
    ingested = 0
    for i, metadata in enumerate(metadata_list, start=1):
        # .get(): a missing title must not raise, least of all from inside
        # the except handler below (which would abort the whole batch).
        title = metadata.get("title", "<untitled>")
        try:
            logging.info(f"Processing document {i}/{total}: {title}")
            records = _build_document_records(metadata, title)
        except Exception as e:
            logging.exception(f"Error processing document {i} ({title}): {str(e)}")
            continue
        if not records:
            # Empty PDF text or no chunks: already warned about, skip.
            continue
        new_data.extend(records)
        ingested += 1
        logging.info(f"Processed document {i}/{total} successfully")
    if not new_data:
        logging.error("No data to add to the dataset")
        return "No data to add to the dataset"
    hf_dataset_manager.update_dataset(new_data)
    # Report documents that actually contributed chunks, not merely requested ones.
    result_message = f"Ingested {ingested} documents, adding {len(new_data)} chunks to the dataset."
    logging.info(result_message)
    return result_message
def run_ingestion_pipeline(query: str, max_results: int = 10) -> str:
    """Fetch arXiv metadata for *query* and ingest the returned papers.

    Args:
        query: Search string passed to ``fetch_arxiv_metadata``.
        max_results: Passed through to ``fetch_arxiv_metadata`` unchanged.

    Returns:
        The summary from ``ingest_documents``. If the fetch returns nothing
        (a falsy value), returns "No results found for the given query". If
        any exception occurs, returns an "Error in ingestion pipeline: ..."
        message that includes the formatted traceback; the same text is
        logged at ERROR level. Exceptions are not propagated to the caller.
    """
    try:
        logging.info(f"Running ingestion pipeline with query: {query} and max_results: {max_results}")
        fetched = fetch_arxiv_metadata(query, max_results)
        return ingest_documents(fetched) if fetched else "No results found for the given query"
    except Exception as exc:
        failure = f"Error in ingestion pipeline: {str(exc)}\n{traceback.format_exc()}"
        logging.error(failure)
        return failure
if __name__ == "__main__":
    # Command-line entry point: run one ingestion pass and print its summary.
    import argparse

    cli = argparse.ArgumentParser(description="Run the arXiv ingestion pipeline")
    cli.add_argument("query", type=str, help="arXiv search query")
    cli.add_argument(
        "--max_results",
        type=int,
        default=10,
        help="Maximum number of results to fetch",
    )
    cli_args = cli.parse_args()
    print(run_ingestion_pipeline(cli_args.query, cli_args.max_results))