# File: data_ingestion.py
"""Fetch arXiv papers, chunk their PDFs, and index them.

Chunks are embedded and stored in a Qdrant collection, and the same chunks
(with metadata and embedding vectors) are appended to a Hugging Face dataset.
"""
import io
from typing import List, Dict, Any

import arxiv
import requests
from datasets import load_dataset, Dataset, concatenate_datasets
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_openai import OpenAIEmbeddings
from langchain_qdrant import Qdrant

from config import *

# Largest page the arXiv API serves in one request (per the arxiv.py client
# documentation); larger requests are paged by the client.
_ARXIV_MAX_PAGE_SIZE = 2000

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)


def fetch_arxiv_metadata(query: str, max_results: int = 10) -> List[Dict[str, Any]]:
    """Search arXiv and return one metadata dict per matching paper.

    Args:
        query: arXiv search query string.
        max_results: Maximum number of papers to return.

    Returns:
        A list of dicts with the keys ``title``, ``authors``, ``published``,
        ``updated``, ``pdf_url``, ``entry_id`` and ``summary``. Timestamps are
        ISO-8601 strings. Empty when ``max_results`` is not positive.
    """
    if max_results <= 0:
        # Nothing was asked for; avoid issuing a request with a zero page size.
        return []

    client = arxiv.Client(
        page_size=min(max_results, _ARXIV_MAX_PAGE_SIZE),
        delay_seconds=3,
        num_retries=3,
    )
    search = arxiv.Search(query=query, max_results=max_results)
    return [
        {
            "title": result.title,
            "authors": [author.name for author in result.authors],
            "published": result.published.isoformat(),
            "updated": result.updated.isoformat(),
            "pdf_url": result.pdf_url,
            "entry_id": result.entry_id,
            "summary": result.summary,
        }
        for result in client.results(search)
    ]


def process_pdf(pdf_url: str) -> str:
    """Load a PDF and return its text, pages joined by newlines.

    Args:
        pdf_url: Location of the PDF, passed straight to ``PyMuPDFLoader``.

    Returns:
        The concatenated ``page_content`` of every loaded page.
    """
    pages = PyMuPDFLoader(pdf_url).load()
    return "\n".join(page.page_content for page in pages)


def ingest_documents(metadata_list: List[Dict[str, Any]], split: str = "train") -> None:
    """Chunk, embed and store the PDFs described by ``metadata_list``.

    Each PDF is split into chunks which are written to the Qdrant collection
    ``COLLECTION_NAME`` and appended to the Hugging Face dataset
    ``DATASET_NAME`` as rows of ``text`` / ``metadata`` / ``embedding``.

    Args:
        metadata_list: Paper metadata dicts; each must contain ``pdf_url``.
        split: Dataset split to extend and push back to the Hub.

    Notes:
        The new rows must be schema-compatible with the existing split,
        otherwise ``concatenate_datasets`` raises.
    """
    if not metadata_list:
        return

    # Load first so a missing or inaccessible dataset fails before any
    # embedding work is paid for.
    existing = load_dataset(DATASET_NAME, split=split)

    qdrant = None
    texts: List[str] = []
    metadatas: List[Dict[str, Any]] = []
    vectors: List[List[float]] = []

    for metadata in metadata_list:
        pdf_text = process_pdf(metadata["pdf_url"])
        chunks = text_splitter.split_text(pdf_text)
        if not chunks:
            # e.g. a PDF with no extractable text; nothing to index.
            continue

        chunk_metadatas = [metadata] * len(chunks)

        # Add to Qdrant. The store is created from the first non-empty batch
        # because the collection's vector size is derived from embedded text;
        # building it from an empty document list fails.
        if qdrant is None:
            qdrant = Qdrant.from_texts(
                chunks,
                embeddings,
                metadatas=chunk_metadatas,
                url=QDRANT_API_URL,
                api_key=QDRANT_API_KEY,
                collection_name=COLLECTION_NAME,
            )
        else:
            qdrant.add_texts(chunks, metadatas=chunk_metadatas)

        # Prepare data for the Hugging Face dataset. One batched call per
        # document instead of one embedding request per chunk.
        # NOTE: Qdrant embeds the chunks internally as well, so each chunk is
        # still embedded twice overall.
        texts.extend(chunks)
        metadatas.extend(chunk_metadatas)
        vectors.extend(embeddings.embed_documents(chunks))

    if not texts:
        # Nothing was ingested; leave the published dataset untouched.
        return

    # Update Hugging Face dataset
    new_dataset = Dataset.from_dict(
        {"text": texts, "metadata": metadatas, "embedding": vectors}
    )
    combined = concatenate_datasets([existing, new_dataset])
    combined.push_to_hub(DATASET_NAME, split=split)