|
''' |
|
from flask import Flask, request, jsonify, send_from_directory, render_template |
|
from flask_cors import CORS |
|
from ultralytics import YOLO |
|
import gradio as gr |
|
from threading import Thread |
|
import os |
|
import uuid |
|
import logging |
|
from PIL import Image |
|
|
|
# 配置日志记录 |
|
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(message)s', datefmt='%Y-%m-%d %H:%M:%S') |
|
|
|
# 创建 Flask 应用 |
|
app = Flask(__name__, static_folder='static') |
|
CORS(app) |
|
|
|
# 定义模型路径 |
|
models = { |
|
'追踪': 'models/yolov8n.pt', |
|
'检测': 'models/danzhu.pt', |
|
'分类': 'models/yolov8n-cls.pt', |
|
'姿势': 'models/yolov8n-pose.pt', |
|
'分割': 'models/yolov8n-seg.pt' |
|
} |
|
|
|
model_instances = {} |
|
|
|
def load_model(model_path): |
|
"""加载模型""" |
|
try: |
|
logging.info(f"正在从 {model_path} 加载模型...") |
|
model = YOLO(model_path) |
|
logging.info(f"模型从 {model_path} 成功加载") |
|
return model |
|
except Exception as e: |
|
logging.error(f"从 {model_path} 加载模型失败: {e}") |
|
return None |
|
|
|
def convert_image_format(img_path, target_format='JPEG'): |
|
"""转换图像格式""" |
|
try: |
|
with Image.open(img_path) as img: |
|
if img.mode != 'RGB': |
|
img = img.convert('RGB') |
|
base_name, _ = os.path.splitext(img_path) |
|
target_path = f"{base_name}.{target_format.lower()}" |
|
img.save(target_path, format=target_format) |
|
logging.info(f"图像格式成功转换为 {target_format},保存到 {target_path}") |
|
return target_path |
|
except Exception as e: |
|
logging.error(f"图像格式转换失败: {e}") |
|
raise |
|
|
|
def predict(model_name, img_path): |
|
"""进行预测""" |
|
try: |
|
if model_name not in models: |
|
logging.error("选择的模型无效。") |
|
return "选择的模型无效。" |
|
|
|
model_path = models[model_name] |
|
if model_name not in model_instances: |
|
model_instances[model_name] = load_model(model_path) |
|
model = model_instances[model_name] |
|
|
|
if model is None: |
|
logging.error("由于连接错误,模型未加载。") |
|
return "由于连接错误,模型未加载。" |
|
|
|
unique_name = str(uuid.uuid4()) |
|
save_dir = './runs/detect' |
|
os.makedirs(save_dir, exist_ok=True) |
|
logging.info(f"保存目录: {save_dir}") |
|
|
|
# 转换图像格式 |
|
img_path_converted = convert_image_format(img_path, 'JPEG') |
|
img_path_converted = os.path.normpath(img_path_converted) |
|
logging.info(f"对 {img_path_converted} 进行预测...") |
|
|
|
results = model.predict(img_path_converted, save=True, project=save_dir, name=unique_name, device='cpu') |
|
logging.info(f"预测结果: {results}") |
|
|
|
result_dir = os.path.join(save_dir, unique_name) |
|
result_dir = os.path.normpath(result_dir) |
|
logging.info(f"结果目录: {result_dir}") |
|
|
|
if not os.path.exists(result_dir): |
|
logging.error(f"结果目录 {result_dir} 不存在") |
|
return "未找到预测结果。" |
|
|
|
# 查找预测结果文件 |
|
predicted_img_path = None |
|
for file in os.listdir(result_dir): |
|
if file.lower().endswith(('.jpeg', '.jpg')): |
|
predicted_img_path = os.path.join(result_dir, file) |
|
break |
|
|
|
if predicted_img_path: |
|
logging.info(f"找到预测图像: {predicted_img_path}") |
|
return predicted_img_path |
|
else: |
|
logging.error(f"在 {result_dir} 中未找到预测图像") |
|
return "未找到预测结果。" |
|
except Exception as e: |
|
logging.error(f"预测过程中出错: {e}") |
|
return f"预测过程中出错: {e}" |
|
|
|
# 定义 Gradio 界面 |
|
iface = gr.Interface( |
|
fn=predict, |
|
inputs=[ |
|
gr.Dropdown(choices=list(models.keys()), label="选择模型"), |
|
gr.Image(type="filepath", label="输入图像") |
|
], |
|
outputs=gr.Image(type="filepath", label="输出图像") |
|
) |
|
|
|
@app.route('/') |
|
def home(): |
|
"""主页""" |
|
return render_template('index.html') |
|
|
|
@app.route('/request', methods=['POST']) |
|
def handle_request(): |
|
"""处理请求""" |
|
try: |
|
selected_model = request.form.get('model') |
|
if selected_model not in models: |
|
logging.error("选择的模型无效。") |
|
return jsonify({'error': '选择的模型无效。'}), 400 |
|
|
|
model_path = models[selected_model] |
|
if selected_model not in model_instances: |
|
model_instances[selected_model] = load_model(model_path) |
|
model = model_instances[selected_model] |
|
|
|
if model is None: |
|
logging.error("由于连接错误,模型未加载。") |
|
return jsonify({'error': '由于连接错误,模型未加载。'}), 500 |
|
|
|
img = request.files.get('img') |
|
if img is None: |
|
logging.error("未提供图像。") |
|
return jsonify({'error': '未提供图像。'}), 400 |
|
|
|
img_name = str(uuid.uuid4()) + '.jpg' |
|
img_path = os.path.join('./img', img_name) |
|
os.makedirs(os.path.dirname(img_path), exist_ok=True) |
|
img.save(img_path) |
|
logging.info(f"图像已保存到: {img_path}") |
|
|
|
save_dir = './runs/detect' |
|
os.makedirs(save_dir, exist_ok=True) |
|
unique_name = str(uuid.uuid4()) |
|
logging.info(f"对 {img_path} 进行预测...") |
|
results = model.predict(img_path, save=True, project=save_dir, name=unique_name, device='cpu') |
|
logging.info(f"预测结果: {results}") |
|
|
|
result_dir = os.path.join(save_dir, unique_name) |
|
|
|
# 查找预测结果文件 |
|
predicted_img_path = None |
|
for file in os.listdir(result_dir): |
|
if file.endswith('.jpeg') or file.endswith('.jpg'): |
|
predicted_img_path = os.path.join(result_dir, file) |
|
break |
|
|
|
if predicted_img_path: |
|
img_url = f'/get/{unique_name}/{os.path.basename(predicted_img_path)}' |
|
return jsonify({'message': '预测成功!', 'img_path': img_url}) |
|
else: |
|
saved_files = os.listdir(result_dir) |
|
logging.error(f"保存目录中包含文件: {saved_files}") |
|
return jsonify({'error': '未找到预测结果。'}), 500 |
|
except Exception as e: |
|
logging.error(f"处理请求时出错: {e}") |
|
return jsonify({'error': f'处理过程中发生错误: {e}'}), 500 |
|
|
|
@app.route('/get/<unique_name>/<filename>') |
|
def get_image(unique_name, filename): |
|
"""获取图像""" |
|
try: |
|
return send_from_directory(os.path.join('runs/detect', unique_name), filename) |
|
except Exception as e: |
|
logging.error(f"提供文件时出错: {e}") |
|
return jsonify({'error': '文件未找到。'}), 404 |
|
|
|
def run_gradio(): |
|
"""运行 Gradio 界面""" |
|
logging.info("启动 Gradio 界面...") |
|
iface.launch(share=True) # 设置 share=True 以便公开访问 |
|
|
|
def run_flask(): |
|
"""运行 Flask 应用""" |
|
logging.info("启动 Flask 应用...") |
|
app.run(host="0.0.0.0", port=5000) |
|
|
|
if __name__ == '__main__': |
|
# 启动 Flask 和 Gradio 线程 |
|
gradio_thread = Thread(target=run_gradio) |
|
flask_thread = Thread(target=run_flask) |
|
|
|
gradio_thread.start() |
|
flask_thread.start() |
|
|
|
gradio_thread.join() |
|
flask_thread.join() |
|
''' |
|
|
|
|
|
''' |
|
from ultralytics import YOLO |
|
|
|
# Load a model |
|
model = YOLO("yolov8n.yaml") # build a new model from YAML |
|
model = YOLO("yolov8n.pt") # load a pretrained model (recommended for training) |
|
model = YOLO("yolov8n.yaml").load("yolov8n.pt") # build from YAML and transfer weights |
|
|
|
# Train the model |
|
results = model.train(data="coco8.yaml", epochs=100, imgsz=640) |
|
''' |
|
|
|
''' |
|
import torch |
|
import torch.nn as nn |
|
import torch.optim as optim |
|
from torchvision import datasets, transforms |
|
from torch.utils.data import DataLoader |
|
|
|
# 定义模型 |
|
class SimpleCNN(nn.Module): |
|
def __init__(self, num_classes=10): |
|
super(SimpleCNN, self).__init__() |
|
self.conv1 = nn.Conv2d(1, 20, 5, 1) |
|
self.conv2 = nn.Conv2d(20, 50, 5, 1) |
|
self.fc1 = nn.Linear(4*4*50, 500) |
|
self.fc2 = nn.Linear(500, num_classes) |
|
|
|
def forward(self, x): |
|
x = torch.relu(self.conv1(x)) |
|
x = torch.max_pool2d(x, 2, 2) |
|
x = torch.relu(self.conv2(x)) |
|
x = torch.max_pool2d(x, 2, 2) |
|
x = x.view(-1, 4*4*50) |
|
x = torch.relu(self.fc1(x)) |
|
x = self.fc2(x) |
|
return x |
|
|
|
# 加载数据集 |
|
transform = transforms.Compose([ |
|
transforms.ToTensor(), |
|
transforms.Normalize((0.5,), (0.5,)) |
|
]) |
|
|
|
train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True) |
|
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True) |
|
|
|
# 初始化模型和优化器 |
|
model = SimpleCNN(num_classes=10) |
|
optimizer = optim.Adam(model.parameters(), lr=0.001) |
|
criterion = nn.CrossEntropyLoss() |
|
|
|
# 训练模型 |
|
num_epochs = 5 |
|
for epoch in range(num_epochs): |
|
for i, (images, labels) in enumerate(train_loader): |
|
# 前向传播 |
|
outputs = model(images) |
|
loss = criterion(outputs, labels) |
|
|
|
# 反向传播和优化 |
|
optimizer.zero_grad() |
|
loss.backward() |
|
optimizer.step() |
|
|
|
if (i+1) % 100 == 0: |
|
print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}') |
|
|
|
# 保存模型(可选) |
|
torch.save(model.state_dict(), 'model.pth') |
|
''' |
|
|
|
''' |
|
from datasets import load_dataset |
|
|
|
# 加载数据集 |
|
dataset = load_dataset('glue', 'sst2') # 这里的'sst2'是GLUE数据集下的一个子集 |
|
|
|
# 查看数据集内容 |
|
print(dataset['train'][:2]) # 查看训练集的前两个样本 |
|
''' |
|
|
|
''' |
|
from datasets import load_dataset |
|
# 加载数据集 |
|
dataset = load_dataset('fka/awesome-chatgpt-prompts') |
|
# 查看数据集的子集 |
|
print(dataset.keys()) # 这将输出数据集中所有可用的子集名称,例如:dict_keys(['train', 'validation', 'test']) |
|
# 访问特定子集的数据 |
|
train_dataset = dataset['train'] |
|
print(train_dataset[:2]) # 查看训练集的前两个样本 |
|
# 如果你知道确切的子集名称,也可以直接加载它 |
|
# train_dataset = load_dataset('fka/awesome-chatgpt-prompts', split='train') |
|
''' |
|
|
|
''' |
|
from datasets import load_dataset |
|
dataset = load_dataset("aspnet/yoloensembledata") |
|
#print(dataset) |
|
print(dataset['train']) |
|
print(dataset.keys()) |
|
print(dataset['test']) |
|
print(dataset['validation']) |
|
''' |
|
|
|
''' |
|
from ultralytics import YOLO |
|
|
|
# Load a model |
|
model = YOLO("yolov8n.yaml") # build a new model from YAML |
|
model = YOLO("yolov8n.pt") # load a pretrained model (recommended for training) |
|
model = YOLO("yolov8n.yaml").load("yolov8n.pt") # build from YAML and transfer weights |
|
|
|
# Train the model |
|
results = model.train(data="coco8.yaml", epochs=100, imgsz=640) |
|
''' |
|
|
|
import zipfile |
|
|
|
def unzip_file(zip_path, extract_to): |
|
with zipfile.ZipFile(zip_path, 'r') as zip_ref: |
|
zip_ref.extractall(extract_to) |
|
|
|
zip_file_path = 'Math Equation by YOLO-NAS.v2i.yolov8.zip' |
|
extract_to_path = 'MathEquationbyYOLO-NAS.v2i.yolov8' |
|
|
|
unzip_file(zip_file_path, extract_to_path) |
|
|