|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
import re |
|
import tiktoken |
|
from api.utils.file_utils import get_project_base_directory |
|
|
|
def singleton(cls, *args, **kw):
    """Wrap *cls* in a zero-argument factory that builds it at most once per process.

    The returned callable takes no arguments. On its first call in a given
    process it constructs ``cls(*args, **kw)`` and caches the result; later
    calls in the same process return that cached object. The cache key
    combines ``str(cls)`` with the current PID, so a call made under a
    different PID (for example after a fork) builds its own instance.
    """
    created = {}

    def _singleton():
        # Key on class *and* PID so each process gets a separate instance.
        slot = str(cls) + str(os.getpid())
        if slot in created:
            return created[slot]
        created[slot] = cls(*args, **kw)
        return created[slot]

    return _singleton
|
|
|
|
|
def rmSpace(txt):
    """Remove runs of spaces that sit next to non-alphanumeric characters.

    Two case-insensitive passes are applied in order:

    1. spaces are dropped when the character before them is not one of
       ``a-z``, ``0-9``, ``.``, ``,``, ``)`` or ``>`` and a non-space follows;
    2. spaces are dropped when the character after them is not one of
       ``a-z``, ``0-9``, ``.``, ``,``, ``(`` or ``<`` and a non-space precedes.

    Spaces between two ordinary word characters are therefore kept.
    """
    glue = r"\1\2"  # keep both neighbours, discard the spaces between them
    space_after_other = r"([^a-z0-9.,\)>]) +([^ ])"
    space_before_other = r"([^ ]) +([^a-z0-9.,\(<])"

    first_pass = re.sub(space_after_other, glue, txt, flags=re.IGNORECASE)
    second_pass = re.sub(space_before_other, glue, first_pass, flags=re.IGNORECASE)
    return second_pass
|
|
|
|
|
def findMaxDt(fnm):
    """Return the greatest line found in the text file *fnm*.

    Lines are compared as plain strings (lexicographically) against the
    starting value ``"1970-01-01 00:00:00"``; lines equal to ``nan`` are
    ignored. Only the trailing newline is stripped before comparing.

    The scan is best-effort: if the file cannot be opened or read, the
    largest value seen so far (possibly the starting value) is returned
    instead of raising.
    """
    latest = "1970-01-01 00:00:00"
    try:
        with open(fnm, "r") as handle:
            for raw in handle:
                stamp = raw.strip("\n")
                if stamp != 'nan' and stamp > latest:
                    latest = stamp
    except Exception:
        # Deliberately swallowed: a missing/unreadable file keeps the default.
        pass
    return latest
|
|
|
|
|
def findMaxTm(fnm):
    """Return the largest integer found in the text file *fnm*.

    The file is read line by line and each line is parsed with ``int()``.
    Lines that are not valid integers (blank lines, ``nan``, stray text) are
    skipped individually, so one malformed line no longer hides larger
    values that appear after it.

    The result starts at ``0``, so ``0`` is returned when the file holds no
    positive integer. The scan is best-effort: if the file cannot be opened
    or read, the largest value seen so far is returned instead of raising.
    """
    m = 0
    try:
        with open(fnm, "r") as f:
            for line in f:
                try:
                    # int() ignores surrounding whitespace, including the
                    # trailing newline, and rejects 'nan' with ValueError.
                    value = int(line)
                except ValueError:
                    continue
                if value > m:
                    m = value
    except Exception:
        # Deliberately swallowed: a missing/unreadable file keeps what we have.
        pass
    return m
|
|
|
|
|
# Use the project base directory as the tiktoken cache location.
# NOTE(review): tiktoken presumably consults TIKTOKEN_CACHE_DIR when an
# encoding is loaded, so the variable must be exported *before* the
# get_encoding() call below -- keep this statement order.
tiktoken_cache_dir = get_project_base_directory()
os.environ.update({"TIKTOKEN_CACHE_DIR": tiktoken_cache_dir})

# Module-wide cl100k_base encoder shared by the token helpers in this file.
encoder = tiktoken.get_encoding("cl100k_base")
|
|
|
|
|
def num_tokens_from_string(string: str) -> int:
    """Return the number of tokens in *string* using the module-level ``encoder``.

    Special-token markers that happen to appear in the text (for example
    ``<|endoftext|>``) are tokenized as ordinary text. Without that, the
    encoder rejects such input with ``ValueError`` and the text would be
    reported as having 0 tokens.

    Returns 0 if the input still cannot be encoded (for example when it is
    not a string).
    """
    try:
        # disallowed_special=() turns off the special-token check so arbitrary
        # user text is always countable.
        return len(encoder.encode(string, disallowed_special=()))
    except Exception:
        # Best-effort contract kept for callers: unencodable input counts as 0.
        return 0
|
|
|
|
|
def truncate(string: str, max_len: int) -> str:
    """Return *string* cut down to at most *max_len* tokens.

    The text is encoded with the module-level ``encoder``, the token list is
    sliced to its first ``max_len`` entries and the slice is decoded back to
    text. The limit is therefore counted in tokens, not characters; text that
    is already within the limit round-trips through encode/decode.

    Special-token markers that appear in the text (for example
    ``<|endoftext|>``) are tokenized as ordinary text instead of making the
    encoder raise ``ValueError``.
    """
    # disallowed_special=() turns off the special-token check so arbitrary
    # user text can always be truncated.
    tokens = encoder.encode(string, disallowed_special=())
    return encoder.decode(tokens[:max_len])
|
|