"""FastAPI service that finds "Surfing" sequences in videos.

The service exposes three endpoints:

* ``POST /analyze_video/`` - classify sampled frames of a video and return
  the time ranges labelled "Surfing".
* ``POST /upload_video/`` - store an uploaded video in a Hugging Face dataset.
* ``GET /`` - a minimal landing page.
"""

import contextlib
import os
import shutil
import tempfile
from typing import Optional

import cv2
import pandas as pd
import torch
from dotenv import load_dotenv
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from huggingface_hub import HfApi
from PIL import Image
from tqdm import tqdm
from transformers import AutoImageProcessor, AutoModelForImageClassification

# Load environment variables, including the Hugging Face API key.
load_dotenv()
api_key = os.getenv("HUGGINGFACE_API_KEY")
if not api_key:
    raise ValueError("La clé API Hugging Face n'est pas définie dans le fichier .env.")

# Dataset repository that receives uploaded videos.
HF_DATASET_REPO = "2nzi/Video-Sequence-Labeling"

# Class label that marks a frame as part of a surfing sequence.
SURFING_LABEL = "Surfing"

# Hugging Face API client; the token is passed explicitly on each upload.
hf_api = HfApi()

app = FastAPI()

# CORS middleware so the Vue.js frontend can call this API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:8080",
        # No trailing slash: browsers send the Origin header as
        # scheme://host[:port], and the origin is matched as an exact string.
        "https://labeling-tools.onrender.com",
    ],
    allow_credentials=True,
    allow_methods=["*"],  # Allow every HTTP method (GET, POST, etc.)
    allow_headers=["*"],  # Allow every header (Content-Type, Authorization, etc.)
)

# Load the image processor and the locally stored fine-tuned model.
local_model_path = r'./vit-finetuned-ucf101'
processor = AutoImageProcessor.from_pretrained("google/vit-base-patch16-224")
model = AutoModelForImageClassification.from_pretrained(local_model_path)
model.eval()


class VideoSourceError(ValueError):
    """Raised when a video source cannot be opened or reports no usable FPS."""


def classifier_image(image):
    """Classify one video frame and return its predicted label.

    Args:
        image: Frame in OpenCV BGR channel order; it is converted to RGB
            before being handed to the image processor.

    Returns:
        The label from ``model.config.id2label`` with the highest logit.
    """
    image_pil = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    inputs = processor(images=image_pil, return_tensors="pt")
    with torch.no_grad():
        logits = model(**inputs).logits
    predicted_class_idx = logits.argmax(-1).item()
    return model.config.id2label[predicted_class_idx]


