"""Base vector store index. | |
An index that that is built on top of an existing vector store. | |
""" | |
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Type

from gpt_index.async_utils import run_async_tasks
from gpt_index.data_structs.data_structs import IndexDict, Node
from gpt_index.embeddings.base import BaseEmbedding
from gpt_index.indices.base import DOCUMENTS_INPUT, BaseGPTIndex
from gpt_index.indices.query.base import BaseGPTIndexQuery
from gpt_index.indices.query.schema import QueryMode
from gpt_index.indices.query.vector_store.base import GPTVectorStoreIndexQuery
from gpt_index.langchain_helpers.chain_wrapper import LLMPredictor
from gpt_index.langchain_helpers.text_splitter import TextSplitter
from gpt_index.prompts.default_prompts import DEFAULT_TEXT_QA_PROMPT
from gpt_index.prompts.prompts import QuestionAnswerPrompt
from gpt_index.schema import BaseDocument
from gpt_index.utils import get_new_id
from gpt_index.vector_stores.simple import SimpleVectorStore
from gpt_index.vector_stores.types import NodeEmbeddingResult, VectorStore

VECTOR_STORE_CONFIG_DICT_KEY = "vector_store"


class GPTVectorStoreIndex(BaseGPTIndex[IndexDict]):
    """Base GPT Vector Store Index.

    Args:
        text_qa_template (Optional[QuestionAnswerPrompt]): A Question-Answer Prompt
            (see :ref:`Prompt-Templates`).
            NOTE: this is a deprecated field.
        embed_model (Optional[BaseEmbedding]): Embedding model to use for
            embedding similarity.
        vector_store (Optional[VectorStore]): Vector store to use for
            embedding similarity. See :ref:`Ref-Indices-VectorStore-Stores`
            for more details.
        use_async (bool): Whether to use asynchronous calls. Defaults to False.

    """

    index_struct_cls = IndexDict
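
    # Illustrative usage (a minimal sketch, not part of this module; assumes
    # `documents` have been loaded elsewhere, e.g. via a gpt_index data loader):
    #
    #     index = GPTVectorStoreIndex(documents)
    #     response = index.query("What does the corpus say about X?")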

    def __init__(
        self,
        documents: Optional[Sequence[DOCUMENTS_INPUT]] = None,
        index_struct: Optional[IndexDict] = None,
        text_qa_template: Optional[QuestionAnswerPrompt] = None,
        llm_predictor: Optional[LLMPredictor] = None,
        embed_model: Optional[BaseEmbedding] = None,
        vector_store: Optional[VectorStore] = None,
        text_splitter: Optional[TextSplitter] = None,
        use_async: bool = False,
        **kwargs: Any,
    ) -> None:
        """Initialize params."""
        self._vector_store = vector_store or SimpleVectorStore()
        self.text_qa_template = text_qa_template or DEFAULT_TEXT_QA_PROMPT
        self._use_async = use_async
        super().__init__(
            documents=documents,
            index_struct=index_struct,
            llm_predictor=llm_predictor,
            embed_model=embed_model,
            text_splitter=text_splitter,
            **kwargs,
        )

    def get_query_map(self) -> Dict[str, Type[BaseGPTIndexQuery]]:
        """Get query map."""
        return {
            QueryMode.DEFAULT: GPTVectorStoreIndexQuery,
            QueryMode.EMBEDDING: GPTVectorStoreIndexQuery,
        }

    def _get_node_embedding_results(
        self, nodes: List[Node], existing_node_ids: Set, doc_id: str
    ) -> List[NodeEmbeddingResult]:
        """Get tuples of id, node, and embedding.

        Allows us to store these nodes in a vector store.
        Embeddings are called in batches.

        """
        id_to_node_map: Dict[str, Node] = {}
        id_to_embed_map: Dict[str, List[float]] = {}
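        # assign a unique id to each node; nodes without a precomputed embedding
        # are queued so the embed model can batch the embedding calls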
        for n in nodes:
            new_id = get_new_id(existing_node_ids.union(id_to_node_map.keys()))
            if n.embedding is None:
                self._embed_model.queue_text_for_embeddding(new_id, n.get_text())
            else:
                id_to_embed_map[new_id] = n.embedding
            id_to_node_map[new_id] = n

        # call embedding model to get embeddings
        result_ids, result_embeddings = self._embed_model.get_queued_text_embeddings()
        for new_id, text_embedding in zip(result_ids, result_embeddings):
            id_to_embed_map[new_id] = text_embedding

        result_tups = []
        for id, embed in id_to_embed_map.items():
            result_tups.append(
                NodeEmbeddingResult(id, id_to_node_map[id], embed, doc_id=doc_id)
            )
        return result_tups

    async def _aget_node_embedding_results(
        self, nodes: List[Node], existing_node_ids: Set, doc_id: str
    ) -> List[NodeEmbeddingResult]:
        """Asynchronously get tuples of id, node, and embedding.

        Allows us to store these nodes in a vector store.
        Embeddings are called in batches.

        """
        id_to_node_map: Dict[str, Node] = {}
        id_to_embed_map: Dict[str, List[float]] = {}
        text_queue: List[Tuple[str, str]] = []
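        # collect (id, text) pairs for nodes without a precomputed embedding so
        # the async embed model can batch the requests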
        for n in nodes:
            new_id = get_new_id(existing_node_ids.union(id_to_node_map.keys()))
            if n.embedding is None:
                text_queue.append((new_id, n.get_text()))
            else:
                id_to_embed_map[new_id] = n.embedding
            id_to_node_map[new_id] = n

        # call embedding model to get embeddings
        (
            result_ids,
            result_embeddings,
        ) = await self._embed_model.aget_queued_text_embeddings(text_queue)
        for new_id, text_embedding in zip(result_ids, result_embeddings):
            id_to_embed_map[new_id] = text_embedding

        result_tups = []
        for id, embed in id_to_embed_map.items():
            result_tups.append(
                NodeEmbeddingResult(id, id_to_node_map[id], embed, doc_id=doc_id)
            )
        return result_tups

    def _build_fallback_text_splitter(self) -> TextSplitter:
        # if not specified, use "smart" text splitter to ensure chunks fit in prompt
        return self._prompt_helper.get_text_splitter_given_prompt(
            self.text_qa_template, 1
        )

    async def _async_add_document_to_index(
        self,
        index_struct: IndexDict,
        document: BaseDocument,
    ) -> None:
        """Asynchronously add document to index."""
        nodes = self._get_nodes_from_document(document)
        embedding_results = await self._aget_node_embedding_results(
            nodes, set(), document.get_doc_id()
        )
        new_ids = self._vector_store.add(embedding_results)
        # if the vector store doesn't store text, we need to add the nodes to the
        # index struct
        if not self._vector_store.stores_text:
            for result, new_id in zip(embedding_results, new_ids):
                index_struct.add_node(result.node, text_id=new_id)

    def _add_document_to_index(
        self,
        index_struct: IndexDict,
        document: BaseDocument,
    ) -> None:
        """Add document to index."""
        nodes = self._get_nodes_from_document(document)
        embedding_results = self._get_node_embedding_results(
            nodes, set(), document.get_doc_id()
        )
        new_ids = self._vector_store.add(embedding_results)
        # if the vector store doesn't store text, we need to add the nodes to the
        # index struct
        if not self._vector_store.stores_text:
            for result, new_id in zip(embedding_results, new_ids):
                index_struct.add_node(result.node, text_id=new_id)

    def _build_index_from_documents(
        self, documents: Sequence[BaseDocument]
    ) -> IndexDict:
        """Build index from documents."""
        index_struct = self.index_struct_cls()
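        # when use_async is set, documents are embedded and added concurrently;
        # otherwise they are processed one at a time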
        if self._use_async:
            tasks = [
                self._async_add_document_to_index(index_struct, d) for d in documents
            ]
            run_async_tasks(tasks)
        else:
            for d in documents:
                self._add_document_to_index(index_struct, d)
        return index_struct

    def _insert(self, document: BaseDocument, **insert_kwargs: Any) -> None:
        """Insert a document."""
        self._add_document_to_index(self._index_struct, document)

    def _delete(self, doc_id: str, **delete_kwargs: Any) -> None:
        """Delete a document."""
        self._index_struct.delete(doc_id)
        self._vector_store.delete(doc_id)

    @classmethod
    def load_from_dict(
        cls, result_dict: Dict[str, Any], **kwargs: Any
    ) -> "BaseGPTIndex":
        """Load index from dict.

        This method loads the index from a result dict (in JSON-serializable
        format). The index data structure itself is preserved completely. If the
        index is defined over subindices, those subindices will also be preserved
        (and subindices of those subindices, etc.).

        NOTE: load_from_dict should not be used for indices composed on top
        of other indices. Please define a `ComposableGraph` and use
        `save_to_dict` and `load_from_dict` on that instead.

        Args:
            result_dict (dict): The result dict (in JSON-serializable format).

        Returns:
            BaseGPTIndex: The loaded index.

        """
        config_dict = {}
        if VECTOR_STORE_CONFIG_DICT_KEY in result_dict:
            config_dict = result_dict[VECTOR_STORE_CONFIG_DICT_KEY]
        return super().load_from_dict(result_dict, **config_dict, **kwargs)

    def save_to_dict(self, **save_kwargs: Any) -> dict:
        """Save to dict.

        This method stores the index into a JSON-serializable dict.

        NOTE: save_to_dict should not be used for indices composed on top
        of other indices. Please define a `ComposableGraph` and use
        `save_to_dict` and `load_from_dict` on that instead.

        Returns:
            dict: The JSON dict of the index.

        """
        out_dict = super().save_to_dict()
        out_dict[VECTOR_STORE_CONFIG_DICT_KEY] = self._vector_store.config_dict
        return out_dict
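
    # Round-trip sketch (illustrative, not part of the original module; assumes
    # `index` is an existing GPTVectorStoreIndex instance):
    #     saved = index.save_to_dict()
    #     restored = GPTVectorStoreIndex.load_from_dict(saved)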

    def _preprocess_query(self, mode: QueryMode, query_kwargs: Any) -> None:
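        """Preprocess query kwargs before the query object is constructed."""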
        super()._preprocess_query(mode, query_kwargs)
        if "text_qa_template" not in query_kwargs:
            query_kwargs["text_qa_template"] = self.text_qa_template
        # NOTE: Pass along vector store instance to query objects
        # TODO: refactor this to be more explicit
        query_kwargs["vector_store"] = self._vector_store