Spaces:
Sleeping
Sleeping
File size: 4,546 Bytes
ef609eb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
from fastapi import FastAPI, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import PyPDF2
import openai
import numpy as np
import faiss
import tiktoken
from typing import List
import io
from dotenv import load_dotenv
import os
# Application instance; the route handlers in this module register on it.
app = FastAPI()

# Add CORS middleware.
# Origins, methods, and headers stay fully open so any frontend can call the
# API. Credentials are disabled: a wildcard origin must not be combined with
# allow_credentials=True (it would let any site issue credentialed requests),
# and no endpoint in this module relies on cookies or auth headers.
# NOTE(review): if a frontend does send credentials, list its origins
# explicitly in allow_origins and re-enable allow_credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# In-memory storage
class DocumentStore:
    """Hold the currently loaded document's chunks, vectors, and search index.

    All three attributes start out empty and can be returned to that state
    with :meth:`reset`.
    """

    def __init__(self):
        # A fresh store is identical to a store that has just been reset.
        self.reset()

    def reset(self):
        """Discard any stored chunks, embeddings, and index."""
        # Text chunks of the loaded document, in index order.
        self.documents: List[str] = []
        # Embedding matrix for the chunks; None until a document is loaded.
        self.embeddings = None
        # Vector search index over the embeddings; None until a document is loaded.
        self.index = None
# Single module-level store shared by the /upload and /ask handlers.
doc_store = DocumentStore()
# Request body accepted by the POST /ask endpoint.
class Question(BaseModel):
    # The question to answer from the uploaded document.
    text: str
def get_embedding(text: str) -> List[float]:
    """Return the embedding vector for ``text``.

    Sends ``text`` to the OpenAI embeddings endpoint using the
    "text-embedding-3-small" model and returns the embedding of the first
    (and, for a single input string, only) result item.
    """
    result = openai.embeddings.create(
        input=text,
        model="text-embedding-3-small",
    )
    first_item = result.data[0]
    return first_item.embedding
def chunk_text(text: str, chunk_size: int = 1000) -> List[str]:
    """Split ``text`` into chunks of whole, space-joined words.

    Words are taken from ``text.split()``. Each word contributes its length
    plus one to a running size; as soon as that size reaches ``chunk_size``
    the words gathered so far are emitted as one chunk. A chunk may therefore
    slightly exceed ``chunk_size``. Any leftover words form a final chunk.

    Returns an empty list when ``text`` contains no words.
    """
    tokens = text.split()
    pieces: List[str] = []
    start = 0  # index of the first word of the chunk being built
    running = 0  # accumulated size of the chunk being built

    for end, token in enumerate(tokens, start=1):
        running += len(token) + 1
        if running < chunk_size:
            continue
        # Size threshold reached: close the chunk after the current word.
        pieces.append(" ".join(tokens[start:end]))
        start = end
        running = 0

    # Flush whatever did not reach the threshold.
    if start < len(tokens):
        pieces.append(" ".join(tokens[start:]))
    return pieces
@app.post("/upload")
async def upload_pdf(file: UploadFile):
    """Ingest a PDF: extract its text, chunk it, embed it, and index it.

    The previously loaded document is discarded as soon as processing starts.
    The new chunks, embeddings, and FAISS index are published to ``doc_store``
    together, only after all of them were built successfully, so the store
    never holds chunks without a matching index.

    Args:
        file: The uploaded file; its name must end in ".pdf" (any case).

    Returns:
        A dict with a success ``message`` and the number of ``chunks`` indexed.

    Raises:
        HTTPException: 400 if the file is not named like a PDF or contains no
            extractable text; 500 for any other processing failure.
    """
    # The filename can be missing, and the extension may be upper-case.
    if not (file.filename or "").lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="File must be a PDF")

    try:
        # Reset the document store
        doc_store.reset()

        # Read PDF content
        content = await file.read()
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(content))
        # Join pages with a newline so the last word of one page is not fused
        # with the first word of the next; tolerate pages that yield no text.
        text = "\n".join(page.extract_text() or "" for page in pdf_reader.pages)

        # Chunk the text
        chunks = chunk_text(text)
        if not chunks:
            # E.g. a scanned/image-only PDF: nothing to embed or index.
            raise HTTPException(
                status_code=400,
                detail="No extractable text found in the PDF",
            )

        # Create embeddings
        # NOTE(review): get_embedding is a synchronous call made once per
        # chunk, so this handler blocks the event loop while it runs.
        embeddings = np.array(
            [get_embedding(chunk) for chunk in chunks], dtype=np.float32
        )

        # Create FAISS index
        index = faiss.IndexFlatL2(embeddings.shape[1])
        index.add(embeddings)

        # Publish everything at once, now that every step has succeeded.
        doc_store.documents = chunks
        doc_store.embeddings = embeddings
        doc_store.index = index

        return {"message": "PDF processed successfully", "chunks": len(chunks)}
    except HTTPException:
        # Keep deliberate client errors instead of rewrapping them as 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/ask")
async def ask_question(question: Question):
    """Answer a question using the most relevant chunks of the uploaded PDF.

    The question is embedded, the nearest chunks (at most 10) are looked up in
    the FAISS index, and those chunks are passed as context to the chat model.

    Args:
        question: Request body carrying the question text.

    Returns:
        A dict with the model's reply under ``answer``.

    Raises:
        HTTPException: 400 if no document is loaded or the question is blank;
            500 if embedding, search, or the chat completion fails.
    """
    import logging

    # Snapshot the store so both values come from the same read.
    index = doc_store.index
    documents = doc_store.documents
    if index is None or not documents:
        raise HTTPException(
            status_code=400, detail="No document has been uploaded yet")
    if not question.text.strip():
        raise HTTPException(status_code=400, detail="Question must not be empty")

    try:
        # Get question embedding
        question_embedding = get_embedding(question.text)

        # Search similar chunks
        # Never ask for more neighbours than there are chunks: FAISS pads
        # missing results with -1, which would index the LAST chunk.
        k = min(10, len(documents))
        D, I = index.search(
            np.array([question_embedding], dtype=np.float32), k)

        # Get relevant chunks, skipping any padding / out-of-range labels
        relevant_chunks = [
            documents[int(i)] for i in I[0] if 0 <= int(i) < len(documents)
        ]
        logging.getLogger(__name__).debug(
            "Retrieved %d context chunks", len(relevant_chunks))

        # Create prompt
        prompt = f"""Based on the following context, please answer the question.
If the answer cannot be found in the context, say "I cannot find the answer in the document." You may also use the context to infer information that is not explicitly stated in the context. For example, if the context does not explicitly state what the paper is about, you may infer that the paper is about the topic of the question or the retrieved context.
Context:
{' '.join(relevant_chunks)}
Question: {question.text}
"""

        # Get response from OpenAI
        response = openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a helpful assistant that answers questions based on the provided context."},
                {"role": "user", "content": prompt}
            ]
        )
        return {"answer": response.choices[0].message.content}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# Configure OpenAI API key
# load_dotenv() runs first so that the lookup below sees whatever it added to
# the process environment; the key is None when OPENAI_API_KEY is not set.
load_dotenv()
openai.api_key = os.environ.get("OPENAI_API_KEY")
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when running this file directly.
    import uvicorn

    # Server settings: listen on all interfaces, port 8000, with auto-reload.
    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "reload": True,
        "log_level": "info",
        "workers": 1,
    }
    # The app is passed as an import string ("module:attribute").
    uvicorn.run("main:app", **server_options)
|