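"""
Holistic landmark extraction for videos.

Detects face, hand, and pose landmarks with MediaPipe (the FaceLandmarker and
HandLandmarker tasks plus the legacy Holistic solution for pose) and optionally
saves per-frame landmarks and detection counts as JSON.
"""
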
import json
from pathlib import Path
from typing import Dict, Optional, Tuple, Any

import cv2
import decord
import mediapipe as mp
import numpy as np
from mediapipe.tasks import python
from mediapipe.tasks.python import vision

class HolisticDetector:
    """
    A class for detecting face, hand, and pose landmarks in videos using MediaPipe.
    """

    def __init__(self, face_model_path: str, hand_model_path: str,
                 min_detection_confidence: float = 0.1,
                 min_hand_detection_confidence: float = 0.05,
                 max_faces: int = 6, max_hands: int = 6):
        """
        Initialize the HolisticDetector with model paths and configuration.

        Args:
            face_model_path: Path to the face detection model
            hand_model_path: Path to the hand detection model
            min_detection_confidence: Minimum confidence for pose detection
            min_hand_detection_confidence: Minimum confidence for hand detection
            max_faces: Maximum number of faces to detect
            max_hands: Maximum number of hands to detect
        """
        self.face_model_path = face_model_path
        self.hand_model_path = hand_model_path
        self.min_detection_confidence = min_detection_confidence
        self.min_hand_detection_confidence = min_hand_detection_confidence
        self.max_faces = max_faces
        self.max_hands = max_hands
        self._initialize_detectors()

    def _initialize_detectors(self):
        """Initialize the MediaPipe detectors."""
        # Initialize face detector
        base_options_face = python.BaseOptions(model_asset_path=self.face_model_path)
        options_face = vision.FaceLandmarkerOptions(
            base_options=base_options_face,
            output_face_blendshapes=True,
            output_facial_transformation_matrixes=True,
            num_faces=self.max_faces
        )
        self.face_detector = vision.FaceLandmarker.create_from_options(options_face)

        # Initialize hand detector
        base_options_hand = python.BaseOptions(model_asset_path=self.hand_model_path)
        options_hand = vision.HandLandmarkerOptions(
            base_options=base_options_hand,
            num_hands=self.max_hands,
            min_hand_detection_confidence=self.min_hand_detection_confidence
        )
        self.hand_detector = vision.HandLandmarker.create_from_options(options_hand)

        # Initialize holistic model for pose
        self.mp_holistic = mp.solutions.holistic.Holistic(
            min_detection_confidence=self.min_detection_confidence
        )

    def detect_frame_landmarks(self, image: np.ndarray) -> Tuple[Dict[str, int], Dict[str, Any]]:
        """
        Detect landmarks in a single frame.

        Args:
            image: Input image as numpy array

        Returns:
            Tuple of (bounding_boxes_count, landmarks_data)
        """
        results = self.mp_holistic.process(image)
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image)
        face_prediction = self.face_detector.detect(mp_image)
        hand_prediction = self.hand_detector.detect(mp_image)

        bounding_boxes = {}
        landmarks_data = {}

        # Process face landmarks
        if face_prediction.face_landmarks:
            bounding_boxes['#face'] = len(face_prediction.face_landmarks)
            landmarks_data['face_landmarks'] = []
            for face in face_prediction.face_landmarks:
                landmarks_face = [[landmark.x, landmark.y, landmark.z] for landmark in face]
                landmarks_data['face_landmarks'].append(landmarks_face)
        else:
            bounding_boxes['#face'] = 0
            landmarks_data['face_landmarks'] = None

        # Process hand landmarks
        if hand_prediction.hand_landmarks:
            bounding_boxes['#hands'] = len(hand_prediction.hand_landmarks)
            landmarks_data['hand_landmarks'] = []
            for hand in hand_prediction.hand_landmarks:
                landmarks_hand = [[landmark.x, landmark.y, landmark.z] for landmark in hand]
                landmarks_data['hand_landmarks'].append(landmarks_hand)
        else:
            bounding_boxes['#hands'] = 0
            landmarks_data['hand_landmarks'] = None

        # Process pose landmarks
        if results.pose_landmarks:
            bounding_boxes['#pose'] = 1
            landmarks_data['pose_landmarks'] = []
            pose_landmarks = [[landmark.x, landmark.y, landmark.z] for landmark in results.pose_landmarks.landmark]
            landmarks_data['pose_landmarks'].append(pose_landmarks)
        else:
            bounding_boxes['#pose'] = 0
            landmarks_data['pose_landmarks'] = None

        return bounding_boxes, landmarks_data

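    # Illustrative sketch (not actual output) of what detect_frame_landmarks() returns
    # for a frame with one face, two hands, and a pose. Values are made up; list lengths
    # follow the MediaPipe topologies (478 face points, 21 per hand, 33 pose points):
    #
    #   bounding_boxes = {'#face': 1, '#hands': 2, '#pose': 1}
    #   landmarks_data = {
    #       'face_landmarks': [[[0.51, 0.33, -0.02], ...]],   # 1 face x 478 x [x, y, z]
    #       'hand_landmarks': [[[0.48, 0.71, 0.00], ...],     # 2 hands x 21 x [x, y, z]
    #                          [[0.55, 0.69, 0.01], ...]],
    #       'pose_landmarks': [[[0.50, 0.40, -0.30], ...]],   # 1 pose x 33 x [x, y, z]
    #   }
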
    def process_video(self, video_input, save_results: bool = False,
                      output_dir: Optional[str] = None, video_name: Optional[str] = None) -> Dict[int, Any]:
        """
        Process a video and extract landmarks from all frames.

        Args:
            video_input: Either a path to a video file (str) or a decord.VideoReader object
            save_results: Whether to save results to files
            output_dir: Directory to save results (required if save_results=True)
            video_name: Name for output files (required if save_results=True and video_input is a VideoReader)

        Returns:
            Dictionary containing landmarks for each frame

        Raises:
            FileNotFoundError: If the video file doesn't exist
            RuntimeError: If the video file cannot be opened
            ValueError: If save_results=True but output_dir is None, or if video_name is None when needed
        """
        if save_results and output_dir is None:
            raise ValueError("output_dir must be provided when save_results=True")

        # Handle different input types
        if isinstance(video_input, str):
            # Input is a file path
            video_path = Path(video_input)
            if not video_path.exists():
                raise FileNotFoundError(f"Video file not found: {video_input}")
            try:
                video = decord.VideoReader(str(video_path))
            except Exception as e:
                raise RuntimeError(f"Error loading video {video_input}: {e}")
            file_name = video_path.stem
        else:
            # Input is a VideoReader object (or any indexable sequence of frames)
            video = video_input
            if save_results and video_name is None:
                raise ValueError("video_name must be provided when save_results=True and video_input is a VideoReader object")
            file_name = video_name or "video"

        result_dict = {}
        stats = {}

        # Process each frame
        for i in range(len(video)):
            try:
                frame = video[i]
                # decord returns its own NDArray type; convert to numpy before handing it to MediaPipe
                frame_rgb = frame.asnumpy() if hasattr(frame, 'asnumpy') else np.asarray(frame)
                if hasattr(video, 'seek'):
                    # Reset decord's internal frame pointer after random access
                    video.seek(0)
                bounding_boxes, landmarks = self.detect_frame_landmarks(frame_rgb)
                result_dict[i] = landmarks
                stats[i] = bounding_boxes
            except Exception as e:
                print(f"Error processing frame {i}: {e}")
                result_dict[i] = None
                stats[i] = {'#face': 0, '#hands': 0, '#pose': 0}

        # Save results if requested
        if save_results:
            self._save_results(file_name, result_dict, stats, output_dir)

        return result_dict

    def process_video_frames(self, frames: list, save_results: bool = False,
                             output_dir: Optional[str] = None, video_name: str = "video") -> Dict[int, Any]:
        """
        Process a list of frames and extract landmarks.

        Args:
            frames: List of frame images as numpy arrays
            save_results: Whether to save results to files
            output_dir: Directory to save results (required if save_results=True)
            video_name: Name for output files

        Returns:
            Dictionary containing landmarks for each frame
        """
        if save_results and output_dir is None:
            raise ValueError("output_dir must be provided when save_results=True")

        result_dict = {}
        stats = {}

        # Process each frame
        for i, frame in enumerate(frames):
            try:
                bounding_boxes, landmarks = self.detect_frame_landmarks(frame)
                result_dict[i] = landmarks
                stats[i] = bounding_boxes
            except Exception as e:
                print(f"Error processing frame {i}: {e}")
                result_dict[i] = None
                stats[i] = {'#face': 0, '#hands': 0, '#pose': 0}

        # Save results if requested
        if save_results:
            self._save_results(video_name, result_dict, stats, output_dir)

        return result_dict

    def _save_results(self, video_name: str, landmarks_data: Dict, stats_data: Dict, output_dir: str):
        """Save landmarks and stats to JSON files."""
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Save landmarks
        landmarks_file = output_path / f"{video_name}_pose.json"
        with open(landmarks_file, 'w') as f:
            json.dump(landmarks_data, f)

        # Save stats
        stats_file = output_path / f"{video_name}_stats.json"
        with open(stats_file, 'w') as f:
            json.dump(stats_data, f)

    def compute_video_stats(self, landmarks_data: Dict) -> Dict[str, Any]:
        """
        Compute statistics from landmarks data.

        Args:
            landmarks_data: Dictionary containing landmarks for each frame

        Returns:
            Dictionary containing frame-by-frame stats and maximums
        """
        stats = {}
        max_counts = {'#face': 0, '#hands': 0, '#pose': 0}

        for frame, landmarks in landmarks_data.items():
            if landmarks is None:
                presence = {'#face': 0, '#hands': 0, '#pose': 0}
            else:
                presence = {
                    '#face': len(landmarks.get('face_landmarks', [])) if landmarks.get('face_landmarks') else 0,
                    '#hands': len(landmarks.get('hand_landmarks', [])) if landmarks.get('hand_landmarks') else 0,
                    '#pose': len(landmarks.get('pose_landmarks', [])) if landmarks.get('pose_landmarks') else 0
                }
            stats[frame] = presence

            # Update max counts
            for key in max_counts:
                max_counts[key] = max(max_counts[key], presence[key])

        stats['max'] = max_counts
        return stats

# Convenience function for backward compatibility and simple usage
def video_holistic(video_input, face_model_path: str, hand_model_path: str,
                   save_results: bool = False, output_dir: Optional[str] = None,
                   video_name: Optional[str] = None) -> Dict[int, Any]:
    """
    Convenience function to process a video and extract holistic landmarks.

    Args:
        video_input: Either a path to a video file (str) or a decord.VideoReader object
        face_model_path: Path to the face detection model
        hand_model_path: Path to the hand detection model
        save_results: Whether to save results to files
        output_dir: Directory to save results
        video_name: Name for output files (required if save_results=True and video_input is a VideoReader)

    Returns:
        Dictionary containing landmarks for each frame
    """
    detector = HolisticDetector(face_model_path, hand_model_path)
    return detector.process_video(video_input, save_results, output_dir, video_name)

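# Example usage of the convenience function (illustrative sketch; the model file
# names, clip path, and output directory below are placeholders):
#
#   landmarks = video_holistic(
#       "clip.mp4",
#       face_model_path="face_landmarker.task",
#       hand_model_path="hand_landmarker.task",
#       save_results=True,
#       output_dir="output/",
#   )
#   # landmarks[0]['pose_landmarks'] -> list of [x, y, z] pose points for frame 0, or None
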
# Utility functions for batch processing
def load_file(filename: str):
    """Load a pickled and gzipped file."""
    import pickle
    import gzip
    with gzip.open(filename, "rb") as f:
        return pickle.load(f)


def is_string_in_file(file_path: str, target_string: str) -> bool:
    """Check if a string exists in a file."""
    try:
        with Path(file_path).open("r") as f:
            for line in f:
                if target_string in line:
                    return True
        return False
    except Exception as e:
        print(f"Error: {e}")
        return False

def main():
    """Main function for command-line usage."""
    import argparse
    import time
    import os

    parser = argparse.ArgumentParser()
    parser.add_argument('--index', type=int, required=True,
                        help='index of the sub_list to work with')
    parser.add_argument('--batch_size', type=int, required=True,
                        help='batch size')
    parser.add_argument('--pose_path', type=str, required=True,
                        help='path to where the pose data will be saved')
    parser.add_argument('--stats_path', type=str, required=True,
                        help='path to where the stats data will be saved')
    parser.add_argument('--time_limit', type=int, required=True,
                        help='time limit in seconds')
    parser.add_argument('--files_list', type=str, required=True,
                        help='path to the pickled, gzipped list of video files')
    parser.add_argument('--problem_file_path', type=str, required=True,
                        help='path to the file that records videos which failed to process')
    parser.add_argument('--face_model_path', type=str, required=True,
                        help='face model path')
    parser.add_argument('--hand_model_path', type=str, required=True,
                        help='hand model path')
    args = parser.parse_args()

    start_time = time.time()

    # Initialize detector
    detector = HolisticDetector(args.face_model_path, args.hand_model_path)

    # Load the files list
    fixed_list = load_file(args.files_list)

    # Create output folders if they do not exist
    Path(args.pose_path).mkdir(parents=True, exist_ok=True)
    Path(args.stats_path).mkdir(parents=True, exist_ok=True)

    # Create the problem file if it doesn't exist
    if not os.path.exists(args.problem_file_path):
        with open(args.problem_file_path, 'w'):
            pass

    # Process videos in batches
    video_batches = [fixed_list[i:i + args.batch_size] for i in range(0, len(fixed_list), args.batch_size)]

    for video_file in video_batches[args.index]:
        current_time = time.time()
        if current_time - start_time > args.time_limit:
            print("Time limit reached. Stopping execution.")
            break

        # Check if output files already exist
        video_name = Path(video_file).stem
        landmark_json_path = Path(args.pose_path) / f"{video_name}_pose.json"
        stats_json_path = Path(args.stats_path) / f"{video_name}_stats.json"

        if landmark_json_path.exists() and stats_json_path.exists():
            print(f"Skipping {video_file} - output files already exist")
            continue
        elif is_string_in_file(args.problem_file_path, video_file):
            print(f"Skipping {video_file} - found in problem file")
            continue
        else:
            try:
                print(f"Processing {video_file}")
                result_dict = detector.process_video(
                    video_file,
                    save_results=True,
                    output_dir=args.pose_path
                )

                # Also save stats separately for compatibility
                stats = detector.compute_video_stats(result_dict)
                with open(stats_json_path, 'w') as f:
                    json.dump(stats, f)

                print(f"Successfully processed {video_file}")
            except Exception as e:
                print(f"Error processing {video_file}: {e}")
                # Record the failing video in the problem file
                with open(args.problem_file_path, "a") as p:
                    p.write(video_file + "\n")


if __name__ == "__main__":
    main()