import os
import torch
from torch import nn
from torch.nn import functional as F
from transformers import PreTrainedModel
from .configuration_MyResnet import MyResnetConfig

# 设置CUDA异常阻塞,用于调试CUDA相关问题
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

"""
定义自己的模型
"""


# 定义残差块
class Residual(nn.Module):
    def __init__(self, input_channels, num_channels,
                 use_1x1conv=False, strides=1):
        super().__init__()
        # 第一个3x3卷积层
        self.conv1 = nn.Conv2d(input_channels, num_channels,
                               kernel_size=3, padding=1, stride=strides)
        # 第二个3x3卷积层
        self.conv2 = nn.Conv2d(num_channels, num_channels,
                               kernel_size=3, padding=1)
        # 可选的1x1卷积层,用于调整输入的通道数
        if use_1x1conv:
            self.conv3 = nn.Conv2d(input_channels, num_channels,
                                   kernel_size=1, stride=strides)
        else:
            self.conv3 = None
        # 批量归一化层
        self.bn1 = nn.BatchNorm2d(num_channels)
        self.bn2 = nn.BatchNorm2d(num_channels)

    def forward(self, X):
        # 第一个卷积 -> 批量归一化 -> ReLU激活
        Y = F.relu(self.bn1(self.conv1(X)))
        # 第二个卷积 -> 批量归一化
        Y = self.bn2(self.conv2(Y))
        # 如果使用1x1卷积,调整输入的通道数
        if self.conv3:
            X = self.conv3(X)
        # 将输入与输出相加
        Y += X
        return F.relu(Y)  # 返回激活后的结果


# 组合多个残差块
def resnet_block(input_channels, num_channels, num_residuals,
                 first_block=False):
    """
    :param first_block: 是否为第一个块,用于确定是否需要1x1卷积
    :param input_channels: 输入通道数
    :param num_channels: 残差块的输出通道数
    :param num_residuals: 残差块的数量
    :return: 组合后的多个残差块
    """
    blk = []
    for i in range(num_residuals):
        # 第一个残差块需要降维
        if i == 0 and not first_block:
            blk.append(Residual(input_channels, num_channels,
                                use_1x1conv=True, strides=2))
        else:
            blk.append(Residual(num_channels, num_channels))
    return blk


# 定义残差网络
def net(in_channels, num_channels, num_residuals, num_classes):
    """
    :param in_channels: 输入图像的通道数
    :param num_channels: 第一个卷积层的输出通道数
    :param num_residuals: 每个阶段的残差块数量
    :param num_classes: 分类的数量
    :return: 构建的残差网络模型
    """
    # 首先是一个7x7卷积层,接着是批量归一化、ReLU激活和3x3最大池化
    b1 = nn.Sequential(nn.Conv2d(in_channels, num_channels, kernel_size=7, stride=2, padding=3),
                       nn.BatchNorm2d(64), nn.ReLU(),
                       nn.MaxPool2d(kernel_size=3, stride=2, padding=1))

    # 构建多个残差块
    b2 = nn.Sequential(*resnet_block(64, num_channels, num_residuals[0], first_block=True))
    b3 = nn.Sequential(*resnet_block(num_channels, num_channels * 2, num_residuals[1]))
    b4 = nn.Sequential(*resnet_block(num_channels * 2, num_channels * 4, num_residuals[2]))
    b5 = nn.Sequential(*resnet_block(num_channels * 4, num_channels * 8, num_residuals[3]))

    # 全局平均池化后,连接一个全连接层进行分类
    resnet = nn.Sequential(b1, b2, b3, b4, b5,
                           nn.AdaptiveAvgPool2d((1, 1)),
                           nn.Flatten(), nn.Linear(num_channels * 8, num_classes))
    return resnet


"""
把模型封装成huggingface的模型,
可以使用transformers库进行训练和推理
这里定义了两个模型类:一个用于从一批图像中提取隐藏特征(类似于 BertModel),
另一个适用于图像分类(类似于 BertForSequenceClassification)。
"""


class MyResnetModel(PreTrainedModel):
    config_class = MyResnetConfig  # 指定配置类

    def __init__(self, config):
        super().__init__(config)
        # 根据配置初始化模型
        self.model = net(
            in_channels=config.in_channels,
            num_channels=config.num_channels,
            num_residuals=config.num_residuals,
            num_classes=config.num_classes
        )

    def forward(self, tensor, labels=None):
        return self.model.forward_features(tensor)  # 返回特征


class MyResnetModelForImageClassification(PreTrainedModel):
    config_class = MyResnetConfig  # 指定配置类

    def __init__(self, config):
        super().__init__(config)
        # 根据配置初始化模型
        self.model = net(
            in_channels=config.in_channels,
            num_channels=config.num_channels,
            num_residuals=config.num_residuals,
            num_classes=config.num_classes
        )

    """
    你可以让模型返回任何你想要的内容,
    但是像这样返回一个字典,并在传递标签时包含loss,可以使你的模型能够在 Trainer 类中直接使用。
    只要你计划使用自己的训练循环或其他库进行训练,也可以使用其他输出格式。
    """

    def forward(self, X, y):
        # 前向传播,计算模型输出
        # print(y)
        y_hat = self.model(X)
        if y is not None:
            # 计算损失
            loss = torch.nn.functional.cross_entropy(y_hat, y)
            return {"loss": loss, "logits": y_hat}  # 返回损失和输出
        return {"logits": y_hat}

    def forward_features(self, X):
        # 返回特征
        for layer in self.model:
            X = layer(X)
            print(layer.__class__.__name__, 'output shape:\t', X.shape)