# NOTE: page banner captured with the source ("Spaces: Sleeping"); not part of the program.
import io
import os
import re
import xml.etree.ElementTree as ET
import zipfile

import gradio as gr
import PyPDF2
from huggingface_hub import InferenceClient
from openpyxl import load_workbook
from pptx import Presentation
def xml2text(xml):
    """Return the text of every element in an XML document, space-separated.

    Args:
        xml: The XML document as ``str`` or ``bytes``, as accepted by
            ``xml.etree.ElementTree.fromstring``.

    Returns:
        The ``text`` of each element that has one, in document order, with a
        single space appended after each piece (so a non-empty result ends
        with a space). Tail text that follows a child element is not included.

    Raises:
        xml.etree.ElementTree.ParseError: If ``xml`` is not well-formed.
    """
    # NOTE(review): the standard-library XML parsers are documented as not
    # hardened against maliciously constructed data; if the XML can come from
    # untrusted uploads, consider a hardened parser.
    root = ET.fromstring(xml)
    # Join once instead of growing a string with += inside the loop.
    return "".join(
        node.text + " " for node in root.iter() if node.text is not None
    )
def extract_text_from_docx(docx_data):
    """Extract the plain text of a .docx file supplied as raw bytes.

    The archive members are read in this order: every ``word/header*.xml``
    part, then ``word/document.xml``, then every ``word/footer*.xml`` part.
    Each part is converted with ``xml2text`` and the results are concatenated.

    Args:
        docx_data: The bytes of the .docx file (a zip archive).

    Returns:
        The concatenated text with leading and trailing whitespace stripped.

    Raises:
        zipfile.BadZipFile: If ``docx_data`` is not a zip archive.
        KeyError: If the archive has no ``word/document.xml`` member.
    """
    # Raw strings with an escaped dot, matched against the whole member name,
    # so only real header/footer parts are picked up.
    header_pattern = re.compile(r'word/header[0-9]*\.xml')
    footer_pattern = re.compile(r'word/footer[0-9]*\.xml')

    # The context manager closes the archive even when a part fails to parse.
    with zipfile.ZipFile(io.BytesIO(docx_data)) as archive:
        member_names = archive.namelist()
        parts = [
            xml2text(archive.read(name))
            for name in member_names
            if header_pattern.fullmatch(name)
        ]
        parts.append(xml2text(archive.read('word/document.xml')))
        parts.extend(
            xml2text(archive.read(name))
            for name in member_names
            if footer_pattern.fullmatch(name)
        )
    return "".join(parts).strip()
