韩宇
init
1b7e88c
import base64
import os
import platform
from collections import OrderedDict
from io import BytesIO
from pathlib import Path
from typing import Sequence
import requests
from PIL import Image
class LRUCache:
# initializing capacity
def __init__(self, capacity: int):
self.cache = OrderedDict()
self.capacity = capacity
def has(self, key) -> bool:
return key in self.cache
# we return the value of the key
# that is queried in O(1) and return -1 if we
# don't find the key in out dict / cache.
# And also move the key to the end
# to show that it was recently used.
def get(self, key, default=None):
if key not in self.cache:
return default
else:
self.cache.move_to_end(key)
return self.cache[key]
# first, we add / update the key by conventional methods.
# And also move the key to the end to show that it was recently used.
# But here we will also check whether the length of our
# ordered dictionary has exceeded our capacity,
# If so we remove the first key (least recently used)
def put(self, key, value) -> None:
self.cache[key] = value
self.cache.move_to_end(key)
if len(self.cache) > self.capacity:
self.cache.popitem(last=False)
def pop(self, key, value):
self.cache.pop(key, None)
def handle_response(res, json, url):
if res.status_code < 299:
return res.json()
elif res.status_code == 404:
logging.error("Request URL: {} | Error[404]: 请求错误: 错误的地址".format(url))
raise VQLError(516)
elif res.status_code == 422:
logging.error(
"Request URL: {} | Request body: {} | Error[422]: 请求错误: 错误的请求格式".format(
url, json
)
)
raise VQLError(517)
else:
info = res.json()
logging.error(
"Request URL: {} | Request body: {} | Error: {}".format(url, json, info)
)
raise VQLError(511, detail=info)
def chunks(l: Sequence, win_len: int, stride_len: int):
s_id = 0
e_id = min(len(l), win_len)
while True:
yield l[s_id:e_id]
if e_id == len(l):
break
s_id = s_id + stride_len
e_id = min(s_id + win_len, len(l))
def encode_image(input):
def _encode(img):
output_buffer = BytesIO()
img.save(output_buffer, format="JPEG")
byte_data = output_buffer.getvalue()
base64_str = base64.b64encode(byte_data)
base64_str = str(base64_str, encoding="utf-8")
return base64_str
input = input.convert("RGB")
if isinstance(input, list):
res = []
for img in input:
res.append(_encode(img))
return res
else:
return _encode(input)
def get_platform() -> str:
"""Get platform."""
system = platform.system()
if system == "Darwin":
return "MacOS"
return system
def read_image(input_source) -> Image.Image:
"""
Read an image from a local path, URL, PIL Image object, or Path object.
Args:
input_source (str or PIL.Image.Image or Path): The source of the image.
Can be a local file path, a URL, a PIL Image object, or a Path object.
Returns:
PIL.Image.Image: The image as a PIL Image object.
Raises:
ValueError: If the input source is invalid or the image cannot be read.
"""
if isinstance(input_source, Image.Image):
return input_source
if isinstance(input_source, (str, Path)):
if isinstance(input_source, str) and input_source.startswith(
("http://", "https://")
):
# URL
try:
response = requests.get(input_source)
response.raise_for_status()
return Image.open(BytesIO(response.content))
except requests.RequestException as e:
raise ValueError(f"Failed to fetch image from URL: {e}")
elif os.path.isfile(str(input_source)):
# Local file path or Path object
try:
return Image.open(input_source)
except IOError as e:
raise ValueError(f"Failed to open local image file: {e}")
else:
raise ValueError(
"Invalid input source. Must be a valid URL or local file path."
)
raise ValueError(
"Invalid input type. Must be a string (URL or file path), Path object, or PIL Image object."
)