import torch
import torch.distributed as dist
from fvcore.nn.distributed import differentiable_all_reduce
from torch import nn
from torch.nn import functional as F

from detectron2.utils import comm, env

from .wrappers import BatchNorm2d


class FrozenBatchNorm2d(nn.Module):
    """
    BatchNorm2d where the batch statistics and the affine parameters are fixed.

    It contains non-trainable buffers called
    "weight", "bias", "running_mean", and "running_var",
    initialized to perform an identity transformation.

    The pre-trained backbone models from Caffe2 only contain "weight" and "bias",
    which are computed from the original four parameters of BN.
    The affine transform `x * weight + bias` will perform the equivalent
    computation of `(x - running_mean) / sqrt(running_var) * weight + bias`.
    When loading a backbone model from Caffe2, "running_mean" and "running_var"
    will be left unchanged as the identity transformation.

    Other pre-trained backbone models may contain all 4 parameters.

    The forward is implemented by `F.batch_norm(..., training=False)`.
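    Example (an illustrative sketch, not part of the original docstring; it only
    exercises the freshly initialized buffers, which form an identity transform)::

        import torch

        bn = FrozenBatchNorm2d(4)
        x = torch.randn(2, 4, 8, 8)
        y = bn(x)  # (x - 0) / sqrt((1 - eps) + eps) * 1 + 0 == x
        assert torch.allclose(y, x, atol=1e-5)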
    """

    _version = 3

    def __init__(self, num_features, eps=1e-5):
        super().__init__()
        self.num_features = num_features
        self.eps = eps
        self.register_buffer("weight", torch.ones(num_features))
        self.register_buffer("bias", torch.zeros(num_features))
        self.register_buffer("running_mean", torch.zeros(num_features))
        self.register_buffer("running_var", torch.ones(num_features) - eps)
        self.register_buffer("num_batches_tracked", None)

    def forward(self, x):
        if x.requires_grad:
            # When gradients are needed, F.batch_norm will use extra memory
            # because its backward op computes gradients for weight/bias as well.
            scale = self.weight * (self.running_var + self.eps).rsqrt()
            bias = self.bias - self.running_mean * scale
            scale = scale.reshape(1, -1, 1, 1)
            bias = bias.reshape(1, -1, 1, 1)
            out_dtype = x.dtype  # may be half
            return x * scale.to(out_dtype) + bias.to(out_dtype)
        else:
            # When gradients are not needed, F.batch_norm is a single fused op
            # and provides more optimization opportunities, hence faster.
            return F.batch_norm(
                x,
                self.running_mean,
                self.running_var,
                self.weight,
                self.bias,
                training=False,
                eps=self.eps,
            )

    def _load_from_state_dict(
        self,
        state_dict,
        prefix,
        local_metadata,
        strict,
        missing_keys,
        unexpected_keys,
        error_msgs,
    ):
        version = local_metadata.get("version", None)

        if version is None or version < 2:
            # Early versions had no "running_mean"/"running_var" buffers;
            # insert identity defaults so loading does not complain about missing keys.
            if prefix + "running_mean" not in state_dict:
                state_dict[prefix + "running_mean"] = torch.zeros_like(self.running_mean)
            if prefix + "running_var" not in state_dict:
                state_dict[prefix + "running_var"] = torch.ones_like(self.running_var)

        super()._load_from_state_dict(
            state_dict,
            prefix,
            local_metadata,
            strict,
            missing_keys,
            unexpected_keys,
            error_msgs,
        )

    def __repr__(self):
        return "FrozenBatchNorm2d(num_features={}, eps={})".format(self.num_features, self.eps)

    @classmethod
    def convert_frozen_batchnorm(cls, module):
        """
        Convert all BatchNorm/SyncBatchNorm in module into FrozenBatchNorm.

        Args:
            module (torch.nn.Module):

        Returns:
            If module is BatchNorm/SyncBatchNorm, returns a new module.
            Otherwise, in-place convert module and return it.

        Similar to convert_sync_batchnorm in
        https://github.com/pytorch/pytorch/blob/master/torch/nn/modules/batchnorm.py
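        Example (an illustrative sketch, not part of the original docstring; the toy
        ``nn.Sequential`` model below is hypothetical)::

            from torch import nn

            model = nn.Sequential(nn.Conv2d(3, 8, 3), nn.BatchNorm2d(8))
            model = FrozenBatchNorm2d.convert_frozen_batchnorm(model)
            # model[1] is now a FrozenBatchNorm2d carrying the original statistics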
        """
        bn_module = nn.modules.batchnorm
        bn_module = (bn_module.BatchNorm2d, bn_module.SyncBatchNorm)
        res = module
        if isinstance(module, bn_module):
            res = cls(module.num_features)
            if module.affine:
                res.weight.data = module.weight.data.clone().detach()
                res.bias.data = module.bias.data.clone().detach()
            res.running_mean.data = module.running_mean.data
            res.running_var.data = module.running_var.data
            res.eps = module.eps
            res.num_batches_tracked = module.num_batches_tracked
        else:
            for name, child in module.named_children():
                new_child = cls.convert_frozen_batchnorm(child)
                if new_child is not child:
                    res.add_module(name, new_child)
        return res

    @classmethod
    def convert_frozenbatchnorm2d_to_batchnorm2d(cls, module: nn.Module) -> nn.Module:
        """
        Convert all FrozenBatchNorm2d to BatchNorm2d

        Args:
            module (torch.nn.Module):

        Returns:
            If module is FrozenBatchNorm2d, returns a new module.
            Otherwise, in-place convert module and return it.

        This is needed for quantization:
            https://fb.workplace.com/groups/1043663463248667/permalink/1296330057982005/
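        Example (an illustrative sketch, not part of the original docstring; the toy
        ``nn.Sequential`` model below is hypothetical)::

            from torch import nn

            model = nn.Sequential(nn.Conv2d(3, 8, 3), FrozenBatchNorm2d(8))
            model = FrozenBatchNorm2d.convert_frozenbatchnorm2d_to_batchnorm2d(model)
            # model[1] is now a regular nn.BatchNorm2d with copied statistics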
        """
        res = module
        if isinstance(module, FrozenBatchNorm2d):
            res = torch.nn.BatchNorm2d(module.num_features, module.eps)

            res.weight.data = module.weight.data.clone().detach()
            res.bias.data = module.bias.data.clone().detach()
            res.running_mean.data = module.running_mean.data.clone().detach()
            res.running_var.data = module.running_var.data.clone().detach()
            res.eps = module.eps
            res.num_batches_tracked = module.num_batches_tracked
        else:
            for name, child in module.named_children():
                new_child = cls.convert_frozenbatchnorm2d_to_batchnorm2d(child)
                if new_child is not child:
                    res.add_module(name, new_child)
        return res


def get_norm(norm, out_channels):
    """
    Args:
        norm (str or callable): either one of BN, SyncBN, FrozenBN, GN;
            or a callable that takes a channel number and returns
            the normalization layer as a nn.Module.

    Returns:
        nn.Module or None: the normalization layer
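    Example (an illustrative sketch, not part of the original docstring)::

        gn = get_norm("GN", 256)           # GroupNorm(32, 256)
        frozen = get_norm("FrozenBN", 64)  # FrozenBatchNorm2d(num_features=64, eps=1e-05)
        nothing = get_norm("", 64)         # an empty string yields None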
    """
    if norm is None:
        return None
    if isinstance(norm, str):
        if len(norm) == 0:
            return None
        norm = {
            "BN": BatchNorm2d,
            # NaiveSyncBatchNorm is used on old PyTorch, where nn.SyncBatchNorm
            # had incorrect gradients when per-worker batch sizes differ.
            "SyncBN": NaiveSyncBatchNorm if env.TORCH_VERSION <= (1, 5) else nn.SyncBatchNorm,
            "FrozenBN": FrozenBatchNorm2d,
            "GN": lambda channels: nn.GroupNorm(32, channels),
            # expose the two underlying SyncBN implementations directly:
            "nnSyncBN": nn.SyncBatchNorm,
            "naiveSyncBN": NaiveSyncBatchNorm,
            # stats_mode="N" is required to support inputs with zero batch size
            "naiveSyncBN_N": lambda channels: NaiveSyncBatchNorm(channels, stats_mode="N"),
            "LN": lambda channels: LayerNorm(channels),
        }[norm]
    return norm(out_channels)


class NaiveSyncBatchNorm(BatchNorm2d):
    """
    In PyTorch<=1.5, ``nn.SyncBatchNorm`` has incorrect gradient
    when the batch size on each worker is different
    (e.g., when scale augmentation is used, or when it is applied to the mask head).

    This is a slower but correct alternative to `nn.SyncBatchNorm`.

    Note:
        There isn't a single definition of Sync BatchNorm.

        When ``stats_mode==""``, this module computes overall statistics by using
        statistics of each worker with equal weight. The result is the true statistics
        of all samples (as if they are all on one worker) only when all workers
        have the same (N, H, W). This mode does not support inputs with zero batch size.

        When ``stats_mode=="N"``, this module computes overall statistics by weighting
        the statistics of each worker by their ``N``. The result is the true statistics
        of all samples (as if they are all on one worker) only when all workers
        have the same (H, W). It is slower than ``stats_mode==""``.

        Even though the result of this module may not be the true statistics of all samples,
        it may still be reasonable because it might be preferable to assign equal weights
        to all workers, regardless of their (H, W) dimension, instead of putting larger weight
        on larger images. From preliminary experiments, little difference is found between such
        a simplified implementation and an accurate computation of overall mean & variance.
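    Example (an illustrative sketch, not part of the original docstring; with a
    single process this module simply falls back to ``BatchNorm2d``)::

        import torch

        bn = NaiveSyncBatchNorm(64, stats_mode="N")
        x = torch.randn(4, 64, 32, 32)
        y = bn(x)  # statistics are synchronized across workers under torch.distributed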
    """

    def __init__(self, *args, stats_mode="", **kwargs):
        super().__init__(*args, **kwargs)
        assert stats_mode in ["", "N"]
        self._stats_mode = stats_mode

    def forward(self, input):
        if comm.get_world_size() == 1 or not self.training:
            return super().forward(input)

        B, C = input.shape[0], input.shape[1]

        half_input = input.dtype == torch.float16
        if half_input:
            # fp16 does not have good enough numerics for the reduction here
            input = input.float()
        mean = torch.mean(input, dim=[0, 2, 3])
        meansqr = torch.mean(input * input, dim=[0, 2, 3])

        if self._stats_mode == "":
            assert B > 0, 'SyncBatchNorm(stats_mode="") does not support zero batch size.'
            vec = torch.cat([mean, meansqr], dim=0)
            vec = differentiable_all_reduce(vec) * (1.0 / dist.get_world_size())
            mean, meansqr = torch.split(vec, C)
            momentum = self.momentum
        else:
            if B == 0:
                vec = torch.zeros([2 * C + 1], device=mean.device, dtype=mean.dtype)
                vec = vec + input.sum()  # make sure there is gradient w.r.t input
            else:
                vec = torch.cat(
                    [
                        mean,
                        meansqr,
                        torch.ones([1], device=mean.device, dtype=mean.dtype),
                    ],
                    dim=0,
                )
            vec = differentiable_all_reduce(vec * B)

            total_batch = vec[-1].detach()
            momentum = total_batch.clamp(max=1) * self.momentum  # no update if total_batch is 0
            mean, meansqr, _ = torch.split(vec / total_batch.clamp(min=1), C)  # avoid div-by-zero

        var = meansqr - mean * mean
        invstd = torch.rsqrt(var + self.eps)
        scale = self.weight * invstd
        bias = self.bias - mean * scale
        scale = scale.reshape(1, -1, 1, 1)
        bias = bias.reshape(1, -1, 1, 1)

        self.running_mean += momentum * (mean.detach() - self.running_mean)
        self.running_var += momentum * (var.detach() - self.running_var)
        ret = input * scale + bias
        if half_input:
            ret = ret.half()
        return ret


class CycleBatchNormList(nn.ModuleList):
    """
    Implement domain-specific BatchNorm by cycling.

    When a BatchNorm layer is used for multiple input domains or input
    features, it might need to maintain separate test-time statistics
    for each domain. See Sec 5.2 in :paper:`rethinking-batchnorm`.

    This module implements it by using N separate BN layers
    and cycling through them each time forward() is called.

    NOTE: The caller of this module MUST guarantee to always call
    this module a multiple of N times. Otherwise its test-time statistics
    will be incorrect.
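    Example (an illustrative sketch, not part of the original docstring; the two
    hypothetical input domains are ``feat_a`` and ``feat_b``)::

        import torch
        from torch import nn

        bn = CycleBatchNormList(2, bn_class=nn.BatchNorm2d, num_features=32)
        feat_a = torch.randn(4, 32, 16, 16)
        feat_b = torch.randn(4, 32, 16, 16)
        out_a = bn(feat_a)  # handled by BN #0
        out_b = bn(feat_b)  # handled by BN #1; the next call cycles back to BN #0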
    """

    def __init__(self, length: int, bn_class=nn.BatchNorm2d, **kwargs):
        """
        Args:
            length: number of BatchNorm layers to cycle.
            bn_class: the BatchNorm class to use
            kwargs: arguments of the BatchNorm class, such as num_features.
        """
        self._affine = kwargs.pop("affine", True)
        super().__init__([bn_class(**kwargs, affine=False) for k in range(length)])
        if self._affine:
            # shared affine transform, applied after every BN layer
            channels = self[0].num_features
            self.weight = nn.Parameter(torch.ones(channels))
            self.bias = nn.Parameter(torch.zeros(channels))
        self._pos = 0

    def forward(self, x):
        ret = self[self._pos](x)
        self._pos = (self._pos + 1) % len(self)

        if self._affine:
            w = self.weight.reshape(1, -1, 1, 1)
            b = self.bias.reshape(1, -1, 1, 1)
            return ret * w + b
        else:
            return ret

    def extra_repr(self):
        return f"affine={self._affine}"


class LayerNorm(nn.Module):
    """
    A LayerNorm variant, popularized by Transformers, that performs point-wise mean and
    variance normalization over the channel dimension for inputs that have shape
    (batch_size, channels, height, width).
    https://github.com/facebookresearch/ConvNeXt/blob/d1fa8f6fef0a165b27399986cc2bdacc92777e40/models/convnext.py#L119  # noqa B950
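    Example (an illustrative sketch, not part of the original docstring)::

        import torch

        ln = LayerNorm(64)
        x = torch.randn(2, 64, 7, 7)
        y = ln(x)  # normalized over the 64 channels at every spatial location
        assert y.shape == x.shape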
    """

    def __init__(self, normalized_shape, eps=1e-6):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(normalized_shape))
        self.bias = nn.Parameter(torch.zeros(normalized_shape))
        self.eps = eps
        self.normalized_shape = (normalized_shape,)

    def forward(self, x):
        u = x.mean(1, keepdim=True)
        s = (x - u).pow(2).mean(1, keepdim=True)
        x = (x - u) / torch.sqrt(s + self.eps)
        x = self.weight[:, None, None] * x + self.bias[:, None, None]
        return x