import streamlit as st
from langchain_core.messages import AIMessage, HumanMessage
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_community.vectorstores import FAISS
from tavily import TavilyClient
from streamlit_pdf_viewer import pdf_viewer
import hashlib
import io
import os
import pickle
import tempfile
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
import getpass

# Initialize API keys (read from environment variables; never hard-code secrets)
google_api_key = os.getenv("GOOGLE_API_KEY")
tvly_api_key = os.getenv("TAVILY_API_KEY")
openai_api_key = os.getenv("OPENAI_API_KEY")

# Validate API keys
if not all([google_api_key, tvly_api_key, openai_api_key]):
    st.error("Please set up your API keys.")
    st.stop()

# Initialize Tavily client
web_tool_search = TavilyClient(api_key=tvly_api_key)

# Set up Streamlit page
st.set_page_config(page_title="AI Professor", page_icon="👨‍🏫")
st.title("👨‍🏫 AI Professor")

# Authentication function for Google Drive
SCOPES = ['https://www.googleapis.com/auth/drive.file']

def authenticate_google_drive():
    creds = None
    if os.path.exists('token.pickle'):
        with open('token.pickle', 'rb') as token:
            creds = pickle.load(token)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.pickle', 'wb') as token:
            pickle.dump(creds, token)
    return build('drive', 'v3', credentials=creds)

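# upload_to_drive writes the conversation to a local text file, then uploads it via the Drive v3 API.
# It assumes authenticate_google_drive() above can find a valid 'credentials.json' client-secrets file.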
def upload_to_drive(content, filename="conversation.txt"):
    service = authenticate_google_drive()
    file_metadata = {'name': filename}
    # Write the content to a local file before building the upload object,
    # since MediaFileUpload opens the file when it is constructed.
    with open(filename, 'w') as f:
        f.write(content)
    media = MediaFileUpload(filename, mimetype='text/plain')
    file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
    st.success(f"Conversation uploaded to Google Drive! File ID: {file.get('id')}")
    return file.get('id')

# Simple login system (hard-coded demo credentials; replace with a real auth backend in production)
def login():
    username = st.text_input("Username", "")
    password = st.text_input("Password", "", type="password")
    if st.button("Login"):
        if username == "admin" and password == "password123":
            st.session_state.logged_in = True
            st.success("Login successful!")
        else:
            st.session_state.logged_in = False
            st.error("Invalid credentials. Please try again.")

# Initialize session state variables
if "logged_in" not in st.session_state:
    st.session_state.logged_in = False

if not st.session_state.logged_in:
    login()

def get_pdf_text(pdf_docs):
    text = ""
    if isinstance(pdf_docs, list):
        for pdf in pdf_docs:
            pdf_reader = PdfReader(pdf)
            for page in pdf_reader.pages:
                text += page.extract_text() or ""
    else:
        pdf_reader = PdfReader(pdf_docs)
        for page in pdf_reader.pages:
            text += page.extract_text() or ""
    return text

def get_text_chunks(text):
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=10000, chunk_overlap=1000)
    chunks = text_splitter.split_text(text)
    return chunks

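# Embed the text chunks with Google Generative AI embeddings and index them in an in-memory FAISS store.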
def get_vector_store(text_chunks):
    try:
        embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=google_api_key)
        vector_store = FAISS.from_texts(text_chunks, embedding=embeddings)
        return vector_store
    except Exception as e:
        st.error(f"Error creating vector store: {str(e)}")
        return None

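# Retrieve the most relevant chunks for the query and answer with the chat model,
# conditioned on the retrieved context and the conversation history.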
def get_response(user_query, chat_history, vector_store):
    if vector_store is None:
        return "Please upload a PDF document first."

    template = """
    You are a helpful assistant. Answer the following questions considering the history of the conversation and the document provided:
    Context: {context}
    Chat history: {chat_history}
    User question: {user_question}
    """
    prompt = ChatPromptTemplate.from_template(template)

    try:
        llm = ChatOpenAI(
            api_key=openai_api_key,
            model_name="gpt-4o-mini",
            temperature=1,
            max_tokens=1024
        )
        docs = vector_store.similarity_search(user_query)
        context = "\n".join(doc.page_content for doc in docs)
        chain = prompt | llm | StrOutputParser()
        return chain.invoke({
            "context": context,
            "chat_history": chat_history,
            "user_question": user_query,
        })
    except Exception as e:
        return f"Error generating response: {str(e)}"

def get_youtube_url(query):
    try:
        response = web_tool_search.search(
            query=query,
            search_depth="basic",
            include_domains=["youtube.com"],
            max_results=1
        )
        for result in response['results']:
            if 'youtube.com/watch' in result['url']:
                return result['url']
        return None
    except Exception as e:
        st.error(f"Error searching for video: {str(e)}")
        return None

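# Hash the raw bytes of the uploaded PDF(s) so the app can detect when the document changes
# and skip rebuilding the embeddings on every rerun.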
def get_pdfs_hash(pdf_docs):
    combined_hash = hashlib.md5()
    if isinstance(pdf_docs, list):
        for pdf in pdf_docs:
            content = pdf.read()
            combined_hash.update(content)
            pdf.seek(0)
    else:
        content = pdf_docs.read()
        combined_hash.update(content)
        pdf_docs.seek(0)
    return combined_hash.hexdigest()

# If logged in, continue with the chatbot functionality
if st.session_state.logged_in:
    # Initialize session state variables
    if "chat_history" not in st.session_state:
        st.session_state.chat_history = [
            AIMessage(content="Hello, I am Chatbot professor assistant. How can I help you?")
        ]
    if "vector_store" not in st.session_state:
        st.session_state.vector_store = None

    # Sidebar for PDF upload and settings
    with st.sidebar:
        st.title("Menu:")
        pdf_docs = st.file_uploader("Upload your PDF Files", accept_multiple_files=False)
        quiz_button = st.button("📝 Make a quiz")
        video_button = st.button("📺 Search a video")
        view = st.toggle("👁️ View PDF")
        if view and pdf_docs:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file:
                temp_file.write(pdf_docs.read())
                temp_pdf_path = temp_file.name
            pdf_docs.seek(0)  # reset the stream so the PDF can still be read for text extraction
            pdf_viewer(temp_pdf_path, width=800)

    # Display chat history
    for message in st.session_state.chat_history:
        if isinstance(message, AIMessage):
            with st.chat_message("AI"):
                st.write(message.content)
        elif isinstance(message, HumanMessage):
            with st.chat_message("Human"):
                st.write(message.content)

    # Process PDF upload (re-embed only when the uploaded document changes)
    if pdf_docs:
        pdfs_hash = get_pdfs_hash(pdf_docs)
        if st.session_state.get("pdfs_hash") != pdfs_hash:
            # Convert PDF to text and split into chunks for embedding
            text = get_pdf_text(pdf_docs)
            text_chunks = get_text_chunks(text)
            st.session_state.vector_store = get_vector_store(text_chunks)
            st.session_state.pdfs_hash = pdfs_hash
            st.success("Document uploaded and ready for conversation.")

    # Process user query
    user_query = st.chat_input("Type your message here...")
    if user_query:
        st.session_state.chat_history.append(HumanMessage(content=user_query))
        with st.chat_message("Human"):
            st.write(user_query)
        response = get_response(user_query, st.session_state.chat_history, st.session_state.vector_store)
        st.session_state.chat_history.append(AIMessage(content=response))
        with st.chat_message("AI"):
            st.write(response)
        # Upload conversation to Google Drive
        # upload_to_drive("".join([msg.content for msg in st.session_state.chat_history]), "chat_conversation.txt")

    # Handle quiz generation
    if quiz_button:
        with st.spinner("Generating quiz..."):
            quiz_prompt = """
            Based on the document content, create a quiz with 5 multiple choice questions.
            Format each question like this:
            Question X:
            **A)** Answer 1
            **B)** Answer 2
            **C)** Answer 3
            **D)** Answer 4
            """
            response = get_response(quiz_prompt, st.session_state.chat_history, st.session_state.vector_store)
            st.write(response)
            st.session_state.chat_history.append(AIMessage(content=response))

    # Handle video search
    if video_button:
        with st.spinner("Searching for relevant video..."):
            video_prompt = """
            Extract the main topic and key concepts from the document and the last conversation.
            """
            response = get_response(video_prompt, st.session_state.chat_history, st.session_state.vector_store)
            youtube_url = get_youtube_url(f"Course on {response}")
            if youtube_url:
                st.write(f"📺 Here's a video about {response}: {youtube_url}")
                st.video(youtube_url)
                video_message = f"📺 Here's a video about {response}:\n{youtube_url}"
                st.session_state.chat_history.append(AIMessage(content=video_message))