steveyin's picture
Upload 2 files
90d3ffe verified
from threading import Thread
from datetime import datetime
import cv2
import uvicorn
import contextlib
import threading
import time
import requests
class Server(uvicorn.Server):
    """
    uvicorn server that can be run in a background thread.

    Use this Server class to serve a uvicorn app in a separate thread,
    so the calling thread can proceed to the gradio UI launch.
    https://github.com/encode/uvicorn/issues/742
    A couple of other links for reference --
    https://stackoverflow.com/questions/61577643/python-how-to-use-fastapi-and-uvicorn-run-without-blocking-the-thread
    https://stackoverflow.com/questions/76142431/how-to-run-another-application-within-the-same-running-event-loop/76148361#76148361
    """

    def install_signal_handlers(self):
        """Override as a no-op so this server installs no signal handlers.

        NOTE(review): presumably needed because Python only allows signal
        handlers to be installed from the main thread -- see the linked
        uvicorn issue.
        """
        pass

    @contextlib.contextmanager
    def run_in_thread(self):
        """Run the server in a background thread for the span of a ``with`` block.

        Starts ``self.run`` in a new thread, polls until ``self.started``
        becomes true, then yields control to the ``with`` body. On exit
        (normal or via exception) it sets ``self.should_exit`` and joins
        the thread.

        Raises:
            RuntimeError: if the server thread terminates before the
                server reports that it has started.
        """
        thread = threading.Thread(target=self.run)
        thread.start()
        try:
            while not self.started:
                # A server that dies during startup never sets ``started``;
                # without this check the caller would poll forever.
                # ``started`` is re-read after the liveness check so a thread
                # that started and finished in between is not misreported.
                if not thread.is_alive() and not self.started:
                    raise RuntimeError(
                        "server thread exited before startup completed"
                    )
                time.sleep(1e-3)
            yield
        finally:
            self.should_exit = True
            thread.join()
class CountsPerSec:
    """
    Class that tracks the number of occurrences ("counts") of an
    arbitrary event and returns the frequency in occurrences
    (counts) per second. The caller must increment the count.
    """

    def __init__(self):
        # Set by start(); None means the timer has not been started yet.
        self._start_time = None
        self._num_occurrences = 0

    def start(self):
        """Record the current time as the start of the measurement.

        Returns:
            CountsPerSec: this instance, so calls can be chained.
        """
        self._start_time = datetime.now()
        return self

    def increment(self):
        """Add one occurrence to the running count."""
        self._num_occurrences += 1

    def countsPerSec(self):
        """Return the average number of occurrences per second since start().

        Returns:
            float | int: count divided by elapsed seconds, or 0 when the
            timer has not been started or no positive time has elapsed.
        """
        if self._start_time is None:
            # Not started: there is no interval to divide by, so report 0
            # instead of failing on ``datetime - None``.
            return 0
        elapsed_time = (datetime.now() - self._start_time).total_seconds()
        return self._num_occurrences / elapsed_time if elapsed_time > 0 else 0
class VideoGet:
    """
    Class that continuously gets frames from a VideoCapture object
    with a dedicated thread.
    """

    def __init__(self, src=0):
        """Open the capture source and read a first frame.

        Args:
            src: value passed to ``cv2.VideoCapture``. Defaults to 0.
        """
        self.stream = cv2.VideoCapture(src)
        (self.grabbed, self.frame) = self.stream.read()
        self.tn = Thread(target=self.get, args=())
        self.stopped = False

    def start(self):
        """Start the frame-reading thread.

        Returns:
            VideoGet: this instance, so calls can be chained.
        """
        self.tn.start()
        return self

    def get(self):
        """Thread body: keep reading frames until stopped or a read fails."""
        while not self.stopped:
            if not self.grabbed:
                # The last read failed; end the loop.
                self.stop()
            else:
                (self.grabbed, self.frame) = self.stream.read()

    def stop(self):
        """Signal the reading loop to end and wait for the thread to finish.

        Safe to call before the thread is started, more than once, and
        from inside the reading thread itself.
        """
        # Set the flag first: joining before the loop is told to end
        # would block forever.
        self.stopped = True
        # get() calls stop() from the worker thread, and a thread cannot
        # join itself; an unstarted or finished thread needs no join.
        if self.tn.is_alive() and self.tn is not threading.current_thread():
            self.tn.join()
class VideoShow:
    """
    Class that continuously shows a frame using a dedicated thread.
    """

    def __init__(self, frame=None):
        """Store the initial frame and prepare the display thread.

        Args:
            frame: image handed to ``cv2.imshow`` by the display loop.
                Defaults to None.
        """
        self.frame = frame
        self.tn = Thread(target=self.show, args=())
        self.stopped = False

    def start(self):
        """Start the display thread.

        Returns:
            VideoShow: this instance, so calls can be chained.
        """
        self.tn.start()
        return self

    def show(self):
        """Thread body: display ``self.frame`` in the "Video" window until stopped.

        Pressing "q" in the window also ends the loop.
        """
        while not self.stopped:
            cv2.imshow("Video", self.frame)
            if cv2.waitKey(1) == ord("q"):
                self.stopped = True

    def stop(self):
        """Signal the display loop to end and wait for the thread to finish.

        Safe to call before the thread is started, more than once, and
        from inside the display thread itself.
        """
        # Set the flag first: joining before the loop is told to end
        # would block until the user pressed "q".
        self.stopped = True
        # A thread cannot join itself, and an unstarted or finished
        # thread needs no join.
        if self.tn.is_alive() and self.tn is not threading.current_thread():
            self.tn.join()
