# dlp / app.py
# aspnet's picture
# Update app.py
# 4b30ddd verified
'''
from flask import Flask, request, jsonify, send_from_directory, render_template
from flask_cors import CORS
from ultralytics import YOLO
import gradio as gr
from threading import Thread
import os
import uuid
import logging
from PIL import Image
# 配置日志记录
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(message)s', datefmt='%Y-%m-%d %H:%M:%S')
# 创建 Flask 应用
app = Flask(__name__, static_folder='static')
CORS(app)
# 定义模型路径
models = {
'追踪': 'models/yolov8n.pt',
'检测': 'models/danzhu.pt',
'分类': 'models/yolov8n-cls.pt',
'姿势': 'models/yolov8n-pose.pt',
'分割': 'models/yolov8n-seg.pt'
}
model_instances = {}
def load_model(model_path):
"""加载模型"""
try:
logging.info(f"正在从 {model_path} 加载模型...")
model = YOLO(model_path)
logging.info(f"模型从 {model_path} 成功加载")
return model
except Exception as e:
logging.error(f"从 {model_path} 加载模型失败: {e}")
return None
def convert_image_format(img_path, target_format='JPEG'):
"""转换图像格式"""
try:
with Image.open(img_path) as img:
if img.mode != 'RGB':
img = img.convert('RGB')
base_name, _ = os.path.splitext(img_path)
target_path = f"{base_name}.{target_format.lower()}"
img.save(target_path, format=target_format)
logging.info(f"图像格式成功转换为 {target_format},保存到 {target_path}")
return target_path
except Exception as e:
logging.error(f"图像格式转换失败: {e}")
raise
def predict(model_name, img_path):
"""进行预测"""
try:
if model_name not in models:
logging.error("选择的模型无效。")
return "选择的模型无效。"
model_path = models[model_name]
if model_name not in model_instances:
model_instances[model_name] = load_model(model_path)
model = model_instances[model_name]
if model is None:
logging.error("由于连接错误,模型未加载。")
return "由于连接错误,模型未加载。"
unique_name = str(uuid.uuid4())
save_dir = './runs/detect'
os.makedirs(save_dir, exist_ok=True)
logging.info(f"保存目录: {save_dir}")
# 转换图像格式
img_path_converted = convert_image_format(img_path, 'JPEG')
img_path_converted = os.path.normpath(img_path_converted)
logging.info(f"对 {img_path_converted} 进行预测...")
results = model.predict(img_path_converted, save=True, project=save_dir, name=unique_name, device='cpu')
logging.info(f"预测结果: {results}")
result_dir = os.path.join(save_dir, unique_name)
result_dir = os.path.normpath(result_dir)
logging.info(f"结果目录: {result_dir}")
if not os.path.exists(result_dir):
logging.error(f"结果目录 {result_dir} 不存在")
return "未找到预测结果。"
# 查找预测结果文件
predicted_img_path = None
for file in os.listdir(result_dir):
if file.lower().endswith(('.jpeg', '.jpg')):
predicted_img_path = os.path.join(result_dir, file)
break
if predicted_img_path:
logging.info(f"找到预测图像: {predicted_img_path}")
return predicted_img_path
else:
logging.error(f"在 {result_dir} 中未找到预测图像")
return "未找到预测结果。"
except Exception as e:
logging.error(f"预测过程中出错: {e}")
return f"预测过程中出错: {e}"
# 定义 Gradio 界面
iface = gr.Interface(
fn=predict,
inputs=[
gr.Dropdown(choices=list(models.keys()), label="选择模型"),
gr.Image(type="filepath", label="输入图像")
],
outputs=gr.Image(type="filepath", label="输出图像")
)
@app.route('/')
def home():
"""主页"""
return render_template('index.html')
@app.route('/request', methods=['POST'])
def handle_request():
"""处理请求"""
try:
selected_model = request.form.get('model')
if selected_model not in models:
logging.error("选择的模型无效。")
return jsonify({'error': '选择的模型无效。'}), 400
model_path = models[selected_model]
if selected_model not in model_instances:
model_instances[selected_model] = load_model(model_path)
model = model_instances[selected_model]
if model is None:
logging.error("由于连接错误,模型未加载。")
return jsonify({'error': '由于连接错误,模型未加载。'}), 500
img = request.files.get('img')
if img is None:
logging.error("未提供图像。")
return jsonify({'error': '未提供图像。'}), 400
img_name = str(uuid.uuid4()) + '.jpg'
img_path = os.path.join('./img', img_name)
os.makedirs(os.path.dirname(img_path), exist_ok=True)
img.save(img_path)
logging.info(f"图像已保存到: {img_path}")
save_dir = './runs/detect'
os.makedirs(save_dir, exist_ok=True)
unique_name = str(uuid.uuid4())
logging.info(f"对 {img_path} 进行预测...")
results = model.predict(img_path, save=True, project=save_dir, name=unique_name, device='cpu')
logging.info(f"预测结果: {results}")
result_dir = os.path.join(save_dir, unique_name)
# 查找预测结果文件
predicted_img_path = None
for file in os.listdir(result_dir):
if file.endswith('.jpeg') or file.endswith('.jpg'):
predicted_img_path = os.path.join(result_dir, file)
break
if predicted_img_path:
img_url = f'/get/{unique_name}/{os.path.basename(predicted_img_path)}'
return jsonify({'message': '预测成功!', 'img_path': img_url})
else:
saved_files = os.listdir(result_dir)
logging.error(f"保存目录中包含文件: {saved_files}")
return jsonify({'error': '未找到预测结果。'}), 500
except Exception as e:
logging.error(f"处理请求时出错: {e}")
return jsonify({'error': f'处理过程中发生错误: {e}'}), 500
@app.route('/get/<unique_name>/<filename>')
def get_image(unique_name, filename):
"""获取图像"""
try:
return send_from_directory(os.path.join('runs/detect', unique_name), filename)
except Exception as e:
logging.error(f"提供文件时出错: {e}")
return jsonify({'error': '文件未找到。'}), 404
def run_gradio():
"""运行 Gradio 界面"""
logging.info("启动 Gradio 界面...")
iface.launch(share=True) # 设置 share=True 以便公开访问
def run_flask():
"""运行 Flask 应用"""
logging.info("启动 Flask 应用...")
app.run(host="0.0.0.0", port=5000)
if __name__ == '__main__':
# 启动 Flask 和 Gradio 线程
gradio_thread = Thread(target=run_gradio)
flask_thread = Thread(target=run_flask)
gradio_thread.start()
flask_thread.start()
gradio_thread.join()
flask_thread.join()
'''
#############################
'''
from ultralytics import YOLO
# Load a model
model = YOLO("yolov8n.yaml") # build a new model from YAML
model = YOLO("yolov8n.pt") # load a pretrained model (recommended for training)
model = YOLO("yolov8n.yaml").load("yolov8n.pt") # build from YAML and transfer weights
# Train the model
results = model.train(data="coco8.yaml", epochs=100, imgsz=640)
'''
###################################
'''
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
# 定义模型
class SimpleCNN(nn.Module):
def __init__(self, num_classes=10):
super(SimpleCNN, self).__init__()
self.conv1 = nn.Conv2d(1, 20, 5, 1)
self.conv2 = nn.Conv2d(20, 50, 5, 1)
self.fc1 = nn.Linear(4*4*50, 500)
self.fc2 = nn.Linear(500, num_classes)
def forward(self, x):
x = torch.relu(self.conv1(x))
x = torch.max_pool2d(x, 2, 2)
x = torch.relu(self.conv2(x))
x = torch.max_pool2d(x, 2, 2)
x = x.view(-1, 4*4*50)
x = torch.relu(self.fc1(x))
x = self.fc2(x)
return x
# 加载数据集
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.5,), (0.5,))
])
train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
# 初始化模型和优化器
model = SimpleCNN(num_classes=10)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
# 训练模型
num_epochs = 5
for epoch in range(num_epochs):
for i, (images, labels) in enumerate(train_loader):
# 前向传播
outputs = model(images)
loss = criterion(outputs, labels)
# 反向传播和优化
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (i+1) % 100 == 0:
print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}')
# 保存模型(可选)
torch.save(model.state_dict(), 'model.pth')
'''
####################################
'''
from datasets import load_dataset
# 加载数据集
dataset = load_dataset('glue', 'sst2') # 这里的'sst2'是GLUE数据集下的一个子集
# 查看数据集内容
print(dataset['train'][:2]) # 查看训练集的前两个样本
'''
################################
'''
from datasets import load_dataset
# 加载数据集
dataset = load_dataset('fka/awesome-chatgpt-prompts')
# 查看数据集的子集
print(dataset.keys()) # 这将输出数据集中所有可用的子集名称,例如:dict_keys(['train', 'validation', 'test'])
# 访问特定子集的数据
train_dataset = dataset['train']
print(train_dataset[:2]) # 查看训练集的前两个样本
# 如果你知道确切的子集名称,也可以直接加载它
# train_dataset = load_dataset('fka/awesome-chatgpt-prompts', split='train')
'''
#############################
'''
from datasets import load_dataset
dataset = load_dataset("aspnet/yoloensembledata")
#print(dataset)
print(dataset['train'])
print(dataset.keys())
print(dataset['test'])
print(dataset['validation'])
'''
#########################
'''
from ultralytics import YOLO
# Load a model
model = YOLO("yolov8n.yaml") # build a new model from YAML
model = YOLO("yolov8n.pt") # load a pretrained model (recommended for training)
model = YOLO("yolov8n.yaml").load("yolov8n.pt") # build from YAML and transfer weights
# Train the model
results = model.train(data="coco8.yaml", epochs=100, imgsz=640)
'''
###################################
import zipfile
def unzip_file(zip_path, extract_to):
    """Extract every member of a zip archive into a directory.

    Args:
        zip_path: Path of the zip archive to read.
        extract_to: Directory the members are extracted into. Missing
            directories are created as members are written.

    Returns:
        The archive's member names, in archive order.

    Raises:
        FileNotFoundError: If ``zip_path`` does not exist.
        zipfile.BadZipFile: If ``zip_path`` is not a valid zip archive.
    """
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)
        # Hand the member names back so callers can see what was unpacked
        # without reopening the archive.
        return zip_ref.namelist()
# Archive to unpack -- replace with the path to your own zip file.
zip_file_path = 'Math Equation by YOLO-NAS.v2i.yolov8.zip'
# Directory the archive is extracted into -- replace with your target path.
extract_to_path = 'MathEquationbyYOLO-NAS.v2i.yolov8'
# Runs when the module is executed or imported; raises if the archive is absent.
unzip_file(zip_path=zip_file_path, extract_to=extract_to_path)