# steveyin's picture
# Update webapp.py
# 2f552d8 verified
import asyncio
from datetime import datetime
import logging
import cv2
import numpy as np
from pathlib import Path
import torch
from zoneinfo import ZoneInfo
from starlette.middleware import Middleware
from starlette.responses import StreamingResponse, Response
from starlette.requests import Request
from starlette.routing import Mount, Route
from starlette.staticfiles import StaticFiles
from starlette.templating import Jinja2Templates
from sse_starlette import EventSourceResponse
from asgi_htmx import HtmxMiddleware
from asgi_htmx import HtmxRequest
from ultralytics import YOLO
from ultralytics_solutions_modified import object_counter, speed_estimation
from vidgear.gears import CamGear
from vidgear.gears.asyncio import WebGear
from vidgear.gears.asyncio.helper import reducer
from helper import (
draw_text, make_table_from_dict_multiselect, make_table_from_dict, try_site
)
# Directory containing this file; the static assets and templates below are
# resolved relative to it.
HERE = Path(__file__).parent
static = StaticFiles(directory=HERE / ".vidgear/webgear/static")
templates = Jinja2Templates(directory=HERE / ".vidgear/webgear/templates")
# Pause between iterations of the server-sent-event generators.
EVT_STREAM_DELAY_SEC = 0.05 # seconds
# Value sent in the "retry" field of every server-sent event.
RETRY_TIMEOUT_MILSEC = 15000 # milliseconds
# Configure the root logger at INFO level; each record shows timestamp,
# logger name, module, function, line number, level and message.
# logger = logging.getLogger(__name__).addHandler(logging.NullHandler())
logging.basicConfig(
format='%(asctime)s %(name)-8s->%(module)-20s->%(funcName)-20s:%(lineno)-4s::%(levelname)-8s %(message)s', # noqa
level=logging.INFO
)
class DemoCase:
def __init__(
self,
FRAME_WIDTH: int = 1280,
FRAME_HEIGHT: int = 720,
YOLO_VERBOSE: bool = True
):
self.FRAME_WIDTH: int = FRAME_WIDTH
self.FRAME_HEIGHT: int = FRAME_HEIGHT
self.YOLO_VERBOSE: bool = YOLO_VERBOSE
self.STREAM_RESOLUTION: str = "720p"
# predefined yolov8 model references
self.model_dict: dict = {
"y8nano": "./data/models/yolov8n.pt",
"y8small": "./data/models/yolov8s.pt",
"y8medium": "./data/models/yolov8m.pt",
"y8large": "./data/models/yolov8l.pt",
"y8huge": "./data/models/yolov8x.pt",
}
self.model_choice_default: str = "y8small"
self.model_choice: str = self.model_choice_default
# predefined youtube live stream urls
self.url_dict: dict = {
"Peace Bridge US": "https://youtu.be/9En2186vo5g",
"Peace Bridge CA": "https://youtu.be/WPMgP2C3_co",
"San Marcos TX": "https://youtu.be/E8LsKcVpL5A",
"4Corners Downtown": "https://youtu.be/ByED80IKdIU",
"Gangnam Seoul": "https://youtu.be/3ottn7kfRuc",
"Time Square NY": "https://youtu.be/QTTTY_ra2Tg",
"Port Everglades-1": "https://youtu.be/67-73mgWDf0",
"Port Everglades-2": "https://youtu.be/Nhuu1QsW5LI",
"Port Everglades-3": "https://youtu.be/Lpm-C_Gz6yM",
}
self.obj_dict: dict = {
"person": 0,
"bicycle": 1,
"car": 2,
"motorcycle": 3,
"airplane": 4,
"bus": 5,
"train": 6,
"truck": 7,
"boat": 8,
"traffic light": 9,
"fire hydrant": 10,
"stop sign": 11,
"parking meter": 12
}
self.cam_loc_default: str = "Peace Bridge US"
self.cam_loc: str = self.cam_loc_default
self.frame_reduction: int = 35
# run time parameters that are from user input
self.roi_height_default: int = int(FRAME_HEIGHT / 2)
self.roi_height: int = self.roi_height_default
self.roi_thickness_half_default: int = 30
self.roi_thickness_half: int = self.roi_thickness_half_default
self.obj_class_id_default: list[int] = [2, 3, 5, 7]
self.obj_class_id: list[int] = self.obj_class_id_default
self.conf_threshold: float = 0.25
self.iou_threshold: float = 0.7
self.use_FP16: bool = False
self.use_stream_buffer: bool = True
self.stream0: CamGear = None
self.stream1: CamGear = None
self.counter = None
self.speed_obj = None
# define some logic flow control booleans
self._is_running: bool = False
self._is_tracking: bool = False
self._roi_changed: bool = False
def load_model(
    self,
    model_choice: str = "y8small",
    conf_threshold: float = 0.25,
    iou_threshold: float = 0.7,
    use_FP16: bool = False,
    use_stream_buffer: bool = False
) -> None:
    """
    Instantiate the requested YOLOv8 model and store the inference settings.

    An unknown ``model_choice`` falls back to ``model_choice_default``.
    The model is moved to CUDA device 0 when CUDA is available.
    Non-positive thresholds are replaced by 0.25 (conf) and 0.7 (iou).
    """
    if model_choice in self.model_dict:
        self.model_choice = model_choice
    else:
        logging.warning(
            f'"{model_choice}" not found in the model_dict, use '
            f'"{self.model_dict[self.model_choice_default]}" instead!'
        )
        self.model_choice = self.model_choice_default

    weights = self.model_dict[self.model_choice]
    self.model = YOLO(f"{weights}")

    # push the model to GPU if available
    if torch.cuda.is_available():
        torch.cuda.set_device(0)
        self.model.to("cuda")
        logging.info(f"{weights} loaded using torch w GPU0")
    else:
        logging.info(f"{weights} loaded using CPU")

    # setup some configs
    if conf_threshold > 0.0:
        self.conf_threshold = conf_threshold
    else:
        self.conf_threshold = 0.25
    if iou_threshold > 0.0:
        self.iou_threshold = iou_threshold
    else:
        self.iou_threshold = 0.7
    self.use_FP16 = use_FP16
    self.use_stream_buffer = use_stream_buffer
    logging.info(
        f"{self.model_choice}: conf={self.conf_threshold:.2f} | "
        f"iou={self.iou_threshold:.2f} | FP16={self.use_FP16} | "
        f"stream_buffer={self.use_stream_buffer}"
    )
