openfree's picture
Update app.py
a1dc3ca verified
raw
history blame
17.6 kB
##############################
# 1) ๊ธฐ์กด PDF ์ฒ˜๋ฆฌ ์ฝ”๋“œ
##############################
import base64
import json
import os
import time
import zipfile
from pathlib import Path
import re
import uuid
import pymupdf
# ์›๋ž˜ ์ฝ”๋“œ์— ์žˆ๋˜ os.system() ํ˜ธ์ถœ๋“ค
os.system('pip uninstall -y magic-pdf')
os.system('pip install git+https://github.com/opendatalab/MinerU.git@dev')
os.system('wget https://github.com/opendatalab/MinerU/raw/dev/scripts/download_models_hf.py -O download_models_hf.py')
os.system('python download_models_hf.py')
with open('/home/user/magic-pdf.json', 'r') as file:
data = json.load(file)
data['device-mode'] = "cuda"
if os.getenv('apikey'):
data['llm-aided-config']['title_aided']['api_key'] = os.getenv('apikey')
data['llm-aided-config']['title_aided']['enable'] = True
with open('/home/user/magic-pdf.json', 'w') as file:
json.dump(data, file, indent=4)
os.system('cp -r paddleocr /home/user/.paddleocr')
# from gradio_pdf import PDF # PDF ๋ฏธ๋ฆฌ๋ณด๊ธฐ๋ฅผ ์œ„ํ•œ ์ปดํฌ๋„ŒํŠธ์ด์ง€๋งŒ, ์ง€๊ธˆ์€ ์ˆจ๊ธธ ์˜ˆ์ •
import gradio as gr
from loguru import logger
from magic_pdf.data.data_reader_writer import FileBasedDataReader
from magic_pdf.libs.hash_utils import compute_sha256
from magic_pdf.tools.common import do_parse, prepare_env
def create_css():
return """
/* ์ „์ฒด ์Šคํƒ€์ผ */
.gradio-container {
background: linear-gradient(135deg, #EFF6FF 0%, #F5F3FF 100%);
max-width: 1200px !important;
margin: 0 auto !important;
padding: 2rem !important;
}
/* ์ œ๋ชฉ ์Šคํƒ€์ผ */
.title-area {
text-align: center;
margin-bottom: 2rem;
padding: 1rem;
background: white;
border-radius: 1rem;
box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);
}
.title-area h1 {
background: linear-gradient(90deg, #2563EB 0%, #7C3AED 100%);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
font-size: 2.5rem;
font-weight: bold;
margin-bottom: 0.5rem;
}
.title-area p {
color: #6B7280;
font-size: 1.1rem;
}
/* ์ˆจ๊ธธ ์˜ˆ์ •์ธ ์ปดํฌ๋„ŒํŠธ ์Šคํƒ€์ผ */
.invisible {
display: none !important;
}
"""
def read_fn(path):
disk_rw = FileBasedDataReader(os.path.dirname(path))
return disk_rw.read(os.path.basename(path))
def parse_pdf(doc_path, output_dir, end_page_id, is_ocr, layout_mode, formula_enable, table_enable, language):
os.makedirs(output_dir, exist_ok=True)
try:
file_name = f"{str(Path(doc_path).stem)}_{time.time()}"
pdf_data = read_fn(doc_path)
if is_ocr:
parse_method = "ocr"
else:
parse_method = "auto"
local_image_dir, local_md_dir = prepare_env(output_dir, file_name, parse_method)
do_parse(
output_dir,
file_name,
pdf_data,
[],
parse_method,
False,
end_page_id=end_page_id,
layout_model=layout_mode,
formula_enable=formula_enable,
table_enable=table_enable,
lang=language,
f_dump_orig_pdf=False,
)
return local_md_dir, file_name
except Exception as e:
logger.exception(e)
def compress_directory_to_zip(directory_path, output_zip_path):
try:
with zipfile.ZipFile(output_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(directory_path):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, directory_path)
zipf.write(file_path, arcname)
return 0
except Exception as e:
logger.exception(e)
return -1
def image_to_base64(image_path):
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
def replace_image_with_base64(markdown_text, image_dir_path):
pattern = r'\!\[(?:[^\]]*)\]\(([^)]+)\)'
def replace(match):
relative_path = match.group(1)
full_path = os.path.join(image_dir_path, relative_path)
base64_image = image_to_base64(full_path)
return f"![{relative_path}](data:image/jpeg;base64,{base64_image})"
return re.sub(pattern, replace, markdown_text)
def to_pdf(file_path):
"""
PDF๊ฐ€ ์•„๋‹Œ ๊ฒฝ์šฐ(์˜ˆ: PNG, JPG ํŒŒ์ผ)์—๋„ pymupdf๋ฅผ ์ด์šฉํ•˜์—ฌ PDF๋กœ ๋ณ€ํ™˜ํ•˜๊ธฐ ์œ„ํ•œ ํ•จ์ˆ˜.
"""
with pymupdf.open(file_path) as f:
if f.is_pdf:
return file_path
else:
pdf_bytes = f.convert_to_pdf()
unique_filename = f"{uuid.uuid4()}.pdf"
tmp_file_path = os.path.join(os.path.dirname(file_path), unique_filename)
with open(tmp_file_path, 'wb') as tmp_pdf_file:
tmp_pdf_file.write(pdf_bytes)
return tmp_file_path
def to_markdown(file_path, end_pages, is_ocr, layout_mode, formula_enable, table_enable, language):
"""
ํŒŒ์ผ์„ ๋ฐ›์•„ ์ตœ๋Œ€ end_pages ํŽ˜์ด์ง€๊นŒ์ง€ ๋งˆํฌ๋‹ค์šด ์ถ”์ถœ ํ›„,
base64 ์ด๋ฏธ์ง€๊ฐ€ ํฌํ•จ๋œ md_content๋ฅผ ๋ฐ˜ํ™˜.
"""
file_path = to_pdf(file_path)
if end_pages > 20:
end_pages = 20
local_md_dir, file_name = parse_pdf(file_path, './output', end_pages - 1, is_ocr,
layout_mode, formula_enable, table_enable, language)
archive_zip_path = os.path.join("./output", compute_sha256(local_md_dir) + ".zip")
zip_archive_success = compress_directory_to_zip(local_md_dir, archive_zip_path)
if zip_archive_success == 0:
logger.info("์••์ถ• ์„ฑ๊ณต")
else:
logger.error("์••์ถ• ์‹คํŒจ")
md_path = os.path.join(local_md_dir, file_name + ".md")
with open(md_path, 'r', encoding='utf-8') as f:
txt_content = f.read()
md_content = replace_image_with_base64(txt_content, local_md_dir)
# new_pdf_path = os.path.join(local_md_dir, file_name + "_layout.pdf") # ์›๋ž˜ pdf ๋ฏธ๋ฆฌ๋ณด๊ธฐ์šฉ
return md_content # base64 ์ด๋ฏธ์ง€๊ฐ€ ํฌํ•จ๋œ ์ตœ์ข… ๋งˆํฌ๋‹ค์šด ํ…์ŠคํŠธ๋งŒ ๋ฐ˜ํ™˜
latex_delimiters = [
{"left": "$$", "right": "$$", "display": True},
{"left": '$', "right": '$', "display": False}
]
def init_model():
"""
magic_pdf์˜ ๋ชจ๋ธ์„ ๋ฏธ๋ฆฌ ์ดˆ๊ธฐํ™”.
"""
from magic_pdf.model.doc_analyze_by_custom_model import ModelSingleton
try:
model_manager = ModelSingleton()
txt_model = model_manager.get_model(False, False)
logger.info(f"txt_model init final")
ocr_model = model_manager.get_model(True, False)
logger.info(f"ocr_model init final")
return 0
except Exception as e:
logger.exception(e)
return -1
model_init = init_model()
logger.info(f"model_init: {model_init}")
latin_lang = [
'af', 'az', 'bs', 'cs', 'cy', 'da', 'de', 'es', 'et', 'fr', 'ga', 'hr',
'hu', 'id', 'is', 'it', 'ku', 'la', 'lt', 'lv', 'mi', 'ms', 'mt', 'nl',
'no', 'oc', 'pi', 'pl', 'pt', 'ro', 'rs_latin', 'sk', 'sl', 'sq', 'sv',
'sw', 'tl', 'tr', 'uz', 'vi', 'french', 'german'
]
arabic_lang = ['ar', 'fa', 'ug', 'ur']
cyrillic_lang = [
'ru', 'rs_cyrillic', 'be', 'bg', 'uk', 'mn', 'abq', 'ady', 'kbd', 'ava',
'dar', 'inh', 'che', 'lbe', 'lez', 'tab'
]
devanagari_lang = [
'hi', 'mr', 'ne', 'bh', 'mai', 'ang', 'bho', 'mah', 'sck', 'new', 'gom',
'sa', 'bgc'
]
other_lang = ['ch', 'en', 'korean', 'japan', 'chinese_cht', 'ta', 'te', 'ka']
all_lang = ['', 'auto']
all_lang.extend([*other_lang, *latin_lang, *arabic_lang, *cyrillic_lang, *devanagari_lang])
##############################
# 2) Gemini LLM ์ฑ— ์ฝ”๋“œ
##############################
# (์ค‘๋ณต import์ด์ง€๋งŒ "๋ˆ„๋ฝ ์—†์ด" ์ถœ๋ ฅํ•ด์•ผ ํ•˜๋ฏ€๋กœ ์ฃผ์„ ์ฒ˜๋ฆฌ)
# import os
# import gradio as gr
from gradio import ChatMessage
from typing import Iterator
import google.generativeai as genai
import time
# get Gemini API Key from the environ variable
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)
# we will be using the Gemini 2.0 Flash model with Thinking capabilities
model = genai.GenerativeModel("gemini-2.0-flash-thinking-exp-1219")
def format_chat_history(messages: list) -> list:
"""
Formats the chat history into a structure Gemini can understand
"""
formatted_history = []
for message in messages:
# Skip thinking messages (messages with metadata)
if not (message.get("role") == "assistant" and "metadata" in message):
formatted_history.append({
"role": "user" if message.get("role") == "user" else "assistant",
"parts": [message.get("content", "")]
})
return formatted_history
def stream_gemini_response(user_message: str, messages: list) -> Iterator[list]:
"""
Streams thoughts and response with conversation history support for text input only.
"""
if not user_message.strip():
messages.append(ChatMessage(role="assistant", content="Please provide a non-empty text message. Empty input is not allowed."))
yield messages
return
try:
print(f"\n=== New Request (Text) ===")
print(f"User message: {user_message}")
chat_history = format_chat_history(messages)
chat = model.start_chat(history=chat_history)
response = chat.send_message(user_message, stream=True)
thought_buffer = ""
response_buffer = ""
thinking_complete = False
messages.append(
ChatMessage(
role="assistant",
content="",
metadata={"title": "โš™๏ธ Thinking: *The thoughts produced by the model are experimental"}
)
)
for chunk in response:
parts = chunk.candidates[0].content.parts
current_chunk = parts[0].text
if len(parts) == 2 and not thinking_complete:
# Complete thought and start response
thought_buffer += current_chunk
print(f"\n=== Complete Thought ===\n{thought_buffer}")
messages[-1] = ChatMessage(
role="assistant",
content=thought_buffer,
metadata={"title": "โš™๏ธ Thinking: *The thoughts produced by the model are experimental"}
)
yield messages
# Start response
response_buffer = parts[1].text
print(f"\n=== Starting Response ===\n{response_buffer}")
messages.append(
ChatMessage(
role="assistant",
content=response_buffer
)
)
thinking_complete = True
elif thinking_complete:
response_buffer += current_chunk
print(f"\n=== Response Chunk ===\n{current_chunk}")
messages[-1] = ChatMessage(
role="assistant",
content=response_buffer
)
else:
thought_buffer += current_chunk
print(f"\n=== Thinking Chunk ===\n{current_chunk}")
messages[-1] = ChatMessage(
role="assistant",
content=thought_buffer,
metadata={"title": "โš™๏ธ Thinking: *The thoughts produced by the model are experimental"}
)
# time.sleep(0.05) #Optional debugging delay
yield messages
print(f"\n=== Final Response ===\n{response_buffer}")
except Exception as e:
print(f"\n=== Error ===\n{str(e)}")
messages.append(
ChatMessage(
role="assistant",
content=f"I apologize, but I encountered an error: {str(e)}"
)
)
yield messages
def user_message(msg: str, history: list) -> tuple[str, list]:
"""Adds user message to chat history"""
history.append(ChatMessage(role="user", content=msg))
return "", history
######################################################
# 3) ํ†ตํ•ฉ Gradio ์•ฑ ๊ตฌ์„ฑ
# - PDF ์—…๋กœ๋“œ๋งŒ ๋ณด์ด๊ฒŒ ํ•˜๊ณ (๋‚˜๋จธ์ง€๋Š” hidden)
# - ์—…๋กœ๋“œ ํ›„ "๋ณ€ํ™˜" ๋ฒ„ํŠผ ํด๋ฆญ ์‹œ, ๋งˆํฌ๋‹ค์šด์„ ๋งŒ๋“ค์–ด
# Chatbot๊ณผ ๋Œ€ํ™”ํ•  ์ˆ˜ ์žˆ๋„๋ก ์ „๋‹ฌ
######################################################
with gr.Blocks(title="ํ†ตํ•ฉ OCR & Gemini Chat", css=create_css(), theme=gr.themes.Soft(primary_hue="teal", secondary_hue="slate", neutral_hue="neutral")) as demo:
gr.HTML("""
<div class="title-area">
<h1>OCR FLEX + Gemini Chat</h1>
<p>PDF/์ด๋ฏธ์ง€ -> ํ…์ŠคํŠธ(๋งˆํฌ๋‹ค์šด) ๋ณ€ํ™˜ ํ›„, LLM Gemini์™€ ๋Œ€ํ™”</p>
</div>
""")
# ๋‚ด๋ถ€ ์ƒํƒœ(๋งˆํฌ๋‹ค์šด ํ…์ŠคํŠธ)
md_state = gr.State("")
chat_history = gr.State([]) # Gemini ์ฑ— ๊ธฐ๋ก ์ƒํƒœ
# 1) ํŒŒ์ผ ์—…๋กœ๋“œ UI
with gr.Row():
file = gr.File(
label="PDF ๋˜๋Š” ์ด๋ฏธ์ง€ ํŒŒ์ผ ์—…๋กœ๋“œ",
file_types=[".pdf", ".png", ".jpeg", ".jpg"],
interactive=True
)
convert_btn = gr.Button(
"๋ณ€ํ™˜",
elem_classes="primary-button"
)
# 2) ์›๋ž˜ ์กด์žฌํ•˜๋˜ ์Šฌ๋ผ์ด๋”, ์ฒดํฌ๋ฐ•์Šค ๋“ฑ์€ ์ „๋ถ€ hidden
max_pages = gr.Slider(
1, 20, 10,
step=1,
label='์ตœ๋Œ€ ๋ณ€ํ™˜ ํŽ˜์ด์ง€ ์ˆ˜',
elem_classes="invisible",
visible=False
)
layout_mode = gr.Dropdown(
["layoutlmv3", "doclayout_yolo"],
label="๋ ˆ์ด์•„์›ƒ ๋ชจ๋ธ",
value="doclayout_yolo",
elem_classes="invisible",
visible=False
)
language = gr.Dropdown(
all_lang,
label="์–ธ์–ด",
value='auto',
elem_classes="invisible",
visible=False
)
formula_enable = gr.Checkbox(
label="์ˆ˜์‹ ์ธ์‹ ํ™œ์„ฑํ™”",
value=True,
elem_classes="invisible",
visible=False
)
is_ocr = gr.Checkbox(
label="OCR ๊ฐ•์ œ ํ™œ์„ฑํ™”",
value=False,
elem_classes="invisible",
visible=False
)
table_enable = gr.Checkbox(
label="ํ‘œ ์ธ์‹ ํ™œ์„ฑํ™”(ํ…Œ์ŠคํŠธ)",
value=True,
elem_classes="invisible",
visible=False
)
# 3) ์ถœ๋ ฅ ๊ฒฐ๊ณผ(ํŒŒ์ผ, ๋งˆํฌ๋‹ค์šด ๋“ฑ)๋„ ์ˆจ๊น€
# ํ•„์š”ํ•˜๋ฉด ์ฃผ์„ ํ•ด์ œํ•˜์—ฌ ํ™•์ธ ๊ฐ€๋Šฅ
# output_file = gr.File(
# label="๋ณ€ํ™˜ ๊ฒฐ๊ณผ",
# interactive=False,
# visible=False
# )
# md = gr.Markdown(
# label="๋งˆํฌ๋‹ค์šด ๋ Œ๋”๋ง",
# visible=False
# )
# md_text = gr.TextArea(
# lines=45,
# visible=False
# )
# pdf_show = PDF(
# label='PDF ๋ฏธ๋ฆฌ๋ณด๊ธฐ',
# interactive=False,
# visible=False,
# height=800
# )
# 4) ํŒŒ์ผ ์—…๋กœ๋“œ -> '๋ณ€ํ™˜' ๋ฒ„ํŠผ ํด๋ฆญ์‹œ ๋™์ž‘:
# to_markdown ํ•จ์ˆ˜๋ฅผ ํ†ตํ•ด md_state์— ๋งˆํฌ๋‹ค์šด ์ €์žฅ
convert_btn.click(
fn=to_markdown,
inputs=[file, max_pages, is_ocr, layout_mode, formula_enable, table_enable, language],
outputs=md_state
)
# ==========================
# Gemini Chat ๋ถ€๋ถ„
# ==========================
gr.Markdown("## Gemini 2.0 Flash (With Thinking) Chat")
chatbot = gr.Chatbot(
label="Gemini2.0 Chatbot (Streaming Output)",
render_markdown=True,
height=400
)
with gr.Row():
chat_input = gr.Textbox(
lines=1,
label="์งˆ๋ฌธ ์ž…๋ ฅ",
placeholder="์ถ”์ถœ๋œ ๋ฌธ์„œ(๋งˆํฌ๋‹ค์šด ๋‚ด์šฉ)์— ๋Œ€ํ•ด ๊ถ๊ธˆํ•œ ์ ์„ ๋ฌผ์–ด๋ณด์„ธ์š”..."
)
clear_button = gr.Button("๋Œ€ํ™” ์ดˆ๊ธฐํ™”")
# ์‚ฌ์šฉ์ž๊ฐ€ ์งˆ๋ฌธ -> user_message -> Gemini ์ฒ˜๋ฆฌ -> stream_gemini_response
def user_message_wrapper(msg, history, doc_text):
"""
์‚ฌ์šฉ์ž๊ฐ€ ์ž…๋ ฅํ•  ๋•Œ๋งˆ๋‹ค, doc_text(๋งˆํฌ๋‹ค์šด)๋ฅผ ์ฐธ๊ณ ํ•˜๋„๋ก
์งˆ๋ฌธ์„ ์•ฝ๊ฐ„ ๋ณ€ํ˜•ํ•ด์„œ history์— ์ถ”๊ฐ€ํ•˜๋Š” ๋ฐฉ์‹(๊ฐ„๋‹จ ์˜ˆ์‹œ).
"""
if not doc_text:
# ์•„์ง ๋ณ€ํ™˜๋œ ๋ฌธ์„œ๊ฐ€ ์—†๋‹ค๋ฉด ๊ทธ๋ƒฅ ์งˆ๋ฌธ
user_query = msg
else:
# ๋ฌธ์„œ ๋‚ด์šฉ(doc_text)์„ "์ฐธ๊ณ " ์š”์ฒญํ•˜๋Š” ๊ฐ„๋‹จ ํ”„๋กฌํ”„ํŠธ ์˜ˆ์‹œ
user_query = f"๋‹ค์Œ ๋ฌธ์„œ๋ฅผ ์ฐธ๊ณ ํ•˜์—ฌ ๋‹ต๋ณ€:\n\n{doc_text}\n\n์งˆ๋ฌธ: {msg}"
history.append(ChatMessage(role="user", content=user_query))
return "", history
chat_input.submit(
fn=user_message_wrapper,
inputs=[chat_input, chat_history, md_state],
outputs=[chat_input, chat_history]
).then(
fn=stream_gemini_response,
inputs=[chat_input, chat_history],
outputs=chat_history
).then(
fn=lambda h: h,
inputs=chat_history,
outputs=chatbot
)
clear_button.click(
fn=lambda: ([], ""),
inputs=[],
outputs=[chat_history, md_state]
).then(
fn=lambda: [],
inputs=[],
outputs=chatbot
)
##############################
# 4) ์‹ค์ œ ์‹คํ–‰
##############################
if __name__ == "__main__":
# ์ฒซ ๋ฒˆ์งธ demo.launch() - ํ†ตํ•ฉ ์•ฑ ์‹คํ–‰
demo.launch(ssr_mode=True, debug=True)
###############################################
# ์•„๋ž˜๋Š” "Gemini ์ฑ— ์ฝ”๋“œ" ์›๋ณธ์— ์žˆ๋˜
# ๋ณ„๋„์˜ demo.launch() ๋ถ€๋ถ„ (๋ˆ„๋ฝ ์—†์ด ์ฃผ์„ ๋ณด์กด)
###############################################
# if __name__ == "__main__":
# demo.launch(debug=True)