import os
import hashlib
import spaces
import re
import time
import click
import gradio as gr
from io import BytesIO
from PIL import Image
from loguru import logger
from pathlib import Path
import torch
from transformers import AutoProcessor, Qwen2_5_VLForConditionalGeneration
from transformers.image_utils import load_image
import fitz
import html2text
import markdown
import tempfile
from typing import Optional, Tuple
# --- Constants and Setup ---
# File extensions recognised as PDF input and as image input.
pdf_suffixes = [".pdf"]
image_suffixes = [".png", ".jpeg", ".jpg"]

# Run on the GPU when one is visible to torch, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# --- Model and Processor Initialization ---
logger.info(f"Using device: {device}")

# Half precision on CUDA, full precision on CPU; both checkpoints share this.
_model_dtype = torch.float16 if device == "cuda" else torch.float32

# Model 1: Logics-Parsing
MODEL_ID_1 = "Logics-MLLM/Logics-Parsing"
logger.info(f"Loading model 1: {MODEL_ID_1}")
processor_1 = AutoProcessor.from_pretrained(MODEL_ID_1, trust_remote_code=True)
model_1 = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    MODEL_ID_1,
    trust_remote_code=True,
    torch_dtype=_model_dtype,
)
# Move the weights to the target device and switch to inference mode.
model_1 = model_1.to(device).eval()
logger.info(f"Model '{MODEL_ID_1}' loaded successfully.")

# Model 2: Gliese-OCR-7B-Post1.0
MODEL_ID_2 = "prithivMLmods/Gliese-OCR-7B-Post1.0"
logger.info(f"Loading model 2: {MODEL_ID_2}")
processor_2 = AutoProcessor.from_pretrained(MODEL_ID_2, trust_remote_code=True)
model_2 = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    MODEL_ID_2,
    trust_remote_code=True,
    torch_dtype=_model_dtype,
)
# Move the weights to the target device and switch to inference mode.
model_2 = model_2.to(device).eval()
logger.info(f"Model '{MODEL_ID_2}' loaded successfully.")
@spaces.GPU
def parse_page(image: Image.Image, model_name: str) -> str:
    """Parse a single document page image into HTML using the selected model.

    Args:
        image: The page to parse, as a PIL image.
        model_name: Which loaded model to use; either ``"Logics-Parsing"``
            or ``"Gliese-OCR-7B-Post1.0"``.

    Returns:
        The text generated by the model for this page, with the prompt
        tokens removed and special tokens skipped.

    Raises:
        ValueError: If ``model_name`` is not one of the known choices.
    """
    # Select the appropriate model and processor based on the choice.
    if model_name == "Logics-Parsing":
        current_processor, current_model = processor_1, model_1
    elif model_name == "Gliese-OCR-7B-Post1.0":
        current_processor, current_model = processor_2, model_2
    else:
        raise ValueError(f"Unknown model choice: {model_name}")

    # NOTE(review): the tag names inside the parentheses were stripped from
    # this file (the literal was left broken across lines with empty
    # parentheses). They are reconstructed here as the conventional HTML tag
    # names -- confirm against the upstream prompt, especially the figure and
    # formula tags.
    prompt_text = (
        "Parse this document page into a clean, structured HTML representation. "
        "Preserve the logical structure with appropriate tags for content blocks "
        "such as paragraphs (<p>), headings (<h1>-<h6>), tables (<table>), "
        "figures (<figure>), formulas (<formula>), and others. "
        "Include category tags, and filter out irrelevant elements like headers and footers."
    )
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image", "image": image},
                {"type": "text", "text": prompt_text},
            ],
        },
    ]

    # Render the chat messages to a prompt string, then tokenize it together
    # with the page image.
    prompt_full = current_processor.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    inputs = current_processor(
        text=[prompt_full], images=[image], return_tensors="pt", padding=True
    ).to(device)

    with torch.no_grad():
        generated_ids = current_model.generate(
            **inputs,
            max_new_tokens=2048,
            temperature=0.1,
            top_p=0.9,
            do_sample=True,
            repetition_penalty=1.05,
        )

    # Drop the leading prompt tokens from each sequence so that only the newly
    # generated tokens are decoded.
    generated_ids_trimmed = [
        out_ids[len(in_ids):]
        for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
    ]
    output_text = current_processor.batch_decode(
        generated_ids_trimmed,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False,
    )[0]
    return output_text
