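# NOTE: the text "Spaces: Runtime error / Runtime error" appeared here. It looks
# like a page-status banner captured along with the source, not module content.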
"""Elasticsearch/Opensearch vector store.""" | |
import json | |
from typing import Any, Dict, List, Optional | |
from gpt_index.data_structs import Node | |
from gpt_index.vector_stores.types import ( | |
NodeEmbeddingResult, | |
VectorStore, | |
VectorStoreQueryResult, | |
) | |
class OpensearchVectorClient:
    """Object encapsulating an Opensearch index that has vector search enabled.

    If the index does not yet exist, it is created during init.
    Therefore, the underlying index is assumed to either:
    1) not exist yet or 2) be created due to previous usage of this class.

    Args:
        endpoint (str): URL (http/https) of elasticsearch endpoint
        index (str): Name of the elasticsearch index
        dim (int): Dimension of the vector
        embedding_field (str): Name of the field in the index to store
            embedding array in.
        text_field (str): Name of the field to grab text from
        method (Optional[dict]): Opensearch "method" JSON obj for configuring
            the KNN index.
            This includes engine, metric, and other config params. Defaults to:
            {"name": "hnsw", "space_type": "l2", "engine": "nmslib",
            "parameters": {"ef_construction": 256, "m": 48}}

    """

    def __init__(
        self,
        endpoint: str,
        index: str,
        dim: int,
        embedding_field: str = "embedding",
        text_field: str = "content",
        method: Optional[dict] = None,
    ):
        """Init params and create the index.

        Issues ``PUT /<index>`` with a ``knn_vector`` mapping for
        ``embedding_field``.

        Raises:
            ImportError: if the ``httpx`` package is not installed.
            AssertionError: if index creation responds with a status other
                than 200 or 400.

        """
        if method is None:
            method = {
                "name": "hnsw",
                "space_type": "l2",
                "engine": "nmslib",
                "parameters": {"ef_construction": 256, "m": 48},
            }
        if embedding_field is None:
            embedding_field = "embedding"

        import_err_msg = "`httpx` package not found, please run `pip install httpx`"
        try:
            import httpx  # noqa: F401
        except ImportError:
            raise ImportError(import_err_msg)

        self._embedding_field = embedding_field
        self._client = httpx.Client(base_url=endpoint)
        self._endpoint = endpoint
        self._dim = dim
        self._index = index
        self._text_field = text_field

        # initialize mapping
        idx_conf = {
            "settings": {"index": {"knn": True, "knn.algo_param.ef_search": 100}},
            "mappings": {
                "properties": {
                    embedding_field: {
                        "type": "knn_vector",
                        "dimension": dim,
                        "method": method,
                    },
                }
            },
        }
        res = self._client.put(f"/{self._index}", json=idx_conf)
        # will 400 if the index already existed, so allow 400 errors right here
        # NOTE(review): every 400 is accepted, not only "index already exists";
        # confirm whether other bad-request causes should surface instead.
        # Raised explicitly (not `assert`) so the check survives `python -O`.
        if res.status_code not in (200, 400):
            raise AssertionError(
                f"unexpected status {res.status_code} "
                f"while creating index {self._index!r}"
            )

    def index_results(self, results: "List[NodeEmbeddingResult]") -> List[str]:
        """Store results in the index.

        Sends one bulk request containing, for each result, an ``index``
        action line followed by a document line holding the node text and
        its embedding.

        Args:
            results (List[NodeEmbeddingResult]): results to store. An empty
                list is a no-op and sends no request.

        Returns:
            List[str]: the ids of the given results, in input order.

        Raises:
            AssertionError: if the bulk request does not return 200 or the
                response reports errors.

        """
        if not results:
            # Nothing to index: skip the round trip instead of posting a
            # bulk body that contains only a newline.
            return []

        bulk_req: List[Dict[Any, Any]] = []
        for result in results:
            bulk_req.append({"index": {"_index": self._index, "_id": result.id}})
            bulk_req.append(
                {
                    self._text_field: result.node.text,
                    self._embedding_field: result.embedding,
                }
            )

        # Newline-delimited JSON, terminated by a trailing newline.
        bulk = "\n".join(json.dumps(v) for v in bulk_req) + "\n"
        res = self._client.post(
            "/_bulk", headers={"Content-Type": "application/x-ndjson"}, content=bulk
        )
        # Raised explicitly (not `assert`) so the checks survive `python -O`.
        if res.status_code != 200:
            raise AssertionError(
                f"unexpected status {res.status_code} from bulk indexing request"
            )
        if res.json()["errors"]:
            raise AssertionError("expected no errors while indexing docs")
        return [r.id for r in results]

    def delete_doc_id(self, doc_id: str) -> None:
        """Delete a document.

        The response of the delete request is not inspected.

        Args:
            doc_id (str): document id

        """
        self._client.delete(f"{self._index}/_doc/{doc_id}")

    def do_approx_knn(
        self, query_embedding: List[float], k: int
    ) -> "VectorStoreQueryResult":
        """Do approximate knn.

        Args:
            query_embedding (List[float]): query vector
            k (int): used both as the knn ``k`` and as the search ``size``

        Returns:
            VectorStoreQueryResult: nodes, ids and scores built from the
                returned hits, in the order the hits were returned.

        """
        res = self._client.post(
            f"{self._index}/_search",
            json={
                "size": k,
                "query": {
                    "knn": {self._embedding_field: {"vector": query_embedding, "k": k}}
                },
            },
        )
        nodes = []
        ids = []
        scores = []
        for hit in res.json()["hits"]["hits"]:
            source = hit["_source"]
            text = source[self._text_field]
            doc_id = hit["_id"]
            # The whole `_source` document is attached as the node's extra_info.
            node = Node(text=text, extra_info=source, doc_id=doc_id)
            ids.append(doc_id)
            nodes.append(node)
            scores.append(hit["_score"])
        return VectorStoreQueryResult(nodes=nodes, ids=ids, similarities=scores)
class OpensearchVectorStore(VectorStore):
    """Vector store backed by an Elasticsearch/Opensearch index.

    Insertion, deletion and querying are all delegated to the wrapped client.

    Args:
        client (OpensearchVectorClient): Vector index client to use
            for data insertion/querying.

    """

    stores_text: bool = True

    def __init__(
        self,
        client: OpensearchVectorClient,
    ) -> None:
        """Keep a reference to the client after checking `httpx` is importable."""
        try:
            import httpx  # noqa: F401
        except ImportError:
            raise ImportError(
                "`httpx` package not found, please run `pip install httpx`"
            )
        self._client = client

    def client(self) -> Any:
        """Return the wrapped vector index client."""
        return self._client

    def config_dict(self) -> dict:
        """Return the store configuration, which is always an empty dict."""
        return dict()

    def add(
        self,
        embedding_results: List[NodeEmbeddingResult],
    ) -> List[str]:
        """Index the given embedding results through the client.

        Args:
            embedding_results (List[NodeEmbeddingResult]): list of embedding
                results

        Returns:
            List[str]: the ids of the given results, in input order.

        """
        self._client.index_results(embedding_results)
        stored_ids = [entry.id for entry in embedding_results]
        return stored_ids

    def delete(self, doc_id: str, **delete_kwargs: Any) -> None:
        """Remove a document from the index.

        Extra keyword arguments are accepted but not used.

        Args:
            doc_id (str): document id

        """
        self._client.delete_doc_id(doc_id)

    def query(
        self,
        query_embedding: List[float],
        similarity_top_k: int,
        doc_ids: Optional[List[str]] = None,
    ) -> VectorStoreQueryResult:
        """Return the top k most similar nodes via approximate knn.

        Args:
            query_embedding (List[float]): query embedding
            similarity_top_k (int): top k most similar nodes
            doc_ids (Optional[List[str]]): accepted but not used by this
                implementation.

        """
        knn_result = self._client.do_approx_knn(query_embedding, similarity_top_k)
        return knn_result