def select_cam_loc(
self,
cam_loc_key: str = "Peace Bridge US",
cam_loc_val: str = "https://www.youtube.com/watch?v=9En2186vo5g"
) -> None:
"""
select camera video feed from url_dict, or set as a new url
"""
if (bool(cam_loc_key) is False or bool(cam_loc_val) is False):
self.cam_loc = self.cam_loc_default
logging.warning(
f'input cam_loc_key, cam_loc_val pair invalid, use default '
f'{{{self.cam_loc_default}: '
f'{self.url_dict[self.cam_loc_default]}}}'
)
elif cam_loc_key not in self.url_dict:
if try_site(self.url_dict[self.cam_loc]):
self.url_dict.update({cam_loc_key: cam_loc_val})
self.cam_loc = cam_loc_key
logging.info(
f'input cam_loc key:val pair is new and playable, add '
f'{{{cam_loc_key}:{cam_loc_val}}} into url_dict'
)
else:
self.cam_loc = self.cam_loc_default
logging.warning(
f'input cam_loc key:val pair is new but not playable, '
f'roll back to default {{{self.cam_loc_default}: '
f'{self.url_dict[self.cam_loc_default]}}}'
)
self.cam_loc = self.cam_loc_default
else:
self.cam_loc = cam_loc_key
logging.info(
f'use {{{self.cam_loc}: {self.url_dict[self.cam_loc]}}} as source'
)
def select_obj_class_id(
self,
obj_names: list[str] = [
"person", "bicycle", "car", "motorcycle", "airplane", "bus",
"train", "truck", "boat", "traffic light", "fire hydrant",
"stop sign", "parking meter"
]
) -> None:
"""
select object class id list based on the input obj_names str list
"""
if (bool(obj_names) is False):
self.obj_class_id = self.obj_class_id_default
logging.warning(
f'input obj_names invalid, use default id {self.obj_class_id_default}'
)
else:
obj_class_id = []
for name in obj_names:
if name in list(self.obj_dict.keys()):
obj_class_id.append(self.obj_dict[name])
if (len(obj_class_id) == 0):
self.obj_class_id = self.obj_class_id_default
logging.warning(
f'input obj_names invalid, use default id '
f'{self.obj_class_id_default}'
)
else:
self.obj_class_id = obj_class_id
logging.info(f'object class id set as {self.obj_class_id}')
# def set_roi(self, roi_height: int = 360, roi_thickness_half: int = 30):
def set_roi(self, roi_height: int = 360):
if (roi_height < 120 or roi_height > 600):
self.roi_height = int(self.FRAME_HEIGHT / 2)
logging.warning(
f'roi_height invalid, use default {int(self.FRAME_HEIGHT / 2)}'
)
else:
self.roi_height = roi_height
logging.info(f'roi_height is set at {self.roi_height}')
self.roi_thickness_half = self.roi_thickness_half_default
'''
if (
roi_thickness_half > 0 and
roi_thickness_half < int(self.FRAME_HEIGHT / 2)
):
if (self.roi_height + roi_thickness_half > self.FRAME_HEIGHT):
self.roi_thickness_half = self.FRAME_HEIGHT - self.roi_height
elif (self.roi_height - roi_thickness_half < 0):
self.roi_thickness_half = self.roi_height
else:
self.roi_thickness_half = roi_thickness_half
logging.info(
f'roi_thickness_half is set at {self.roi_thickness_half}'
)
else:
self.roi_thickness_half = self.roi_thickness_half_default
logging.warning('roi_half_thickness invalid, use default 30')
'''
def set_frame_reduction(self, frame_reduction: int = 35):
if (frame_reduction < 0 or frame_reduction > 100):
self.frame_reduction = 35
logging.warning(
f'frame_reduction:{frame_reduction} invalid, '
f'use default value 35'
)
else:
self.frame_reduction = frame_reduction
logging.info(f'frame_reduction is set at {self.frame_reduction}')
async def frame0_producer(self):
    """
    Yield the unprocessed camera feed as multipart JPEG chunks.

    While ``_is_running`` is set, frames are read from a ``CamGear`` stream
    opened on ``url_dict[cam_loc]``, resized to FRAME_WIDTH x FRAME_HEIGHT,
    stamped with the current time, shrunk by ``frame_reduction`` percent and
    JPEG encoded.  While it is not set, a random-noise placeholder frame is
    produced instead.

    Yields:
        bytes: one ``--frame`` multipart chunk holding a JPEG image
    """
    while True:
        if self._is_running:
            if self.stream0 is None:
                try:
                    # Start the stream, set desired resolution to be 720p
                    options = {"STREAM_RESOLUTION": "720p"}
                    self.stream0 = CamGear(
                        source=self.url_dict[self.cam_loc],
                        colorspace=None,
                        stream_mode=True,
                        logging=True,
                        **options
                    ).start()
                except Exception:
                    # Start the stream, set best resolution
                    self.stream0 = CamGear(
                        source=self.url_dict[self.cam_loc],
                        colorspace=None,
                        stream_mode=True,
                        logging=True
                    ).start()
                    logging.warning(
                        f"failed to connect {self.url_dict[self.cam_loc]} "
                        f"at 720p resolution, use best resolution"
                    )
            try:
                # loop over frames
                while (self.stream0 is not None and self._is_running):
                    frame = self.stream0.read()
                    if frame is None:
                        frame = (np.random.standard_normal([
                            self.FRAME_HEIGHT, self.FRAME_WIDTH, 3
                        ]) * 255).astype(np.uint8)
                    elif frame.shape != (
                        self.FRAME_HEIGHT, self.FRAME_WIDTH, 3
                    ):
                        # cv2.resize takes dsize as (width, height)
                        frame = cv2.resize(
                            frame, (self.FRAME_WIDTH, self.FRAME_HEIGHT)
                        )
                    # do something with your OpenCV frame here
                    draw_text(
                        img=frame,
                        text=datetime.now(
                            tz=ZoneInfo("America/Los_Angeles")
                        ).strftime("%m/%d/%Y %H:%M:%S") + " PDT",
                        pos=(int(self.FRAME_WIDTH - 500), 50),
                        font=cv2.FONT_HERSHEY_SIMPLEX,
                        font_scale=1,
                        font_thickness=2,
                        line_type=cv2.LINE_AA,
                        text_color=(0, 255, 255),
                        text_color_bg=(0, 0, 0),
                    )
                    # reducer frame size for performance, percentage int
                    frame = await reducer(
                        frame, percentage=self.frame_reduction
                    )
                    # handle JPEG encoding & yield frame in byte format
                    img_encoded = cv2.imencode(".jpg", frame)[1].tobytes()
                    yield (
                        b"--frame\r\nContent-Type:video/jpeg2000\r\n\r\n" +
                        img_encoded + b"\r\n"
                    )
                    await asyncio.sleep(0.00001)
                # streaming was switched off: stop the stream and drain it
                # NOTE(review): the original indentation was lost; the two
                # resets below are assumed to belong to this `if` - confirm.
                if self.stream0 is not None:
                    self.stream0.stop()
                    while self.stream0.read() is not None:
                        continue
                    self.stream0 = None
                    self._is_running = False
            except asyncio.CancelledError:
                # the task was cancelled: release the stream, then answer
                # with one placeholder frame
                if self.stream0 is not None:
                    self.stream0.stop()
                    while self.stream0.read() is not None:
                        continue
                    self.stream0 = None
                    self._is_running = False
                logging.warning(
                    "client disconneted in frame0_producer"
                )
                frame = (np.random.standard_normal([
                    self.FRAME_HEIGHT, self.FRAME_WIDTH, 3
                ]) * 255).astype(np.uint8)
                frame = await reducer(
                    frame, percentage=self.frame_reduction
                )
                img_encoded = cv2.imencode(".jpg", frame)[1].tobytes()
                logging.info(
                    f"_is_running is {self._is_running} in frame0_producer"
                )
                yield (
                    b"--frame\r\nContent-Type:video/jpeg2000\r\n\r\n" +
                    img_encoded + b"\r\n"
                )
                await asyncio.sleep(0.00001)
        else:
            # not streaming: emit a random-noise placeholder frame
            frame = (np.random.standard_normal([
                self.FRAME_HEIGHT, self.FRAME_WIDTH, 3
            ]) * 255).astype(np.uint8)
            frame = await reducer(
                frame, percentage=self.frame_reduction
            )
            img_encoded = cv2.imencode(".jpg", frame)[1].tobytes()
            logging.info(
                f"_is_running is {self._is_running} in frame0_producer"
            )
            yield (
                b"--frame\r\nContent-Type:video/jpeg2000\r\n\r\n" +
                img_encoded + b"\r\n"
            )
            await asyncio.sleep(0.00001)
