neuron-export / optimum_neuron_export.py
badaoui's picture
badaoui HF Staff
Update optimum_neuron_export.py
900a193 verified
raw
history blame
28.2 kB
import os
import shutil
from tempfile import TemporaryDirectory, NamedTemporaryFile
from typing import List, Union, Optional, Tuple, Dict, Any, Generator
from pathlib import Path
from huggingface_hub import (
CommitOperationAdd,
HfApi,
ModelCard,
Discussion,
CommitInfo,
create_repo,
RepoUrl,
)
from huggingface_hub.file_download import repo_folder_name
from optimum.exporters.tasks import TasksManager
from optimum.exporters.neuron.model_configs import *
from optimum.neuron import (
NeuronModelForFeatureExtraction,
NeuronModelForSentenceTransformers,
NeuronModelForMaskedLM,
NeuronModelForQuestionAnswering,
NeuronModelForSequenceClassification,
NeuronModelForTokenClassification,
NeuronModelForMultipleChoice,
NeuronModelForImageClassification,
NeuronModelForSemanticSegmentation,
NeuronModelForObjectDetection,
NeuronModelForAudioClassification,
NeuronModelForAudioFrameClassification,
NeuronModelForCTC,
NeuronModelForXVector,
NeuronModelForCausalLM,
NeuronModelForConditionalGeneration,
)
from optimum.neuron import (
NeuronDiffusionPipelineBase,
NeuronStableDiffusionPipeline,
NeuronStableDiffusionImg2ImgPipeline,
NeuronStableDiffusionInpaintPipeline,
NeuronStableDiffusionInstructPix2PixPipeline,
NeuronLatentConsistencyModelPipeline,
NeuronStableDiffusionXLPipeline,
NeuronStableDiffusionXLImg2ImgPipeline,
NeuronStableDiffusionXLInpaintPipeline,
NeuronStableDiffusionControlNetPipeline,
NeuronStableDiffusionXLControlNetPipeline,
NeuronPixArtAlphaPipeline,
NeuronPixArtSigmaPipeline,
NeuronFluxPipeline
)
from optimum.neuron.cache.entries.cache_entry import ModelCacheEntry
SPACES_URL = "https://huggingface.co/spaces/optimum/neuron-export"
CACHE_REPO_ID = "badaoui/optimum-neuron_compile-cache"
# Task to NeuronModel mapping for transformers
TASK_TO_MODEL_CLASS = {
"feature-extraction": NeuronModelForFeatureExtraction,
"sentence-transformers": NeuronModelForSentenceTransformers,
"fill-mask": NeuronModelForMaskedLM,
"question-answering": NeuronModelForQuestionAnswering,
"text-classification": NeuronModelForSequenceClassification,
"token-classification": NeuronModelForTokenClassification,
"multiple-choice": NeuronModelForMultipleChoice,
"image-classification": NeuronModelForImageClassification,
"semantic-segmentation": NeuronModelForSemanticSegmentation,
"object-detection": NeuronModelForObjectDetection,
"audio-classification": NeuronModelForAudioClassification,
"audio-frame-classification": NeuronModelForAudioFrameClassification,
"automatic-speech-recognition": NeuronModelForCTC,
"audio-xvector": NeuronModelForXVector,
"text-generation": NeuronModelForCausalLM,
"text2text-generation": NeuronModelForSeq2SeqLM,
}
# Diffusion pipeline mapping
DIFFUSION_PIPELINE_MAPPING = {
"text-to-image": NeuronStableDiffusionPipeline,
"image-to-image": NeuronStableDiffusionImg2ImgPipeline,
"inpaint": NeuronStableDiffusionInpaintPipeline,
"instruct-pix2pix": NeuronStableDiffusionInstructPix2PixPipeline,
"latent-consistency": NeuronLatentConsistencyModelPipeline,
"stable_diffusion": NeuronStableDiffusionPipeline,
"stable-diffusion-xl": NeuronStableDiffusionXLPipeline,
"stable-diffusion-xl-img2img": NeuronStableDiffusionXLImg2ImgPipeline,
"stable-diffusion-xl-inpaint": NeuronStableDiffusionXLInpaintPipeline,
"controlnet": NeuronStableDiffusionControlNetPipeline,
"controlnet-xl": NeuronStableDiffusionXLControlNetPipeline,
"pixart-alpha": NeuronPixArtAlphaPipeline,
"pixart-sigma": NeuronPixArtSigmaPipeline,
"flux": NeuronFluxPipeline,
}
def get_default_input_shapes(task_or_pipeline: str) -> Dict[str, int]:
"""Get default input shapes based on task type or diffusion pipeline type."""
if task_or_pipeline in ["feature-extraction", "sentence-transformers", "fill-mask", "question-answering", "text-classification", "token-classification","text-generation","text2text-generation"]:
return {"batch_size": 1, "sequence_length": 128}
elif task_or_pipeline == "multiple-choice":
return {"batch_size": 1, "num_choices": 4, "sequence_length": 128}
elif task_or_pipeline in ["image-classification", "semantic-segmentation", "object-detection"]:
return {"batch_size": 1, "num_channels": 3, "height": 224, "width": 224}
elif task_or_pipeline in ["audio-classification", "audio-frame-classification", "automatic-speech-recognition", "audio-xvector"]:
return {"batch_size": 1, "audio_sequence_length": 16000}
elif task_or_pipeline in DIFFUSION_PIPELINE_MAPPING:
return {"batch_size": 1, "height": 1024, "width": 1024, "num_images_per_prompt": 1}
else:
# Default to text-based shapes
return {"batch_size": 1, "sequence_length": 128}
def previous_pr(api: "HfApi", model_id: str, pr_title: str) -> Optional["Discussion"]:
try:
discussions = api.get_repo_discussions(repo_id=model_id)
except Exception:
return None
for discussion in discussions:
if (
discussion.status == "open"
and discussion.is_pull_request
and discussion.title == pr_title
):
return discussion
return None
def get_local_cache_structure(local_cache_base: str = "/var/tmp/neuron-compile-cache") -> Dict[str, List[str]]:
"""
Get the structure of the local Neuron cache to preserve it in the hub.
Returns a dict mapping neuronxcc folders to their MODULE folders.
"""
cache_structure = {}
if not os.path.exists(local_cache_base):
return cache_structure
try:
for item in os.listdir(local_cache_base):
item_path = os.path.join(local_cache_base, item)
if os.path.isdir(item_path) and item.startswith('neuronxcc-'):
modules = []
for subitem in os.listdir(item_path):
subitem_path = os.path.join(item_path, subitem)
if os.path.isdir(subitem_path) and subitem.startswith('MODULE_'):
modules.append(subitem)
if modules:
cache_structure[item] = modules
except Exception as e:
print(f"Warning: Could not read local cache structure: {e}")
return cache_structure
def upload_cache_files(cache_dir: str, cache_repo_id: str, token: str) -> Generator[Union[str, CommitInfo], None, None]:
"""
Upload cache files to the cache repository and create PR.
This is a generator function.
"""
try:
api = HfApi(token=token)
# Create cache operations
cache_operations = []
for root, _, files in os.walk(cache_dir):
for file in files:
file_path = os.path.join(root, file)
rel_path = os.path.relpath(file_path, cache_dir)
cache_operations.append(
CommitOperationAdd(
path_in_repo=rel_path,
path_or_fileobj=file_path,
)
)
yield f"πŸ“€ Found {len(cache_operations)} cache files to upload."
if cache_operations:
# Create PR in cache repository
cache_pr_title = f"Add Neuron cache for {os.path.basename(cache_dir)}"
cache_commit_description = """
πŸ€– Neuron Cache Bot: Adding compiled Neuron cache artifacts.
This PR contains the compiled neuronxcc cache files that can be used to speed up model loading for AWS Neuron devices.
"""
cache_pr = api.create_commit(
repo_id=cache_repo_id,
operations=cache_operations,
commit_message=cache_pr_title,
commit_description=cache_commit_description,
create_pr=True,
token=token,
)
yield f"βœ… Cache PR created successfully: https://huggingface.co/{cache_repo_id}/discussions/{cache_pr.pr_num}"
# Yield the final PR object so the caller can use it
yield cache_pr
else:
yield "⚠️ No cache files found to upload."
yield None
except Exception as e:
yield f"❌ Cache upload failed: {e}"
raise
def export_and_git_add(model_id: str, task_or_pipeline: str, model_type: str, folder: str, token: str) -> Any:
if task_or_pipeline == "auto":
try:
task_or_pipeline = TasksManager.infer_task_from_model(model_id)
except Exception as e:
raise Exception(f"❌ Could not infer task for model {model_id}: {e}")
yield f"πŸ“¦ Exporting model `{model_id}` for task `{task_or_pipeline}`..."
model_class = TASK_TO_MODEL_CLASS.get(task_or_pipeline) if model_type == "transformers" else DIFFUSION_PIPELINE_MAPPING.get(task_or_pipeline)
if model_class is None:
supported = list(TASK_TO_MODEL_CLASS.keys()) if model_type == "transformers" else list(DIFFUSION_PIPELINE_MAPPING.keys())
raise Exception(f"❌ Unsupported task/pipeline: {task_or_pipeline}. Supported: {supported}")
input_shapes = get_default_input_shapes(task_or_pipeline)
yield f"πŸ”§ Using input shapes: {input_shapes}"
try:
model = model_class.from_pretrained(
model_id,
torch_dtype=torch.bfloat16,
export=True,
token=token,
tensor_parallel_size=4,
**input_shapes,
)
model.save_pretrained(folder)
yield "βœ… Export completed successfully."
except Exception as e:
yield f"❌ Export failed with error: {e}"
raise
operations = []
for root, _, files in os.walk(folder):
for filename in files:
file_path = os.path.join(root, filename)
repo_path = os.path.relpath(file_path, folder)
operations.append(CommitOperationAdd(path_in_repo=repo_path, path_or_fileobj=file_path))
yield f"πŸ“ Found {len(operations)} files to upload"
try:
card = ModelCard.load(model_id, token=token)
if not hasattr(card.data, "tags") or card.data.tags is None:
card.data.tags = []
if "neuron" not in card.data.tags:
card.data.tags.append("neuron")
readme_path = os.path.join(folder, "README.md")
card.save(readme_path)
# Check if README.md is already in operations, if so update, else add
readme_op = next((op for op in operations if op.path_in_repo == "README.md"), None)
if readme_op:
readme_op.path_or_fileobj = readme_path
else:
operations.append(CommitOperationAdd(path_in_repo="README.md", path_or_fileobj=readme_path))
except Exception as e:
yield f"⚠️ Warning: Could not update model card: {e}"
yield ("__RETURN__", operations)
def generate_neuron_repo_name(api, original_model_id: str, task_or_pipeline: str, token:str) -> str:
"""Generate a name for the Neuron-optimized repository."""
# Replace 'Β©' with '-' and add neuron suffix
requesting_user = api.whoami(token=token)["name"]
base_name = original_model_id.replace('/', '-')
return f"{requesting_user}/{base_name}-neuron"
def create_neuron_repo_and_upload(
operations: List[CommitOperationAdd],
original_model_id: str,
model_type: str,
task_or_pipeline: str,
requesting_user: str,
token: str,
) -> Generator[Union[str, RepoUrl], None, None]:
"""
Creates a new repository with Neuron files and uploads them.
"""
api = HfApi(token=token)
if task_or_pipeline == "auto":
try:
task_or_pipeline = TasksManager.infer_task_from_model(original_model_id)
except Exception as e:
raise Exception(f"❌ Could not infer task for model {original_model_id}: {e}")
# Generate repository name
neuron_repo_name = generate_neuron_repo_name(api, original_model_id, task_or_pipeline, token)
yield f"πŸ—οΈ Creating new repository: {neuron_repo_name}"
try:
# Create the repository
repo_url = create_repo(
repo_id=neuron_repo_name,
token=token,
repo_type="model",
private=False,
exist_ok=True,
)
yield f"βœ… Repository created: {repo_url}"
# Get the appropriate class name for the Python example
if model_type == "transformers":
model_class = TASK_TO_MODEL_CLASS.get(task_or_pipeline)
else:
model_class = DIFFUSION_PIPELINE_MAPPING.get(task_or_pipeline)
model_class_name = model_class.__name__ if model_class else "NeuronModel"
# Create enhanced model card for the Neuron repo
neuron_readme_content = f"""---
tags:
- neuron
- optimized
- aws-neuron
- {task_or_pipeline}
base_model: {original_model_id}
---
# Neuron-Optimized {original_model_id}
This repository contains AWS Neuron-optimized files for [{original_model_id}](https://huggingface.co/{original_model_id}).
## Model Details
- **Base Model**: [{original_model_id}](https://huggingface.co/{original_model_id})
- **Task**: {task_or_pipeline}
- **Optimization**: AWS Neuron compilation
- **Generated by**: [{requesting_user}](https://huggingface.co/{requesting_user})
- **Generated using**: [Optimum Neuron Compiler Space]({SPACES_URL})
## Usage
This model has been optimized for AWS Neuron devices (Inferentia/Trainium). To use it:
```python
from optimum.neuron import {model_class_name}
model = {model_class_name}.from_pretrained("{neuron_repo_name}")
```
## Performance
These files are pre-compiled for AWS Neuron devices and should provide improved inference performance compared to the original model when deployed on Inferentia or Trainium instances.
## Original Model
For the original model, training details, and more information, please visit: [{original_model_id}](https://huggingface.co/{original_model_id})
"""
# Update the README in operations
readme_op = next((op for op in operations if op.path_in_repo == "README.md"), None)
if readme_op:
# Create a temporary file with the new content
with NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write(neuron_readme_content)
readme_op.path_or_fileobj = f.name
else:
# Add new README operation
with NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write(neuron_readme_content)
operations.append(CommitOperationAdd(path_in_repo="README.md", path_or_fileobj=f.name))
# Upload files to the new repository
commit_message = f"Add Neuron-optimized files for {original_model_id}"
commit_description = f"""
πŸ€– Neuron Export Bot: Adding AWS Neuron-optimized model files.
Original model: [{original_model_id}](https://huggingface.co/{original_model_id})
Task: {task_or_pipeline}
Generated by: [{requesting_user}](https://huggingface.co/{requesting_user})
Generated using: [Optimum Neuron Compiler Space]({SPACES_URL})
These files have been pre-compiled for AWS Neuron devices (Inferentia/Trainium) and should provide improved inference performance.
"""
yield f"πŸ“€ Uploading {len(operations)} files to {neuron_repo_name}..."
commit_info = api.create_commit(
repo_id=neuron_repo_name,
operations=operations,
commit_message=commit_message,
commit_description=commit_description,
token=token,
)
yield f"βœ… Files uploaded successfully to: https://huggingface.co/{neuron_repo_name}"
yield repo_url
except Exception as e:
yield f"❌ Failed to create/upload to Neuron repository: {e}"
raise
def create_readme_pr_for_original_model(
original_model_id: str,
neuron_repo_name: str,
task_or_pipeline: str,
requesting_user: str,
token: str,
) -> Generator[Union[str, CommitInfo], None, None]:
"""
Creates a PR on the original model repository to add a link to the Neuron-optimized version.
"""
api = HfApi(token=token)
yield f"πŸ“ Creating PR to add Neuron repo link in {original_model_id}..."
try:
# Check if there's already an open PR
pr_title = "Add link to Neuron-optimized version"
existing_pr = previous_pr(api, original_model_id, pr_title)
if existing_pr:
yield f"⚠️ PR already exists: https://huggingface.co/{original_model_id}/discussions/{existing_pr.num}"
return
# Get the current README
try:
current_readme_path = api.hf_hub_download(
repo_id=original_model_id,
filename="README.md",
token=token,
)
with open(current_readme_path, 'r', encoding='utf-8') as f:
readme_content = f.read()
except Exception:
# If README doesn't exist, create a basic one
readme_content = f"# {original_model_id}\n\n"
# Add Neuron optimization section, separated by a horizontal rule
neuron_section = f"""
---
## πŸš€ AWS Neuron Optimized Version Available
A Neuron-optimized version of this model is available for improved performance on AWS Inferentia/Trainium instances:
**[{neuron_repo_name}](https://huggingface.co/{neuron_repo_name})**
The Neuron-optimized version provides:
- Pre-compiled artifacts for faster loading
- Optimized performance on AWS Neuron devices
- Same model capabilities with improved inference speed
"""
# Append the Neuron section to the end of the README
updated_readme = readme_content.rstrip() + "\n" + neuron_section
# Create temporary file with updated README
with NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding="utf-8") as f:
f.write(updated_readme)
temp_readme_path = f.name
# Create the PR
operations = [CommitOperationAdd(path_in_repo="README.md", path_or_fileobj=temp_readme_path)]
commit_description = f"""
πŸ€– Neuron Export Bot: Adding link to Neuron-optimized version.
A Neuron-optimized version of this model has been created at [{neuron_repo_name}](https://huggingface.co/{neuron_repo_name}).
The optimized version provides improved performance on AWS Inferentia/Trainium instances with pre-compiled artifacts.
Generated by: [{requesting_user}](https://huggingface.co/{requesting_user})
Generated using: [Optimum Neuron Compiler Space]({SPACES_URL})
"""
pr = api.create_commit(
repo_id=original_model_id,
operations=operations,
commit_message=pr_title,
commit_description=commit_description,
create_pr=True,
token=token,
)
yield f"βœ… README PR created: https://huggingface.co/{original_model_id}/discussions/{pr.pr_num}"
yield pr
# Clean up temporary file
os.unlink(temp_readme_path)
except Exception as e:
yield f"❌ Failed to create README PR: {e}"
raise
# --- Updated upload_to_custom_repo function (unchanged) ---
def upload_to_custom_repo(
operations: List[CommitOperationAdd],
custom_repo_id: str,
original_model_id: str,
requesting_user: str,
token: str,
) -> Generator[Union[str, CommitInfo], None, None]:
"""
Uploads neuron files to a custom repository and creates a PR.
"""
yield f"πŸ“€ Preparing to upload to custom repo: {custom_repo_id}"
api = HfApi(token=token)
try:
# Ensure the custom repo exists
api.repo_info(repo_id=custom_repo_id, repo_type="model")
except Exception as e:
yield f"❌ Could not access custom repository `{custom_repo_id}`. Please ensure it exists and you have write access. Error: {e}"
raise
pr_title = f"Add Neuron-optimized files for {original_model_id}"
commit_description = f"""
πŸ€– Neuron Export Bot: On behalf of [{requesting_user}](https://huggingface.co/{requesting_user}), adding AWS Neuron-optimized model files for `{original_model_id}`.
These files were generated using the [Optimum Neuron Compiler Space](https://huggingface.co/spaces/optimum/neuron-export).
"""
try:
custom_pr = api.create_commit(
repo_id=custom_repo_id,
operations=operations,
commit_message=pr_title,
commit_description=commit_description,
create_pr=True,
token=token,
)
yield f"βœ… Custom PR created successfully: https://huggingface.co/{custom_repo_id}/discussions/{custom_pr.pr_num}"
yield custom_pr
except Exception as e:
yield f"❌ Failed to create PR in custom repository: {e}"
raise
def convert(
api: "HfApi",
model_id: str,
task_or_pipeline: str,
model_type: str = "transformers",
force: bool = False,
token: str = None,
pr_options: Dict = None,
) -> Generator[Tuple[str, Any], None, None]:
if pr_options is None:
pr_options = {}
info = api.model_info(model_id, token=token)
filenames = {s.rfilename for s in info.siblings}
requesting_user = api.whoami(token=token)["name"]
if not any(pr_options.values()):
yield "1", "⚠️ No option selected. Please choose at least one option."
return
if pr_options.get("create_custom_pr") and not pr_options.get("custom_repo_id"):
yield "1", "⚠️ Custom PR selected but no repository ID was provided."
return
yield "0", f"πŸš€ Starting export process with options: {pr_options}..."
with TemporaryDirectory() as temp_dir:
export_folder = os.path.join(temp_dir, "export")
cache_mirror_dir = os.path.join(temp_dir, "cache_mirror")
os.makedirs(export_folder, exist_ok=True)
os.makedirs(cache_mirror_dir, exist_ok=True)
result_info = {}
try:
# --- Export Logic ---
export_gen = export_and_git_add(model_id, task_or_pipeline, model_type, export_folder, token=token)
operations = None
for message in export_gen:
if isinstance(message, tuple) and message[0] == "__RETURN__":
operations = message[1]
break
else:
yield "0", message
if not operations:
raise Exception("Export process did not produce any files to commit.")
# --- Cache Handling ---
cache_files_available = False
if pr_options.get("create_cache_pr"):
yield "0", "Checking for local cache files..."
local_cache_structure = get_local_cache_structure()
yield "0", f"πŸ—‚οΈ Found cache structure: {len(local_cache_structure)} neuronxcc folders"
if local_cache_structure:
cache_files_available = True
local_cache_base = "/var/tmp/neuron-compile-cache"
# Copy cache files to a temporary mirror directory for upload
shutil.copytree(local_cache_base, cache_mirror_dir, dirs_exist_ok=True)
yield "0", "Copied cache files to a temporary location for upload."
# --- New Repository Creation (Replaces Model PR) ---
if pr_options.get("create_neuron_repo"):
yield "0", "πŸ—οΈ Creating new Neuron-optimized repository..."
neuron_repo_url = None
# Generate the repo name first so we can use it consistently
neuron_repo_name = generate_neuron_repo_name(api, model_id, task_or_pipeline, token)
repo_creation_gen = create_neuron_repo_and_upload(
operations, model_id, model_type, task_or_pipeline, requesting_user, token
)
for msg in repo_creation_gen:
if isinstance(msg, str):
yield "0", msg
else:
neuron_repo_url = msg
result_info["neuron_repo"] = f"https://huggingface.co/{neuron_repo_name}"
# Automatically create a PR on the original model to add a link
yield "0", "πŸ“ Creating PR to add Neuron repo link to original model..."
readme_pr = None
readme_pr_gen = create_readme_pr_for_original_model(
model_id, neuron_repo_name, task_or_pipeline, requesting_user, token
)
for msg in readme_pr_gen:
if isinstance(msg, str):
yield "0", msg
else:
readme_pr = msg
if readme_pr:
result_info["readme_pr"] = f"https://huggingface.co/{model_id}/discussions/{readme_pr.pr_num}"
# --- Cache Repository PR ---
if pr_options.get("create_cache_pr"):
if cache_files_available:
yield "0", "πŸ“€ Creating PR in cache repository..."
cache_pr = None
cache_upload_gen = upload_cache_files(cache_mirror_dir, CACHE_REPO_ID, token)
for msg in cache_upload_gen:
if isinstance(msg, str):
yield "0", msg
else:
cache_pr = msg
if cache_pr:
result_info["cache_pr"] = f"https://huggingface.co/{CACHE_REPO_ID}/discussions/{cache_pr.pr_num}"
else:
yield "0", "⚠️ No new cache files were generated to upload."
# --- Custom Repository PR ---
if pr_options.get("create_custom_pr"):
custom_repo_id = pr_options["custom_repo_id"]
yield "0", f"πŸ“€ Creating PR in custom repository: {custom_repo_id}..."
custom_pr = None
custom_upload_gen = upload_to_custom_repo(operations, custom_repo_id, model_id, requesting_user, token)
for msg in custom_upload_gen:
if isinstance(msg, str):
yield "0", msg
else:
custom_pr = msg
if custom_pr:
result_info["custom_pr"] = f"https://huggingface.co/{custom_repo_id}/discussions/{custom_pr.pr_num}"
yield "0", result_info
except Exception as e:
yield "1", f"❌ Conversion failed with a critical error: {e}"
# Re-raise the exception to be caught by the outer try-except in the Gradio app if needed
raise
def list_cached_models(cache_repo_id: str, token: str = None) -> Dict[str, List[str]]:
"""
List all cached neuronxcc folders in the repository.
"""
try:
api = HfApi(token=token)
repo_files = api.list_repo_files(cache_repo_id, token=token)
# Group files by neuronxcc folder
neuronxcc_cache = {}
for file_path in repo_files:
# Extract neuronxcc folder from path
parts = file_path.split('/')
if len(parts) >= 3 and parts[0].startswith('neuronxcc-'):
neuronxcc_folder = parts[0]
module_folder = parts[1]
if neuronxcc_folder not in neuronxcc_cache:
neuronxcc_cache[neuronxcc_folder] = set()
neuronxcc_cache[neuronxcc_folder].add(module_folder)
# Convert sets to lists
return {k: list(v) for k, v in neuronxcc_cache.items()}
except Exception as e:
print(f"Failed to list cached models: {e}")
return {}