Artyom
IIRLab
6721043 verified
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import init as init
from torch.nn.modules.batchnorm import _BatchNorm
import matplotlib.pyplot as plt
def default_conv(in_channels, out_channels, kernel_size, stride=1, bias=True):
    """Create a 2-D convolution padded by half of its kernel size.

    Args:
        in_channels (int): Number of input channels.
        out_channels (int): Number of output channels.
        kernel_size (int): Side length of the square kernel. It has to be an
            int because the padding is computed as ``kernel_size // 2``.
        stride (int): Convolution stride. Default: 1.
        bias (bool): Whether the layer owns a learnable bias. Default: True.

    Returns:
        nn.Conv2d: The configured convolution layer.
    """
    half_kernel = kernel_size // 2
    return nn.Conv2d(
        in_channels,
        out_channels,
        kernel_size,
        stride=stride,
        padding=half_kernel,
        bias=bias,
    )
def conv1x1(in_channels, out_channels, stride=1):
    """Create a biased 1x1 convolution without padding.

    Args:
        in_channels (int): Number of input channels.
        out_channels (int): Number of output channels.
        stride (int): Convolution stride. Default: 1.

    Returns:
        nn.Conv2d: The configured convolution layer.
    """
    conv_options = dict(kernel_size=1, stride=stride, padding=0, bias=True)
    return nn.Conv2d(in_channels, out_channels, **conv_options)
def conv3x3(in_channels, out_channels, stride=1):
    """Create a biased 3x3 convolution with one pixel of padding.

    Args:
        in_channels (int): Number of input channels.
        out_channels (int): Number of output channels.
        stride (int): Convolution stride. Default: 1.

    Returns:
        nn.Conv2d: The configured convolution layer.
    """
    layer = nn.Conv2d(
        in_channels,
        out_channels,
        kernel_size=3,
        stride=stride,
        padding=1,
        bias=True,
    )
    return layer
def conv5x5(in_channels, out_channels, stride=1):
    """Create a biased 5x5 convolution with two pixels of padding.

    Args:
        in_channels (int): Number of input channels.
        out_channels (int): Number of output channels.
        stride (int): Convolution stride. Default: 1.

    Returns:
        nn.Conv2d: The configured convolution layer.
    """
    # Positional order of nn.Conv2d: kernel_size, stride, padding.
    return nn.Conv2d(in_channels, out_channels, 5, stride, 2, bias=True)
def make_layer(basic_block, num_basic_block, **kwarg):
    """Stack several freshly constructed copies of one block type.

    Args:
        basic_block (nn.module): nn.module class (or any callable returning a
            module) used to build each block.
        num_basic_block (int): Number of blocks to build.
        **kwarg: Keyword arguments passed unchanged to every
            ``basic_block(...)`` call.

    Returns:
        nn.Sequential: The blocks chained in construction order.
    """
    # Each entry is an independent instance; in the printed model the stack
    # shows up as numbered children, e.g. "(0): ResidualBlockNoBN(".
    blocks = [basic_block(**kwarg) for _ in range(num_basic_block)]
    return nn.Sequential(*blocks)
class RBNoBN(nn.Module):
    """Residual block without BN: ``x + res_scale * conv2(relu(conv1(x)))``.

    Args:
        num_feat (int): Channel number of intermediate features.
            Default: 64.
        res_scale (float): Residual scale. Default: 1.
        pytorch_init (bool): If set to True, keep the PyTorch default
            initialisation; otherwise both convolutions are re-initialised
            with ``default_init_weights`` using scale 0.1. Default: False.
    """

    def __init__(self, num_feat=64, res_scale=1, pytorch_init=False):
        super().__init__()
        self.res_scale = res_scale
        # Both convolutions are 3x3, stride 1, padding 1.
        self.conv1 = nn.Conv2d(num_feat, num_feat, 3, 1, 1, bias=True)
        self.conv2 = nn.Conv2d(num_feat, num_feat, 3, 1, 1, bias=True)
        self.relu = nn.ReLU(inplace=True)

        if pytorch_init:
            return
        default_init_weights([self.conv1, self.conv2], 0.1)

    def forward(self, x):
        """Return the input plus the scaled residual branch."""
        residual = self.conv1(x)
        # In-place ReLU acts on the fresh conv1 output, never on ``x`` itself.
        residual = self.relu(residual)
        residual = self.conv2(residual)
        return x + residual * self.res_scale
