Spaces:
Runtime error
Runtime error
File size: 6,620 Bytes
8a58cf3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
"""Elasticsearch/Opensearch vector store."""
import json
from typing import Any, Dict, List, Optional
from gpt_index.data_structs import Node
from gpt_index.vector_stores.types import (
NodeEmbeddingResult,
VectorStore,
VectorStoreQueryResult,
)
class OpensearchVectorClient:
    """Object encapsulating an Opensearch index that has vector search enabled.

    If the index does not yet exist, it is created during init.
    Therefore, the underlying index is assumed to either:
    1) not exist yet or 2) be created due to previous usage of this class.

    Args:
        endpoint (str): URL (http/https) of elasticsearch endpoint
        index (str): Name of the elasticsearch index
        dim (int): Dimension of the vector
        embedding_field (str): Name of the field in the index to store
            embedding array in.
        text_field (str): Name of the field to grab text from
        method (Optional[dict]): Opensearch "method" JSON obj for configuring
            the KNN index.
            This includes engine, metric, and other config params. Defaults to:
            {"name": "hnsw", "space_type": "l2", "engine": "nmslib",
            "parameters": {"ef_construction": 256, "m": 48}}

    """

    def __init__(
        self,
        endpoint: str,
        index: str,
        dim: int,
        embedding_field: str = "embedding",
        text_field: str = "content",
        method: Optional[dict] = None,
    ):
        """Create the HTTP client and issue the index-creation request.

        Raises:
            ImportError: if the `httpx` package cannot be imported.
            AssertionError: if the index-creation request answers with a
                status code other than 200 or 400.
        """
        if method is None:
            method = {
                "name": "hnsw",
                "space_type": "l2",
                "engine": "nmslib",
                "parameters": {"ef_construction": 256, "m": 48},
            }

        import_err_msg = "`httpx` package not found, please run `pip install httpx`"
        if embedding_field is None:
            embedding_field = "embedding"
        try:
            import httpx  # noqa: F401
        except ImportError:
            raise ImportError(import_err_msg)

        self._embedding_field = embedding_field
        self._client = httpx.Client(base_url=endpoint)
        self._endpoint = endpoint
        self._dim = dim
        self._index = index
        self._text_field = text_field

        # initialize mapping
        idx_conf = {
            "settings": {"index": {"knn": True, "knn.algo_param.ef_search": 100}},
            "mappings": {
                "properties": {
                    embedding_field: {
                        "type": "knn_vector",
                        "dimension": dim,
                        "method": method,
                    },
                }
            },
        }
        res = self._client.put(f"/{self._index}", json=idx_conf)
        # will 400 if the index already existed, so allow 400 errors right here.
        # Raised explicitly (not via `assert`) so the check survives `python -O`;
        # AssertionError is kept so existing callers see the same exception type.
        if res.status_code not in (200, 400):
            raise AssertionError(
                f"unexpected status {res.status_code} while creating "
                f"index {self._index!r}"
            )

    def index_results(self, results: List[NodeEmbeddingResult]) -> List[str]:
        """Store results in the index.

        Sends a single `/_bulk` request containing, for every result, one
        "index" action line followed by one document line holding the node
        text and the embedding.

        Args:
            results (List[NodeEmbeddingResult]): embedding results to store

        Returns:
            List[str]: the ids of the given results, in input order.

        Raises:
            AssertionError: if the bulk request does not answer with status
                200 or the response reports errors.
        """
        # Nothing to index: skip the request, a bulk body without actions
        # would only produce an error response.
        if not results:
            return []

        bulk_req: List[Dict[Any, Any]] = []
        for result in results:
            bulk_req.append({"index": {"_index": self._index, "_id": result.id}})
            bulk_req.append(
                {
                    self._text_field: result.node.text,
                    self._embedding_field: result.embedding,
                }
            )
        # ndjson: one JSON object per line, terminated by a trailing newline
        bulk = "\n".join([json.dumps(v) for v in bulk_req]) + "\n"
        res = self._client.post(
            "/_bulk", headers={"Content-Type": "application/x-ndjson"}, content=bulk
        )
        # Explicit raises instead of `assert` so the checks survive `python -O`.
        if res.status_code != 200:
            raise AssertionError(
                f"unexpected status {res.status_code} from bulk indexing request"
            )
        if res.json()["errors"]:
            raise AssertionError("expected no errors while indexing docs")
        return [r.id for r in results]

    def delete_doc_id(self, doc_id: str) -> None:
        """Delete a document.

        The response of the delete request is not inspected.

        Args:
            doc_id (str): document id
        """
        self._client.delete(f"{self._index}/_doc/{doc_id}")

    def do_approx_knn(
        self, query_embedding: List[float], k: int
    ) -> VectorStoreQueryResult:
        """Do approximate knn.

        Args:
            query_embedding (List[float]): query vector
            k (int): used both as the knn `k` and as the result `size`

        Returns:
            VectorStoreQueryResult: nodes, ids and scores built from the hits
                of the search response.
        """
        res = self._client.post(
            f"{self._index}/_search",
            json={
                "size": k,
                "query": {
                    "knn": {self._embedding_field: {"vector": query_embedding, "k": k}}
                },
            },
        )
        nodes = []
        ids = []
        scores = []
        for hit in res.json()["hits"]["hits"]:
            source = hit["_source"]
            text = source[self._text_field]
            doc_id = hit["_id"]
            # the whole `_source` document is handed over as extra_info
            node = Node(text=text, extra_info=source, doc_id=doc_id)
            ids.append(doc_id)
            nodes.append(node)
            scores.append(hit["_score"])
        return VectorStoreQueryResult(nodes=nodes, ids=ids, similarities=scores)
