2nzi's picture
Update app.py
3b47c53 verified
from fastapi import FastAPI, UploadFile, File, HTTPException
import cv2
import torch
import pandas as pd
from PIL import Image
from transformers import AutoImageProcessor, AutoModelForImageClassification
from tqdm import tqdm
import shutil
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from huggingface_hub import HfApi
import os
from dotenv import load_dotenv
from typing import Optional
# Load environment variables (including the Hugging Face API key) from .env.
load_dotenv()
api_key = os.environ.get("HUGGINGFACE_API_KEY")
if api_key is None or api_key == "":
    # Fail at import time: the upload code below passes this key as its token.
    raise ValueError("La clé API Hugging Face n'est pas définie dans le fichier .env.")

# Hugging Face Hub client used by the upload code below.
hf_api = HfApi()

# FastAPI application instance on which the routes below are registered.
app = FastAPI()
# Add CORS middleware to allow requests from the Vue.js frontend
# (local dev server and the deployed site).
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:8080",
        # An origin is scheme://host[:port] with no path, so it must not end
        # with "/": browsers send "https://labeling-tools.onrender.com" in the
        # Origin header and a trailing slash here would never match it.
        "https://labeling-tools.onrender.com",
    ],
    allow_credentials=True,
    allow_methods=["*"],  # Allow every HTTP method (GET, POST, ...)
    allow_headers=["*"],  # Allow every header (Content-Type, Authorization, ...)
)
# Load the image processor (by its "google/vit-base-patch16-224" identifier)
# and the fine-tuned classification model stored in a local directory.
local_model_path = "./vit-finetuned-ucf101"
processor = AutoImageProcessor.from_pretrained("google/vit-base-patch16-224")
model = AutoModelForImageClassification.from_pretrained(local_model_path)
# Put the model in evaluation mode; it is only used for inference here.
model.eval()
# Classify a single image
def classifier_image(image):
    """Return the label predicted by the model for one frame.

    Args:
        image: Frame accepted by ``cv2.cvtColor`` with ``COLOR_BGR2RGB``
            (the caller in this file passes frames read from
            ``cv2.VideoCapture``).

    Returns:
        The ``model.config.id2label`` entry for the highest-scoring class.
    """
    # Convert BGR -> RGB before handing the pixels to PIL / the processor.
    rgb_pixels = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    model_inputs = processor(images=Image.fromarray(rgb_pixels), return_tensors="pt")

    # Inference only: no gradients are needed.
    with torch.no_grad():
        scores = model(**model_inputs).logits

    best_index = scores.argmax(-1).item()
    return model.config.id2label[best_index]
