"""Source-extraction helpers (PDF text, YouTube transcript/title) and API setup."""

import os
import shutil
from typing import Iterable, List, Optional
from urllib.parse import parse_qs, quote, urlparse

import google.generativeai as genai
import gradio as gr
import requests
from dotenv import load_dotenv
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores.faiss import FAISS
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
from PyPDF2 import PdfReader
from youtube_transcript_api import YouTubeTranscriptApi

# Load environment variables
load_dotenv()

# Configure Google API
genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))

proxy_host = os.getenv('PROXY_HOST')
proxy_port = os.getenv('PROXY_PORT')
proxy_username = os.getenv('PROXY_USERNAME')
proxy_password = os.getenv('PROXY_PASSWORD')

# Hosts (after stripping a leading "www.") whose URLs carry a YouTube video id.
_YOUTUBE_HOSTS = ("youtube.com", "m.youtube.com", "music.youtube.com")
# Path prefixes of the form /<prefix>/<video_id>.
_YOUTUBE_PATH_PREFIXES = ("shorts", "embed", "live", "v")
# Seconds to wait for the oEmbed title lookup before giving up.
_TITLE_REQUEST_TIMEOUT = 10


def _build_proxy_url(host, port, username, password):
    """Return an ``http://`` proxy URL, or ``None`` when no proxy is configured.

    Credentials are optional and percent-encoded so that characters such as
    ``@`` or ``:`` in a password cannot corrupt the URL.
    """
    if not host or not port:
        return None
    credentials = ""
    if username:
        credentials = quote(username, safe="")
        if password:
            credentials += ":" + quote(password, safe="")
        credentials += "@"
    return f"http://{credentials}{host}:{port}"


# Format the proxy URL (None when PROXY_HOST / PROXY_PORT are not set, instead
# of the meaningless "http://None:None@None:None").
proxy_url = _build_proxy_url(proxy_host, proxy_port, proxy_username, proxy_password)


def extract_pdf_text(pdf_files: Iterable) -> str:
    """Return the text of every page of every PDF in ``pdf_files``.

    Args:
        pdf_files: Iterable of values accepted by ``PdfReader``.

    Returns:
        The page texts joined by newlines. Pages for which ``extract_text()``
        yields nothing contribute an empty string rather than raising.
    """
    page_texts = []
    for pdf in pdf_files:
        pdf_reader = PdfReader(pdf)
        for page in pdf_reader.pages:
            # ``or ""`` guards against a None result from extract_text().
            page_texts.append(page.extract_text() or "")
    # A newline separator keeps the last word of one page from fusing with
    # the first word of the next.
    return "\n".join(page_texts)


def extract_video_id(url: Optional[str]) -> Optional[str]:
    """Extract the video id from a YouTube URL.

    Handles ``youtu.be/<id>``, ``youtube.com/watch?v=<id>`` (any path with a
    ``v`` query parameter), and ``/shorts/``, ``/embed/``, ``/live/``, ``/v/``
    paths on ``youtube.com``, ``m.youtube.com`` and ``music.youtube.com``.
    A missing scheme is tolerated.

    Returns:
        The video id, or ``None`` if the URL is empty or not recognised.
    """
    if not url or not url.strip():
        return None
    url = url.strip()
    if "://" not in url:
        # urlparse() only fills in ``hostname`` when a scheme is present.
        url = "https://" + url

    parsed_url = urlparse(url)
    host = (parsed_url.hostname or "").lower()
    if host.startswith("www."):
        host = host[len("www."):]

    path_parts = [part for part in parsed_url.path.split("/") if part]

    if host == "youtu.be":
        return path_parts[0] if path_parts else None

    if host in _YOUTUBE_HOSTS:
        video_id = parse_qs(parsed_url.query).get("v", [None])[0]
        if video_id:
            return video_id
        if len(path_parts) >= 2 and path_parts[0] in _YOUTUBE_PATH_PREFIXES:
            return path_parts[1]

    return None


