TrustOCR-Demo / app.py
MohammadReza-Halakoo's picture
Update app.py
cf7e59c verified
# # app.py — TRUST OCR DEMO (Streamlit) — works even if batch_text_detection is missing
# import os
# import io
# import tempfile
# from typing import List
# import numpy as np
# import cv2
# from PIL import Image
# import pypdfium2
# import pytesseract
# # --- set safe dirs before importing streamlit ---
# safe_home = os.environ.get("HOME") or "/app"
# os.environ["HOME"] = safe_home
# cfg_dir = os.path.join(safe_home, ".streamlit")
# os.makedirs(cfg_dir, exist_ok=True)
# # --- قبل از import streamlit، احیاناً مسیر کش قابل‌نوشتن:
# import os, tempfile
# os.environ.setdefault("HF_HOME", "/tmp/hf_home")
# os.makedirs(os.environ["HF_HOME"], exist_ok=True)
# import tempfile, os
# temp_dir = os.path.join(tempfile.gettempdir(), "trustocr_temp")
# os.makedirs(temp_dir, exist_ok=True)
# # جای "temp_files" استفاده کن
# # اطمینان از اینکه Streamlit همه فایل‌ها را اینجا می‌نویسد
# os.environ["STREAMLIT_CONFIG_DIR"] = cfg_dir
# # اگر دوست داری همین‌جا config.toml بسازی و usage stats را خاموش کنی:
# conf_path = os.path.join(cfg_dir, "config.toml")
# if not os.path.exists(conf_path):
# with open(conf_path, "w", encoding="utf-8") as f:
# f.write("browser.gatherUsageStats = false\n")
# # runtime dir امن
# runtime_dir = os.path.join(tempfile.gettempdir(), ".streamlit")
# os.environ["STREAMLIT_RUNTIME_DIR"] = runtime_dir
# os.makedirs(runtime_dir, exist_ok=True)
# import streamlit as st
# # ===== Safe runtime dir for Streamlit/HF cache =====
# # runtime_dir = os.path.join(tempfile.gettempdir(), ".streamlit")
# # os.environ["STREAMLIT_RUNTIME_DIR"] = runtime_dir
# # os.makedirs(runtime_dir, exist_ok=True)
# # ===== Try to import Surya APIs =====
# DET_AVAILABLE = True
# try:
# from surya.detection import batch_text_detection
# except Exception:
# DET_AVAILABLE = False
# from surya.layout import batch_layout_detection # may still import; we’ll gate usage by DET_AVAILABLE
# # Detection model loaders: segformer (newer) vs model (older)
# try:
# from surya.model.detection.segformer import load_model as load_det_model, load_processor as load_det_processor
# except Exception:
# from surya.model.detection.model import load_model as load_det_model, load_processor as load_det_processor
# from surya.model.recognition.model import load_model as load_rec_model
# from surya.model.recognition.processor import load_processor as load_rec_processor
# from surya.model.ordering.model import load_model as load_order_model
# from surya.model.ordering.processor import load_processor as load_order_processor
# from surya.ordering import batch_ordering
# from surya.ocr import run_ocr
# from surya.postprocessing.heatmap import draw_polys_on_image
# from surya.postprocessing.text import draw_text_on_image
# from surya.languages import CODE_TO_LANGUAGE
# from surya.input.langs import replace_lang_with_code
# from surya.schema import OCRResult, TextDetectionResult, LayoutResult, OrderResult
# # ===================== Helper Functions =====================
# def remove_border(image_path: str, output_path: str) -> np.ndarray:
# """Remove outer border & deskew (perspective) if a rectangular contour is found."""
# image = cv2.imread(image_path)
# if image is None:
# raise ValueError(f"Cannot read image: {image_path}")
# gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
# contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# if not contours:
# cv2.imwrite(output_path, image)
# return image
# max_contour = max(contours, key=cv2.contourArea)
# epsilon = 0.02 * cv2.arcLength(max_contour, True)
# approx = cv2.approxPolyDP(max_contour, epsilon, True)
# if len(approx) == 4:
# pts = approx.reshape(4, 2).astype("float32")
# rect = np.zeros((4, 2), dtype="float32")
# s = pts.sum(axis=1)
# rect[0] = pts[np.argmin(s)] # tl
# rect[2] = pts[np.argmax(s)] # br
# diff = np.diff(pts, axis=1)
# rect[1] = pts[np.argmin(diff)] # tr
# rect[3] = pts[np.argmax(diff)] # bl
# (tl, tr, br, bl) = rect
# widthA = np.linalg.norm(br - bl)
# widthB = np.linalg.norm(tr - tl)
# maxWidth = max(int(widthA), int(widthB))
# heightA = np.linalg.norm(tr - br)
# heightB = np.linalg.norm(tl - bl)
# maxHeight = max(int(heightA), int(heightB))
# dst = np.array([[0, 0], [maxWidth - 1, 0],
# [maxWidth - 1, maxHeight - 1],
# [0, maxHeight - 1]], dtype="float32")
# M = cv2.getPerspectiveTransform(rect, dst)
# cropped = cv2.warpPerspective(image, M, (maxWidth, maxHeight))
# cv2.imwrite(output_path, cropped)
# return cropped
# else:
# cv2.imwrite(output_path, image)
# return image
# def open_pdf(pdf_file) -> pypdfium2.PdfDocument:
# stream = io.BytesIO(pdf_file.getvalue())
# return pypdfium2.PdfDocument(stream)
# @st.cache_data(show_spinner=False)
# def get_page_image(pdf_file, page_num: int, dpi: int = 96) -> Image.Image:
# doc = open_pdf(pdf_file)
# renderer = doc.render(pypdfium2.PdfBitmap.to_pil, page_indices=[page_num - 1], scale=dpi / 72)
# png = list(renderer)[0]
# return png.convert("RGB")
# @st.cache_data(show_spinner=False)
# def page_count(pdf_file) -> int:
# doc = open_pdf(pdf_file)
# return len(doc)
# # ===================== Streamlit UI =====================
# st.set_page_config(page_title="TRUST OCR DEMO", layout="wide")
# st.markdown("# TRUST OCR DEMO")
# if not DET_AVAILABLE:
# st.warning("⚠️ ماژول تشخیص متن Surya در این محیط در دسترس نیست. OCR کامل کار می‌کند، اما دکمه‌های Detection/Layout/Order غیرفعال شده‌اند. برای فعال‌سازی آن‌ها، Surya را به نسخهٔ سازگار پین کنید (راهنما پایین صفحه).")
# # Sidebar controls
# in_file = st.sidebar.file_uploader("فایل PDF یا عکس :", type=["pdf", "png", "jpg", "jpeg", "gif", "webp"])
# languages = st.sidebar.multiselect(
# "زبان‌ها (Languages)",
# sorted(list(CODE_TO_LANGUAGE.values())),
# default=["Persian"],
# max_selections=4
# )
# auto_rotate = st.sidebar.toggle("چرخش خودکار (Tesseract OSD)", value=True)
# auto_border = st.sidebar.toggle("حذف قاب/کادر تصویر ورودی", value=True)
# # Buttons (disable some if detection missing)
# text_det_btn = st.sidebar.button("تشخیص متن (Detection)", disabled=not DET_AVAILABLE)
# layout_det_btn = st.sidebar.button("آنالیز صفحه (Layout)", disabled=not DET_AVAILABLE)
# order_det_btn = st.sidebar.button("ترتیب خوانش (Reading Order)", disabled=not DET_AVAILABLE)
# text_rec_btn = st.sidebar.button("تبدیل به متن (Recognition)")
# if in_file is None:
# st.info("یک فایل PDF/عکس از سایدبار انتخاب کنید. | Please upload a file to begin.")
# st.stop()
# filetype = in_file.type
# # Two-column layout (left: outputs / right: input image)
# col2, col1 = st.columns([.5, .5])
# # ===================== Load Models (cached) =====================
# @st.cache_resource(show_spinner=True)
# def load_det_cached():
# return load_det_model(checkpoint="vikp/surya_det2"), load_det_processor(checkpoint="vikp/surya_det2")
# # from huggingface_hub import HfFolder
# # HF_TOKEN = os.environ.get("HF_TOKEN")
# # @st.cache_resource(show_spinner=True)
# # def load_rec_cached():
# # return load_rec_model(checkpoint="MohammadReza-Halakoo/TrustOCR", token=HF_TOKEN), \
# # load_rec_processor(checkpoint="MohammadReza-Halakoo/TrustOCR", token=HF_TOKEN)
# @st.cache_resource(show_spinner=True)
# def load_rec_cached():
# checkpoints = [
# "MohammadReza-Halakoo/TrustOCR", # خصوصی
# "vikp/surya_rec2", # عمومی (fallback)
# ]
# last_err = None
# for ckpt in checkpoints:
# try:
# m = load_rec_model(checkpoint=ckpt)
# p = load_rec_processor(checkpoint=ckpt)
# return m, p
# except Exception as e:
# last_err = e
# st.error(f"Loading recognition checkpoint failed: {last_err}")
# raise last_err
# # @st.cache_resource(show_spinner=True)
# # def load_rec_cached():
# # return load_rec_model(checkpoint="MohammadReza-Halakoo/TrustOCR"), \
# # load_rec_processor(checkpoint="MohammadReza-Halakoo/TrustOCR")
# @st.cache_resource(show_spinner=True)
# def load_layout_cached():
# return load_det_model(checkpoint="vikp/surya_layout2"), load_det_processor(checkpoint="vikp/surya_layout2")
# @st.cache_resource(show_spinner=True)
# def load_order_cached():
# return load_order_model(checkpoint="vikp/surya_order"), load_order_processor(checkpoint="vikp/surya_order")
# # recognition models are enough for run_ocr; detection/layout/order models used only if DET_AVAILABLE
# rec_model, rec_processor = load_rec_cached()
# if DET_AVAILABLE:
# det_model, det_processor = load_det_cached()
# layout_model, layout_processor = load_layout_cached()
# order_model, order_processor = load_order_cached()
# else:
# det_model = det_processor = layout_model = layout_processor = order_model = order_processor = None
# # ===================== High-level Ops =====================
# def _apply_auto_rotate(pil_img: Image.Image) -> Image.Image:
# """Auto-rotate using Tesseract OSD if enabled."""
# if not auto_rotate:
# return pil_img
# try:
# osd = pytesseract.image_to_osd(pil_img, output_type=pytesseract.Output.DICT)
# angle = int(osd.get("rotate", 0)) # 0/90/180/270
# if angle and angle % 360 != 0:
# return pil_img.rotate(-angle, expand=True)
# return pil_img
# except Exception as e:
# st.warning(f"OSD rotation failed, continuing without rotation. Error: {e}")
# return pil_img
# def text_detection(pil_img: Image.Image):
# pred: TextDetectionResult = batch_text_detection([pil_img], det_model, det_processor)[0]
# polygons = [p.polygon for p in pred.bboxes]
# det_img = draw_polys_on_image(polygons, pil_img.copy())
# return det_img, pred
# def layout_detection(pil_img: Image.Image):
# _, det_pred = text_detection(pil_img)
# pred: LayoutResult = batch_layout_detection([pil_img], layout_model, layout_processor, [det_pred])[0]
# polygons = [p.polygon for p in pred.bboxes]
# labels = [p.label for p in pred.bboxes]
# layout_img = draw_polys_on_image(polygons, pil_img.copy(), labels=labels, label_font_size=40)
# return layout_img, pred
# def order_detection(pil_img: Image.Image):
# _, layout_pred = layout_detection(pil_img)
# bboxes = [l.bbox for l in layout_pred.bboxes]
# pred: OrderResult = batch_ordering([pil_img], [bboxes], order_model, order_processor)[0]
# polys = [l.polygon for l in pred.bboxes]
# positions = [str(l.position) for l in pred.bboxes]
# order_img = draw_polys_on_image(polys, pil_img.copy(), labels=positions, label_font_size=40)
# return order_img, pred
# def ocr_page(pil_img: Image.Image, langs: List[str]):
# """Full-page OCR using Surya run_ocr — works without detection import."""
# langs = list(langs) if langs else ["Persian"]
# replace_lang_with_code(langs) # in-place
# # If detection models are loaded, pass them; else, let run_ocr use its internal defaults
# args = [pil_img], [langs]
# if det_model and det_processor and rec_model and rec_processor:
# img_pred: OCRResult = run_ocr([pil_img], [langs], det_model, det_processor, rec_model, rec_processor)[0]
# else:
# img_pred: OCRResult = run_ocr([pil_img], [langs])[0]
# bboxes = [l.bbox for l in img_pred.text_lines]
# text = [l.text for l in img_pred.text_lines]
# rec_img = draw_text_on_image(bboxes, text, pil_img.size, langs, has_math="_math" in langs)
# return rec_img, img_pred
# # ===================== Input Handling =====================
# if "pdf" in filetype:
# try:
# pg_cnt = page_count(in_file)
# except Exception as e:
# st.error(f"خواندن PDF ناموفق بود: {e}")
# st.stop()
# page_number = st.sidebar.number_input("صفحه:", min_value=1, value=1, max_value=pg_cnt)
# pil_image = get_page_image(in_file, page_number)
# else:
# bytes_data = in_file.getvalue()
# temp_dir = "temp_files"
# os.makedirs(temp_dir, exist_ok=True)
# file_path = os.path.join(temp_dir, in_file.name)
# with open(file_path, "wb") as f:
# f.write(bytes_data)
# out_file = os.path.splitext(file_path)[0] + "-1.JPG"
# try:
# if auto_border:
# _ = remove_border(file_path, out_file)
# pil_image = Image.open(out_file).convert("RGB")
# else:
# pil_image = Image.open(file_path).convert("RGB")
# except Exception as e:
# st.warning(f"حذف قاب/بازخوانی تصویر با خطا مواجه شد؛ تصویر اصلی استفاده می‌شود. Error: {e}")
# pil_image = Image.open(file_path).convert("RGB")
# # Auto-rotate if enabled
# pil_image = _apply_auto_rotate(pil_image)
# # ===================== Buttons Logic =====================
# with col1:
# if text_det_btn and DET_AVAILABLE:
# try:
# det_img, det_pred = text_detection(pil_image)
# st.image(det_img, caption="تشخیص متن (Detection)", use_column_width=True)
# except Exception as e:
# st.error(f"خطا در تشخیص متن: {e}")
# if layout_det_btn and DET_AVAILABLE:
# try:
# layout_img, layout_pred = layout_detection(pil_image)
# st.image(layout_img, caption="آنالیز صفحه (Layout)", use_column_width=True)
# except Exception as e:
# st.error(f"خطا در آنالیز صفحه: {e}")
# if order_det_btn and DET_AVAILABLE:
# try:
# order_img, order_pred = order_detection(pil_image)
# st.image(order_img, caption="ترتیب خوانش (Reading Order)", use_column_width=True)
# except Exception as e:
# st.error(f"خطا در ترتیب خوانش: {e}")
# if text_rec_btn:
# try:
# rec_img, ocr_pred = ocr_page(pil_image, languages)
# text_tab, json_tab = st.tabs(["متن صفحه | Page Text", "JSON"])
# with text_tab:
# st.text("\n".join([p.text for p in ocr_pred.text_lines]))
# with json_tab:
# st.json(ocr_pred.model_dump(), expanded=False)
# except Exception as e:
# st.error(f"خطا در بازشناسی متن (Recognition): {e}")
# with col2:
# st.image(pil_image, caption="تصویر ورودی | Input Preview", use_column_width=True)
# app.py — TRUST OCR DEMO (Streamlit)
# Works on Hugging Face Spaces (no permission/XSRF issues)
# import os
# import io
# import tempfile
# from typing import List
# import numpy as np
# import cv2
# from PIL import Image
# import pypdfium2
# import pytesseract
# # -------------------- Safe, writable dirs & config (BEFORE importing streamlit) --------------------
# # Put everything under /tmp (world-writable on Spaces)
# os.environ.setdefault("HOME", "/tmp")
# os.environ.setdefault("STREAMLIT_CONFIG_DIR", "/tmp/.streamlit")
# os.environ.setdefault("STREAMLIT_RUNTIME_DIR", "/tmp/.streamlit")
# os.environ.setdefault("HF_HOME", "/tmp/hf_home")
# for d in (os.environ["STREAMLIT_CONFIG_DIR"], os.environ["STREAMLIT_RUNTIME_DIR"], os.environ["HF_HOME"]):
# os.makedirs(d, exist_ok=True)
# # Create a minimal config.toml to avoid 403 on uploads and reduce telemetry writes
# conf_path = os.path.join(os.environ["STREAMLIT_CONFIG_DIR"], "config.toml")
# if not os.path.exists(conf_path):
# with open(conf_path, "w", encoding="utf-8") as f:
# f.write(
# "[server]\n"
# "enableXsrfProtection = false\n"
# "enableCORS = false\n"
# "maxUploadSize = 200\n"
# "\n[browser]\n"
# "gatherUsageStats = false\n"
# )
# import streamlit as st
# # -------------------- Surya imports (gated) --------------------
# DET_AVAILABLE = True
# try:
# from surya.detection import batch_text_detection
# except Exception:
# DET_AVAILABLE = False
# from surya.layout import batch_layout_detection # we'll gate usage using DET_AVAILABLE
# # Detection model loaders: try newer segformer, fall back to older
# try:
# from surya.model.detection.segformer import load_model as load_det_model, load_processor as load_det_processor
# except Exception:
# from surya.model.detection.model import load_model as load_det_model, load_processor as load_det_processor
# from surya.model.recognition.model import load_model as load_rec_model
# from surya.model.recognition.processor import load_processor as load_rec_processor
# from surya.model.ordering.model import load_model as load_order_model
# from surya.model.ordering.processor import load_processor as load_order_processor
# from surya.ordering import batch_ordering
# from surya.ocr import run_ocr
# from surya.postprocessing.heatmap import draw_polys_on_image
# from surya.postprocessing.text import draw_text_on_image
# from surya.languages import CODE_TO_LANGUAGE
# from surya.input.langs import replace_lang_with_code
# from surya.schema import OCRResult, TextDetectionResult, LayoutResult, OrderResult
# # ===================== Helper Functions =====================
# def remove_border(image_path: str, output_path: str) -> np.ndarray:
# """Remove outer border & deskew (perspective) if a rectangular contour is found."""
# image = cv2.imread(image_path)
# if image is None:
# raise ValueError(f"Cannot read image: {image_path}")
# gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
# contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# if not contours:
# cv2.imwrite(output_path, image)
# return image
# max_contour = max(contours, key=cv2.contourArea)
# epsilon = 0.02 * cv2.arcLength(max_contour, True)
# approx = cv2.approxPolyDP(max_contour, epsilon, True)
# if len(approx) == 4:
# pts = approx.reshape(4, 2).astype("float32")
# rect = np.zeros((4, 2), dtype="float32")
# s = pts.sum(axis=1)
# rect[0] = pts[np.argmin(s)] # tl
# rect[2] = pts[np.argmax(s)] # br
# diff = np.diff(pts, axis=1)
# rect[1] = pts[np.argmin(diff)] # tr
# rect[3] = pts[np.argmax(diff)] # bl
# (tl, tr, br, bl) = rect
# widthA = np.linalg.norm(br - bl)
# widthB = np.linalg.norm(tr - tl)
# maxWidth = max(int(widthA), int(widthB))
# heightA = np.linalg.norm(tr - br)
# heightB = np.linalg.norm(tl - bl)
# maxHeight = max(int(heightA), int(heightB))
# dst = np.array([[0, 0], [maxWidth - 1, 0],
# [maxWidth - 1, maxHeight - 1],
# [0, maxHeight - 1]], dtype="float32")
# M = cv2.getPerspectiveTransform(rect, dst)
# cropped = cv2.warpPerspective(image, M, (maxWidth, maxHeight))
# cv2.imwrite(output_path, cropped)
# return cropped
# else:
# cv2.imwrite(output_path, image)
# return image
# def open_pdf(pdf_file) -> pypdfium2.PdfDocument:
# stream = io.BytesIO(pdf_file.getvalue())
# return pypdfium2.PdfDocument(stream)
# @st.cache_data(show_spinner=False)
# def get_page_image(pdf_file, page_num: int, dpi: int = 96) -> Image.Image:
# doc = open_pdf(pdf_file)
# renderer = doc.render(pypdfium2.PdfBitmap.to_pil, page_indices=[page_num - 1], scale=dpi / 72)
# png = list(renderer)[0]
# return png.convert("RGB")
# @st.cache_data(show_spinner=False)
# def page_count(pdf_file) -> int:
# doc = open_pdf(pdf_file)
# return len(doc)
# # ===================== Streamlit UI =====================
# st.set_page_config(page_title="TRUST OCR DEMO", layout="wide")
# st.markdown("# TRUST OCR DEMO")
# if not DET_AVAILABLE:
# st.warning("⚠️ ماژول تشخیص متن Surya در این محیط در دسترس نیست. OCR کامل کار می‌کند، اما دکمه‌های Detection/Layout/Order غیرفعال شده‌اند.")
# # Sidebar controls
# in_file = st.sidebar.file_uploader("فایل PDF یا عکس :", type=["pdf", "png", "jpg", "jpeg", "gif", "webp"])
# languages = st.sidebar.multiselect(
# "زبان‌ها (Languages)",
# sorted(list(CODE_TO_LANGUAGE.values())),
# default=["Persian"],
# max_selections=4
# )
# auto_rotate = st.sidebar.toggle("چرخش خودکار (Tesseract OSD)", value=True)
# auto_border = st.sidebar.toggle("حذف قاب/کادر تصویر ورودی", value=True)
# # Buttons (disable some if detection missing)
# text_det_btn = st.sidebar.button("تشخیص متن (Detection)", disabled=not DET_AVAILABLE)
# layout_det_btn = st.sidebar.button("آنالیز صفحه (Layout)", disabled=not DET_AVAILABLE)
# order_det_btn = st.sidebar.button("ترتیب خوانش (Reading Order)", disabled=not DET_AVAILABLE)
# text_rec_btn = st.sidebar.button("تبدیل به متن (Recognition)")
# if in_file is None:
# st.info("یک فایل PDF/عکس از سایدبار انتخاب کنید. | Please upload a file to begin.")
# st.stop()
# filetype = in_file.type
# # Two-column layout (left: outputs / right: input image)
# col2, col1 = st.columns([.5, .5])
# # ===================== Load Models (cached) =====================
# @st.cache_resource(show_spinner=True)
# def load_det_cached():
# return load_det_model(checkpoint="vikp/surya_det2"), load_det_processor(checkpoint="vikp/surya_det2")
# @st.cache_resource(show_spinner=True)
# def load_rec_cached():
# """Try private checkpoint first, then fall back to public."""
# checkpoints = [
# "MohammadReza-Halakoo/TrustOCR", # private (requires HUGGINGFACE_HUB_TOKEN if private)
# "vikp/surya_rec2", # public fallback
# ]
# last_err = None
# for ckpt in checkpoints:
# try:
# m = load_rec_model(checkpoint=ckpt)
# p = load_rec_processor(checkpoint=ckpt)
# return m, p
# except Exception as e:
# last_err = e
# st.error(f"Loading recognition checkpoint failed: {last_err}")
# raise last_err
# @st.cache_resource(show_spinner=True)
# def load_layout_cached():
# return load_det_model(checkpoint="vikp/surya_layout2"), load_det_processor(checkpoint="vikp/surya_layout2")
# @st.cache_resource(show_spinner=True)
# def load_order_cached():
# return load_order_model(checkpoint="vikp/surya_order"), load_order_processor(checkpoint="vikp/surya_order")
# # recognition models are enough for run_ocr; detection/layout/order models used only if DET_AVAILABLE
# rec_model, rec_processor = load_rec_cached()
# if DET_AVAILABLE:
# det_model, det_processor = load_det_cached()
# layout_model, layout_processor = load_layout_cached()
# order_model, order_processor = load_order_cached()
# else:
# det_model = det_processor = layout_model = layout_processor = order_model = order_processor = None
# # ===================== High-level Ops =====================
# def _apply_auto_rotate(pil_img: Image.Image) -> Image.Image:
# """Auto-rotate using Tesseract OSD if enabled."""
# if not auto_rotate:
# return pil_img
# try:
# osd = pytesseract.image_to_osd(pil_img, output_type=pytesseract.Output.DICT)
# angle = int(osd.get("rotate", 0)) # 0/90/180/270
# if angle and angle % 360 != 0:
# return pil_img.rotate(-angle, expand=True)
# return pil_img
# except Exception as e:
# st.warning(f"OSD rotation failed, continuing without rotation. Error: {e}")
# return pil_img
# def text_detection(pil_img: Image.Image):
# pred: TextDetectionResult = batch_text_detection([pil_img], det_model, det_processor)[0]
# polygons = [p.polygon for p in pred.bboxes]
# det_img = draw_polys_on_image(polygons, pil_img.copy())
# return det_img, pred
# def layout_detection(pil_img: Image.Image):
# _, det_pred = text_detection(pil_img)
# pred: LayoutResult = batch_layout_detection([pil_img], layout_model, layout_processor, [det_pred])[0]
# polygons = [p.polygon for p in pred.bboxes]
# labels = [p.label for p in pred.bboxes]
# layout_img = draw_polys_on_image(polygons, pil_img.copy(), labels=labels, label_font_size=40)
# return layout_img, pred
# def order_detection(pil_img: Image.Image):
# _, layout_pred = layout_detection(pil_img)
# bboxes = [l.bbox for l in layout_pred.bboxes]
# pred: OrderResult = batch_ordering([pil_img], [bboxes], order_model, order_processor)[0]
# polys = [l.polygon for l in pred.bboxes]
# positions = [str(l.position) for l in pred.bboxes]
# order_img = draw_polys_on_image(polys, pil_img.copy(), labels=positions, label_font_size=40)
# return order_img, pred
# def ocr_page(pil_img: Image.Image, langs: List[str]):
# """Full-page OCR using Surya run_ocr — works without detection import."""
# langs = list(langs) if langs else ["Persian"]
# replace_lang_with_code(langs) # in-place
# # If detection/recognition models are loaded, pass them; else rely on Surya defaults
# if det_model and det_processor and rec_model and rec_processor:
# img_pred: OCRResult = run_ocr([pil_img], [langs], det_model, det_processor, rec_model, rec_processor)[0]
# else:
# img_pred: OCRResult = run_ocr([pil_img], [langs])[0]
# bboxes = [l.bbox for l in img_pred.text_lines]
# text = [l.text for l in img_pred.text_lines]
# rec_img = draw_text_on_image(bboxes, text, pil_img.size, langs, has_math="_math" in langs)
# return rec_img, img_pred
# # ===================== Input Handling =====================
# if "pdf" in filetype:
# try:
# pg_cnt = page_count(in_file)
# except Exception as e:
# st.error(f"خواندن PDF ناموفق بود: {e}")
# st.stop()
# page_number = st.sidebar.number_input("صفحه:", min_value=1, value=1, max_value=pg_cnt)
# pil_image = get_page_image(in_file, page_number)
# else:
# bytes_data = in_file.getvalue()
# # use /tmp for writes
# temp_dir = os.path.join(tempfile.gettempdir(), "trustocr_temp")
# os.makedirs(temp_dir, exist_ok=True)
# file_path = os.path.join(temp_dir, in_file.name)
# with open(file_path, "wb") as f:
# f.write(bytes_data)
# out_file = os.path.splitext(file_path)[0] + "-1.JPG"
# try:
# if auto_border:
# _ = remove_border(file_path, out_file)
# pil_image = Image.open(out_file).convert("RGB")
# else:
# pil_image = Image.open(file_path).convert("RGB")
# except Exception as e:
# st.warning(f"حذف قاب/بازخوانی تصویر با خطا مواجه شد؛ تصویر اصلی استفاده می‌شود. Error: {e}")
# pil_image = Image.open(file_path).convert("RGB")
# # Auto-rotate if enabled
# pil_image = _apply_auto_rotate(pil_image)
# # ===================== Buttons Logic =====================
# with col1:
# if text_det_btn and DET_AVAILABLE:
# try:
# det_img, det_pred = text_detection(pil_image)
# st.image(det_img, caption="تشخیص متن (Detection)", use_column_width=True)
# except Exception as e:
# st.error(f"خطا در تشخیص متن: {e}")
# if layout_det_btn and DET_AVAILABLE:
# try:
# layout_img, layout_pred = layout_detection(pil_image)
# st.image(layout_img, caption="آنالیز صفحه (Layout)", use_column_width=True)
# except Exception as e:
# st.error(f"خطا در آنالیز صفحه: {e}")
# if order_det_btn and DET_AVAILABLE:
# try:
# order_img, order_pred = order_detection(pil_image)
# st.image(order_img, caption="ترتیب خوانش (Reading Order)", use_column_width=True)
# except Exception as e:
# st.error(f"خطا در ترتیب خوانش: {e}")
# if text_rec_btn:
# try:
# rec_img, ocr_pred = ocr_page(pil_image, languages)
# text_tab, json_tab = st.tabs(["متن صفحه | Page Text", "JSON"])
# with text_tab:
# st.text("\n".join([p.text for p in ocr_pred.text_lines]))
# with json_tab:
# st.json(ocr_pred.model_dump(), expanded=False)
# except Exception as e:
# st.error(f"خطا در بازشناسی متن (Recognition): {e}")
# with col2:
# st.image(pil_image, caption="تصویر ورودی | Input Preview", use_column_width=True)
########################################################################################
# app.py — TRUST OCR DEMO (Streamlit) — personal-recognition-only
# app.py — TRUST OCR DEMO (Streamlit) with personal recognition model, safe dirs, eager attention, lazy order
# app.py — TRUST OCR DEMO (Streamlit) — فقط با مدل شخصی شما
# -*- coding: utf-8 -*-
# TRUST OCR DEMO – Streamlit app (Surya OCR + مدل شخصی)
# -*- coding: utf-8 -*-
# TRUST OCR DEMO – Streamlit app (Surya OCR + مدل شخصی)
# -*- coding: utf-8 -*-
# TRUST OCR DEMO – Streamlit app (Surya OCR + مدل شخصی)
import os
import io
import tempfile
import logging
from typing import List
import numpy as np
import cv2
from PIL import Image, ImageDraw, ImageFont
import pypdfium2
import pytesseract
# -------------------- Logger --------------------
logger = logging.getLogger("trustocr")
if not logger.handlers:
logging.basicConfig(level=logging.INFO)
# -------------------- Safe dirs & config (قبل از import streamlit) --------------------
# ===== Env =====
HF_TOKEN = os.getenv("HUGGINGFACE_HUB_TOKEN") or os.getenv("HF_TOKEN")
if not HF_TOKEN:
logger.warning("HF token is not set. Add HUGGINGFACE_HUB_TOKEN in Space Settings → Secrets.")
# دایرکتوری‌های قابل‌نوشتن
os.environ.setdefault("HOME", "/tmp")
os.environ.setdefault("STREAMLIT_CONFIG_DIR", "/tmp/.streamlit")
os.environ.setdefault("STREAMLIT_RUNTIME_DIR", "/tmp/.streamlit")
os.environ.setdefault("HF_HOME", "/tmp/hf_home")
os.environ.setdefault("TRANSFORMERS_CACHE", "/tmp/hf_home")
# جلوگیری از sdpa backend که با Surya ordering ممکن است ناسازگار باشد
os.environ.setdefault("TRANSFORMERS_ATTENTION_BACKEND", "eager")
# مسیرهای استاتیک/کش به /tmp برای جلوگیری از Permission denied
os.environ.setdefault("STREAMLIT_STATIC_DIR", "/tmp/streamlit_static")
os.environ.setdefault("MPLCONFIGDIR", "/tmp/mpl")
for d in (
os.environ["STREAMLIT_CONFIG_DIR"],
os.environ["STREAMLIT_RUNTIME_DIR"],
os.environ["HF_HOME"],
os.environ["STREAMLIT_STATIC_DIR"],
os.environ["MPLCONFIGDIR"],
):
os.makedirs(d, exist_ok=True)
# config.toml مینیمال
conf_path = os.path.join(os.environ["STREAMLIT_CONFIG_DIR"], "config.toml")
if not os.path.exists(conf_path):
with open(conf_path, "w", encoding="utf-8") as f:
f.write(
"[server]\nheadless = true\nenableXsrfProtection = false\nenableCORS = false\nmaxUploadSize = 200\n"
"\n[browser]\ngatherUsageStats = false\n"
)
# توکن HF برای ریپوی خصوصی (اختیاری)
if HF_TOKEN:
os.environ["HUGGINGFACE_HUB_TOKEN"] = HF_TOKEN
try:
from huggingface_hub import login
login(token=HF_TOKEN, add_to_git_credential=False)
logger.info("Logged into Hugging Face hub.")
except Exception as e:
logger.warning(f"HF login skipped/failed: {e}")
import streamlit as st
# -------------------- Surya imports --------------------
DET_AVAILABLE = True
try:
from surya.detection import batch_text_detection
except Exception:
DET_AVAILABLE = False
from surya.layout import batch_layout_detection
# Detection loaders: segformer اولویت دارد
try:
from surya.model.detection.segformer import load_model as load_det_model, load_processor as load_det_processor
except Exception:
from surya.model.detection.model import load_model as load_det_model, load_processor as load_det_processor
from surya.model.recognition.model import load_model as load_rec_model
from surya.model.recognition.processor import load_processor as load_rec_processor
from surya.model.ordering.model import load_model as load_order_model
from surya.model.ordering.processor import load_processor as load_order_processor
from surya.ordering import batch_ordering
from surya.ocr import run_ocr
# مهم: دیگر از surya.postprocessing.* استفاده نمی‌کنیم تا چیزی در site-packages ننویسد
# from surya.postprocessing.heatmap import draw_polys_on_image
# from surya.postprocessing.text import draw_text_on_image
from surya.languages import CODE_TO_LANGUAGE
from surya.input.langs import replace_lang_with_code
from surya.schema import OCRResult, TextDetectionResult, LayoutResult, OrderResult
# ===================== Helper Functions =====================
def remove_border(image_path: str, output_path: str) -> np.ndarray:
image = cv2.imread(image_path)
if image is None:
raise ValueError(f"Cannot read image: {image_path}")
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
_, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
cv2.imwrite(output_path, image); return image
max_contour = max(contours, key=cv2.contourArea)
epsilon = 0.02 * cv2.arcLength(max_contour, True)
approx = cv2.approxPolyDP(max_contour, epsilon, True)
if len(approx) == 4:
pts = approx.reshape(4, 2).astype("float32")
rect = np.zeros((4, 2), dtype="float32")
s = pts.sum(axis=1)
rect[0] = pts[np.argmin(s)]; rect[2] = pts[np.argmax(s)]
diff = np.diff(pts, axis=1)
rect[1] = pts[np.argmin(diff)]; rect[3] = pts[np.argmax(diff)]
(tl, tr, br, bl) = rect
widthA = np.linalg.norm(br - bl); widthB = np.linalg.norm(tr - tl)
maxWidth = max(int(widthA), int(widthB))
heightA = np.linalg.norm(tr - br); heightB = np.linalg.norm(tl - bl)
maxHeight = max(int(heightA), int(heightB))
dst = np.array([[0,0],[maxWidth-1,0],[maxWidth-1,maxHeight-1],[0,maxHeight-1]], dtype="float32")
M = cv2.getPerspectiveTransform(rect, dst)
cropped = cv2.warpPerspective(image, M, (maxWidth, maxHeight))
cv2.imwrite(output_path, cropped); return cropped
else:
cv2.imwrite(output_path, image); return image
def open_pdf(pdf_file) -> pypdfium2.PdfDocument:
stream = io.BytesIO(pdf_file.getvalue())
return pypdfium2.PdfDocument(stream)
@st.cache_data(show_spinner=False)
def get_page_image(pdf_file, page_num: int, dpi: int = 120) -> Image.Image:
doc = open_pdf(pdf_file)
renderer = doc.render(pypdfium2.PdfBitmap.to_pil, page_indices=[page_num-1], scale=dpi/72)
png = list(renderer)[0]
return png.convert("RGB")
@st.cache_data(show_spinner=False)
def page_count(pdf_file) -> int:
doc = open_pdf(pdf_file)
return len(doc)
# ----- رسم سبک خودمان (بدون وابستگی به surya.postprocessing) -----
def _norm_poly(polygon) -> list[tuple[int, int]]:
arr = np.array(polygon).reshape(-1, 2)
return [(int(x), int(y)) for x, y in arr]
def draw_polys_simple(pil_img: Image.Image, polygons, labels=None) -> Image.Image:
"""Draw polygons (and optional labels) using Pillow only. No disk writes."""
img = pil_img.copy()
draw = ImageDraw.Draw(img)
font = ImageFont.load_default()
for i, poly in enumerate(polygons):
pts = _norm_poly(poly)
# خطوط چندضلعی
draw.polygon(pts, outline=(0, 255, 0))
# برچسب اختیاری
if labels is not None and i < len(labels):
x, y = pts[0]
draw.text((x, max(0, y - 12)), str(labels[i]), fill=(255, 0, 0), font=font)
return img
# ===================== Streamlit UI =====================
st.set_page_config(page_title="TRUST OCR DEMO", layout="wide")
st.markdown("# TRUST OCR DEMO")
if not DET_AVAILABLE:
st.warning("⚠️ ماژول تشخیص متن Surya در این محیط در دسترس نیست. OCR کامل کار می‌کند، اما دکمه‌های Detection/Layout/Order غیرفعال شده‌اند.")
in_file = st.sidebar.file_uploader("فایل PDF یا عکس :", type=["pdf","png","jpg","jpeg","gif","webp"])
languages = st.sidebar.multiselect("زبان‌ها (Languages)", sorted(list(CODE_TO_LANGUAGE.values())), default=["Persian"], max_selections=4)
auto_rotate = st.sidebar.toggle("چرخش خودکار (Tesseract OSD)", value=True)
auto_border = st.sidebar.toggle("حذف قاب/کادر تصویر ورودی", value=True)
text_det_btn = st.sidebar.button("تشخیص متن (Detection)", disabled=not DET_AVAILABLE)
layout_det_btn = st.sidebar.button("آنالیز صفحه (Layout)", disabled=not DET_AVAILABLE)
order_det_btn = st.sidebar.button("ترتیب خوانش (Reading Order)", disabled=not DET_AVAILABLE)
text_rec_btn = st.sidebar.button("تبدیل به متن (Recognition)")
if in_file is None:
st.info("یک فایل PDF/عکس از سایدبار انتخاب کنید. | Please upload a file to begin.")
st.stop()
filetype = in_file.type
col2, col1 = st.columns([.5, .5])
# ===================== Load Models (cached) =====================
@st.cache_resource(show_spinner=True)
def load_det_cached():
return load_det_model(checkpoint="vikp/surya_det2"), load_det_processor(checkpoint="vikp/surya_det2")
@st.cache_resource(show_spinner=True)
def load_layout_cached():
return load_det_model(checkpoint="vikp/surya_layout2"), load_det_processor(checkpoint="vikp/surya_layout2")
@st.cache_resource(show_spinner=True)
def load_order_cached():
return load_order_model(checkpoint="vikp/surya_order"), load_order_processor(checkpoint="vikp/surya_order")
# ---------- PERSONAL RECOGNITION ONLY ----------
PERSONAL_MODEL_PATH = os.environ.get("TRUSTOCR_PATH") # فولدر لوکال
PERSONAL_HF_REPO = os.environ.get("TRUSTOCR_REPO") # ریپوی مدل HF
@st.cache_resource(show_spinner=True)
def load_rec_personal():
"""
اولویت با مدل شخصی است. اگر تنظیم نبود، به یک مدل عمومی Surya فالبک می‌شود.
اگر فالبک نمی‌خواهی، بخش آخر را حذف کن و به‌جایش RuntimeError بده.
"""
if PERSONAL_MODEL_PATH and os.path.isdir(PERSONAL_MODEL_PATH):
m = load_rec_model(checkpoint=PERSONAL_MODEL_PATH)
p = load_rec_processor() # نسخه Surya شما بدون ورودی است
return m, p
if PERSONAL_HF_REPO:
m = load_rec_model(checkpoint=PERSONAL_HF_REPO)
p = load_rec_processor() # بدون ورودی
return m, p
# --- فالبک اختیاری به مدل عمومی ---
st.warning("⚠️ مدل شخصی تنظیم نشده؛ از مدل عمومی Surya استفاده می‌شود (vikp/surya_rec2).")
m = load_rec_model(checkpoint="vikp/surya_rec2")
p = load_rec_processor() # بدون ورودی
return m, p
# Load all
if DET_AVAILABLE:
det_model, det_processor = load_det_cached()
layout_model, layout_processor = load_layout_cached()
try:
order_model, order_processor = load_order_cached()
except Exception as e:
order_model = order_processor = None
st.warning(f"Ordering غیرفعال شد: {e}")
else:
det_model = det_processor = layout_model = layout_processor = order_model = order_processor = None
rec_model, rec_processor = load_rec_personal()
st.caption(f"Recognition source: {os.environ.get('TRUSTOCR_PATH') or os.environ.get('TRUSTOCR_REPO') or 'vikp/surya_rec2'}")
# ===================== Ops =====================
def _apply_auto_rotate(pil_img: Image.Image) -> Image.Image:
if not auto_rotate:
return pil_img
try:
osd = pytesseract.image_to_osd(pil_img, output_type=pytesseract.Output.DICT)
angle = int(osd.get("rotate", 0))
if angle and angle % 360 != 0:
return pil_img.rotate(-angle, expand=True)
return pil_img
except Exception as e:
st.warning(f"OSD rotation failed, continuing without rotation. Error: {e}")
return pil_img
def text_detection(pil_img: Image.Image):
pred: TextDetectionResult = batch_text_detection([pil_img], det_model, det_processor)[0]
polygons = [p.polygon for p in pred.bboxes]
det_img = draw_polys_simple(pil_img, polygons) # ← نسخه سبک خودمان
return det_img, pred
def layout_detection(pil_img: Image.Image):
_, det_pred = text_detection(pil_img)
pred: LayoutResult = batch_layout_detection([pil_img], layout_model, layout_processor, [det_pred])[0]
polygons = [p.polygon for p in pred.bboxes]
labels = [p.label for p in pred.bboxes]
layout_img = draw_polys_simple(pil_img, polygons, labels=labels) # ← نسخه سبک خودمان
return layout_img, pred
def order_detection(pil_img: Image.Image):
if order_model is None or order_processor is None:
raise RuntimeError("Ordering model not available.")
_, layout_pred = layout_detection(pil_img)
bboxes = [l.bbox for l in layout_pred.bboxes]
pred: OrderResult = batch_ordering([pil_img], [bboxes], order_model, order_processor)[0]
polys = [l.polygon for l in pred.bboxes]
positions = [str(l.position) for l in pred.bboxes]
order_img = draw_polys_simple(pil_img, polys, labels=positions) # ← نسخه سبک خودمان
return order_img, pred
def ocr_page(pil_img: Image.Image, langs: List[str]):
langs = list(langs) if langs else ["Persian"]
replace_lang_with_code(langs)
# مهم: دیگر draw_text_on_image نمی‌سازیم تا نیازی به فونت/استاتیک نباشد
if det_model and det_processor and rec_model and rec_processor:
img_pred: OCRResult = run_ocr([pil_img], [langs], det_model, det_processor, rec_model, rec_processor)[0]
else:
img_pred: OCRResult = run_ocr([pil_img], [langs], rec_model=rec_model, rec_processor=rec_processor)[0]
# برای نمایش، فقط متن را می‌گذاریم؛ تصویر چسبانده نمی‌شود تا وابستگی به فونت نباشد
return None, img_pred
# ===================== Input Handling =====================
if "pdf" in filetype:
try:
pg_cnt = page_count(in_file)
except Exception as e:
st.error(f"خواندن PDF ناموفق بود: {e}"); st.stop()
page_number = st.sidebar.number_input("صفحه:", min_value=1, value=1, max_value=pg_cnt)
pil_image = get_page_image(in_file, page_number)
else:
bytes_data = in_file.getvalue()
temp_dir = os.path.join(tempfile.gettempdir(), "trustocr_temp"); os.makedirs(temp_dir, exist_ok=True)
file_path = os.path.join(temp_dir, in_file.name)
with open(file_path, "wb") as f: f.write(bytes_data)
out_file = os.path.splitext(file_path)[0] + "-1.JPG"
try:
if auto_border:
_ = remove_border(file_path, out_file)
pil_image = Image.open(out_file).convert("RGB")
else:
pil_image = Image.open(file_path).convert("RGB")
except Exception as e:
st.warning(f"حذف قاب/بازخوانی تصویر با خطا؛ تصویر اصلی استفاده می‌شود. Error: {e}")
pil_image = Image.open(file_path).convert("RGB")
# Auto-rotate
pil_image = _apply_auto_rotate(pil_image)
# ===================== Buttons =====================
with col1:
if text_det_btn and DET_AVAILABLE:
try:
det_img, det_pred = text_detection(pil_image)
st.image(det_img, caption="تشخیص متن (Detection)", use_container_width=True)
except Exception as e:
st.error(f"خطا در تشخیص متن: {e}")
if layout_det_btn and DET_AVAILABLE:
try:
layout_img, layout_pred = layout_detection(pil_image)
st.image(layout_img, caption="آنالیز صفحه (Layout)", use_container_width=True)
except Exception as e:
st.error(f"خطا در آنالیز صفحه: {e}")
if order_det_btn and DET_AVAILABLE:
try:
order_img, order_pred = order_detection(pil_image)
st.image(order_img, caption="ترتیب خوانش (Reading Order)", use_container_width=True)
except Exception as e:
st.error(f"خطا در ترتیب خوانش: {e}")
if text_rec_btn:
try:
rec_img, ocr_pred = ocr_page(pil_image, languages)
text_tab, json_tab = st.tabs(["متن صفحه | Page Text", "JSON"])
with text_tab:
st.text("\n".join([p.text for p in ocr_pred.text_lines]))
with json_tab:
st.json(ocr_pred.model_dump(), expanded=False)
except Exception as e:
st.error(f"خطا در بازشناسی متن (Recognition): {e}")
with col2:
st.image(pil_image, caption="تصویر ورودی | Input Preview", use_container_width=True)