class ResBlock(nn.Module):
    """Residual block: conv3x3 -> LeakyReLU(0.2) -> conv3x3, plus a skip.

    Args:
        in_channels (int): Channels of the input tensor.
        out_channels (int): Channels produced by both convolutions.
        stride (int): Stride of the first convolution only. Default: 1.
        downsample: Accepted but not used by this block.
        res_scale (float): Factor applied to the residual branch before it is
            added to the input. Default: 1.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None, res_scale=1):
        super().__init__()
        self.res_scale = res_scale
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.relu = nn.LeakyReLU(0.2, inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)

    def forward(self, x):
        """Return ``conv2(relu(conv1(x))) * res_scale + x``."""
        # NOTE(review): the skip adds ``x`` unchanged, so this presumably
        # expects in_channels == out_channels and stride == 1 -- confirm.
        residual = self.conv2(self.relu(self.conv1(x)))
        return residual * self.res_scale + x
# class ConvResidualBlocks(nn.Module):
# """Conv and residual block used in BasicVSR.
# Args:
# num_in_ch (int): Number of input channels. Default: 3.
# num_out_ch (int): Number of output channels. Default: 64.
# num_block (int): Number of residual blocks. Default: 15.
# """
# def __init__(self, num_in_ch=3, num_out_ch=64, num_block=15):
# super().__init__()
# self.main = nn.Sequential(
# nn.Conv2d(num_in_ch, num_out_ch, 3, 1, 1, bias=True), nn.LeakyReLU(negative_slope=0.2, inplace=True),
# make_layer(RBNoBN, num_block, num_feat=num_out_ch))
# def forward(self, fea):
# return self.main(fea)
class Encoder_input(nn.Module):
    """Image encoder: head conv, a chain of ``ResBlock``s, tail conv, long skip.

    Args:
        num_res_blocks (int): Number of residual blocks in the chain.
        n_feats (int): Feature channels used after the head convolution.
        img_channel (int): Channels of the input image.
        res_scale (float): Residual scale passed to every ``ResBlock``.
            Default: 1.
    """

    def __init__(self, num_res_blocks, n_feats, img_channel, res_scale=1):
        super().__init__()
        self.num_res_blocks = num_res_blocks
        self.conv_head = conv3x3(img_channel, n_feats)
        self.RBs = nn.ModuleList([
            ResBlock(in_channels=n_feats, out_channels=n_feats, res_scale=res_scale)
            for _ in range(num_res_blocks)
        ])
        self.conv_tail = conv3x3(n_feats, n_feats)
        self.relu = nn.LeakyReLU(0.2, inplace=True)

    def forward(self, x):
        """Encode ``x`` and add the head features back onto the tail output."""
        shallow = self.relu(self.conv_head(x))
        deep = shallow
        for block in self.RBs:
            deep = block(deep)
        # Long skip connection around the whole residual chain.
        return self.conv_tail(deep) + shallow
class ResList(nn.Module):
    """Chain of ``ResBlock``s followed by a 3x3 conv and a long skip connection.

    Args:
        num_res_blocks (int): Number of residual blocks in the chain.
        n_feats (int): Channel count used by every block and the tail conv.
        res_scale (float): Residual scale forwarded to every ``ResBlock``.
            Default: 1.
    """

    def __init__(self, num_res_blocks, n_feats, res_scale=1):
        super(ResList, self).__init__()
        self.num_res_blocks = num_res_blocks
        self.RBs = nn.ModuleList()
        for _ in range(self.num_res_blocks):
            # Forward res_scale so the constructor argument actually takes
            # effect, as Encoder_input already does. With the default of 1
            # the result is unchanged.
            self.RBs.append(ResBlock(in_channels=n_feats, out_channels=n_feats,
                                     res_scale=res_scale))
        self.conv_tail = conv3x3(n_feats, n_feats)

    def forward(self, x):
        """Run the block chain and tail conv, then add the input back."""
        skip = x
        for i in range(self.num_res_blocks):
            x = self.RBs[i](x)
        x = self.conv_tail(x)
        return x + skip
