from app.core.cache import cache_config
from datetime import datetime
from typing import List, Dict, Any
import datasets
from fastapi import HTTPException
import logging
from app.config.base import HF_ORGANIZATION
from app.core.formatting import LogFormatter
from dataclasses import dataclass
from enum import Enum
import json
import os
import glob
from transformers import AutoConfig
from transformers.models.auto.tokenization_auto import AutoTokenizer
import dateutil
import numpy as np
from huggingface_hub import snapshot_download
from app.services.models import ModelService
from app.config import (
    RESULTS_CACHE,
    EVAL_CACHE,
    HF_TOKEN,
)
from app.config.hf_config import (
    RESULTS_REPO,
)

logger = logging.getLogger(__name__)


## All the model information that we might need
## TODO move all these classes to a proper place
@dataclass
class ModelDetails:
    name: str
    display_name: str = ""
    symbol: str = ""  # emoji


class ModelType(Enum):
    PT = ModelDetails(name="pretrained", symbol="🟢")
    FT = ModelDetails(name="fine-tuned on domain-specific datasets", symbol="🔶")
    IFT = ModelDetails(name="instruction-tuned", symbol="⭕")
    RL = ModelDetails(name="RL-tuned", symbol="🟦")
    CHAT = ModelDetails(name="chat models")
    Unknown = ModelDetails(name="", symbol="?")

    def to_str(self):
        return f"{self.value.name}"

    @staticmethod
    def from_str(type):
        if "fine-tuned" in type or "🔶" in type:
            return ModelType.FT
        if "pretrained" in type or "🟢" in type:
            return ModelType.PT
        if "RL-tuned" in type or "🟦" in type:
            return ModelType.RL
        if "instruction-tuned" in type or "⭕" in type or "chat models" in type:
            return ModelType.CHAT
        return ModelType.Unknown


class WeightType(Enum):
    Adapter = ModelDetails("Adapter")
    Original = ModelDetails("Original")
    Delta = ModelDetails("Delta")


class Precision(Enum):
    float16 = ModelDetails("float16")
    bfloat16 = ModelDetails("bfloat16")
    Unknown = ModelDetails("?")

    @staticmethod
    def from_str(precision):
        if precision in ["torch.float16", "float16"]:
            return Precision.float16
        if precision in ["torch.bfloat16", "bfloat16"]:
            return Precision.bfloat16
        return Precision.Unknown


@dataclass
class Task:
    benchmark: str
    metric: str
    normalized_metric: str
    col_name: str


class Tasks(Enum):
    # task_key in the json file, metric_key in the json file, name to display in the leaderboard
    # task0 = Task("IFEVal-fr", "metric_name", "IFEVal-fr")
    # task1 = Task("GPQA-fr", "metric_name", "GPQA-fr")
    # task2 = Task("BAC-fr", "metric_name", "BAC-fr")
    task0 = Task("community|gpqa-fr|0", "acc", "norm_acc", "GPQA-fr")  # We might want to display this as "Connaissances"
    task1 = Task("community|ifeval-fr|0", "norm_acc", "norm_acc", "IFEval-fr")  # FIXME norm_acc should be acc  # and as "Suivi d'instructions"


def is_model_on_hub(model_name: str, revision: str, token: str = None, trust_remote_code=False, test_tokenizer=False) -> tuple[bool, str, Any]:
    """Checks if the model model_name is on the hub, and whether it (and its tokenizer) can be loaded with AutoClasses."""
    try:
        config = AutoConfig.from_pretrained(model_name, revision=revision, trust_remote_code=trust_remote_code, token=token)
        if test_tokenizer:
            try:
                tk = AutoTokenizer.from_pretrained(model_name, revision=revision, trust_remote_code=trust_remote_code, token=token)
            except ValueError as e:
                return (
                    False,
                    f"uses a tokenizer which is not in a transformers release: {e}",
                    None
                )
            except Exception as e:
                return (
                    False,
                    "'s tokenizer cannot be loaded. Is your tokenizer class in a stable transformers release, and correctly configured?",
                    None
                )
        return True, None, config

    except ValueError:
        return (
            False,
            "needs to be launched with `trust_remote_code=True`. For safety reasons, we do not allow these models to be automatically submitted to the leaderboard.",
            None
        )

    except Exception as e:
        return False, "was not found on hub!", None


@dataclass
class EvalResult:
    """Represents one full evaluation. Built from a combination of the result and request file for a given run."""
    eval_name: str  # org_model_precision (uid)
    full_model: str  # org/model (path on hub)
    org: str
    model: str
    revision: str  # commit hash, "" if main
    results: dict
    normalized_results: dict
    precision: Precision = Precision.Unknown
    model_type: ModelType = ModelType.Unknown  # Pretrained, fine-tuned, ...
    weight_type: WeightType = WeightType.Original  # Original or Adapter
    architecture: str = "Unknown"
    license: str = "?"
    likes: int = 0
    num_params: int = 0
    date: str = ""  # submission date of request file
    still_on_hub: bool = False

    @classmethod
    def init_from_json_file(cls, json_filepath):
        """Inits the result from the specific model result file"""
        with open(json_filepath) as fp:
            data = json.load(fp)

        config = data.get("config_general")

        # Precision
        precision = Precision.from_str(config.get("model_dtype"))

        # Get model and org
        org_and_model = config.get("model_name", config.get("model_args", None))
        org_and_model = org_and_model.split("/", 1)

        if len(org_and_model) == 1:
            org = None
            model = org_and_model[0]
            result_key = f"{model}_{precision.value.name}"
        else:
            org = org_and_model[0]
            model = org_and_model[1]
            result_key = f"{org}_{model}_{precision.value.name}"
        full_model = "/".join(org_and_model)

        still_on_hub, _, model_config = is_model_on_hub(
            full_model, config.get("model_sha", "main"), trust_remote_code=True, test_tokenizer=False
        )
        architecture = "?"
        if model_config is not None:
            architectures = getattr(model_config, "architectures", None)
            if architectures:
                architecture = ";".join(architectures)

        # Extract results available in this file (some results are split in several files)
        results = {}
        normalized_results = {}
        for task in Tasks:
            task = task.value

            # We average all scores of a given metric (not all metrics are present in all files)
            accs = np.array([v.get(task.metric, None) for k, v in data["results"].items() if task.benchmark == k])
            if accs.size == 0 or any([acc is None for acc in accs]):
                continue

            mean_acc = np.mean(accs) * 100.0
            results[task.benchmark] = mean_acc

            r = data["results"][task.benchmark].get(task.normalized_metric, None)
            if r is None:
                continue
            normalized_results[task.benchmark] = r * 100.0

        return cls(
            eval_name=result_key,
            full_model=full_model,
            org=org,
            model=model,
            results=results,
            normalized_results=normalized_results,
            precision=precision,
            revision=config.get("model_sha", ""),
            still_on_hub=still_on_hub,
            architecture=architecture,
        )

    def update_with_request_file(self, existing_models):
        """Finds the relevant request file for the current model and updates info with it"""
        for status, models in existing_models.items():
            if status == "finished":
                for model in models:
                    if model["name"] == self.full_model and model["precision"] == self.precision.value.name:
                        # FIXME and model["revision"] == model_data["revision"]:
                        self.model_type = ModelType.from_str(model["model_type"])
                        self.weight_type = WeightType[model["weight_type"]]
                        #self.license = request.get("license", "?")
                        #self.likes = request.get("likes", 0)
                        #self.num_params = request.get("params", 0)
                        #self.date = request.get("submitted_time", "")
                        return

        logger.warning(
            f"Could not find request file for {self.org}/{self.model} with precision {self.precision.value.name}"
        )


class LeaderboardService:
    def __init__(self):
        self.model_service = ModelService()

    async def get_raw_eval_results(self, results_path: str, requests_path: str) -> list[EvalResult]:
        """From the path of the results folder root, extract all needed info for results"""
        model_result_filepaths = []

        for root, _, files in os.walk(results_path):
            # We should only have json files in model results
            if len(files) == 0 or any([not f.endswith(".json") for f in files]):
                continue

            # Sort the files by date
            try:
                files.sort(key=lambda x: x.removesuffix(".json").removeprefix("results_")[:-7])
            except dateutil.parser._parser.ParserError:
                files = [files[-1]]

            for file in files:
                model_result_filepaths.append(os.path.join(root, file))

        eval_results = {}
        await self.model_service.initialize()
        for model_result_filepath in model_result_filepaths:
            # Creation of result
            eval_result = EvalResult.init_from_json_file(model_result_filepath)
            existing_models = await self.model_service.get_models()
            eval_result.update_with_request_file(existing_models)

            # Store results of same eval together
            eval_name = eval_result.eval_name
            if eval_name in eval_results.keys():
                eval_results[eval_name].results.update({k: v for k, v in eval_result.results.items() if v is not None})
            else:
                eval_results[eval_name] = eval_result

        return list(eval_results.values())

    async def fetch_raw_data(self) -> List[EvalResult]:
        """Fetch raw leaderboard data from HuggingFace dataset"""
        try:
            logger.info(LogFormatter.section("FETCHING LEADERBOARD DATA"))
            logger.info(LogFormatter.info(f"Loading dataset from {HF_ORGANIZATION}/contents"))
            logger.debug("GETTING FROM %s", HF_ORGANIZATION)

            # TODO: cache
            snapshot_download(
                repo_id=RESULTS_REPO,
                local_dir=RESULTS_CACHE,
                repo_type="dataset",
                tqdm_class=None,
                etag_timeout=30,
                token=HF_TOKEN,
            )

            data = await self.get_raw_eval_results(RESULTS_CACHE, EVAL_CACHE)

            return data

        except Exception as e:
            logger.error(LogFormatter.error("Failed to fetch leaderboard data", e))
            raise HTTPException(status_code=500, detail=str(e))

    async def get_formatted_data(self) -> List[Dict[str, Any]]:
        """Get formatted leaderboard data"""
        try:
            logger.info(LogFormatter.section("FORMATTING LEADERBOARD DATA"))

            raw_data = await self.fetch_raw_data()
            formatted_data = []
            type_counts = {}
            error_count = 0

            # Initialize progress tracking
            total_items = len(raw_data)
            logger.info(LogFormatter.info(f"Processing {total_items:,} entries..."))

            for i, item in enumerate(raw_data, 1):
                try:
                    formatted_item = await self.transform_data(item)
                    formatted_data.append(formatted_item)

                    # Count model types
                    model_type = formatted_item["model"]["type"]
                    type_counts[model_type] = type_counts.get(model_type, 0) + 1

                except Exception as e:
                    error_count += 1
                    logger.error(LogFormatter.error(f"Failed to format entry {i}/{total_items}", e))
                    continue

                # Log progress every 10%
                if i % max(1, total_items // 10) == 0:
                    logger.info(LogFormatter.info(f"Progress: {LogFormatter.progress_bar(i, total_items)}"))

            # Log final statistics
            stats = {
                "Total_Processed": total_items,
                "Successful": len(formatted_data),
                "Failed": error_count
            }
            logger.info(LogFormatter.section("PROCESSING SUMMARY"))
            for line in LogFormatter.stats(stats, "Processing Statistics"):
                logger.info(line)

            # Log model type distribution
            type_stats = {f"Type_{k}": v for k, v in type_counts.items()}
            logger.info(LogFormatter.subsection("MODEL TYPE DISTRIBUTION"))
            for line in LogFormatter.stats(type_stats):
                logger.info(line)

            return formatted_data

        except Exception as e:
            logger.error(LogFormatter.error("Failed to format leaderboard data", e))
            raise HTTPException(status_code=500, detail=str(e))

    async def transform_data(self, data: EvalResult) -> Dict[str, Any]:
        """Transform raw data into the format expected by the frontend"""
        try:
            # Extract model name for logging
            model_name = data.full_model
            logger.debug(LogFormatter.info(f"Transforming data for model: {model_name}"))

            # Create unique ID combining model name, precision, sha and chat template status
            unique_id = f"{data.full_model}_{data.precision}"  # FIXME missing _{data.get('Model sha', 'Unknown')}_{str(data.get('Chat Template', False))}

            evaluations = {
                "ifeval_fr": {
                    "name": "IFEval FR",
                    "value": data.results.get("community|ifeval-fr|0", 0),
                    "normalized_score": data.normalized_results.get("community|ifeval-fr|0", 0),
                },
                "gpqa_fr": {
                    "name": "GPQA FR",
                    "value": data.results.get("community|gpqa-fr|0", 0),
                    "normalized_score": data.normalized_results.get("community|gpqa-fr|0", 0),
                },
                "bac_fr": {
                    "name": "BAC FR",
                    "value": 0,  # FIXME
                    "normalized_score": 0  # FIXME
                }
            }

            features = {}  # FIXME
            #    "is_not_available_on_hub": data.get("Available on the hub", False),
            #    "is_merged": data.get("Merged", False),
            #    "is_moe": data.get("MoE", False),
            #    "is_flagged": data.get("Flagged", False),
            #    "is_official_provider": data.get("Official Providers", False)
            #}

            metadata = {}  # FIXME
            #    "upload_date": data.get("Upload To Hub Date"),
            #    "submission_date": data.get("Submission Date"),
            #    "generation": data.get("Generation"),
            #    "base_model": data.get("Base Model"),
            #    "hub_license": data.get("Hub License"),
            #    "hub_hearts": data.get("Hub ❤️"),
            #    "params_billions": data.get("#Params (B)"),
            #    "co2_cost": data.get("CO₂ cost (kg)", 0)
            #}

            # Clean model type by removing emojis if present
            model_type = data.model_type  # FIXME
            # Map old model types to new ones
            #model_type_mapping = {
            #    "fine-tuned": "fined-tuned-on-domain-specific-dataset",
            #    "fine tuned": "fined-tuned-on-domain-specific-dataset",
            #    "finetuned": "fined-tuned-on-domain-specific-dataset",
            #    "fine_tuned": "fined-tuned-on-domain-specific-dataset",
            #    "ft": "fined-tuned-on-domain-specific-dataset",
            #    "finetuning": "fined-tuned-on-domain-specific-dataset",
            #    "fine tuning": "fined-tuned-on-domain-specific-dataset",
            #    "fine-tuning": "fined-tuned-on-domain-specific-dataset"
            #}

            #mapped_type = model_type_mapping.get(model_type.lower().strip(), model_type)
            #if mapped_type != model_type:
            #    logger.debug(LogFormatter.info(f"Model type mapped: {model_type} -> {mapped_type}"))

            transformed_data = {
                "id": unique_id,
                "model": {
                    "name": data.full_model,
                    "sha": "",  # FIXME data.get("Model sha"),
                    "precision": data.precision.name,
                    "type": model_type.to_str(),
                    "weight_type": data.weight_type.name,
                    "architecture": data.architecture,
                    "average_score": sum([v for v in data.results.values() if v is not None]) / len(Tasks),
                    "has_chat_template": False,  # FIXME data.get("Chat Template", False),
                },
                "evaluations": evaluations,
                "features": features,
                "metadata": metadata
            }

            logger.debug(LogFormatter.success(f"Successfully transformed data for {model_name}"))
            return transformed_data

        except Exception as e:
            logger.error(LogFormatter.error(f"Failed to transform data for {data.full_model}", e))
            raise
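

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the service API): shows
# how the pipeline above could be exercised end to end. It assumes the
# app.config values (RESULTS_REPO, RESULTS_CACHE, EVAL_CACHE, HF_TOKEN) point
# at a populated results dataset and that ModelService can initialize.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo():
        # Download results, parse EvalResult entries, and format them for the frontend
        service = LeaderboardService()
        rows = await service.get_formatted_data()
        print(f"Formatted {len(rows)} leaderboard entries")
        if rows:
            first = rows[0]
            print(first["model"]["name"], first["model"]["average_score"])

    asyncio.run(_demo())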