import base64  # Added for image encoding
import hashlib
import os
import re
import sys  # Added for logging configuration
import tempfile
import time
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Optional, Union

import click
import fitz  # PyMuPDF library for PDF processing
import gradio as gr
import gradio_pdf
import html2text
import markdown
import spaces
import torch
from bs4 import BeautifulSoup  # Added for HTML manipulation
from gradio_pdf import PDF
from loguru import logger
from PIL import Image
from transformers import AutoProcessor, Qwen2VLForConditionalGeneration, Qwen2_5_VLForConditionalGeneration
from transformers.image_utils import load_image
# File extensions the app accepts (lowercase, including the leading dot).
# Both are lists because they are concatenated with `+` elsewhere in the file.
pdf_suffixes = [
    ".pdf",
]
image_suffixes = [
    ".png",
    ".jpeg",
    ".jpg",
]
# --- Model and Processor Initialization ---
# Everything here runs once, at import time.
MODEL_ID = "Logics-MLLM/Logics-Parsing"

# Half precision on GPU, full precision on CPU.
if torch.cuda.is_available():
    device, _model_dtype = "cuda", torch.float16
else:
    device, _model_dtype = "cpu", torch.float32

processor = AutoProcessor.from_pretrained(MODEL_ID, trust_remote_code=True)

model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    MODEL_ID,
    trust_remote_code=True,
    torch_dtype=_model_dtype,
)
# Move the weights to the selected device and switch to inference mode.
model = model.to(device).eval()
# Instruction sent to the model with every page image.
# NOTE(review): the tag names inside the parentheses were missing from the
# version of this file under review (the literal was split across lines, which
# is not valid Python). They have been reconstructed; `<figure>` matches the
# tag the image-injection step searches for, `<formula>` is an assumption.
# Confirm against the prompt the model was trained/evaluated with.
_PARSE_PROMPT = (
    "Parse this document page into a clean, structured HTML representation. "
    "Preserve the logical structure with appropriate tags for content blocks "
    "such as paragraphs (<p>), headings (<h1>-<h6>), tables (<table>), "
    "figures (<figure>), formulas (<formula>), and others. "
    "Include category tags, and filter out irrelevant elements like headers and footers."
)


@spaces.GPU
def parse_page(image: Image.Image) -> str:
    """
    Parses a single document page image using the Qwen2.5-VL model.

    Args:
        image: The rendered page to analyse.

    Returns:
        The text generated by the model for this page (the prompt asks for
        HTML), decoded with special tokens removed.
    """
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image", "image": image},
                {"type": "text", "text": _PARSE_PROMPT},
            ],
        },
    ]
    prompt_full = processor.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    inputs = processor(
        text=[prompt_full],
        images=[image],
        return_tensors="pt",
        padding=True,
    ).to(device)

    with torch.no_grad():
        generated_ids = model.generate(
            **inputs,
            max_new_tokens=2048,
            temperature=0.1,
            top_p=0.9,
            do_sample=True,
            repetition_penalty=1.05,
        )

    # generate() returns prompt + completion; slice the prompt tokens off so
    # only the newly generated part is decoded.
    generated_ids_trimmed = [
        out_ids[len(in_ids):]
        for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
    ]
    decoded = processor.batch_decode(
        generated_ids_trimmed,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False,
    )
    # A single prompt was submitted, so there is exactly one decoded string.
    return decoded[0]
def images_bytes_to_pdf_bytes(image_bytes: bytes) -> bytes:
    """
    Re-encodes an in-memory image as a PDF and returns the PDF bytes.

    The image is converted to RGB before being saved in PDF format.
    """
    with BytesIO() as pdf_stream:
        rgb_image = Image.open(BytesIO(image_bytes)).convert("RGB")
        rgb_image.save(pdf_stream, format="PDF", save_all=True)
        # Copy the bytes out before the stream is closed on exit.
        return pdf_stream.getvalue()
def read_fn(path: Union[str, Path]) -> bytes:
    """
    Reads a file and returns its content as PDF bytes.

    PDF files are returned unchanged; image files are converted to a PDF
    first. The suffix is matched case-insensitively, so "scan.JPG" is handled
    the same way as "scan.jpg".

    Args:
        path: Location of the PDF or image file.

    Returns:
        The PDF bytes.

    Raises:
        ValueError: If the file suffix is neither a supported image suffix nor
            a PDF suffix.
    """
    path = Path(path)
    suffix = path.suffix.lower()
    is_image = suffix in image_suffixes
    # Reject unsupported files before reading them into memory.
    if not is_image and suffix not in pdf_suffixes:
        raise ValueError(f"Unknown file suffix: {path.suffix}")

    file_bytes = path.read_bytes()
    if is_image:
        return images_bytes_to_pdf_bytes(file_bytes)
    return file_bytes
