############################## # 1) 기존 PDF 처리 코드 ############################## import base64 import json import os import time import zipfile from pathlib import Path import re import uuid import pymupdf # 원래 코드에 있던 os.system() 호출들 os.system('pip uninstall -y magic-pdf') os.system('pip install git+https://github.com/opendatalab/MinerU.git@dev') os.system('wget https://github.com/opendatalab/MinerU/raw/dev/scripts/download_models_hf.py -O download_models_hf.py') os.system('python download_models_hf.py') with open('/home/user/magic-pdf.json', 'r') as file: data = json.load(file) data['device-mode'] = "cuda" if os.getenv('apikey'): data['llm-aided-config']['title_aided']['api_key'] = os.getenv('apikey') data['llm-aided-config']['title_aided']['enable'] = True with open('/home/user/magic-pdf.json', 'w') as file: json.dump(data, file, indent=4) os.system('cp -r paddleocr /home/user/.paddleocr') # from gradio_pdf import PDF # PDF 미리보기를 위한 컴포넌트이지만, 지금은 숨길 예정 import gradio as gr from loguru import logger from magic_pdf.data.data_reader_writer import FileBasedDataReader from magic_pdf.libs.hash_utils import compute_sha256 from magic_pdf.tools.common import do_parse, prepare_env def create_css(): return """ /* 전체 스타일 */ .gradio-container { background: linear-gradient(135deg, #EFF6FF 0%, #F5F3FF 100%); max-width: 1200px !important; margin: 0 auto !important; padding: 2rem !important; } /* 제목 스타일 */ .title-area { text-align: center; margin-bottom: 2rem; padding: 1rem; background: white; border-radius: 1rem; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1); } .title-area h1 { background: linear-gradient(90deg, #2563EB 0%, #7C3AED 100%); -webkit-background-clip: text; -webkit-text-fill-color: transparent; font-size: 2.5rem; font-weight: bold; margin-bottom: 0.5rem; } .title-area p { color: #6B7280; font-size: 1.1rem; } /* 숨길 예정인 컴포넌트 스타일 */ .invisible { display: none !important; } """ def read_fn(path): disk_rw = FileBasedDataReader(os.path.dirname(path)) return disk_rw.read(os.path.basename(path)) def parse_pdf(doc_path, output_dir, end_page_id, is_ocr, layout_mode, formula_enable, table_enable, language): os.makedirs(output_dir, exist_ok=True) try: file_name = f"{str(Path(doc_path).stem)}_{time.time()}" pdf_data = read_fn(doc_path) if is_ocr: parse_method = "ocr" else: parse_method = "auto" local_image_dir, local_md_dir = prepare_env(output_dir, file_name, parse_method) do_parse( output_dir, file_name, pdf_data, [], parse_method, False, end_page_id=end_page_id, layout_model=layout_mode, formula_enable=formula_enable, table_enable=table_enable, lang=language, f_dump_orig_pdf=False, ) return local_md_dir, file_name except Exception as e: logger.exception(e) def compress_directory_to_zip(directory_path, output_zip_path): try: with zipfile.ZipFile(output_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, dirs, files in os.walk(directory_path): for file in files: file_path = os.path.join(root, file) arcname = os.path.relpath(file_path, directory_path) zipf.write(file_path, arcname) return 0 except Exception as e: logger.exception(e) return -1 def image_to_base64(image_path): with open(image_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode('utf-8') def replace_image_with_base64(markdown_text, image_dir_path): pattern = r'\!\[(?:[^\]]*)\]\(([^)]+)\)' def replace(match): relative_path = match.group(1) full_path = os.path.join(image_dir_path, relative_path) base64_image = image_to_base64(full_path) return f"![{relative_path}](data:image/jpeg;base64,{base64_image})" return re.sub(pattern, replace, markdown_text) def to_pdf(file_path): """ PDF가 아닌 경우(예: PNG, JPG 파일)에도 pymupdf를 이용하여 PDF로 변환하기 위한 함수. """ with pymupdf.open(file_path) as f: if f.is_pdf: return file_path else: pdf_bytes = f.convert_to_pdf() unique_filename = f"{uuid.uuid4()}.pdf" tmp_file_path = os.path.join(os.path.dirname(file_path), unique_filename) with open(tmp_file_path, 'wb') as tmp_pdf_file: tmp_pdf_file.write(pdf_bytes) return tmp_file_path def to_markdown(file_path, end_pages, is_ocr, layout_mode, formula_enable, table_enable, language): """ 파일을 받아 최대 end_pages 페이지까지 마크다운 추출 후, base64 이미지가 포함된 md_content를 반환. """ file_path = to_pdf(file_path) if end_pages > 20: end_pages = 20 local_md_dir, file_name = parse_pdf(file_path, './output', end_pages - 1, is_ocr, layout_mode, formula_enable, table_enable, language) archive_zip_path = os.path.join("./output", compute_sha256(local_md_dir) + ".zip") zip_archive_success = compress_directory_to_zip(local_md_dir, archive_zip_path) if zip_archive_success == 0: logger.info("압축 성공") else: logger.error("압축 실패") md_path = os.path.join(local_md_dir, file_name + ".md") with open(md_path, 'r', encoding='utf-8') as f: txt_content = f.read() md_content = replace_image_with_base64(txt_content, local_md_dir) # new_pdf_path = os.path.join(local_md_dir, file_name + "_layout.pdf") # 원래 pdf 미리보기용 return md_content # base64 이미지가 포함된 최종 마크다운 텍스트만 반환 latex_delimiters = [ {"left": "$$", "right": "$$", "display": True}, {"left": '$', "right": '$', "display": False} ] def init_model(): """ magic_pdf의 모델을 미리 초기화. """ from magic_pdf.model.doc_analyze_by_custom_model import ModelSingleton try: model_manager = ModelSingleton() txt_model = model_manager.get_model(False, False) logger.info(f"txt_model init final") ocr_model = model_manager.get_model(True, False) logger.info(f"ocr_model init final") return 0 except Exception as e: logger.exception(e) return -1 model_init = init_model() logger.info(f"model_init: {model_init}") latin_lang = [ 'af', 'az', 'bs', 'cs', 'cy', 'da', 'de', 'es', 'et', 'fr', 'ga', 'hr', 'hu', 'id', 'is', 'it', 'ku', 'la', 'lt', 'lv', 'mi', 'ms', 'mt', 'nl', 'no', 'oc', 'pi', 'pl', 'pt', 'ro', 'rs_latin', 'sk', 'sl', 'sq', 'sv', 'sw', 'tl', 'tr', 'uz', 'vi', 'french', 'german' ] arabic_lang = ['ar', 'fa', 'ug', 'ur'] cyrillic_lang = [ 'ru', 'rs_cyrillic', 'be', 'bg', 'uk', 'mn', 'abq', 'ady', 'kbd', 'ava', 'dar', 'inh', 'che', 'lbe', 'lez', 'tab' ] devanagari_lang = [ 'hi', 'mr', 'ne', 'bh', 'mai', 'ang', 'bho', 'mah', 'sck', 'new', 'gom', 'sa', 'bgc' ] other_lang = ['ch', 'en', 'korean', 'japan', 'chinese_cht', 'ta', 'te', 'ka'] all_lang = ['', 'auto'] all_lang.extend([*other_lang, *latin_lang, *arabic_lang, *cyrillic_lang, *devanagari_lang]) ############################## # 2) Gemini LLM 챗 코드 ############################## # (중복 import이지만 "누락 없이" 출력해야 하므로 주석 처리) # import os # import gradio as gr from gradio import ChatMessage from typing import Iterator import google.generativeai as genai import time # get Gemini API Key from the environ variable GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") genai.configure(api_key=GEMINI_API_KEY) # we will be using the Gemini 2.0 Flash model with Thinking capabilities model = genai.GenerativeModel("gemini-2.0-flash-thinking-exp-1219") def format_chat_history(messages: list) -> list: """ Formats the chat history into a structure Gemini can understand """ formatted_history = [] for message in messages: # Skip thinking messages (messages with metadata) if not (message.get("role") == "assistant" and "metadata" in message): formatted_history.append({ "role": "user" if message.get("role") == "user" else "assistant", "parts": [message.get("content", "")] }) return formatted_history def stream_gemini_response(user_message: str, messages: list) -> Iterator[list]: """ Streams thoughts and response with conversation history support for text input only. """ if not user_message.strip(): messages.append(ChatMessage(role="assistant", content="Please provide a non-empty text message. Empty input is not allowed.")) yield messages return try: print(f"\n=== New Request (Text) ===") print(f"User message: {user_message}") chat_history = format_chat_history(messages) chat = model.start_chat(history=chat_history) response = chat.send_message(user_message, stream=True) thought_buffer = "" response_buffer = "" thinking_complete = False messages.append( ChatMessage( role="assistant", content="", metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"} ) ) for chunk in response: parts = chunk.candidates[0].content.parts current_chunk = parts[0].text if len(parts) == 2 and not thinking_complete: # Complete thought and start response thought_buffer += current_chunk print(f"\n=== Complete Thought ===\n{thought_buffer}") messages[-1] = ChatMessage( role="assistant", content=thought_buffer, metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"} ) yield messages # Start response response_buffer = parts[1].text print(f"\n=== Starting Response ===\n{response_buffer}") messages.append( ChatMessage( role="assistant", content=response_buffer ) ) thinking_complete = True elif thinking_complete: response_buffer += current_chunk print(f"\n=== Response Chunk ===\n{current_chunk}") messages[-1] = ChatMessage( role="assistant", content=response_buffer ) else: thought_buffer += current_chunk print(f"\n=== Thinking Chunk ===\n{current_chunk}") messages[-1] = ChatMessage( role="assistant", content=thought_buffer, metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"} ) # time.sleep(0.05) #Optional debugging delay yield messages print(f"\n=== Final Response ===\n{response_buffer}") except Exception as e: print(f"\n=== Error ===\n{str(e)}") messages.append( ChatMessage( role="assistant", content=f"I apologize, but I encountered an error: {str(e)}" ) ) yield messages def user_message(msg: str, history: list) -> tuple[str, list]: """Adds user message to chat history""" history.append(ChatMessage(role="user", content=msg)) return "", history ###################################################### # 3) 통합 Gradio 앱 구성 # - PDF 업로드만 보이게 하고(나머지는 hidden) # - 업로드 후 "변환" 버튼 클릭 시, 마크다운을 만들어 # Chatbot과 대화할 수 있도록 전달 ###################################################### with gr.Blocks(title="통합 OCR & Gemini Chat", css=create_css(), theme=gr.themes.Soft(primary_hue="teal", secondary_hue="slate", neutral_hue="neutral")) as demo: gr.HTML("""

