import os
import subprocess
import signal
import tempfile
from pathlib import Path
from textwrap import dedent
from typing import Optional, Tuple, List, Union
from dataclasses import dataclass, field
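# Opt out of Gradio analytics before importing gradio.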
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"
import gradio as gr
from huggingface_hub import HfApi, ModelCard, whoami
from gradio_huggingfacehub_search import HuggingfaceHubSearch
from apscheduler.schedulers.background import BackgroundScheduler
@dataclass
class QuantizationConfig:
"""Configuration for model quantization."""
method: str
use_imatrix: bool = False
imatrix_method: str = "IQ4_NL"
train_data: str = ""
quant_embedding: bool = False
embedding_tensor_method: str = "Q8_0"
leave_output: bool = False
quant_output: bool = False
output_tensor_method: str = "Q8_0"
# Generated values - These will be set during processing
fp16_model: str = field(default="", init=False)
quantized_gguf: str = field(default="", init=False)
imatrix_file: str = field(default="", init=False)
@dataclass
class SplitConfig:
"""Configuration for model splitting."""
enabled: bool = False
max_tensors: int = 256
max_size: Optional[str] = None
@dataclass
class OutputConfig:
"""Configuration for output settings."""
private_repo: bool = False
repo_name: str = ""
filename: str = ""
@dataclass
class ModelProcessingConfig:
"""Configuration for the entire model processing pipeline."""
token: str
model_id: str
model_name: str
outdir: str
quant_config: QuantizationConfig
split_config: SplitConfig
output_config: OutputConfig
# Generated values - These will be set during processing
new_repo_url: str = field(default="", init=False)
new_repo_id: str = field(default="", init=False)
class GGUFConverterError(Exception):
"""Custom exception for GGUF conversion errors."""
pass
class HuggingFaceModelProcessor:
"""Handles the processing of Hugging Face models to GGUF format."""
ERROR_LOGIN = "You must be logged in to use GGUF-my-repo."
DOWNLOAD_FOLDER = "./downloads"
OUTPUT_FOLDER = "./outputs"
CALIBRATION_FILE = "calibration_data_v5_rc.txt"
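    # Subprocess timeouts, in seconds (86400 s = 24 h); KILL_TIMEOUT is the grace period
    # between SIGINT and a forced kill.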
    QUANTIZE_TIMEOUT = 86400
    HF_TO_GGUF_TIMEOUT = 3600
    IMATRIX_TIMEOUT = 86400
    SPLIT_TIMEOUT = 3600
    KILL_TIMEOUT = 5
def __init__(self):
self.SPACE_ID = os.environ.get("SPACE_ID", "")
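        # Spaces are served at https://<owner>-<space-name>.hf.space/; fall back to localhost when running outside Spaces.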
self.SPACE_URL = f"https://{self.SPACE_ID.replace('/', '-')}.hf.space/" if self.SPACE_ID else "http://localhost:7860/"
self.HF_TOKEN = os.environ.get("HF_TOKEN")
self.RUN_LOCALLY = os.environ.get("RUN_LOCALLY")
# Create necessary folders
self._create_folder(self.DOWNLOAD_FOLDER)
self._create_folder(self.OUTPUT_FOLDER)
def _create_folder(self, folder_name: str) -> str:
"""Create a folder if it doesn't exist."""
if not os.path.exists(folder_name):
print(f"Creating folder: {folder_name}")
os.makedirs(folder_name)
return folder_name
def _validate_token(self, oauth_token: Optional[gr.OAuthToken]) -> str:
"""Validate the OAuth token and return the token string."""
if oauth_token is None or oauth_token.token is None:
raise GGUFConverterError(self.ERROR_LOGIN)
try:
whoami(oauth_token.token)
return oauth_token.token
except Exception as e:
            raise GGUFConverterError(self.ERROR_LOGIN) from e
def _escape_html(self, s: str) -> str:
"""Escape HTML characters for safe display."""
        # Replace '&' first so the entities introduced below are not themselves re-escaped.
        replacements = [
            ("&", "&amp;"),
            ("<", "&lt;"),
            (">", "&gt;"),
            ('"', "&quot;"),
            ("\n", "<br/>")
        ]
for old, new in replacements:
s = s.replace(old, new)
return s
def _get_model_creator(self, model_id: str) -> str:
"""Extract model creator from model ID."""
return model_id.split('/')[0]
def _get_model_name(self, model_id: str) -> str:
"""Extract model name from model ID."""
return model_id.split('/')[-1]
def _upload_file(self, processing_config: ModelProcessingConfig, path_or_fileobj: str, path_in_repo: str) -> None:
"""Upload a file to Hugging Face repository."""
if self.RUN_LOCALLY == "1":
print("Skipping upload...")
return
api = HfApi(token=processing_config.token)
api.upload_file(
path_or_fileobj=path_or_fileobj,
path_in_repo=path_in_repo,
repo_id=processing_config.new_repo_id,
)
def _generate_importance_matrix(self, quant_config: QuantizationConfig) -> None:
"""Generate importance matrix for quantization."""
if not os.path.isfile(quant_config.fp16_model):
raise GGUFConverterError(f"Model file not found: {quant_config.fp16_model}")
if quant_config.train_data:
train_data_path = quant_config.train_data
else:
train_data_path = self.CALIBRATION_FILE
if not os.path.isfile(train_data_path):
raise GGUFConverterError(f"Training data file not found: {train_data_path}")
print(f"Training data file path: {train_data_path}")
print("Running imatrix command...")
imatrix_command = [
"llama-imatrix",
"-m", quant_config.fp16_model,
"-f", train_data_path,
"-ngl", "99",
"--output-frequency", "10",
"-o", quant_config.imatrix_file,
]
process = subprocess.Popen(imatrix_command, shell=False, stderr=subprocess.STDOUT)
try:
process.wait(timeout=self.IMATRIX_TIMEOUT)
except subprocess.TimeoutExpired:
print("Imatrix computation timed out. Sending SIGINT to allow graceful termination...")
process.send_signal(signal.SIGINT)
try:
process.wait(timeout=self.KILL_TIMEOUT)
except subprocess.TimeoutExpired:
print("Imatrix proc still didn't term. Forcefully terminating process...")
process.kill()
raise GGUFConverterError("Error generating imatrix: Operation timed out.")
if process.returncode != 0:
raise GGUFConverterError(f"Error generating imatrix: code={process.returncode}.")
print(f"Importance matrix generation completed: {os.path.abspath(quant_config.imatrix_file)}")
def _split_and_upload_model(self, processing_config: ModelProcessingConfig) -> None:
"""Split large model files and upload shards."""
quant_config = processing_config.quant_config
split_config = processing_config.split_config
print(f"Model path: {quant_config.quantized_gguf}")
print(f"Output dir: {processing_config.outdir}")
split_cmd = ["llama-gguf-split", "--split"]
if split_config.max_size:
split_cmd.extend(["--split-max-size", split_config.max_size])
else:
split_cmd.extend(["--split-max-tensors", str(split_config.max_tensors)])
model_path_prefix = '.'.join(quant_config.quantized_gguf.split('.')[:-1])
split_cmd.extend([quant_config.quantized_gguf, model_path_prefix])
print(f"Split command: {split_cmd}")
process = subprocess.Popen(split_cmd, shell=False, stderr=subprocess.STDOUT)
try:
process.wait(timeout=self.SPLIT_TIMEOUT)
except subprocess.TimeoutExpired:
print("Splitting timed out. Sending SIGINT to allow graceful termination...")
process.send_signal(signal.SIGINT)
try:
process.wait(timeout=self.KILL_TIMEOUT)
except subprocess.TimeoutExpired:
print("Splitting timed out. Killing process...")
process.kill()
raise GGUFConverterError("Error splitting the model: Operation timed out.")
if process.returncode != 0:
raise GGUFConverterError(f"Error splitting the model: code={process.returncode}")
print("Model split successfully!")
# Remove original model file
if os.path.exists(quant_config.quantized_gguf):
os.remove(quant_config.quantized_gguf)
model_file_prefix = model_path_prefix.split('/')[-1]
print(f"Model file name prefix: {model_file_prefix}")
sharded_model_files = [
f for f in os.listdir(processing_config.outdir)
if f.startswith(model_file_prefix) and f.endswith(".gguf")
]
if not sharded_model_files:
raise GGUFConverterError("No sharded files found.")
print(f"Sharded model files: {sharded_model_files}")
for file in sharded_model_files:
file_path = os.path.join(processing_config.outdir, file)
try:
print(f"Uploading file: {file_path}")
self._upload_file(processing_config, file_path, file)
except Exception as e:
raise GGUFConverterError(f"Error uploading file {file_path}: {e}")
print("Sharded model has been uploaded successfully!")
def _download_base_model(self, processing_config: ModelProcessingConfig) -> str:
"""Download and convert Hugging Face model to GGUF FP16 format."""
print(f"Downloading model {processing_config.model_name}")
if os.path.exists(processing_config.quant_config.fp16_model):
print("Skipping fp16 conversion...")
print(f"Converted model path: {os.path.abspath(processing_config.quant_config.fp16_model)}")
return processing_config.quant_config.fp16_model
with tempfile.TemporaryDirectory(dir=self.DOWNLOAD_FOLDER) as tmpdir:
local_dir = f"{Path(tmpdir)}/{processing_config.model_name}"
print(f"Local directory: {os.path.abspath(local_dir)}")
# Download model
api = HfApi(token=processing_config.token)
pattern = (
"*.safetensors"
if any(
file.path.endswith(".safetensors")
for file in api.list_repo_tree(
repo_id=processing_config.model_id,
recursive=True,
)
)
else "*.bin"
)
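            # Download only metadata, tokenizer, and config files plus the detected weight format
            # (safetensors when present, otherwise .bin).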
dl_pattern = ["*.md", "*.json", "*.model"]
dl_pattern += [pattern]
api.snapshot_download(repo_id=processing_config.model_id, local_dir=local_dir, allow_patterns=dl_pattern)
print("Model downloaded successfully!")
print(f"Model directory contents: {os.listdir(local_dir)}")
config_dir = os.path.join(local_dir, "config.json")
adapter_config_dir = os.path.join(local_dir, "adapter_config.json")
if os.path.exists(adapter_config_dir) and not os.path.exists(config_dir):
                raise GGUFConverterError(
                    'adapter_config.json is present. '
                    'If you are converting a LoRA adapter to GGUF, '
                    'please use GGUF-my-lora.'
                )
# Convert HF to GGUF
print(f"Converting to GGUF FP16: {os.path.abspath(processing_config.quant_config.fp16_model)}")
convert_command = [
"python3", "/app/convert_hf_to_gguf.py", local_dir,
"--outtype", "f16", "--outfile", processing_config.quant_config.fp16_model
]
process = subprocess.Popen(convert_command, shell=False, stderr=subprocess.STDOUT)
try:
process.wait(timeout=self.HF_TO_GGUF_TIMEOUT)
except subprocess.TimeoutExpired:
print("Conversion timed out. Sending SIGINT to allow graceful termination...")
process.send_signal(signal.SIGINT)
try:
process.wait(timeout=self.KILL_TIMEOUT)
except subprocess.TimeoutExpired:
print("Conversion timed out. Killing process...")
process.kill()
raise GGUFConverterError("Error converting to fp16: Operation timed out.")
if process.returncode != 0:
raise GGUFConverterError(f"Error converting to fp16: code={process.returncode}")
print("Model converted to fp16 successfully!")
print(f"Converted model path: {os.path.abspath(processing_config.quant_config.fp16_model)}")
return processing_config.quant_config.fp16_model
def _quantize_model(self, quant_config: QuantizationConfig) -> str:
"""Quantize the GGUF model."""
quantize_cmd = ["llama-quantize"]
if quant_config.quant_embedding:
quantize_cmd.extend(["--token-embedding-type", quant_config.embedding_tensor_method])
if quant_config.leave_output:
quantize_cmd.append("--leave-output-tensor")
else:
if quant_config.quant_output:
quantize_cmd.extend(["--output-tensor-type", quant_config.output_tensor_method])
# Set imatrix file path if needed
if quant_config.use_imatrix:
self._generate_importance_matrix(quant_config)
quantize_cmd.extend(["--imatrix", quant_config.imatrix_file])
else:
print("Not using imatrix quantization.")
quantize_cmd.append(quant_config.fp16_model)
quantize_cmd.append(quant_config.quantized_gguf)
quantize_cmd.append(quant_config.imatrix_method if quant_config.use_imatrix else quant_config.method)
print(f"Quantizing model with {quantize_cmd}")
# Use Popen for quantization
process = subprocess.Popen(quantize_cmd, shell=False, stderr=subprocess.STDOUT)
try:
process.wait(timeout=self.QUANTIZE_TIMEOUT)
except subprocess.TimeoutExpired:
print("Quantization timed out. Sending SIGINT to allow graceful termination...")
process.send_signal(signal.SIGINT)
try:
process.wait(timeout=self.KILL_TIMEOUT)
except subprocess.TimeoutExpired:
print("Quantization timed out. Killing process...")
process.kill()
raise GGUFConverterError("Error quantizing: Operation timed out.")
if process.returncode != 0:
raise GGUFConverterError(f"Error quantizing: code={process.returncode}")
print(f"Quantized successfully with {quant_config.imatrix_method if quant_config.use_imatrix else quant_config.method} option!")
print(f"Quantized model path: {os.path.abspath(quant_config.quantized_gguf)}")
return quant_config.quantized_gguf
def _create_empty_repo(self, processing_config: ModelProcessingConfig):
api = HfApi(token=processing_config.token)
new_repo_url = api.create_repo(
repo_id=processing_config.output_config.repo_name,
exist_ok=True,
private=processing_config.output_config.private_repo
)
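        # create_repo returns a RepoUrl object; keep both the full URL and the canonical repo_id.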
processing_config.new_repo_url = new_repo_url.url
processing_config.new_repo_id = new_repo_url.repo_id
print("Repo created successfully!", processing_config.new_repo_url)
return new_repo_url
def _generate_readme(self, processing_config: ModelProcessingConfig, quant_config: QuantizationConfig) -> str:
"""Generate README.md for the quantized model."""
creator = self._get_model_creator(processing_config.model_id)
username = whoami(processing_config.token)["name"]
try:
card = ModelCard.load(processing_config.model_id, token=processing_config.token)
        except Exception:
card = ModelCard("")
if card.data.tags is None:
card.data.tags = []
card.data.tags.extend(["llama-cpp", "gguf-my-repo"])
card.data.base_model = processing_config.model_id
card.text = dedent(
f"""
# {processing_config.model_name}
**Model creator:** [{creator}](https://huggingface.co/{creator})
**Original model**: [{processing_config.model_id}](https://huggingface.co/{processing_config.model_id})
            **GGUF quantization:** provided by [{username}](https://huggingface.co/{username}) using `llama.cpp`
## Special thanks
🙏 Special thanks to [Georgi Gerganov](https://github.com/ggerganov) and the whole team working on [llama.cpp](https://github.com/ggerganov/llama.cpp/) for making all of this possible.
## Use with Ollama
```bash
ollama run "hf.co/{processing_config.new_repo_id}:{quant_config.imatrix_method if quant_config.use_imatrix else quant_config.method}"
```
## Use with LM Studio
```bash
lms load "{processing_config.new_repo_id}"
```
## Use with llama.cpp CLI
```bash
llama-cli --hf "{processing_config.new_repo_id}:{quant_config.imatrix_method if quant_config.use_imatrix else quant_config.method}" -p "The meaning to life and the universe is"
```
            ## Use with llama.cpp Server
```bash
llama-server --hf "{processing_config.new_repo_id}:{quant_config.imatrix_method if quant_config.use_imatrix else quant_config.method}" -c 4096
```
"""
)
readme_path = f"{processing_config.outdir}/README.md"
card.save(readme_path)
return readme_path
def process_model(self, processing_config: ModelProcessingConfig) -> Tuple[str, str]:
"""Main method to process a model through the entire pipeline."""
quant_config = processing_config.quant_config
split_config = processing_config.split_config
output_config = processing_config.output_config
print(f"Current working directory: {os.path.abspath(os.getcwd())}")
# Download and convert base model
self._download_base_model(processing_config)
# Quantize the model
self._quantize_model(quant_config)
# Create empty repo
self._create_empty_repo(processing_config)
# Upload model
if split_config.enabled:
print(f"Splitting quantized model: {os.path.abspath(quant_config.quantized_gguf)}")
self._split_and_upload_model(processing_config)
else:
try:
print(f"Uploading quantized model: {os.path.abspath(quant_config.quantized_gguf)}")
self._upload_file(processing_config, quant_config.quantized_gguf, output_config.filename)
except Exception as e:
raise GGUFConverterError(f"Error uploading quantized model: {e}")
# Upload imatrix if it exists
if quant_config.use_imatrix and os.path.isfile(quant_config.imatrix_file):
try:
print(f"Uploading imatrix.dat: {os.path.abspath(quant_config.imatrix_file)}")
self._upload_file(processing_config, quant_config.imatrix_file, f"{processing_config.model_name}-imatrix.gguf")
except Exception as e:
raise GGUFConverterError(f"Error uploading imatrix.dat: {e}")
# Upload README.md
readme_path = self._generate_readme(processing_config, quant_config)
self._upload_file(processing_config, readme_path, "README.md")
print(f"Uploaded successfully with {quant_config.imatrix_method if quant_config.use_imatrix else quant_config.method} option!")
class GGUFConverterUI:
"""Gradio UI for the GGUF Converter."""
def __init__(self):
self.processor = HuggingFaceModelProcessor()
self.css = """/* Custom CSS to allow scrolling */
.gradio-container {overflow-y: auto;}
"""
# Initialize components
self._initialize_components()
self._setup_interface()
def _initialize_components(self):
"""Initialize all UI components."""
#####
# Base model section
#####
self.model_id = HuggingfaceHubSearch(
label="Hub Model ID",
placeholder="Search for model id on Huggingface",
search_type="model",
)
#####
# Quantization section
#####
self.use_imatrix = gr.Checkbox(
value=False,
label="Use Imatrix Quantization",
info="Use importance matrix for quantization."
)
self.q_method = gr.Dropdown(
choices=["Q3_K_S", "Q3_K_M", "Q3_K_L", "Q4_0", "Q4_1", "Q4_K_S", "Q4_K_M", "MXFP4_MOE", "Q5_0", "Q5_1", "Q5_K_S", "Q5_K_M", "Q6_K", "Q8_0", "F16", "BF16", "COPY"],
label="Quantization Method",
info="GGML quantization type",
value="Q4_K_M",
filterable=False,
visible=True
)
self.imatrix_q_method = gr.Dropdown(
choices=["IQ1_S", "IQ1_M", "IQ2_XXS", "IQ2_XS", "IQ2_S", "IQ2_M", "Q2_K_S", "Q2_K", "IQ3_XXS", "IQ3_XS", "IQ3_S", "IQ3_M", "Q3_K_S", "Q3_K_M", "Q3_K_L", "Q4_K_S", "Q4_K_M", "IQ4_XS", "IQ4_NL", "Q5_K_M", "Q5_K_S"],
label="Imatrix Quantization Method",
info="GGML imatrix quants type",
value="IQ4_NL",
filterable=False,
visible=False
)
self.train_data_file = gr.File(
label="Training Data File",
file_types=[".txt"],
visible=False
)
#####
# Advanced Options section
#####
self.split_model = gr.Checkbox(
value=False,
label="Split Model",
info="Shard the model using gguf-split."
)
self.split_max_tensors = gr.Number(
value=256,
label="Max Tensors per File",
info="Maximum number of tensors per file when splitting model.",
visible=False
)
self.split_max_size = gr.Textbox(
label="Max File Size",
info="Maximum file size when splitting model (--split-max-size). May leave empty to use the default. Accepted suffixes: M, G. Example: 256M, 5G",
visible=False
)
self.leave_output = gr.Checkbox(
value=False,
label="Leave output tensor",
info="Leaves output.weight un(re)quantized"
)
self.quant_embedding = gr.Checkbox(
value=False,
label="Quant embeddings tensor",
info="Quantize embeddings tensor separately"
)
self.embedding_tensor_method = gr.Dropdown(
choices=["Q2_K", "Q3_K", "Q4_K", "Q5_K", "Q6_K", "Q8_0"],
label="Embeddings Quantization Method",
info="use a specific quant type for the token embeddings tensor",
value="Q8_0",
filterable=False,
visible=False
)
self.quant_output = gr.Checkbox(
value=False,
label="Quant output tensor",
info="Quantize output tensor separately"
)
self.output_tensor_method = gr.Dropdown(
choices=["Q2_K", "Q3_K", "Q4_K", "Q5_K", "Q6_K", "Q8_0"],
label="Output Quantization Method",
info="use a specific quant type for the output.weight tensor",
value="Q8_0",
filterable=False,
visible=False
)
#####
# Output Settings section
#####
self.private_repo = gr.Checkbox(
value=False,
label="Private Repo",
info="Create a private repo under your username."
)
self.repo_name = gr.Textbox(
label="Output Repository Name",
info="Set your repository name",
max_lines=1
)
self.gguf_name = gr.Textbox(
label="Output File Name",
info="Set output file name",
max_lines=1
)
#####
# Buttons section
#####
self.clear_btn = gr.ClearButton(
value="Clear",
variant="secondary",
components=[
self.model_id,
self.q_method,
self.use_imatrix,
self.imatrix_q_method,
self.private_repo,
self.train_data_file,
self.leave_output,
self.quant_embedding,
self.embedding_tensor_method,
self.quant_output,
self.output_tensor_method,
self.split_model,
self.split_max_tensors,
self.split_max_size,
self.repo_name,
self.gguf_name,
]
)
self.submit_btn = gr.Button(
value="Submit",
variant="primary"
)
#####
# Outputs section
#####
self.output_label = gr.Markdown(label="output")
self.output_image = gr.Image(
show_label=False,
show_download_button=False,
interactive=False
)
@staticmethod
def _update_output_repo(model_id: str, oauth_token: Optional[gr.OAuthToken]) -> str:
"""Update output repository name based on model and user."""
if oauth_token is None or not oauth_token.token:
return ""
if not model_id:
return ""
try:
username = whoami(oauth_token.token)["name"]
model_name = model_id.split('/')[-1]
return f"{username}/{model_name}-GGUF"
        except Exception:
return ""
@staticmethod
def _update_output_filename(model_id: str, use_imatrix: bool, q_method: str, imatrix_q_method: str) -> str:
"""Update output filename based on model and quantization settings."""
if not model_id:
return ""
model_name = model_id.split('/')[-1]
if use_imatrix:
return f"{model_name}-{imatrix_q_method.upper()}-imat.gguf"
return f"{model_name}-{q_method.upper()}.gguf"
def _setup_interface(self):
"""Set up the Gradio interface."""
with gr.Blocks(css=self.css) as self.demo:
#####
# Layout
#####
gr.Markdown(HuggingFaceModelProcessor.ERROR_LOGIN)
gr.LoginButton(min_width=250)
gr.HTML("
{self.processor._escape_html(str(e))}', "error.png") def launch(self): """Launch the Gradio interface.""" # Set up space restart scheduler def restart_space(): HfApi().restart_space(repo_id=self.processor.SPACE_ID, token=self.processor.HF_TOKEN, factory_reboot=True) scheduler = BackgroundScheduler() scheduler.add_job(restart_space, "interval", seconds=21600) scheduler.start() # Launch the interface self.demo.queue(default_concurrency_limit=1, max_size=5).launch(debug=True, show_api=False) # Main execution if __name__ == "__main__": ui = GGUFConverterUI() ui.launch()