import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import init as init
from torch.nn.modules.batchnorm import _BatchNorm
import matplotlib.pyplot as plt


def default_conv(in_channels, out_channels, kernel_size, stride=1, bias=True):
    return nn.Conv2d(
        in_channels, out_channels, kernel_size,
        padding=(kernel_size // 2), stride=stride, bias=bias)


def conv1x1(in_channels, out_channels, stride=1):
    return nn.Conv2d(in_channels, out_channels, kernel_size=1,
                     stride=stride, padding=0, bias=True)


def conv3x3(in_channels, out_channels, stride=1):
    return nn.Conv2d(in_channels, out_channels, kernel_size=3,
                     stride=stride, padding=1, bias=True)


def conv5x5(in_channels, out_channels, stride=1):
    return nn.Conv2d(in_channels, out_channels, kernel_size=5,
                     stride=stride, padding=2, bias=True)


def make_layer(basic_block, num_basic_block, **kwarg):
    """Make layers by stacking the same blocks.

    Args:
        basic_block (nn.Module): nn.Module class for the basic block.
        num_basic_block (int): Number of blocks.
        **kwarg: Keyword arguments forwarded to each block's constructor.

    Returns:
        nn.Sequential: Stacked blocks in nn.Sequential.
    """
    layers = []
    for _ in range(num_basic_block):
        layers.append(basic_block(**kwarg))
    return nn.Sequential(*layers)


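# Usage sketch (illustrative, not part of the original module): stacking a few
# ResBlock modules with make_layer; the channel count and input shape below are
# assumptions for a quick shape check.
def _example_make_layer():
    body = make_layer(ResBlock, 3, in_channels=64, out_channels=64)
    x = torch.randn(1, 64, 32, 32)
    return body(x).shape  # expected: torch.Size([1, 64, 32, 32])

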
class RBNoBN(nn.Module):
    """Residual block without BN.

    Args:
        num_feat (int): Channel number of intermediate features.
            Default: 64.
        res_scale (float): Residual scale. Default: 1.
        pytorch_init (bool): If set to True, use pytorch default init,
            otherwise, use default_init_weights. Default: False.
    """

    def __init__(self, num_feat=64, res_scale=1, pytorch_init=False):
        super(RBNoBN, self).__init__()
        self.res_scale = res_scale
        self.conv1 = nn.Conv2d(num_feat, num_feat, 3, 1, 1, bias=True)
        self.conv2 = nn.Conv2d(num_feat, num_feat, 3, 1, 1, bias=True)
        self.relu = nn.ReLU(inplace=True)

        if not pytorch_init:
            default_init_weights([self.conv1, self.conv2], 0.1)

    def forward(self, x):
        identity = x
        out = self.conv2(self.relu(self.conv1(x)))
        return identity + out * self.res_scale


class ResBlock(nn.Module):
    """Residual block (Conv-LeakyReLU-Conv) with a scaled identity skip.

    Note: the skip connection adds the input directly, so it requires
    in_channels == out_channels and stride == 1; the `downsample` argument
    is accepted for interface compatibility but is not used.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None, res_scale=1):
        super(ResBlock, self).__init__()
        self.res_scale = res_scale
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.relu = nn.LeakyReLU(0.2, inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)

    def forward(self, x):
        x1 = x
        out = self.conv1(x)
        out = self.relu(out)
        out = self.conv2(out)
        out = out * self.res_scale + x1
        return out


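# Usage sketch (illustrative, not part of the original module): a quick shape
# check for the two plain residual blocks defined above; feature width 64 is an
# assumption.
def _example_res_blocks():
    x = torch.randn(1, 64, 32, 32)
    out1 = RBNoBN(num_feat=64)(x)
    out2 = ResBlock(in_channels=64, out_channels=64)(x)
    return out1.shape, out2.shape  # both expected: torch.Size([1, 64, 32, 32])

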
class Encoder_input(nn.Module):
    def __init__(self, num_res_blocks, n_feats, img_channel, res_scale=1):
        super(Encoder_input, self).__init__()
        self.num_res_blocks = num_res_blocks
        self.conv_head = conv3x3(img_channel, n_feats)

        self.RBs = nn.ModuleList()
        for i in range(self.num_res_blocks):
            self.RBs.append(ResBlock(in_channels=n_feats, out_channels=n_feats,
                                     res_scale=res_scale))

        self.conv_tail = conv3x3(n_feats, n_feats)
        self.relu = nn.LeakyReLU(0.2, inplace=True)

    def forward(self, x):
        x = self.relu(self.conv_head(x))
        x1 = x
        for i in range(self.num_res_blocks):
            x = self.RBs[i](x)
        x = self.conv_tail(x)
        x = x + x1
        return x


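# Usage sketch (illustrative, not part of the original module): Encoder_input
# lifts an image into feature space; 3 input channels and 64 features are
# assumptions for this example.
def _example_encoder_input():
    enc = Encoder_input(num_res_blocks=4, n_feats=64, img_channel=3)
    x = torch.randn(1, 3, 48, 48)
    return enc(x).shape  # expected: torch.Size([1, 64, 48, 48])

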
class ResList(nn.Module):
    def __init__(self, num_res_blocks, n_feats, res_scale=1):
        super(ResList, self).__init__()
        self.num_res_blocks = num_res_blocks

        self.RBs = nn.ModuleList()
        for i in range(self.num_res_blocks):
            self.RBs.append(ResBlock(in_channels=n_feats, out_channels=n_feats))

        self.conv_tail = conv3x3(n_feats, n_feats)

    def forward(self, x):
        x1 = x
        for i in range(self.num_res_blocks):
            x = self.RBs[i](x)
        x = self.conv_tail(x)
        x = x + x1
        return x


class Res_Attention_List(nn.Module):
    # Note: a second class with the same name is defined further below (built on
    # Res_CA_Block); since it is declared later in the module, that definition
    # shadows this one when the module is imported.
    def __init__(self, num_res_blocks, n_feats, res_scale=1):
        super(Res_Attention_List, self).__init__()
        self.num_res_blocks = num_res_blocks

        self.RBs = nn.ModuleList()
        for i in range(self.num_res_blocks):
            self.RBs.append(Res_Attention(in_channels=n_feats, out_channels=n_feats))

        self.conv_tail = conv3x3(n_feats, n_feats)

    def forward(self, x):
        x1 = x
        for i in range(self.num_res_blocks):
            x = self.RBs[i](x)
        x = self.conv_tail(x)
        x = x + x1
        return x


class PixelShufflePack(nn.Module):
    """Pixel Shuffle upsample layer.

    Args:
        in_channels (int): Number of input channels.
        out_channels (int): Number of output channels.
        scale_factor (int): Upsample ratio.
        upsample_kernel (int): Kernel size of Conv layer to expand channels.

    Returns:
        Upsampled feature map.
    """

    def __init__(self, in_channels, out_channels, scale_factor,
                 upsample_kernel):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.scale_factor = scale_factor
        self.upsample_kernel = upsample_kernel
        self.upsample_conv = nn.Conv2d(
            self.in_channels,
            self.out_channels * scale_factor * scale_factor,
            self.upsample_kernel,
            padding=(self.upsample_kernel - 1) // 2)
        self.init_weights()

    def init_weights(self):
        """Initialize weights for PixelShufflePack."""
        default_init_weights(self, 1)

    def forward(self, x):
        """Forward function for PixelShufflePack.

        Args:
            x (Tensor): Input tensor with shape (n, c, h, w).

        Returns:
            Tensor: Forward results.
        """
        x = self.upsample_conv(x)
        x = F.pixel_shuffle(x, self.scale_factor)
        return x


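# Usage sketch (illustrative, not part of the original module): 2x upsampling
# of a 64-channel feature map; the shapes are assumptions for a quick check.
def _example_pixel_shuffle_pack():
    up = PixelShufflePack(in_channels=64, out_channels=64, scale_factor=2, upsample_kernel=3)
    x = torch.randn(1, 64, 16, 16)
    return up(x).shape  # expected: torch.Size([1, 64, 32, 32])

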
class BasicBlock(nn.Sequential):
    # Conv (+ optional BatchNorm / InstanceNorm) followed by an activation.
    # Note: the default `act=nn.PReLU()` is created once at definition time, so
    # instances built with the default share the same PReLU module.
    def __init__(
            self, conv, in_channels, out_channels, kernel_size, stride=1, bias=True,
            bn=False, In=False, act=nn.PReLU()):

        m = [conv(in_channels, out_channels, kernel_size, stride=stride, bias=bias)]
        if bn:
            m.append(nn.BatchNorm2d(out_channels))
        if In:
            m.append(nn.InstanceNorm2d(out_channels))
        if act is not None:
            m.append(act)

        super(BasicBlock, self).__init__(*m)


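# Usage sketch (illustrative, not part of the original module): a conv + ReLU
# block built from default_conv; the channel counts are assumptions.
def _example_basic_block():
    blk = BasicBlock(default_conv, 3, 64, 3, act=nn.ReLU(inplace=True))
    x = torch.randn(1, 3, 16, 16)
    return blk(x).shape  # expected: torch.Size([1, 64, 16, 16])

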
class MeanShift(nn.Conv2d):
    def __init__(self, rgb_range, rgb_mean, rgb_std, sign=-1):
        super(MeanShift, self).__init__(3, 3, kernel_size=1)
        std = torch.Tensor(rgb_std)
        self.weight.data = torch.eye(3).view(3, 3, 1, 1)
        self.weight.data.div_(std.view(3, 1, 1, 1))
        self.bias.data = sign * rgb_range * torch.Tensor(rgb_mean)
        self.bias.data.div_(std)

        self.weight.requires_grad = False
        self.bias.requires_grad = False


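# Usage sketch (illustrative, not part of the original module): paired mean
# subtraction / addition around a network. The RGB statistics below are the
# commonly used DIV2K means with unit std, given here only as an assumption.
def _example_mean_shift():
    rgb_mean = (0.4488, 0.4371, 0.4040)
    rgb_std = (1.0, 1.0, 1.0)
    sub_mean = MeanShift(1.0, rgb_mean, rgb_std, sign=-1)  # x - mean
    add_mean = MeanShift(1.0, rgb_mean, rgb_std, sign=1)   # x + mean
    x = torch.rand(1, 3, 8, 8)
    return torch.allclose(add_mean(sub_mean(x)), x, atol=1e-5)  # expected: True

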
def flow_warp(x, flow, interp_mode='bilinear', padding_mode='zeros', align_corners=True):
    """Warp an image or feature map with optical flow.

    Args:
        x (Tensor): Tensor with size (n, c, h, w).
        flow (Tensor): Tensor with size (n, h, w, 2); displacements are given
            in pixels (they are normalized to [-1, 1] internally).
        interp_mode (str): 'nearest' or 'bilinear'. Default: 'bilinear'.
        padding_mode (str): 'zeros' or 'border' or 'reflection'.
            Default: 'zeros'.
        align_corners (bool): Before pytorch 1.3, the default value is
            align_corners=True. After pytorch 1.3, the default value is
            align_corners=False. Here, we use True as the default.

    Returns:
        Tensor: Warped image or feature map.
    """
    assert x.size()[-2:] == flow.size()[1:3]
    _, _, h, w = x.size()

    # Build a mesh grid of pixel coordinates.
    grid_y, grid_x = torch.meshgrid(torch.arange(0, h).type_as(x), torch.arange(0, w).type_as(x))
    grid = torch.stack((grid_x, grid_y), 2).float()
    grid.requires_grad = False

    vgrid = grid + flow

    # Scale the sampling grid to [-1, 1] as required by F.grid_sample.
    vgrid_x = 2.0 * vgrid[:, :, :, 0] / max(w - 1, 1) - 1.0
    vgrid_y = 2.0 * vgrid[:, :, :, 1] / max(h - 1, 1) - 1.0
    vgrid_scaled = torch.stack((vgrid_x, vgrid_y), dim=3)
    output = F.grid_sample(x, vgrid_scaled, mode=interp_mode, padding_mode=padding_mode, align_corners=align_corners)

    return output


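# Usage sketch (illustrative, not part of the original module): warping with an
# all-zero flow should reproduce the input (up to interpolation error).
def _example_flow_warp():
    x = torch.randn(1, 3, 8, 8)
    flow = torch.zeros(1, 8, 8, 2)
    return torch.allclose(flow_warp(x, flow), x, atol=1e-5)  # expected: True

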
@torch.no_grad()
def default_init_weights(module_list, scale=1, bias_fill=0, **kwargs):
    """Initialize network weights.

    Args:
        module_list (list[nn.Module] | nn.Module): Modules to be initialized.
        scale (float): Scale initialized weights, especially for residual
            blocks. Default: 1.
        bias_fill (float): The value to fill bias. Default: 0.
        kwargs (dict): Other arguments for initialization function.
    """
    if not isinstance(module_list, list):
        module_list = [module_list]
    for module in module_list:
        for m in module.modules():
            if isinstance(m, nn.Conv2d):
                init.kaiming_normal_(m.weight, **kwargs)
                m.weight.data *= scale
                if m.bias is not None:
                    m.bias.data.fill_(bias_fill)
            elif isinstance(m, nn.Linear):
                init.kaiming_normal_(m.weight, **kwargs)
                m.weight.data *= scale
                if m.bias is not None:
                    m.bias.data.fill_(bias_fill)
            elif isinstance(m, _BatchNorm):
                init.constant_(m.weight, 1)
                if m.bias is not None:
                    m.bias.data.fill_(bias_fill)


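# Usage sketch (illustrative, not part of the original module): initialize a
# small conv stack with down-scaled weights, as done for residual branches.
def _example_default_init_weights():
    net = nn.Sequential(nn.Conv2d(3, 16, 3, 1, 1), nn.ReLU(inplace=True), nn.Conv2d(16, 3, 3, 1, 1))
    default_init_weights(net, scale=0.1)
    return net

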
class ChannelPool(nn.Module):
    # Concatenates the per-pixel channel-wise max and mean into a 2-channel map.
    def forward(self, x):
        return torch.cat((torch.max(x, 1)[0].unsqueeze(1), torch.mean(x, 1).unsqueeze(1)), dim=1)


class CALayer(nn.Module):
    # Channel attention: global average pooling followed by a bottleneck MLP
    # (1x1 convs) and a sigmoid gate that rescales each channel.
    def __init__(self, channel, reduction=16):
        super(CALayer, self).__init__()

        self.avg_pool = nn.AdaptiveAvgPool2d(1)

        self.conv_du = nn.Sequential(
            nn.Conv2d(channel, channel // reduction, 1, padding=0, bias=True),
            nn.ReLU(inplace=True),
            nn.Conv2d(channel // reduction, channel, 1, padding=0, bias=True),
            nn.Sigmoid()
        )

    def forward(self, x):
        y = self.avg_pool(x)
        y = self.conv_du(y)
        return x * y


class SpatialGate(nn.Module):
    # Spatial attention: pool channels to a 2-channel map (max + mean), run a
    # 7x7 conv, and gate the input with the resulting sigmoid mask.
    def __init__(self):
        super(SpatialGate, self).__init__()
        kernel_size = 7
        self.compress = ChannelPool()

        self.spatial = nn.Conv2d(2, 1, kernel_size, 1, (kernel_size - 1) // 2)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x_compress = self.compress(x)
        x_out = F.relu(self.spatial(x_compress))

        scale = self.sigmoid(x_out)
        return x * scale


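# Usage sketch (illustrative, not part of the original module): the attention
# helpers all preserve the input's spatial size; 64 channels is an assumption.
def _example_attention_modules():
    x = torch.randn(1, 64, 16, 16)
    pooled = ChannelPool()(x)   # (1, 2, 16, 16)
    ca_out = CALayer(64)(x)     # (1, 64, 16, 16)
    sa_out = SpatialGate()(x)   # (1, 64, 16, 16)
    return pooled.shape, ca_out.shape, sa_out.shape

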
class Res_Attention_Conf(nn.Module):
    # Residual block with optional spatial (SA) and channel (CA) attention
    # applied between the two 3x3 convolutions.
    def __init__(self, in_channels, out_channels, stride=1, downsample=None, res_scale=1, SA=False, CA=False):
        super(Res_Attention_Conf, self).__init__()

        self.res_scale = res_scale
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.relu = nn.LeakyReLU(0.2, inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.channel_attention = CALayer(out_channels, reduction=16)
        self.spatial_attention = SpatialGate()

        self.CA = CA
        self.SA = SA

    def forward(self, x):
        x1 = x
        out = self.relu(self.conv1(x))

        if self.SA:
            out = self.spatial_attention(out)

        if self.CA:
            out = self.channel_attention(out)

        out = self.relu(self.conv2(out))

        out = out * self.res_scale + x1
        return out


class Res_CA_Block(nn.Module):
    # Residual block with optional channel attention (CA) only.
    def __init__(self, in_channels, out_channels, stride=1, res_scale=1, CA=False):
        super(Res_CA_Block, self).__init__()

        self.res_scale = res_scale
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.relu = nn.LeakyReLU(0.2, inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.channel_attention = CALayer(out_channels, reduction=16)

        self.CA = CA

    def forward(self, x):
        x1 = x
        out = self.relu(self.conv1(x))
        if self.CA:
            out = self.channel_attention(out)

        out = self.relu(self.conv2(out))

        out = out * self.res_scale + x1
        return out


class Res_Attention_List(nn.Module):
    # Note: this redefines Res_Attention_List and therefore replaces the earlier
    # definition above; this version stacks Res_CA_Block instead of Res_Attention.
    def __init__(self, num_res_blocks, n_feats, res_scale=1):
        super(Res_Attention_List, self).__init__()
        self.num_res_blocks = num_res_blocks

        self.RBs = nn.ModuleList()
        for i in range(self.num_res_blocks):
            self.RBs.append(Res_CA_Block(in_channels=n_feats, out_channels=n_feats))

        self.conv_tail = conv3x3(n_feats, n_feats)

    def forward(self, x):
        x1 = x
        for i in range(self.num_res_blocks):
            x = self.RBs[i](x)
        x = self.conv_tail(x)
        x = x + x1
        return x


class Res_Attention(nn.Module):
    # Residual block with optional spatial (SA) and channel (CA) attention;
    # functionally the same structure as Res_Attention_Conf above.
    def __init__(self, in_channels, out_channels, stride=1, downsample=None, res_scale=1, SA=False, CA=False):
        super(Res_Attention, self).__init__()
        self.res_scale = res_scale
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.relu = nn.LeakyReLU(0.2, inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.channel_attention = CALayer(out_channels, reduction=16)
        self.spatial_attention = SpatialGate()

        self.CA = CA
        self.SA = SA

    def forward(self, x):
        x1 = x
        out = self.relu(self.conv1(x))

        if self.SA:
            out = self.spatial_attention(out)

        if self.CA:
            out = self.channel_attention(out)

        out = self.relu(self.conv2(out))

        out = out * self.res_scale + x1
        return out


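# Usage sketch (illustrative, not part of the original module): a residual
# attention block with both gates enabled; 64 channels is an assumption.
def _example_res_attention():
    block = Res_Attention(in_channels=64, out_channels=64, SA=True, CA=True)
    x = torch.randn(1, 64, 32, 32)
    return block(x).shape  # expected: torch.Size([1, 64, 32, 32])

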
def record(fea, path):
    """Save the first channel of the first sample in `fea` as a normalized
    grayscale image at `path` (useful for quick feature-map inspection)."""
    fea = fea[0][0]
    mean = fea.mean()
    std = fea.std()

    fea_norm = (fea - mean) / std

    fea_norm = fea_norm.detach().cpu().numpy()

    plt.imsave(path, fea_norm, cmap='gray')


def record2(fea, path):
    """Identical to `record`; kept as a separate name for compatibility."""
    record(fea, path)
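

# Usage sketch (illustrative, not part of the original module): dump an
# intermediate feature map to disk; the output path below is a placeholder.
def _example_record(feature_map, out_path="feature_vis.png"):
    # feature_map is expected to be a 4-D tensor of shape (n, c, h, w).
    record(feature_map, out_path)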