|  | """ | 
					
						
						|  | Process and transform GuardBench leaderboard data. | 
					
						
						|  | """ | 
					
						
						|  |  | 
					
						
						|  | import json | 
					
						
						|  | import os | 
					
						
						|  | import pandas as pd | 
					
						
						|  | from datetime import datetime | 
					
						
						|  | from typing import Dict, List, Any, Tuple | 
					
						
						|  | import numpy as np | 
					
						
						|  |  | 
					
						
						|  | from src.display.utils import CATEGORIES, TEST_TYPES, METRICS | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | MAX_PUNISHABLE_RUNTIME_MS = 6000.0 | 
					
						
						|  | MIN_PUNISHABLE_RUNTIME_MS = 200.0 | 
					
						
						|  | MAX_RUNTIME_PENALTY = 0.75 | 
					
						
						|  |  | 
					
						
def calculate_integral_score(row: pd.Series) -> float:
    """
    Calculate the integral score for a given model entry row.

    Multiplies the per-test-type accuracies together, then scales the product by the
    micro-averaged error ratio and by a bounded runtime penalty when those
    micro-averaged values are present in the row.
    """
    integral_score = 1.0
    metric_count = 0

    # Multiply in every reported per-test-type accuracy.
    for test_type in TEST_TYPES:
        metric_col = f"{test_type}_accuracy"
        if metric_col in row and pd.notna(row[metric_col]):
            integral_score *= row[metric_col]
            metric_count += 1

    # Without any accuracy metrics there is nothing meaningful to score.
    if metric_count == 0:
        return 0.0

    # Penalize failed evaluations; the error ratio is stored as a percentage.
    micro_error_col = "micro_avg_error_ratio"
    if micro_error_col in row and pd.notna(row[micro_error_col]):
        micro_error_ratio = row[micro_error_col] / 100.0
        integral_score *= (1.0 - micro_error_ratio)

    avg_runtime_ms = None
    micro_runtime_col = "micro_avg_runtime_ms"
    if micro_runtime_col in row and pd.notna(row[micro_runtime_col]):
        avg_runtime_ms = row[micro_runtime_col]

    if avg_runtime_ms is not None:
        # Clamp the runtime to the punishable range before normalizing.
        runtime = max(
            min(avg_runtime_ms, MAX_PUNISHABLE_RUNTIME_MS),
            MIN_PUNISHABLE_RUNTIME_MS,
        )

        if MAX_PUNISHABLE_RUNTIME_MS > MIN_PUNISHABLE_RUNTIME_MS:
            normalized_time = (runtime - MIN_PUNISHABLE_RUNTIME_MS) / (
                MAX_PUNISHABLE_RUNTIME_MS - MIN_PUNISHABLE_RUNTIME_MS
            )
            # Linear penalty: 1.0 at the minimum runtime, MAX_RUNTIME_PENALTY at the maximum.
            time_factor = 1.0 - (1.0 - MAX_RUNTIME_PENALTY) * normalized_time
        else:
            time_factor = 1.0 if runtime <= MIN_PUNISHABLE_RUNTIME_MS else (1.0 - MAX_RUNTIME_PENALTY)

        time_factor = max(MAX_RUNTIME_PENALTY, time_factor)
        integral_score *= time_factor

    return integral_score
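
# Illustrative calculation with hypothetical numbers: two test types with accuracies
# 0.9 and 0.8 give a product of 0.72; a 5% micro_avg_error_ratio scales it by 0.95
# to 0.684; a 3100 ms micro_avg_runtime_ms normalizes to 0.5 within the
# [200, 6000] ms window, giving a time factor of 1.0 - 0.25 * 0.5 = 0.875 and a
# final integral score of about 0.5985.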
					
						
def load_leaderboard_data(file_path: str) -> Dict:
    """
    Load the leaderboard data from a JSON file.
    """
    if not os.path.exists(file_path):
        # No file yet: return an empty leaderboard, deriving the version from a
        # "_v<N>" filename suffix and defaulting to "v0".
        version = "v0"
        if "_v" in file_path:
            version = file_path.split("_")[-1].split(".")[0]
        return {"entries": [], "last_updated": datetime.now().isoformat(), "version": version}

    with open(file_path, 'r') as f:
        data = json.load(f)

    # Older files may predate the version field; infer it from the filename.
    if "version" not in data:
        version = "v0"
        if "_v" in file_path:
            version = file_path.split("_")[-1].split(".")[0]
        data["version"] = version

    return data
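
# Illustrative on-disk layout handled by load_leaderboard_data / save_leaderboard_data
# (entry fields are whatever process_submission produces; values here are hypothetical):
#     {
#         "entries": [{"model_name": "example-guard", "version": "v1", ...}],
#         "last_updated": "2024-01-01T00:00:00",
#         "version": "v1"
#     }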
					
						
def save_leaderboard_data(data: Dict, file_path: str) -> None:
    """
    Save the leaderboard data to a JSON file.
    """
    # dirname is empty for bare filenames, so fall back to the current directory.
    os.makedirs(os.path.dirname(file_path) or ".", exist_ok=True)

    data["last_updated"] = datetime.now().isoformat()

    if "version" not in data:
        version = "v0"
        if "_v" in file_path:
            version = file_path.split("_")[-1].split(".")[0]
        data["version"] = version

    with open(file_path, 'w') as f:
        json.dump(data, f, indent=2)
					
						
def process_submission(submission_data: List[Dict]) -> List[Dict]:
    """
    Process submission data and convert it to leaderboard entries.
    """
    entries = []

    for item in submission_data:
        entry = {
            "model_name": item.get("model_name", "Unknown Model"),
            "per_category_metrics": {},
            "avg_metrics": {},
            "submission_date": datetime.now().isoformat(),
            "version": item.get("version", "v0")
        }

        for key in ["model_type", "base_model", "revision", "precision", "weight_type"]:
            if key in item:
                entry[key] = item[key]

        if "per_category_metrics" in item:
            entry["per_category_metrics"] = item["per_category_metrics"]

        if "avg_metrics" in item:
            entry["avg_metrics"] = item["avg_metrics"]

        entries.append(entry)

    return entries
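
# Illustrative submission item consumed by process_submission (only the keys read
# above matter; names and values here are hypothetical):
#     {"model_name": "example-guard", "model_type": "open", "precision": "bf16",
#      "revision": "main", "version": "v1",
#      "avg_metrics": {...}, "per_category_metrics": {...}}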
					
						
def leaderboard_to_dataframe(leaderboard_data: Dict) -> pd.DataFrame:
    """
    Convert leaderboard data to a pandas DataFrame for display.
    """
    rows = []

    for entry in leaderboard_data.get("entries", []):
        model_name = entry.get("model_name", "Unknown Model")

        row = {
            "model_name": model_name,
            "model_type": entry.get("model_type", "Unknown"),
            "mode": entry.get("mode", "Strict"),
            "submission_date": entry.get("submission_date", ""),
            "version": entry.get("version", "v0"),
            "guard_model_type": entry.get("guard_model_type", "llm_regexp").lower()
        }

        for key in ["base_model", "revision", "precision", "weight_type"]:
            if key in entry:
                row[key] = entry[key]

        # Copy any flat per-test-type or aggregate columns already present on the entry.
        for key, value in entry.items():
            if any(test_type in key for test_type in TEST_TYPES) or \
               key in ["average_f1", "average_recall", "average_precision",
                       "macro_accuracy", "macro_recall", "total_evals_count"]:
                row[key] = value

        # Expand nested avg_metrics into flat "<test_type>_<metric>" columns.
        avg_metrics = entry.get("avg_metrics", {})
        if avg_metrics:
            for test_type in TEST_TYPES:
                if test_type in avg_metrics:
                    metrics = avg_metrics[test_type]
                    for metric in METRICS:
                        if metric in metrics:
                            col_name = f"{test_type}_{metric}"
                            row[col_name] = metrics[metric]

                            # Also expose f1_binary under the shorter "<test_type>_f1" alias.
                            if metric == "f1_binary":
                                row[f"{test_type}_f1"] = metrics[metric]

        # Derive macro accuracy from per-test-type accuracies when it is not provided.
        if "macro_accuracy" not in row:
            accuracy_values = []
            for test_type in TEST_TYPES:
                accuracy_val = None
                if test_type in avg_metrics and "accuracy" in avg_metrics[test_type] and pd.notna(avg_metrics[test_type]["accuracy"]):
                    accuracy_val = avg_metrics[test_type]["accuracy"]
                elif f"{test_type}_accuracy" in row and pd.notna(row[f"{test_type}_accuracy"]):
                    accuracy_val = row[f"{test_type}_accuracy"]

                if accuracy_val is not None:
                    accuracy_values.append(accuracy_val)

            if accuracy_values:
                row["macro_accuracy"] = sum(accuracy_values) / len(accuracy_values)

        # Derive macro recall from per-test-type binary recall when it is not provided.
        if "macro_recall" not in row:
            recall_values = []
            for test_type in TEST_TYPES:
                if test_type in avg_metrics and "recall_binary" in avg_metrics[test_type] and pd.notna(avg_metrics[test_type]["recall_binary"]):
                    recall_values.append(avg_metrics[test_type]["recall_binary"])
            if recall_values:
                row["macro_recall"] = sum(recall_values) / len(recall_values)

        # Sum per-test-type sample counts when a total is not provided.
        if "total_evals_count" not in row:
            total_samples = 0
            found_samples = False
            for test_type in TEST_TYPES:
                if test_type in avg_metrics and "sample_count" in avg_metrics[test_type] and pd.notna(avg_metrics[test_type]["sample_count"]):
                    total_samples += avg_metrics[test_type]["sample_count"]
                    found_samples = True
            if found_samples:
                row["total_evals_count"] = total_samples

        row["micro_avg_error_ratio"] = entry.get("micro_avg_error_ratio", pd.NA)
        row["micro_avg_runtime_ms"] = entry.get("micro_avg_runtime_ms", pd.NA)

        # Display the error ratio as a percentage.
        if pd.notna(row["micro_avg_error_ratio"]):
            row["micro_avg_error_ratio"] *= 100

        rows.append(row)

    df = pd.DataFrame(rows)

    # Make sure every expected metric column exists, even if no entry reported it.
    for test_type in TEST_TYPES:
        for metric in METRICS:
            col_name = f"{test_type}_{metric}"
            if col_name not in df.columns:
                df[col_name] = pd.NA

            if metric == "f1_binary" and f"{test_type}_f1" not in df.columns:
                if col_name in df.columns:
                    df[f"{test_type}_f1"] = df[col_name]
                else:
                    df[f"{test_type}_f1"] = pd.NA

    # Rank models by integral score.
    if not df.empty:
        df["integral_score"] = df.apply(calculate_integral_score, axis=1)
        df = df.sort_values(by="integral_score", ascending=False, na_position='last')
    else:
        df["integral_score"] = pd.NA

    summary_cols = ["macro_accuracy", "macro_recall", "micro_avg_error_ratio", "micro_avg_runtime_ms", "total_evals_count"]
    for col in summary_cols:
        if col not in df.columns:
            df[col] = pd.NA

    # Drop legacy aggregate columns that are no longer displayed.
    old_avg_cols = ["average_f1", "average_recall", "average_precision"]
    for col in old_avg_cols:
        if col in df.columns:
            df = df.drop(columns=[col])

    return df
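
# Illustrative use of the resulting frame (column names as produced above; the file
# path is hypothetical):
#     df = leaderboard_to_dataframe(load_leaderboard_data("leaderboard_v1.json"))
#     top = df[["model_name", "integral_score", "macro_accuracy"]].head(10)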
					
						
def add_entries_to_leaderboard(leaderboard_data: Dict, new_entries: List[Dict]) -> Dict:
    """
    Add new entries to the leaderboard, replacing any entry with the same model name
    and version.
    """
    # Index existing entries by (model_name, version) for in-place replacement.
    existing_entries = {
        (entry["model_name"], entry.get("version", "v0")): i
        for i, entry in enumerate(leaderboard_data.get("entries", []))
    }

    for new_entry in new_entries:
        model_name = new_entry.get("model_name")
        version = new_entry.get("version", "v0")

        if (model_name, version) in existing_entries:
            leaderboard_data["entries"][existing_entries[(model_name, version)]] = new_entry
        else:
            if "entries" not in leaderboard_data:
                leaderboard_data["entries"] = []
            leaderboard_data["entries"].append(new_entry)

    leaderboard_data["last_updated"] = datetime.now().isoformat()

    return leaderboard_data
					
						
def process_jsonl_submission(file_path: str) -> Tuple[List[Dict], str]:
    """
    Process a JSONL submission file and extract entries.
    """
    entries = []
    try:
        with open(file_path, 'r') as f:
            for line in f:
                # Skip blank lines (e.g. a trailing newline) instead of failing on them.
                if not line.strip():
                    continue
                try:
                    entry = json.loads(line)
                    entries.append(entry)
                except json.JSONDecodeError as e:
                    return [], f"Invalid JSON in submission file: {e}"

        if not entries:
            return [], "Submission file is empty"

        return entries, "Successfully processed submission"
    except Exception as e:
        return [], f"Error processing submission file: {e}"
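

# Minimal end-to-end sketch of how these helpers compose. The file paths are
# hypothetical placeholders; adjust them to the actual data layout.
if __name__ == "__main__":
    submission, message = process_jsonl_submission("submission.jsonl")  # hypothetical path
    print(message)
    if submission:
        new_entries = process_submission(submission)
        leaderboard = load_leaderboard_data("leaderboard_v1.json")  # hypothetical path
        leaderboard = add_entries_to_leaderboard(leaderboard, new_entries)
        save_leaderboard_data(leaderboard, "leaderboard_v1.json")
        print(leaderboard_to_dataframe(leaderboard).head())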