# Captured from a Hugging Face Spaces page (Space status at capture time: Sleeping).
#!/usr/bin/env python | |
# app.py | |
import asyncio
import base64
import glob
import html
import io
import json
import logging
import os
import random
import re
import shutil
import time
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import aiofiles
import fitz  # PyMuPDF
import pandas as pd
import pytz
import requests
import streamlit as st
from huggingface_hub import InferenceClient
from huggingface_hub.utils import GatedRepoError, RepositoryNotFoundError
from PIL import Image, ImageDraw, UnidentifiedImageError
from reportlab.lib.pagesizes import letter
from reportlab.lib.utils import ImageReader
from reportlab.pdfgen import canvas
# Optional AI/ML imports | |
# Each optional dependency group sets a module-level flag so that other code
# can check availability instead of failing at import time.
try:
    import torch
    from transformers import (
        AutoModelForCausalLM,
        AutoTokenizer,
        AutoProcessor,
        AutoModelForVision2Seq,
        pipeline,
    )
    _transformers_available = True
except ImportError:
    _transformers_available = False

try:
    from diffusers import StableDiffusionPipeline
    _diffusers_available = True
except ImportError:
    _diffusers_available = False
# --- Page Configuration --- | |
# Browser tab title/icon, wide layout, sidebar open by default, and the
# entries shown in the app's menu.
st.set_page_config(
    page_title="Vision & Layout Titans (HF) 🚀🖼️",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={
        'Get Help': 'https://huggingface.co/docs',
        'About': "Combined App: Image→PDF Layout + HF AI Tools 🌌",
    },
)
# --- Logging Setup --- | |
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# In-memory buffer of the records emitted through ``logger``.
log_records: List[logging.LogRecord] = []


class LogCaptureHandler(logging.Handler):
    """Logging handler that appends every record to the module-level ``log_records``."""

    def emit(self, record: logging.LogRecord) -> None:
        log_records.append(record)


# ``logging.getLogger`` returns a process-wide singleton, so executing this
# module again would stack one more capture handler on the same logger.
# Remove earlier ones first; they are matched by class name because the class
# object itself is re-created on every execution.
for _stale in [h for h in logger.handlers if type(h).__name__ == "LogCaptureHandler"]:
    logger.removeHandler(_stale)
logger.addHandler(LogCaptureHandler())
# --- Constants & Defaults --- | |
# Access token read from the environment; None when HF_TOKEN is not set.
HF_TOKEN = os.getenv("HF_TOKEN")
# Provider name used as the initial value of the 'hf_provider' session key.
DEFAULT_PROVIDER = "hf-inference"
# Model ids offered by default; the first entry is the initial API model.
FEATURED_MODELS_LIST = [
    "meta-llama/Meta-Llama-3.1-8B-Instruct",
    "mistralai/Mistral-7B-Instruct-v0.3",
    "google/gemma-2-9b-it",
    "Qwen/Qwen2-7B-Instruct",
    "microsoft/Phi-3-mini-4k-instruct",
    "HuggingFaceH4/zephyr-7b-beta",
    "NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO",
    "HuggingFaceTB/SmolLM-1.7B-Instruct",
]
# --- Session State Initialization --- | |
def _init_state(key: str, default: Any) -> None:
    """Store ``default`` under ``key`` in ``st.session_state`` unless the key already exists."""
    if key not in st.session_state:
        st.session_state[key] = default
# Seed every session key the app reads. ``_init_state`` only writes keys that
# are missing, so values already present in the session are left untouched.
# The dict literal is rebuilt on each execution, so the mutable defaults
# ([] / {}) are fresh objects every time.
for k, v in {
    'layout_snapshots': [],
    'layout_new_uploads': [],
    'layout_last_capture': None,
    'history': [],
    'processing': {},
    'asset_checkboxes': {},
    'downloaded_pdfs': {},
    'unique_counter': 0,
    'cam0_file': None,
    'cam1_file': None,
    'characters': [],
    'char_form_reset_key': 0,
    'gallery_size': 10,
    'hf_inference_client': None,
    'hf_provider': DEFAULT_PROVIDER,
    'hf_custom_key': "",
    'hf_selected_api_model': FEATURED_MODELS_LIST[0],
    'hf_custom_api_model': "",
    'local_models': {},
    'selected_local_model_path': None,
    'gen_max_tokens': 512,
    'gen_temperature': 0.7,
    'gen_top_p': 0.95,
    'gen_frequency_penalty': 0.0,
    'gen_seed': -1,
}.items():
    _init_state(k, v)
