Spaces:
Runtime error
Runtime error
# Copyright (c) Open-CD. All rights reserved. | |
from numbers import Number | |
from typing import Any, Dict, List, Optional, Sequence, Union | |
import numpy as np | |
import torch | |
import torch.nn.functional as F | |
from mmengine.model import BaseDataPreprocessor | |
from mmseg.utils import SampleList | |
from opencd.registry import MODELS | |
def stack_batch(inputs: List[torch.Tensor], | |
data_samples: Optional[SampleList] = None, | |
size: Optional[tuple] = None, | |
size_divisor: Optional[int] = None, | |
pad_val: Union[int, float] = 0, | |
seg_pad_val: Union[int, float] = 255) -> torch.Tensor: | |
"""Stack multiple inputs to form a batch and pad the images and gt_sem_segs | |
to the max shape use the right bottom padding mode. | |
Args: | |
inputs (List[Tensor]): The input multiple tensors. each is a | |
CHW 3D-tensor. | |
data_samples (list[:obj:`SegDataSample`]): The list of data samples. | |
It usually includes information such as `gt_sem_seg`. | |
size (tuple, optional): Fixed padding size. | |
size_divisor (int, optional): The divisor of padded size. | |
pad_val (int, float): The padding value. Defaults to 0 | |
seg_pad_val (int, float): The padding value. Defaults to 255 | |
Returns: | |
Tensor: The 4D-tensor. | |
List[:obj:`SegDataSample`]: After the padding of the gt_seg_map. | |
""" | |
assert isinstance(inputs, list), \ | |
f'Expected input type to be list, but got {type(inputs)}' | |
assert len({tensor.ndim for tensor in inputs}) == 1, \ | |
f'Expected the dimensions of all inputs must be the same, ' \ | |
f'but got {[tensor.ndim for tensor in inputs]}' | |
assert inputs[0].ndim == 3, f'Expected tensor dimension to be 3, ' \ | |
f'but got {inputs[0].ndim}' | |
assert len({tensor.shape[0] for tensor in inputs}) == 1, \ | |
f'Expected the channels of all inputs must be the same, ' \ | |
f'but got {[tensor.shape[0] for tensor in inputs]}' | |
# only one of size and size_divisor should be valid | |
assert (size is not None) ^ (size_divisor is not None), \ | |
'only one of size and size_divisor should be valid' | |
padded_inputs = [] | |
padded_samples = [] | |
inputs_sizes = [(img.shape[-2], img.shape[-1]) for img in inputs] | |
max_size = np.stack(inputs_sizes).max(0) | |
if size_divisor is not None and size_divisor > 1: | |
# the last two dims are H,W, both subject to divisibility requirement | |
max_size = (max_size + | |
(size_divisor - 1)) // size_divisor * size_divisor | |
for i in range(len(inputs)): | |
tensor = inputs[i] | |
if size is not None: | |
width = max(size[-1] - tensor.shape[-1], 0) | |
height = max(size[-2] - tensor.shape[-2], 0) | |
# (padding_left, padding_right, padding_top, padding_bottom) | |
padding_size = (0, width, 0, height) | |
elif size_divisor is not None: | |
width = max(max_size[-1] - tensor.shape[-1], 0) | |
height = max(max_size[-2] - tensor.shape[-2], 0) | |
padding_size = (0, width, 0, height) | |
else: | |
padding_size = [0, 0, 0, 0] | |
# pad img | |
pad_img = F.pad(tensor, padding_size, value=pad_val) | |
padded_inputs.append(pad_img) | |
# pad gt_sem_seg | |
if data_samples is not None: | |
data_sample = data_samples[i] | |
gt_sem_seg = data_sample.gt_sem_seg.data | |
del data_sample.gt_sem_seg.data | |
data_sample.gt_sem_seg.data = F.pad( | |
gt_sem_seg, padding_size, value=seg_pad_val) | |
if 'gt_edge_map' in data_sample: | |
gt_edge_map = data_sample.gt_edge_map.data | |
del data_sample.gt_edge_map.data | |
data_sample.gt_edge_map.data = F.pad( | |
gt_edge_map, padding_size, value=seg_pad_val) | |
if 'gt_seg_map_from' in data_sample: | |
gt_seg_map_from = data_sample.gt_seg_map_from.data | |
del data_sample.gt_seg_map_from.data | |
data_sample.gt_seg_map_from.data = F.pad( | |
gt_seg_map_from, padding_size, value=seg_pad_val) | |
if 'gt_seg_map_to' in data_sample: | |
gt_seg_map_to = data_sample.gt_seg_map_to.data | |
del data_sample.gt_seg_map_to.data | |
data_sample.gt_seg_map_to.data = F.pad( | |
gt_seg_map_to, padding_size, value=seg_pad_val) | |
data_sample.set_metainfo({ | |
'img_shape': tensor.shape[-2:], | |
'pad_shape': data_sample.gt_sem_seg.shape, | |
'padding_size': padding_size | |
}) | |
padded_samples.append(data_sample) | |
else: | |
padded_samples.append( | |
dict( | |
img_padding_size=padding_size, | |
pad_shape=pad_img.shape[-2:])) | |
return torch.stack(padded_inputs, dim=0), padded_samples | |
class DualInputSegDataPreProcessor(BaseDataPreprocessor): | |
"""Image pre-processor for change detection tasks. | |
Comparing with the :class:`mmengine.ImgDataPreprocessor`, | |
1. It won't do normalization if ``mean`` is not specified. | |
2. It does normalization and color space conversion after stacking batch. | |
3. It supports batch augmentations like mixup and cutmix. | |
It provides the data pre-processing as follows | |
- Collate and move data to the target device. | |
- Pad inputs to the input size with defined ``pad_val``, and pad seg map | |
with defined ``seg_pad_val``. | |
- Stack inputs to batch_inputs. | |
- Convert inputs from bgr to rgb if the shape of input is (3, H, W). | |
- Normalize image with defined std and mean. | |
- Do batch augmentations like Mixup and Cutmix during training. | |
Args: | |
mean (Sequence[Number], optional): The pixel mean of R, G, B channels. | |
Defaults to None. | |
std (Sequence[Number], optional): The pixel standard deviation of | |
R, G, B channels. Defaults to None. | |
size (tuple, optional): Fixed padding size. | |
size_divisor (int, optional): The divisor of padded size. | |
pad_val (float, optional): Padding value. Default: 0. | |
seg_pad_val (float, optional): Padding value of segmentation map. | |
Default: 255. | |
padding_mode (str): Type of padding. Default: constant. | |
- constant: pads with a constant value, this value is specified | |
with pad_val. | |
bgr_to_rgb (bool): whether to convert image from BGR to RGB. | |
Defaults to False. | |
rgb_to_bgr (bool): whether to convert image from RGB to RGB. | |
Defaults to False. | |
batch_augments (list[dict], optional): Batch-level augmentations | |
test_cfg (dict, optional): The padding size config in testing, if not | |
specify, will use `size` and `size_divisor` params as default. | |
Defaults to None, only supports keys `size` or `size_divisor`. | |
""" | |
def __init__( | |
self, | |
mean: Sequence[Number] = None, | |
std: Sequence[Number] = None, | |
size: Optional[tuple] = None, | |
size_divisor: Optional[int] = None, | |
pad_val: Number = 0, | |
seg_pad_val: Number = 255, | |
bgr_to_rgb: bool = False, | |
rgb_to_bgr: bool = False, | |
batch_augments: Optional[List[dict]] = None, | |
test_cfg: dict = None, | |
): | |
super().__init__() | |
self.size = size | |
self.size_divisor = size_divisor | |
self.pad_val = pad_val | |
self.seg_pad_val = seg_pad_val | |
assert not (bgr_to_rgb and rgb_to_bgr), ( | |
'`bgr2rgb` and `rgb2bgr` cannot be set to True at the same time') | |
self.channel_conversion = rgb_to_bgr or bgr_to_rgb | |
if mean is not None: | |
assert std is not None, 'To enable the normalization in ' \ | |
'preprocessing, please specify both ' \ | |
'`mean` and `std`.' | |
# Enable the normalization in preprocessing. | |
self._enable_normalize = True | |
self.register_buffer('mean', | |
torch.tensor(mean).view(-1, 1, 1), False) | |
self.register_buffer('std', | |
torch.tensor(std).view(-1, 1, 1), False) | |
else: | |
self._enable_normalize = False | |
# TODO: support batch augmentations. | |
self.batch_augments = batch_augments | |
# Support different padding methods in testing | |
self.test_cfg = test_cfg | |
def forward(self, data: dict, training: bool = False) -> Dict[str, Any]: | |
"""Perform normalization、padding and bgr2rgb conversion based on | |
``BaseDataPreprocessor``. | |
Args: | |
data (dict): data sampled from dataloader. | |
training (bool): Whether to enable training time augmentation. | |
Returns: | |
Dict: Data in the same format as the model input. | |
""" | |
data = self.cast_data(data) # type: ignore | |
inputs = data['inputs'] | |
data_samples = data.get('data_samples', None) | |
# TODO: whether normalize should be after stack_batch | |
if self.channel_conversion and inputs[0].size(0) == 6: | |
inputs = [_input[[2, 1, 0, 5, 4, 3], ...] for _input in inputs] | |
inputs = [_input.float() for _input in inputs] | |
if self._enable_normalize: | |
inputs = [(_input - self.mean) / self.std for _input in inputs] | |
if training: | |
assert data_samples is not None, ('During training, ', | |
'`data_samples` must be define.') | |
inputs, data_samples = stack_batch( | |
inputs=inputs, | |
data_samples=data_samples, | |
size=self.size, | |
size_divisor=self.size_divisor, | |
pad_val=self.pad_val, | |
seg_pad_val=self.seg_pad_val) | |
if self.batch_augments is not None: | |
inputs, data_samples = self.batch_augments( | |
inputs, data_samples) | |
else: | |
assert len(inputs) == 1, ( | |
'Batch inference is not support currently, ' | |
'as the image size might be different in a batch') | |
# pad images when testing | |
if self.test_cfg: | |
inputs, padded_samples = stack_batch( | |
inputs=inputs, | |
size=self.test_cfg.get('size', None), | |
size_divisor=self.test_cfg.get('size_divisor', None), | |
pad_val=self.pad_val, | |
seg_pad_val=self.seg_pad_val) | |
for data_sample, pad_info in zip(data_samples, padded_samples): | |
data_sample.set_metainfo({**pad_info}) | |
else: | |
inputs = torch.stack(inputs, dim=0) | |
return dict(inputs=inputs, data_samples=data_samples) | |