class Res_Attention_List(nn.Module):
    """Chain of ``Res_Attention`` blocks, a 3x3 tail conv and a long skip.

    NOTE: a second class named ``Res_Attention_List`` is defined later in this
    file; once the module has been fully executed, the name refers to that
    later definition rather than this one.

    Args:
        num_res_blocks (int): Number of blocks in the chain.
        n_feats (int): Channel count used by every block and the tail conv.
        res_scale (float): Accepted but not passed on to the blocks.
            Default: 1.
    """

    def __init__(self, num_res_blocks, n_feats, res_scale=1):
        super().__init__()
        self.num_res_blocks = num_res_blocks
        # Blocks are built with their default SA/CA flags (both False).
        self.RBs = nn.ModuleList([
            Res_Attention(in_channels=n_feats, out_channels=n_feats)
            for _ in range(num_res_blocks)
        ])
        self.conv_tail = conv3x3(n_feats, n_feats)

    def forward(self, x):
        """Run the block chain and tail conv, then add the input back."""
        features = x
        for block in self.RBs:
            features = block(features)
        return self.conv_tail(features) + x
class PixelShufflePack(nn.Module):
    """Pixel Shuffle upsample layer.

    A convolution expands the channels to
    ``out_channels * scale_factor ** 2`` and ``F.pixel_shuffle`` rearranges
    them into a spatially larger map.

    Args:
        in_channels (int): Number of input channels.
        out_channels (int): Number of output channels.
        scale_factor (int): Upsample ratio.
        upsample_kernel (int): Kernel size of Conv layer to expand channels.

    Returns:
        Upsampled feature map.
    """

    def __init__(self, in_channels, out_channels, scale_factor,
                 upsample_kernel):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.scale_factor = scale_factor
        self.upsample_kernel = upsample_kernel

        expanded_channels = out_channels * scale_factor * scale_factor
        pad = (upsample_kernel - 1) // 2
        self.upsample_conv = nn.Conv2d(
            in_channels, expanded_channels, upsample_kernel, padding=pad)
        self.init_weights()

    def init_weights(self):
        """Re-initialise this module via ``default_init_weights`` with scale 1."""
        default_init_weights(self, 1)

    def forward(self, x):
        """Forward function for PixelShufflePack.

        Args:
            x (Tensor): Input tensor with shape (n, c, h, w).

        Returns:
            Tensor: Forward results.
        """
        expanded = self.upsample_conv(x)
        return F.pixel_shuffle(expanded, self.scale_factor)
class BasicBlock(nn.Sequential):
    """Sequential block: conv, optional BatchNorm / InstanceNorm, optional activation.

    Args:
        conv (callable): Factory called as
            ``conv(in_channels, out_channels, kernel_size, stride=..., bias=...)``
            to build the first layer.
        in_channels (int): Number of input channels.
        out_channels (int): Number of output channels.
        kernel_size (int): Kernel size handed to ``conv``.
        stride (int): Stride handed to ``conv``. Default: 1.
        bias (bool): Bias flag handed to ``conv``. Default: True.
        bn (bool): Append ``nn.BatchNorm2d(out_channels)``. Default: False.
        In (bool): Append ``nn.InstanceNorm2d(out_channels)``. Default: False.
        act (nn.Module | None): Activation appended last; ``None`` disables it.
    """

    # NOTE(review): the default ``act`` is created once, when the class body
    # is evaluated, so every BasicBlock built without an explicit ``act``
    # holds the very same nn.PReLU instance (and its learnable slope).
    # Confirm this sharing is intended before changing it.
    def __init__(
            self, conv, in_channels, out_channels, kernel_size, stride=1, bias=True,
            bn=False, In=False, act=nn.PReLU()):
        stages = [conv(in_channels, out_channels, kernel_size, stride=stride, bias=bias)]
        stages += [nn.BatchNorm2d(out_channels)] if bn else []
        stages += [nn.InstanceNorm2d(out_channels)] if In else []
        stages += [act] if act is not None else []
        super().__init__(*stages)
class MeanShift(nn.Conv2d):
    """Frozen 1x1 convolution over 3 channels that scales and shifts each channel.

    The weight is a 3x3 identity divided per output channel by ``rgb_std``;
    the bias is ``sign * rgb_range * rgb_mean / rgb_std``. Neither tensor
    requires gradients.

    Args:
        rgb_range (float): Value range multiplier applied to the mean.
        rgb_mean (sequence of 3 floats): Per-channel mean.
        rgb_std (sequence of 3 floats): Per-channel standard deviation.
        sign (int): Sign of the bias term. Default: -1.
    """

    def __init__(self, rgb_range, rgb_mean, rgb_std, sign=-1):
        super().__init__(3, 3, kernel_size=1)
        std = torch.Tensor(rgb_std)
        identity_kernel = torch.eye(3).view(3, 3, 1, 1)
        self.weight.data = identity_kernel / std.view(3, 1, 1, 1)
        self.bias.data = sign * rgb_range * torch.Tensor(rgb_mean) / std
        # Keep the layer fixed during training.
        for param in (self.weight, self.bias):
            param.requires_grad = False