def identifier_sequences_surfing(video_path, intervalle=0.5):
    """Scan a video and return the time ranges classified as "Surfing".

    One frame is classified every ``intervalle`` seconds. A sequence starts at
    the first sampled frame labelled "Surfing" and ends at the next sampled
    frame with any other label, or at the end of the video.

    Args:
        video_path: Path or URL handed to ``cv2.VideoCapture``.
        intervalle: Sampling period in seconds. Periods shorter than one frame
            (including zero or negative values) sample every frame.

    Returns:
        A ``pandas.DataFrame`` with columns ``"Début"`` and ``"Fin"`` holding
        the start and end timestamps in seconds, rounded to two decimals.

    Raises:
        VideoSourceError: If the source cannot be opened or reports a frame
            rate that is not strictly positive.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise VideoSourceError(f"Unable to open video source: {video_path}")

        frame_rate = cap.get(cv2.CAP_PROP_FPS)
        # `not (x > 0)` also rejects NaN, which `x <= 0` would let through.
        if not frame_rate > 0:
            raise VideoSourceError(
                f"Video source reports an invalid frame rate: {frame_rate}"
            )

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Clamp to 1 so the modulo below can never divide by zero.
        frame_interval = max(1, int(frame_rate * intervalle))

        sequences_surfing = []
        frame_index = 0
        in_surf_sequence = False
        start_timestamp = None

        with tqdm(
            # Some sources report no frame count; let tqdm run without a total.
            total=total_frames if total_frames > 0 else None,
            desc="Traitement des frames de la vidéo",
            unit="frame",
        ) as pbar:
            while True:
                success, frame = cap.read()
                if not success:
                    break

                if frame_index % frame_interval == 0:
                    timestamp = round(frame_index / frame_rate, 2)
                    is_surfing = classifier_image(frame) == SURFING_LABEL

                    if is_surfing and not in_surf_sequence:
                        in_surf_sequence = True
                        start_timestamp = timestamp
                    elif not is_surfing and in_surf_sequence:
                        in_surf_sequence = False
                        sequences_surfing.append((start_timestamp, timestamp))

                frame_index += 1
                pbar.update(1)

        # Close a sequence that is still open when the video ends.
        if in_surf_sequence:
            sequences_surfing.append(
                (start_timestamp, round(frame_index / frame_rate, 2))
            )
    finally:
        # Release the capture even when classification raises.
        cap.release()

    return pd.DataFrame(sequences_surfing, columns=["Début", "Fin"])


def convertir_sequences_en_json(dataframe):
    """Convert a sequences DataFrame into the JSON structure of the API.

    Args:
        dataframe: DataFrame with ``"Début"`` and ``"Fin"`` columns, as
            returned by ``identifier_sequences_surfing``.

    Returns:
        A one-element list holding a single "Surfing" event whose ``blocks``
        list has one ``{"id", "start", "end"}`` entry per row. Ids are numbered
        from 1 in row order.
    """
    blocks = [
        {
            "id": f"Surfing{number}",
            # Cast to plain floats so the values are JSON-serialisable
            # whatever dtype the columns have.
            "start": round(float(start), 2),
            "end": round(float(end), 2),
        }
        for number, (start, end) in enumerate(
            zip(dataframe["Début"], dataframe["Fin"]), start=1
        )
    ]
    return [{"event": "Surfing", "blocks": blocks}]


@app.post("/analyze_video/")
async def analyze_video(user_name: str, video_url: Optional[str] = None):
    """Analyse the video at ``video_url`` and return its "Surfing" sequences.

    Args:
        user_name: Required query parameter; not used by the analysis itself.
        video_url: Location of the video to analyse.

    Returns:
        A dict with a status message, the analysed URL and the analysis.

    Raises:
        HTTPException: 400 when no URL is given or the source cannot be
            opened, 500 when the analysis itself fails.
    """
    # Validate outside the try block so the 400 is not rewritten into a 500
    # by the generic handler below.
    if not video_url:
        raise HTTPException(status_code=400, detail="No valid video URL provided.")

    # NOTE(review): video_url is passed to cv2.VideoCapture unchanged, so it
    # may also name a local path - confirm whether it should be restricted.
    try:
        # Frame classification is blocking and CPU-heavy; run it in a worker
        # thread so the event loop keeps serving other requests.
        dataframe_sequences = await run_in_threadpool(
            identifier_sequences_surfing, video_url, intervalle=1
        )
        json_result = convertir_sequences_en_json(dataframe_sequences)
    except VideoSourceError as e:
        raise HTTPException(
            status_code=400, detail=f"Failed to analyze video: {str(e)}"
        ) from e
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to analyze video: {str(e)}"
        ) from e

    return {
        "message": "Video analyzed successfully!",
        "file_url": video_url,
        "analysis": json_result,
    }


def _repo_path_for(user_name, filename):
    """Return the dataset path ``<user_name>/raw/<filename>``.

    Only the final path component of ``filename`` is kept, and ``user_name``
    must be a single path segment, so neither value can point outside the
    user's folder.

    Raises:
        ValueError: If either value is empty or is not a plain name.
    """
    # Normalise Windows separators before taking the last path component.
    safe_filename = os.path.basename((filename or "").replace("\\", "/"))
    if safe_filename in {"", ".", ".."}:
        raise ValueError("A valid file name is required.")
    if (
        not user_name
        or user_name in {".", ".."}
        or "/" in user_name
        or "\\" in user_name
    ):
        raise ValueError("user_name must be a single, non-empty path segment.")
    return f"{user_name}/raw/{safe_filename}"


def _upload_to_repo(local_path, repo_path):
    """Upload ``local_path`` to the dataset at ``repo_path`` and return its URL."""
    hf_api.upload_file(
        path_or_fileobj=local_path,
        path_in_repo=repo_path,
        repo_id=HF_DATASET_REPO,
        repo_type="dataset",
        token=api_key,
    )
    return f"https://huggingface.co/datasets/{HF_DATASET_REPO}/resolve/main/{repo_path}"


@app.post("/upload_video/")
async def upload_video(user_name: str, file: UploadFile = File(...)):
    """Store an uploaded video in the Hugging Face dataset.

    Args:
        user_name: Folder of the dataset that receives the video.
        file: The uploaded video.

    Returns:
        ``{"status": "success", "file_url": <URL of the uploaded file>}``.

    Raises:
        HTTPException: 400 when ``user_name`` or the file name is invalid,
            500 when saving or uploading the file fails.
    """
    print(f"Received request to upload video for user: {user_name}")

    try:
        repo_path = _repo_path_for(user_name, file.filename)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    temp_file_path = None
    try:
        # Stream the upload into a uniquely named temporary file: the client's
        # file name never reaches the local filesystem, and the video is not
        # held in memory as a whole.
        suffix = os.path.splitext(repo_path)[1]
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as buffer:
            temp_file_path = buffer.name
            await run_in_threadpool(shutil.copyfileobj, file.file, buffer)

        print(f"Uploading {temp_file_path} to Hugging Face at {repo_path}")
        # The Hub upload is blocking network I/O; keep it off the event loop.
        file_url = await run_in_threadpool(_upload_to_repo, temp_file_path, repo_path)

        print(f"File successfully uploaded to: {file_url}")
        return {"status": "success", "file_url": file_url}
    except Exception as e:
        print(f"Error during upload: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}") from e
    finally:
        # Remove the temporary copy whether or not the upload succeeded.
        if temp_file_path is not None:
            with contextlib.suppress(FileNotFoundError):
                os.remove(temp_file_path)


def upload_to_hf_dataset(user_name: str, video_path: str):
    """Upload a local video file to the Hugging Face dataset.

    Args:
        user_name: Folder of the dataset that receives the video.
        video_path: Path of the local file; its base name is used in the repo.

    Returns:
        ``{"status": "success", "url": ...}`` on success, otherwise
        ``{"status": "error", "message": ...}``. Errors are reported through
        the return value rather than raised.
    """
    try:
        repo_path = _repo_path_for(user_name, os.path.basename(video_path))
        url = _upload_to_repo(video_path, repo_path)
        return {"status": "success", "url": url}
    except Exception as e:
        return {"status": "error", "message": str(e)}


# Body of the landing page served at "/".
_INDEX_PAGE = (
    "\n\nHello world!\n\n"
    "This `/` is the most simple and default endpoint.\n\n"
    "If you want to learn more, check out the documentation of the API"
    " at /docs or external docs.\n\n"
)


@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the landing page."""
    return _INDEX_PAGE


# Run the application with uvicorn from the command line:
#   uvicorn main:app --reload
#   uvicorn main:app --host 0.0.0.0 --port 8000 --workers 1
# Interactive API docs: http://localhost:8000/docs#/