class OpensearchVectorStore(VectorStore):
    """Vector store backed by an Elasticsearch/Opensearch index.

    Every operation is forwarded to the wrapped client object.

    Args:
        client (OpensearchVectorClient): Vector index client to use
            for data insertion/querying.

    """

    stores_text: bool = True

    def __init__(
        self,
        client: OpensearchVectorClient,
    ) -> None:
        """Verify that `httpx` can be imported and remember the client."""
        try:
            import httpx  # noqa: F401
        except ImportError:
            raise ImportError(
                "`httpx` package not found, please run `pip install httpx`"
            )
        self._client = client

    @property
    def client(self) -> Any:
        """Return the wrapped client."""
        return self._client

    @property
    def config_dict(self) -> dict:
        """Return the config dict (always empty for this store)."""
        return {}

    def add(
        self,
        embedding_results: List[NodeEmbeddingResult],
    ) -> List[str]:
        """Hand the embedding results to the client for indexing.

        Args:
            embedding_results (List[NodeEmbeddingResult]): list of embedding
                results

        Returns:
            List[str]: the `id` of every embedding result, in input order.
        """
        self._client.index_results(embedding_results)
        stored_ids: List[str] = []
        for item in embedding_results:
            stored_ids.append(item.id)
        return stored_ids

    def delete(self, doc_id: str, **delete_kwargs: Any) -> None:
        """Remove a document through the client.

        Args:
            doc_id (str): document id
            **delete_kwargs: accepted but not used here
        """
        self._client.delete_doc_id(doc_id)

    def query(
        self,
        query_embedding: List[float],
        similarity_top_k: int,
        doc_ids: Optional[List[str]] = None,
    ) -> VectorStoreQueryResult:
        """Run an approximate knn search for the top k most similar nodes.

        Args:
            query_embedding (List[float]): query embedding
            similarity_top_k (int): top k most similar nodes
            doc_ids (Optional[List[str]]): accepted but not used here
        """
        knn_result = self._client.do_approx_knn(query_embedding, similarity_top_k)
        return knn_result
|