# --- Utility Functions --- | |
def generate_filename(seq: str, ext: str = "png") -> str:
    """Build a timestamped file name from a free-form label.

    Every run of characters in ``seq`` that is not a word character or ``-``
    is collapsed to a single ``_``; the local time (``YYYYmmdd_HHMMSS``) and
    ``ext`` are then appended.

    Args:
        seq: Label to embed in the file name.
        ext: Extension without the leading dot.

    Returns:
        ``"<sanitized seq>_<timestamp>.<ext>"``.
    """
    ts = time.strftime('%Y%m%d_%H%M%S')
    safe = re.sub(r'[^\w\-]+', '_', seq)
    return f"{safe}_{ts}.{ext}"
def clean_stem(fn: str) -> str:
    """Turn a file path into a display title.

    Takes the base name without its final extension, replaces ``-`` and ``_``
    with spaces, and title-cases the result.
    """
    stem = os.path.splitext(os.path.basename(fn))[0]
    return stem.replace('-', ' ').replace('_', ' ').title()
def get_download_link(path: str, mime: str, label: str = "Download") -> str:
    """Return an HTML anchor that embeds the file at ``path`` as a base64 data URI.

    Args:
        path: File to embed.
        mime: MIME type written into the data URI.
        label: Link text; inserted as-is, so it may itself contain markup.

    Returns:
        The ``<a ...>`` element, or ``"<label> (not found)"`` when the file
        does not exist.
    """
    # Open directly instead of checking existence first: avoids the race
    # between the check and the read, and the context manager closes the handle.
    try:
        with open(path, 'rb') as fh:
            data = fh.read()
    except FileNotFoundError:
        return f"{label} (not found)"
    b64 = base64.b64encode(data).decode()
    # The file name lands inside a double-quoted attribute, so escape it.
    name = html.escape(os.path.basename(path), quote=True)
    return f'<a href="data:{mime};base64,{b64}" download="{name}">{label}</a>'
def get_gallery_files(types: Optional[List[str]] = None) -> List[str]:
    """List files in the current working directory that match the given extensions.

    Each extension is globbed both as given and upper-cased; results are
    de-duplicated and returned sorted.

    Args:
        types: Extensions without the leading dot. ``None`` selects the
            default set (png, jpg, jpeg, pdf, md, txt).

    Returns:
        Sorted list of matching file names.
    """
    # A None sentinel replaces the former list default, which was one shared
    # object across all calls.
    if types is None:
        types = ['png', 'jpg', 'jpeg', 'pdf', 'md', 'txt']
    files = set()
    for ext in types:
        files.update(glob.glob(f"*.{ext}"))
        files.update(glob.glob(f"*.{ext.upper()}"))
    return sorted(files)
# Delete with rerun | |
def delete_asset(path: str):
    """Delete the file at ``path``, drop it from session bookkeeping, then request a rerun.

    On success the path is removed from ``asset_checkboxes`` and
    ``layout_snapshots`` and a toast is shown. If the removal raises
    ``OSError`` the error is displayed and session state is left untouched.
    """
    try:
        os.remove(path)
    except OSError as e:
        st.error(f"Delete failed: {e}")
    else:
        st.session_state['asset_checkboxes'].pop(path, None)
        if path in st.session_state['layout_snapshots']:
            st.session_state['layout_snapshots'].remove(path)
        st.toast(f"Deleted {os.path.basename(path)}", icon="✅")
    # NOTE(review): update_gallery wires this function as a button on_click
    # callback; Streamlit documents st.rerun() as a no-op inside callbacks.
    # Confirm whether this call is still wanted. Its position (after the
    # try/except rather than inside it) was inferred, since the source
    # indentation was lost.
    st.rerun()