def show_fps(frame, iterations_per_sec):
    """
    Overlay the iteration rate and the current date/time on a frame.

    The rate is drawn as "<n> fps" at pixel position (1000, 50) and the
    timestamp at (500, 50). The frame is drawn on in place and returned.
    """
    # Both overlays share the same font, size, colour and line style.
    overlay_style = dict(
        fontFace=cv2.FONT_HERSHEY_SIMPLEX,
        fontScale=0.8,
        color=(0, 255, 255),
        thickness=1,
        lineType=cv2.LINE_AA,
    )

    fps_label = "{:.0f} fps".format(iterations_per_sec)
    cv2.putText(img=frame, text=fps_label, org=(1000, 50), **overlay_style)

    timestamp = datetime.now().strftime("%m/%d/%Y %H:%M:%S")
    cv2.putText(img=frame, text=timestamp, org=(500, 50), **overlay_style)

    return frame
def draw_text(
    img,
    text,
    pos=(0, 0),
    font=cv2.FONT_HERSHEY_SIMPLEX,
    font_scale=1,
    font_thickness=2,
    line_type=cv2.LINE_AA,
    text_color=(0, 255, 0),
    text_color_bg=(0, 0, 0)
) -> tuple[int, int]:
    """Draw text on top of a filled background rectangle on an image frame.

    The image is drawn on in place.

    Args:
        img: image passed to ``cv2.rectangle`` and ``cv2.putText``.
        text (str): the text to draw.
        pos (tuple, optional): (x, y) passed to ``cv2.putText`` as the text
            origin. Defaults to (0, 0).
        font (int, optional): OpenCV font face. Defaults to
            cv2.FONT_HERSHEY_SIMPLEX.
        font_scale (int, optional): font scale factor. Defaults to 1.
        font_thickness (int, optional): text stroke thickness. Defaults to 2.
        line_type (int, optional): OpenCV line type. Defaults to cv2.LINE_AA.
        text_color (tuple, optional): colour of the text. Defaults to
            (0, 255, 0).
        text_color_bg (tuple, optional): colour of the background
            rectangle. Defaults to (0, 0, 0).

    Returns:
        tuple[int, int]: the (width, height) of the text as reported by
        ``cv2.getTextSize``.
    """
    x, y = pos
    text_size, _ = cv2.getTextSize(text, font, font_scale, font_thickness)
    text_w, text_h = text_size
    # Background spans 10 px below the text origin to 10 px above the
    # text height; the top edge is clamped so it never goes negative.
    cv2.rectangle(
        img,
        (x, y + 10),
        (x + text_w, max(0, y - text_h - 10)),
        text_color_bg,
        -1,  # negative thickness draws a filled rectangle
    )
    cv2.putText(
        img=img,
        text=text,
        org=pos,
        fontFace=font,
        fontScale=font_scale,
        color=text_color,
        thickness=font_thickness,
        lineType=line_type
    )
    return text_size
def try_site(youtube_url: str, timeout: float = 10.0) -> bool:
    """Check if a youtube url is playable

    Fetches the page and looks for the "Video unavailable" playability
    error marker in the response text.

    Args:
        youtube_url (str): a given url for testing
        timeout (float, optional): seconds to wait for the server before
            giving up. Defaults to 10.0.

    Returns:
        bool: whether or not that youtube_url is playable

    Raises:
        requests.RequestException: if the request fails or times out.
    """
    pattern = '"playabilityStatus":{"status":"ERROR","reason":"Video unavailable"'
    # Without a timeout, requests can block indefinitely on an
    # unresponsive server.
    response = requests.get(youtube_url, timeout=timeout)
    return pattern not in response.text
def make_table_from_dict(obj: dict, selected_key: str) -> list:
    """Turn a mapping into a list of row dicts, flagging one key as selected.

    Args:
        obj (dict): mapping whose items become the rows.
        selected_key (str): the key whose row gets ``"selected": True``.

    Returns:
        list: one ``{"name", "value", "selected"}`` dict per item, in the
        mapping's iteration order.
    """
    return [
        {"name": name, "value": value, "selected": bool(name == selected_key)}
        for name, value in obj.items()
    ]
def make_table_from_dict_multiselect(
    obj: dict, selected_vals: list[int]
) -> list:
    """Turn a mapping into a list of row dicts, flagging rows by value.

    Args:
        obj (dict): mapping whose items become the rows.
        selected_vals (list[int]): values whose rows get ``"selected": True``.

    Returns:
        list: one ``{"name", "value", "selected"}`` dict per item, in the
        mapping's iteration order.
    """
    rows = []
    for name, value in obj.items():
        is_selected = value in selected_vals
        rows.append({"name": name, "value": value, "selected": is_selected})
    return rows