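"""Lightweight GPU memory estimator for diffusion models hosted on the Hugging Face Hub.

Ships known memory figures for a handful of popular checkpoints, falls back to
estimating from safetensors file sizes via the Hub API, and turns the result into
precision/offloading recommendations for a given VRAM budget.
"""
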
from huggingface_hub import HfApi, hf_hub_download
from typing import Dict, Optional
import json
import os
import logging

# Configure logging
logger = logging.getLogger(__name__)


class SimpleMemoryCalculator:
    def __init__(self):
        logger.info("Initializing SimpleMemoryCalculator")
        try:
            self.hf_api = HfApi()
            logger.debug("HuggingFace API initialized")
        except Exception as e:
            logger.error(f"Failed to initialize HuggingFace API: {e}")
            raise

        self.cache = {}

        # Known model memory requirements (in GB for FP16)
        self.known_models = {
            "black-forest-labs/FLUX.1-schnell": {
                "params_billions": 12.0,
                "fp16_gb": 24.0,
                "inference_fp16_gb": 36.0
            },
            "black-forest-labs/FLUX.1-dev": {
                "params_billions": 12.0,
                "fp16_gb": 24.0,
                "inference_fp16_gb": 36.0
            },
            "stabilityai/stable-diffusion-xl-base-1.0": {
                "params_billions": 3.5,
                "fp16_gb": 7.0,
                "inference_fp16_gb": 12.0
            },
            "runwayml/stable-diffusion-v1-5": {
                "params_billions": 0.86,
                "fp16_gb": 1.7,
                "inference_fp16_gb": 4.0
            }
        }

        logger.debug(f"Known models in database: {len(self.known_models)}")

    def get_model_memory_requirements(self, model_id: str) -> Dict:
        """
        Get memory requirements for a model, using known values or estimating from file sizes.
        """
        logger.info(f"Getting memory requirements for model: {model_id}")

        if model_id in self.cache:
            logger.debug(f"Using cached memory data for {model_id}")
            return self.cache[model_id]

        # Check if we have known values
        if model_id in self.known_models:
            logger.info(f"Using known memory data for {model_id}")
            known = self.known_models[model_id]
            logger.debug(f"Known data: {known}")
            result = {
                'model_id': model_id,
                'total_params': int(known['params_billions'] * 1e9),
                'total_params_billions': known['params_billions'],
                'memory_fp32_gb': known['fp16_gb'] * 2,
                'memory_fp16_gb': known['fp16_gb'],
                'memory_bf16_gb': known['fp16_gb'],
                'memory_int8_gb': known['fp16_gb'] / 2,
                'estimated_inference_memory_fp16_gb': known['inference_fp16_gb'],
                'estimated_inference_memory_bf16_gb': known['inference_fp16_gb'],
                'source': 'known_values'
            }
            self.cache[model_id] = result
            return result

        # Try to estimate from HuggingFace API
        try:
            return self._estimate_from_api(model_id)
        except Exception as e:
            # Fallback to generic estimation
            return self._generic_estimation(model_id, str(e))

    def _estimate_from_api(self, model_id: str) -> Dict:
        """Estimate memory from HuggingFace model info."""
        try:
            print(f"Fetching model info for: {model_id}")
            model_info = self.hf_api.model_info(model_id)
            print(f"Successfully fetched model info for: {model_id}")

            # Get file sizes from model repo
            total_size_bytes = 0
            safetensor_files = []
            files_without_size = 0

            for sibling in model_info.siblings:
                if sibling.rfilename.endswith('.safetensors'):
                    file_size_bytes = sibling.size
                    if file_size_bytes is None or file_size_bytes == 0:
                        files_without_size += 1
                        print(f"Warning: No size info for {sibling.rfilename}")
                        # Try to estimate based on typical safetensor file sizes
                        if 'unet' in sibling.rfilename.lower():
                            file_size_bytes = 3_400_000_000  # ~3.4GB typical for UNet
                        elif 'text_encoder' in sibling.rfilename.lower():
                            file_size_bytes = 500_000_000  # ~500MB typical for text encoder
                        elif 'vae' in sibling.rfilename.lower():
                            file_size_bytes = 160_000_000  # ~160MB typical for VAE
                        else:
                            file_size_bytes = 500_000_000  # Default fallback
| print(f" β Using estimated size: {file_size_bytes / (1024**3):.2f} GB") | |
                    else:
                        print(f"File {sibling.rfilename}: {file_size_bytes / (1024**3):.2f} GB")

                    size_mb = file_size_bytes / (1024 * 1024)
                    safetensor_files.append({
                        'filename': sibling.rfilename,
                        'size_mb': size_mb,
                        'estimated': file_size_bytes != sibling.size
                    })
                    total_size_bytes += file_size_bytes

            print(f"Found {len(safetensor_files)} safetensor files, total size: {total_size_bytes / (1024**3):.2f} GB")
            if files_without_size > 0:
                print(f"Warning: {files_without_size} files had no size info, used estimates")

            # Estimate parameters from file size (assuming FP16)
            total_size_gb = total_size_bytes / (1024**3)
            estimated_params = int(total_size_bytes / 2)  # 2 bytes per param for FP16
            estimated_params_billions = estimated_params / 1e9

            # Estimate inference memory (model + activations)
            inference_multiplier = 1.5  # Conservative estimate
            estimated_inference_memory = total_size_gb * inference_multiplier
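
            # Sanity check against the known-values table above: FLUX.1 at ~12B
            # parameters corresponds to ~24 GB of FP16 weights (2 bytes per parameter),
            # and its quoted inference footprint (36 GB) matches the same 1.5x rule of
            # thumb applied here.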

            result = {
                'model_id': model_id,
                'total_params': estimated_params,
                'total_params_billions': estimated_params_billions,
                'memory_fp32_gb': total_size_gb * 2,
                'memory_fp16_gb': total_size_gb,
                'memory_bf16_gb': total_size_gb,
                'memory_int8_gb': total_size_gb / 2,
                'estimated_inference_memory_fp16_gb': estimated_inference_memory,
                'estimated_inference_memory_bf16_gb': estimated_inference_memory,
                'safetensors_files': safetensor_files,
                'files_without_size': files_without_size,
                'source': 'api_estimation'
            }

            self.cache[model_id] = result
            logger.info(f"Successfully estimated memory for {model_id} via API")
            logger.debug(f"API estimation result: {result}")
            return result

        except Exception as api_error:
            logger.error(f"API Error for model {model_id}: {type(api_error).__name__}: {str(api_error)}")
            # Re-raise with more context
            raise Exception(f"HuggingFace API Error: {type(api_error).__name__}: {str(api_error)}")

    def _generic_estimation(self, model_id: str, error_msg: str) -> Dict:
        """Generic fallback estimation."""
        logger.warning(f"Using generic estimation for {model_id} due to: {error_msg}")

        # Default to medium-sized model estimates
        default_params_billions = 3.0
        default_fp16_gb = 6.0
        logger.debug(f"Generic estimation parameters: {default_params_billions}B params, {default_fp16_gb}GB FP16")

        result = {
            'model_id': model_id,
            'total_params': int(default_params_billions * 1e9),
            'total_params_billions': default_params_billions,
            'memory_fp32_gb': default_fp16_gb * 2,
            'memory_fp16_gb': default_fp16_gb,
            'memory_bf16_gb': default_fp16_gb,
            'memory_int8_gb': default_fp16_gb / 2,
            'estimated_inference_memory_fp16_gb': default_fp16_gb * 1.5,
            'estimated_inference_memory_bf16_gb': default_fp16_gb * 1.5,
            'source': 'generic_fallback',
            'error': error_msg
        }
        logger.info(f"Generic estimation completed for {model_id}")
        return result

    def get_memory_recommendation(self, model_id: str, available_vram_gb: float) -> Dict:
        """Get memory recommendations based on available VRAM."""
        logger.info(f"Generating memory recommendations for {model_id} with {available_vram_gb}GB VRAM")
        memory_info = self.get_model_memory_requirements(model_id)

        recommendations = {
            'model_id': model_id,
            'available_vram_gb': available_vram_gb,
            'model_memory_fp16_gb': memory_info['memory_fp16_gb'],
            'estimated_inference_memory_fp16_gb': memory_info['estimated_inference_memory_fp16_gb'],
            'recommendations': []
        }

        inference_memory_fp16 = memory_info['estimated_inference_memory_fp16_gb']
        model_memory_fp16 = memory_info['memory_fp16_gb']
        logger.debug(f"Model memory: {model_memory_fp16}GB, Inference memory: {inference_memory_fp16}GB")

        # Determine recommendations
        if available_vram_gb >= inference_memory_fp16:
            recommendations['recommendations'].append("✅ Full model can fit in VRAM")
            recommendations['recommended_precision'] = 'float16'
            recommendations['cpu_offload'] = False
            recommendations['attention_slicing'] = False
        elif available_vram_gb >= model_memory_fp16:
            recommendations['recommendations'].append("⚠️ Model weights fit, enable memory optimizations")
            recommendations['recommended_precision'] = 'float16'
            recommendations['cpu_offload'] = False
            recommendations['attention_slicing'] = True
            recommendations['vae_slicing'] = True
        elif available_vram_gb >= model_memory_fp16 * 0.7:
            recommendations['recommendations'].append("🔄 Use CPU offloading for some components")
            recommendations['recommended_precision'] = 'float16'
            recommendations['cpu_offload'] = True
            recommendations['attention_slicing'] = True
            recommendations['vae_slicing'] = True
        else:
            recommendations['recommendations'].append("🐌 Requires sequential CPU offloading")
            recommendations['recommended_precision'] = 'float16'
            recommendations['sequential_offload'] = True
            recommendations['attention_slicing'] = True
            recommendations['vae_slicing'] = True

        return recommendations
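
    # Illustration of the thresholds above, using the SDXL entry from known_models
    # (7 GB weights, 12 GB estimated inference): 16 GB of VRAM clears the inference
    # estimate (full fit), 8 GB covers only the weights (enable slicing), roughly
    # 5-7 GB lands in the 0.7x band (partial CPU offload), and anything lower falls
    # back to sequential offloading.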

    def format_memory_info(self, model_id: str) -> str:
        """Format memory information for display."""
        info = self.get_model_memory_requirements(model_id)

        source_text = {
            'known_values': '📊 Known model specifications',
            'api_estimation': '🔍 Estimated from model files',
            'generic_fallback': '⚠️ Generic estimation (API error)'
        }.get(info.get('source', 'unknown'), '❓ Unknown source')

        # Add warning if file sizes were estimated
        if info.get('files_without_size', 0) > 0:
            source_text += f" (⚠️ {info['files_without_size']} files used size estimates)"
| output = f""" | |
| π€ **Memory Analysis for {model_id}** | |
| {source_text} | |
| π’ **Parameters**: {info['total_params_billions']:.1f}B parameters | |
| πΎ **Model Memory Requirements**: | |
| β’ FP32: {info['memory_fp32_gb']:.1f} GB | |
| β’ FP16/BF16: {info['memory_fp16_gb']:.1f} GB | |
| β’ INT8: {info['memory_int8_gb']:.1f} GB | |
| π **Estimated Inference Memory**: | |
| β’ FP16: {info['estimated_inference_memory_fp16_gb']:.1f} GB | |
| β’ BF16: {info['estimated_inference_memory_bf16_gb']:.1f} GB | |
| """ | |
| if 'error' in info: | |
| output += f"\nβ οΈ **Note**: {info['error']}" | |
| return output.strip() | |


# Quick test
if __name__ == "__main__":
    calc = SimpleMemoryCalculator()

    models = [
        "black-forest-labs/FLUX.1-schnell",
        "stabilityai/stable-diffusion-xl-base-1.0",
        "runwayml/stable-diffusion-v1-5"
    ]

    for model in models:
        print(f"\n{'='*60}")
        print(calc.format_memory_info(model))

        # Test recommendations
        for vram in [8, 16, 24]:
            rec = calc.get_memory_recommendation(model, vram)
            print(f"\n💡 {vram}GB VRAM: {rec['recommendations'][0]}")