Spaces:
Runtime error
import os

# Set cache directories to writable locations
os.environ["TRANSFORMERS_CACHE"] = "/tmp/huggingface_cache"
os.environ["HF_HOME"] = "/tmp/hf_home"
os.environ["HUGGINGFACE_HUB_CACHE"] = "/tmp/huggingface_cache"
os.makedirs("/tmp/huggingface_cache", exist_ok=True)
os.makedirs("/tmp/hf_home", exist_ok=True)

import pdfplumber
import re
import nltk
import torch
import uvicorn
import time
from nltk.tokenize import sent_tokenize
from transformers import pipeline
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware

# Set NLTK data directory to a writable location
NLTK_DATA_DIR = "/tmp/nltk_data"
os.makedirs(NLTK_DATA_DIR, exist_ok=True)
nltk.data.path.append(NLTK_DATA_DIR)

# Download required NLTK resources
try:
    nltk.data.find("tokenizers/punkt")
except LookupError:
    nltk.download("punkt", download_dir=NLTK_DATA_DIR)

# Newer NLTK versions also need punkt_tab for sent_tokenize
try:
    nltk.data.find("tokenizers/punkt_tab")
except LookupError:
    nltk.download("punkt_tab", download_dir=NLTK_DATA_DIR)

# Initialize FastAPI app
app = FastAPI()

# Enable CORS for API accessibility
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Automatically detect device (use GPU if available)
device = 0 if torch.cuda.is_available() else -1
print(f"Using Device: {'GPU' if device == 0 else 'CPU'}")

# Load summarization model
summarizer = pipeline("summarization", model="google/pegasus-xsum", device=device)

# --- **Generalized Cleaning** ---
def clean_text(text):
    # Remove citation markers like [12] or (3) and parenthesized years like (2024)
    text = re.sub(r"\[\d+\]|\(\d+\)|\(\d{4}\)", "", text)
    # Drop content following a "References:" marker
    text = re.sub(r"(References:.*$)", "", text, flags=re.IGNORECASE)
    # Strip URLs
    text = re.sub(r"https?://\S+|www\.\S+", "", text)
    # Collapse whitespace
    text = re.sub(r"\s+", " ", text).strip()
    return text

# --- **PDF Text Extraction** ---
def extract_text_from_pdf(pdf_path):
    with pdfplumber.open(pdf_path) as pdf:
        extracted_text = [page.extract_text() for page in pdf.pages if page.extract_text()]
    return "\n".join(extracted_text)

# --- **Chunking for Summarization** ---
def split_text(text, chunk_size=2048):
    sentences = sent_tokenize(text)
    chunks, current_chunk = [], ""
    for sentence in sentences:
        if len(current_chunk) + len(sentence) + 1 <= chunk_size:
            current_chunk += sentence + " "
        else:
            chunks.append(current_chunk.strip())
            current_chunk = sentence + " "
    if current_chunk:
        chunks.append(current_chunk.strip())
    return chunks
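
# Example usage (a sketch; assumes NLTK's punkt tokenizer splits the sample on ". "):
#   split_text("First sentence. Second one.", chunk_size=40)
#   -> ["First sentence. Second one."]            # both sentences fit in one chunk
#   split_text("First sentence. Second one.", chunk_size=20)
#   -> ["First sentence.", "Second one."]         # second sentence starts a new chunk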

# --- **Summarization Endpoint** ---
@app.post("/summarize")  # route path assumed; adjust to match your client
async def summarize_pdf(file: UploadFile = File(...)):
    try:
        start_time = time.time()
        pdf_content = await file.read()
        pdf_path = "/tmp/temp.pdf"  # Store in /tmp/
        with open(pdf_path, "wb") as f:
            f.write(pdf_content)
        full_text = extract_text_from_pdf(pdf_path)
        if not full_text.strip():
            return {"error": "No text extracted from the PDF."}
        cleaned_text = clean_text(full_text)
        text_chunks = split_text(cleaned_text, chunk_size=2048)
        summaries = [
            summarizer(chunk, max_new_tokens=250, num_beams=5, truncation=True)[0]["summary_text"]
            for chunk in text_chunks
        ]
        final_summary = " ".join(summaries)
        return {"summary": final_summary}
    except Exception as e:
        return {"error": str(e)}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
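
For a quick end-to-end check, here is a minimal client sketch. It assumes the app is reachable on port 7860 (the default used above) and that the route is the assumed /summarize path; sample.pdf is a placeholder file name.

# Minimal client sketch for exercising the endpoint.
# Assumptions: the app is reachable at http://localhost:7860 and the route is /summarize;
# "sample.pdf" is a placeholder file name.
import requests

with open("sample.pdf", "rb") as f:
    resp = requests.post(
        "http://localhost:7860/summarize",
        files={"file": ("sample.pdf", f, "application/pdf")},
    )
print(resp.json())  # expected: {"summary": "..."} or {"error": "..."}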