File size: 5,306 Bytes
a19a241
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# file: main.py
import time
import os
import asyncio
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, HttpUrl
from typing import List, Dict, Any
from dotenv import load_dotenv

# Import functions and classes from the new modular files
from document_processor import ingest_and_parse_document
from chunking import process_and_chunk
from embedding import EmbeddingClient
from retrieval import Retriever, generate_hypothetical_document
from generation import generate_answer

load_dotenv()

# --- FastAPI App Initialization ---
app = FastAPI(
    title="Modular RAG API",
    description="A modular API for Retrieval-Augmented Generation from documents.",
    version="2.0.0",
)

# --- Global Clients and API Keys ---
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
embedding_client = EmbeddingClient()
retriever = Retriever(embedding_client=embedding_client)


# --- Pydantic Models ---
class RunRequest(BaseModel):
    document_url: HttpUrl
    questions: List[str]

class RunResponse(BaseModel):
    answers: List[str]

class TestRequest(BaseModel):
    document_url: HttpUrl
#Endpoints

# --- NEW: Test Endpoint for Parsing ---
@app.post("/test/parse", response_model=Dict[str, Any], tags=["Testing"])
async def test_parsing_endpoint(request: TestRequest):
    """
    Tests the document ingestion and parsing phase.
    Returns the full markdown content and the time taken.
    """
    print("--- Running Parsing Test ---")
    start_time = time.perf_counter()
    
    try:
        markdown_content = await ingest_and_parse_document(request.document_url)
        
        end_time = time.perf_counter()
        duration = end_time - start_time
        print(f"--- Parsing took {duration:.2f} seconds ---")
        
        return {
            "parsing_time_seconds": duration,
            "character_count": len(markdown_content),
            "content": markdown_content
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred during parsing: {str(e)}")

@app.post("/hackrx/run", response_model=RunResponse)
async def run_rag_pipeline(request: RunRequest):
    """
    Runs the full RAG pipeline for a given document URL and a list of questions.
    """
    try:
        # --- STAGE 1 & 2: DOCUMENT INGESTION AND CHUNKING ---
        print("--- Kicking off RAG Pipeline ---")
        markdown_content = await ingest_and_parse_document(request.document_url)
        documents = process_and_chunk(markdown_content)

        if not documents:
            raise HTTPException(status_code=400, detail="Document could not be processed into chunks.")

        # --- STAGE 3: INDEXING (Embedding + BM25) ---
        # This step builds the search index for the current document.
        retriever.index(documents)

        # --- CONCURRENT WORKFLOW FOR ALL QUESTIONS ---

        # Step A: Concurrently generate hypothetical documents for all questions
        hyde_tasks = [generate_hypothetical_document(q, GROQ_API_KEY) for q in request.questions]
        all_hyde_docs = await asyncio.gather(*hyde_tasks)

        # Step B: Concurrently retrieve relevant chunks for all questions
        retrieval_tasks = [
            retriever.retrieve(q, hyde_doc)
            for q, hyde_doc in zip(request.questions, all_hyde_docs)
        ]
        all_retrieved_chunks = await asyncio.gather(*retrieval_tasks)

        # Step C: Concurrently generate final answers for all questions
        answer_tasks = [
            generate_answer(q, chunks, GROQ_API_KEY)
            for q, chunks in zip(request.questions, all_retrieved_chunks)
        ]
        final_answers = await asyncio.gather(*answer_tasks)

        print("--- RAG Pipeline Completed Successfully ---")
        return RunResponse(answers=final_answers)

    except Exception as e:
        print(f"An unhandled error occurred in the pipeline: {e}")
        # Re-raising as a 500 error for the client
        raise HTTPException(
            status_code=500, detail=f"An internal server error occurred: {str(e)}"
        )

@app.post("/test/chunk", response_model=Dict[str, Any], tags=["Testing"])
async def test_chunking_endpoint(request: TestRequest):
    """
    Tests both the parsing and chunking phases together.
    Returns the final list of chunks and the total time taken.
    """
    print("--- Running Parsing and Chunking Test ---")
    start_time = time.perf_counter()

    try:
        # Step 1: Parse the document
        markdown_content = await ingest_and_parse_document(request.document_url)
        
        # Step 2: Chunk the parsed content
        documents = process_and_chunk(markdown_content)
        
        end_time = time.perf_counter()
        duration = end_time - start_time
        print(f"--- Parsing and Chunking took {duration:.2f} seconds ---")

        # Convert Document objects to a JSON-serializable list
        chunk_results = [
            {"page_content": doc.page_content, "metadata": doc.metadata}
            for doc in documents
        ]
        
        return {
            "total_time_seconds": duration,
            "chunk_count": len(chunk_results),
            "chunks": chunk_results
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred during chunking: {str(e)}")