Spaces:
Runtime error
Runtime error
import pandas as pd | |
import cv2, os | |
import argparse, subprocess | |
import supervision as sv | |
import numpy as np | |
import time, csv, json | |
import datetime, requests | |
def initial_setup()-> None : | |
""" | |
Initializes the data folder, csv file | |
and signal json files. | |
""" | |
if not os.path.exists('data') : | |
os.makedirs('data') | |
if not os.path.exists("data/density.csv") : | |
fp = open('data/density.csv', 'x') | |
fp.close() | |
if not os.path.exists('data/signal.json'): | |
fp = open('data/signal.json', 'x') | |
fp.close() | |
data = {"Flag" : 1, 'initiate' : 1} | |
with open("data/signal.json", "w") as outfile: | |
json.dump(data, outfile) | |
outfile.close() | |
main() | |
def parse_arguments() -> argparse.Namespace: | |
""" | |
Initializes the commandline argument parser. | |
""" | |
parser = argparse.ArgumentParser(description='Crowd detection') | |
parser.add_argument( | |
'--webcam-resolution', | |
default=[1280,720], | |
nargs=2, | |
type=int | |
) | |
args = parser.parse_args() | |
return args | |
def main(): | |
""" | |
Initializes the global variables to use in other methods. | |
""" | |
from ultralytics import YOLO | |
global args, model, frame_count, startSeconds, firstFrame, \ | |
videoFPS, videoHeight, videoWidth, fps_set | |
args = parse_arguments() | |
model = YOLO(model='model.pt') | |
frame_count = 0 | |
startSeconds = datetime.datetime.strptime('00:00:00', '%H:%M:%S') | |
firstFrame = True | |
videoFPS = 0 | |
videoWidth = 0 | |
videoHeight = 0 | |
fps_set = set() | |
def process_frame(frame : np.ndarray, _) -> np.ndarray: | |
""" | |
Processes the frame and return the processed frame | |
with bounding boxex and labels from 'frame' | |
""" | |
ZONE_SIDES = np.array([ | |
[0,0], | |
[videoWidth, 0], | |
[videoWidth, videoHeight], | |
[0,videoHeight] | |
]) | |
zone = sv.PolygonZone(polygon=ZONE_SIDES, frame_resolution_wh=tuple(args.webcam_resolution)) | |
start_time = time.time() | |
from ultralytics import YOLO | |
model = YOLO(model='model.pt', ) | |
results = model(frame, imgsz=1280)[0] | |
detections = sv.Detections.from_yolov8(results) | |
detections = detections[detections.class_id == 0] | |
zone.trigger(detections=detections) | |
box_annotator = sv.BoxAnnotator(thickness=2, text_thickness=1, text_scale=0.5, text_padding = 2) | |
zone_annotator = sv.PolygonZoneAnnotator(zone=zone, color=sv.Color.white()) | |
labels = [ | |
f"{model.model.names[class_id]} {confidence :0.2f}" | |
for _, confidence, class_id,_ | |
in detections | |
] | |
frame = box_annotator.annotate(scene=frame, detections=detections, labels=labels) | |
frame = zone_annotator.annotate(scene=frame) | |
end_time = time.time() | |
fps = 1 / (end_time - start_time) | |
global fps_set | |
fps_set.add(fps) | |
cv2.putText(frame, "FPS: " + str(int(fps)), (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) | |
global frame_count | |
global startSeconds | |
global firstFrame | |
frame_count = frame_count + 1 | |
if firstFrame : | |
writeCSV(startSeconds.strftime('%M:%S'), len(labels)) | |
firstFrame = False | |
my_time = videoFPS * int(max(list(fps_set))) | |
if frame_count == my_time: | |
startSeconds += datetime.timedelta(seconds=2) | |
writeCSV(startSeconds.strftime('%M:%S'), len(labels)) | |
frame_count = 0 | |
return frame | |
def writeCSV(startSeconds, count): | |
""" | |
Writes the counts into a csv file | |
using 'Time' and 'Counts'. | |
""" | |
with open('data/density.csv', mode='a', newline='') as csvfile: | |
csvwriter = csv.writer(csvfile) | |
csvwriter.writerow([startSeconds, count]) | |
def detect(imgPath, ) -> int: | |
""" | |
Detects the person in a image using | |
'Image path' and 'Threshold confidence' | |
""" | |
from ultralytics import YOLO | |
args = parse_arguments() | |
frame_width, frame_height = args.webcam_resolution | |
model = YOLO(model='model.pt',) | |
frame = cv2.imread(imgPath) | |
box_annotator = sv.BoxAnnotator( | |
thickness = 1, | |
text_thickness = 1, | |
text_scale = 0.5, | |
text_padding = 2 | |
) | |
height, width, channels = frame.shape | |
IMG_SIDES = np.array([ | |
[0,0], | |
[width, 0], | |
[width, height], | |
[0,height] | |
]) | |
zone = sv.PolygonZone(polygon=IMG_SIDES, frame_resolution_wh=tuple(args.webcam_resolution)) | |
zone_annotator = sv.PolygonZoneAnnotator(zone=zone, color=sv.Color.white()) | |
result = model(frame)[0] | |
detection = sv.Detections.from_yolov8(result) | |
labels = [ | |
f"{model.model.names[class_id]} {confidence :0.2f}" | |
for _, confidence, class_id,_ | |
in detection | |
] | |
print(f"The count of people in the image is {len(labels)}") | |
frame = box_annotator.annotate(scene = frame, detections = detection, labels = labels) | |
zone.trigger(detections=detection) | |
frame = zone_annotator.annotate(scene=frame) | |
cv2.imwrite('data/result.jpg', frame) | |
return len(labels) | |
def detectVideo(videoPath, ) : | |
""" | |
Detects the person in a Video using | |
'Video path' and 'Threshold confidence' | |
""" | |
video_info = sv.VideoInfo.from_video_path(videoPath) | |
global videoFPS, videoWidth, videoHeight | |
videoFPS = video_info.fps | |
videoHeight = video_info.height | |
videoWidth = video_info.width | |
with open('data/density.csv', 'w', newline='') as csvfile: | |
csvwriter = csv.writer(csvfile) | |
csvwriter.writerow(['Time', 'Count']) | |
sv.process_video(source_path=videoPath, target_path="data/output.mp4", callback=process_frame) | |
def getDataframe(): | |
""" | |
Reads and returns the dataframe from csv file | |
""" | |
df = pd.read_csv('data/density.csv') | |
return df | |
def checkStart() : | |
""" | |
Reads and return the flag status | |
from signal file. | |
""" | |
f = open('data/signal.json','r') | |
data = json.load(f) | |
f.close() | |
return data['initiate'] | |
def doneSetup(): | |
""" | |
Reads and Turn the Flag to off, | |
to indicate that initial setup is done | |
""" | |
f = open('data/signal.json','r') | |
data = json.load(f) | |
f.close() | |
data['initiate'] = 0 | |
with open("data/signal.json", "w") as outfile: | |
json.dump(data, outfile) | |
outfile.close() | |
def getFlag(): | |
""" | |
Reads and return the flag value, | |
""" | |
f = open('data/signal.json','r') | |
data = json.load(f) | |
f.close() | |
return data['Flag'] | |
def setFlag(): | |
""" | |
Reads and Sets the Flag to Off, | |
To restrict processing the video. | |
""" | |
f = open('data/signal.json','r') | |
data = json.load(f) | |
f.close() | |
data['Flag'] = 0 | |
with open("data/signal.json", "w") as outfile: | |
json.dump(data, outfile) | |
outfile.close() | |
def resetFlag(): | |
""" | |
Reads and Sets the Flag to On, | |
To allow processing the video. | |
""" | |
f = open('data/signal.json','r') | |
data = json.load(f) | |
f.close() | |
data['Flag'] = 1 | |
with open("data/signal.json", "w") as outfile: | |
json.dump(data, outfile) | |
outfile.close() | |