File size: 3,245 Bytes
a7642c6 6054f54 758538f 6054f54 eae0334 6054f54 758538f 6054f54 2493f1d 6054f54 2493f1d 6054f54 2493f1d 6054f54 2493f1d 6054f54 2493f1d 6054f54 758538f a3da325 758538f 8e1cb41 758538f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
"""
Reference:
- [graphrag](https://github.com/microsoft/graphrag)
"""
import html
import json
import re
from typing import Any, Callable
import numpy as np
import xxhash
from rag.utils.redis_conn import REDIS_CONN
ErrorHandlerFn = Callable[[BaseException | None, str | None, dict | None], None]
def perform_variable_replacements(
input: str, history: list[dict] | None = None, variables: dict | None = None
) -> str:
"""Perform variable replacements on the input string and in a chat log."""
if history is None:
history = []
if variables is None:
variables = {}
result = input
def replace_all(input: str) -> str:
result = input
for k, v in variables.items():
result = result.replace(f"{{{k}}}", v)
return result
result = replace_all(result)
for i, entry in enumerate(history):
if entry.get("role") == "system":
entry["content"] = replace_all(entry.get("content") or "")
return result
def clean_str(input: Any) -> str:
"""Clean an input string by removing HTML escapes, control characters, and other unwanted characters."""
# If we get non-string input, just give it back
if not isinstance(input, str):
return input
result = html.unescape(input.strip())
# https://stackoverflow.com/questions/4324790/removing-control-characters-from-a-string-in-python
return re.sub(r"[\"\x00-\x1f\x7f-\x9f]", "", result)
def dict_has_keys_with_types(
data: dict, expected_fields: list[tuple[str, type]]
) -> bool:
"""Return True if the given dictionary has the given keys with the given types."""
for field, field_type in expected_fields:
if field not in data:
return False
value = data[field]
if not isinstance(value, field_type):
return False
return True
def get_llm_cache(llmnm, txt, history, genconf):
hasher = xxhash.xxh64()
hasher.update(str(llmnm).encode("utf-8"))
hasher.update(str(txt).encode("utf-8"))
hasher.update(str(history).encode("utf-8"))
hasher.update(str(genconf).encode("utf-8"))
k = hasher.hexdigest()
bin = REDIS_CONN.get(k)
if not bin:
return
return bin
def set_llm_cache(llmnm, txt, v: str, history, genconf):
hasher = xxhash.xxh64()
hasher.update(str(llmnm).encode("utf-8"))
hasher.update(str(txt).encode("utf-8"))
hasher.update(str(history).encode("utf-8"))
hasher.update(str(genconf).encode("utf-8"))
k = hasher.hexdigest()
REDIS_CONN.set(k, v.encode("utf-8"), 24*3600)
def get_embed_cache(llmnm, txt):
hasher = xxhash.xxh64()
hasher.update(str(llmnm).encode("utf-8"))
hasher.update(str(txt).encode("utf-8"))
k = hasher.hexdigest()
bin = REDIS_CONN.get(k)
if not bin:
return
return np.array(json.loads(bin))
def set_embed_cache(llmnm, txt, arr):
hasher = xxhash.xxh64()
hasher.update(str(llmnm).encode("utf-8"))
hasher.update(str(txt).encode("utf-8"))
k = hasher.hexdigest()
arr = json.dumps(arr.tolist() if isinstance(arr, np.ndarray) else arr)
REDIS_CONN.set(k, arr.encode("utf-8"), 24*3600) |