async def frame1_producer(self):
    """
    Yield the processed (tracked and counted) feed as multipart JPEG chunks.

    While ``_is_running`` is set, frames are read from a ``CamGear`` stream
    opened on ``url_dict[cam_loc]``.  When ``_is_tracking`` is also set, each
    frame goes through ``model.track`` and the results are handed to the
    speed estimator and the object counter.  While ``_is_running`` is not
    set, a random-noise placeholder frame is produced instead.

    Yields:
        bytes: one ``--frame`` multipart chunk holding a JPEG image
    """

    def make_counter():
        # counting region: band of +/- roi_thickness_half around
        # roi_height, inset 5px from the left and right frame edges
        top = self.roi_height - self.roi_thickness_half
        bottom = self.roi_height + self.roi_thickness_half
        right = self.FRAME_WIDTH - 5
        counter = object_counter.ObjectCounter()
        counter.set_args(
            view_img=False,
            reg_pts=[(5, top), (5, bottom), (right, bottom), (right, top)],
            classes_names=self.model.names,
            draw_tracks=False,
            draw_boxes=False,
            draw_reg_pts=True,
        )
        return counter

    def make_speed_estimator():
        # speed line: horizontal line at roi_height, inset 5px per side
        estimator = speed_estimation.SpeedEstimator()
        estimator.set_args(
            reg_pts=[
                (5, self.roi_height),
                (self.FRAME_WIDTH - 5, self.roi_height)
            ],
            names=self.model.names,
            view_img=False
        )
        return estimator

    def noise_frame():
        return (np.random.standard_normal([
            self.FRAME_HEIGHT, self.FRAME_WIDTH, 3
        ]) * 255).astype(np.uint8)

    while True:
        if self._is_running:
            if self.stream1 is None:
                try:
                    # Start the stream, set desired quality as 720p
                    options = {"STREAM_RESOLUTION": "720p"}
                    self.stream1 = CamGear(
                        source=self.url_dict[self.cam_loc],
                        colorspace=None,
                        stream_mode=True,
                        logging=True,
                        **options
                    ).start()
                except Exception:
                    # Start the stream, use the best resolution
                    self.stream1 = CamGear(
                        source=self.url_dict[self.cam_loc],
                        colorspace=None,
                        stream_mode=True,
                        logging=True
                    ).start()
                    logging.warning(
                        f"failed to connect {self.url_dict[self.cam_loc]} "
                        f"at 720p resolution, use best resolution"
                    )
            if (self._is_tracking and self.stream1 is not None):
                # Read the flag once so an ROI change rebuilds BOTH the
                # counter and the speed estimator before it is cleared.
                roi_changed = self._roi_changed
                if self.counter is None or roi_changed:
                    self.counter = make_counter()
                if self.speed_obj is None or roi_changed:
                    self.speed_obj = make_speed_estimator()
                self._roi_changed = False
            try:
                while (self.stream1 is not None and self._is_running):
                    if self._roi_changed:
                        # ROI moved while streaming: rebuild both trackers
                        self.counter = make_counter()
                        self.speed_obj = make_speed_estimator()
                        self._roi_changed = False
                    # read frame from provided source
                    frame = self.stream1.read()
                    if frame is None:
                        frame = noise_frame()
                    elif frame.shape != (
                        self.FRAME_HEIGHT, self.FRAME_WIDTH, 3
                    ):
                        frame = cv2.resize(
                            frame, (self.FRAME_WIDTH, self.FRAME_HEIGHT)
                        )
                    # do something with your OpenCV frame here
                    draw_text(
                        img=frame,
                        text=datetime.now(
                            tz=ZoneInfo("America/Los_Angeles")
                        ).strftime("%m/%d/%Y %H:%M:%S") + " PDT",
                        pos=(self.FRAME_WIDTH - 500, 50),
                        font=cv2.FONT_HERSHEY_SIMPLEX,
                        font_scale=1,
                        font_thickness=2,
                        line_type=cv2.LINE_AA,
                        text_color=(0, 255, 255),
                        text_color_bg=(0, 0, 0),
                    )
                    frame_tagged = frame
                    if (
                        self._is_tracking
                        and getattr(self, "model", None) is not None
                        and self.speed_obj is not None
                        and self.counter is not None
                        and self._roi_changed is False
                    ):
                        # YOLOv8 tracking, persisting tracks between frames
                        results = self.model.track(
                            source=frame,
                            classes=self.obj_class_id,
                            conf=self.conf_threshold,
                            iou=self.iou_threshold,
                            half=self.use_FP16,
                            stream_buffer=self.use_stream_buffer,
                            persist=True,
                            show=False,
                            verbose=self.YOLO_VERBOSE
                        )
                        # frames without any track id are passed through
                        if results[0].boxes.id is not None:
                            self.speed_obj.estimate_speed(
                                frame_tagged, results
                            )
                            self.counter.start_counting(
                                frame_tagged, results
                            )
                    # reducer frames size for performance, int percentage
                    frame_tagged = await reducer(
                        frame=frame_tagged,
                        percentage=self.frame_reduction
                    )
                    # handle JPEG encoding & yield frame in byte format
                    img_encoded = \
                        cv2.imencode(".jpg", frame_tagged)[1].tobytes()
                    yield (
                        b"--frame\r\nContent-Type:video/jpeg2000\r\n\r\n" +
                        img_encoded + b"\r\n"
                    )
                    await asyncio.sleep(0.00001)
                # streaming was switched off: stop the stream and drain it
                # NOTE(review): the original indentation was lost; the three
                # resets below are assumed to belong to this `if` - confirm.
                if self.stream1 is not None:
                    self.stream1.stop()
                    while self.stream1.read() is not None:
                        continue
                    self.stream1 = None
                    self._is_tracking = False
                    self._is_running = False
            except asyncio.CancelledError:
                # the task was cancelled: release the stream, then answer
                # with one placeholder frame
                if self.stream1 is not None:
                    self.stream1.stop()
                    while self.stream1.read() is not None:
                        continue
                    self.stream1 = None
                    self._is_tracking = False
                    self._is_running = False
                logging.warning(
                    "client disconnected in frame1_producer"
                )
                frame = await reducer(
                    noise_frame(), percentage=self.frame_reduction
                )
                img_encoded = cv2.imencode(".jpg", frame)[1].tobytes()
                logging.info(
                    f"_is_running is {self._is_running} in frame1_producer"
                )
                yield (
                    b"--frame\r\nContent-Type:video/jpeg2000\r\n\r\n" +
                    img_encoded + b"\r\n"
                )
                await asyncio.sleep(0.00001)
        else:
            # not streaming: emit a random-noise placeholder frame
            # reducer frame size for more performance, percentage int
            frame = await reducer(
                noise_frame(), percentage=self.frame_reduction
            )
            # handle JPEG encoding & yield frame in byte format
            img_encoded = cv2.imencode(".jpg", frame)[1].tobytes()
            yield (
                b"--frame\r\nContent-Type:video/jpeg2000\r\n\r\n" +
                img_encoded + b"\r\n"
            )
            await asyncio.sleep(0.00001)
