File size: 6,778 Bytes
8108759
 
5e4acb0
8108759
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
07463a5
 
 
 
 
 
 
8108759
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
07463a5
8108759
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f1b8dbf
 
 
 
 
8108759
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5e4acb0
 
8108759
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5e4acb0
 
8108759
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import os
import shutil
import gradio as gr
from typing import List
from youtube_transcript_api import YouTubeTranscriptApi
from urllib.parse import urlparse, parse_qs
import requests
from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores.faiss import FAISS
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI
import google.generativeai as genai
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Configure Google API
genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))

# Optional outbound proxy settings, read from the environment.
proxy_host = os.getenv('PROXY_HOST')
proxy_port = os.getenv('PROXY_PORT')
proxy_username = os.getenv('PROXY_USERNAME')
proxy_password = os.getenv('PROXY_PASSWORD')

# Format the proxy URL.
# Only build a URL when a proxy is actually configured; interpolating unset
# variables would otherwise yield the unusable "http://None:None@None:None".
# Credentials are inserted verbatim, so they must already be URL-safe.
if proxy_host and proxy_port:
    if proxy_username and proxy_password:
        proxy_url = f'http://{proxy_username}:{proxy_password}@{proxy_host}:{proxy_port}'
    else:
        proxy_url = f'http://{proxy_host}:{proxy_port}'
else:
    # No proxy configured: requests should go out directly.
    proxy_url = None

def extract_pdf_text(pdf_files):
    """Return the text of every page of every PDF in ``pdf_files``, concatenated.

    Each item is handed to ``PdfReader`` unchanged. Documents are read in the
    order given, pages in document order, and the page texts are joined with
    no separator. An empty ``pdf_files`` yields an empty string.
    """
    readers = (PdfReader(source) for source in pdf_files)
    return "".join(
        page.extract_text()
        for reader in readers
        for page in reader.pages
    )

def extract_video_id(url):
    """Extract the video id from a YouTube URL.

    Recognised forms:
      * short links:  ``youtu.be/<id>``
      * watch pages:  ``youtube.com/watch?v=<id>`` (also ``www.``, ``m.``,
        ``music.`` and the ``youtube-nocookie.com`` hosts)
      * path forms:   ``/embed/<id>``, ``/shorts/<id>``, ``/live/<id>``, ``/v/<id>``

    A URL pasted without a scheme (``youtube.com/watch?v=...``) is accepted.

    Args:
        url: The URL text to inspect.

    Returns:
        The video id string, or ``None`` when ``url`` is empty, malformed, or
        not a recognised YouTube URL.
    """
    if not url:
        return None

    url = url.strip()
    # urlparse only fills in ``hostname`` when a scheme is present.
    if '://' not in url:
        url = 'https://' + url

    try:
        parsed_url = urlparse(url)
        hostname = (parsed_url.hostname or '').lower()
    except ValueError:
        # urlparse rejects some malformed netlocs (e.g. unbalanced brackets).
        return None

    if hostname in ('youtu.be', 'www.youtu.be'):
        # The id is the first path segment; ignore anything after it.
        video_id = parsed_url.path.lstrip('/').split('/')[0]
        return video_id or None

    youtube_hosts = (
        'www.youtube.com',
        'youtube.com',
        'm.youtube.com',
        'music.youtube.com',
        'www.youtube-nocookie.com',
        'youtube-nocookie.com',
    )
    if hostname in youtube_hosts:
        video_id = parse_qs(parsed_url.query).get('v', [None])[0]
        if video_id:
            return video_id

        # Fall back to the path-based forms, e.g. /shorts/<id>.
        segments = [segment for segment in parsed_url.path.split('/') if segment]
        if len(segments) >= 2 and segments[0] in ('embed', 'shorts', 'live', 'v'):
            return segments[1]

    return None

def extract_youtube_transcript(video_id):
    """Fetch the transcript of a YouTube video as one space-separated string.

    Args:
        video_id: The YouTube video id whose transcript is requested.

    Returns:
        The text of every transcript entry, each followed by a single space.
        If the transcript cannot be fetched for any reason, the error is
        printed and an empty string is returned, so that an error message is
        never mistaken for transcript content by the caller.
    """
    try:
        # Only route through the proxy when one is configured.
        proxies = {'https': proxy_url} if proxy_url else None
        srt = YouTubeTranscriptApi.get_transcript(video_id, proxies=proxies)
        return "".join(dic['text'] + ' ' for dic in srt)
    except Exception as e:
        print(f"Error extracting YouTube transcript: {e}")
        return ""

