import base64 import json import os import time import zipfile from pathlib import Path import re import uuid import pymupdf ############################### # 환경 설정 ############################### os.system('pip uninstall -y magic-pdf') os.system('pip install git+https://github.com/opendatalab/MinerU.git@dev') os.system('wget https://github.com/opendatalab/MinerU/raw/dev/scripts/download_models_hf.py -O download_models_hf.py') os.system('python download_models_hf.py') with open('/home/user/magic-pdf.json', 'r') as file: data = json.load(file) data['device-mode'] = "cuda" if os.getenv('apikey'): data['llm-aided-config']['title_aided']['api_key'] = os.getenv('apikey') data['llm-aided-config']['title_aided']['enable'] = True with open('/home/user/magic-pdf.json', 'w') as file: json.dump(data, file, indent=4) os.system('cp -r paddleocr /home/user/.paddleocr') ############################### # 그 외 라이브러리 ############################### import gradio as gr from loguru import logger from gradio_pdf import PDF ############################### # magic_pdf 관련 모듈 ############################### from magic_pdf.data.data_reader_writer import FileBasedDataReader from magic_pdf.libs.hash_utils import compute_sha256 from magic_pdf.tools.common import do_parse, prepare_env ############################### # 공통 함수들 ############################### def create_css(): """ 기본 CSS 스타일. """ return """ .gradio-container { width: 100vw !important; min-height: 100vh !important; margin: 0 !important; padding: 0 !important; background: linear-gradient(135deg, #EFF6FF 0%, #F5F3FF 100%); display: flex; flex-direction: column; overflow-y: auto !important; } .title-area { text-align: center; margin: 1rem auto; padding: 1rem; background: white; border-radius: 1rem; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1); max-width: 800px; } .title-area h1 { background: linear-gradient(90deg, #2563EB 0%, #7C3AED 100%); -webkit-background-clip: text; -webkit-text-fill-color: transparent; font-size: 2.5rem; font-weight: bold; margin-bottom: 0.5rem; } .title-area p { color: #6B7280; font-size: 1.1rem; } .invisible { display: none !important; } .gr-block, .gr-box { padding: 0.5rem !important; } """ def read_fn(path): disk_rw = FileBasedDataReader(os.path.dirname(path)) return disk_rw.read(os.path.basename(path)) def parse_pdf(doc_path, output_dir, end_page_id, is_ocr, layout_mode, formula_enable, table_enable, language): os.makedirs(output_dir, exist_ok=True) try: file_name = f"{str(Path(doc_path).stem)}_{time.time()}" pdf_data = read_fn(doc_path) parse_method = "ocr" if is_ocr else "auto" local_image_dir, local_md_dir = prepare_env(output_dir, file_name, parse_method) do_parse( output_dir, file_name, pdf_data, [], parse_method, False, end_page_id=end_page_id, layout_model=layout_mode, formula_enable=formula_enable, table_enable=table_enable, lang=language, f_dump_orig_pdf=False ) return local_md_dir, file_name except Exception as e: logger.exception(e) def compress_directory_to_zip(directory_path, output_zip_path): try: with zipfile.ZipFile(output_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, dirs, files in os.walk(directory_path): for file in files: file_path = os.path.join(root, file) arcname = os.path.relpath(file_path, directory_path) zipf.write(file_path, arcname) return 0 except Exception as e: logger.exception(e) return -1 def image_to_base64(image_path): with open(image_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode('utf-8') def replace_image_with_base64(markdown_text, image_dir_path): pattern = r'\!\[(?:[^\]]*)\]\(([^)]+)\)' def replace(match): relative_path = match.group(1) full_path = os.path.join(image_dir_path, relative_path) base64_image = image_to_base64(full_path) return f"" return re.sub(pattern, replace, markdown_text) def to_pdf(file_path): """ 이미지(JPG/PNG 등)를 PDF로 컨버팅. """ with pymupdf.open(file_path) as f: if f.is_pdf: return file_path else: pdf_bytes = f.convert_to_pdf() unique_filename = f"{uuid.uuid4()}.pdf" tmp_file_path = os.path.join(os.path.dirname(file_path), unique_filename) with open(tmp_file_path, 'wb') as tmp_pdf_file: tmp_pdf_file.write(pdf_bytes) return tmp_file_path def to_markdown(file_path, end_pages, is_ocr, layout_mode, formula_enable, table_enable, language, progress=gr.Progress(track_tqdm=False)): """ 업로드된 PDF/이미지 -> PDF 변환 -> 마크다운 변환 (프로그레스 바 표시용) """ progress(0, "PDF로 변환 중...") file_path = to_pdf(file_path) time.sleep(0.5) if end_pages > 20: end_pages = 20 progress(20, "문서 파싱 중...") local_md_dir, file_name = parse_pdf(file_path, './output', end_pages - 1, is_ocr, layout_mode, formula_enable, table_enable, language) time.sleep(0.5) progress(50, "압축(zip) 생성 중...") archive_zip_path = os.path.join("./output", compute_sha256(local_md_dir) + ".zip") zip_archive_success = compress_directory_to_zip(local_md_dir, archive_zip_path) if zip_archive_success == 0: logger.info("압축 성공") else: logger.error("압축 실패") time.sleep(0.5) progress(70, "마크다운 읽는 중...") md_path = os.path.join(local_md_dir, file_name + ".md") with open(md_path, 'r', encoding='utf-8') as f: txt_content = f.read() time.sleep(0.5) progress(90, "이미지 base64 변환 중...") md_content = replace_image_with_base64(txt_content, local_md_dir) time.sleep(0.5) progress(100, "변환 완료!") return md_content def init_model(): """ magic-pdf 모델 초기화 """ from magic_pdf.model.doc_analyze_by_custom_model import ModelSingleton try: model_manager = ModelSingleton() txt_model = model_manager.get_model(False, False) logger.info("txt_model init final") ocr_model = model_manager.get_model(True, False) logger.info("ocr_model init final") return 0 except Exception as e: logger.exception(e) return -1 model_init = init_model() logger.info(f"model_init: {model_init}") ############################### # 언어 목록 ############################### latin_lang = [ 'af','az','bs','cs','cy','da','de','es','et','fr','ga','hr','hu','id','is','it','ku', 'la','lt','lv','mi','ms','mt','nl','no','oc','pi','pl','pt','ro','rs_latin','sk','sl', 'sq','sv','sw','tl','tr','uz','vi','french','german' ] arabic_lang = ['ar','fa','ug','ur'] cyrillic_lang = ['ru','rs_cyrillic','be','bg','uk','mn','abq','ady','kbd','ava','dar','inh','che','lbe','lez','tab'] devanagari_lang = ['hi','mr','ne','bh','mai','ang','bho','mah','sck','new','gom','sa','bgc'] other_lang = ['ch','en','korean','japan','chinese_cht','ta','te','ka'] all_lang = ['', 'auto'] all_lang.extend([*other_lang, *latin_lang, *arabic_lang, *cyrillic_lang, *devanagari_lang]) ############################### # (1) PDF Chat 용 LLM 관련 ############################### import google.generativeai as genai from gradio import ChatMessage from typing import Iterator GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") genai.configure(api_key=GEMINI_API_KEY) model = genai.GenerativeModel("gemini-2.0-flash-thinking-exp-1219") def format_chat_history(messages: list) -> list: """ Gemini가 이해할 수 있는 (role, parts[]) 형식으로 변환 """ formatted_history = [] for message in messages: if not (message.role == "assistant" and hasattr(message, "metadata")): formatted_history.append({ "role": "user" if message.role == "user" else "assistant", "parts": [message.content] }) return formatted_history def convert_chat_messages_to_gradio_format(messages): """ ChatMessage list -> [ (유저발화, 봇응답), (...), ... ] """ gradio_chat = [] user_text, assistant_text = None, None for msg in messages: if msg.role == "user": if user_text is not None or assistant_text is not None: gradio_chat.append((user_text or "", assistant_text or "")) user_text = msg.content assistant_text = None else: if user_text is None: user_text = "" if assistant_text is None: assistant_text = msg.content else: assistant_text += msg.content if user_text is not None or assistant_text is not None: gradio_chat.append((user_text or "", assistant_text or "")) return gradio_chat def stream_gemini_response(user_message: str, messages: list) -> Iterator[list]: """ Gemini 응답 스트리밍 (user_message가 공백이면 기본 문구로 대체) """ if not user_message.strip(): user_message = "...(No content from user)..." try: print(f"\n=== [Gemini] New Request ===\nUser message: '{user_message}'") chat_history = format_chat_history(messages) chat = model.start_chat(history=chat_history) response = chat.send_message(user_message, stream=True) thought_buffer = "" response_buffer = "" thinking_complete = False # "Thinking" 역할 messages.append( ChatMessage( role="assistant", content="", metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"} ) ) yield convert_chat_messages_to_gradio_format(messages) for chunk in response: parts = chunk.candidates[0].content.parts current_chunk = parts[0].text # 만약 parts 가 2개라면, parts[0]는 thinking, parts[1]은 최종답변 if len(parts) == 2 and not thinking_complete: thought_buffer += current_chunk messages[-1] = ChatMessage( role="assistant", content=thought_buffer, metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"} ) yield convert_chat_messages_to_gradio_format(messages) response_buffer = parts[1].text messages.append(ChatMessage(role="assistant", content=response_buffer)) thinking_complete = True elif thinking_complete: # 이미 최종답변 중 response_buffer += current_chunk messages[-1] = ChatMessage(role="assistant", content=response_buffer) else: # 아직 thinking 중 thought_buffer += current_chunk messages[-1] = ChatMessage( role="assistant", content=thought_buffer, metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"} ) yield convert_chat_messages_to_gradio_format(messages) print(f"\n=== [Gemini] Final Response ===\n{response_buffer}") except Exception as e: print(f"\n=== [Gemini] Error ===\n{str(e)}") messages.append(ChatMessage(role="assistant", content=f"I encountered an error: {str(e)}")) yield convert_chat_messages_to_gradio_format(messages) def user_message(msg: str, history: list, doc_text: str) -> tuple[str, list]: """ doc_text(마크다운) 사용해 질문 자동 변형 """ if doc_text.strip(): user_query = f"다음 문서를 참고하여 답변:\n\n{doc_text}\n\n질문: {msg}" else: user_query = msg history.append(ChatMessage(role="user", content=user_query)) return "", history def reset_states(_): """ 새 파일 업로드 시 - chat_history -> 빈 리스트 - md_state -> 빈 문자열 - chatbot -> 빈 list of tuples """ return [], "", [] ############################### # (2) OCR FLEX 전용 (스니펫) ############################### # 별도의 LaTeX 설정 latex_delimiters = [ {"left": "$$", "right": "$$", "display": True}, {"left": '$', "right": '$', "display": False} ] def to_markdown_ocr_flex(file_path, end_pages, is_ocr, layout_mode, formula_enable, table_enable, language): """ 스니펫에서 사용: 업로드된 PDF/이미지를 변환 후 (마크다운 렌더링 / 마크다운 텍스트 / 압축파일 / PDF미리보기) 반환 """ file_path = to_pdf(file_path) if end_pages > 20: end_pages = 20 local_md_dir, file_name = parse_pdf( file_path, './output', end_pages - 1, is_ocr, layout_mode, formula_enable, table_enable, language ) archive_zip_path = os.path.join("./output", compute_sha256(local_md_dir) + ".zip") zip_archive_success = compress_directory_to_zip(local_md_dir, archive_zip_path) if zip_archive_success == 0: logger.info("압축 성공") else: logger.error("압축 실패") md_path = os.path.join(local_md_dir, file_name + ".md") with open(md_path, 'r', encoding='utf-8') as f: txt_content = f.read() md_content = replace_image_with_base64(txt_content, local_md_dir) new_pdf_path = os.path.join(local_md_dir, file_name + "_layout.pdf") return md_content, txt_content, archive_zip_path, new_pdf_path ############################### # UI 통합 ############################### if __name__ == "__main__": with gr.Blocks(title="VisionOCR", css=create_css()) as demo: # 탭 영역 with gr.Tabs(): ######################################################### # Tab (1) : PDF -> Markdown 변환 + Chat ######################################################### with gr.Tab("PDF Chat with LLM"): gr.HTML("""
PDF/이미지 -> 텍스트(마크다운) 변환 후, 추 LLM과 대화
PDF와 이미지에서 텍스트를 빠르고 정확하게 추출하세요