async def custom_video_response(self, scope):
    """
    Build the async streaming response that serves `frame1_producer`.

    Tip1: use BackgroundTask to handle the async cleanup
    https://github.com/tiangolo/fastapi/discussions/11022
    Tip2: use is_disconnected to check client disconnection
    https://www.starlette.io/requests/#body
    https://github.com/encode/starlette/pull/320/files/d56c917460a1e6488e1206c428445c39854859c1
    """
    accepted_types = ("http", "https")
    assert scope["type"] in accepted_types
    await asyncio.sleep(0.00001)
    producer = self.frame1_producer()
    return StreamingResponse(
        content=producer,
        media_type="multipart/x-mixed-replace; boundary=frame"
    )
async def models(self, request: HtmxRequest) -> Response:
    """
    Render the YOLO model table, or an acknowledgement partial when
    ``model_dict`` is empty.
    """
    if self.model_dict:
        template = "partials/yolo_models.html"
        rows = make_table_from_dict(self.model_dict, self.model_choice)
    else:
        template = "partials/ack.html"
        rows = ["model list unavailable!"]
    response = templates.TemplateResponse(
        template, {"request": request, "table": rows}, status_code=200
    )
    await asyncio.sleep(0.001)
    return response
async def urls(self, request: HtmxRequest) -> Response:
    """
    Render the camera stream table, or an acknowledgement partial when
    ``url_dict`` is empty.
    """
    if self.url_dict:
        template = "partials/camera_streams.html"
        rows = make_table_from_dict(self.url_dict, self.cam_loc)
    else:
        template = "partials/ack.html"
        rows = ["streaming url list unavailable!"]
    response = templates.TemplateResponse(
        template, {"request": request, "table": rows}, status_code=200
    )
    await asyncio.sleep(0.01)
    return response
