"""FAISS-backed knowledge base for timestamped (YouTube) transcript chunks."""

import json
import os
from typing import Any, Dict, List

from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_huggingface import HuggingFaceEmbeddings

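# Enable LangSmith tracing; note that uploading traces also requires a
# LANGCHAIN_API_KEY to be set in the environment.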
os.environ["LANGCHAIN_TRACING_V2"] = "true"

DB_DIR = "db/"
if not os.path.exists(DB_DIR):
    os.makedirs(DB_DIR)


def timestamp_to_seconds(timestamp):
    """Convert a timestamp in the format 'hh:mm:ss' or 'mm:ss' to total seconds."""
    parts = timestamp.split(':')
    if len(parts) == 3:
        h, m, s = map(int, parts)
        ts = h * 3600 + m * 60 + s
    elif len(parts) == 2:
        m, s = map(int, parts)
        ts = m * 60 + s
    else:
        raise ValueError(f"Invalid timestamp format: {timestamp}")

    return ts


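# Documents stored by the assistant below are expected to carry metadata with
# at least 'youtube_id', 'start_timestamp', and 'end_timestamp' (used by
# query() to build playback URLs), plus optional 'speaker', 'company', and
# 'title' fields used for filtering and display.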
class FAISSAIAssistant:
    def __init__(self, index_name: str = "faiss_index"):
        # Despite the ".faiss" suffix, this path is handled as a folder by
        # FAISS.save_local / FAISS.load_local.
        self.index_name = f"{DB_DIR}{index_name}.faiss"
        model_name = "sentence-transformers/all-mpnet-base-v2"
        model_kwargs = {'device': 'cpu'}
        encode_kwargs = {'normalize_embeddings': False}
        self.embeddings = HuggingFaceEmbeddings(
            model_name=model_name,
            model_kwargs=model_kwargs,
            encode_kwargs=encode_kwargs)
        self.vector_store = self._create_app()

    def _create_app(self):
        if os.path.exists(self.index_name):
            print("Loading existing FAISS index...")
            return FAISS.load_local(self.index_name, self.embeddings,
                                    allow_dangerous_deserialization=True)
        else:
            print("Creating new FAISS index...")
            initial_texts = [
                "This is an initial document to create the FAISS index."]
            return FAISS.from_texts(initial_texts, self.embeddings)

    def add_to_knowledge_base(self, data: str, data_type: str = None, metadata: Dict[str, Any] = None) -> None:
        # data_type is currently unused; it is kept for interface compatibility.
        doc = Document(page_content=data, metadata=metadata or {})
        self.vector_store.add_documents([doc])

    def query(self, question: str, num_results: int = 30, filters: Dict[str, List[str]] = None) -> str:
        """Return up to num_results documents matching the metadata filters, as a JSON string.

        Note: the question argument is currently unused; results come from
        listing the docstore rather than from a similarity search.
        """
        all_docs = self.list_documents()

        def match_any_filter(doc_metadata, filters):
            # A document matches when, for every filter key, its metadata value
            # (or any element of a list-valued field) is among the allowed values.
            if not filters:
                return True
            for key, values in filters.items():
                if key not in doc_metadata:
                    return False
                doc_value = doc_metadata[key]
                if isinstance(doc_value, list):
                    # List-valued metadata: at least one element must match.
                    if not any(item in values for item in doc_value):
                        return False
                else:
                    # Scalar metadata: the value itself must match.
                    if doc_value not in values:
                        return False
            return True

        filtered_docs = [
            doc for doc in all_docs
            if match_any_filter(doc['metadata'], filters)
        ]

        filtered_docs = filtered_docs[:num_results]

        # Attach an embeddable YouTube playback URL to each result.
        for doc in filtered_docs:
            metadata = doc['metadata']
            st_ts = timestamp_to_seconds(metadata['start_timestamp'])
            end_ts = timestamp_to_seconds(metadata['end_timestamp'])
            st_ts = 0 if st_ts == 0 else st_ts + 1
            end_url = "" if end_ts == 0 else f"&end={end_ts + 1}"

            yt_url = (
                f"https://youtube.com/embed/{metadata['youtube_id']}"
                f"?start={st_ts}{end_url}&autoplay=1&rel=0"
            )
            metadata['play'] = yt_url

        return json.dumps(filtered_docs)

        # NOTE: the markdown-formatted answer below is unreachable after the
        # early return above; it is retained from an earlier version of this method.
        answer = (
            f"Here are the top {len(filtered_docs)}"
            " documents matching the filter:\n\n"
        )
        for i, doc in enumerate(filtered_docs, 1):
            metadata = doc['metadata']
            st_ts = timestamp_to_seconds(metadata['start_timestamp'])
            end_ts = timestamp_to_seconds(metadata['end_timestamp'])
            end_url = "" if end_ts == 0 else f"&end={end_ts + 2}"

            yt_url = (
                f"https://youtube.com/embed/{metadata['youtube_id']}"
                f"?start={st_ts}{end_url}&autoplay=1&rel=0"
            )

            speaker_info = (
                f"Speaker: {metadata.get('speaker', 'Unknown')}, "
                f"Company: {metadata.get('company', 'Unknown')}, "
                f"Timestamp: {metadata.get('start_timestamp', 'Unknown')}"
                f" - {metadata.get('end_timestamp', 'Unknown')}"
            )

            answer += f"{i}. [Speaker Info: {speaker_info}]({yt_url}) \n"
            answer += f"{metadata.get('title', 'Unknown')} \n"
            answer += f"\"{doc['content']}\" \n\n"

        return answer

    def save(self):
        self.vector_store.save_local(self.index_name)
        print("FAISS index saved.")

    def list_documents(self) -> List[Dict[str, Any]]:
        """
        List all documents in the FAISS vector store.

        Returns:
            List[Dict[str, Any]]: A list of dictionaries, each containing the
            'id', 'content', and 'metadata' of a document.
        """
        documents = []
        # Iterates the in-memory docstore via its private _dict attribute.
        for doc_id, doc in self.vector_store.docstore._dict.items():
            documents.append({
                'id': doc_id,
                'content': doc.page_content,
                'metadata': doc.metadata
            })
        return documents


def get_ai_assistant(index_name: str = "faiss_index") -> FAISSAIAssistant:
    return FAISSAIAssistant(index_name)


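# Minimal usage sketch, assuming transcript-style metadata as consumed by
# query() above; the video id and field values below are placeholders.
if __name__ == "__main__":
    assistant = get_ai_assistant()
    assistant.add_to_knowledge_base(
        "Example transcript segment about vector search.",
        metadata={
            "youtube_id": "VIDEO_ID",      # placeholder video id
            "start_timestamp": "00:15",
            "end_timestamp": "01:30",
            "speaker": "Jane Doe",         # placeholder speaker
            "company": "Example Corp",
            "title": "Example talk",
        },
    )
    assistant.save()
    # Prints a JSON string of documents whose 'speaker' matches the filter.
    print(assistant.query("vector search", filters={"speaker": ["Jane Doe"]}))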