# Shared Hugging Face inference client for the Mistral Nemo instruct model;
# the chat functions in this file send their prompts through it.
client = InferenceClient("mistralai/Mistral-Nemo-Instruct-2407")
def read_document(file):
    """Read an uploaded document and return its text content.

    The format is chosen from the file extension (case-insensitive):
    ``pdf``, ``xlsx``, ``pptx`` and ``doc``/``docx`` get a dedicated reader;
    anything else is decoded as UTF-8 text.

    Args:
        file: The upload, either an object exposing the file path as
            ``.name`` or a plain path string. ``None`` (nothing uploaded)
            is reported as an error string.

    Returns:
        The extracted text. Parsing/decoding failures are not raised; they
        are returned as a string starting with ``"Error reading ..."``.

    Raises:
        OSError: If the file at the given path cannot be opened or read.
    """
    if file is None:
        return "Error reading file: no file provided"

    # Accept both an upload object carrying the path in .name and a bare path.
    file_path = getattr(file, "name", file)
    file_extension = os.path.splitext(file_path)[1].lstrip('.').lower()

    with open(file_path, "rb") as f:
        file_content = f.read()

    if file_extension == 'pdf':
        try:
            pdf_reader = PyPDF2.PdfReader(io.BytesIO(file_content))
            # "or ''" keeps a page that yields no text from breaking the join.
            return ''.join(
                page.extract_text() or '' for page in pdf_reader.pages
            )
        except Exception as e:
            return f"Error reading PDF: {e}"

    if file_extension == 'xlsx':
        try:
            wb = load_workbook(io.BytesIO(file_content))
            pieces = []
            for sheet in wb.worksheets:
                for row in sheet.rows:
                    for cell in row:
                        # Skip empty cells so the literal text "None" does
                        # not end up in the extracted content.
                        if cell.value is not None:
                            pieces.append(f"{cell.value} ")
            return ''.join(pieces)
        except Exception as e:
            return f"Error reading XLSX: {e}"

    if file_extension == 'pptx':
        try:
            presentation = Presentation(io.BytesIO(file_content))
            pieces = []
            for slide in presentation.slides:
                for shape in slide.shapes:
                    # Only some shapes carry text.
                    if hasattr(shape, "text"):
                        pieces.append(shape.text + ' ')
            return ''.join(pieces)
        except Exception as e:
            return f"Error reading PPTX: {e}"

    if file_extension in ('doc', 'docx'):
        # Both extensions go through the zip-based .docx extractor; a file
        # that is not a zip archive (presumably including legacy binary .doc)
        # comes back as an error string.
        try:
            return extract_text_from_docx(file_content)
        except Exception as e:
            return f"Error reading DOC/DOCX: {e}"

    # Fallback: treat the file as UTF-8 text (TXT, CSV, source code, ...).
    try:
        return file_content.decode('utf-8')
    except Exception as e:
        return f"Error reading file: {e}"
