Spaces:
Sleeping
Sleeping
import gradio as gr | |
from groq import Groq, RateLimitError | |
import pandas as pd | |
from PIL import Image | |
import pytesseract | |
import pdfplumber | |
from pdf2image import convert_from_path | |
import os | |
import time | |
from dotenv import load_dotenv | |
# Load environment variables from .env file | |
load_dotenv() | |
# Set the path to Tesseract executable | |
pytesseract.pytesseract.tesseract_cmd = os.getenv("TESSERACT_CMD") | |
# Set the path to Poppler for PDF image extraction | |
poppler_path = os.getenv("POPPLER_PATH") | |
# Your Groq API key | |
YOUR_GROQ_API_KEY = os.getenv("GROQ_API_KEY") | |
# Initialize Groq client | |
client = Groq(api_key=YOUR_GROQ_API_KEY) | |
# Global variable to store extracted text | |
extracted_text = "" | |
def extract_text_from_image(image): | |
return pytesseract.image_to_string(image) | |
def remove_header_footer(image, header_height=3.9, footer_height=2.27): | |
width, height = image.size | |
header_height_pixels = int(header_height * 96) # Convert inches to pixels (assuming 96 DPI) | |
footer_height_pixels = int(footer_height * 96) | |
cropping_box = (0, header_height_pixels, width, height - footer_height_pixels) | |
return image.crop(cropping_box) | |
def handle_file(file, page_range=None): | |
global extracted_text | |
extracted_text = "" | |
if file is None: | |
return None, "No file uploaded" | |
file_name = file.name.lower() | |
if file_name.endswith(('png', 'jpg', 'jpeg')): | |
image = Image.open(file) | |
extracted_text = extract_text_from_image(image) | |
return image, extracted_text | |
elif file_name.endswith('pdf'): | |
text = "" | |
pdf_images = [] | |
start_page = 1 | |
end_page = None | |
if page_range: | |
try: | |
start_page, end_page = map(int, page_range.split('-')) | |
except ValueError: | |
start_page = int(page_range) | |
end_page = start_page | |
with pdfplumber.open(file) as pdf_file: | |
total_pages = len(pdf_file.pages) | |
end_page = end_page or total_pages | |
for page_number in range(start_page - 1, end_page): | |
page = pdf_file.pages[page_number] | |
page_text = page.extract_text() or "" | |
text += f"Page {page_number + 1}:\n{page_text}\n" | |
try: | |
page_images = convert_from_path(file.name, first_page=page_number + 1, last_page=page_number + 1, poppler_path=poppler_path) | |
page_images = [remove_header_footer(img) for img in page_images] | |
pdf_images.extend(page_images) | |
for img in page_images: | |
image_text = extract_text_from_image(img) | |
text += f"Page {page_number + 1} (Image):\n{image_text}\n" | |
except Exception as e: | |
text += f"Error processing images on page {page_number + 1}: {e}\n" | |
extracted_text = text | |
if pdf_images: | |
return pdf_images[0], extracted_text | |
else: | |
return None, extracted_text | |
elif file_name.endswith(('xls', 'xlsx')): | |
df = pd.read_excel(file) | |
extracted_text = df.to_string() | |
return None, extracted_text | |
elif file_name.endswith('csv'): | |
df = pd.read_csv(file) | |
extracted_text = df.to_string() | |
return None, extracted_text | |
else: | |
return None, "Unsupported file type" | |
def split_text(text, max_length=2000): | |
words = text.split() | |
chunks = [] | |
current_chunk = [] | |
current_length = 0 | |
for word in words: | |
word_length = len(word) + 1 # +1 for the space or punctuation | |
if current_length + word_length > max_length: | |
chunks.append(" ".join(current_chunk)) | |
current_chunk = [word] | |
current_length = word_length | |
else: | |
current_chunk.append(word) | |
current_length += word_length | |
if current_chunk: | |
chunks.append(" ".join(current_chunk)) | |
return chunks | |
def is_rate_limited(): | |
# Implement a method to check rate limit status if needed | |
return False | |
def chat_groq_sync(user_input, history, extracted_text): | |
retries = 5 | |
while retries > 0: | |
rate_limit_status = is_rate_limited() | |
if rate_limit_status: | |
return f"{rate_limit_status} Please try again later." | |
messages = [{"role": "system", "content": "The following text is extracted from the uploaded file:\n" + extracted_text}] | |
for msg in history: | |
messages.append({"role": "user", "content": msg[0]}) | |
messages.append({"role": "assistant", "content": msg[1]}) | |
messages.append({"role": "user", "content": user_input}) | |
try: | |
response = client.chat.completions.create( | |
model="llama3-70b-8192", | |
messages=messages, | |
max_tokens=1000, | |
temperature=0.4 | |
) | |
response_content = response.choices[0].message.content | |
return response_content | |
except RateLimitError as e: | |
error_info = e.args[0] if e.args else {} | |
error_message = error_info.get('error', {}).get('message', '') if isinstance(error_info, dict) else str(error_info) | |
wait_time = 60 | |
if 'try again in' in error_message: | |
try: | |
wait_time = float(error_message.split('try again in ')[-1].split('s')[0]) | |
except ValueError: | |
pass | |
print(f"Rate limit error: {error_message}") | |
print(f"Retrying in {wait_time:.2f} seconds...") | |
retries -= 1 | |
if retries > 0: | |
time.sleep(wait_time) | |
else: | |
return "Rate limit exceeded. Please try again later." | |
except Exception as e: | |
print(f"An unexpected error occurred: {e}") | |
return "An unexpected error occurred. Please try again later." | |
def update_chat(user_input, history): | |
global extracted_text | |
response = chat_groq_sync(user_input, history, extracted_text) | |
history.append((user_input, response)) | |
return history, history, "" | |
with gr.Blocks() as demo: | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("# RAG Chatbot") | |
gr.Markdown("Check out the [GitHub](https://github.com/anshh-arora?tab=repositories) for more information.") | |
file = gr.File(label="Upload your file") | |
page_range = gr.Textbox(label="If the uploaded document is a PDF and has more than 10 pages, enter the page range (e.g., 1-3) or specific page number (e.g., 2):", lines=1, visible=False, interactive=True) | |
file_upload_button = gr.Button("Upload File") | |
image_display = gr.Image(label="Uploaded Image", visible=False) | |
extracted_text_display = gr.Textbox(label="Extracted Text", interactive=False) | |
with gr.Column(scale=3): | |
gr.Markdown("# Chat with your file") | |
history = gr.State([]) | |
with gr.Column(): | |
chatbot = gr.Chatbot(height=500, bubble_full_width=False) | |
user_input = gr.Textbox(placeholder="Enter Your Query", visible=True, scale=7, interactive=True) | |
clear_btn = gr.Button("Clear") | |
undo_btn = gr.Button("Undo") | |
user_input.submit(update_chat, [user_input, history], [chatbot, history, user_input]) | |
clear_btn.click(lambda: ([], []), None, [chatbot, history]) | |
undo_btn.click(lambda h: h[:-2], history, history) | |
def show_page_range_input(file): | |
if file and file.name.lower().endswith('pdf'): | |
with pdfplumber.open(file) as pdf_file: | |
if len(pdf_file.pages) > 10: | |
return gr.update(visible=True) | |
return gr.update(visible=False) | |
file.change(show_page_range_input, inputs=file, outputs=page_range) | |
file_upload_button.click(handle_file, [file, page_range], [image_display, extracted_text_display]) | |
if __name__ == "__main__": | |
demo.launch(share=True) | |