# Sidebar gallery updater | |
def update_gallery():
    """Render the sidebar asset gallery.

    Shows up to ``st.session_state['gallery_size']`` files returned by
    ``get_gallery_files()``. Each entry gets a collapsible preview (image,
    first PDF page at half scale, or the first 200 characters of text), a
    selection checkbox mirrored into ``asset_checkboxes``, a download button
    and a delete button.
    """
    st.sidebar.markdown("### Asset Gallery 📸📖")
    files = get_gallery_files()
    if not files:
        st.sidebar.info("No assets.")
        return
    st.sidebar.caption(f"Found {len(files)} assets.")

    # Extension (without dot) -> MIME type for the download button.
    mime_by_ext = {
        'png': 'image/png',
        'jpg': 'image/jpeg',
        'jpeg': 'image/jpeg',
        'pdf': 'application/pdf',
        'md': 'text/markdown',
        'txt': 'text/plain',
    }

    for f in files[:st.session_state['gallery_size']]:
        name = os.path.basename(f)
        ext = os.path.splitext(f)[1].lower()
        st.sidebar.markdown(f"**{name}**")
        with st.sidebar.expander("Preview", expanded=False):
            try:
                if ext in ['.png', '.jpg', '.jpeg']:
                    # Context manager releases the file handle after rendering.
                    with Image.open(f) as preview:
                        st.image(preview, use_container_width=True)
                elif ext == '.pdf':
                    doc = fitz.open(f)
                    try:
                        if doc.page_count:
                            pix = doc[0].get_pixmap(matrix=fitz.Matrix(0.5, 0.5))
                            img = Image.frombytes('RGB', [pix.width, pix.height], pix.samples)
                            st.image(img, use_container_width=True)
                    finally:
                        # Close the document even when rendering fails.
                        doc.close()
                else:
                    txt = Path(f).read_text(errors='ignore')
                    # Only signal truncation when something was actually cut.
                    st.code(txt[:200] + ('…' if len(txt) > 200 else ''))
            except Exception:
                # A broken preview must not take down the whole sidebar.
                st.warning("Preview error")
        c1, c2, c3 = st.sidebar.columns(3)
        sel = st.session_state['asset_checkboxes'].get(f, False)
        c1.checkbox("Select", value=sel, key=f"cb_{f}")
        # Mirror the widget state into the app's own selection map.
        st.session_state['asset_checkboxes'][f] = st.session_state.get(f"cb_{f}")
        mime = mime_by_ext.get(ext[1:], 'application/octet-stream')
        with open(f, 'rb') as fp:
            c2.download_button("📥", data=fp, file_name=name, mime=mime, key=f"dl_{f}")
        c3.button("🗑️", key=f"del_{f}", on_click=delete_asset, args=(f,))
        # NOTE(review): the source indentation was lost; this separator is
        # assumed to be drawn once per asset — confirm.
        st.sidebar.markdown("---")
# --- PDF Snapshot & Generation --- | |
async def process_pdf_snapshot(path: str, mode: str = 'single', resF: float = 2.0) -> List[str]:
    """Render the leading page(s) of a PDF to PNG files in the working directory.

    Args:
        path: PDF file to open.
        mode: ``'single'`` (1 page), ``'twopage'`` (2 pages) or ``'allpages'``;
            any other value behaves like ``'single'``.
        resF: Scale factor applied on both axes when rasterizing.

    Returns:
        Names of the PNG files written. On any error the files written so far
        are deleted and an empty list is returned.
    """
    status = st.empty()
    status.text("Snapshot start...")
    out_files: List[str] = []
    doc = None
    try:
        doc = fitz.open(path)
        mat = fitz.Matrix(resF, resF)
        cnt = {'single': 1, 'twopage': 2, 'allpages': len(doc)}.get(mode, 1)
        base = os.path.splitext(os.path.basename(path))[0]
        for i in range(min(cnt, len(doc))):
            s = time.time()
            pix = doc[i].get_pixmap(matrix=mat)
            fname = generate_filename(f"{base}_pg{i+1}_{mode}", "png")
            # Write the PNG in a worker thread so the event loop is not blocked.
            await asyncio.to_thread(pix.save, fname)
            out_files.append(fname)
            status.text(f"Saved {fname} ({int(time.time()-s)}s)")
        status.success(f"Snapshot done: {len(out_files)} files")
    except Exception as e:
        status.error(f"Snapshot error: {e}")
        # Roll back: do not leave a partial set of page images behind.
        for f in out_files:
            if os.path.exists(f):
                os.remove(f)
        out_files = []
    finally:
        # Close the document on the error path as well as on success.
        if doc is not None:
            doc.close()
    return out_files
