import os
import shutil
from tempfile import TemporaryDirectory, NamedTemporaryFile
from typing import List, Union, Optional, Tuple, Dict, Any, Generator
from pathlib import Path

import torch
from huggingface_hub import (
    CommitOperationAdd,
    HfApi,
    ModelCard,
    Discussion,
    CommitInfo,
    create_repo,
    RepoUrl,
)
from huggingface_hub.file_download import repo_folder_name
from optimum.exporters.tasks import TasksManager
from optimum.exporters.neuron.model_configs import *
from optimum.neuron import (
    NeuronModelForFeatureExtraction,
    NeuronModelForSentenceTransformers,
    NeuronModelForMaskedLM,
    NeuronModelForQuestionAnswering,
    NeuronModelForSequenceClassification,
    NeuronModelForTokenClassification,
    NeuronModelForMultipleChoice,
    NeuronModelForImageClassification,
    NeuronModelForSemanticSegmentation,
    NeuronModelForObjectDetection,
    NeuronModelForAudioClassification,
    NeuronModelForAudioFrameClassification,
    NeuronModelForCTC,
    NeuronModelForXVector,
    NeuronModelForCausalLM,
    NeuronModelForSeq2SeqLM,
)
from optimum.neuron import (
    NeuronDiffusionPipelineBase,
    NeuronStableDiffusionPipeline,
    NeuronStableDiffusionImg2ImgPipeline,
    NeuronStableDiffusionInpaintPipeline,
    NeuronStableDiffusionInstructPix2PixPipeline,
    NeuronLatentConsistencyModelPipeline,
    NeuronStableDiffusionXLPipeline,
    NeuronStableDiffusionXLImg2ImgPipeline,
    NeuronStableDiffusionXLInpaintPipeline,
    NeuronStableDiffusionControlNetPipeline,
    NeuronStableDiffusionXLControlNetPipeline,
    NeuronPixArtAlphaPipeline,
    NeuronPixArtSigmaPipeline,
    NeuronFluxPipeline,
)
from optimum.neuron.cache.entries.cache_entry import ModelCacheEntry

SPACES_URL = "https://huggingface.co/spaces/optimum/neuron-export"
CACHE_REPO_ID = "badaoui/optimum-neuron_compile-cache"

# Task to NeuronModel mapping for transformers
TASK_TO_MODEL_CLASS = {
    "feature-extraction": NeuronModelForFeatureExtraction,
    "sentence-transformers": NeuronModelForSentenceTransformers,
    "fill-mask": NeuronModelForMaskedLM,
    "question-answering": NeuronModelForQuestionAnswering,
    "text-classification": NeuronModelForSequenceClassification,
    "token-classification": NeuronModelForTokenClassification,
    "multiple-choice": NeuronModelForMultipleChoice,
    "image-classification": NeuronModelForImageClassification,
    "semantic-segmentation": NeuronModelForSemanticSegmentation,
    "object-detection": NeuronModelForObjectDetection,
    "audio-classification": NeuronModelForAudioClassification,
    "audio-frame-classification": NeuronModelForAudioFrameClassification,
    "automatic-speech-recognition": NeuronModelForCTC,
    "audio-xvector": NeuronModelForXVector,
    "text-generation": NeuronModelForCausalLM,
    "text2text-generation": NeuronModelForSeq2SeqLM,
}

# Diffusion pipeline mapping
DIFFUSION_PIPELINE_MAPPING = {
    "text-to-image": NeuronStableDiffusionPipeline,
    "image-to-image": NeuronStableDiffusionImg2ImgPipeline,
    "inpaint": NeuronStableDiffusionInpaintPipeline,
    "instruct-pix2pix": NeuronStableDiffusionInstructPix2PixPipeline,
    "latent-consistency": NeuronLatentConsistencyModelPipeline,
    "stable_diffusion": NeuronStableDiffusionPipeline,
    "stable-diffusion-xl": NeuronStableDiffusionXLPipeline,
    "stable-diffusion-xl-img2img": NeuronStableDiffusionXLImg2ImgPipeline,
    "stable-diffusion-xl-inpaint": NeuronStableDiffusionXLInpaintPipeline,
    "controlnet": NeuronStableDiffusionControlNetPipeline,
    "controlnet-xl": NeuronStableDiffusionXLControlNetPipeline,
    "pixart-alpha": NeuronPixArtAlphaPipeline,
    "pixart-sigma": NeuronPixArtSigmaPipeline,
    "flux": NeuronFluxPipeline,
}


def get_default_input_shapes(task_or_pipeline: str) -> Dict[str, int]:
    """Get default input shapes based on task type or diffusion pipeline type."""
    if task_or_pipeline in [
        "feature-extraction",
        "sentence-transformers",
        "fill-mask",
        "question-answering",
        "text-classification",
        "token-classification",
        "text-generation",
        "text2text-generation",
    ]:
        return {"batch_size": 1, "sequence_length": 128}
    elif task_or_pipeline == "multiple-choice":
        return {"batch_size": 1, "num_choices": 4, "sequence_length": 128}
    elif task_or_pipeline in ["image-classification", "semantic-segmentation", "object-detection"]:
        return {"batch_size": 1, "num_channels": 3, "height": 224, "width": 224}
    elif task_or_pipeline in [
        "audio-classification",
        "audio-frame-classification",
        "automatic-speech-recognition",
        "audio-xvector",
    ]:
        return {"batch_size": 1, "audio_sequence_length": 16000}
    elif task_or_pipeline in DIFFUSION_PIPELINE_MAPPING:
        return {"batch_size": 1, "height": 1024, "width": 1024, "num_images_per_prompt": 1}
    else:
        # Default to text-based shapes
        return {"batch_size": 1, "sequence_length": 128}


def previous_pr(api: "HfApi", model_id: str, pr_title: str) -> Optional["Discussion"]:
    """Return an existing open PR with the given title on `model_id`, if any."""
    try:
        discussions = api.get_repo_discussions(repo_id=model_id)
    except Exception:
        return None
    for discussion in discussions:
        if (
            discussion.status == "open"
            and discussion.is_pull_request
            and discussion.title == pr_title
        ):
            return discussion
    return None


def get_local_cache_structure(local_cache_base: str = "/var/tmp/neuron-compile-cache") -> Dict[str, List[str]]:
    """
    Get the structure of the local Neuron cache to preserve it in the hub.
    Returns a dict mapping neuronxcc folders to their MODULE folders.
    """
    cache_structure = {}
    if not os.path.exists(local_cache_base):
        return cache_structure
    try:
        for item in os.listdir(local_cache_base):
            item_path = os.path.join(local_cache_base, item)
            if os.path.isdir(item_path) and item.startswith('neuronxcc-'):
                modules = []
                for subitem in os.listdir(item_path):
                    subitem_path = os.path.join(item_path, subitem)
                    if os.path.isdir(subitem_path) and subitem.startswith('MODULE_'):
                        modules.append(subitem)
                if modules:
                    cache_structure[item] = modules
    except Exception as e:
        print(f"Warning: Could not read local cache structure: {e}")
    return cache_structure
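
# Illustrative example of the on-disk layout that get_local_cache_structure()
# walks (the compiler versions and MODULE hashes below are made up):
#
#   /var/tmp/neuron-compile-cache/
#   ├── neuronxcc-2.14.227.0/
#   │   ├── MODULE_3f1a.../
#   │   └── MODULE_9bc2.../
#   └── neuronxcc-2.13.66.0/
#       └── MODULE_77de.../
#
# For that tree, the function would return:
#   {
#       "neuronxcc-2.14.227.0": ["MODULE_3f1a...", "MODULE_9bc2..."],
#       "neuronxcc-2.13.66.0": ["MODULE_77de..."],
#   }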

def upload_cache_files(cache_dir: str, cache_repo_id: str, token: str) -> Generator[Union[str, CommitInfo, None], None, None]:
    """
    Upload cache files to the cache repository and create a PR.
    This is a generator: it yields progress strings, then the resulting
    CommitInfo (or None if there was nothing to upload).
    """
    try:
        api = HfApi(token=token)

        # Create cache operations
        cache_operations = []
        for root, _, files in os.walk(cache_dir):
            for file in files:
                file_path = os.path.join(root, file)
                rel_path = os.path.relpath(file_path, cache_dir)
                cache_operations.append(
                    CommitOperationAdd(
                        path_in_repo=rel_path,
                        path_or_fileobj=file_path,
                    )
                )

        yield f"📤 Found {len(cache_operations)} cache files to upload."

        if cache_operations:
            # Create PR in cache repository
            cache_pr_title = f"Add Neuron cache for {os.path.basename(cache_dir)}"
            cache_commit_description = """
🤖 Neuron Cache Bot: Adding compiled Neuron cache artifacts.

This PR contains the compiled neuronxcc cache files that can be used to speed up model loading for AWS Neuron devices.
"""
            cache_pr = api.create_commit(
                repo_id=cache_repo_id,
                operations=cache_operations,
                commit_message=cache_pr_title,
                commit_description=cache_commit_description,
                create_pr=True,
                token=token,
            )
            yield f"✅ Cache PR created successfully: https://huggingface.co/{cache_repo_id}/discussions/{cache_pr.pr_num}"
            # Yield the final PR object so the caller can use it
            yield cache_pr
        else:
            yield "⚠️ No cache files found to upload."
            yield None
    except Exception as e:
        yield f"❌ Cache upload failed: {e}"
        raise


def export_and_git_add(
    model_id: str,
    task_or_pipeline: str,
    model_type: str,
    folder: str,
    token: str,
) -> Generator[Union[str, Tuple[str, List[CommitOperationAdd]]], None, None]:
    """
    Export `model_id` to the Neuron format inside `folder`, yielding progress
    strings and ending with a ("__RETURN__", operations) tuple that carries
    the commit operations for the exported files.
    """
    if task_or_pipeline == "auto":
        try:
            task_or_pipeline = TasksManager.infer_task_from_model(model_id)
        except Exception as e:
            raise Exception(f"❌ Could not infer task for model {model_id}: {e}")

    yield f"📦 Exporting model `{model_id}` for task `{task_or_pipeline}`..."

    model_class = (
        TASK_TO_MODEL_CLASS.get(task_or_pipeline)
        if model_type == "transformers"
        else DIFFUSION_PIPELINE_MAPPING.get(task_or_pipeline)
    )
    if model_class is None:
        supported = (
            list(TASK_TO_MODEL_CLASS.keys())
            if model_type == "transformers"
            else list(DIFFUSION_PIPELINE_MAPPING.keys())
        )
        raise Exception(f"❌ Unsupported task/pipeline: {task_or_pipeline}. Supported: {supported}")

    input_shapes = get_default_input_shapes(task_or_pipeline)
    yield f"🔧 Using input shapes: {input_shapes}"

    try:
        model = model_class.from_pretrained(
            model_id,
            torch_dtype=torch.bfloat16,
            export=True,
            token=token,
            tensor_parallel_size=4,
            **input_shapes,
        )
        model.save_pretrained(folder)
        yield "✅ Export completed successfully."
    except Exception as e:
        yield f"❌ Export failed with error: {e}"
        raise

    operations = []
    for root, _, files in os.walk(folder):
        for filename in files:
            file_path = os.path.join(root, filename)
            repo_path = os.path.relpath(file_path, folder)
            operations.append(CommitOperationAdd(path_in_repo=repo_path, path_or_fileobj=file_path))
    yield f"📁 Found {len(operations)} files to upload"

    try:
        card = ModelCard.load(model_id, token=token)
        if not hasattr(card.data, "tags") or card.data.tags is None:
            card.data.tags = []
        if "neuron" not in card.data.tags:
            card.data.tags.append("neuron")
        readme_path = os.path.join(folder, "README.md")
        card.save(readme_path)
        # If README.md is already in operations, update it; otherwise add it
        readme_op = next((op for op in operations if op.path_in_repo == "README.md"), None)
        if readme_op:
            readme_op.path_or_fileobj = readme_path
        else:
            operations.append(CommitOperationAdd(path_in_repo="README.md", path_or_fileobj=readme_path))
    except Exception as e:
        yield f"⚠️ Warning: Could not update model card: {e}"

    yield ("__RETURN__", operations)


def generate_neuron_repo_name(api, original_model_id: str, task_or_pipeline: str, token: str) -> str:
    """Generate a name for the Neuron-optimized repository."""
    # Replace '/' with '-' and add a neuron suffix
    requesting_user = api.whoami(token=token)["name"]
    base_name = original_model_id.replace('/', '-')
    return f"{requesting_user}/{base_name}-neuron"


def create_neuron_repo_and_upload(
    operations: List[CommitOperationAdd],
    original_model_id: str,
    model_type: str,
    task_or_pipeline: str,
    requesting_user: str,
    token: str,
) -> Generator[Union[str, RepoUrl], None, None]:
    """
    Creates a new repository with Neuron files and uploads them.
    """
    api = HfApi(token=token)

    if task_or_pipeline == "auto":
        try:
            task_or_pipeline = TasksManager.infer_task_from_model(original_model_id)
        except Exception as e:
            raise Exception(f"❌ Could not infer task for model {original_model_id}: {e}")

    # Generate repository name
    neuron_repo_name = generate_neuron_repo_name(api, original_model_id, task_or_pipeline, token)
    yield f"🏗️ Creating new repository: {neuron_repo_name}"

    try:
        # Create the repository
        repo_url = create_repo(
            repo_id=neuron_repo_name,
            token=token,
            repo_type="model",
            private=False,
            exist_ok=True,
        )
        yield f"✅ Repository created: {repo_url}"

        # Get the appropriate class name for the Python example
        if model_type == "transformers":
            model_class = TASK_TO_MODEL_CLASS.get(task_or_pipeline)
        else:
            model_class = DIFFUSION_PIPELINE_MAPPING.get(task_or_pipeline)
        model_class_name = model_class.__name__ if model_class else "NeuronModel"

        # Create enhanced model card for the Neuron repo
        neuron_readme_content = f"""---
tags:
- neuron
- optimized
- aws-neuron
- {task_or_pipeline}
base_model: {original_model_id}
---

# Neuron-Optimized {original_model_id}

This repository contains AWS Neuron-optimized files for [{original_model_id}](https://huggingface.co/{original_model_id}).

## Model Details

- **Base Model**: [{original_model_id}](https://huggingface.co/{original_model_id})
- **Task**: {task_or_pipeline}
- **Optimization**: AWS Neuron compilation
- **Generated by**: [{requesting_user}](https://huggingface.co/{requesting_user})
- **Generated using**: [Optimum Neuron Compiler Space]({SPACES_URL})

## Usage

This model has been optimized for AWS Neuron devices (Inferentia/Trainium). To use it:

```python
from optimum.neuron import {model_class_name}

model = {model_class_name}.from_pretrained("{neuron_repo_name}")
```

## Performance

These files are pre-compiled for AWS Neuron devices and should provide improved inference performance compared to the original model when deployed on Inferentia or Trainium instances.

## Original Model

For the original model, training details, and more information, please visit: [{original_model_id}](https://huggingface.co/{original_model_id})
"""

        # Update the README in operations
        readme_op = next((op for op in operations if op.path_in_repo == "README.md"), None)
        if readme_op:
            # Create a temporary file with the new content
            with NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
                f.write(neuron_readme_content)
                readme_op.path_or_fileobj = f.name
        else:
            # Add new README operation
            with NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
                f.write(neuron_readme_content)
                operations.append(CommitOperationAdd(path_in_repo="README.md", path_or_fileobj=f.name))

        # Upload files to the new repository
        commit_message = f"Add Neuron-optimized files for {original_model_id}"
        commit_description = f"""
🤖 Neuron Export Bot: Adding AWS Neuron-optimized model files.

Original model: [{original_model_id}](https://huggingface.co/{original_model_id})
Task: {task_or_pipeline}
Generated by: [{requesting_user}](https://huggingface.co/{requesting_user})
Generated using: [Optimum Neuron Compiler Space]({SPACES_URL})

These files have been pre-compiled for AWS Neuron devices (Inferentia/Trainium) and should provide improved inference performance.
"""
        yield f"📤 Uploading {len(operations)} files to {neuron_repo_name}..."

        commit_info = api.create_commit(
            repo_id=neuron_repo_name,
            operations=operations,
            commit_message=commit_message,
            commit_description=commit_description,
            token=token,
        )
        yield f"✅ Files uploaded successfully to: https://huggingface.co/{neuron_repo_name}"
        yield repo_url
    except Exception as e:
        yield f"❌ Failed to create/upload to Neuron repository: {e}"
        raise


def create_readme_pr_for_original_model(
    original_model_id: str,
    neuron_repo_name: str,
    task_or_pipeline: str,
    requesting_user: str,
    token: str,
) -> Generator[Union[str, CommitInfo], None, None]:
    """
    Creates a PR on the original model repository to add a link to the
    Neuron-optimized version.
    """
    api = HfApi(token=token)
    yield f"📝 Creating PR to add Neuron repo link in {original_model_id}..."

    try:
        # Check if there's already an open PR
        pr_title = "Add link to Neuron-optimized version"
        existing_pr = previous_pr(api, original_model_id, pr_title)
        if existing_pr:
            yield f"⚠️ PR already exists: https://huggingface.co/{original_model_id}/discussions/{existing_pr.num}"
            return

        # Get the current README
        try:
            current_readme_path = api.hf_hub_download(
                repo_id=original_model_id,
                filename="README.md",
                token=token,
            )
            with open(current_readme_path, 'r', encoding='utf-8') as f:
                readme_content = f.read()
        except Exception:
            # If the README doesn't exist, create a basic one
            readme_content = f"# {original_model_id}\n\n"

        # Add the Neuron optimization section, separated by a horizontal rule
        neuron_section = f"""
---

## 🚀 AWS Neuron Optimized Version Available

A Neuron-optimized version of this model is available for improved performance on AWS Inferentia/Trainium instances:

**[{neuron_repo_name}](https://huggingface.co/{neuron_repo_name})**

The Neuron-optimized version provides:
- Pre-compiled artifacts for faster loading
- Optimized performance on AWS Neuron devices
- Same model capabilities with improved inference speed
"""

        # Append the Neuron section to the end of the README
        updated_readme = readme_content.rstrip() + "\n" + neuron_section

        # Create a temporary file with the updated README
        with NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding="utf-8") as f:
            f.write(updated_readme)
            temp_readme_path = f.name

        # Create the PR
        operations = [CommitOperationAdd(path_in_repo="README.md", path_or_fileobj=temp_readme_path)]
        commit_description = f"""
🤖 Neuron Export Bot: Adding link to Neuron-optimized version.

A Neuron-optimized version of this model has been created at [{neuron_repo_name}](https://huggingface.co/{neuron_repo_name}).

The optimized version provides improved performance on AWS Inferentia/Trainium instances with pre-compiled artifacts.

Generated by: [{requesting_user}](https://huggingface.co/{requesting_user})
Generated using: [Optimum Neuron Compiler Space]({SPACES_URL})
"""
        pr = api.create_commit(
            repo_id=original_model_id,
            operations=operations,
            commit_message=pr_title,
            commit_description=commit_description,
            create_pr=True,
            token=token,
        )
        yield f"✅ README PR created: https://huggingface.co/{original_model_id}/discussions/{pr.pr_num}"
        yield pr

        # Clean up the temporary file
        os.unlink(temp_readme_path)
    except Exception as e:
        yield f"❌ Failed to create README PR: {e}"
        raise
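
# The helpers above share one protocol: yield progress strings, then yield a
# final result object (CommitInfo, RepoUrl, ...). A minimal consumption sketch
# (illustrative only; `convert` below inlines this same loop):
#
#   def drain(gen):
#       result = None
#       for item in gen:
#           if isinstance(item, str):
#               print(item)      # progress message
#           else:
#               result = item    # final value
#       return result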
""" yield f"📤 Preparing to upload to custom repo: {custom_repo_id}" api = HfApi(token=token) try: # Ensure the custom repo exists api.repo_info(repo_id=custom_repo_id, repo_type="model") except Exception as e: yield f"❌ Could not access custom repository `{custom_repo_id}`. Please ensure it exists and you have write access. Error: {e}" raise pr_title = f"Add Neuron-optimized files for {original_model_id}" commit_description = f""" 🤖 Neuron Export Bot: On behalf of [{requesting_user}](https://huggingface.co/{requesting_user}), adding AWS Neuron-optimized model files for `{original_model_id}`. These files were generated using the [Optimum Neuron Compiler Space](https://huggingface.co/spaces/optimum/neuron-export). """ try: custom_pr = api.create_commit( repo_id=custom_repo_id, operations=operations, commit_message=pr_title, commit_description=commit_description, create_pr=True, token=token, ) yield f"✅ Custom PR created successfully: https://huggingface.co/{custom_repo_id}/discussions/{custom_pr.pr_num}" yield custom_pr except Exception as e: yield f"❌ Failed to create PR in custom repository: {e}" raise def convert( api: "HfApi", model_id: str, task_or_pipeline: str, model_type: str = "transformers", force: bool = False, token: str = None, pr_options: Dict = None, ) -> Generator[Tuple[str, Any], None, None]: if pr_options is None: pr_options = {} info = api.model_info(model_id, token=token) filenames = {s.rfilename for s in info.siblings} requesting_user = api.whoami(token=token)["name"] if not any(pr_options.values()): yield "1", "⚠️ No option selected. Please choose at least one option." return if pr_options.get("create_custom_pr") and not pr_options.get("custom_repo_id"): yield "1", "⚠️ Custom PR selected but no repository ID was provided." return yield "0", f"🚀 Starting export process with options: {pr_options}..." with TemporaryDirectory() as temp_dir: export_folder = os.path.join(temp_dir, "export") cache_mirror_dir = os.path.join(temp_dir, "cache_mirror") os.makedirs(export_folder, exist_ok=True) os.makedirs(cache_mirror_dir, exist_ok=True) result_info = {} try: # --- Export Logic --- export_gen = export_and_git_add(model_id, task_or_pipeline, model_type, export_folder, token=token) operations = None for message in export_gen: if isinstance(message, tuple) and message[0] == "__RETURN__": operations = message[1] break else: yield "0", message if not operations: raise Exception("Export process did not produce any files to commit.") # --- Cache Handling --- cache_files_available = False if pr_options.get("create_cache_pr"): yield "0", "Checking for local cache files..." local_cache_structure = get_local_cache_structure() yield "0", f"🗂️ Found cache structure: {len(local_cache_structure)} neuronxcc folders" if local_cache_structure: cache_files_available = True local_cache_base = "/var/tmp/neuron-compile-cache" # Copy cache files to a temporary mirror directory for upload shutil.copytree(local_cache_base, cache_mirror_dir, dirs_exist_ok=True) yield "0", "Copied cache files to a temporary location for upload." # --- New Repository Creation (Replaces Model PR) --- if pr_options.get("create_neuron_repo"): yield "0", "🏗️ Creating new Neuron-optimized repository..." 
neuron_repo_url = None # Generate the repo name first so we can use it consistently neuron_repo_name = generate_neuron_repo_name(api, model_id, task_or_pipeline, token) repo_creation_gen = create_neuron_repo_and_upload( operations, model_id, model_type, task_or_pipeline, requesting_user, token ) for msg in repo_creation_gen: if isinstance(msg, str): yield "0", msg else: neuron_repo_url = msg result_info["neuron_repo"] = f"https://huggingface.co/{neuron_repo_name}" # Automatically create a PR on the original model to add a link yield "0", "📝 Creating PR to add Neuron repo link to original model..." readme_pr = None readme_pr_gen = create_readme_pr_for_original_model( model_id, neuron_repo_name, task_or_pipeline, requesting_user, token ) for msg in readme_pr_gen: if isinstance(msg, str): yield "0", msg else: readme_pr = msg if readme_pr: result_info["readme_pr"] = f"https://huggingface.co/{model_id}/discussions/{readme_pr.pr_num}" # --- Cache Repository PR --- if pr_options.get("create_cache_pr"): if cache_files_available: yield "0", "📤 Creating PR in cache repository..." cache_pr = None cache_upload_gen = upload_cache_files(cache_mirror_dir, CACHE_REPO_ID, token) for msg in cache_upload_gen: if isinstance(msg, str): yield "0", msg else: cache_pr = msg if cache_pr: result_info["cache_pr"] = f"https://huggingface.co/{CACHE_REPO_ID}/discussions/{cache_pr.pr_num}" else: yield "0", "⚠️ No new cache files were generated to upload." # --- Custom Repository PR --- if pr_options.get("create_custom_pr"): custom_repo_id = pr_options["custom_repo_id"] yield "0", f"📤 Creating PR in custom repository: {custom_repo_id}..." custom_pr = None custom_upload_gen = upload_to_custom_repo(operations, custom_repo_id, model_id, requesting_user, token) for msg in custom_upload_gen: if isinstance(msg, str): yield "0", msg else: custom_pr = msg if custom_pr: result_info["custom_pr"] = f"https://huggingface.co/{custom_repo_id}/discussions/{custom_pr.pr_num}" yield "0", result_info except Exception as e: yield "1", f"❌ Conversion failed with a critical error: {e}" # Re-raise the exception to be caught by the outer try-except in the Gradio app if needed raise def list_cached_models(cache_repo_id: str, token: str = None) -> Dict[str, List[str]]: """ List all cached neuronxcc folders in the repository. """ try: api = HfApi(token=token) repo_files = api.list_repo_files(cache_repo_id, token=token) # Group files by neuronxcc folder neuronxcc_cache = {} for file_path in repo_files: # Extract neuronxcc folder from path parts = file_path.split('/') if len(parts) >= 3 and parts[0].startswith('neuronxcc-'): neuronxcc_folder = parts[0] module_folder = parts[1] if neuronxcc_folder not in neuronxcc_cache: neuronxcc_cache[neuronxcc_folder] = set() neuronxcc_cache[neuronxcc_folder].add(module_folder) # Convert sets to lists return {k: list(v) for k, v in neuronxcc_cache.items()} except Exception as e: print(f"Failed to list cached models: {e}") return {}
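

if __name__ == "__main__":
    # Minimal usage sketch, not part of the Space itself: drives `convert`
    # end-to-end from the command line. Assumes a write-enabled token in the
    # HF_TOKEN environment variable; the model id and options are placeholders.
    token = os.environ.get("HF_TOKEN")
    api = HfApi(token=token)
    for status, payload in convert(
        api,
        model_id="distilbert-base-uncased",  # placeholder model id
        task_or_pipeline="auto",
        token=token,
        pr_options={"create_neuron_repo": True},
    ):
        print(status, payload)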