Spaces:
Runtime error
Runtime error
"""Qdrant vector store index. | |
An index that is built on top of an existing Qdrant collection. | |
""" | |
import logging | |
from typing import Any, List, Optional, cast | |
from gpt_index.data_structs.data_structs import Node | |
from gpt_index.utils import get_new_id | |
from gpt_index.vector_stores.types import ( | |
NodeEmbeddingResult, | |
VectorStore, | |
VectorStoreQueryResult, | |
) | |
class QdrantVectorStore(VectorStore):
    """Qdrant Vector Store.

    In this vector store, embeddings and docs are stored within a
    Qdrant collection.

    During query time, the index uses Qdrant to query for the top
    k most similar nodes.

    Args:
        collection_name: (str): name of the Qdrant collection
        client (Optional[Any]): QdrantClient instance from `qdrant-client` package

    Raises:
        ImportError: if the `qdrant-client` package cannot be imported.
        ValueError: if `client` is None.

    """

    # Node text is written into each point's payload (see `add`).
    stores_text: bool = True

    def __init__(
        self, collection_name: str, client: Optional[Any] = None, **kwargs: Any
    ) -> None:
        """Init params.

        Checks that `qdrant-client` is importable, stores the client and
        collection name, and records whether the collection already exists
        so that `add` knows whether it has to create it.
        """
        import_err_msg = (
            "`qdrant-client` package not found, please run `pip install qdrant-client`"
        )
        try:
            import qdrant_client  # noqa: F401
        except ImportError as err:
            # Chain the original error so the real import failure stays visible.
            raise ImportError(import_err_msg) from err

        if client is None:
            raise ValueError("client cannot be None.")

        self._client = cast(qdrant_client.QdrantClient, client)
        self._collection_name = collection_name
        self._collection_initialized = self._collection_exists(collection_name)

    def config_dict(self) -> dict:
        """Return config dict."""
        return {
            "collection_name": self._collection_name,
        }

    def add(self, embedding_results: List[NodeEmbeddingResult]) -> List[str]:
        """Add embedding results to index.

        Each result is upserted as one point whose payload holds the
        document id, the node text and the node index. The collection is
        created on the first insert if it was not found at init time, sized
        to that first embedding.

        Args:
            embedding_results: List[NodeEmbeddingResult]: list of embedding results

        Returns:
            List[str]: the point ids actually used, in input order. An id
                differs from `result.id` when that id was already taken.

        """
        from qdrant_client.http import models as rest
        from qdrant_client.http.exceptions import UnexpectedResponse

        collection_name = self._collection_name
        ids = []
        for result in embedding_results:
            new_id = result.id
            node = result.node
            text_embedding = result.embedding

            # assign a new_id if current_id conflicts with existing ids
            while True:
                try:
                    self._client.http.points_api.get_point(
                        collection_name=collection_name, id=new_id
                    )
                except UnexpectedResponse as err:
                    # Only "not found" means the id is free; any other
                    # failure must not be mistaken for an unused id.
                    if not self._is_not_found(err):
                        raise
                    break
                new_id = get_new_id(set())

            # Create the Qdrant collection, if it does not exist yet
            if not self._collection_initialized:
                self._create_collection(
                    collection_name=collection_name,
                    vector_size=len(text_embedding),
                )
                self._collection_initialized = True

            payload = {
                "doc_id": result.doc_id,
                "text": node.get_text(),
                "index": node.index,
            }

            self._client.upsert(
                collection_name=collection_name,
                points=[
                    rest.PointStruct(
                        id=new_id,
                        vector=text_embedding,
                        payload=payload,
                    )
                ],
            )
            ids.append(new_id)
        return ids

    def delete(self, doc_id: str, **delete_kwargs: Any) -> None:
        """Delete a document.

        Removes every point whose payload `doc_id` matches the given value.

        Args:
            doc_id: (str): document id

        """
        from qdrant_client.http import models as rest

        self._client.delete(
            collection_name=self._collection_name,
            points_selector=rest.Filter(
                must=[
                    rest.FieldCondition(
                        key="doc_id", match=rest.MatchValue(value=doc_id)
                    )
                ]
            ),
        )

    def client(self) -> Any:
        """Return the Qdrant client."""
        return self._client

    @staticmethod
    def _is_not_found(err: Exception) -> bool:
        """Tell whether an API error means "resource does not exist".

        Returns True for HTTP status 404. An error that exposes no
        `status_code` is also treated as "not found", which keeps the
        previous behaviour of reading every unexpected response as a miss.
        """
        status_code = getattr(err, "status_code", None)
        return status_code is None or status_code == 404

    def _create_collection(self, collection_name: str, vector_size: int) -> None:
        """Create a Qdrant collection using cosine distance.

        NOTE: this goes through `recreate_collection`, which per the
        qdrant-client documentation replaces any existing collection of the
        same name. It must only be called when the collection is known to
        be missing.
        """
        from qdrant_client.http import models as rest

        self._client.recreate_collection(
            collection_name=collection_name,
            vectors_config=rest.VectorParams(
                size=vector_size,
                distance=rest.Distance.COSINE,
            ),
        )

    def _collection_exists(self, collection_name: str) -> bool:
        """Check if a collection exists.

        Returns:
            bool: True when the server returns collection info, False when
                it answers "not found".

        Raises:
            UnexpectedResponse: for any other error status. Reporting such a
                failure as "missing" would make `add` recreate the
                collection, so it is surfaced instead.

        """
        from qdrant_client.http.exceptions import UnexpectedResponse

        try:
            response = self._client.http.collections_api.get_collection(collection_name)
        except UnexpectedResponse as err:
            if not self._is_not_found(err):
                raise
            return False
        return response.result is not None

    def query(
        self,
        query_embedding: List[float],
        similarity_top_k: int,
        doc_ids: Optional[List[str]] = None,
    ) -> VectorStoreQueryResult:
        """Query index for top k most similar nodes.

        Args:
            query_embedding (List[float]): query embedding
            similarity_top_k (int): top k most similar nodes
            doc_ids (Optional[List[str]]): list of doc_ids to filter by

        Returns:
            VectorStoreQueryResult: nodes rebuilt from the point payloads
                (`doc_id` and `text`), with their scores and point ids as
                strings, in the order returned by the search.

        """
        from qdrant_client.http.models.models import (
            FieldCondition,
            Filter,
            MatchValue,
            Payload,
        )

        # No filter when doc_ids is None or empty; otherwise match any of
        # the given doc_ids.
        query_filter = None
        if doc_ids:
            query_filter = Filter(
                must=[
                    Filter(
                        should=[
                            FieldCondition(key="doc_id", match=MatchValue(value=doc_id))
                            for doc_id in doc_ids
                        ],
                    )
                ]
            )

        response = self._client.search(
            collection_name=self._collection_name,
            query_vector=query_embedding,
            limit=cast(int, similarity_top_k),
            query_filter=query_filter,
        )

        logging.debug("> Top %d nodes:", len(response))

        nodes = []
        similarities = []
        ids = []
        for point in response:
            payload = cast(Payload, point.payload)
            node = Node(
                ref_doc_id=payload.get("doc_id"),
                text=payload.get("text"),
            )
            nodes.append(node)
            similarities.append(point.score)
            ids.append(str(point.id))

        return VectorStoreQueryResult(nodes=nodes, similarities=similarities, ids=ids)