async def objects(self, request: HtmxRequest) -> Response:
    """
    Render the multi-select object table, or an acknowledgement partial
    when ``obj_dict`` is empty.
    """
    if self.obj_dict:
        template = "partials/object_list.html"
        rows = make_table_from_dict_multiselect(
            self.obj_dict, self.obj_class_id
        )
    else:
        template = "partials/ack.html"
        rows = ["object list unavailable!"]
    response = templates.TemplateResponse(
        template, {"request": request, "table": rows}, status_code=200
    )
    await asyncio.sleep(0.001)
    return response
async def geturl(self, request: HtmxRequest) -> Response:
    """
    Acknowledge which camera location is currently selected.

    Responds 201 when ``cam_loc`` is a key of ``url_dict`` and 200 otherwise.
    """
    status_code = 200
    if not self.url_dict:
        message = "streaming url list unavailable!"
    elif self.cam_loc in self.url_dict:
        message = f"{self.cam_loc} selected"
        status_code = 201
    else:
        message = f"{self.cam_loc} is not in the registered url_list"
    response = templates.TemplateResponse(
        "partials/ack.html",
        {"request": request, "table": [message]},
        status_code=status_code
    )
    await asyncio.sleep(0.01)
    return response
async def addurl(self, request: HtmxRequest) -> Response:
    """
    Register a new camera stream from a JSON POST body.

    Expects ``{"payload": {"CamLoc": <name>, "URL": <url>}}``.  On success
    the refreshed camera table is returned with status 201.  Every failure
    returns the acknowledgement partial with status 200, re-targeted at
    ``#add-url-ack``.
    """

    def ack(message):
        # failure acknowledgement shown in the add-url element
        resp = templates.TemplateResponse(
            "partials/ack.html",
            {"request": request, "table": [message]},
            status_code=200
        )
        resp.headers['Hx-Retarget'] = '#add-url-ack'
        return resp

    try:
        req_json = await request.json()
    except RuntimeError:
        response = ack("receive channel unavailable!")
        await asyncio.sleep(0.01)
        return response
    except ValueError:
        # malformed JSON body (json.JSONDecodeError is a ValueError):
        # handled below as an invalid request instead of a server error
        req_json = None

    payload = req_json.get("payload") if isinstance(req_json, dict) else None
    if isinstance(payload, dict) and "CamLoc" in payload and "URL" in payload:
        cam_loc = payload["CamLoc"]
        cam_url = payload["URL"]
        if not cam_loc or not cam_url:
            response = ack("empty or invalid inputs!")
        elif try_site(cam_url) is False:
            response = ack("invalid video URL!")
        else:
            self.select_cam_loc(cam_loc_key=cam_loc, cam_loc_val=cam_url)
            response = templates.TemplateResponse(
                "partials/camera_streams.html",
                {
                    "request": request,
                    "table": make_table_from_dict(
                        self.url_dict, self.cam_loc
                    ),
                },
                status_code=201
            )
    else:
        response = ack("invalid POST request!")
    await asyncio.sleep(0.01)
    return response
async def seturl(self, request: HtmxRequest) -> Response:
    """
    Select a registered camera stream by its URL.

    Expects ``{"payload": {"cam_url": <url>}}``.  The selection is refused
    while streaming or tracking is on.  Responds 201 when a registered URL
    was selected and 200 with an explanatory message otherwise.
    """
    template = "partials/ack.html"
    status_code = 200
    try:
        req_json = await request.json()
    except RuntimeError:
        response = templates.TemplateResponse(
            template,
            {"request": request, "table": ["receive channel unavailable!"]},
            status_code=200
        )
        await asyncio.sleep(0.01)
        return response
    except ValueError:
        # malformed JSON body (json.JSONDecodeError is a ValueError):
        # handled below as an invalid request instead of a server error
        req_json = None

    payload = req_json.get("payload") if isinstance(req_json, dict) else None
    if isinstance(payload, dict) and "cam_url" in payload:
        logging.info(
            f"seturl: _is_running = {self._is_running}, "
            f"_is_tracking = {self._is_tracking}"
        )
        if self._is_running is True or self._is_tracking is True:
            table_contents = [
                "turn off streaming and tracking before "
                "setting a new camera stream!"
            ]
        else:
            cam_url = payload["cam_url"]
            # first registered location whose URL equals the request
            matched_loc = next(
                (
                    loc for loc, url in self.url_dict.items()
                    if url == cam_url
                ),
                None
            )
            if matched_loc is not None:
                self.cam_loc = matched_loc
                table_contents = [f"{self.cam_loc} selected"]
                status_code = 201
            else:
                table_contents = [
                    f"{cam_url} is not in the registered url_list"
                ]
    else:
        table_contents = ["invalid POST request!"]
    response = templates.TemplateResponse(
        template,
        {"request": request, "table": table_contents},
        status_code=status_code
    )
    await asyncio.sleep(0.01)
    return response
