# neuralchat / app.py
# ishans24's picture
# Update app.py
# 07463a5 verified
import os
import shutil
import gradio as gr
from typing import List
from youtube_transcript_api import YouTubeTranscriptApi
from urllib.parse import urlparse, parse_qs
import requests
from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores.faiss import FAISS
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI
import google.generativeai as genai
from dotenv import load_dotenv
# Load environment variables (e.g. from a local .env file) before reading them.
load_dotenv()

# Configure the Google Generative AI client with the API key from the environment.
genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))

# Optional outbound proxy settings, used when fetching YouTube transcripts.
proxy_host = os.getenv('PROXY_HOST')
proxy_port = os.getenv('PROXY_PORT')
proxy_username = os.getenv('PROXY_USERNAME')
proxy_password = os.getenv('PROXY_PASSWORD')


def _build_proxy_url(host, port, username, password):
    """Return an ``http://`` proxy URL built from the parts that are set.

    Returns ``None`` when no host is configured, instead of formatting the
    literal text "None" into the URL. When all four values are present the
    result is identical to ``http://{username}:{password}@{host}:{port}``.
    """
    if not host:
        return None
    credentials = ""
    if username:
        credentials = username
        if password:
            credentials += f":{password}"
        credentials += "@"
    netloc = f"{host}:{port}" if port else host
    return f"http://{credentials}{netloc}"


# Format the proxy URL (None when PROXY_HOST is not set).
proxy_url = _build_proxy_url(proxy_host, proxy_port, proxy_username, proxy_password)
def extract_pdf_text(pdf_files):
    """Return the text of every page of every PDF in *pdf_files*.

    Args:
        pdf_files: Iterable of paths or file objects accepted by ``PdfReader``.

    Returns:
        A single string with page texts joined by newlines, or ``""`` when
        *pdf_files* is empty. Pages with no extractable text contribute an
        empty string.
    """
    page_texts = []
    for pdf in pdf_files:
        pdf_reader = PdfReader(pdf)
        # ``or ""`` guards against a page yielding no text at all.
        page_texts.extend(page.extract_text() or "" for page in pdf_reader.pages)
    # A newline separator keeps the last word of one page from being glued to
    # the first word of the next.
    return "\n".join(page_texts)
def extract_video_id(url):
    """Extract the video ID from a YouTube URL.

    Supported forms:
      * ``https://youtu.be/<id>``
      * ``https://[www.|m.|music.]youtube.com/watch?v=<id>``
      * ``https://[www.]youtube.com/{embed,shorts,live,v}/<id>``
      * any of the above without a scheme (``youtube.com/watch?v=<id>``)

    Args:
        url: The URL text entered by the user.

    Returns:
        The video ID string, or ``None`` when the URL is empty, is not a
        YouTube URL, or carries no video ID.
    """
    if not url:
        return None

    candidate = url.strip()
    parsed_url = urlparse(candidate)
    if not parsed_url.netloc:
        # Scheme-less input such as "youtube.com/watch?v=..." parses with an
        # empty host; retry with an explicit scheme.
        parsed_url = urlparse("https://" + candidate)

    host = parsed_url.hostname or ""
    if host.startswith("www."):
        host = host[4:]
    segments = [part for part in parsed_url.path.split("/") if part]

    if host == "youtu.be":
        return segments[0] if segments else None

    if host in ("youtube.com", "m.youtube.com", "music.youtube.com"):
        video_id = parse_qs(parsed_url.query).get("v", [None])[0]
        if video_id:
            return video_id
        # Path-style links carry the ID as the second path segment.
        if len(segments) >= 2 and segments[0] in ("embed", "shorts", "live", "v"):
            return segments[1]

    return None
