Spaces:
Runtime error
Runtime error
File size: 2,388 Bytes
35b22df |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
"""Elasticsearch (or Opensearch) reader over REST api.
This only uses the basic search api, so it will work with Elasticsearch and Opensearch.
"""
from typing import List, Optional
from gpt_index.readers.base import BaseReader
from gpt_index.readers.schema.base import Document
class ElasticsearchReader(BaseReader):
    """Read documents from an Elasticsearch/OpenSearch index.

    These documents can then be used in a downstream Llama Index data structure.

    Args:
        endpoint (str): URL (http/https) of cluster
        index (str): Name of the index (required)
        httpx_client_args (dict): Optional additional args to pass to the
            `httpx.Client`
    """

    def __init__(
        self, endpoint: str, index: str, httpx_client_args: Optional[dict] = None
    ):
        """Initialize with parameters.

        Raises:
            ImportError: If the `httpx` package is not installed.
        """
        import_err_msg = (
            "\n`httpx` package not found. Install via `pip install httpx`\n"
        )
        try:
            # Imported lazily so `httpx` is only required when this reader is used.
            import httpx  # noqa: F401
        except ImportError as err:
            # Chain the original error so the underlying cause stays visible.
            raise ImportError(import_err_msg) from err
        self._client = httpx.Client(base_url=endpoint, **(httpx_client_args or {}))
        self._index = index
        self._endpoint = endpoint

    def load_data(
        self,
        field: str,
        query: Optional[dict] = None,
        embedding_field: Optional[str] = None,
    ) -> List[Document]:
        """Read data from the Elasticsearch index.

        Sends a single POST to ``<index>/_search`` and converts every returned
        hit into a ``Document``.

        Args:
            field (str): Field in the document to retrieve text from
            query (Optional[dict]): Elasticsearch JSON query DSL object, passed
                as the JSON payload of the search request.
                For example:
                {"query": {"match": {"message": {"query": "this is a test"}}}}
            embedding_field (Optional[str]): If there are embeddings stored in
                this index, this field can be used
                to set the embedding field on the returned Document list.

        Returns:
            List[Document]: A list of documents, one per search hit. Each
            document carries the hit's full ``_source`` as ``extra_info``.

        Raises:
            httpx.HTTPStatusError: If the cluster answers with an error status
                (for example an unknown index or a malformed query).
            KeyError: If a hit's ``_source`` does not contain ``field``.
        """
        response = self._client.post(f"{self._index}/_search", json=query)
        # Fail loudly on HTTP errors instead of surfacing an opaque
        # ``KeyError: 'hits'`` when the error payload is parsed below.
        response.raise_for_status()
        hits = response.json()["hits"]["hits"]

        documents = []
        for hit in hits:
            source = hit["_source"]
            text = source[field]
            # Only look up an embedding when a field name was actually given;
            # hits without that field yield ``None``.
            embedding = source.get(embedding_field) if embedding_field else None
            documents.append(
                Document(text=text, extra_info=source, embedding=embedding)
            )
        return documents
|