def flow_warp(x, flow, interp_mode='bilinear', padding_mode='zeros', align_corners=True):
    """Warp an image or feature map with optical flow.

    Args:
        x (Tensor): Tensor with size (n, c, h, w).
        flow (Tensor): Tensor with size (n, h, w, 2). It is added to a grid
            of pixel indices, so it is expressed in pixels; channel 0 offsets
            the x (width) coordinate and channel 1 the y (height) coordinate.
        interp_mode (str): 'nearest' or 'bilinear'. Default: 'bilinear'.
        padding_mode (str): 'zeros' or 'border' or 'reflection'.
            Default: 'zeros'.
        align_corners (bool): Before pytorch 1.3, the default value is
            align_corners=True. After pytorch 1.3, the default value is
            align_corners=False. Here, we use the True as default.

    Returns:
        Tensor: Warped image or feature map.

    Raises:
        AssertionError: If the spatial size of ``x`` and ``flow`` differ.
    """
    assert x.size()[-2:] == flow.size()[1:3]
    _, _, h, w = x.size()

    # Build the base pixel grid by broadcasting. This yields the same values
    # as torch.meshgrid with 'ij' indexing, but avoids the warning newer
    # PyTorch emits when meshgrid is called without ``indexing`` and does not
    # rely on a keyword that older releases lack.
    grid_y = torch.arange(0, h).type_as(x).view(h, 1).expand(h, w)
    grid_x = torch.arange(0, w).type_as(x).view(1, w).expand(h, w)
    grid = torch.stack((grid_x, grid_y), 2).float()  # (h, w, 2), last dim is (x, y)

    vgrid = grid + flow  # broadcasts over the batch dimension

    # Scale sampling positions to [-1, 1]; max(..., 1) guards a size-1 axis.
    vgrid_x = 2.0 * vgrid[:, :, :, 0] / max(w - 1, 1) - 1.0
    vgrid_y = 2.0 * vgrid[:, :, :, 1] / max(h - 1, 1) - 1.0
    vgrid_scaled = torch.stack((vgrid_x, vgrid_y), dim=3)

    # TODO: the normalisation above divides by (size - 1), which matches
    # align_corners=True; behaviour with align_corners=False is unverified.
    return F.grid_sample(
        x,
        vgrid_scaled,
        mode=interp_mode,
        padding_mode=padding_mode,
        align_corners=align_corners,
    )
@torch.no_grad()
def default_init_weights(module_list, scale=1, bias_fill=0, **kwargs):
    """Initialize network weights.

    Conv2d and Linear layers get Kaiming-normal weights multiplied by
    ``scale``; batch-norm layers get a weight of 1. Every handled layer that
    has a bias gets it filled with ``bias_fill``. Other layer types are left
    untouched.

    Args:
        module_list (list[nn.Module] | nn.Module): Modules to be initialized.
            Anything that is not a ``list`` is treated as a single module.
        scale (float): Scale initialized weights, especially for residual
            blocks. Default: 1.
        bias_fill (float): The value to fill bias. Default: 0
        kwargs (dict): Other arguments for initialization function.
    """
    roots = module_list if isinstance(module_list, list) else [module_list]
    for root in roots:
        # ``modules()`` walks the root itself and all nested submodules.
        for layer in root.modules():
            if isinstance(layer, (nn.Conv2d, nn.Linear)):
                init.kaiming_normal_(layer.weight, **kwargs)
                layer.weight.data *= scale
            elif isinstance(layer, _BatchNorm):
                init.constant_(layer.weight, 1)
            else:
                continue
            if layer.bias is not None:
                layer.bias.data.fill_(bias_fill)
class ChannelPool(nn.Module):
    """Reduce dimension 1 to two channels: its maximum and its mean."""

    def forward(self, x):
        """Return ``cat([max over dim 1, mean over dim 1], dim=1)``."""
        # Tensor.max along a dim returns a (values, indices) pair; only the
        # values (element 0) are needed here.
        channel_max = x.max(dim=1, keepdim=True)[0]
        channel_mean = x.mean(dim=1, keepdim=True)
        return torch.cat((channel_max, channel_mean), dim=1)