OCR FLEX + Gemini Chat

PDF/이미지 -> 텍스트(마크다운) 변환 후, LLM Gemini와 대화

""") # 내부 상태(마크다운 텍스트) md_state = gr.State("") chat_history = gr.State([]) # Gemini 챗 기록 상태 # 1) 파일 업로드 UI with gr.Row(): file = gr.File( label="PDF 또는 이미지 파일 업로드", file_types=[".pdf", ".png", ".jpeg", ".jpg"], interactive=True ) convert_btn = gr.Button( "변환", elem_classes="primary-button" ) # 2) 원래 존재하던 슬라이더, 체크박스 등은 전부 hidden max_pages = gr.Slider( 1, 20, 10, step=1, label='최대 변환 페이지 수', elem_classes="invisible", visible=False ) layout_mode = gr.Dropdown( ["layoutlmv3", "doclayout_yolo"], label="레이아웃 모델", value="doclayout_yolo", elem_classes="invisible", visible=False ) language = gr.Dropdown( all_lang, label="언어", value='auto', elem_classes="invisible", visible=False ) formula_enable = gr.Checkbox( label="수식 인식 활성화", value=True, elem_classes="invisible", visible=False ) is_ocr = gr.Checkbox( label="OCR 강제 활성화", value=False, elem_classes="invisible", visible=False ) table_enable = gr.Checkbox( label="표 인식 활성화(테스트)", value=True, elem_classes="invisible", visible=False ) # 3) 출력 결과(파일, 마크다운 등)도 숨김 # 필요하면 주석 해제하여 확인 가능 # output_file = gr.File( # label="변환 결과", # interactive=False, # visible=False # ) # md = gr.Markdown( # label="마크다운 렌더링", # visible=False # ) # md_text = gr.TextArea( # lines=45, # visible=False # ) # pdf_show = PDF( # label='PDF 미리보기', # interactive=False, # visible=False, # height=800 # ) # 4) 파일 업로드 -> '변환' 버튼 클릭시 동작: # to_markdown 함수를 통해 md_state에 마크다운 저장 convert_btn.click( fn=to_markdown, inputs=[file, max_pages, is_ocr, layout_mode, formula_enable, table_enable, language], outputs=md_state ) # ========================== # Gemini Chat 부분 # ========================== gr.Markdown("## Gemini 2.0 Flash (With Thinking) Chat") chatbot = gr.Chatbot( label="Gemini2.0 Chatbot (Streaming Output)", render_markdown=True, height=400 ) with gr.Row(): chat_input = gr.Textbox( lines=1, label="질문 입력", placeholder="추출된 문서(마크다운 내용)에 대해 궁금한 점을 물어보세요..." ) clear_button = gr.Button("대화 초기화") # 사용자가 질문 -> user_message -> Gemini 처리 -> stream_gemini_response def user_message_wrapper(msg, history, doc_text): """ 사용자가 입력할 때마다, doc_text(마크다운)를 참고하도록 질문을 약간 변형해서 history에 추가하는 방식(간단 예시). """ if not doc_text: # 아직 변환된 문서가 없다면 그냥 질문 user_query = msg else: # 문서 내용(doc_text)을 "참고" 요청하는 간단 프롬프트 예시 user_query = f"다음 문서를 참고하여 답변:\n\n{doc_text}\n\n질문: {msg}" history.append(ChatMessage(role="user", content=user_query)) return "", history chat_input.submit( fn=user_message_wrapper, inputs=[chat_input, chat_history, md_state], outputs=[chat_input, chat_history] ).then( fn=stream_gemini_response, inputs=[chat_input, chat_history], outputs=chat_history ).then( fn=lambda h: h, inputs=chat_history, outputs=chatbot ) clear_button.click( fn=lambda: ([], ""), inputs=[], outputs=[chat_history, md_state] ).then( fn=lambda: [], inputs=[], outputs=chatbot ) ############################## # 4) 실제 실행 ############################## if __name__ == "__main__": # 첫 번째 demo.launch() - 통합 앱 실행 demo.launch(ssr_mode=True, debug=True) ############################################### # 아래는 "Gemini 챗 코드" 원본에 있던 # 별도의 demo.launch() 부분 (누락 없이 주석 보존) ############################################### # if __name__ == "__main__": # demo.launch(debug=True)