model-convert / app.py
tumuyan2's picture
Update app.py
1d066cb
import gradio as gr
import os
import requests
import shutil
import uuid
from ftplib import FTP
from spandrel import ImageModelDescriptor, ModelLoader
import torch
import subprocess
# 定义 downloaded_files 变量
downloaded_files = {}
# 新增日志开关
log_to_terminal = True
# 新增全局任务计数器
task_counter = 0
# 新增日志函数
def print_log(task_id, filename, stage, status):
if log_to_terminal:
print(f"任务{task_id}: {filename}, [{status}] {stage}")
# 修改 start_process 函数,处理新增输入
def start_process(input1, input2, shape0_str, shape1_str, input_suffix=".pth"):
global task_counter
task_counter += 1
task_id = task_counter
print_log(task_id, input2, input1, "input1")
log = "转换过程非常慢,请耐心等待。显示文件列表不代表转换完成。如果未发生错误,转换结束会显示”任务完成“\n"
yield [], log
if input2 == None or input2.strip() == "":
split_input = os.path.splitext(os.path.basename(input1))
if len(split_input) > 1:
suffix = split_input[1].split('?')[0].lower()
if suffix not in [".pth" , ".safetensors" , ".ckpt"]:
print_log(task_id, input2, "不支持此文件的格式 suffix="+suffix, "错误")
log += f"不支持此文件的格式\n"
return [] , log
input2 = split_input[0]
print_log(task_id, input2, "检查文件名", "开始")
log += f"检查文件名…\n"
yield [], log
if input2 == None or input2.strip() == "":
input2 = str(task_id)
log += f"未提供文件名,使用{input2}\n"
print_log(task_id, input2, f"未提供文件名,使用{input2}", "修正")
yield [], log
try:
# 判断 input1 是地址还是文件,增加对 ftp 和 webdav 协议的支持
supported_protocols = ('http://', 'https://', 'ftp://', 'webdav://')
if isinstance(input1, str) and input1.startswith(supported_protocols):
url = input1
if url in downloaded_files and os.path.exists(downloaded_files[url]):
file_path = downloaded_files[url]
print_log(task_id, input2, "检查下载状态", "跳过下载")
log += f"跳过下载,文件已存在: {file_path}\n"
yield [], log
else:
print_log(task_id, input2, "下载文件", "开始")
log += f"开始下载文件…\n"
yield [], log
# 生成唯一文件名
file_name = str(task_id) + input_suffix
file_path = os.path.join(os.getcwd(), file_name)
if url.startswith('ftp://'):
try:
# 解析 ftp 地址
parts = url.replace('ftp://', '').split('/')
host = parts[0]
remote_file_path = '/'.join(parts[1:])
ftp = FTP(host)
ftp.login()
with open(file_path, 'wb') as f:
ftp.retrbinary('RETR ' + remote_file_path, f.write)
ftp.quit()
downloaded_files[url] = file_path
print_log(task_id, input2, "下载文件", "成功")
log += f"文件下载成功: {file_path}\n"
yield [], log
except Exception as e:
print_log(task_id, input2, "下载文件", f"失败 (FTP): {str(e)}")
log += f"FTP 文件下载失败: {str(e)}\n"
yield [], log
return
else:
if url.startswith(('http://', 'https://')):
response = requests.get(url)
if response.status_code == 200:
with open(file_path, 'wb') as f:
f.write(response.content)
downloaded_files[url] = file_path
print_log(task_id, input2, "下载文件", "成功")
log += f"文件下载成功: {file_path}\n"
yield [], log
else:
print_log(task_id, input2, f"下载文件(HTTP): {response.status_code}", "失败")
log += f"文件下载失败,状态码: {response.status_code}\n"
yield [], log
return
elif input1 is not None:
print("check file" , input1, os.path.exists(input1))
file_path = input1
log += f"使用上传的文件: {file_path}\n"
print_log(task_id, input2, "使用上传文件", "开始")
yield [], log
else:
log += "未提供有效文件或地址\n"
print_log(task_id, input2, "检查文件输入", "失败 (无有效输入)")
yield [], log
return
# 检查文件大小
try:
file_size = os.path.getsize(file_path) / 1024 /1024 # 转换为 KB
if file_size > 100 :
log += f"文件太大,建议 100MB 以内,当前文件大小为 {file_size } MB。\n"
print_log(task_id, input2, "文件太大("+ file_size +"MB)", "失败")
yield [], log
return
except Exception as e:
log += f"获取文件大小失败: {str(e)}\n"
print_log(task_id, input2, "检查文件大小", f"失败: {str(e)}")
yield [], log
return
# 生成新文件夹用于暂存结果
output_folder = os.path.join(os.getcwd(), str(uuid.uuid4()))
os.makedirs(output_folder, exist_ok=True)
print_log(task_id, input2, "创建临时文件夹", "完成")
log += f"创建临时文件夹: {output_folder}\n生成张量\n"
yield [], log
# 解析输入的字符串为数组
try:
# 尝试解析 shape0_str
shape0 = [int(x) for x in shape0_str.split(',')] if shape0_str else [0, 0, 0, 0]
# 检查 shape0 是否为 4 个元素,如果不是则设置为全 0
if len(shape0) != 4:
shape0 = [0, 0, 0, 0]
# 尝试解析 shape1_str
shape1 = [int(x) for x in shape1_str.split(',')] if shape1_str else [0, 0, 0, 0]
# 检查 shape1 是否为 4 个元素,如果不是则设置为全 0
if len(shape1) != 4:
shape1 = [0, 0, 0, 0]
except ValueError:
# 如果解析过程中出现 ValueError,将 shape0 和 shape1 设置为全 0
shape0 = [0, 0, 0, 0]
shape1 = [0, 0, 0, 0]
log += "输入的 shape 字符串格式不正确,请使用逗号分隔的整数。\n"
yield [], log
return
# 以下是 process_file 函数的代码
# 使用 torch.rand 生成 input_shape
print_log(task_id, input2, "生成输入张量", "开始")
log += "生成张量…\n"
yield [], log
pt_path = output_folder + "/" + input2 + ".pt"
# onnx_path = output_folder + "/" + input2 + ".onnx"
input_tensor0 = torch.rand(shape0) if any(shape0) else None
input_tensor1 = torch.rand(shape1) if any(shape1) else None
if input_tensor0 is not None and input_tensor1 is not None:
example_input = (input_tensor0, input_tensor1)
# 修改此处,去除 shape 字符串中的空格
command = f"pnnx {pt_path} inputshape={str(shape0).replace(' ', '')} inputshape2={str(shape1).replace(' ', '')}"
elif input_tensor0 is not None:
example_input = input_tensor0
command = f"pnnx {pt_path} inputshape={str(shape0).replace(' ', '')}"
else:
example_input = input_tensor1
command = f"pnnx {pt_path}"
print_log(task_id, input2, "生成输入张量", "完成")
# 确保 output_folder 存在
if not os.path.exists(output_folder):
os.makedirs(output_folder)
print_log(task_id, input2, "加载模型", "开始")
log += "加载模型…\n"
yield [], log
# load a model from disk
model = ModelLoader().load_from_file(file_path)
# make sure it's an image to image model
assert isinstance(model, ImageModelDescriptor)
print_log(task_id, input2, "获得模型对象", "开始")
log += "获得模型对象…\n"
yield [], log
# send it to the GPU and put it in inference mode
# model.cuda().eval()
model.eval()
torch_model = model.model
print_log(task_id, input2, "获得模型对象", "完成")
yield [], log
width_ratio = 4
if os.path.exists(pt_path):
print_log(task_id, input2, "转换为TorchScript模型", "跳过")
log += "跳过转换为TorchScript模型\n"
yield [], log
else:
print_log(task_id, input2, "转换为TorchScript模型", "开始")
log+= "转换为TorchScript模型…\n"
yield [], log
# 使用 torch.jit.trace 进行模型转换
traced_torch_model = torch.jit.trace(torch_model, example_input)
traced_torch_model.save(output_folder + "/" + input2 + ".pt")
print_log(task_id, input2, "转换为TorchScript模型", "完成")
# 获取输出
example_output = traced_torch_model(example_input)
width_ratio = example_output.shape[2] / example_input.shape[2]
print_log(task_id, input2, "获得缩放倍率="+ str(width_ratio)+", 输出shape="+str(list(example_output.shape)), "完成")
log+= ("获得缩放倍率="+str(width_ratio)+", 输出shape="+str(list(example_output.shape))+"\n")
yield [], log
print_log(task_id, input2, "执行命令" + command, "开始")
log += "执行命令…\n"
yield [], log
try:
# 使用 subprocess.Popen 执行命令
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
break
if output:
#
if log_to_terminal:
print(output.strip())
log += output.strip() + '\n'
yield [], log
returncode = process.poll()
if returncode != 0:
log += f"执行命令失败,返回码: {returncode},命令: {command} \n"
else:
log += f"执行命令成功: {command} \n"
except Exception as e:
log += f"执行命令: {command} 失败,错误信息: {str(e)}\n"
# 查找 output_folder 目录下以 .ncnn.bin 和 .ncnn.param 结尾的文件
bin_files = [f for f in os.listdir(output_folder) if f.endswith('.ncnn.bin')]
param_files = [f for f in os.listdir(output_folder) if f.endswith('.ncnn.param')]
if bin_files and param_files:
param_file = os.path.join(output_folder, param_files[0])
bin_file = os.path.join(output_folder, bin_files[0])
import zipfile
# 压缩包名称
zip_file_name = os.path.join(output_folder, f"models-{input2}.zip")
# 压缩包内文件夹名称
zip_folder_name = f"models-{input2}"
# 重命名后的文件名
scale = int(width_ratio)
new_bin_name = f"x{scale}.bin"
new_param_name = f"x{scale}.param"
# 创建压缩包
with zipfile.ZipFile(zip_file_name, 'w', zipfile.ZIP_DEFLATED) as zipf:
# 写入重命名后的.bin文件
zipf.write(bin_file, os.path.join(zip_folder_name, new_bin_name))
# 写入重命名后的.param文件
zipf.write(param_file, os.path.join(zip_folder_name, new_param_name))
log += f"已创建压缩包: {zip_file_name}\n"
print_log(task_id, input2, "创建压缩包"+zip_file_name, "完成")
yield [], log
else:
log += f"未找到 ncnn 文件\n"
print_log(task_id, input2, "查找 ncnn 文件", "失败")
yield [], log
output_files = [os.path.join(output_folder, f) for f in os.listdir(output_folder) if os.path.isfile(os.path.join(output_folder, f))]
log += f"任务完成\n"
print_log(task_id, input2, "执行命令", "完成")
yield output_files, log
except Exception as e:
log += f"发生错误: {e}\n"
print_log(task_id, input2, e , f"失败")
yield [], log
# 创建 Gradio 界面
with gr.Blocks() as demo:
gr.Markdown("文件处理界面")
with gr.Row():
# 左侧列,包含输入组件和按钮
with gr.Column():
# 添加文本提示
gr.Markdown("请输入的url,或者上传一个文件。限制文件为小于100M的*.pth模型")
with gr.Row():
input1 = gr.Textbox(label="粘贴地址")
# 新增文件上传组件
input1_file = gr.File(label="上传文件", file_types=[".pth", ".safetensors", ".ckpt"])
input2 = gr.Textbox(label="自定义文件名")
# 修改为字符串输入控件
shape0_str = gr.Textbox(label="shape0 (逗号分隔的整数)", value="1,3,128,128")
shape1_str = gr.Textbox(label="shape1 (逗号分隔的整数)", value="0,0,0,0")
with gr.Row():
start_button = gr.Button("开始")
# 添加取消按钮
cancel_button = gr.Button("取消")
# 右侧列,包含输出组件和日志文本框
with gr.Column():
output = gr.File(label="输出文件", file_count="multiple")
log_textbox = gr.Textbox(label="日志", lines=10, interactive=False)
# 绑定事件,修改输入参数
process = start_button.click(
fn=start_process,
inputs=[input1_file if input1_file.value else input1, input2, shape0_str, shape1_str],
outputs=[output, log_textbox]
)
# 为取消按钮添加点击事件绑定,使用 cancels 属性取消 start_process 任务
cancel_button.click(
fn=None,
inputs=None,
outputs=None,
cancels=[process]
)
# 添加范例
examples = [
["https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.5.0/realesr-general-x4v3.pth", "", "1,3,128,128", "0,0,0,0"],
["https://github.com/Phhofm/models/releases/download/4xNomos8kSC/4xNomos8kSC.pth", "", "1,3,128,128", "0,0,0,0"],
["https://github.com/Phhofm/models/releases/download/1xDeJPG/1xDeJPG_SRFormer_light.pth", "", "1,3,128,128", "0,0,0,0"],
["https://objectstorage.us-phoenix-1.oraclecloud.com/n/ax6ygfvpvzka/b/open-modeldb-files/o/4x-WTP-ColorDS.pth", "", "1,3,128,128", "0,0,0,0"],
]
gr.Examples(
examples=examples,
inputs=[input1, input2, shape0_str, shape1_str],
outputs=[output, log_textbox],
fn=start_process
)
demo.launch()