## Channel Attention (CA) Layer
class CALayer(nn.Module):
    """Channel attention: rescale each channel by a gate from global pooling.

    Args:
        channel (int): Number of channels of the input feature map.
        reduction (int): The hidden 1x1 convolution uses
            ``channel // reduction`` channels. Default: 16.
    """

    def __init__(self, channel, reduction=16):
        super().__init__()
        hidden = channel // reduction
        # Global average pooling: every channel collapses to a 1x1 map.
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        # Squeeze to ``hidden`` channels, expand back, squash into (0, 1).
        self.conv_du = nn.Sequential(
            nn.Conv2d(channel, hidden, 1, padding=0, bias=True),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden, channel, 1, padding=0, bias=True),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Return ``x`` multiplied by its per-channel attention weights."""
        channel_weights = self.conv_du(self.avg_pool(x))
        # The 1x1 weights broadcast over the spatial dimensions.
        return x * channel_weights
class SpatialGate(nn.Module):
    """Spatial attention: gate every position using pooled channel statistics.

    The input is compressed to two channels by ``ChannelPool``, passed through
    a 7x7 convolution, ReLU and sigmoid, and the resulting one-channel map
    multiplies the input.
    """

    def __init__(self):
        super().__init__()
        kernel_size = 7
        self.compress = ChannelPool()
        # 2 -> 1 channels, stride 1, padding 3 so the spatial size is kept.
        self.spatial = nn.Conv2d(2, 1, kernel_size, stride=1,
                                 padding=(kernel_size - 1) // 2)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return ``x`` scaled by the spatial gate."""
        pooled = self.compress(x)  # two channels: max and mean over dim 1
        # ReLU precedes the sigmoid, so the gate is never below 0.5.
        gate = self.sigmoid(F.relu(self.spatial(pooled)))
        # The one-channel gate broadcasts across the channels of ``x``.
        return x * gate