def split_content(content, chunk_size=32000):
    """Split ``content`` into consecutive pieces of at most ``chunk_size`` items.

    Args:
        content: The sequence to split (a string in this file's usage).
        chunk_size: Maximum length of each piece; must be positive.

    Returns:
        A list of slices of ``content`` in order. Every piece except possibly
        the last has exactly ``chunk_size`` items; empty input gives ``[]``.

    Raises:
        ValueError: If ``chunk_size`` is not positive. A negative size would
            otherwise silently return no chunks at all.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    return [
        content[start:start + chunk_size]
        for start in range(0, len(content), chunk_size)
    ]
def chat_document(file, question):
    """Answer a question about an uploaded document, streaming the reply.

    The document text is read with ``read_document``, whitespace-normalised,
    truncated to the first 32000 characters and sent to the model together
    with the question in a single prompt.

    Args:
        file: The uploaded document, as accepted by ``read_document``.
        question: The user's question about the document.

    Yields:
        The answer text accumulated so far, once per streamed response, so a
        UI can render the reply progressively.
    """
    max_chars = 32000

    # Collapse every run of whitespace (newlines, tabs, repeated spaces) into
    # a single space. Spaces themselves must survive: deleting them would glue
    # all the words of the document together before the model sees it.
    content = " ".join(str(read_document(file)).split())
    # Only the first max_chars characters of the document are sent.
    content = content[:max_chars]

    # Define system prompt for the chat API
    system_prompt = (
        "\n"
        "You are a helpful and informative assistant that can answer questions based on the content of documents.\n"
        "You will receive the content of a document and a question about it.\n"
        "Your task is to provide a concise and accurate answer to the question based solely on the provided document content.\n"
        "If the document does not contain enough information to answer the question, simply state that you cannot answer the question based on the provided information.\n"
    )
    message = (
        f"[INST] [SYSTEM] {system_prompt}\n"
        f"Document Content: {content}\n"
        f"Question: {question}\n"
        "Answer:"
    )

    stream = client.text_generation(
        message,
        max_new_tokens=4096,
        stream=True,
        details=True,
        return_full_text=False,
    )
    output = ""
    for response in stream:
        # Leave the end-of-sequence marker out of the visible answer.
        if response.token.text != "</s>":
            output += response.token.text
        yield output
def chat_document_v2(file, question):
    """Answer a question about a document of any length, chunk by chunk.

    The document text is read with ``read_document``, whitespace-normalised
    and split with ``split_content``. The question is asked once per chunk,
    and the per-chunk answers are then condensed by a final summarisation
    request whose output is streamed.

    Args:
        file: The uploaded document, as accepted by ``read_document``.
        question: The user's question about the document.

    Yields:
        The summary text accumulated so far, once per streamed response of
        the final summarisation request. Nothing is yielded while the
        individual chunks are being processed.
    """
    # Collapse every run of whitespace (newlines, tabs, repeated spaces) into
    # a single space. Spaces themselves must survive: deleting them would glue
    # all the words of the document together before the model sees it.
    content = " ".join(str(read_document(file)).split())
    chunks = split_content(content)

    # Define system prompt for the chat API
    system_prompt = (
        "\n"
        "You are a helpful and informative assistant that can answer questions based on the content of documents.\n"
        "You will receive the content of a document and a question about it.\n"
        "Your task is to provide a concise and accurate answer to the question based solely on the provided document content.\n"
        "If the document does not contain enough information to answer the question, simply state that you cannot answer the question based on the provided information.\n"
    )

    all_answers = []
    for chunk in chunks:
        # Each chunk is already capped by split_content's default chunk size,
        # so it is used as-is.
        message = (
            f"[INST] [SYSTEM] {system_prompt}\n"
            f"Document Content: {chunk}\n"
            f"Question: {question}\n"
            "Answer:"
        )
        stream = client.text_generation(
            message,
            max_new_tokens=4096,
            stream=True,
            details=True,
            return_full_text=False,
        )
        output = ""
        for response in stream:
            # Leave the end-of-sequence marker out of the collected answer.
            if response.token.text != "</s>":
                output += response.token.text
        all_answers.append(output)

    # Summarize all answers using Mistral
    summary_prompt = (
        "\n"
        "You are a helpful and informative assistant that can summarize multiple answers related to the same question.\n"
        "You will receive a list of answers to a question, and your task is to generate a concise and comprehensive summary that incorporates the key information from all the answers.\n"
        "Avoid repeating information unnecessarily and focus on providing the most relevant and accurate summary based on the provided answers.\n"
        "Answers:\n"
    )
    all_answers_str = "\n".join(all_answers)
    # Debug output: the per-chunk answers go to stdout before summarising.
    print(all_answers_str)
    # Only the first 30000 characters of the combined answers are summarised.
    summary_message = (
        f"[INST] [SYSTEM] {summary_prompt}\n"
        f"{all_answers_str[:30000]}\n"
        "Summary:"
    )
    stream = client.text_generation(
        summary_message,
        max_new_tokens=4096,
        stream=True,
        details=True,
        return_full_text=False,
    )
    output = ""
    for response in stream:
        if response.token.text != "</s>":
            output += response.token.text
        yield output
# Gradio UI: one tab per entry point (plain reader, single-prompt chat,
# chunk-based chat). The app is launched as soon as this module runs.
with gr.Blocks() as demo:
    with gr.Tabs():
        with gr.TabItem("Document Reader"):
            iface1 = gr.Interface(
                fn=read_document,
                inputs=gr.File(label="Upload a Document"),
                outputs=gr.Textbox(label="Document Content"),
                title="Document Reader",
                description="Upload a document (PDF, XLSX, PPTX, TXT, CSV, DOC, DOCX and Code or text file) to read its content.",
            )
        with gr.TabItem("Document Chat"):
            iface2 = gr.Interface(
                fn=chat_document,
                inputs=[gr.File(label="Upload a Document"), gr.Textbox(label="Question")],
                outputs=gr.Markdown(label="Answer"),
                title="Document Chat",
                description="Upload a document and ask questions about its content.",
            )
        with gr.TabItem("Document Chat V2"):
            iface3 = gr.Interface(
                fn=chat_document_v2,
                inputs=[gr.File(label="Upload a Document"), gr.Textbox(label="Question")],
                outputs=gr.Markdown(label="Answer"),
                title="Document Chat V2",
                description="Upload a document and ask questions about its content (using chunk-based approach).",
            )

demo.launch()