from typing import * from pathlib import Path import importlib import albumentations as A import cv2 from omegaconf import DictConfig, OmegaConf import numpy as np from .utils import read_image, standardize, normalize_cdf, normalize_min_max, get_boxes if importlib.util.find_spec("openvino") is not None: import openvino as ov else: raise ImportError("OpenVINO is not installed.") class OpenVINOInferencer(): """For perform inference with OpenVINO. Args: path (str | Path): Folder path to exported OpenVINO model. Must contains model.xml, model.bin, and metadata.json. device (str): Device to run inference on. Defaults to "AUTO". cache_dir (str | Path): Cache directory for OpenVINO """ def __init__(self, path: Union[str, Path], device: str="AUTO", cache_dir: Union[str, Path, None]=None ) -> None: if isinstance(path, str): path = Path(path) self.model = self._load_model(path, device, cache_dir) self.metadata = self._load_metadata(path) # Note: Transformation require Albumentations package self.transform = A.from_dict(self.metadata["transform"]) self.metadata["expand_offset"] = self._get_expand_offset(self.transform) # Record input & output blob (key) self.metadata["input_blob"] = self.model.input(0).get_names().pop() self.metadata["output_blob"] = self.model.output(0).get_names().pop() def _load_model(self, path: Path, device: str, cache_dir: Union[str, Path, None]) -> ov.CompiledModel: xml_path = path / "model.xml" bin_path = path / "model.bin" ov_core = ov.Core() model = ov_core.read_model(xml_path, bin_path) # Create cache directory if cache_dir is None: cache_dir = "cache" if isinstance(cache_dir, str): cache_dir = Path(cache_dir) cache_dir.mkdir(parents=True, exist_ok=True) ov_core.set_property({"CACHE_DIR": cache_dir}) model = ov_core.compile_model(model=model, device_name=device.upper()) return model def _load_metadata(self, path: Path) -> DictConfig: metadata = path / "metadata.json" metadata = OmegaConf.load(metadata) metadata = cast(DictConfig, metadata) return metadata def _get_expand_offset(self, transform): is_center_cropped = False for t in reversed(transform.transforms): if isinstance(t, A.CenterCrop): is_center_cropped = True cropped_h = t.height cropped_w = t.width elif isinstance(t, A.Resize) and is_center_cropped: return (t.height - cropped_h) // 2, (t.width - cropped_w) // 2 def predict(self, image: Union[str, Path, np.ndarray]) -> Dict[str, np.ndarray]: if isinstance(image, (str, Path)): image = read_image(image) # Record input image size self.metadata["image_shape"] = image.shape[:2] inputs = self._pre_process(image, self.transform) predictions = self.model(inputs) outputs = self._post_processs(predictions, self.metadata) outputs.update({"image": image}) return outputs def __call__(self, image: Union[str, Path, np.ndarray]) -> Dict[str, np.ndarray]: return self.predict(image) def _pre_process(self, image: np.ndarray, transform=None) -> np.ndarray: if transform is not None: image = transform(image=image)["image"] if len(image.shape) == 3: # Add batch_size axis image = np.expand_dims(image, axis=0) if image.shape[3] == 3: # Transpose the color_channel axis # Expected shape: [b, c, h, w] image = image.transpose(0, 3, 1, 2) return image def _post_processs(self, predictions: np.ndarray, metadata: DictConfig) -> Dict[str, np.ndarray]: predictions = predictions[metadata["output_blob"]] anomaly_map: np.ndarray = None pred_label: float = None pred_mask: float = None if metadata["task"] == "classification": pred_score = predictions else: anomaly_map = predictions.squeeze() pred_score = anomaly_map.reshape(-1).max() if "image_threshold" in metadata: # Assign anomalous label to predictions with score >= threshold pred_label = pred_score >= metadata["image_threshold"] if metadata["task"] == "classification": _, pred_score = self._normalize(pred_scores=pred_score, metadata=metadata) else: if "pixel_threshold" in metadata: pred_mask = (anomaly_map >= metadata["pixel_threshold"]).astype(np.uint8) anomaly_map, pred_score = self._normalize( pred_scores=pred_score, metadata=metadata, anomaly_map=anomaly_map ) if "image_shape" in metadata and anomaly_map.shape != metadata["image_shape"]: if "expand_offset" in metadata and metadata["expand_offset"] is not None: anomaly_map = self._expand(anomaly_map, metadata["expand_offset"][0], metadata["expand_offset"][1]) pred_mask = self._expand(pred_mask, metadata["expand_offset"][0], metadata["expand_offset"][1]) h, w = metadata["image_shape"] # Fix: cv2.resize take (w, h) as argument anomaly_map = cv2.resize(anomaly_map, (w, h)) if pred_mask is not None: pred_mask = cv2.resize(pred_mask, (w, h)) if metadata["task"] == "detection": pred_boxes = get_boxes(pred_mask) box_labels = np.ones(pred_boxes.shape[0]) else: pred_boxes: np.ndarray | None = None box_labels: np.ndarray | None = None return { "anomaly_map": anomaly_map, "pred_label": pred_label, "pred_score": pred_score, "pred_mask": pred_mask, "pred_boxes": pred_boxes, "box_labels": box_labels } @staticmethod def _expand(map, offset_h, offset_w): h, w = map.shape if map is not None: expanded_map = np.zeros((h + offset_h * 2, w + offset_w * 2), dtype=map.dtype) expanded_map[offset_h:offset_h+h, offset_w:offset_w+w] = map return expanded_map @staticmethod def _normalize( pred_scores: np.float32, metadata: DictConfig, anomaly_map: np.ndarray | None = None ) -> Tuple[Union[np.ndarray, None], float]: # Min-max normalization if "min" in metadata and "max" in metadata: if anomaly_map is not None: anomaly_map = normalize_min_max( anomaly_map, metadata["pixel_threshold"], metadata["min"], metadata["max"] ) pred_scores = normalize_min_max( pred_scores, metadata["image_threshold"], metadata["min"], metadata["max"] ) # Standardize pixel scores if "pixel_mean" in metadata and "pixel_std" in metadata: if anomaly_map is not None: anomaly_map = standardize( anomaly_map, metadata["pixel_mean"], metadata["pixel_std"], center_at=metadata["image_mean"] ) anomaly_map = normalize_cdf( anomaly_map, metadata["pixel_threshold"] ) # Standardize image scores if "image_mean" in metadata and "image_std" in metadata: pred_scores = standardize( pred_scores, metadata["image_mean"], metadata["image_std"] ) pred_scores = normalize_cdf( pred_scores, metadata["image_threshold"] ) return anomaly_map, float(pred_scores)