AbeerTrial's picture
Duplicate from AbeerTrial/SOAPAssist
35b22df
raw
history blame
10.1 kB
"""Base vector store index.
An index that that is built on top of an existing vector store.
"""
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Type
from gpt_index.async_utils import run_async_tasks
from gpt_index.data_structs.data_structs import IndexDict, Node
from gpt_index.embeddings.base import BaseEmbedding
from gpt_index.indices.base import DOCUMENTS_INPUT, BaseGPTIndex
from gpt_index.indices.query.base import BaseGPTIndexQuery
from gpt_index.indices.query.schema import QueryMode
from gpt_index.indices.query.vector_store.base import GPTVectorStoreIndexQuery
from gpt_index.langchain_helpers.chain_wrapper import LLMPredictor
from gpt_index.langchain_helpers.text_splitter import TextSplitter
from gpt_index.prompts.default_prompts import DEFAULT_TEXT_QA_PROMPT
from gpt_index.prompts.prompts import QuestionAnswerPrompt
from gpt_index.schema import BaseDocument
from gpt_index.utils import get_new_id
from gpt_index.vector_stores.simple import SimpleVectorStore
from gpt_index.vector_stores.types import NodeEmbeddingResult, VectorStore
VECTOR_STORE_CONFIG_DICT_KEY = "vector_store"
class GPTVectorStoreIndex(BaseGPTIndex[IndexDict]):
"""Base GPT Vector Store Index.
Args:
text_qa_template (Optional[QuestionAnswerPrompt]): A Question-Answer Prompt
(see :ref:`Prompt-Templates`).
NOTE: this is a deprecated field.
embed_model (Optional[BaseEmbedding]): Embedding model to use for
embedding similarity.
vector_store (Optional[VectorStore]): Vector store to use for
embedding similarity. See :ref:`Ref-Indices-VectorStore-Stores`
for more details.
use_async (bool): Whether to use asynchronous calls. Defaults to False.
"""
index_struct_cls = IndexDict
def __init__(
self,
documents: Optional[Sequence[DOCUMENTS_INPUT]] = None,
index_struct: Optional[IndexDict] = None,
text_qa_template: Optional[QuestionAnswerPrompt] = None,
llm_predictor: Optional[LLMPredictor] = None,
embed_model: Optional[BaseEmbedding] = None,
vector_store: Optional[VectorStore] = None,
text_splitter: Optional[TextSplitter] = None,
use_async: bool = False,
**kwargs: Any,
) -> None:
"""Initialize params."""
self._vector_store = vector_store or SimpleVectorStore()
self.text_qa_template = text_qa_template or DEFAULT_TEXT_QA_PROMPT
self._use_async = use_async
super().__init__(
documents=documents,
index_struct=index_struct,
llm_predictor=llm_predictor,
embed_model=embed_model,
text_splitter=text_splitter,
**kwargs,
)
@classmethod
def get_query_map(self) -> Dict[str, Type[BaseGPTIndexQuery]]:
"""Get query map."""
return {
QueryMode.DEFAULT: GPTVectorStoreIndexQuery,
QueryMode.EMBEDDING: GPTVectorStoreIndexQuery,
}
def _get_node_embedding_results(
self, nodes: List[Node], existing_node_ids: Set, doc_id: str
) -> List[NodeEmbeddingResult]:
"""Get tuples of id, node, and embedding.
Allows us to store these nodes in a vector store.
Embeddings are called in batches.
"""
id_to_node_map: Dict[str, Node] = {}
id_to_embed_map: Dict[str, List[float]] = {}
for n in nodes:
new_id = get_new_id(existing_node_ids.union(id_to_node_map.keys()))
if n.embedding is None:
self._embed_model.queue_text_for_embeddding(new_id, n.get_text())
else:
id_to_embed_map[new_id] = n.embedding
id_to_node_map[new_id] = n
# call embedding model to get embeddings
result_ids, result_embeddings = self._embed_model.get_queued_text_embeddings()
for new_id, text_embedding in zip(result_ids, result_embeddings):
id_to_embed_map[new_id] = text_embedding
result_tups = []
for id, embed in id_to_embed_map.items():
result_tups.append(
NodeEmbeddingResult(id, id_to_node_map[id], embed, doc_id=doc_id)
)
return result_tups
async def _aget_node_embedding_results(
self, nodes: List[Node], existing_node_ids: Set, doc_id: str
) -> List[NodeEmbeddingResult]:
"""Asynchronously get tuples of id, node, and embedding.
Allows us to store these nodes in a vector store.
Embeddings are called in batches.
"""
id_to_node_map: Dict[str, Node] = {}
id_to_embed_map: Dict[str, List[float]] = {}
text_queue: List[Tuple[str, str]] = []
for n in nodes:
new_id = get_new_id(existing_node_ids.union(id_to_node_map.keys()))
if n.embedding is None:
text_queue.append((new_id, n.get_text()))
else:
id_to_embed_map[new_id] = n.embedding
id_to_node_map[new_id] = n
# call embedding model to get embeddings
(
result_ids,
result_embeddings,
) = await self._embed_model.aget_queued_text_embeddings(text_queue)
for new_id, text_embedding in zip(result_ids, result_embeddings):
id_to_embed_map[new_id] = text_embedding
result_tups = []
for id, embed in id_to_embed_map.items():
result_tups.append(
NodeEmbeddingResult(id, id_to_node_map[id], embed, doc_id=doc_id)
)
return result_tups
def _build_fallback_text_splitter(self) -> TextSplitter:
# if not specified, use "smart" text splitter to ensure chunks fit in prompt
return self._prompt_helper.get_text_splitter_given_prompt(
self.text_qa_template, 1
)
async def _async_add_document_to_index(
self,
index_struct: IndexDict,
document: BaseDocument,
) -> None:
"""Asynchronously add document to index."""
nodes = self._get_nodes_from_document(document)
embedding_results = await self._aget_node_embedding_results(
nodes, set(), document.get_doc_id()
)
new_ids = self._vector_store.add(embedding_results)
# if the vector store doesn't store text, we need to add the nodes to the
# index struct
if not self._vector_store.stores_text:
for result, new_id in zip(embedding_results, new_ids):
index_struct.add_node(result.node, text_id=new_id)
def _add_document_to_index(
self,
index_struct: IndexDict,
document: BaseDocument,
) -> None:
"""Add document to index."""
nodes = self._get_nodes_from_document(document)
embedding_results = self._get_node_embedding_results(
nodes, set(), document.get_doc_id()
)
new_ids = self._vector_store.add(embedding_results)
# if the vector store doesn't store text, we need to add the nodes to the
# index struct
if not self._vector_store.stores_text:
for result, new_id in zip(embedding_results, new_ids):
index_struct.add_node(result.node, text_id=new_id)
def _build_index_from_documents(
self, documents: Sequence[BaseDocument]
) -> IndexDict:
"""Build index from documents."""
index_struct = self.index_struct_cls()
if self._use_async:
tasks = [
self._async_add_document_to_index(index_struct, d) for d in documents
]
run_async_tasks(tasks)
else:
for d in documents:
self._add_document_to_index(index_struct, d)
return index_struct
def _insert(self, document: BaseDocument, **insert_kwargs: Any) -> None:
"""Insert a document."""
self._add_document_to_index(self._index_struct, document)
def _delete(self, doc_id: str, **delete_kwargs: Any) -> None:
"""Delete a document."""
self._index_struct.delete(doc_id)
self._vector_store.delete(doc_id)
@classmethod
def load_from_dict(
cls, result_dict: Dict[str, Any], **kwargs: Any
) -> "BaseGPTIndex":
"""Load index from string (in JSON-format).
This method loads the index from a JSON string. The index data
structure itself is preserved completely. If the index is defined over
subindices, those subindices will also be preserved (and subindices of
those subindices, etc.).
NOTE: load_from_string should not be used for indices composed on top
of other indices. Please define a `ComposableGraph` and use
`save_to_string` and `load_from_string` on that instead.
Args:
index_string (str): The index string (in JSON-format).
Returns:
BaseGPTIndex: The loaded index.
"""
config_dict = {}
if "vector_store" in result_dict:
config_dict = result_dict[VECTOR_STORE_CONFIG_DICT_KEY]
return super().load_from_dict(result_dict, **config_dict, **kwargs)
def save_to_dict(self, **save_kwargs: Any) -> dict:
"""Save to string.
This method stores the index into a JSON string.
NOTE: save_to_string should not be used for indices composed on top
of other indices. Please define a `ComposableGraph` and use
`save_to_string` and `load_from_string` on that instead.
Returns:
dict: The JSON dict of the index.
"""
out_dict = super().save_to_dict()
out_dict[VECTOR_STORE_CONFIG_DICT_KEY] = self._vector_store.config_dict
return out_dict
def _preprocess_query(self, mode: QueryMode, query_kwargs: Any) -> None:
super()._preprocess_query(mode, query_kwargs)
if "text_qa_template" not in query_kwargs:
query_kwargs["text_qa_template"] = self.text_qa_template
# NOTE: Pass along vector store instance to query objects
# TODO: refactor this to be more explicit
query_kwargs["vector_store"] = self._vector_store