def safe_stem(file_path: str) -> str:
    """
    Returns the file name without its final suffix, sanitised for reuse.

    Every character that is neither a word character nor a dot is replaced
    by an underscore.
    """
    unsafe_chars = re.compile(r'[^\w.]')
    raw_stem = Path(file_path).stem
    return unsafe_chars.sub('_', raw_stem)
def to_pdf(file_path: Optional[str]) -> Optional[str]:
    """
    Ensures the input file is in PDF format for consistent processing.

    The PDF bytes (converted from an image if necessary) are written to the
    system temporary directory. The file name combines the sanitised stem of
    the input with a short hash of the PDF content, so two different uploads
    that happen to share a name do not overwrite each other.

    Args:
        file_path: Path of the uploaded PDF or image, or None.

    Returns:
        Path of the temporary PDF, or None when no input path was given.

    Raises:
        Whatever read_fn raises for an unsupported or unreadable file.
    """
    if file_path is None:
        return None

    pdf_bytes = read_fn(file_path)
    content_digest = hashlib.sha256(pdf_bytes).hexdigest()[:16]
    unique_filename = f'{safe_stem(file_path)}_{content_digest}.pdf'

    # System temp directory (tempfile.gettempdir()).
    tmp_file_path = os.path.join(tempfile.gettempdir(), unique_filename)
    with open(tmp_file_path, 'wb') as tmp_pdf_file:
        tmp_pdf_file.write(pdf_bytes)
    return tmp_file_path
# Inline style applied to every <img> injected into the generated HTML.
_INJECTED_IMG_STYLE = "max-width:100%; height:auto;"


def _page_images_as_data_uris(pdf_document, page) -> list:
    """Return the images embedded in *page* as base64 ``data:`` URI strings."""
    data_uris = []
    for image_info in page.get_images(full=True):
        xref = image_info[0]
        extracted = pdf_document.extract_image(xref)
        if not extracted:
            # Nothing could be extracted for this xref; skip it rather than
            # failing the whole document.
            continue
        encoded = base64.b64encode(extracted["image"]).decode()
        data_uris.append(f"data:image/{extracted['ext']};base64,{encoded}")
    return data_uris


