import json
import os
from typing import Any, Dict

import pandas as pd
from huggingface_hub import HfApi, hf_hub_download, metadata_load

from .dataset_handler import VIDORE_DATASETS_KEYWORDS, get_datasets_nickname

BLOCKLIST = ["impactframes"]
class ModelHandler:
    def __init__(self, model_infos_path="model_infos.json"):
        self.api = HfApi()
        self.model_infos_path = model_infos_path
        self.model_infos = self._load_model_infos()

    def _load_model_infos(self) -> Dict:
        # Load cached model results from disk if present, otherwise start empty.
        if os.path.exists(self.model_infos_path):
            with open(self.model_infos_path) as f:
                return json.load(f)
        return {}

    def _save_model_infos(self):
        with open(self.model_infos_path, "w") as f:
            json.dump(self.model_infos, f)

    def _are_results_in_new_vidore_format(self, results: Dict[str, Any]) -> bool:
        # New-format result files wrap the scores in a top-level "metrics" key
        # alongside a "metadata" key.
        return "metadata" in results and "metrics" in results
    def get_vidore_data(self, metric="ndcg_at_5"):
        # Collect evaluation results from every Hub model repository tagged "vidore".
        models = self.api.list_models(filter="vidore")
        repositories = [model.modelId for model in models]  # type: ignore

        for repo_id in repositories:
            org_name = repo_id.split("/")[0]
            if org_name in BLOCKLIST:
                continue

            files = [
                f
                for f in self.api.list_repo_files(repo_id)
                if f.endswith("_metrics.json") or f == "results.json"
            ]
            if len(files) == 0:
                continue

            for file in files:
                if file.endswith("results.json"):
                    model_name = repo_id.replace("/", "_")
                else:
                    model_name = file.split("_metrics.json")[0]

                if model_name not in self.model_infos:
                    readme_path = hf_hub_download(repo_id, filename="README.md")
                    meta = metadata_load(readme_path)
                    try:
                        result_path = hf_hub_download(repo_id, filename=file)
                        with open(result_path) as f:
                            results = json.load(f)

                        if self._are_results_in_new_vidore_format(results):
                            metadata = results["metadata"]
                            results = results["metrics"]

                        self.model_infos[model_name] = {"meta": meta, "results": results}
                    except Exception as e:
                        print(f"Error loading {model_name} - {e}")
                        continue

        # self._save_model_infos()

        # Keep only ViDoRe datasets and extract the requested metric for each one.
        model_res = {}
        if len(self.model_infos) > 0:
            for model in self.model_infos.keys():
                res = self.model_infos[model]["results"]
                dataset_res = {}
                for dataset in res.keys():
                    # Skip datasets that do not belong to the ViDoRe benchmark.
                    if not any(keyword in dataset for keyword in VIDORE_DATASETS_KEYWORDS):
                        print(f"{dataset} not found in ViDoRe datasets. Skipping ...")
                        continue

                    dataset_nickname = get_datasets_nickname(dataset)
                    dataset_res[dataset_nickname] = res[dataset][metric]
                model_res[model] = dataset_res

            df = pd.DataFrame(model_res).T
            return df
        return pd.DataFrame()

def add_rank(df):
    df.fillna(0.0, inplace=True)
    cols_to_rank = [
        col
        for col in df.columns
        if col
        not in [
            "Model",
            "Model Size (Million Parameters)",
            "Memory Usage (GB, fp32)",
            "Embedding Dimensions",
            "Max Tokens",
        ]
    ]
    if len(cols_to_rank) == 1:
        df.sort_values(cols_to_rank[0], ascending=False, inplace=True)
    else:
        df.insert(len(df.columns) - len(cols_to_rank), "Average", df[cols_to_rank].mean(axis=1, skipna=False))
        df.sort_values("Average", ascending=False, inplace=True)
    df.insert(0, "Rank", list(range(1, len(df) + 1)))

    # Multiply values by 100 if they are floats and round to 1 decimal place.
    for col in df.columns:
        if df[col].dtype == "float64":
            df[col] = df[col].apply(lambda x: round(x * 100, 1))

    return df
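

# Example usage: a minimal sketch, not part of the original module. It assumes the
# sibling `dataset_handler` module is importable (run via `python -m <package>.model_handler`
# with the actual, here hypothetical, package name so the relative import resolves)
# and that the Hugging Face Hub is reachable. The `rename_axis` step only moves the
# model names into a "Model" column, which add_rank excludes from averaging.
if __name__ == "__main__":
    handler = ModelHandler()
    scores = handler.get_vidore_data(metric="ndcg_at_5")
    if not scores.empty:
        leaderboard = add_rank(scores.rename_axis("Model").reset_index())
        print(leaderboard.head())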