def convert_pdf_to_images_fitz(pdf_path: str, dpi: int = 200) -> list:
    """Render every page of a PDF to a PIL image using PyMuPDF (fitz).

    Args:
        pdf_path: Path of the PDF file to open.
        dpi: Target rendering resolution; converted to a zoom factor
            relative to 72 dpi.

    Returns:
        A list with one PIL image per page, in page order.

    Raises:
        Exception: Any error raised while opening or rendering the document
            is logged and then re-raised unchanged.
    """
    images = []
    try:
        zoom = dpi / 72.0
        mat = fitz.Matrix(zoom, zoom)
        # The context manager closes the document even when rendering a page
        # fails part-way through, so the file handle is never leaked.
        with fitz.open(pdf_path) as pdf_document:
            for page in pdf_document:
                pix = page.get_pixmap(matrix=mat)
                # Round-trip through PNG bytes to obtain a PIL image.
                img_data = pix.tobytes("png")
                images.append(Image.open(BytesIO(img_data)))
    except Exception as e:
        logger.error(f"Failed to convert PDF using PyMuPDF: {e}")
        raise
    return images
async def pdf_parse(file_path: str, model_choice: str):
"""
Main parsing function that orchestrates the PDF processing pipeline.

Renders the file at ``file_path`` to page images, runs ``parse_page`` on
each page with ``model_choice``, joins the per-page HTML, converts it to
Markdown with ``html2text``, renders that Markdown back to HTML, and writes
the Markdown to a temporary ``.md`` file.

Args:
    file_path: Path of the uploaded file; a falsy value short-circuits with
        an error result.
    model_choice: Model name forwarded unchanged to ``parse_page``.

Returns:
    The no-file path and the error path both return a 7-element tuple.
    NOTE(review): the success-path return statement is not visible in this
    copy of the file (see the note near the end of the function), so its
    exact contents cannot be documented from here.
"""
# Guard: nothing was uploaded.
if not file_path:
logger.warning("File path is None.")
return "
Please upload a file first.
", "", "", None, "Error: No file provided", None, "No file loaded"
logger.info(f'Processing file: {file_path} with model: {model_choice}')
start_time = time.time()
try:
# Render all pages at 200 dpi; an empty result is treated as an error.
pages = convert_pdf_to_images_fitz(file_path, dpi=200)
if not pages:
raise ValueError("Could not extract any pages from the PDF.")
html_parts = []
# NOTE(review): parse_page is called synchronously inside this coroutine.
for i, page in enumerate(pages):
logger.info(f"Parsing page {i+1}/{len(pages)}")
# Pass the model choice to the parsing function
html = parse_page(page, model_choice)
html_parts.append(f'\n{html}')
full_html = '\n'.join(html_parts)
# Timing stops here, so it covers rendering and model parsing but not the
# Markdown conversion or the temp-file write below.
parsing_time = time.time() - start_time
# HTML -> Markdown text, then Markdown -> HTML with fenced-code and table support.
mmd = html2text.html2text(full_html)
mmd_html = markdown.markdown(mmd, extensions=['fenced_code', 'tables'])
# delete=False keeps the file on disk after the with-block so its path can be used later.
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding='utf-8') as f:
f.write(mmd)
md_path = f.name
cost_time_str = f'Total processing time: {parsing_time:.2f}s'
# The first rendered page is kept as the preview image.
preview_image = pages[0]
# NOTE(review): the source is truncated from the next line onward -- the
# page_info_html f-string, the success-path return, and the `except` clause
# that binds `e` and builds `error_html` are missing and must be restored
# from the original file.
page_info_html = f'
"
return error_html, "", "", None, f"Error: {str(e)}", None, "Error processing"
def show_pdf_preview_as_image(file_path: Optional[str]) -> Tuple[Optional[Image.Image], str]:
"""
Generates a PIL Image preview of the first page of a PDF or image file
and provides page count information.
"""
if not file_path:
return None, '
No file loaded
'
page_info_html = '
Page 1 / 1
'
try:
if Path(file_path).suffix.lower() in image_suffixes:
return Image.open(file_path).convert("RGB"), page_info_html
elif Path(file_path).suffix.lower() == '.pdf':
doc = fitz.open(file_path)
page_count = len(doc)
page_info_html = f'