File size: 9,156 Bytes
e7abd9e
 
 
 
 
 
 
9469eae
e7abd9e
 
 
 
9469eae
e7abd9e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9469eae
e7abd9e
 
9469eae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e7abd9e
9469eae
e7abd9e
9469eae
e7abd9e
 
 
 
9469eae
 
e7abd9e
 
 
 
9469eae
e7abd9e
 
9469eae
e7abd9e
 
9469eae
e7abd9e
 
9469eae
 
 
e7abd9e
9469eae
 
e7abd9e
 
 
9469eae
e7abd9e
9469eae
e7abd9e
9469eae
e7abd9e
9469eae
 
 
e7abd9e
 
 
9469eae
e7abd9e
 
9469eae
e7abd9e
9469eae
e7abd9e
9469eae
e7abd9e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import json
import logging
import asyncio
import re
from typing import Tuple, Optional, Dict, Any
import aiohttp
from huggingface_hub import HfApi, ModelCard, hf_hub_download
from huggingface_hub import hf_api
from transformers import AutoConfig, AutoTokenizer
from app.config.base import HF_TOKEN, API
from app.utils.logging import LogFormatter


logger = logging.getLogger(__name__)

class ModelValidator:
    def __init__(self):
        self.token = HF_TOKEN
        self.api = HfApi(token=self.token)
        self.headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
        
    async def check_model_card(self, model_id: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Check if model has a valid model card"""
        try:
            logger.info(LogFormatter.info(f"Checking model card for {model_id}"))
            
            # Get model card content using ModelCard.load
            try:
                model_card = await asyncio.to_thread(
                    ModelCard.load,
                    model_id
                )
                logger.info(LogFormatter.success("Model card found"))
            except Exception as e:
                error_msg = "Please add a model card to your model to explain how you trained/fine-tuned it."
                logger.error(LogFormatter.error(error_msg, e))
                return False, error_msg, None
            
            # Check license in model card data
            if model_card.data.license is None and not ("license_name" in model_card.data and "license_link" in model_card.data):
                error_msg = "License not found. Please add a license to your model card using the `license` metadata or a `license_name`/`license_link` pair."
                logger.warning(LogFormatter.warning(error_msg))
                return False, error_msg, None

            # Enforce card content length
            if len(model_card.text) < 200:
                error_msg = "Please add a description to your model card, it is too short."
                logger.warning(LogFormatter.warning(error_msg))
                return False, error_msg, None
            
            logger.info(LogFormatter.success("Model card validation passed"))
            return True, "", model_card
            
        except Exception as e:
            error_msg = "Failed to validate model card"
            logger.error(LogFormatter.error(error_msg, e))
            return False, str(e), None
            
    async def get_safetensors_metadata(self, model_id: str, is_adapter: bool = False, revision: str = "main")  -> Optional[Dict]:
        """Get metadata from a safetensors file"""
        try:
            if is_adapter:
                metadata = await asyncio.to_thread(
                    hf_api.parse_safetensors_file_metadata,
                    model_id,
                    "adapter_model.safetensors",
                    token=self.token,
                    revision=revision,
                )
            else:
                metadata = await asyncio.to_thread(
                    hf_api.get_safetensors_metadata,
                    repo_id=model_id,
                    token=self.token,
                    revision=revision,
                )
            return metadata

        except Exception as e:
            logger.error(f"Failed to get safetensors metadata: {str(e)}")
            return None

    async def get_model_size(
        self,
        model_info: Any,
        precision: str,
        base_model: str,
        revision: str
    ) -> Tuple[Optional[float], Optional[str]]:
        """Get model size in billions of parameters"""
        try:
            logger.info(LogFormatter.info(f"Checking model size for {model_info.modelId}"))

            # Check if model is adapter
            is_adapter = any(s.rfilename == "adapter_config.json" for s in model_info.siblings if hasattr(s, 'rfilename'))

            # Try to get size from safetensors first
            model_size = None

            if is_adapter and base_model:
                # For adapters, we need both adapter and base model sizes
                adapter_meta = await self.get_safetensors_metadata(model_info.id, is_adapter=True, revision=revision)
                base_meta = await self.get_safetensors_metadata(base_model, revision="main")

                if adapter_meta and base_meta:
                    adapter_size = sum(adapter_meta.parameter_count.values())
                    base_size = sum(base_meta.parameter_count.values())
                    model_size = (adapter_size + base_size) / (2 * 1e9)  # Convert to billions, assuming float16
            else:
                # For regular models, just get the model size
                meta = await self.get_safetensors_metadata(model_info.id, revision=revision)
                if meta:
                    total_params = sum(meta.parameter_count.values())
                    model_size = total_params / (2 * 1e9)  # Convert to billions, assuming float16

            if model_size is None:
                # If model size could not be determined, return an error
                return None, "Model size could not be determined"

            # Adjust size for GPTQ models
            size_factor = 8 if (precision == "GPTQ" or "gptq" in model_info.id.lower()) else 1
            model_size = round(size_factor * model_size, 3)

            logger.info(LogFormatter.success(f"Model size: {model_size}B parameters"))
            return model_size, None

        except Exception as e:
            logger.error(LogFormatter.error(f"Error while determining model size: {e}"))
            return None, str(e)

            
    async def check_chat_template(
        self,
        model_id: str,
        revision: str
    ) -> Tuple[bool, Optional[str]]:
        """Check if model has a valid chat template"""
        try:
            logger.info(LogFormatter.info(f"Checking chat template for {model_id}"))
            
            try:
                config_file = await asyncio.to_thread(
                    hf_hub_download,
                    repo_id=model_id,
                    filename="tokenizer_config.json",
                    revision=revision,
                    repo_type="model"
                )
                
                with open(config_file, 'r') as f:
                    tokenizer_config = json.load(f)
                
                if 'chat_template' not in tokenizer_config:
                    error_msg = f"The model {model_id} doesn't have a chat_template in its tokenizer_config.json. Please add a chat_template before submitting or submit without it."
                    logger.error(LogFormatter.error(error_msg))
                    return False, error_msg
                
                logger.info(LogFormatter.success("Valid chat template found"))
                return True, None
                
            except Exception as e:
                error_msg = f"Error checking chat_template: {str(e)}"
                logger.error(LogFormatter.error(error_msg))
                return False, error_msg
                    
        except Exception as e:
            error_msg = "Failed to check chat template"
            logger.error(LogFormatter.error(error_msg, e))
            return False, str(e)
            
    async def is_model_on_hub(
        self,
        model_name: str,
        revision: str,
        test_tokenizer: bool = False,
        trust_remote_code: bool = False
    ) -> Tuple[bool, Optional[str], Optional[Any]]:
        """Check if model exists and is properly configured on the Hub"""
        try:
            config = await asyncio.to_thread(
                AutoConfig.from_pretrained,
                model_name,
                revision=revision,
                trust_remote_code=trust_remote_code,
                token=self.token,
                force_download=True
            )
            
            if test_tokenizer:
                try:
                    await asyncio.to_thread(
                        AutoTokenizer.from_pretrained,
                        model_name,
                        revision=revision,
                        trust_remote_code=trust_remote_code,
                        token=self.token
                    )
                except ValueError as e:
                    return False, f"uses a tokenizer which is not in a transformers release: {e}", None
                except Exception:
                    return False, "'s tokenizer cannot be loaded. Is your tokenizer class in a stable transformers release, and correctly configured?", None
            
            return True, None, config
            
        except ValueError:
            return False, "needs to be launched with `trust_remote_code=True`. For safety reason, we do not allow these models to be automatically submitted to the leaderboard.", None
        except Exception as e:
            if "You are trying to access a gated repo." in str(e):
                return True, "uses a gated model.", None
            return False, f"was not found or misconfigured on the hub! Error raised was {e.args[0]}", None