tanlocc committed
Commit 213de71 · 1 Parent(s): f67fcbe

add base file for api

Files changed (3):
  1. __init__.py +10 -0
  2. base.py +73 -0
  3. utils.py +323 -0
__init__.py ADDED
@@ -0,0 +1,10 @@
+ from .localize_with_landmark import *
+ from .check_mask import *
+ from .check_liveness import *
+ from .estimate_headpose import *
+ from .extract_facevector import *
+ from .extract_agegender import *
+ from .extract_emotion import *
+ from .configs import TASK_CONFIG
+
+
base.py ADDED
@@ -0,0 +1,73 @@
+ import onnxruntime
+ import random
+ import numpy as np
+ from pathlib import Path
+ from numpy.typing import NDArray
+ from typing import Any, List
+ from .utils import count_gpus, get_memory_free_MiB
+ from abc import ABC, abstractmethod
+
+ # directory containing this file
+ __dir__ = Path(__file__).parent
+
+ class ONNXBaseTask(ABC):
+     num_gpus: int = count_gpus()
+
+     def __init__(self, weight: str) -> None:
+         self.session = self.initialize_session(weight)
+         self.input_metadata = self.session.get_inputs()[0]
+         self.prepare_input = self.setup_prepare_input()
+
+         # warm up the model; fall back to 320x320 when a spatial dim is dynamic
+         input_height, input_width = self.input_metadata.shape[-2:]
+         height = input_height if isinstance(input_height, int) and input_height > 0 else 320
+         width = input_width if isinstance(input_width, int) and input_width > 0 else 320
+         temp = np.zeros((1, 3, height, width), dtype=np.float32)
+         self.run_session(temp)
+
+     @abstractmethod
+     def process_output(self, raw_outputs: List[NDArray], **kwargs) -> Any:
+         pass
+
+     @abstractmethod
+     def setup_prepare_input(self):
+         pass
+
+     def call(self, image) -> Any:
+         input_height, input_width = self.input_metadata.shape[-2:]
+
+         # predict
+         input_value = self.prepare_input(image, height=input_height, width=input_width)
+         raw_outputs = self.run_session(input_value)
+
+         return self.process_output(raw_outputs)
+
+     def run_session(self, input_value: NDArray) -> List[NDArray]:
+         input_dict = {self.input_metadata.name: input_value}
+
+         return self.session.run(None, input_dict)
+
+     def initialize_session(self, weight: str):
+         # get available runtime
+         providers = []
+         if self.num_gpus == 0:
+             providers += [("CPUExecutionProvider", {})]
+         else:
+             # pick a random GPU that has at least 1000 MiB of free memory
+             providers += [(
+                 "CUDAExecutionProvider",
+                 {
+                     "device_id": random.choice([i for i in range(self.num_gpus) if get_memory_free_MiB(i) >= 1000])
+                 }
+             )]
+
+         # init session
+         return onnxruntime.InferenceSession(
+             str(__dir__.parent.parent.parent / weight),
+             providers=providers
+         )
+
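For review context: a subclass supplies the preprocessing closure and the output decoding, and call() ties them together. A minimal sketch, not part of this commit — the FaceDetection name, normalization values, and weight path are hypothetical; prepare_input_wraper comes from utils.py below.

from typing import Any, List
from numpy.typing import NDArray
from .base import ONNXBaseTask
from .utils import prepare_input_wraper

class FaceDetection(ONNXBaseTask):  # hypothetical subclass for illustration
    def setup_prepare_input(self):
        # configure the reusable preprocessing closure once
        return prepare_input_wraper(mean=127.5, std=128.0, channel_first=True, color_space="RGB")

    def process_output(self, raw_outputs: List[NDArray], **kwargs) -> Any:
        # a real task would decode boxes/landmarks here
        return raw_outputs

detector = FaceDetection("weights/retinaface_mobilev3.onnx")  # resolved relative to the repo root
outputs = detector.call(image)  # image: BGR NDArray, e.g. from cv2.imread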
utils.py ADDED
@@ -0,0 +1,323 @@
+ import numpy as np
+ import cv2
+ import subprocess
+ import math
+ from itertools import product
+ from numpy.typing import NDArray
+ from typing import List, Tuple
+ import argparse
+ import pynvml
+ from dataclasses import dataclass
+ from skimage import transform
+
+ def parse_args():
+     @dataclass
+     class Argument:
+         image_path: str
+         weight_path: str
+
+     # parse arguments
+     parser = argparse.ArgumentParser(
+         prog="Run AI Tasks",
+         description="call a built task belonging to Face",
+     )
+     parser.add_argument(
+         "--image", type=str, default="samples/An_2000.jpg", help="path to the test image"
+     )
+     parser.add_argument(
+         "--weight", type=str, default="weights/retinaface_mobilev3.onnx", help="path to the weight file"
+     )
+
+     args = parser.parse_args()
+     return Argument(
+         image_path=args.image,
+         weight_path=args.weight
+     )
+
+ def get_memory_free_MiB(gpu_index):
+     pynvml.nvmlInit()
+     handle = pynvml.nvmlDeviceGetHandleByIndex(int(gpu_index))
+     mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
+     return mem_info.free // 1024 ** 2
+
+ def count_gpus():
+     try:
+         output = subprocess.check_output(['nvidia-smi', '--query-gpu=count', '--format=csv,noheader'], encoding='utf-8')
+         num_gpus = int(output.strip().split('\n')[0])
+     except (subprocess.CalledProcessError, FileNotFoundError):
+         # nvidia-smi failed or is not installed: assume CPU-only
+         num_gpus = 0
+
+     return num_gpus
+
+ def prepare_input_wraper(inter=1, mean=None, std=None, channel_first=True, color_space="BGR", is_scale=False):
+     '''
+     This preprocessing path is organized to optimize runtime (scaling is slightly slower than no scaling).
+     ==========================================================================
+     inter: resize interpolation (0: Nearest, 1: Linear, 2: Cubic)
+
+     is_scale: whether to scale the image into range (0, 1) before normalizing
+         NOTE: normalizing a scaled image is NOT the same as normalizing an unscaled one
+     mean: expected value of the distribution
+     std: standard deviation of the distribution
+
+     channel_first: True is (c, h, w), False is (h, w, c)
+     color_space: BGR (cv2 default) or RGB
+     ==========================================================================
+     '''
+     if mean is not None and std is not None:
+         mean = mean if isinstance(mean, (list, tuple)) else [mean] * 3
+         std = std if isinstance(std, (list, tuple)) else [std] * 3
+
+     def call(img: NDArray, width: int, height: int):
+         '''
+         width: input width of the model
+         height: input height of the model
+         '''
+         if img.shape[0] != height or img.shape[1] != width:
+             image = cv2.resize(img.copy(), (width, height), interpolation=inter)
+         else:
+             image = img.copy()
+
+         image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) if color_space == "RGB" else image
+         image = image.transpose((2, 0, 1)) if channel_first else image
+         image = image.astype(np.float32)
+
+         # scale image into range (0, 1)
+         if is_scale:
+             image /= 255
+
+         if mean is not None and std is not None:
+             if channel_first:
+                 image[0, :, :] -= mean[0]; image[1, :, :] -= mean[1]; image[2, :, :] -= mean[2]
+                 image[0, :, :] /= std[0];  image[1, :, :] /= std[1];  image[2, :, :] /= std[2]
+             else:
+                 image[:, :, 0] -= mean[0]; image[:, :, 1] -= mean[1]; image[:, :, 2] -= mean[2]
+                 image[:, :, 0] /= std[0];  image[:, :, 1] /= std[1];  image[:, :, 2] /= std[2]
+
+         return image[np.newaxis, :]
+
+     return call
+
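A quick illustration of the closure-factory pattern above (not part of the commit; the normalization values are hypothetical): configure once, then reuse the returned call per frame.

import numpy as np
preprocess = prepare_input_wraper(inter=1, mean=127.5, std=128.0, channel_first=True, color_space="RGB")
frame = np.zeros((480, 640, 3), dtype=np.uint8)   # dummy BGR frame
batch = preprocess(frame, width=320, height=320)  # float32 batch of shape (1, 3, 320, 320)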
+ # ============================= External image processing
+ def class_letterbox(im, new_shape=(640, 640), color=(0, 0, 0), scaleup=True):
+     # Resize and pad image while meeting stride-multiple constraints
+     shape = im.shape[:2]  # current shape [height, width]
+     if isinstance(new_shape, int):
+         new_shape = (new_shape, new_shape)
+
+     if im.shape[0] == new_shape[0] and im.shape[1] == new_shape[1]:
+         return im
+
+     # Scale ratio (new / old)
+     r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
+     if not scaleup:  # only scale down, do not scale up (for better val mAP)
+         r = min(r, 1.0)
+
+     # Compute padding
+     # ratio = r, r  # width, height ratios
+     new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
+     dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
+
+     dw /= 2  # divide padding into 2 sides
+     dh /= 2
+
+     if shape[::-1] != new_unpad:  # resize
+         im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
+     top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
+     left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
+     im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add border
+     return im
+
+ def get_new_box(src_w: int, src_h: int, bbox: List[int], scale: float):
+     x, y, xmax, ymax = bbox
+     box_w = xmax - x
+     box_h = ymax - y
+
+     # Re-calculate scale ratio
+     scale = min((src_h - 1) / box_h, min((src_w - 1) / box_w, scale))
+
+     # get new width and height with scale ratio
+     new_width = box_w * scale
+     new_height = box_h * scale
+     center_x, center_y = box_w / 2 + x, box_h / 2 + y
+
+     # calculate bbox with new width and height
+     left_top_x = center_x - new_width / 2
+     left_top_y = center_y - new_height / 2
+     right_bottom_x = center_x + new_width / 2
+     right_bottom_y = center_y + new_height / 2
+
+     # bbox must be in image
+     if left_top_x < 0:
+         right_bottom_x -= left_top_x
+         left_top_x = 0
+
+     if left_top_y < 0:
+         right_bottom_y -= left_top_y
+         left_top_y = 0
+
+     if right_bottom_x > src_w - 1:
+         left_top_x -= right_bottom_x - src_w + 1
+         right_bottom_x = src_w - 1
+
+     if right_bottom_y > src_h - 1:
+         left_top_y -= right_bottom_y - src_h + 1
+         right_bottom_y = src_h - 1
+
+     return int(left_top_x), int(left_top_y), \
+         int(right_bottom_x), int(right_bottom_y)
+
+ def align_face(image: NDArray, bounding_box: List[int], landmark: List[int], use_bbox: bool = True):
+     src = np.array(landmark).reshape(-1, 2)
+     if use_bbox:
+         # crop the face
+         x1, y1, x2, y2 = bounding_box
+         image = image[y1:y2 + 1, x1:x2 + 1]
+
+         # shift landmarks into the cropped coordinate frame
+         src -= np.array([x1, y1])
+
+     # reference 5-point template for a 112x112 aligned face
+     des = np.array(
+         [
+             [38.2946, 51.6963],
+             [73.5318, 51.5014],
+             [56.0252, 71.7366],
+             [38.2946, 92.3655],
+             [70.7299, 92.2041],
+         ]
+     )
+
+     trans = transform.SimilarityTransform()
+     trans.estimate(src, des)
+
+     return cv2.warpAffine(image, trans.params[:2, :], dsize=(112, 112))
+
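The des matrix above is the widely used 5-point destination template (eyes, nose tip, mouth corners) for 112x112 face alignment. A minimal call, with hypothetical detector outputs standing in for real ones:

box = [120, 80, 260, 240]                                       # x1, y1, x2, y2
landmarks = [150, 140, 220, 138, 185, 175, 158, 205, 215, 203]  # five (x, y) pairs, flattened
aligned = align_face(image, box, landmarks)                     # image: BGR frame from cv2.imread; returns a 112x112 crop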
+ # ============================= DETECT
+ def get_largest_bbox(bboxes: NDArray) -> int:
+     # compute bbox areas and return the index of the largest one
+     hbbox, wbbox = (
+         bboxes[:, 3] - bboxes[:, 1],
+         bboxes[:, 2] - bboxes[:, 0],
+     )
+     area = hbbox * wbbox
+
+     return np.argmax(area)
+
+ def get_input_size(image_height: int, image_width: int, limit_side_len: int) -> Tuple[int, int]:
+     '''
+     Returns (input_height, input_width), each rounded down to a multiple of 32.
+     '''
+     if max(image_height, image_width) >= limit_side_len:
+         # scale the shorter side to limit_side_len
+         ratio = (
+             float(limit_side_len) / image_height
+             if image_height < image_width
+             else float(limit_side_len) / image_width
+         )
+     else:
+         ratio = 1.
+
+     input_height = int((ratio * image_height // 32) * 32)
+     input_width = int((ratio * image_width // 32) * 32)
+
+     return input_height, input_width
+
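Worked example: a 1080x1920 image with limit_side_len=960 takes the first branch (height < width), so ratio = 960/1080 ≈ 0.889; then input_height = int((960.0 // 32) * 32) = 960 and input_width = int((1706.7 // 32) * 32) = 1696, both multiples of 32 as required.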
+ def prior_box(width: int, height: int, steps: List[int], min_sizes: List[List[int]]) -> NDArray:
+     anchors = []
+
+     feature_maps = [
+         [math.ceil(height / step), math.ceil(width / step)] for step in steps
+     ]
+     for k, f in enumerate(feature_maps):
+         for i, j in product(range(f[0]), range(f[1])):
+             for min_size in min_sizes[k]:
+                 s_kx = min_size / width
+                 s_ky = min_size / height
+                 dense_cx = [x * steps[k] / width for x in [j + 0.5]]
+                 dense_cy = [y * steps[k] / height for y in [i + 0.5]]
+                 for cy, cx in product(dense_cy, dense_cx):
+                     anchors += [cx, cy, s_kx, s_ky]
+
+     return np.reshape(anchors, (-1, 4))
+
+ def decode_boxes(bboxes: NDArray, priors: NDArray, variances: List[float], scale_factor: List[float]) -> NDArray:
+     # regression offsets -> (cx, cy, w, h) relative to the priors
+     bboxes = np.concatenate(
+         (
+             priors[:, :2] + bboxes[:, :2] * variances[0] * priors[:, 2:],
+             priors[:, 2:] * np.exp(bboxes[:, 2:] * variances[1]),
+         ),
+         axis=1,
+     )
+
+     # (cx, cy, w, h) -> (x1, y1, x2, y2)
+     bboxes[:, :2] -= bboxes[:, 2:] / 2
+     bboxes[:, 2:] += bboxes[:, :2]
+
+     return bboxes * np.array(scale_factor * 2)
+
+ def decode_landmarks(landmarks: NDArray, priors: NDArray, variances: List[float], scale_factor: List[float]) -> NDArray:
+     # five (x, y) landmark offsets relative to the prior centers
+     landmarks = np.concatenate(
+         (
+             priors[:, :2] + landmarks[:, :2] * variances[0] * priors[:, 2:],
+             priors[:, :2] + landmarks[:, 2:4] * variances[0] * priors[:, 2:],
+             priors[:, :2] + landmarks[:, 4:6] * variances[0] * priors[:, 2:],
+             priors[:, :2] + landmarks[:, 6:8] * variances[0] * priors[:, 2:],
+             priors[:, :2] + landmarks[:, 8:10] * variances[0] * priors[:, 2:],
+         ),
+         axis=1,
+     )
+
+     return landmarks * np.array(scale_factor * 5)
+
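Sketch of the anchor/decode flow, assuming the usual RetinaFace-style steps, min_sizes and variances (not taken from this commit); the raw outputs are zero placeholders standing in for the network's box and landmark branches.

import numpy as np
priors = prior_box(width=320, height=320, steps=[8, 16, 32], min_sizes=[[16, 32], [64, 128], [256, 512]])
raw_boxes = np.zeros((priors.shape[0], 4), dtype=np.float32)   # stand-in for the box branch
raw_lmks = np.zeros((priors.shape[0], 10), dtype=np.float32)   # stand-in for the landmark branch
boxes = decode_boxes(raw_boxes, priors, variances=[0.1, 0.2], scale_factor=[320, 320])
lmks = decode_landmarks(raw_lmks, priors, variances=[0.1, 0.2], scale_factor=[320, 320])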
+ def intersection_over_union(bbox: NDArray, bboxes: NDArray, mode="Union") -> NDArray:
+     """
+     Calculate the IoU between one box and a set of boxes.
+     :param bbox: numpy array (4,): x1, y1, x2, y2
+     :param bboxes: numpy array (n, 4): x1, y1, x2, y2
+     :return: numpy array of shape (n,) with the IoU values
+     """
+     bbox_area = (bbox[2] - bbox[0] + 1) * (bbox[3] - bbox[1] + 1)
+     areas = (bboxes[:, 2] - bboxes[:, 0] + 1) * (bboxes[:, 3] - bboxes[:, 1] + 1)
+
+     xx1 = np.maximum(bbox[0], bboxes[:, 0])
+     yy1 = np.maximum(bbox[1], bboxes[:, 1])
+     xx2 = np.minimum(bbox[2], bboxes[:, 2])
+     yy2 = np.minimum(bbox[3], bboxes[:, 3])
+
+     # compute the width and height of the intersection
+     w = np.maximum(0, xx2 - xx1 + 1)
+     h = np.maximum(0, yy2 - yy1 + 1)
+
+     inter = w * h
+     if mode == "Union":
+         over = inter / (bbox_area + areas - inter)
+
+     elif mode == "Minimum":
+         over = inter / np.minimum(bbox_area, areas)
+
+     return over
+
+ def non_max_suppression(bboxes: NDArray, scores: NDArray, thresh: float, keep_top_k: int = 100, mode: str = "Union") -> List[int]:
+     """
+     Step 1: compute the area of each bbox
+     Step 2: sort the bbox scores in descending order and take their indices
+     Step 3: in descending score order, intersect each bbox with the remaining ones,
+             then discard the positions where the overlap of two bboxes exceeds THRESHOLD
+     """
+     # sort confidences in descending order (keep the indices)
+     order = scores.argsort()[::-1][:keep_top_k]
+
+     # walk through the bboxes in descending confidence and drop overlapping duplicates
+     keep = []
+     while order.size > 0:
+         i = order[0]
+         keep.append(i)
+
+         iou = intersection_over_union(bboxes[i], bboxes[order[1:]], mode=mode)
+
+         # keep going (update order with whatever survives the suppression)
+         inds = np.where(iou <= thresh)[0]  # e.g. [1, 2, 3, 6, 45, ...]
+         order = order[inds + 1]
+
+     return keep
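Toy check of the suppression logic (values invented): two heavily overlapping boxes and one disjoint box.

import numpy as np
boxes = np.array([[10, 10, 100, 100], [12, 12, 102, 102], [200, 200, 300, 300]], dtype=np.float32)
scores = np.array([0.9, 0.8, 0.7], dtype=np.float32)
keep = non_max_suppression(boxes, scores, thresh=0.4)
# keep == [0, 2]: the second box overlaps the first with IoU ≈ 0.92 > 0.4 and is dropped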