# Repeats an import already present at the top of the file; kept in place.
from reportlab.lib.pagesizes import letter


def make_image_sized_pdf(sources: List[Any]) -> Optional[bytes]:
    """Build a PDF with one page per image, each page sized to its image.

    Every page is the image's pixel size plus a 30-unit strip at the bottom
    that carries a caption (from ``clean_stem`` of the source name) and the
    page number.

    Args:
        sources: File paths (``str``) or file-like objects exposing ``.name``.
            Duplicates (by path / name) and entries without a usable name are
            skipped.

    Returns:
        The PDF bytes, or ``None`` (after a warning) when no usable source
        remains. A source that fails to load is reported and skipped.
    """
    # De-duplicate while preserving the caller's order.
    seen, uniq = set(), []
    for s in sources:
        key = s if isinstance(s, str) else getattr(s, 'name', None)
        if key and key not in seen:
            seen.add(key)
            uniq.append(s)
    if not uniq:
        st.warning("No images for PDF")
        return None

    buf = io.BytesIO()
    c = canvas.Canvas(buf, pagesize=letter)
    status = st.empty()
    cap = 30  # height reserved below the image for the caption strip
    for idx, s in enumerate(uniq, 1):
        try:
            # A file-like source may already have been read; rewind it first.
            if not isinstance(s, str) and hasattr(s, 'seek'):
                s.seek(0)
            with Image.open(s) as img:
                w, h = img.size
                c.setPageSize((w, h + cap))
                c.drawImage(ImageReader(img), 0, cap, w, h, mask='auto')
                cap_txt = clean_stem(s if isinstance(s, str) else s.name)
                c.setFont('Helvetica', 12)
                c.drawCentredString(w / 2, cap / 2, cap_txt)
                c.setFont('Helvetica', 8)
                c.drawRightString(w - 10, 10, str(idx))
                c.showPage()
            status.text(f"Page {idx}/{len(uniq)} added")
        except Exception as e:
            status.error(f"Error page {idx}: {e}")
    c.save()
    return buf.getvalue()
# --- HF Inference Client --- | |
def get_hf_client() -> Optional[InferenceClient]:
    """Return the session's ``InferenceClient``, creating it on first use.

    The token is the custom key from session state when non-blank, otherwise
    ``HF_TOKEN``. Any provider other than ``'hf-inference'`` requires a token;
    without one an error is shown and ``None`` is returned.
    """
    provider = st.session_state['hf_provider']
    token = st.session_state['hf_custom_key'].strip() or HF_TOKEN
    if provider != 'hf-inference' and not token:
        st.error(f"Provider {provider} needs token")
        return None
    # NOTE(review): the cached client is reused as-is, so a later change of
    # provider or token only takes effect if 'hf_inference_client' is reset
    # elsewhere — confirm that the settings UI does this.
    if st.session_state['hf_inference_client'] is None:
        st.session_state['hf_inference_client'] = InferenceClient(token=token, provider=provider)
    return st.session_state['hf_inference_client']
# --- HF Processing --- | |
def process_text_hf(text: str, prompt: str, use_api: bool) -> str: | |
stp = st.empty(); stp.text("Processing...") | |
msgs = [{"role":"system","content":"You are an assistant."}, | |
{"role":"user","content":f"{prompt}\n\n{text}"}] | |
out = "" | |
if use_api: | |
client = get_hf_client() | |
if not client: return "Client error" | |
model = st.session_state['hf_custom_api_model'] or st.session_state['hf_selected_api_model'] | |
try: | |
resp = client.chat_completion( | |
model=model, | |
messages=msgs, | |
max_tokens=st.session_state['gen_max_tokens'], | |
temperature=st.session | |
]}]} | |