def extract_youtube_transcript(video_id):
    """Fetch the transcript of a YouTube video as one space-joined string.

    The request is routed through the module-level proxy only when a proxy
    host is configured.

    Args:
        video_id: The YouTube video ID.

    Returns:
        The transcript text, or ``""`` when it could not be retrieved. The
        error is printed rather than returned, so that callers concatenating
        the result do not index an error message as if it were content.
    """
    try:
        # Without a configured host the formatted proxy URL is meaningless,
        # so fall back to a direct connection.
        proxies = {'https': proxy_url} if proxy_host and proxy_url else None
        srt = YouTubeTranscriptApi.get_transcript(video_id, proxies=proxies)
        return " ".join(entry['text'] for entry in srt)
    except Exception as e:
        # Best-effort boundary: any failure (network, no captions, ...) is
        # reported and treated as "no transcript".
        print(f"Error extracting YouTube transcript: {e}")
        return ""
def get_youtube_video_title(video_id):
    """Look up a video's title through YouTube's oEmbed endpoint.

    Args:
        video_id: The YouTube video ID.

    Returns:
        The title string, or ``"Untitled YouTube Video"`` when the lookup
        fails for any reason (network error, timeout, non-2xx response,
        malformed JSON, missing ``title`` key).
    """
    try:
        response = requests.get(
            "https://www.youtube.com/oembed",
            # Passing params lets requests URL-encode the nested watch URL.
            params={
                "url": f"https://www.youtube.com/watch?v={video_id}",
                "format": "json",
            },
            # Without a timeout a stalled connection would block the upload handler.
            timeout=10,
        )
        response.raise_for_status()
        return response.json()['title']
    except Exception:
        # The title is cosmetic; fall back rather than fail the upload.
        return "Untitled YouTube Video"
def split_text_into_chunks(text, chunk_size=12000, chunk_overlap=1200):
    """Split *text* into overlapping chunks for embedding.

    Args:
        text: The full text to split.
        chunk_size: Maximum size of each chunk, passed to
            ``RecursiveCharacterTextSplitter``. Defaults to 12000.
        chunk_overlap: Overlap between consecutive chunks. Defaults to 1200.

    Returns:
        The list of chunks produced by ``splitter.split_text``.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    return splitter.split_text(text)
def create_vector_store(chunks):
    """Embed *chunks* and persist the resulting FAISS index to ``faiss_index``.

    Args:
        chunks: List of text chunks to embed.
    """
    embedder = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
    FAISS.from_texts(chunks, embedding=embedder).save_local("faiss_index")
def setup_conversation_chain(template):
    """Build a "stuff" question-answering chain around the given prompt text.

    Args:
        template: Prompt template text using the ``{context}`` and
            ``{question}`` placeholders.

    Returns:
        The chain returned by ``load_qa_chain``.
    """
    return load_qa_chain(
        ChatGoogleGenerativeAI(model="gemini-pro", temperature=0.3),
        chain_type="stuff",
        prompt=PromptTemplate(
            template=template,
            input_variables=["context", "question"],
        ),
    )
def process_files(files, youtube_url):
    """Ingest uploaded PDFs and/or a YouTube transcript into the vector store.

    Args:
        files: Uploaded file objects (each exposing ``.name`` as its temp
            path) or plain path strings; may be ``None`` or empty.
        youtube_url: Optional YouTube URL; ``None``, empty and
            whitespace-only values are treated as "not provided".

    Returns:
        A ``(status_message, file_list_markdown)`` tuple. The markdown is
        empty when nothing was processed.
    """
    all_text = ""
    uploaded_files = []
    youtube_url = (youtube_url or "").strip()

    try:
        # Process PDF files
        if files:
            os.makedirs("uploads", exist_ok=True)
            for file in files:
                source_path = getattr(file, "name", file)
                # Extract just the filename from the full path
                filename = os.path.basename(source_path)
                file_path = os.path.join("uploads", filename)
                # Copy the file from the temporary location to our uploads directory
                shutil.copy(source_path, file_path)
                all_text += extract_pdf_text([file_path])
                uploaded_files.append({"name": filename, "type": "pdf"})

        # Process YouTube URL
        if youtube_url:
            video_id = extract_video_id(youtube_url)
            if not video_id:
                return "Invalid YouTube URL", ""
            all_text += extract_youtube_transcript(video_id)
            video_title = get_youtube_video_title(video_id)
            uploaded_files.append({"name": video_title, "url": youtube_url})

        # Whitespace-only text (e.g. scanned PDFs) has nothing to index.
        if not all_text.strip():
            return "No content to process", ""

        create_vector_store(split_text_into_chunks(all_text))
    finally:
        # Remove uploaded files after processing. Running this in ``finally``
        # also cleans up on the early returns above and on errors.
        if os.path.isdir("uploads"):
            for leftover in os.listdir("uploads"):
                leftover_path = os.path.join("uploads", leftover)
                if os.path.isfile(leftover_path):
                    os.remove(leftover_path)

    # Format the file list for display
    file_list_text = "\n".join(
        f"- **{file['name']}**" + (f" ([Link]({file['url']}))" if 'url' in file else "")
        for file in uploaded_files
    )
    return "Content uploaded and processed successfully", file_list_text
def ask_question(question):
    """Answer *question* from the content stored in the local FAISS index.

    Args:
        question: The user's question text.

    Returns:
        The model's answer, or a human-readable message when the question is
        empty, no index exists yet, or an error occurs while answering.
    """
    if not question or not question.strip():
        return "Please enter a question."
    # The index directory is only created once content has been processed;
    # check up front instead of surfacing a raw file-not-found error.
    if not os.path.isdir("faiss_index"):
        return "Please upload and process content before asking a question."

    try:
        embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
        # NOTE(security): loading the index unpickles data from disk. This is
        # acceptable only because "faiss_index" is written by this app itself;
        # never point it at an index obtained from an untrusted source.
        indexed_data = FAISS.load_local("faiss_index", embeddings, allow_dangerous_deserialization=True)
        docs = indexed_data.similarity_search(question)
        prompt_template = """