def _inject_images(html_content: str, image_uris: list) -> str:
    """Insert *image_uris* into the model's HTML and return the new HTML.

    When the HTML contains exactly as many <figure> elements as there are
    images, each image is appended to the corresponding figure in order.
    Otherwise every image is appended, wrapped in a <p>, at the end.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    figures = soup.find_all('figure')

    def new_img(uri):
        return soup.new_tag('img', src=uri, style=_INJECTED_IMG_STYLE)

    if len(figures) == len(image_uris):
        for figure, uri in zip(figures, image_uris):
            figure.append(new_img(uri))
    else:
        logger.warning(f" > Mismatch: Model found {len(figures)} figures, but {len(image_uris)} images were extracted. Appending images to the end.")
        for uri in image_uris:
            paragraph = soup.new_tag('p')
            paragraph.append(new_img(uri))
            soup.append(paragraph)
    return str(soup)


async def pdf_parse(file_path: str, request: gr.Request):
    """
    Main parsing function that orchestrates the PDF processing pipeline.

    For each page: embedded images are extracted, the page is rendered at
    200 DPI, parse_page() produces HTML for it, and the extracted images are
    injected into that HTML. The per-page HTML is joined, converted to
    Markdown, and the Markdown is written to a temporary .md file.

    NOTE(review): the markup inside the status strings and around each page
    (<p>...</p>, <div>...</div>) was missing from the version of this file
    under review and has been reconstructed; confirm the exact tags.

    Args:
        file_path: Path of the uploaded PDF or image, or None.
        request: The Gradio request object (not read by this function).

    Returns:
        A 6-tuple: (Markdown rendered as HTML, Markdown source, generated
        HTML, path of the .md file, path of the temporary PDF, timing text).
        On failure the first three items carry a message, the two paths are
        None and the last item is an error string.
    """
    if file_path is None:
        logger.warning("file_path is None")
        return (
            "<p>Please upload a PDF file</p>", "", "<p>No input file</p>",
            None, None, "Error: No file provided"
        )

    logger.info(f'Processing file: {file_path}')
    try:
        # Inside the try so that an unsupported or unreadable upload is
        # reported through the same error tuple as any other failure.
        tmp_pdf_path = to_pdf(file_path)
        if tmp_pdf_path is None:
            return (
                "<p>Failed to process file</p>", "", "<p>Processing error</p>",
                None, None, "Error: Failed to process file"
            )

        start_time = time.time()
        html_parts = []
        # The context manager closes the document even if a page fails.
        with fitz.open(tmp_pdf_path) as pdf_document:
            page_count = len(pdf_document)
            for page_num, page in enumerate(pdf_document, start=1):
                logger.info(f"Processing Page {page_num}/{page_count}")

                # --- 1. Extract images directly from the PDF page using PyMuPDF ---
                page_images_base64 = _page_images_as_data_uris(pdf_document, page)
                logger.info(f" > Found {len(page_images_base64)} images on page {page_num}.")

                # --- 2. Render the page to an image for the VL-Model ---
                zoom = 200 / 72.0  # Corresponds to 200 DPI
                pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom))
                page_image = Image.open(BytesIO(pix.tobytes("png")))

                # --- 3. Get the structured HTML from the model ---
                logger.info(" > Parsing page layout with Logics-Parsing model...")
                html_content = parse_page(page_image)

                # --- 4. Inject extracted images back into the HTML ---
                if page_images_base64:
                    logger.info(f" > Injecting {len(page_images_base64)} extracted images into generated HTML...")
                    html_content = _inject_images(html_content, page_images_base64)

                html_parts.append(f'<div>{html_content}</div>')

        full_html = '\n'.join(html_parts)
        parsing_time = time.time() - start_time

        # Convert final rich HTML to Markdown
        mmd = html2text.html2text(full_html)
        mmd_html = markdown.markdown(mmd)

        # Create a temporary markdown file for download
        with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding='utf-8') as f:
            f.write(mmd)
            md_path = f.name

        # Total also covers the Markdown conversion and file write above.
        total_time = time.time() - start_time
        cost_time = f'Parsing time: {parsing_time:.2f}s, Total time: {total_time:.2f}s'
        return mmd_html, mmd, full_html, md_path, tmp_pdf_path, cost_time
    except Exception as e:
        # Top-level boundary for the UI callback: log with traceback and
        # report the failure through the outputs instead of raising.
        logger.exception(f"Parsing failed: {e}")
        return (
            "<p>Parsing failed. Please try again.</p>", "", f"<p>Error: {str(e)}</p>",
            None, None, f"Error: {str(e)}"
        )
@click.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=True))
@click.pass_context
def main(ctx, **kwargs):
    """
    Sets up and launches the Gradio user interface.
    """
    # The docstring above is left untouched because click shows it as the
    # command's --help text. `ctx` and `kwargs` are accepted but not read.

    # Swap Loguru's default handler for an INFO-level handler on stdout.
    logger.remove()
    logger.add(sys.stdout, level="INFO")

    accepted_suffixes = pdf_suffixes + image_suffixes

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 📄 Logics-Parsing Document Analysis")
        gr.Markdown("Upload a PDF or image file to parse its content into structured Markdown and HTML formats, now with improved image extraction.")

        with gr.Row():
            # Left panel: upload, action buttons, PDF preview, optional examples.
            with gr.Column(variant='panel', scale=5):
                with gr.Row():
                    input_file = gr.File(
                        label='Please upload a PDF or image (Max 20 pages for conversion)',
                        file_types=accepted_suffixes,
                    )
                with gr.Row():
                    convert_button = gr.Button('Convert', variant='primary')
                    clear_button = gr.ClearButton(value='Clear')
                pdf_preview = PDF(label='PDF Preview', interactive=False, visible=True, height=800)

                example_root = 'parsing/examples'
                logger.info(f'Looking for examples in: {example_root}')
                if os.path.isdir(example_root):
                    suffix_tuple = tuple(accepted_suffixes)
                    example_files = []
                    for entry in os.listdir(example_root):
                        if entry.endswith(suffix_tuple):
                            example_files.append(os.path.join(example_root, entry))
                    # Only show the accordion when at least one example exists.
                    if example_files:
                        with gr.Accordion('Examples:', open=True):
                            gr.Examples(examples=example_files, inputs=input_file)

            # Right panel: download, timing and the three result views.
            with gr.Column(variant='panel', scale=5):
                download_file = gr.File(label='Download Markdown Result', interactive=False)
                timing_box = gr.Text(label='Time Cost', interactive=False)
                with gr.Tabs():
                    with gr.Tab('Markdown Rendering'):
                        rendered_view = gr.HTML(label='MMD Rendering')
                    with gr.Tab('Markdown Source'):
                        markdown_source = gr.TextArea(lines=45, show_copy_button=True, label="Markdown Source")
                    with gr.Tab('Generated HTML'):
                        html_source = gr.TextArea(lines=45, show_copy_button=True, label="Generated HTML")

        # --- Event wiring ---
        clear_button.add([
            input_file,
            pdf_preview,
            markdown_source,
            html_source,
            download_file,
            rendered_view,
            timing_box,
        ])
        # A newly selected file is converted to PDF and shown in the preview.
        input_file.change(
            fn=to_pdf,
            inputs=input_file,
            outputs=pdf_preview,
            show_progress="full",
        )
        # Output order matches the tuple returned by pdf_parse.
        convert_button.click(
            fn=pdf_parse,
            inputs=[input_file],
            outputs=[
                rendered_view,
                markdown_source,
                html_source,
                download_file,
                pdf_preview,
                timing_box,
            ],
            concurrency_limit=15,
            show_progress="full",
        )

    demo.launch(debug=True)


if __name__ == '__main__':
    main()