File size: 3,281 Bytes
84deff7
8c3a73e
84deff7
b4c442a
84deff7
 
 
 
 
 
b4c442a
 
8c3a73e
84deff7
 
 
8c3a73e
84deff7
 
8c3a73e
b4c442a
 
8c3a73e
 
b4c442a
 
84deff7
b4c442a
84deff7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b4c442a
84deff7
 
 
 
 
 
 
 
b4c442a
 
 
 
 
 
84deff7
b4c442a
e808c52
 
b4c442a
 
 
84deff7
b4c442a
84deff7
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# data_ingestion.py
from typing import List, Dict, Any
from pydantic import BaseModel
import logging
import traceback
from config import *
from arxiv_fetcher import fetch_arxiv_metadata
from pdf_processor import process_pdf, split_text
from qdrant_manager import QdrantManager
from huggingface_dataset_manager import HuggingFaceDatasetManager

# Configure the root logger once at import time: INFO level and above, with
# each record rendered as "<timestamp> - <level name> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Pydantic model with a required string field `query` and an integer field
# `max_results` that defaults to 10 (the same default used by
# `run_ingestion_pipeline`).
# NOTE(review): not referenced in the visible part of this file; presumably
# used as a request body by an API layer in another module — confirm.
class ArxivQueryRequest(BaseModel):
    query: str
    max_results: int = 10

# Module-level clients used by `ingest_documents`; both are constructed as a
# side effect of importing this module.
# NOTE(review): QDRANT_API_URL, QDRANT_API_KEY, COLLECTION_NAME and
# DATASET_NAME are not defined in this file — presumably they come from the
# `from config import *` star import; confirm.
qdrant_manager = QdrantManager(QDRANT_API_URL, QDRANT_API_KEY, COLLECTION_NAME)
hf_dataset_manager = HuggingFaceDatasetManager(DATASET_NAME)

def ingest_documents(metadata_list: List[Dict[str, Any]]) -> str:
    """Process each paper's PDF, index its chunks, and add them to the dataset.

    For every metadata entry the PDF at ``metadata["pdf_url"]`` is passed to
    ``process_pdf``, the resulting text is split with ``split_text``, the
    chunks are handed to ``qdrant_manager.add_texts``, and one record per
    chunk (``text``, ``metadata``, ``embedding``) is collected. All collected
    records are then passed to ``hf_dataset_manager.update_dataset`` in a
    single call.

    A failure while handling one document is logged (with traceback) and does
    not prevent the remaining documents from being processed. Exceptions
    raised by ``update_dataset`` are not caught here and propagate.

    Args:
        metadata_list: One dict per paper. ``"pdf_url"`` is read for every
            entry (an entry without it is logged as failed and skipped);
            ``"title"`` is used only in log messages and may be absent.

    Returns:
        A human-readable summary stating how many documents were actually
        ingested and how many chunks were added, or
        ``"No data to add to the dataset"`` when no chunk was produced.
    """
    total = len(metadata_list)
    logging.info(f"Starting ingestion of {total} documents")
    new_data = []
    ingested_count = 0  # documents that made it all the way through the loop body

    for position, metadata in enumerate(metadata_list, start=1):
        # Resolve the title once, defensively. Indexing metadata['title']
        # inside the except handler would raise a second KeyError for entries
        # without a title and abort the whole batch.
        title = metadata.get("title", "<untitled>")
        try:
            logging.info(f"Processing document {position}/{total}: {title}")
            pdf_text = process_pdf(metadata["pdf_url"])
            if not pdf_text:
                logging.warning(f"No text extracted from PDF: {metadata['pdf_url']}")
                continue

            chunks = split_text(pdf_text)
            if not chunks:
                logging.warning(f"No chunks generated from PDF text for document {title}")
                continue

            qdrant_manager.add_texts(chunks, metadata)

            new_data.extend([{
                "text": chunk,
                "metadata": metadata,
                "embedding": qdrant_manager.get_embeddings(chunk)
            } for chunk in chunks])
            ingested_count += 1
            logging.info(f"Processed document {position}/{total} successfully")
        except Exception as e:
            # Best-effort per document: record the failure (including the
            # traceback) and move on to the next entry.
            logging.exception(f"Error processing document {position} ({title}): {str(e)}")

    if not new_data:
        logging.error("No data to add to the dataset")
        return "No data to add to the dataset"

    hf_dataset_manager.update_dataset(new_data)

    # Report the number of documents that were really ingested, not the number
    # requested; skipped and failed documents must not be counted.
    result_message = f"Ingested {ingested_count} documents, adding {len(new_data)} chunks to the dataset."
    logging.info(result_message)
    return result_message

def run_ingestion_pipeline(query: str, max_results: int = 10) -> str:
    """Fetch arXiv metadata for ``query`` and ingest the matching documents.

    Calls ``fetch_arxiv_metadata(query, max_results)`` and, when it returns a
    non-empty result, forwards that result to ``ingest_documents``.

    This function acts as an error boundary: any ``Exception`` raised along
    the way is logged and converted into the returned string instead of
    propagating to the caller.

    Args:
        query: Search query passed to ``fetch_arxiv_metadata``.
        max_results: Result limit passed to ``fetch_arxiv_metadata``.

    Returns:
        The summary string from ``ingest_documents``, the fixed message
        ``"No results found for the given query"`` when nothing was fetched,
        or an error description followed by the formatted traceback.
    """
    try:
        logging.info(f"Running ingestion pipeline with query: {query} and max_results: {max_results}")
        papers = fetch_arxiv_metadata(query, max_results)
        # An empty (falsy) fetch result short-circuits before any ingestion work.
        return ingest_documents(papers) if papers else "No results found for the given query"
    except Exception as exc:
        failure_report = f"Error in ingestion pipeline: {exc!s}\n{traceback.format_exc()}"
        logging.error(failure_report)
        return failure_report

if __name__ == "__main__":
    # Command-line entry point: one positional search query plus an optional
    # --max_results limit; the pipeline's returned message is printed to stdout.
    from argparse import ArgumentParser

    cli = ArgumentParser(description="Run the arXiv ingestion pipeline")
    cli.add_argument("query", type=str, help="arXiv search query")
    cli.add_argument("--max_results", type=int, default=10, help="Maximum number of results to fetch")
    options = cli.parse_args()

    print(run_ingestion_pipeline(options.query, options.max_results))