def extract_youtube_transcript(video_id: str) -> str:
    """Fetch the transcript of ``video_id`` as a single space-joined string.

    The request goes through ``proxy_url`` when one is configured.

    Returns:
        The transcript text, or an empty string if it could not be fetched.
        The error is printed rather than returned, so a failure message can
        never be mistaken for (and indexed as) transcript content.
    """
    proxies = {'https': proxy_url} if proxy_url else None
    try:
        srt = YouTubeTranscriptApi.get_transcript(video_id, proxies=proxies)
        return " ".join(dic['text'] for dic in srt)
    except Exception as e:
        print(f"Error extracting YouTube transcript: {e}")
        return ""


def get_youtube_video_title(video_id: str) -> str:
    """Look up a video's title via YouTube's oEmbed endpoint.

    Returns:
        The title, or ``"Untitled YouTube Video"`` if the lookup fails for
        any reason (network error, non-2xx status, unexpected payload).
    """
    try:
        response = requests.get(
            "https://www.youtube.com/oembed",
            # Passing params lets requests percent-encode the nested URL.
            params={
                "url": f"https://www.youtube.com/watch?v={video_id}",
                "format": "json",
            },
            timeout=_TITLE_REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()['title']
    except (requests.RequestException, ValueError, KeyError, TypeError):
        return "Untitled YouTube Video"
# Directory where the FAISS index is persisted between upload and question time.
FAISS_INDEX_DIR = "faiss_index"
# Scratch directory that uploaded PDFs are copied into while being parsed.
UPLOAD_DIR = "uploads"
# Embedding model used both to build and to query the index.
EMBEDDING_MODEL = "models/embedding-001"

QA_PROMPT_TEMPLATE = """
    Your alias is NeuralChat. Your task is to provide a thorough response based on the given context, ensuring all relevant details are included.
    If the requested information isn't available, simply state, "answer not available in context," then answer based on your understanding, connecting with the context.
    Don't provide incorrect information.\n\n
    Context: \n {context}?\n
    Question: \n {question}\n
    Answer:
    """


def split_text_into_chunks(text, chunk_size=12000, chunk_overlap=1200):
    """Split ``text`` into overlapping chunks for embedding.

    Args:
        text: The full text to split.
        chunk_size: Maximum characters per chunk.
        chunk_overlap: Characters shared between consecutive chunks.

    Returns:
        A list of chunk strings.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    return splitter.split_text(text)


def create_vector_store(chunks):
    """Embed ``chunks`` and save the resulting FAISS index to ``FAISS_INDEX_DIR``."""
    embeddings = GoogleGenerativeAIEmbeddings(model=EMBEDDING_MODEL)
    vector_store = FAISS.from_texts(chunks, embedding=embeddings)
    vector_store.save_local(FAISS_INDEX_DIR)


def setup_conversation_chain(template):
    """Build a "stuff" question-answering chain around the Gemini chat model.

    Args:
        template: Prompt template text containing ``{context}`` and
            ``{question}`` placeholders.

    Returns:
        The chain produced by ``load_qa_chain``.
    """
    model = ChatGoogleGenerativeAI(model="gemini-pro", temperature=0.3)
    prompt = PromptTemplate(template=template, input_variables=["context", "question"])
    return load_qa_chain(model, chain_type="stuff", prompt=prompt)


def _format_file_list(uploaded_files):
    """Render the processed sources as a Markdown bullet list."""
    return "\n".join(
        f"- **{item['name']}**" + (f" ([Link]({item['url']}))" if 'url' in item else "")
        for item in uploaded_files
    )


def process_files(files, youtube_url):
    """Index the uploaded PDFs and/or a YouTube transcript for later questions.

    Args:
        files: Uploaded files from the Gradio ``File`` component (objects with
            a ``name`` path attribute, or plain path strings), or ``None``.
        youtube_url: Optional YouTube URL whose transcript should be indexed.

    Returns:
        A ``(status_message, markdown_file_list)`` tuple. On any failure the
        status describes the problem and the file list is empty.
    """
    text_parts = []
    uploaded_files = []
    copied_paths = []

    try:
        # Process PDF files
        if files:
            os.makedirs(UPLOAD_DIR, exist_ok=True)
            for file in files:
                # Accept both file-like wrappers (``.name``) and bare paths.
                source_path = getattr(file, "name", file)
                filename = os.path.basename(source_path)
                file_path = os.path.join(UPLOAD_DIR, filename)
                # Copy the file from the temporary location to our uploads directory
                shutil.copy(source_path, file_path)
                copied_paths.append(file_path)
                text_parts.append(extract_pdf_text([file_path]))
                uploaded_files.append({"name": filename, "type": "pdf"})

        # Process YouTube URL
        youtube_url = (youtube_url or "").strip()
        if youtube_url:
            video_id = extract_video_id(youtube_url)
            if not video_id:
                return "Invalid YouTube URL", ""
            transcript = extract_youtube_transcript(video_id)
            if transcript:
                text_parts.append(transcript)
                video_title = get_youtube_video_title(video_id)
                uploaded_files.append({"name": video_title, "url": youtube_url})

        all_text = "\n".join(part for part in text_parts if part)
        # Whitespace-only text would produce zero chunks and break indexing.
        if not all_text.strip():
            return "No content to process", ""

        create_vector_store(split_text_into_chunks(all_text))
    except Exception as e:
        # UI boundary: report the failure in the status box, matching the
        # error style used by ask_question().
        print(f"Error processing content: {e}")
        return f"An error occurred: {str(e)}", ""
    finally:
        # Remove only the files this call copied, on every exit path
        # (success, early return, or exception).
        for file_path in copied_paths:
            try:
                os.remove(file_path)
            except OSError:
                pass

    return "Content uploaded and processed successfully", _format_file_list(uploaded_files)


def ask_question(question):
    """Answer ``question`` from the persisted FAISS index.

    Returns:
        The model's answer, or a human-readable message when the question is
        empty, nothing has been indexed yet, or an error occurs.
    """
    if not question or not question.strip():
        return "Please enter a question."
    if not os.path.isdir(FAISS_INDEX_DIR):
        return "No content has been processed yet. Please upload a PDF or a YouTube URL first."

    try:
        embeddings = GoogleGenerativeAIEmbeddings(model=EMBEDDING_MODEL)
        # NOTE(security): load_local unpickles data from disk. That is only
        # acceptable because the index is written by create_vector_store() in
        # this same app; never point FAISS_INDEX_DIR at untrusted files.
        indexed_data = FAISS.load_local(
            FAISS_INDEX_DIR, embeddings, allow_dangerous_deserialization=True
        )
        docs = indexed_data.similarity_search(question)
        chain = setup_conversation_chain(QA_PROMPT_TEMPLATE)
        # invoke() replaces the deprecated direct-call form chain({...}).
        response = chain.invoke(
            {"input_documents": docs, "question": question},
            return_only_outputs=True,
        )
        return response["output_text"]
    except Exception as e:
        return f"An error occurred: {str(e)}"


def chat(message, history):
    """Gradio submit handler: append the (question, answer) pair to the chat.

    Args:
        message: Text from the question box.
        history: Current chatbot history (may be ``None``).

    Returns:
        ``(updated_history, "")`` — the empty string clears the input box.
        Blank messages leave the history unchanged.
    """
    history = list(history or [])  # copy so the caller's list is not mutated
    if not message or not message.strip():
        return history, ""
    response = ask_question(message)
    history.append((message, response))
    return history, ""


theme = gr.themes.Monochrome().set(
    button_primary_background_fill="#FF0000",
    button_primary_background_fill_hover="#FF0000",
)

# Gradio interface
with gr.Blocks(theme=theme) as demo:
    gr.Markdown("# NeuralChat", elem_id="header")
    with gr.Row():
        with gr.Column(scale=2):
            # Every upload is parsed as a PDF, so restrict the picker to PDFs.
            files = gr.File(
                label="Upload PDF Files", file_count="multiple", file_types=[".pdf"]
            )
            youtube_url = gr.Textbox(label="YouTube URL")
            upload_button = gr.Button("Upload and Process")
            upload_output = gr.Textbox(label="Upload Status")
            file_list = gr.Markdown(label="Uploaded Files")
        with gr.Column(scale=5):
            # ``scale`` must be an integer; the previous 1.5 triggered a warning.
            chatbot = gr.Chatbot(show_copy_button=True, scale=1)
            msg = gr.Textbox(label="Ask a question", lines=1)

    upload_button.click(process_files, inputs=[files, youtube_url], outputs=[upload_output, file_list])
    msg.submit(chat, inputs=[msg, chatbot], outputs=[chatbot, msg])

if __name__ == "__main__":
    demo.launch()