async def getmodel(self, request: HtmxRequest) -> Response:
    """
    Acknowledge which YOLO model is currently selected.

    Responds 201 when ``model_choice`` is a key of ``model_dict`` and 200
    otherwise.
    """
    status_code = 200
    if not self.model_dict:
        message = "model list unavailable!"
    elif self.model_choice in self.model_dict:
        message = f"{self.model_choice} selected"
        status_code = 201
    else:
        message = f"{self.model_choice} is not in the registered model_list"
    response = templates.TemplateResponse(
        "partials/ack.html",
        {"request": request, "table": [message]},
        status_code=status_code
    )
    await asyncio.sleep(0.01)
    return response
async def setmodel(self, request: HtmxRequest) -> Response:
    """
    Select and load a registered YOLO model by its weights path.

    Expects ``{"payload": {"model_path": <path>}}``.  The change is refused
    while tracking is on.  Responds 201 when a registered model was loaded
    and 200 with an explanatory message otherwise.
    """
    template = "partials/ack.html"
    status_code = 200
    try:
        req_json = await request.json()
    except RuntimeError:
        response = templates.TemplateResponse(
            template,
            {"request": request, "table": ["receive channel unavailable!"]},
            status_code=200
        )
        await asyncio.sleep(0.01)
        return response
    except ValueError:
        # malformed JSON body (json.JSONDecodeError is a ValueError):
        # handled below as an invalid request instead of a server error
        req_json = None

    payload = req_json.get("payload") if isinstance(req_json, dict) else None
    if isinstance(payload, dict) and "model_path" in payload:
        logging.info(
            f"setmodel: _is_running = {self._is_running}, "
            f"_is_tracking = {self._is_tracking}"
        )
        if self._is_tracking is True:
            table_contents = [
                "turn off tracking before setting a new "
                "YOLO model!"
            ]
        else:
            model_path = payload["model_path"]
            # first registered model whose weights path equals the request
            matched_model = next(
                (
                    name for name, path in self.model_dict.items()
                    if path == model_path
                ),
                None
            )
            if matched_model is not None:
                self.model_choice = matched_model
                # reload with the currently configured inference settings
                self.load_model(
                    model_choice=self.model_choice,
                    conf_threshold=self.conf_threshold,
                    iou_threshold=self.iou_threshold,
                    use_FP16=self.use_FP16,
                    use_stream_buffer=self.use_stream_buffer
                )
                table_contents = [f"{self.model_choice} selected"]
                status_code = 201
            else:
                table_contents = [
                    f"{model_path} is not in the registered model_list"
                ]
    else:
        table_contents = ["invalid POST request!"]
    response = templates.TemplateResponse(
        template,
        {"request": request, "table": table_contents},
        status_code=status_code
    )
    await asyncio.sleep(0.01)
    return response
async def selectobjects(self, request: HtmxRequest) -> Response:
    """
    Choose which object classes are tracked.

    Expects ``{"payload": {"object_id": <id or list of ids>}}``.  Ids that
    are not integers or not values of ``obj_dict`` are ignored.  When ids
    were sent but none is valid, ``obj_class_id_default`` is selected.  The
    acknowledgement partial is always returned with status 200.
    """
    template = "partials/ack.html"
    try:
        req_json = await request.json()
    except RuntimeError:
        response = templates.TemplateResponse(
            template,
            {"request": request, "table": ["receive channel unavailable!"]},
            status_code=200
        )
        await asyncio.sleep(0.01)
        return response
    except ValueError:
        # malformed JSON body (json.JSONDecodeError is a ValueError):
        # handled below as an invalid request instead of a server error
        req_json = None

    payload = req_json.get("payload") if isinstance(req_json, dict) else None
    requested = []
    if isinstance(payload, dict) and "object_id" in payload:
        logging.info(f"requested_ids: {payload}")
        requested = payload["object_id"]
        if isinstance(requested, str):
            # a single selection may arrive as a bare string; do not
            # iterate it character by character ("12" is one id)
            requested = [requested] if requested.strip() else []
        elif isinstance(requested, int):
            requested = [requested]
        elif not isinstance(requested, (list, tuple)):
            requested = []

    if requested:
        known_ids = set(self.obj_dict.values())
        selected = []
        for raw_id in requested:
            try:
                class_id = int(raw_id)
            except (TypeError, ValueError):
                continue  # not an integer id: ignore it
            if class_id in known_ids:
                selected.append(class_id)
        if selected:
            self.obj_class_id = selected
            table_contents = [
                f"{len(self.obj_class_id)} object types selected"
            ]
        else:
            self.obj_class_id = self.obj_class_id_default
            table_contents = [
                "invalid objects selection, use default object types"
            ]
    else:
        table_contents = [
            "invalid POST request! need at least one object type"
        ]
    response = templates.TemplateResponse(
        template,
        {"request": request, "table": table_contents},
        status_code=200
    )
    await asyncio.sleep(0.01)
    return response
async def setroi(self, request: HtmxRequest) -> Response:
    """
    Move the region of interest.

    Expects ``{"payload": {"roi_height": <int>}}``.  The stored
    ``roi_height`` is ``FRAME_HEIGHT - requested``.  A request outside
    120..600 (or not below ``FRAME_HEIGHT``) resets the ROI to
    ``roi_height_default``.  Both cases set ``_roi_changed``.  The
    acknowledgement partial is always returned with status 200.
    """
    template = "partials/ack.html"
    try:
        req_json = await request.json()
    except RuntimeError:
        response = templates.TemplateResponse(
            template,
            {"request": request, "table": ["receive channel unavailable!"]},
            status_code=200
        )
        await asyncio.sleep(0.01)
        return response
    except ValueError:
        # malformed JSON body (json.JSONDecodeError is a ValueError):
        # handled below as an invalid request instead of a server error
        req_json = None

    payload = req_json.get("payload") if isinstance(req_json, dict) else None
    req_height = None
    if isinstance(payload, dict) and "roi_height" in payload:
        logging.info(f"{payload}")
        try:
            req_height = int(payload["roi_height"])
        except (TypeError, ValueError):
            req_height = None  # not a number: reported as invalid request

    if req_height is None:
        table_contents = ["invalid POST request! need a valid roi_height"]
    else:
        if 120 <= req_height <= 600 and req_height < self.FRAME_HEIGHT:
            self.roi_height = self.FRAME_HEIGHT - req_height
            table_contents = [
                f"roi_height set at "
                f"{self.FRAME_HEIGHT - self.roi_height}px"
            ]
        else:
            self.roi_height = self.roi_height_default
            table_contents = [
                f"invalid roi_height request, use default "
                f"{self.FRAME_HEIGHT - self.roi_height_default}px"
            ]
        # tell frame1_producer to rebuild the counter and speed estimator
        self._roi_changed = True
    response = templates.TemplateResponse(
        template,
        {"request": request, "table": table_contents},
        status_code=200
    )
    await asyncio.sleep(0.01)
    return response
