|  |  | 
					
						
						|  |  | 
					
						
						|  | from typing import Dict, List, Optional, Tuple | 
					
						
						|  | import os | 
					
						
						|  | import numpy as np | 
					
						
						|  | import pandas as pd | 
					
						
						|  | import umap | 
					
						
						|  | from langchain_core.prompts import ChatPromptTemplate | 
					
						
						|  | from langchain_core.output_parsers import StrOutputParser | 
					
						
						|  | from sklearn.mixture import GaussianMixture | 
					
						
						|  | from langchain_community.chat_models import ChatOpenAI | 
					
						
						|  | from langchain_community.vectorstores import FAISS | 
					
						
						|  | from langchain.text_splitter import RecursiveCharacterTextSplitter | 
					
						
						|  | from modules.vectorstore.base import VectorStoreBase | 
					
						
						|  |  | 
					
						
# Default seed for the Gaussian-mixture fits performed while searching for the
# optimal cluster count (see RAPTORVectoreStore.get_optimal_clusters).
RANDOM_SEED = 42
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | class RAPTORVectoreStore(VectorStoreBase): | 
					
						
						|  | def __init__(self, config, documents=[], text_splitter=None, embedding_model=None): | 
					
						
						|  | self.documents = documents | 
					
						
						|  | self.config = config | 
					
						
						|  | self.text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder( | 
					
						
						|  | chunk_size=self.config["splitter_options"]["chunk_size"], | 
					
						
						|  | chunk_overlap=self.config["splitter_options"]["chunk_overlap"], | 
					
						
						|  | separators=self.config["splitter_options"]["chunk_separators"], | 
					
						
						|  | disallowed_special=(), | 
					
						
						|  | ) | 
					
						
						|  | self.embd = embedding_model | 
					
						
						|  | self.model = ChatOpenAI( | 
					
						
						|  | model="gpt-3.5-turbo", | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | def concat_documents(self, documents): | 
					
						
						|  | d_sorted = sorted(documents, key=lambda x: x.metadata["source"]) | 
					
						
						|  | d_reversed = list(reversed(d_sorted)) | 
					
						
						|  | concatenated_content = "\n\n\n --- \n\n\n".join( | 
					
						
						|  | [doc.page_content for doc in d_reversed] | 
					
						
						|  | ) | 
					
						
						|  | return concatenated_content | 
					
						
						|  |  | 
					
						
						|  | def split_documents(self, documents): | 
					
						
						|  | concatenated_content = self.concat_documents(documents) | 
					
						
						|  | texts_split = self.text_splitter.split_text(concatenated_content) | 
					
						
						|  | return texts_split | 
					
						
						|  |  | 
					
						
						|  | def add_documents(self, documents): | 
					
						
						|  | self.documents.extend(documents) | 
					
						
						|  |  | 
					
						
						|  | def global_cluster_embeddings( | 
					
						
						|  | self, | 
					
						
						|  | embeddings: np.ndarray, | 
					
						
						|  | dim: int, | 
					
						
						|  | n_neighbors: Optional[int] = None, | 
					
						
						|  | metric: str = "cosine", | 
					
						
						|  | ) -> np.ndarray: | 
					
						
						|  | """ | 
					
						
						|  | Perform global dimensionality reduction on the embeddings using UMAP. | 
					
						
						|  |  | 
					
						
						|  | Parameters: | 
					
						
						|  | - embeddings: The input embeddings as a numpy array. | 
					
						
						|  | - dim: The target dimensionality for the reduced space. | 
					
						
						|  | - n_neighbors: Optional; the number of neighbors to consider for each point. | 
					
						
						|  | If not provided, it defaults to the square root of the number of embeddings. | 
					
						
						|  | - metric: The distance metric to use for UMAP. | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | - A numpy array of the embeddings reduced to the specified dimensionality. | 
					
						
						|  | """ | 
					
						
						|  | if n_neighbors is None: | 
					
						
						|  | n_neighbors = int((len(embeddings) - 1) ** 0.5) | 
					
						
						|  | return umap.UMAP( | 
					
						
						|  | n_neighbors=n_neighbors, n_components=dim, metric=metric | 
					
						
						|  | ).fit_transform(embeddings) | 
					
						
						|  |  | 
					
						
						|  | def local_cluster_embeddings( | 
					
						
						|  | self, | 
					
						
						|  | embeddings: np.ndarray, | 
					
						
						|  | dim: int, | 
					
						
						|  | num_neighbors: int = 10, | 
					
						
						|  | metric: str = "cosine", | 
					
						
						|  | ) -> np.ndarray: | 
					
						
						|  | """ | 
					
						
						|  | Perform local dimensionality reduction on the embeddings using UMAP, typically after global clustering. | 
					
						
						|  |  | 
					
						
						|  | Parameters: | 
					
						
						|  | - embeddings: The input embeddings as a numpy array. | 
					
						
						|  | - dim: The target dimensionality for the reduced space. | 
					
						
						|  | - num_neighbors: The number of neighbors to consider for each point. | 
					
						
						|  | - metric: The distance metric to use for UMAP. | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | - A numpy array of the embeddings reduced to the specified dimensionality. | 
					
						
						|  | """ | 
					
						
						|  | return umap.UMAP( | 
					
						
						|  | n_neighbors=num_neighbors, n_components=dim, metric=metric | 
					
						
						|  | ).fit_transform(embeddings) | 
					
						
						|  |  | 
					
						
						|  | def get_optimal_clusters( | 
					
						
						|  | self, | 
					
						
						|  | embeddings: np.ndarray, | 
					
						
						|  | max_clusters: int = 50, | 
					
						
						|  | random_state: int = RANDOM_SEED, | 
					
						
						|  | ) -> int: | 
					
						
						|  | """ | 
					
						
						|  | Determine the optimal number of clusters using the Bayesian Information Criterion (BIC) with a Gaussian Mixture Model. | 
					
						
						|  |  | 
					
						
						|  | Parameters: | 
					
						
						|  | - embeddings: The input embeddings as a numpy array. | 
					
						
						|  | - max_clusters: The maximum number of clusters to consider. | 
					
						
						|  | - random_state: Seed for reproducibility. | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | - An integer representing the optimal number of clusters found. | 
					
						
						|  | """ | 
					
						
						|  | max_clusters = min(max_clusters, len(embeddings)) | 
					
						
						|  | n_clusters = np.arange(1, max_clusters) | 
					
						
						|  | bics = [] | 
					
						
						|  | for n in n_clusters: | 
					
						
						|  | gm = GaussianMixture(n_components=n, random_state=random_state) | 
					
						
						|  | gm.fit(embeddings) | 
					
						
						|  | bics.append(gm.bic(embeddings)) | 
					
						
						|  | return n_clusters[np.argmin(bics)] | 
					
						
						|  |  | 
					
						
						|  | def GMM_cluster( | 
					
						
						|  | self, embeddings: np.ndarray, threshold: float, random_state: int = 0 | 
					
						
						|  | ): | 
					
						
						|  | """ | 
					
						
						|  | Cluster embeddings using a Gaussian Mixture Model (GMM) based on a probability threshold. | 
					
						
						|  |  | 
					
						
						|  | Parameters: | 
					
						
						|  | - embeddings: The input embeddings as a numpy array. | 
					
						
						|  | - threshold: The probability threshold for assigning an embedding to a cluster. | 
					
						
						|  | - random_state: Seed for reproducibility. | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | - A tuple containing the cluster labels and the number of clusters determined. | 
					
						
						|  | """ | 
					
						
						|  | n_clusters = self.get_optimal_clusters(embeddings) | 
					
						
						|  | gm = GaussianMixture(n_components=n_clusters, random_state=random_state) | 
					
						
						|  | gm.fit(embeddings) | 
					
						
						|  | probs = gm.predict_proba(embeddings) | 
					
						
						|  | labels = [np.where(prob > threshold)[0] for prob in probs] | 
					
						
						|  | return labels, n_clusters | 
					
						
						|  |  | 
					
						
						|  | def perform_clustering( | 
					
						
						|  | self, | 
					
						
						|  | embeddings: np.ndarray, | 
					
						
						|  | dim: int, | 
					
						
						|  | threshold: float, | 
					
						
						|  | ) -> List[np.ndarray]: | 
					
						
						|  | """ | 
					
						
						|  | Perform clustering on the embeddings by first reducing their dimensionality globally, then clustering | 
					
						
						|  | using a Gaussian Mixture Model, and finally performing local clustering within each global cluster. | 
					
						
						|  |  | 
					
						
						|  | Parameters: | 
					
						
						|  | - embeddings: The input embeddings as a numpy array. | 
					
						
						|  | - dim: The target dimensionality for UMAP reduction. | 
					
						
						|  | - threshold: The probability threshold for assigning an embedding to a cluster in GMM. | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | - A list of numpy arrays, where each array contains the cluster IDs for each embedding. | 
					
						
						|  | """ | 
					
						
						|  | if len(embeddings) <= dim + 1: | 
					
						
						|  |  | 
					
						
						|  | return [np.array([0]) for _ in range(len(embeddings))] | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | reduced_embeddings_global = self.global_cluster_embeddings(embeddings, dim) | 
					
						
						|  |  | 
					
						
						|  | global_clusters, n_global_clusters = self.GMM_cluster( | 
					
						
						|  | reduced_embeddings_global, threshold | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | all_local_clusters = [np.array([]) for _ in range(len(embeddings))] | 
					
						
						|  | total_clusters = 0 | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | for i in range(n_global_clusters): | 
					
						
						|  |  | 
					
						
						|  | global_cluster_embeddings_ = embeddings[ | 
					
						
						|  | np.array([i in gc for gc in global_clusters]) | 
					
						
						|  | ] | 
					
						
						|  |  | 
					
						
						|  | if len(global_cluster_embeddings_) == 0: | 
					
						
						|  | continue | 
					
						
						|  | if len(global_cluster_embeddings_) <= dim + 1: | 
					
						
						|  |  | 
					
						
						|  | local_clusters = [np.array([0]) for _ in global_cluster_embeddings_] | 
					
						
						|  | n_local_clusters = 1 | 
					
						
						|  | else: | 
					
						
						|  |  | 
					
						
						|  | reduced_embeddings_local = self.local_cluster_embeddings( | 
					
						
						|  | global_cluster_embeddings_, dim | 
					
						
						|  | ) | 
					
						
						|  | local_clusters, n_local_clusters = self.GMM_cluster( | 
					
						
						|  | reduced_embeddings_local, threshold | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | for j in range(n_local_clusters): | 
					
						
						|  | local_cluster_embeddings_ = global_cluster_embeddings_[ | 
					
						
						|  | np.array([j in lc for lc in local_clusters]) | 
					
						
						|  | ] | 
					
						
						|  | indices = np.where( | 
					
						
						|  | (embeddings == local_cluster_embeddings_[:, None]).all(-1) | 
					
						
						|  | )[1] | 
					
						
						|  | for idx in indices: | 
					
						
						|  | all_local_clusters[idx] = np.append( | 
					
						
						|  | all_local_clusters[idx], j + total_clusters | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | total_clusters += n_local_clusters | 
					
						
						|  |  | 
					
						
						|  | return all_local_clusters | 
					
						
						|  |  | 
					
						
						|  | def embed(self, texts): | 
					
						
						|  | """ | 
					
						
						|  | Generate embeddings for a list of text documents. | 
					
						
						|  |  | 
					
						
						|  | This function assumes the existence of an `embd` object with a method `embed_documents` | 
					
						
						|  | that takes a list of texts and returns their embeddings. | 
					
						
						|  |  | 
					
						
						|  | Parameters: | 
					
						
						|  | - texts: List[str], a list of text documents to be embedded. | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | - numpy.ndarray: An array of embeddings for the given text documents. | 
					
						
						|  | """ | 
					
						
						|  | text_embeddings = self.embd.embed_documents(texts) | 
					
						
						|  | text_embeddings_np = np.array(text_embeddings) | 
					
						
						|  | return text_embeddings_np | 
					
						
						|  |  | 
					
						
						|  | def embed_cluster_texts(self, texts): | 
					
						
						|  | """ | 
					
						
						|  | Embeds a list of texts and clusters them, returning a DataFrame with texts, their embeddings, and cluster labels. | 
					
						
						|  |  | 
					
						
						|  | This function combines embedding generation and clustering into a single step. It assumes the existence | 
					
						
						|  | of a previously defined `perform_clustering` function that performs clustering on the embeddings. | 
					
						
						|  |  | 
					
						
						|  | Parameters: | 
					
						
						|  | - texts: List[str], a list of text documents to be processed. | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | - pandas.DataFrame: A DataFrame containing the original texts, their embeddings, and the assigned cluster labels. | 
					
						
						|  | """ | 
					
						
						|  | text_embeddings_np = self.embed(texts) | 
					
						
						|  | cluster_labels = self.perform_clustering( | 
					
						
						|  | text_embeddings_np, 10, 0.1 | 
					
						
						|  | ) | 
					
						
						|  | df = pd.DataFrame() | 
					
						
						|  | df["text"] = texts | 
					
						
						|  | df["embd"] = list( | 
					
						
						|  | text_embeddings_np | 
					
						
						|  | ) | 
					
						
						|  | df["cluster"] = cluster_labels | 
					
						
						|  | return df | 
					
						
						|  |  | 
					
						
						|  | def fmt_txt(self, df: pd.DataFrame) -> str: | 
					
						
						|  | """ | 
					
						
						|  | Formats the text documents in a DataFrame into a single string. | 
					
						
						|  |  | 
					
						
						|  | Parameters: | 
					
						
						|  | - df: DataFrame containing the 'text' column with text documents to format. | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | - A single string where all text documents are joined by a specific delimiter. | 
					
						
						|  | """ | 
					
						
						|  | unique_txt = df["text"].tolist() | 
					
						
						|  | return "--- --- \n --- --- ".join(unique_txt) | 
					
						
						|  |  | 
					
						
						|  | def embed_cluster_summarize_texts( | 
					
						
						|  | self, texts: List[str], level: int | 
					
						
						|  | ) -> Tuple[pd.DataFrame, pd.DataFrame]: | 
					
						
						|  | """ | 
					
						
						|  | Embeds, clusters, and summarizes a list of texts. This function first generates embeddings for the texts, | 
					
						
						|  | clusters them based on similarity, expands the cluster assignments for easier processing, and then summarizes | 
					
						
						|  | the content within each cluster. | 
					
						
						|  |  | 
					
						
						|  | Parameters: | 
					
						
						|  | - texts: A list of text documents to be processed. | 
					
						
						|  | - level: An integer parameter that could define the depth or detail of processing. | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | - Tuple containing two DataFrames: | 
					
						
						|  | 1. The first DataFrame (`df_clusters`) includes the original texts, their embeddings, and cluster assignments. | 
					
						
						|  | 2. The second DataFrame (`df_summary`) contains summaries for each cluster, the specified level of detail, | 
					
						
						|  | and the cluster identifiers. | 
					
						
						|  | """ | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | df_clusters = self.embed_cluster_texts(texts) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | expanded_list = [] | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | for index, row in df_clusters.iterrows(): | 
					
						
						|  | for cluster in row["cluster"]: | 
					
						
						|  | expanded_list.append( | 
					
						
						|  | {"text": row["text"], "embd": row["embd"], "cluster": cluster} | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | expanded_df = pd.DataFrame(expanded_list) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | all_clusters = expanded_df["cluster"].unique() | 
					
						
						|  |  | 
					
						
						|  | print(f"--Generated {len(all_clusters)} clusters--") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | template = """Here is content from the course DS598: Deep Learning for Data Science. | 
					
						
						|  |  | 
					
						
						|  | The content may be form webapge about the course, or lecture content, or any other relevant information. | 
					
						
						|  | If the content is in bullet points (from  pdf lectre slides), you can summarize the bullet points. | 
					
						
						|  |  | 
					
						
						|  | Give a detailed summary of the content below. | 
					
						
						|  |  | 
					
						
						|  | Documentation: | 
					
						
						|  | {context} | 
					
						
						|  | """ | 
					
						
						|  | prompt = ChatPromptTemplate.from_template(template) | 
					
						
						|  | chain = prompt | self.model | StrOutputParser() | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | summaries = [] | 
					
						
						|  | for i in all_clusters: | 
					
						
						|  | df_cluster = expanded_df[expanded_df["cluster"] == i] | 
					
						
						|  | formatted_txt = self.fmt_txt(df_cluster) | 
					
						
						|  | summaries.append(chain.invoke({"context": formatted_txt})) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | df_summary = pd.DataFrame( | 
					
						
						|  | { | 
					
						
						|  | "summaries": summaries, | 
					
						
						|  | "level": [level] * len(summaries), | 
					
						
						|  | "cluster": list(all_clusters), | 
					
						
						|  | } | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | return df_clusters, df_summary | 
					
						
						|  |  | 
					
						
						|  | def recursive_embed_cluster_summarize( | 
					
						
						|  | self, texts: List[str], level: int = 1, n_levels: int = 3 | 
					
						
						|  | ) -> Dict[int, Tuple[pd.DataFrame, pd.DataFrame]]: | 
					
						
						|  | """ | 
					
						
						|  | Recursively embeds, clusters, and summarizes texts up to a specified level or until | 
					
						
						|  | the number of unique clusters becomes 1, storing the results at each level. | 
					
						
						|  |  | 
					
						
						|  | Parameters: | 
					
						
						|  | - texts: List[str], texts to be processed. | 
					
						
						|  | - level: int, current recursion level (starts at 1). | 
					
						
						|  | - n_levels: int, maximum depth of recursion. | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | - Dict[int, Tuple[pd.DataFrame, pd.DataFrame]], a dictionary where keys are the recursion | 
					
						
						|  | levels and values are tuples containing the clusters DataFrame and summaries DataFrame at that level. | 
					
						
						|  | """ | 
					
						
						|  | results = {} | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | df_clusters, df_summary = self.embed_cluster_summarize_texts(texts, level) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | results[level] = (df_clusters, df_summary) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | unique_clusters = df_summary["cluster"].nunique() | 
					
						
						|  | if level < n_levels and unique_clusters > 1: | 
					
						
						|  |  | 
					
						
						|  | new_texts = df_summary["summaries"].tolist() | 
					
						
						|  | next_level_results = self.recursive_embed_cluster_summarize( | 
					
						
						|  | new_texts, level + 1, n_levels | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | results.update(next_level_results) | 
					
						
						|  |  | 
					
						
						|  | return results | 
					
						
						|  |  | 
					
						
						|  | def get_vector_db(self): | 
					
						
						|  | """ | 
					
						
						|  | Generate a retriever object from a list of documents. | 
					
						
						|  |  | 
					
						
						|  | Parameters: | 
					
						
						|  | - documents: List of document objects. | 
					
						
						|  |  | 
					
						
						|  | Returns: | 
					
						
						|  | - A retriever object. | 
					
						
						|  | """ | 
					
						
						|  | leaf_texts = self.split_documents(self.documents) | 
					
						
						|  | results = self.recursive_embed_cluster_summarize( | 
					
						
						|  | leaf_texts, level=1, n_levels=10 | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | all_texts = leaf_texts.copy() | 
					
						
						|  |  | 
					
						
						|  | for level in sorted(results.keys()): | 
					
						
						|  |  | 
					
						
						|  | summaries = results[level][1]["summaries"].tolist() | 
					
						
						|  |  | 
					
						
						|  | all_texts.extend(summaries) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | vectorstore = FAISS.from_texts(texts=all_texts, embedding=self.embd) | 
					
						
						|  | return vectorstore | 
					
						
						|  |  | 
					
						
						|  | def create_database(self, documents, embedding_model): | 
					
						
						|  | self.documents = documents | 
					
						
						|  | self.embd = embedding_model | 
					
						
						|  | self.vectorstore = self.get_vector_db() | 
					
						
						|  | self.vectorstore.save_local( | 
					
						
						|  | os.path.join( | 
					
						
						|  | self.config["vectorstore"]["db_path"], | 
					
						
						|  | "db_" | 
					
						
						|  | + self.config["vectorstore"]["db_option"] | 
					
						
						|  | + "_" | 
					
						
						|  | + self.config["vectorstore"]["model"], | 
					
						
						|  | ) | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | def load_database(self, embedding_model): | 
					
						
						|  | self.vectorstore = FAISS.load_local( | 
					
						
						|  | os.path.join( | 
					
						
						|  | self.config["vectorstore"]["db_path"], | 
					
						
						|  | "db_" | 
					
						
						|  | + self.config["vectorstore"]["db_option"] | 
					
						
						|  | + "_" | 
					
						
						|  | + self.config["vectorstore"]["model"], | 
					
						
						|  | ), | 
					
						
						|  | embedding_model, | 
					
						
						|  | allow_dangerous_deserialization=True, | 
					
						
						|  | ) | 
					
						
						|  | return self.vectorstore | 
					
						
						|  |  | 
					
						
						|  | def as_retriever(self): | 
					
						
						|  | return self.vectorstore.as_retriever() | 
					
						
						|  |  |