class Res_Attention_Conf(nn.Module):
    """Residual block with optional spatial and channel attention after conv1.

    Computes ``relu(conv2(att(relu(conv1(x))))) * res_scale + x`` where
    ``att`` applies the spatial gate (if ``SA``) and then the channel
    attention (if ``CA``). Both attention modules are always constructed,
    whether or not they are enabled.

    Args:
        in_channels (int): Channels of the input tensor.
        out_channels (int): Channels produced by both convolutions.
        stride (int): Stride of the first convolution only. Default: 1.
        downsample: Accepted but not used by this block.
        res_scale (float): Factor applied to the residual branch. Default: 1.
        SA (bool): Enable the spatial gate. Default: False.
        CA (bool): Enable the channel attention. Default: False.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None, res_scale=1, SA=False, CA=False):
        super().__init__()
        self.res_scale = res_scale
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.relu = nn.LeakyReLU(0.2, inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.channel_attention = CALayer(out_channels, reduction=16)
        self.spatial_attention = SpatialGate()
        self.CA = CA
        self.SA = SA

    def forward(self, x):
        """Return the attended residual branch added to ``x``."""
        features = self.relu(self.conv1(x))
        if self.SA:
            features = self.spatial_attention(features)
        if self.CA:
            features = self.channel_attention(features)
        # Unlike ResBlock, the second convolution is also followed by the
        # LeakyReLU before the skip is added.
        features = self.relu(self.conv2(features))
        return features * self.res_scale + x
class Res_CA_Block(nn.Module):
    """Residual block with optional channel attention after the first conv.

    Computes ``relu(conv2(att(relu(conv1(x))))) * res_scale + x`` where
    ``att`` is the channel attention when ``CA`` is true and the identity
    otherwise. The attention module is constructed either way.

    Args:
        in_channels (int): Channels of the input tensor.
        out_channels (int): Channels produced by both convolutions.
        stride (int): Stride of the first convolution only. Default: 1.
        res_scale (float): Factor applied to the residual branch. Default: 1.
        CA (bool): Enable the channel attention. Default: False.
    """

    def __init__(self, in_channels, out_channels, stride=1, res_scale=1, CA=False):
        super().__init__()
        self.res_scale = res_scale
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.relu = nn.LeakyReLU(0.2, inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.channel_attention = CALayer(out_channels, reduction=16)
        self.CA = CA

    def forward(self, x):
        """Return the (optionally attended) residual branch added to ``x``."""
        branch = self.relu(self.conv1(x))
        if self.CA:
            branch = self.channel_attention(branch)
        branch = self.relu(self.conv2(branch))
        return branch * self.res_scale + x
class Res_Attention_List(nn.Module):
    """Chain of ``Res_CA_Block``s followed by a 3x3 conv and a long skip.

    Args:
        num_res_blocks (int): Number of blocks in the chain.
        n_feats (int): Channel count used by every block and the tail conv.
        res_scale (float): Residual scale forwarded to every ``Res_CA_Block``.
            Default: 1.
    """

    def __init__(self, num_res_blocks, n_feats, res_scale=1):
        super(Res_Attention_List, self).__init__()
        self.num_res_blocks = num_res_blocks
        self.RBs = nn.ModuleList()
        for _ in range(self.num_res_blocks):
            # Forward res_scale so the constructor argument actually takes
            # effect; with the default of 1 the result is unchanged.
            # NOTE(review): blocks are built with their default CA=False, so
            # their channel attention is never applied -- confirm intended.
            self.RBs.append(Res_CA_Block(in_channels=n_feats, out_channels=n_feats,
                                         res_scale=res_scale))
        self.conv_tail = conv3x3(n_feats, n_feats)

    def forward(self, x):
        """Run the block chain and tail conv, then add the input back."""
        skip = x
        for i in range(self.num_res_blocks):
            x = self.RBs[i](x)
        x = self.conv_tail(x)
        return x + skip
class Res_Attention(nn.Module):
    """Residual block with optional spatial and channel attention after conv1.

    Computes ``relu(conv2(att(relu(conv1(x))))) * res_scale + x`` where
    ``att`` applies the spatial gate (if ``SA``) followed by the channel
    attention (if ``CA``). Both attention modules are always constructed.

    Args:
        in_channels (int): Channels of the input tensor.
        out_channels (int): Channels produced by both convolutions.
        stride (int): Stride of the first convolution only. Default: 1.
        downsample: Accepted but not used by this block.
        res_scale (float): Factor applied to the residual branch. Default: 1.
        SA (bool): Enable the spatial gate. Default: False.
        CA (bool): Enable the channel attention. Default: False.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None, res_scale=1, SA=False, CA=False):
        super().__init__()
        self.res_scale = res_scale
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.relu = nn.LeakyReLU(0.2, inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.channel_attention = CALayer(out_channels, reduction=16)
        self.spatial_attention = SpatialGate()
        self.CA = CA
        self.SA = SA

    def forward(self, x):
        """Return the attended residual branch added to ``x``."""
        hidden = self.relu(self.conv1(x))
        # Apply whichever attention stages are enabled, spatial first.
        enabled_stages = (
            (self.SA, self.spatial_attention),
            (self.CA, self.channel_attention),
        )
        for enabled, stage in enabled_stages:
            if enabled:
                hidden = stage(hidden)
        hidden = self.relu(self.conv2(hidden))
        return hidden * self.res_scale + x
def record(fea, path):
    """Save one standardised slice of a feature tensor as a grayscale image.

    The slice ``fea[0][0]`` is shifted by its mean and divided by its standard
    deviation, then written with ``plt.imsave`` using the 'gray' colormap.

    Args:
        fea (Tensor): Feature tensor; only ``fea[0][0]`` is used.
            Presumably (n, c, h, w), i.e. first sample / first channel --
            TODO confirm against callers.
        path (str): Destination file path passed to ``plt.imsave``.
    """
    feature_map = fea[0][0]
    # NOTE(review): a constant map has zero std, so this division would give
    # non-finite values -- confirm whether callers can hit that case.
    standardised = (feature_map - feature_map.mean()) / feature_map.std()
    image = standardised.detach().cpu().numpy()
    plt.imsave(path, image, cmap='gray')
def record2(fea, path):
    """Save one standardised slice of a feature tensor as a grayscale image.

    Behaves exactly like ``record``: ``fea[0][0]`` is standardised by its own
    mean and standard deviation and written to ``path`` with the 'gray'
    colormap.

    Args:
        fea (Tensor): Feature tensor; only ``fea[0][0]`` is used.
        path (str): Destination file path passed to ``plt.imsave``.
    """
    # The previous body was a line-for-line copy of ``record``; delegating
    # keeps the two entry points from drifting apart.
    record(fea, path)