async def streamswitch(self, request: HtmxRequest) -> Response:
    """
    Turn video streaming on or off.

    Expects ``{"payload": {"stream_switch": "on"}}`` to start streaming.
    Any other payload stops it.  Tracking is switched off in both cases.
    Responds 201 with "on"/"off", or 200 when the body carries no payload.
    """
    template = "partials/ack.html"
    try:
        req_json = await request.json()
    except RuntimeError:
        context = {
            "request": request, "table": ["receive channel unavailable!"]
        }
        await asyncio.sleep(0.01)
        return templates.TemplateResponse(
            template, context, status_code=200
        )
    except ValueError:
        # malformed JSON body (json.JSONDecodeError is a ValueError):
        # handled below as an invalid request instead of a server error
        req_json = None

    if isinstance(req_json, dict) and "payload" in req_json:
        payload = req_json["payload"]
        logging.info(f"payload = {payload}")
        switch_on = (
            isinstance(payload, dict)
            and payload.get("stream_switch") == "on"
        )
        self._is_running = switch_on
        self._is_tracking = False
        table_contents = ["on"] if switch_on else ["off"]
        status_code = 201
    else:
        table_contents = ["invalid POST request!"]
        status_code = 200
    context = {"request": request, "table": table_contents}
    await asyncio.sleep(0.1)
    return templates.TemplateResponse(
        template, context, status_code=status_code
    )
async def trackingswitch(self, request: HtmxRequest) -> Response:
    """
    Turn object tracking on or off.

    Expects ``{"payload": {"tracking_switch": "on"}}`` to start tracking.
    Tracking only starts while streaming is on and a model is loaded.  A
    fresh object counter and speed estimator are built for the current ROI
    when it starts.  Responds 201 with "on"/"off", or 200 when the body
    carries no payload.
    """
    template = "partials/ack.html"
    try:
        req_json = await request.json()
    except RuntimeError:
        context = {
            "request": request, "table": ["receive channel unavailable!"]
        }
        await asyncio.sleep(0.01)
        return templates.TemplateResponse(
            template, context, status_code=200
        )
    except ValueError:
        # malformed JSON body (json.JSONDecodeError is a ValueError):
        # handled below as an invalid request instead of a server error
        req_json = None

    if isinstance(req_json, dict) and "payload" in req_json:
        payload = req_json["payload"]
        logging.info(f"payload = {payload}")
        wants_on = (
            isinstance(payload, dict)
            and payload.get("tracking_switch") == "on"
        )
        model = getattr(self, "model", None)
        if wants_on and model is None:
            logging.warning("tracking requested but no model is loaded")
        self._is_tracking = bool(
            wants_on and self._is_running and model is not None
        )
        if self._is_tracking:
            table_contents = ["on"]
            # setup object counter & speed estimator; the band uses
            # roi_thickness_half so it matches what frame1_producer
            # builds after an ROI change
            top = self.roi_height - self.roi_thickness_half
            bottom = self.roi_height + self.roi_thickness_half
            right = self.FRAME_WIDTH - 5
            self.counter = object_counter.ObjectCounter()
            self.counter.set_args(
                view_img=False,
                reg_pts=[
                    (5, top), (5, bottom), (right, bottom), (right, top)
                ],
                classes_names=model.names,
                draw_tracks=False,
                draw_boxes=False,
                draw_reg_pts=True,
            )
            # Init speed estimator
            self.speed_obj = speed_estimation.SpeedEstimator()
            self.speed_obj.set_args(
                reg_pts=[
                    (5, self.roi_height),
                    (right, self.roi_height)
                ],
                names=model.names,
                view_img=False
            )
        else:
            table_contents = ["off"]
        status_code = 201
    else:
        table_contents = ["invalid POST request!"]
        status_code = 200
    context = {"request": request, "table": table_contents}
    await asyncio.sleep(0.1)
    return templates.TemplateResponse(
        template, context, status_code=status_code
    )
async def sse_incounts(self, request: Request):
    """
    Stream the "in" count to the client as ``evt_in_counts`` events.

    While streaming and tracking are both on, an event with
    ``counter.in_counts`` is sent whenever ``counter.incounts_updated()``
    is truthy.  Otherwise a single "---" placeholder event is sent.  When
    the client disconnects, a final "..." event is yielded and the
    generator ends.
    """

    def make_event(data):
        return {
            "event": "evt_in_counts",
            "id": datetime.now(
                tz=ZoneInfo("America/Los_Angeles")
            ).strftime("%m/%d/%Y %H:%M:%S"),
            "retry": RETRY_TIMEOUT_MILSEC,
            "data": data
        }

    async def event_generator():
        placeholder_sent = False
        while True:
            # If client closes connection, stop sending events
            if await request.is_disconnected():
                yield make_event("...")
                break
            if self._is_running and self._is_tracking:
                placeholder_sent = False
                # check for None BEFORE calling into the counter
                counter = self.counter
                if counter is not None and counter.incounts_updated():
                    yield make_event(f"{counter.in_counts}")
            elif not placeholder_sent:
                # NOTE(review): the original indentation was lost, so it
                # is unclear whether the placeholder belonged to "not
                # tracking" or "not running"; it is sent once for either.
                yield make_event("---")
                placeholder_sent = True
            await asyncio.sleep(EVT_STREAM_DELAY_SEC)

    return EventSourceResponse(event_generator())
