import os
from typing import Dict, Any, List
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
import json


# Enable LangSmith tracing (requires a LangSmith API key in the environment).
os.environ["LANGCHAIN_TRACING_V2"] = "true"

DB_DIR = "db/"
os.makedirs(DB_DIR, exist_ok=True)


def timestamp_to_seconds(timestamp):
    """Convert a timestamp in the format 'hh:mm:ss' or 'mm:ss' to total seconds."""
    parts = timestamp.split(':')
    if len(parts) == 3:
        h, m, s = map(int, parts)
        ts = h * 3600 + m * 60 + s
    elif len(parts) == 2:
        m, s = map(int, parts)
        ts = m * 60 + s
    else:
        raise ValueError(f"Invalid timestamp format: {timestamp}")

    return ts


class FAISSAIAssistant:
    def __init__(self, index_name: str = "faiss_index"):
        self.index_name = f"{DB_DIR}{index_name}.faiss"
        model_name = "sentence-transformers/all-mpnet-base-v2"
        model_kwargs = {'device': 'cpu'}
        encode_kwargs = {'normalize_embeddings': False}
        self.embeddings = HuggingFaceEmbeddings(
            model_name=model_name,
            model_kwargs=model_kwargs,
            encode_kwargs=encode_kwargs)
        self.vector_store = self._create_app()

    def _create_app(self):
        if os.path.exists(self.index_name):
            print("Loading existing FAISS index...")
            return FAISS.load_local(self.index_name, self.embeddings,
                                    allow_dangerous_deserialization=True)
        else:
            print("Creating new FAISS index...")
            # Create an initial document with placeholder text
            initial_texts = [
                "This is an initial document to create the FAISS index."]
            return FAISS.from_texts(initial_texts, self.embeddings)

    def add_to_knowledge_base(self, data: str, data_type: str = None, metadata: Dict[str, Any] = None) -> None:
        """Add a single document to the index. `data_type` is accepted for
        API compatibility but currently unused."""
        doc = Document(page_content=data, metadata=metadata or {})
        self.vector_store.add_documents([doc])

    def query(self, question: str, num_results: int = 30, filters: Dict[str, List[str]] = None) -> str:
        """Return up to `num_results` documents whose metadata matches
        `filters`, serialized as JSON. Note: this scans the docstore and
        filters on metadata only; `question` is not currently used for
        similarity search.
        """
        all_docs = self.list_documents()

        def match_any_filter(doc_metadata, filters):
            # A document matches when, for every filtered key, its metadata
            # value (or at least one item of it, when it is a list) appears
            # in that key's list of accepted values.
            if not filters:
                return True
            for key, values in filters.items():
                if key not in doc_metadata:
                    return False
                doc_value = doc_metadata[key]
                if isinstance(doc_value, list):
                    # If doc_value is a list, check if any item in doc_value is in values
                    if not any(item in values for item in doc_value):
                        return False
                else:
                    # If doc_value is a single string, check if it's in values
                    if doc_value not in values:
                        return False
            return True

        filtered_docs = [
            doc for doc in all_docs
            if match_any_filter(doc['metadata'], filters)
        ]

        # Limit the number of results to num_results
        filtered_docs = filtered_docs[:num_results]

        for doc in filtered_docs:
            metadata = doc['metadata']
            # Skip entries without video metadata (e.g. the bootstrap
            # placeholder document); they are returned without a play link.
            if not all(k in metadata for k in
                       ('youtube_id', 'start_timestamp', 'end_timestamp')):
                continue
            st_ts = timestamp_to_seconds(metadata['start_timestamp'])
            end_ts = timestamp_to_seconds(metadata['end_timestamp'])
            # Nudge a non-zero start forward one second so playback begins
            # just inside the segment.
            start = 0 if st_ts == 0 else st_ts + 1
            end_param = "" if end_ts == 0 else f"&end={end_ts + 1}"

            yt_url = (
                f"https://youtube.com/embed/{metadata['youtube_id']}"
                f"?start={start}{end_param}&autoplay=1&rel=0"
            )
            metadata['play'] = yt_url

        return json.dumps(filtered_docs)

    def format_results(self, filtered_docs: List[Dict[str, Any]]) -> str:
        """Render matched documents (as returned by `list_documents`) as a
        numbered Markdown list of speaker/company/timestamp links into the
        source videos."""
        answer = (
            f"Here are the top {len(filtered_docs)}"
            " documents matching the filter:\n\n"
        )
        for i, doc in enumerate(filtered_docs, 1):
            metadata = doc['metadata']
            st_ts = timestamp_to_seconds(metadata['start_timestamp'])
            end_ts = timestamp_to_seconds(metadata['end_timestamp'])
            end_param = "" if end_ts == 0 else f"&end={end_ts + 2}"

            yt_url = (
                f"https://youtube.com/embed/{metadata['youtube_id']}"
                f"?start={st_ts}{end_param}&autoplay=1&rel=0"
            )

            speaker_info = (
                f"Speaker: {metadata.get('speaker', 'Unknown')}, "
                f"Company: {metadata.get('company', 'Unknown')}, "
                f"Timestamp: {metadata.get('start_timestamp', 'Unknown')}"
                f" - {metadata.get('end_timestamp', 'Unknown')}"
            )

            answer += f"{i}. [Speaker Info: {speaker_info}]({yt_url})  \n"
            answer += f"{metadata.get('title', 'Unknown')}  \n"
            answer += f"\"{doc['content']}\"  \n\n"

        return answer

    def save(self):
        self.vector_store.save_local(self.index_name)
        print("FAISS index saved.")

    def list_documents(self) -> List[Dict[str, Any]]:
        """
        List all documents in the FAISS vectorstore.

        Returns:
            List[Dict[str, Any]]: A list of dictionaries, each containing 'content' and 'metadata' of a document.
        """
        documents = []
        # Enumerate the in-memory docstore directly; `_dict` is a private
        # attribute, but it is the usual way to list everything a LangChain
        # FAISS vectorstore holds.
        for doc_id, doc in self.vector_store.docstore._dict.items():
            documents.append({
                'id': doc_id,
                'content': doc.page_content,
                'metadata': doc.metadata
            })
        return documents

# Factory helper


def get_ai_assistant(index_name: str = "faiss_index") -> FAISSAIAssistant:
    return FAISSAIAssistant(index_name)
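

# A minimal usage sketch, assuming illustrative placeholder values: the
# metadata keys below mirror what query() and format_results() read, but the
# video id, speaker, company, and title are hypothetical.
if __name__ == "__main__":
    assistant = get_ai_assistant()
    assistant.add_to_knowledge_base(
        "We moved our retrieval stack to FAISS for low-latency search.",
        metadata={
            "youtube_id": "abc123xyz",   # placeholder video id
            "start_timestamp": "01:23",
            "end_timestamp": "02:45",
            "speaker": "Jane Doe",       # hypothetical speaker
            "company": "Acme",
            "title": "Scaling Retrieval",
        },
    )
    assistant.save()

    # Filters map a metadata key to a list of accepted values; a document
    # matches only if every filtered key matches at least one listed value.
    results = json.loads(assistant.query(
        "retrieval latency",
        num_results=5,
        filters={"company": ["Acme"]},
    ))
    print(json.dumps(results, indent=2))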