def get_youtube_video_title(video_id):
    """Look up a video's title through YouTube's oEmbed endpoint.

    Args:
        video_id: The YouTube video id.

    Returns:
        The ``title`` field of the oEmbed JSON response, or the placeholder
        ``"Untitled YouTube Video"`` if the request fails, times out, returns
        an error status, or the response lacks a title.
    """
    try:
        response = requests.get(
            "https://www.youtube.com/oembed",
            # Let requests encode the query string instead of hand-building it.
            params={
                "url": f"http://www.youtube.com/watch?v={video_id}",
                "format": "json",
            },
            # Without a timeout a stalled connection would block the upload handler.
            timeout=10,
        )
        response.raise_for_status()
        return response.json()['title']
    except Exception:
        # The title is cosmetic; fall back rather than fail the whole upload.
        return "Untitled YouTube Video"

def split_text_into_chunks(text, chunk_size=12000, chunk_overlap=1200):
    """Split ``text`` into overlapping chunks with ``RecursiveCharacterTextSplitter``.

    Args:
        text: The full text to split.
        chunk_size: Value passed to the splitter as ``chunk_size``
            (default 12000, the value previously hard-coded here).
        chunk_overlap: Value passed to the splitter as ``chunk_overlap``
            (default 1200, the value previously hard-coded here).

    Returns:
        The list of chunks produced by ``splitter.split_text``.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    return splitter.split_text(text)

def create_vector_store(chunks):
    """Embed ``chunks`` into a FAISS index and save it locally as "faiss_index".

    Embeddings come from ``GoogleGenerativeAIEmbeddings`` using the
    "models/embedding-001" model. Nothing is returned; the saved index is the
    only result.
    """
    FAISS.from_texts(
        chunks,
        embedding=GoogleGenerativeAIEmbeddings(model="models/embedding-001"),
    ).save_local("faiss_index")

def setup_conversation_chain(template):
    """Build a "stuff" question-answering chain that prompts with ``template``.

    ``template`` is wrapped in a ``PromptTemplate`` declaring the ``context``
    and ``question`` input variables, and paired with a "gemini-pro" chat
    model at temperature 0.3.
    """
    llm = ChatGoogleGenerativeAI(model="gemini-pro", temperature=0.3)
    qa_prompt = PromptTemplate(
        template=template,
        input_variables=["context", "question"],
    )
    return load_qa_chain(llm, chain_type="stuff", prompt=qa_prompt)

def process_files(files, youtube_url):
    """Ingest uploaded PDFs and/or a YouTube transcript into the vector store.

    Args:
        files: Uploaded file objects exposing a ``.name`` path, or a falsy
            value when nothing was uploaded.
        youtube_url: URL of a YouTube video, or a falsy value to skip it.

    Returns:
        A ``(status_message, file_list_markdown)`` tuple. The markdown is an
        empty string whenever nothing was indexed.
    """
    youtube_url = youtube_url.strip() if youtube_url else ""

    # Validate the URL first so a bad link fails fast, before any file is copied.
    video_id = None
    if youtube_url:
        video_id = extract_video_id(youtube_url)
        if not video_id:
            return "Invalid YouTube URL", ""

    text_parts = []
    uploaded_files = []
    copied_paths = []  # only the files this call created, for targeted cleanup

    try:
        # Process PDF files
        if files:
            os.makedirs("uploads", exist_ok=True)
            for file in files:
                # Extract just the filename from the full path
                filename = os.path.basename(file.name)
                file_path = os.path.join("uploads", filename)

                # Copy the file from the temporary location to our uploads directory
                shutil.copy(file.name, file_path)
                copied_paths.append(file_path)

                text_parts.append(extract_pdf_text([file_path]))
                uploaded_files.append({"name": filename, "type": "pdf"})

        # Process YouTube URL
        if video_id:
            text_parts.append(extract_youtube_transcript(video_id))
            video_title = get_youtube_video_title(video_id)
            uploaded_files.append({"name": video_title, "url": youtube_url})

        all_text = "".join(text_parts)
        if not all_text:
            return "No content to process", ""

        chunks = split_text_into_chunks(all_text)
        create_vector_store(chunks)
    finally:
        # Remove our copies on every exit path (success, early return, or
        # exception) without touching files that belong to other requests.
        for path in copied_paths:
            try:
                os.remove(path)
            except FileNotFoundError:
                pass  # already removed (e.g. two uploads shared a filename)
            except OSError as e:
                print(f"Could not remove temporary upload {path}: {e}")

    # Format the file list for display
    file_list_text = "\n".join(
        f"- **{entry['name']}**" + (f" ([Link]({entry['url']}))" if 'url' in entry else "")
        for entry in uploaded_files
    )

    return "Content uploaded and processed successfully", file_list_text

def ask_question(question):
    """Answer ``question`` from the content stored in the local FAISS index.

    Args:
        question: The user's question text.

    Returns:
        The chain's ``output_text`` on success. A short guidance message is
        returned when the question is blank or no index has been built yet,
        and ``"An error occurred: ..."`` if anything fails while answering.
    """
    # Don't spend an embedding call on an empty query.
    if not question or not question.strip():
        return "Please enter a question."

    # The index only exists after content has been uploaded and processed.
    if not os.path.isdir("faiss_index"):
        return "No content has been processed yet. Please upload a PDF or add a YouTube URL first."

    try:
        embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
        # NOTE(security): allow_dangerous_deserialization=True lets load_local
        # unpickle data from the index folder. That is only safe because this
        # app writes "faiss_index" itself (see create_vector_store); never
        # point this at an index obtained from an untrusted source.
        indexed_data = FAISS.load_local("faiss_index", embeddings, allow_dangerous_deserialization=True)
        docs = indexed_data.similarity_search(question)

        prompt_template = """
        Your alias is NeuralChat. Your task is to provide a thorough response based on the given context, ensuring all relevant details are included.
        If the requested information isn't available, simply state, "answer not available in context," then answer based on your understanding, connecting with the context.
        Don't provide incorrect information.\n\n
        Context: \n {context}?\n
        Question: \n {question}\n
        Answer:
        """

        chain = setup_conversation_chain(prompt_template)
        response = chain({"input_documents": docs, "question": question}, return_only_outputs=True)

        return response["output_text"]
    except Exception as e:
        # Surface the failure in the chat window instead of crashing the UI handler.
        return f"An error occurred: {str(e)}"

def chat(message, history):
    """Answer ``message`` and record the exchange in ``history``.

    The ``(message, answer)`` pair is appended to ``history`` in place.
    Returns the updated history together with an empty string, which the UI
    wiring uses to clear the input box.
    """
    history.append((message, ask_question(message)))
    cleared_input = ""
    return history, cleared_input

# Monochrome base theme with primary buttons filled red (same colour on hover).
theme = gr.themes.Monochrome().set(
    button_primary_background_fill="#FF0000",
    button_primary_background_fill_hover="#FF0000",
)

# Gradio interface
with gr.Blocks(theme=theme) as demo:
    gr.Markdown("# NeuralChat", elem_id="header")

    with gr.Row():
        # Left column: content ingestion controls and their status output.
        with gr.Column(scale=2):
            files = gr.File(label="Upload PDF Files", file_count="multiple")
            youtube_url = gr.Textbox(label="YouTube URL")
            upload_button = gr.Button("Upload and Process")
            upload_output = gr.Textbox(label="Upload Status")
            file_list = gr.Markdown(label="Uploaded Files")

        # Right column: the conversation and the question box.
        with gr.Column(scale=5):
            # Gradio documents `scale` as an integer; the previous value of
            # 1.5 was not a valid setting.
            chatbot = gr.Chatbot(show_copy_button=True, scale=1)
            msg = gr.Textbox(label="Ask a question", lines=1)

    # Event wiring: the button ingests content; pressing Enter in the
    # question box sends it to the chat handler, which also clears the box.
    upload_button.click(process_files, inputs=[files, youtube_url], outputs=[upload_output, file_list])
    msg.submit(chat, inputs=[msg, chatbot], outputs=[chatbot, msg])

if __name__ == "__main__":
    demo.launch()