async def sse_outcounts(self, request: Request):
# Server-sent-events endpoint for the "out" count: returns an
# EventSourceResponse wrapping the async generator defined below.
# Every event is a dict with "event" ("evt_out_counts"), "id" (current
# America/Los_Angeles time formatted as MM/DD/YYYY HH:MM:SS), "retry"
# (RETRY_TIMEOUT_MILSEC) and "data".
# "data" is "..." for the final event sent once the client has
# disconnected, "---" as a placeholder, or the current value of
# self.counter.out_counts.
async def event_generator():
# _stop_sse latches to True once the "---" placeholder has been sent and
# is cleared again while tracking is active, so the placeholder is not
# re-sent while the latch is set.
_stop_sse = False
while True:
# If client closes connection, stop sending events
if await request.is_disconnected():
yield {
"event": "evt_out_counts",
"id": datetime.now(
tz=ZoneInfo("America/Los_Angeles")
).strftime("%m/%d/%Y %H:%M:%S"),
"retry": RETRY_TIMEOUT_MILSEC,
"data": "..."
}
break
if self._is_running:
if self._is_tracking:
if _stop_sse is True:
_stop_sse = False
# NOTE(review): outcounts_updated() is called before the
# `self.counter is not None` test below, so that test cannot guard
# against self.counter being None; consider testing for None first.
outcounts_msg = self.counter.outcounts_updated()
if (self.counter is not None and outcounts_msg):
yield {
"event": "evt_out_counts",
"id": datetime.now(
tz=ZoneInfo("America/Los_Angeles")
).strftime("%m/%d/%Y %H:%M:%S"),
"retry": RETRY_TIMEOUT_MILSEC,
"data": f"{self.counter.out_counts}"
}
else:
# Placeholder branch: send "---" once, then set the latch.
if _stop_sse is False:
yield {
"event": "evt_out_counts",
"id": datetime.now(
tz=ZoneInfo("America/Los_Angeles")
).strftime("%m/%d/%Y %H:%M:%S"),
"retry": RETRY_TIMEOUT_MILSEC,
"data": "---"
}
_stop_sse = True
# Pause for EVT_STREAM_DELAY_SEC seconds.
await asyncio.sleep(EVT_STREAM_DELAY_SEC)
return EventSourceResponse(event_generator())
# is_huggingface = False
# define the host url and port for webgear server
# HOST_WEBGEAR, PORT_WEBGEAR = "localhost", 8080
# Build the demo case: quiet YOLO output, reduced frame size, small model.
demo_case = DemoCase(YOLO_VERBOSE=False)
demo_case.set_frame_reduction(frame_reduction=35)
demo_case.load_model(
    model_choice="y8small",
    conf_threshold=0.1,
    iou_threshold=0.6,
    use_FP16=False,
    use_stream_buffer=True,
)

# Log the configuration the demo case ended up with.
logging.info(f"url_dict: {demo_case.url_dict}")
logging.info(f"model_dict: {demo_case.model_dict}")
logging.info(f"obj_dict: {demo_case.obj_dict}")
logging.info(f"obj_class_id: {demo_case.obj_class_id}")
# logging.info(f"model.names: {demo_case.model.names}")

# Create the WebGear server.
options = {"custom_data_location": "./"}
web = WebGear(logging=True, **options)

# Wire the frame producer, the HTMX middleware and the static files mount.
web.config["generator"] = demo_case.frame1_producer
web.config["middleware"] = [Middleware(HtmxMiddleware)]
web.routes.append(Mount("/static", static, name="static"))
# web.routes.append(
#     Route("/video1", endpoint=demo_case.custom_video_response)
# )

# Route name -> (handler, allowed HTTP methods); the name is also the path.
routes_dict = {
    "models": (demo_case.models, ["GET"]),
    "getmodel": (demo_case.getmodel, ["GET"]),
    "setmodel": (demo_case.setmodel, ["POST"]),
    "urls": (demo_case.urls, ["GET"]),
    "addurl": (demo_case.addurl, ["POST"]),
    "geturl": (demo_case.geturl, ["GET"]),
    "seturl": (demo_case.seturl, ["POST"]),
    "objects": (demo_case.objects, ["GET"]),
    "selectobjects": (demo_case.selectobjects, ["POST"]),
    "setroi": (demo_case.setroi, ["POST"]),
    "streamswitch": (demo_case.streamswitch, ["POST"]),
    "trackingswitch": (demo_case.trackingswitch, ["POST"]),
}
for route_name, (handler, http_methods) in routes_dict.items():
    web.routes.append(
        Route(
            path=f"/{route_name}",
            endpoint=handler,
            name=route_name,
            methods=http_methods,
        )
    )

# The two SSE count streams are registered without an explicit method list.
for sse_name, sse_handler in (
    ("sseincounts", demo_case.sse_incounts),
    ("sseoutcounts", demo_case.sse_outcounts),
):
    web.routes.append(
        Route(path=f"/{sse_name}", endpoint=sse_handler, name=sse_name)
    )
# if is_huggingface is False:
# # run this app on Uvicorn server at address http://localhost:8080/
# uvicorn.run(
# web(), host=HOST_WEBGEAR, port=PORT_WEBGEAR, log_level="info"
# )
# # close app safely
# web.shutdown()
#
# or launch it using cli --
# uvicorn webapp:web --host "localhost" --port 8080 --reload