from fastapi import FastAPI, UploadFile, File, HTTPException
import cv2
import torch
import pandas as pd
from PIL import Image
from transformers import AutoImageProcessor, AutoModelForImageClassification
from tqdm import tqdm
import shutil
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from huggingface_hub import HfApi
import os
from dotenv import load_dotenv
from typing import Optional
# Load environment variables, including the Hugging Face API key
load_dotenv()
api_key = os.getenv("HUGGINGFACE_API_KEY")
if not api_key:
    raise ValueError("The Hugging Face API key is not set in the .env file.")

# Initialize the Hugging Face API client
hf_api = HfApi()
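# Illustrative only: the .env file loaded above is assumed to sit next to this script and to
# contain a line like the following (placeholder value, not a real token):
# HUGGINGFACE_API_KEY=hf_xxxxxxxxxxxxxxxxxxxx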
app = FastAPI()

# Add CORS middleware to allow requests from the Vue.js frontend
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:8080",
        "https://labeling-tools.onrender.com/",
    ],
    allow_credentials=True,
    allow_methods=["*"],   # Allow all HTTP methods (GET, POST, etc.)
    allow_headers=["*"],   # Allow all headers (Content-Type, Authorization, etc.)
)
# Load the image processor and the locally fine-tuned model
local_model_path = r'./vit-finetuned-ucf101'
processor = AutoImageProcessor.from_pretrained("google/vit-base-patch16-224")
model = AutoModelForImageClassification.from_pretrained(local_model_path)
model.eval()
# Classify a single image (BGR frame from OpenCV)
def classifier_image(image):
    image_pil = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    inputs = processor(images=image_pil, return_tensors="pt")
    with torch.no_grad():
        outputs = model(**inputs)
    logits = outputs.logits
    predicted_class_idx = logits.argmax(-1).item()
    predicted_class = model.config.id2label[predicted_class_idx]
    return predicted_class
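# Illustrative usage sketch (not executed by the app); the image path is a placeholder:
# frame = cv2.imread("example_frame.jpg")   # BGR image, as returned by OpenCV
# print(classifier_image(frame))            # e.g. "Surfing" or another label from the model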
# Process the video and identify "Surfing" sequences
def identifier_sequences_surfing(video_path, intervalle=0.5):
    cap = cv2.VideoCapture(video_path)
    frame_rate = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_interval = int(frame_rate * intervalle)

    sequences_surfing = []
    frame_index = 0
    in_surf_sequence = False
    start_timestamp = None

    with tqdm(total=total_frames, desc="Processing video frames", unit="frame") as pbar:
        success, frame = cap.read()
        while success:
            if frame_index % frame_interval == 0:
                timestamp = round(frame_index / frame_rate, 2)
                classe = classifier_image(frame)

                if classe == "Surfing" and not in_surf_sequence:
                    in_surf_sequence = True
                    start_timestamp = timestamp
                elif classe != "Surfing" and in_surf_sequence:
                    in_surf_sequence = False
                    end_timestamp = timestamp
                    sequences_surfing.append((start_timestamp, end_timestamp))

            success, frame = cap.read()
            frame_index += 1
            pbar.update(1)

    # Close any sequence still open at the end of the video
    if in_surf_sequence:
        sequences_surfing.append((start_timestamp, round(frame_index / frame_rate, 2)))

    cap.release()
    dataframe_sequences = pd.DataFrame(sequences_surfing, columns=["Début", "Fin"])
    return dataframe_sequences
# Convert the detected sequences into a JSON-serializable structure
def convertir_sequences_en_json(dataframe):
    events = []
    blocks = []
    for idx, row in dataframe.iterrows():
        block = {
            "id": f"Surfing{idx + 1}",
            "start": round(row["Début"], 2),
            "end": round(row["Fin"], 2)
        }
        blocks.append(block)
    event = {
        "event": "Surfing",
        "blocks": blocks
    }
    events.append(event)
    return events
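# Illustrative output of convertir_sequences_en_json for two detected sequences
# (values are made up; field names match the code above):
# [
#     {
#         "event": "Surfing",
#         "blocks": [
#             {"id": "Surfing1", "start": 3.5, "end": 12.0},
#             {"id": "Surfing2", "start": 20.5, "end": 27.0}
#         ]
#     }
# ]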
# # Endpoint to analyze the video and upload it to Hugging Face
# @app.post("/analyze_video/")
# async def analyze_video(user_name: str, file: UploadFile = File(...)):
#     try:
#         # Save the video temporarily
#         temp_file_path = f"/tmp/{file.filename}"
#         with open(temp_file_path, "wb") as buffer:
#             shutil.copyfileobj(file.file, buffer)
#
#         # Upload the video to the Hugging Face Hub
#         dataset_name = "2nzi/Video-Sequence-Labeling"
#         target_path_in_repo = f"{user_name}/raw/{file.filename}"
#         hf_api.upload_file(
#             path_or_fileobj=temp_file_path,
#             path_in_repo=target_path_in_repo,
#             repo_id=dataset_name,
#             repo_type="dataset",
#             token=api_key
#         )
#
#         # Analyze the video to find "Surfing" sequences
#         dataframe_sequences = identifier_sequences_surfing(temp_file_path, intervalle=1)
#         json_result = convertir_sequences_en_json(dataframe_sequences)
#
#         # Delete the temporary file after the upload
#         os.remove(temp_file_path)
#
#         return {"message": "Video uploaded and analyzed successfully!",
#                 "file_url": f"https://huggingface.co/datasets/{dataset_name}/resolve/main/{target_path_in_repo}",
#                 "analysis": json_result}
#     except Exception as e:
#         raise HTTPException(status_code=500, detail=f"Failed to upload or analyze video: {str(e)}")
# @app.post("/analyze_video/")
# async def analyze_video(user_name: str, file: Optional[UploadFile] = File(None), video_url: Optional[str] = None):
#     try:
#         # Check whether the video was provided as a file or as a URL
#         if file:
#             # Save the video temporarily
#             temp_file_path = f"/tmp/{file.filename}"
#             with open(temp_file_path, "wb") as buffer:
#                 shutil.copyfileobj(file.file, buffer)
#
#             # Upload the video to the Hugging Face Hub
#             dataset_name = "2nzi/Video-Sequence-Labeling"
#             target_path_in_repo = f"{user_name}/raw/{file.filename}"
#             hf_api.upload_file(
#                 path_or_fileobj=temp_file_path,
#                 path_in_repo=target_path_in_repo,
#                 repo_id=dataset_name,
#                 repo_type="dataset",
#                 token=os.getenv("HUGGINGFACE_WRITE_API_KEY")
#             )
#
#             # URL of the video on Hugging Face
#             video_url = f"https://huggingface.co/datasets/{dataset_name}/resolve/main/{target_path_in_repo}"
#
#             # Delete the temporary file after the upload
#             os.remove(temp_file_path)
#
#         # Make sure a valid URL is available at this point
#         if not video_url:
#             raise HTTPException(status_code=400, detail="No valid video URL or file provided.")
#
#         # Analyze the video from its URL
#         dataframe_sequences = identifier_sequences_surfing(video_url, intervalle=1)
#         json_result = convertir_sequences_en_json(dataframe_sequences)
#
#         return {
#             "message": "Video uploaded and analyzed successfully!",
#             "file_url": video_url,
#             "analysis": json_result
#         }
#     except Exception as e:
#         raise HTTPException(status_code=500, detail=f"Failed to upload or analyze video: {str(e)}")
# Endpoint to analyze a video from a URL
@app.post("/analyze_video/")
async def analyze_video(user_name: str, video_url: Optional[str] = None):
    try:
        # Make sure a valid URL is available at this point
        if not video_url:
            raise HTTPException(status_code=400, detail="No valid video URL provided.")

        # Analyze the video from its URL
        dataframe_sequences = identifier_sequences_surfing(video_url, intervalle=1)
        json_result = convertir_sequences_en_json(dataframe_sequences)

        return {
            "message": "Video analyzed successfully!",
            "file_url": video_url,
            "analysis": json_result
        }
    except HTTPException:
        # Let the explicit 400 above pass through instead of being wrapped as a 500
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to analyze video: {str(e)}")
# # Endpoint to analyze a video from a URL
# @app.post("/analyze_video/")
# async def analyze_video(user_name: str, video_url: Optional[str] = None):
#     try:
#         # Log for debugging purposes
#         print(f"Received user_name: {user_name}")
#         print(f"Received video_url: {video_url}")
#
#         # Make sure a valid URL is available at this point
#         if not video_url:
#             raise HTTPException(status_code=400, detail="No valid video URL provided.")
#
#         # Download the video temporarily so it can be analyzed
#         temp_file_path = "/tmp/video_to_analyze.mp4"
#         os.system(f"wget -O {temp_file_path} {video_url}")
#
#         # Analyze the downloaded video
#         dataframe_sequences = identifier_sequences_surfing(temp_file_path, intervalle=1)
#         json_result = convertir_sequences_en_json(dataframe_sequences)
#
#         # Delete the temporary video after the analysis
#         os.remove(temp_file_path)
#
#         return {
#             "message": "Video analyzed successfully!",
#             "file_url": video_url,
#             "analysis": json_result
#         }
#     except Exception as e:
#         print(f"Error during video analysis: {str(e)}")  # Log the error
#         raise HTTPException(status_code=500, detail=f"Failed to analyze video: {str(e)}")
# Endpoint to upload a video to the Hugging Face dataset
@app.post("/upload_video/")  # route path assumed; no decorator was present on this function
async def upload_video(user_name: str, file: UploadFile = File(...)):
    try:
        print(f"Received request to upload video for user: {user_name}")

        # Save the file temporarily
        temp_file_path = f"/tmp/{file.filename}"
        with open(temp_file_path, "wb") as buffer:
            buffer.write(await file.read())

        # Prepare the upload to Hugging Face
        dataset_name = "2nzi/Video-Sequence-Labeling"
        repo_path = f"{user_name}/raw/{file.filename}"

        print(f"Uploading {temp_file_path} to Hugging Face at {repo_path}")
        hf_api.upload_file(
            path_or_fileobj=temp_file_path,
            path_in_repo=repo_path,
            repo_id=dataset_name,
            repo_type="dataset",
            token=api_key
        )

        # Build the final URL
        file_url = f"https://huggingface.co/datasets/{dataset_name}/resolve/main/{repo_path}"
        print(f"File successfully uploaded to: {file_url}")
        return {"status": "success", "file_url": file_url}
    except Exception as e:
        print(f"Error during upload: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
# Helper to upload a video to a Hugging Face dataset
def upload_to_hf_dataset(user_name: str, video_path: str):
    dataset_name = "2nzi/Video-Sequence-Labeling"
    repo_path = f"{user_name}/raw/{os.path.basename(video_path)}"

    try:
        hf_api.upload_file(
            path_or_fileobj=video_path,  # looks odd
            path_in_repo=repo_path,
            repo_id=dataset_name,
            repo_type="dataset",
            token=api_key
        )
        # Return the URL of the video after the upload
        url = f"https://huggingface.co/datasets/{dataset_name}/resolve/main/{repo_path}"
        return {"status": "success", "url": url}
    except Exception as e:
        return {"status": "error", "message": str(e)}
# Default landing page
@app.get("/", response_class=HTMLResponse)
async def index():
    return (
        """
        <html>
            <body>
                <h1>Hello world!</h1>
                <p>This `/` is the most simple and default endpoint.</p>
                <p>If you want to learn more, check out the documentation of the API at
                    <a href='/docs'>/docs</a> or
                    <a href='https://2nzi-video-sequence-labeling.hf.space/docs' target='_blank'>external docs</a>.
                </p>
            </body>
        </html>
        """
    )
# Run the application with uvicorn (command line):
# uvicorn main:app --reload
# http://localhost:8000/docs#/
# (.venv) PS C:\Users\antoi\Documents\Work_Learn\Labeling-Deploy\FastAPI> uvicorn main:app --host 0.0.0.0 --port 8000 --workers 1