Spaces:
Running
Running
import os | |
from typing import Optional, Tuple | |
import gradio as gr | |
from langchain_community.document_loaders import PyPDFLoader, DirectoryLoader | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM | |
import torch | |
import tempfile | |
import time | |
# Configurações | |
LLM_MODEL = "google/flan-t5-large" | |
DOCS_DIR = "documents" | |
class DocumentQA: | |
def __init__(self): | |
# Carrega o modelo e o tokenizador | |
self.tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL) | |
self.model = AutoModelForSeq2SeqLM.from_pretrained( | |
LLM_MODEL, | |
device_map="auto", | |
torch_dtype=torch.float32 | |
) | |
# Carrega a base de conhecimento | |
self.base_texts = self.load_base_knowledge() | |
def load_base_knowledge(self) -> Optional[list]: | |
try: | |
if not os.path.exists(DOCS_DIR): | |
os.makedirs(DOCS_DIR) | |
return None | |
# Carrega documentos da pasta | |
loader = DirectoryLoader( | |
DOCS_DIR, | |
glob="**/*.pdf", | |
loader_cls=PyPDFLoader | |
) | |
documents = loader.load() | |
if not documents: | |
return None | |
# Divide os documentos em trechos menores | |
text_splitter = RecursiveCharacterTextSplitter( | |
chunk_size=500, | |
chunk_overlap=100, | |
length_function=len, | |
separators=["\n\n", "\n", ".", " ", ""] | |
) | |
texts = text_splitter.split_documents(documents) | |
# Extrai o texto dos trechos | |
return [doc.page_content for doc in texts] | |
except Exception as e: | |
print(f"Erro ao carregar base de conhecimento: {str(e)}") | |
return None | |
def process_pdf(self, file_content: bytes) -> Optional[list]: | |
try: | |
# Salva o PDF temporariamente | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: | |
tmp_file.write(file_content) | |
tmp_path = tmp_file.name | |
# Carrega o PDF | |
loader = PyPDFLoader(tmp_path) | |
documents = loader.load() | |
os.unlink(tmp_path) | |
if not documents: | |
return None | |
# Divide o PDF em trechos menores | |
text_splitter = RecursiveCharacterTextSplitter( | |
chunk_size=500, | |
chunk_overlap=100, | |
length_function=len, | |
separators=["\n\n", "\n", ".", " ", ""] | |
) | |
texts = text_splitter.split_documents(documents) | |
# Extrai o texto dos trechos | |
return [doc.page_content for doc in texts] | |
except Exception as e: | |
print(f"Erro ao processar PDF: {str(e)}") | |
return None | |
def find_relevant_texts(self, query: str, texts: list) -> list: | |
"""Encontra trechos relevantes com base em palavras-chave da pergunta.""" | |
relevant_texts = [] | |
query_keywords = set(query.lower().split()) | |
for text in texts: | |
text_keywords = set(text.lower().split()) | |
if query_keywords.intersection(text_keywords): | |
relevant_texts.append(text) | |
return relevant_texts | |
def generate_response(self, file_obj, query: str, progress=gr.Progress()) -> Tuple[str, str, str]: | |
"""Retorna (resposta, status, tempo_decorrido)""" | |
if not query.strip(): | |
return "Por favor, insira uma pergunta.", "⚠️ Aguardando pergunta", "0s" | |
start_time = time.time() | |
try: | |
progress(0.2, desc="Processando documentos...") | |
# Determina a fonte dos documentos | |
has_pdf = file_obj is not None | |
has_base = self.base_texts is not None | |
source_type = "both" if has_pdf and has_base else "pdf" if has_pdf else "base" if has_base else None | |
if not source_type: | |
return "Nenhuma fonte de documentos disponível.", "❌ Sem documentos", "0s" | |
# Processa documento | |
if has_pdf: | |
pdf_texts = self.process_pdf(file_obj) | |
if pdf_texts is None: | |
return "Não foi possível processar o PDF.", "❌ Erro no processamento", "0s" | |
else: | |
pdf_texts = [] | |
# Combina os textos | |
all_texts = pdf_texts + (self.base_texts if self.base_texts else []) | |
progress(0.4, desc="Buscando informações relevantes...") | |
# Encontra trechos relevantes | |
relevant_texts = self.find_relevant_texts(query, all_texts) | |
# Verifica se há trechos relevantes | |
if not relevant_texts: | |
return "🔍 Não foram encontradas informações suficientes nos documentos para responder esta pergunta.", "⚠️ Contexto insuficiente", f"{time.time() - start_time:.1f}s" | |
# Prepara o contexto para o prompt | |
context = "\n\n".join(relevant_texts) | |
progress(0.6, desc="Gerando resposta...") | |
# Cria o prompt | |
prompt = f"""Instruções: | |
1. Analise cuidadosamente o contexto fornecido. | |
2. Responda à seguinte pergunta em português de forma clara e direta: {query} | |
3. Use apenas informações encontradas no contexto. | |
4. Se não houver informações suficientes, indique explicitamente. | |
5. Mantenha a resposta objetiva e baseada em fatos. | |
6. Cite exemplos específicos do contexto quando relevante. | |
Contexto: | |
{context} | |
Pergunta: {query}""" | |
# Gera a resposta usando o modelo diretamente | |
inputs = self.tokenizer(prompt, return_tensors="pt", max_length=512, truncation=True) | |
outputs = self.model.generate( | |
inputs["input_ids"], | |
max_length=512, | |
temperature=0.3, | |
top_p=0.9, | |
repetition_penalty=1.2 | |
) | |
response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) | |
elapsed_time = f"{time.time() - start_time:.1f}s" | |
progress(1.0, desc="Concluído!") | |
return response, "✅ Sucesso", elapsed_time | |
except Exception as e: | |
elapsed_time = f"{time.time() - start_time:.1f}s" | |
return f"Erro ao gerar resposta: {str(e)}", "❌ Erro", elapsed_time | |
def create_demo(): | |
qa_system = DocumentQA() | |
with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
with gr.Column(elem_id="container"): | |
# Cabeçalho | |
gr.Markdown( | |
""" | |
# 🤖 Assistente de Documentos Inteligente | |
Sistema de consulta avançada que responde perguntas sobre seus documentos. | |
""" | |
) | |
# Área principal | |
with gr.Row(): | |
# Coluna de entrada | |
with gr.Column(): | |
with gr.Group(): | |
gr.Markdown("### 📄 Documentos") | |
file_input = gr.File( | |
label="Upload de PDF (opcional)", | |
type="binary", | |
file_types=[".pdf"], | |
height=100, | |
) | |
info = gr.Markdown( | |
f""" | |
ℹ️ O sistema consulta: | |
- PDFs enviados por você | |
- Documentos na pasta `{DOCS_DIR}` | |
""" | |
) | |
with gr.Group(): | |
gr.Markdown("### ❓ Sua Pergunta") | |
query_input = gr.Textbox( | |
placeholder="Digite sua pergunta sobre os documentos...", | |
lines=3, | |
max_lines=6, | |
show_label=False, | |
) | |
with gr.Row(): | |
clear_btn = gr.Button("🗑️ Limpar", variant="secondary") | |
submit_btn = gr.Button("🔍 Enviar Pergunta", variant="primary") | |
# Coluna de saída | |
with gr.Column(): | |
with gr.Group(): | |
gr.Markdown("### 📝 Resposta") | |
with gr.Row(): | |
status_output = gr.Textbox( | |
label="Status", | |
value="⏳ Aguardando...", | |
interactive=False, | |
show_label=False, | |
) | |
time_output = gr.Textbox( | |
label="Tempo", | |
value="0s", | |
interactive=False, | |
show_label=False, | |
) | |
response_output = gr.Textbox( | |
label="Resposta", | |
placeholder="A resposta aparecerá aqui...", | |
interactive=False, | |
lines=12, | |
show_label=False, | |
) | |
# Exemplos | |
with gr.Accordion("📚 Exemplos de Perguntas", open=False): | |
gr.Examples( | |
examples=[ | |
[None, "Quais são os principais tópicos abordados neste documento?"], | |
[None, "Resuma as conclusões mais importantes."], | |
[None, "O que o documento diz sobre [tema específico]?"], | |
[None, "Quais são as recomendações apresentadas?"], | |
], | |
inputs=[file_input, query_input], | |
) | |
# Rodapé | |
gr.Markdown( | |
""" | |
--- | |
### 🔧 Informações do Sistema | |
* Respostas geradas usando tecnologia de processamento de linguagem natural | |
* Processamento inteligente de documentos PDF | |
* Respostas baseadas exclusivamente no conteúdo dos documentos | |
* Suporte a múltiplos documentos e contextos | |
""" | |
) | |
# Eventos | |
submit_btn.click( | |
fn=qa_system.generate_response, | |
inputs=[file_input, query_input], | |
outputs=[response_output, status_output, time_output], | |
) | |
clear_btn.click( | |
lambda: (None, "", "⏳ Aguardando...", "0s"), | |
outputs=[file_input, query_input, status_output, time_output], | |
) | |
# Limpa a resposta quando a pergunta muda | |
query_input.change( | |
lambda: ("", "⏳ Aguardando...", "0s"), | |
outputs=[response_output, status_output, time_output], | |
) | |
return demo | |
if __name__ == "__main__": | |
demo = create_demo() | |
demo.launch() |