import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import init as init
from torch.nn.modules.batchnorm import _BatchNorm
import matplotlib.pyplot as plt


def default_conv(in_channels, out_channels, kernel_size, stride=1, bias=True):
    return nn.Conv2d(
        in_channels, out_channels, kernel_size,
        padding=(kernel_size // 2), stride=stride, bias=bias)


def conv1x1(in_channels, out_channels, stride=1):
    return nn.Conv2d(in_channels, out_channels, kernel_size=1,
                     stride=stride, padding=0, bias=True)


def conv3x3(in_channels, out_channels, stride=1):
    return nn.Conv2d(in_channels, out_channels, kernel_size=3,
                     stride=stride, padding=1, bias=True)


def conv5x5(in_channels, out_channels, stride=1):
    return nn.Conv2d(in_channels, out_channels, kernel_size=5,
                     stride=stride, padding=2, bias=True)


def make_layer(basic_block, num_basic_block, **kwarg):
    """Make layers by stacking the same blocks.

    Args:
        basic_block (nn.Module): nn.Module class for the basic block.
        num_basic_block (int): Number of blocks.

    Returns:
        nn.Sequential: Stacked blocks in nn.Sequential.
    """
    layers = []
    for _ in range(num_basic_block):
        layers.append(basic_block(**kwarg))
    return nn.Sequential(*layers)


# Typically stacked many times (e.g. 30x), showing as "(0): ResidualBlockNoBN(...)"
# when the model is printed.
class RBNoBN(nn.Module):
    """Residual block without BN.

    Args:
        num_feat (int): Channel number of intermediate features. Default: 64.
        res_scale (float): Residual scale. Default: 1.
        pytorch_init (bool): If set to True, use pytorch default init,
            otherwise, use default_init_weights. Default: False.
    """

    def __init__(self, num_feat=64, res_scale=1, pytorch_init=False):
        super(RBNoBN, self).__init__()
        self.res_scale = res_scale
        self.conv1 = nn.Conv2d(num_feat, num_feat, 3, 1, 1, bias=True)
        self.conv2 = nn.Conv2d(num_feat, num_feat, 3, 1, 1, bias=True)
        self.relu = nn.ReLU(inplace=True)
        if not pytorch_init:
            default_init_weights([self.conv1, self.conv2], 0.1)

    def forward(self, x):
        identity = x
        out = self.conv2(self.relu(self.conv1(x)))
        return identity + out * self.res_scale


class ResBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None, res_scale=1):
        super(ResBlock, self).__init__()
        self.res_scale = res_scale
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.relu = nn.LeakyReLU(0.2, inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)

    def forward(self, x):
        x1 = x
        out = self.conv1(x)
        out = self.relu(out)
        out = self.conv2(out)
        out = out * self.res_scale + x1
        return out
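
# Example (a minimal sketch; the block count and feature width below are
# illustrative, not from the original): make_layer stacks identical residual
# blocks, each of which preserves the (n, c, h, w) shape.
#     body = make_layer(RBNoBN, num_basic_block=5, num_feat=64)
#     x = torch.randn(1, 64, 32, 32)
#     assert body(x).shape == x.shape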
# """ # def __init__(self, num_in_ch=3, num_out_ch=64, num_block=15): # super().__init__() # self.main = nn.Sequential( # nn.Conv2d(num_in_ch, num_out_ch, 3, 1, 1, bias=True), nn.LeakyReLU(negative_slope=0.2, inplace=True), # make_layer(RBNoBN, num_block, num_feat=num_out_ch)) # def forward(self, fea): # return self.main(fea) class Encoder_input(nn.Module): def __init__(self, num_res_blocks, n_feats, img_channel, res_scale=1): super(Encoder_input, self).__init__() self.num_res_blocks = num_res_blocks self.conv_head = conv3x3(img_channel, n_feats) self.RBs = nn.ModuleList() for i in range(self.num_res_blocks): self.RBs.append(ResBlock(in_channels=n_feats, out_channels=n_feats, res_scale=res_scale)) self.conv_tail = conv3x3(n_feats, n_feats) self.relu = nn.LeakyReLU(0.2, inplace=True) def forward(self, x): x = self.relu(self.conv_head(x)) x1 = x for i in range(self.num_res_blocks): x = self.RBs[i](x) x = self.conv_tail(x) x = x + x1 return x class ResList(nn.Module): def __init__(self, num_res_blocks, n_feats, res_scale=1): super(ResList, self).__init__() self.num_res_blocks = num_res_blocks self.RBs = nn.ModuleList() for i in range(self.num_res_blocks): self.RBs.append(ResBlock(in_channels=n_feats, out_channels=n_feats)) self.conv_tail = conv3x3(n_feats, n_feats) def forward(self, x): x1 = x for i in range(self.num_res_blocks): x = self.RBs[i](x) x = self.conv_tail(x) x = x + x1 return x class Res_Attention_List(nn.Module): def __init__(self, num_res_blocks, n_feats, res_scale=1): super(Res_Attention_List, self).__init__() self.num_res_blocks = num_res_blocks self.RBs = nn.ModuleList() for i in range(self.num_res_blocks): self.RBs.append(Res_Attention(in_channels=n_feats, out_channels=n_feats)) self.conv_tail = conv3x3(n_feats, n_feats) def forward(self, x): x1 = x for i in range(self.num_res_blocks): x = self.RBs[i](x) x = self.conv_tail(x) x = x + x1 return x class PixelShufflePack(nn.Module): """ Pixel Shuffle upsample layer. Args: in_channels (int): Number of input channels. out_channels (int): Number of output channels. scale_factor (int): Upsample ratio. upsample_kernel (int): Kernel size of Conv layer to expand channels. Returns: Upsampled feature map. """ def __init__(self, in_channels, out_channels, scale_factor, upsample_kernel): super().__init__() self.in_channels = in_channels self.out_channels = out_channels self.scale_factor = scale_factor self.upsample_kernel = upsample_kernel self.upsample_conv = nn.Conv2d( self.in_channels, self.out_channels * scale_factor * scale_factor, self.upsample_kernel, padding=(self.upsample_kernel - 1) // 2) self.init_weights() def init_weights(self): """Initialize weights for PixelShufflePack. """ default_init_weights(self, 1) def forward(self, x): """Forward function for PixelShufflePack. Args: x (Tensor): Input tensor with shape (n, c, h, w). Returns: Tensor: Forward results. 
""" x = self.upsample_conv(x) x = F.pixel_shuffle(x, self.scale_factor) return x class BasicBlock(nn.Sequential): def __init__( self, conv, in_channels, out_channels, kernel_size, stride=1, bias=True, bn=False,In=False,act=nn.PReLU()): m = [conv(in_channels, out_channels, kernel_size, stride=stride, bias=bias)] if bn: m.append(nn.BatchNorm2d(out_channels)) if In: m.append(nn.InstanceNorm2d(out_channels)) if act is not None: m.append(act) super(BasicBlock, self).__init__(*m) class MeanShift(nn.Conv2d): def __init__(self, rgb_range, rgb_mean, rgb_std, sign=-1): super(MeanShift, self).__init__(3, 3, kernel_size=1) std = torch.Tensor(rgb_std) self.weight.data = torch.eye(3).view(3, 3, 1, 1) self.weight.data.div_(std.view(3, 1, 1, 1)) self.bias.data = sign * rgb_range * torch.Tensor(rgb_mean) self.bias.data.div_(std) self.weight.requires_grad = False self.bias.requires_grad = False def flow_warp(x, flow, interp_mode='bilinear', padding_mode='zeros', align_corners=True): """Warp an image or feature map with optical flow. Args: x (Tensor): Tensor with size (n, c, h, w). flow (Tensor): Tensor with size (n, h, w, 2), normal value. interp_mode (str): 'nearest' or 'bilinear'. Default: 'bilinear'. padding_mode (str): 'zeros' or 'border' or 'reflection'. Default: 'zeros'. align_corners (bool): Before pytorch 1.3, the default value is align_corners=True. After pytorch 1.3, the default value is align_corners=False. Here, we use the True as default. Returns: Tensor: Warped image or feature map. """ assert x.size()[-2:] == flow.size()[1:3] _, _, h, w = x.size() # create mesh grid grid_y, grid_x = torch.meshgrid(torch.arange(0, h).type_as(x), torch.arange(0, w).type_as(x)) grid = torch.stack((grid_x, grid_y), 2).float() # W(x), H(y), 2 grid.requires_grad = False vgrid = grid + flow # scale grid to [-1,1] vgrid_x = 2.0 * vgrid[:, :, :, 0] / max(w - 1, 1) - 1.0 vgrid_y = 2.0 * vgrid[:, :, :, 1] / max(h - 1, 1) - 1.0 vgrid_scaled = torch.stack((vgrid_x, vgrid_y), dim=3) output = F.grid_sample(x, vgrid_scaled, mode=interp_mode, padding_mode=padding_mode, align_corners=align_corners) # TODO, what if align_corners=False return output @torch.no_grad() def default_init_weights(module_list, scale=1, bias_fill=0, **kwargs): """Initialize network weights. Args: module_list (list[nn.Module] | nn.Module): Modules to be initialized. scale (float): Scale initialized weights, especially for residual blocks. Default: 1. bias_fill (float): The value to fill bias. Default: 0 kwargs (dict): Other arguments for initialization function. 
""" if not isinstance(module_list, list): module_list = [module_list] for module in module_list: for m in module.modules(): if isinstance(m, nn.Conv2d): init.kaiming_normal_(m.weight, **kwargs) m.weight.data *= scale if m.bias is not None: m.bias.data.fill_(bias_fill) elif isinstance(m, nn.Linear): init.kaiming_normal_(m.weight, **kwargs) m.weight.data *= scale if m.bias is not None: m.bias.data.fill_(bias_fill) elif isinstance(m, _BatchNorm): init.constant_(m.weight, 1) if m.bias is not None: m.bias.data.fill_(bias_fill) class ChannelPool(nn.Module): def forward(self, x): #是一个元祖 第一个是最大值 第二个是坐标 所以要[0] return torch.cat((torch.max(x,1)[0].unsqueeze(1), torch.mean(x,1).unsqueeze(1)), dim=1 ) ## Channel Attention (CA) Layer class CALayer(nn.Module): def __init__(self, channel, reduction=16): super(CALayer, self).__init__() # global average pooling: feature --> point self.avg_pool = nn.AdaptiveAvgPool2d(1) # feature channel downscale and upscale --> channel weight self.conv_du = nn.Sequential( nn.Conv2d(channel, channel // reduction, 1, padding=0, bias=True), nn.ReLU(inplace=True), nn.Conv2d(channel // reduction, channel, 1, padding=0, bias=True), nn.Sigmoid() ) def forward(self, x): y = self.avg_pool(x) y = self.conv_du(y) return x * y class SpatialGate(nn.Module): def __init__(self): super(SpatialGate, self).__init__() kernel_size = 7 self.compress = ChannelPool() # self.spatial = BasicConv(2, 1, kernel_size, stride=1, padding=(kernel_size-1) // 2, relu=False) self.spatial = nn.Conv2d(2, 1, 7, 1, 3) self.sigmoid = nn.Sigmoid() def forward(self, x): x_compress = self.compress(x) #torch.Size([4, 2, 64, 64]) x_out = F.relu(self.spatial(x_compress)) # import pdb # pdb.set_trace() scale = self.sigmoid(x_out)# broadcasting return x * scale class Res_Attention_Conf(nn.Module): def __init__(self, in_channels, out_channels, stride=1, downsample=None, res_scale=1, SA=False, CA=False): super(Res_Attention_Conf, self).__init__() conv=default_conv self.res_scale = res_scale self.conv1 = conv3x3(in_channels, out_channels, stride) self.relu = nn.LeakyReLU(0.2, inplace=True) self.conv2 = conv3x3(out_channels, out_channels) self.channel_attention = CALayer(out_channels, reduction=16) self.spatial_attention = SpatialGate() # self.conv3 = conv3x3(out_channels, out_channels) self.CA = CA self.SA = SA def forward(self, x): x1 = x out = self.relu(self.conv1(x)) if self.SA: out = self.spatial_attention(out) out = out if self.CA: out = self.channel_attention(out) out = self.relu(self.conv2(out)) # out = self.conv3(out) out = out * self.res_scale + x1 return out class Res_CA_Block(nn.Module): def __init__(self, in_channels, out_channels, stride=1, res_scale=1, CA=False): super(Res_CA_Block, self).__init__() # conv=default_conv self.res_scale = res_scale self.conv1 = conv3x3(in_channels, out_channels, stride) self.relu = nn.LeakyReLU(0.2, inplace=True) self.conv2 = conv3x3(out_channels, out_channels) self.channel_attention = CALayer(out_channels, reduction=16) # self.conv3 = conv3x3(out_channels, out_channels) self.CA = CA def forward(self, x): x1 = x out = self.relu(self.conv1(x)) if self.CA: out = self.channel_attention(out) out = self.relu(self.conv2(out)) # out = self.conv3(out) out = out * self.res_scale + x1 return out class Res_Attention_List(nn.Module): def __init__(self, num_res_blocks, n_feats, res_scale=1): super(Res_Attention_List, self).__init__() self.num_res_blocks = num_res_blocks self.RBs = nn.ModuleList() for i in range(self.num_res_blocks): self.RBs.append(Res_CA_Block(in_channels=n_feats, 
class Res_Attention(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None,
                 res_scale=1, SA=False, CA=False):
        super(Res_Attention, self).__init__()
        self.res_scale = res_scale
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.relu = nn.LeakyReLU(0.2, inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.channel_attention = CALayer(out_channels, reduction=16)
        self.spatial_attention = SpatialGate()
        # self.conv3 = conv3x3(out_channels, out_channels)
        self.CA = CA
        self.SA = SA

    def forward(self, x):
        x1 = x
        out = self.relu(self.conv1(x))
        if self.SA:
            out = self.spatial_attention(out)
        if self.CA:
            out = self.channel_attention(out)
        out = self.relu(self.conv2(out))
        # out = self.conv3(out)
        out = out * self.res_scale + x1
        return out


def record(fea, path):
    """Save the first channel of the first sample as a normalized grayscale image."""
    fea = fea[0][0]
    mean = fea.mean()
    std = fea.std()
    fea_norm = (fea - mean) / std
    fea_norm = fea_norm.detach().cpu().numpy()
    plt.imsave(path, fea_norm, cmap='gray')


# record2 was an exact duplicate of record; keep the name as an alias.
record2 = record
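
# Example (a minimal sketch; the output path is hypothetical): visualize the
# first feature channel of a batch as a standardized grayscale image.
#     fea = torch.randn(1, 64, 32, 32)
#     record(fea, 'fea_vis.png')  # writes a 32x32 grayscale visualization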