# Process a video and identify the "Surfing" sequences
def identifier_sequences_surfing(video_path, intervalle=0.5):
    """Sample frames from a video and return the "Surfing" time ranges.

    Roughly one frame every ``intervalle`` seconds is classified with
    ``classifier_image``. A sequence opens at the first sampled frame labelled
    "Surfing" and closes at the next sampled frame carrying any other label.
    A sequence still open when the video ends is closed at the timestamp of
    the last frame index reached.

    Args:
        video_path: Source handed to ``cv2.VideoCapture`` (the endpoint in
            this file passes a URL).
        intervalle: Sampling period in seconds.

    Returns:
        pandas.DataFrame with the columns "Début" and "Fin" (seconds, rounded
        to two decimals), one row per detected sequence.

    Raises:
        ValueError: If the video cannot be opened or reports a non-positive
            frame rate.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        # Without this check an unreadable source would silently produce an
        # empty result that looks like "no surfing found".
        if not cap.isOpened():
            raise ValueError(f"Unable to open video: {video_path}")

        frame_rate = cap.get(cv2.CAP_PROP_FPS)
        # Written as "not > 0" so that NaN is rejected as well.
        if not frame_rate > 0:
            raise ValueError(f"Invalid frame rate reported for video: {video_path}")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Never let the sampling step fall to 0 (frame_rate * intervalle < 1),
        # which would make the modulo below raise ZeroDivisionError.
        frame_interval = max(1, int(frame_rate * intervalle))

        sequences_surfing = []
        frame_index = 0
        in_surf_sequence = False
        start_timestamp = None

        with tqdm(
            # Fall back to an unknown total when no usable frame count is reported.
            total=total_frames if total_frames > 0 else None,
            desc="Traitement des frames de la vidéo",
            unit="frame",
        ) as pbar:
            success, frame = cap.read()
            while success:
                if frame_index % frame_interval == 0:
                    timestamp = round(frame_index / frame_rate, 2)
                    classe = classifier_image(frame)
                    if classe == "Surfing" and not in_surf_sequence:
                        in_surf_sequence = True
                        start_timestamp = timestamp
                    elif classe != "Surfing" and in_surf_sequence:
                        in_surf_sequence = False
                        sequences_surfing.append((start_timestamp, timestamp))
                success, frame = cap.read()
                frame_index += 1
                pbar.update(1)

        # The video ended while a sequence was still open: close it at the
        # timestamp corresponding to the number of frames read.
        if in_surf_sequence:
            sequences_surfing.append(
                (start_timestamp, round(frame_index / frame_rate, 2))
            )
    finally:
        # Release the capture even if classification raised.
        cap.release()

    return pd.DataFrame(sequences_surfing, columns=["Début", "Fin"])
# Convert the detected sequences to a JSON-serialisable structure
def convertir_sequences_en_json(dataframe):
    """Convert a sequences DataFrame into the event/blocks payload.

    Args:
        dataframe: DataFrame with the columns "Début" and "Fin" (start and end
            of each sequence).

    Returns:
        A one-element list ``[{"event": "Surfing", "blocks": [...]}]`` where
        each block is ``{"id": "Surfing<n>", "start": float, "end": float}``.
        ``n`` is the 1-based position of the row, and both bounds are rounded
        to two decimals. An empty DataFrame gives an empty ``blocks`` list.
    """
    blocks = [
        {
            # Number by position, not by index label, so a filtered or
            # re-indexed DataFrame still yields Surfing1, Surfing2, ...
            "id": f"Surfing{position}",
            # Cast to built-in float so no numpy scalar ends up in the payload.
            "start": round(float(start), 2),
            "end": round(float(end), 2),
        }
        for position, (start, end) in enumerate(
            zip(dataframe["Début"], dataframe["Fin"]), start=1
        )
    ]
    return [{"event": "Surfing", "blocks": blocks}]
# # Endpoint pour analyser la vidéo et uploader sur Hugging Face
# @app.post("/analyze_video/")
# async def analyze_video(user_name: str, file: UploadFile = File(...)):
# try:
# # Sauvegarder la vidéo temporairement
# temp_file_path = f"/tmp/{file.filename}"
# with open(temp_file_path, "wb") as buffer:
# shutil.copyfileobj(file.file, buffer)
# # Uploader la vidéo sur Hugging Face Hub
# dataset_name = "2nzi/Video-Sequence-Labeling"
# target_path_in_repo = f"{user_name}/raw/{file.filename}"
# hf_api.upload_file(
# path_or_fileobj=temp_file_path,
# path_in_repo=target_path_in_repo,
# repo_id=dataset_name,
# repo_type="dataset",
# token=api_key
# )
# # Analyser la vidéo pour trouver des séquences "Surfing"
# dataframe_sequences = identifier_sequences_surfing(temp_file_path, intervalle=1)
# json_result = convertir_sequences_en_json(dataframe_sequences)
# # Supprimer le fichier temporaire après l'upload
# os.remove(temp_file_path)
# return {"message": "Video uploaded and analyzed successfully!",
# "file_url": f"https://huggingface.co/datasets/{dataset_name}/resolve/main/{target_path_in_repo}",
# "analysis": json_result}
# except Exception as e:
# raise HTTPException(status_code=500, detail=f"Failed to upload or analyze video: {str(e)}")
# @app.post("/analyze_video/")
# async def analyze_video(user_name: str, file: Optional[UploadFile] = File(None), video_url: Optional[str] = None):
# try:
# # Vérifier si la vidéo est fournie sous forme de fichier ou d'URL
# if file:
# # Sauvegarder la vidéo temporairement
# temp_file_path = f"/tmp/{file.filename}"
# with open(temp_file_path, "wb") as buffer:
# shutil.copyfileobj(file.file, buffer)
# # Uploader la vidéo sur Hugging Face Hub
# dataset_name = "2nzi/Video-Sequence-Labeling"
# target_path_in_repo = f"{user_name}/raw/{file.filename}"
# hf_api.upload_file(
# path_or_fileobj=temp_file_path,
# path_in_repo=target_path_in_repo,
# repo_id=dataset_name,
# repo_type="dataset",
# token=os.getenv("HUGGINGFACE_WRITE_API_KEY")
# )
# # URL de la vidéo sur Hugging Face
# video_url = f"https://huggingface.co/datasets/{dataset_name}/resolve/main/{target_path_in_repo}"
# # Supprimer le fichier temporaire après l'upload
# os.remove(temp_file_path)
# # Assurez-vous d'avoir une URL valide à ce stade
# if not video_url:
# raise HTTPException(status_code=400, detail="No valid video URL or file provided.")
# # Analyser la vidéo via l'URL
# dataframe_sequences = identifier_sequences_surfing(video_url, intervalle=1)
# json_result = convertir_sequences_en_json(dataframe_sequences)
# return {
# "message": "Video uploaded and analyzed successfully!",
# "file_url": video_url,
# "analysis": json_result
# }
# except Exception as e:
# raise HTTPException(status_code=500, detail=f"Failed to upload or analyze video: {str(e)}")
# Endpoint: analyse a video referenced by URL
@app.post("/analyze_video/")
async def analyze_video(user_name: str, video_url: Optional[str] = None):
    """Detect "Surfing" sequences in the video located at ``video_url``.

    Args:
        user_name: Required by the route signature; not used by the analysis.
        video_url: Location of the video to analyse.

    Returns:
        Dict with a status ``message``, the ``file_url`` that was analysed and
        the ``analysis`` payload built by ``convertir_sequences_en_json``.

    Raises:
        HTTPException: 400 when ``video_url`` is missing or empty, 500 when
            the analysis itself fails.
    """
    # Validate outside the try block: otherwise the generic handler below
    # would catch this 400 and report it to the client as a 500.
    if not video_url:
        raise HTTPException(status_code=400, detail="No valid video URL provided.")

    try:
        dataframe_sequences = identifier_sequences_surfing(video_url, intervalle=1)
        json_result = convertir_sequences_en_json(dataframe_sequences)
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to analyze video: {str(e)}"
        ) from e

    return {
        "message": "Video analyzed successfully!",
        "file_url": video_url,
        "analysis": json_result,
    }
# # Endpoint pour analyser la vidéo à partir d'une URL
# @app.post("/analyze_video/")
# async def analyze_video(user_name: str, video_url: Optional[str] = None):
# try:
# # Log for debugging purposes
# print(f"Received user_name: {user_name}")
# print(f"Received video_url: {video_url}")
# # Assurez-vous d'avoir une URL valide à ce stade
# if not video_url:
# raise HTTPException(status_code=400, detail="No valid video URL provided.")
# # Télécharge la vidéo temporairement pour l'analyser
# temp_file_path = "/tmp/video_to_analyze.mp4"
# os.system(f"wget -O {temp_file_path} {video_url}")
# # Analyser la vidéo via l'URL
# dataframe_sequences = identifier_sequences_surfing(temp_file_path, intervalle=1)
# json_result = convertir_sequences_en_json(dataframe_sequences)
# # Supprimer la vidéo temporaire après l'analyse
# os.remove(temp_file_path)
# return {
# "message": "Video analyzed successfully!",
# "file_url": video_url,
# "analysis": json_result
# }
# except Exception as e:
# print(f"Error during video analysis: {str(e)}") # Log the error
# raise HTTPException(status_code=500, detail=f"Failed to analyze video: {str(e)}")
@app.post("/upload_video/")
async def upload_video(user_name: str, file: UploadFile = File(...)):
    """Store an uploaded video in the Hugging Face dataset repository.

    The upload is spooled to a private temporary file, pushed to
    ``<user_name>/raw/<filename>`` in the dataset, and the temporary file is
    removed afterwards whether or not the upload succeeded.

    Args:
        user_name: Folder inside the dataset repository; used verbatim in the
            repository path.
        file: The uploaded video.

    Returns:
        ``{"status": "success", "file_url": <resolve URL of the file>}``.

    Raises:
        HTTPException: 400 when the upload carries no usable file name, 500
            when saving or uploading fails.
    """
    # Local import: tempfile is not among this module's top-level imports.
    import tempfile

    # The file name is client-controlled: keep only its last path component so
    # it cannot point outside the intended folder (e.g. "../../etc/passwd").
    safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid file name.")

    temp_file_path = None
    try:
        print(f"Received request to upload video for user: {user_name}")

        # A unique temporary file avoids collisions between concurrent uploads
        # that happen to share a file name.
        fd, temp_file_path = tempfile.mkstemp(suffix=os.path.splitext(safe_name)[1])
        with os.fdopen(fd, "wb") as buffer:
            # Copy in 1 MiB chunks instead of loading the whole video in memory.
            while True:
                chunk = await file.read(1024 * 1024)
                if not chunk:
                    break
                buffer.write(chunk)

        # Prepare the upload to Hugging Face
        dataset_name = "2nzi/Video-Sequence-Labeling"
        repo_path = f"{user_name}/raw/{safe_name}"
        print(f"Uploading {temp_file_path} to Hugging Face at {repo_path}")
        hf_api.upload_file(
            path_or_fileobj=temp_file_path,
            path_in_repo=repo_path,
            repo_id=dataset_name,
            repo_type="dataset",
            token=api_key,
        )

        # Build the final URL
        file_url = f"https://huggingface.co/datasets/{dataset_name}/resolve/main/{repo_path}"
        print(f"File successfully uploaded to: {file_url}")
        return {"status": "success", "file_url": file_url}
    except Exception as e:
        print(f"Error during upload: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}") from e
    finally:
        # Always clean up the temporary copy.
        if temp_file_path is not None and os.path.exists(temp_file_path):
            os.remove(temp_file_path)
# Upload a video to the Hugging Face dataset
def upload_to_hf_dataset(user_name: str, video_path: str):
    """Upload the file at ``video_path`` to ``<user_name>/raw/`` in the dataset.

    Args:
        user_name: Folder inside the dataset repository.
        video_path: Path of the video; its base name becomes the file name in
            the repository.

    Returns:
        ``{"status": "success", "url": ...}`` on success, or
        ``{"status": "error", "message": ...}`` if the upload raised.
    """
    repo_id = "2nzi/Video-Sequence-Labeling"
    file_name = os.path.basename(video_path)
    destination = f"{user_name}/raw/{file_name}"

    try:
        hf_api.upload_file(
            path_or_fileobj=video_path,  # passed as a path string, not an open file
            path_in_repo=destination,
            repo_id=repo_id,
            repo_type="dataset",
            token=api_key,
        )
    except Exception as error:
        # Report the failure to the caller instead of raising.
        return {"status": "error", "message": str(error)}

    # URL of the video once uploaded
    return {
        "status": "success",
        "url": f"https://huggingface.co/datasets/{repo_id}/resolve/main/{destination}",
    }
@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve a minimal HTML landing page that links to the API docs."""
    landing_page = """
<html>
<body>
<h1>Hello world!</h1>
<p>This `/` is the most simple and default endpoint.</p>
<p>If you want to learn more, check out the documentation of the API at
<a href='/docs'>/docs</a> or
<a href='https://2nzi-video-sequence-labeling.hf.space/docs' target='_blank'>external docs</a>.
</p>
</body>
</html>
"""
    return landing_page
# Run the application with uvicorn (from the command line):
# uvicorn main:app --reload
# http://localhost:8000/docs#/
# (.venv) PS C:\Users\antoi\Documents\Work_Learn\Labeling-Deploy\FastAPI> uvicorn main:app --host 0.0.0.0 --port 8000 --workers 1