Your alias is NeuralChat. Your task is to provide a thorough response based on the given context, ensuring all relevant details are included.
If the requested information isn't available, simply state, "answer not available in context," then answer based on your understanding, connecting with the context.
Don't provide incorrect information.\n\n
Context: \n {context}?\n
Question: \n {question}\n
Answer:
"""
        chain = setup_conversation_chain(prompt_template)
        response = chain({"input_documents": docs, "question": question}, return_only_outputs=True)
        return response["output_text"]
    except Exception as e:
        # UI boundary: report the failure in the chat instead of raising.
        return f"An error occurred: {str(e)}"
def chat(message, history):
    """Answer *message*, record the exchange, and clear the input box.

    Args:
        message: The user's question text.
        history: List of ``(user_message, bot_response)`` tuples; the new
            exchange is appended to it in place.

    Returns:
        ``(history, "")`` - the updated history and an empty string for the
        input textbox.
    """
    history.append((message, ask_question(message)))
    return history, ""
# Visual theme: Monochrome base with the primary button background set to
# red, using the same color for the hover state.
theme = gr.themes.Monochrome().set(
    button_primary_background_fill="#FF0000",
    button_primary_background_fill_hover="#FF0000",
)

# Gradio interface: a narrow column with the upload controls next to a wider
# column with the chat.
# NOTE(review): the nesting below was reconstructed from a copy of this file
# that had lost its indentation - confirm it matches the intended layout.
with gr.Blocks(theme=theme) as demo:
    gr.Markdown("# NeuralChat", elem_id="header")
    with gr.Row():
        # Upload controls and their status outputs.
        with gr.Column(scale=2):
            files = gr.File(label="Upload PDF Files", file_count="multiple")
            youtube_url = gr.Textbox(label="YouTube URL")
            upload_button = gr.Button("Upload and Process")
            upload_output = gr.Textbox(label="Upload Status")
            file_list = gr.Markdown(label="Uploaded Files")
        # Chat history and question input.
        with gr.Column(scale=5):
            # NOTE(review): scale is given as a float here - confirm the
            # installed Gradio version accepts non-integer scale values.
            chatbot = gr.Chatbot(show_copy_button=True, scale=1.5)
            msg = gr.Textbox(label="Ask a question", lines=1)

    # process_files returns (status message, markdown file list).
    upload_button.click(process_files, inputs=[files, youtube_url], outputs=[upload_output, file_list])
    # chat returns (updated history, "") - the empty string clears the input box.
    msg.submit(chat, inputs=[msg, chatbot], outputs=[chatbot, msg])

# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()