import json import gzip import gradio as gr from gradio_leaderboard import Leaderboard, ColumnFilter, SelectColumns import pandas as pd from apscheduler.schedulers.background import BackgroundScheduler from huggingface_hub import snapshot_download from io import StringIO from src.about import ( CITATION_BUTTON_LABEL, CITATION_BUTTON_TEXT, EVALUATION_QUEUE_TEXT, INTRODUCTION_TEXT, LLM_BENCHMARKS_TEXT, TITLE, ) from src.display.css_html_js import custom_css from src.display.utils import ( BENCHMARK_COLS, BENCHMARK_COLS_MULTIMODAL, BENCHMARK_COLS_MIB_SUBGRAPH, BENCHMARK_COLS_MIB_CAUSALGRAPH, COLS, COLS_MIB_SUBGRAPH, COLS_MIB_CAUSALGRAPH, COLS_MULTIMODAL, EVAL_COLS, EVAL_TYPES, AutoEvalColumn, AutoEvalColumn_mib_subgraph, AutoEvalColumn_mib_causalgraph, fields, ) from src.envs import API, EVAL_REQUESTS_PATH, QUEUE_REPO, REPO_ID, TOKEN, RESULTS_REPO_MIB_SUBGRAPH, EVAL_RESULTS_MIB_SUBGRAPH_PATH, RESULTS_REPO_MIB_CAUSALGRAPH, EVAL_RESULTS_MIB_CAUSALGRAPH_PATH from src.populate import get_evaluation_queue_df, get_leaderboard_df, get_leaderboard_df_mib_subgraph, get_leaderboard_df_mib_causalgraph from src.submission.submit import add_new_eval from gradio_leaderboard import SelectColumns, Leaderboard import pandas as pd from typing import List, Dict, Union, Optional, Any from dataclasses import fields class SmartSelectColumns(SelectColumns): """ Enhanced SelectColumns component for gradio_leaderboard with dynamic column filtering. """ def __init__( self, benchmark_keywords: Optional[List[str]] = None, model_keywords: Optional[List[str]] = None, column_mapping: Optional[Dict[str, str]] = None, initial_selected: Optional[List[str]] = None, **kwargs ): """ Initialize SmartSelectColumns with dynamic filtering. Args: benchmark_keywords: List of benchmark names to filter by (e.g., ["ioi", "mcqa"]) model_keywords: List of model names to filter by (e.g., ["llama3", "qwen2_5"]) column_mapping: Dict mapping actual column names to display names initial_selected: List of columns to show initially """ super().__init__(**kwargs) self.benchmark_keywords = benchmark_keywords or [] self.model_keywords = model_keywords or [] self.column_mapping = column_mapping or {} self.reverse_mapping = {v: k for k, v in self.column_mapping.items()} if column_mapping else {} self.initial_selected = initial_selected or [] def preprocess_value(self, x: List[str]) -> List[str]: """Transform selected display names back to actual column names.""" return [self.reverse_mapping.get(col, col) for col in x] def postprocess_value(self, y: List[str]) -> List[str]: """Transform actual column names to display names.""" return [self.column_mapping.get(col, col) for col in y] def get_filtered_groups(self, df: pd.DataFrame) -> Dict[str, List[str]]: """ Dynamically create column groups based on keywords. """ filtered_groups = {} # Create benchmark groups for benchmark in self.benchmark_keywords: matching_cols = [ col for col in df.columns if benchmark in col.lower() ] if matching_cols: group_name = f"Benchmark group for {benchmark}" filtered_groups[group_name] = [ self.column_mapping.get(col, col) for col in matching_cols ] # Create model groups for model in self.model_keywords: matching_cols = [ col for col in df.columns if model in col.lower() ] if matching_cols: group_name = f"Model group for {model}" filtered_groups[group_name] = [ self.column_mapping.get(col, col) for col in matching_cols ] return filtered_groups def update( self, value: Union[pd.DataFrame, Dict[str, List[str]], Any] ) -> Dict: """Update component with new values.""" if isinstance(value, pd.DataFrame): # Get all column names and convert to display names choices = [self.column_mapping.get(col, col) for col in value.columns] # Use initial selection or default columns selected = self.initial_selected if self.initial_selected else choices # Get dynamically filtered groups filtered_cols = self.get_filtered_groups(value) return { "choices": choices, "value": selected, "filtered_cols": filtered_cols } # Handle fields object if hasattr(value, '__dataclass_fields__'): field_names = [field.name for field in fields(value)] choices = [self.column_mapping.get(name, name) for name in field_names] return { "choices": choices, "value": self.initial_selected if self.initial_selected else choices } return super().update(value) def restart_space(): API.restart_space(repo_id=REPO_ID) ### Space initialisation try: # print(EVAL_REQUESTS_PATH) snapshot_download( repo_id=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30, token=TOKEN ) except Exception: restart_space() try: # print(RESULTS_REPO_MIB_SUBGRAPH) snapshot_download( repo_id=RESULTS_REPO_MIB_SUBGRAPH, local_dir=EVAL_RESULTS_MIB_SUBGRAPH_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30, token=TOKEN ) except Exception: restart_space() try: # print(RESULTS_REPO_MIB_CAUSALGRAPH) snapshot_download( repo_id=RESULTS_REPO_MIB_CAUSALGRAPH, local_dir=EVAL_RESULTS_MIB_CAUSALGRAPH_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30, token=TOKEN ) except Exception: restart_space() LEADERBOARD_DF_MIB_SUBGRAPH = get_leaderboard_df_mib_subgraph(EVAL_RESULTS_MIB_SUBGRAPH_PATH, EVAL_REQUESTS_PATH, COLS_MIB_SUBGRAPH, BENCHMARK_COLS_MIB_SUBGRAPH) # LEADERBOARD_DF_MIB_CAUSALGRAPH = get_leaderboard_df_mib_causalgraph(EVAL_RESULTS_MIB_CAUSALGRAPH_PATH, EVAL_REQUESTS_PATH, COLS_MIB_CAUSALGRAPH, BENCHMARK_COLS_MIB_CAUSALGRAPH) # In app.py, modify the LEADERBOARD initialization LEADERBOARD_DF_MIB_CAUSALGRAPH_DETAILED, LEADERBOARD_DF_MIB_CAUSALGRAPH_AGGREGATED, LEADERBOARD_DF_MIB_CAUSALGRAPH_AVERAGED = get_leaderboard_df_mib_causalgraph( EVAL_RESULTS_MIB_CAUSALGRAPH_PATH, EVAL_REQUESTS_PATH, COLS_MIB_CAUSALGRAPH, BENCHMARK_COLS_MIB_CAUSALGRAPH ) # LEADERBOARD_DF = get_leaderboard_df(EVAL_RESULTS_PATH, EVAL_REQUESTS_PATH, COLS, BENCHMARK_COLS) # LEADERBOARD_DF_MULTIMODAL = get_leaderboard_df(EVAL_RESULTS_PATH, EVAL_REQUESTS_PATH, COLS_MULTIMODAL, BENCHMARK_COLS_MULTIMODAL) ( finished_eval_queue_df, running_eval_queue_df, pending_eval_queue_df, ) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS) # def init_leaderboard_mib_subgraph(dataframe, track): # # print(f"init_leaderboard_mib: dataframe head before loc is {dataframe.head()}\n") # if dataframe is None or dataframe.empty: # raise ValueError("Leaderboard DataFrame is empty or None.") # # filter for correct track # # dataframe = dataframe.loc[dataframe["Track"] == track] # # print(f"init_leaderboard_mib: dataframe head after loc is {dataframe.head()}\n") # return Leaderboard( # value=dataframe, # datatype=[c.type for c in fields(AutoEvalColumn_mib_subgraph)], # select_columns=SelectColumns( # default_selection=[c.name for c in fields(AutoEvalColumn_mib_subgraph) if c.displayed_by_default], # cant_deselect=[c.name for c in fields(AutoEvalColumn_mib_subgraph) if c.never_hidden], # label="Select Columns to Display:", # ), # search_columns=["Method"], # Changed from AutoEvalColumn_mib_subgraph.model.name to "Method" # hide_columns=[c.name for c in fields(AutoEvalColumn_mib_subgraph) if c.hidden], # bool_checkboxgroup_label="Hide models", # interactive=False, # ) from src.about import TasksMib_Subgraph # def init_leaderboard_mib_subgraph(dataframe, track): # """Initialize the subgraph leaderboard with grouped column selection by benchmark.""" # if dataframe is None or dataframe.empty: # raise ValueError("Leaderboard DataFrame is empty or None.") # print("\nDebugging DataFrame columns:", dataframe.columns.tolist()) # # Create groups of columns by benchmark # benchmark_groups = [] # # For each benchmark in our TasksMib_Subgraph enum... # for task in TasksMib_Subgraph: # benchmark = task.value.benchmark # # Get all valid columns for this benchmark's models # benchmark_cols = [ # f"{benchmark}_{model}" # for model in task.value.models # if f"{benchmark}_{model}" in dataframe.columns # ] # if benchmark_cols: # Only add if we have valid columns # benchmark_groups.append(benchmark_cols) # print(f"\nBenchmark group for {benchmark}:", benchmark_cols) # # Create model groups as well # model_groups = [] # all_models = list(set(model for task in TasksMib_Subgraph for model in task.value.models)) # # For each unique model... # for model in all_models: # # Get all valid columns for this model across benchmarks # model_cols = [ # f"{task.value.benchmark}_{model}" # for task in TasksMib_Subgraph # if model in task.value.models # and f"{task.value.benchmark}_{model}" in dataframe.columns # ] # if model_cols: # Only add if we have valid columns # model_groups.append(model_cols) # print(f"\nModel group for {model}:", model_cols) # # Combine all groups # all_groups = benchmark_groups + model_groups # # Flatten groups for default selection (show everything initially) # all_columns = [col for group in all_groups for col in group] # print("\nAll available columns:", all_columns) # return Leaderboard( # value=dataframe, # datatype=[c.type for c in fields(AutoEvalColumn_mib_subgraph)], # select_columns=SelectColumns( # default_selection=all_columns, # Show all columns initially # label="Select Results:" # ), # search_columns=["Method"], # hide_columns=[], # interactive=False, # ) def init_leaderboard_mib_subgraph(dataframe, track): """Initialize the subgraph leaderboard with display names for better readability.""" if dataframe is None or dataframe.empty: raise ValueError("Leaderboard DataFrame is empty or None.") print("\nDebugging DataFrame columns:", dataframe.columns.tolist()) # First, create our display name mapping # This is like creating a translation dictionary between internal names and display names display_mapping = {} for task in TasksMib_Subgraph: for model in task.value.models: field_name = f"{task.value.benchmark}_{model}" display_name = f"{task.value.benchmark}({model})" display_mapping[field_name] = display_name # Now when creating benchmark groups, we'll use display names benchmark_groups = [] for task in TasksMib_Subgraph: benchmark = task.value.benchmark benchmark_cols = [ display_mapping[f"{benchmark}_{model}"] # Use display name from our mapping for model in task.value.models if f"{benchmark}_{model}" in dataframe.columns ] if benchmark_cols: benchmark_groups.append(benchmark_cols) print(f"\nBenchmark group for {benchmark}:", benchmark_cols) # Similarly for model groups model_groups = [] all_models = list(set(model for task in TasksMib_Subgraph for model in task.value.models)) for model in all_models: model_cols = [ display_mapping[f"{task.value.benchmark}_{model}"] # Use display name for task in TasksMib_Subgraph if model in task.value.models and f"{task.value.benchmark}_{model}" in dataframe.columns ] if model_cols: model_groups.append(model_cols) print(f"\nModel group for {model}:", model_cols) # Combine all groups using display names all_groups = benchmark_groups + model_groups all_columns = [col for group in all_groups for col in group] # Important: We need to rename our DataFrame columns to match display names renamed_df = dataframe.rename(columns=display_mapping) # return Leaderboard( # value=renamed_df, # Use DataFrame with display names # datatype=[c.type for c in fields(AutoEvalColumn_mib_subgraph)], # select_columns=SelectColumns( # default_selection=all_columns, # Now contains display names # label="Select Results:" # ), # search_columns=["Method"], # hide_columns=[], # interactive=False, # ) # Complete column groups for both benchmarks and models # Define keywords for filtering benchmark_keywords = ["ioi", "mcqa", "arithmetic_addition", "arithmetic_subtraction", "arc_easy", "arc_challenge"] model_keywords = ["qwen2_5", "gpt2", "gemma2", "llama3"] # # Optional: Define display names # mappings = { # "ioi_llama3": "IOI (LLaMA-3)", # "ioi_qwen2_5": "IOI (Qwen-2.5)", # "ioi_gpt2": "IOI (GPT-2)", # "ioi_gemma2": "IOI (Gemma-2)", # "mcqa_llama3": "MCQA (LLaMA-3)", # "mcqa_qwen2_5": "MCQA (Qwen-2.5)", # "mcqa_gemma2": "MCQA (Gemma-2)", # "arithmetic_addition_llama3": "Arithmetic Addition (LLaMA-3)", # "arithmetic_subtraction_llama3": "Arithmetic Subtraction (LLaMA-3)", # "arc_easy_llama3": "ARC Easy (LLaMA-3)", # "arc_easy_gemma2": "ARC Easy (Gemma-2)", # "arc_challenge_llama3": "ARC Challenge (LLaMA-3)", # "eval_name": "Evaluation Name", # "Method": "Method", # "Average": "Average Score" # } mappings = {} # Create SmartSelectColumns instance smart_columns = SmartSelectColumns( benchmark_keywords=benchmark_keywords, model_keywords=model_keywords, column_mapping=mappings, initial_selected=["Method", "Average"] ) # Create Leaderboard leaderboard = Leaderboard( value=renamed_df, datatype=[c.type for c in fields(AutoEvalColumn_mib_subgraph)], select_columns=smart_columns, search_columns=["Method"], hide_columns=[], interactive=False ) # def init_leaderboard_mib_subgraph(dataframe, track): # """Initialize the subgraph leaderboard with group-based column selection.""" # if dataframe is None or dataframe.empty: # raise ValueError("Leaderboard DataFrame is empty or None.") # print("\nDebugging DataFrame columns:", dataframe.columns.tolist()) # # Create selection mapping for benchmark groups # selection_mapping = {} # # Create benchmark groups with descriptive names # for task in TasksMib_Subgraph: # benchmark = task.value.benchmark # # Get all columns for this benchmark's models # benchmark_cols = [ # f"{benchmark}_{model}" # for model in task.value.models # if f"{benchmark}_{model}" in dataframe.columns # ] # if benchmark_cols: # # Use a descriptive group name as the key # group_name = f"Benchmark: {benchmark.upper()}" # selection_mapping[group_name] = benchmark_cols # print(f"\n{group_name} maps to:", benchmark_cols) # # Create model groups with descriptive names # all_models = list(set(model for task in TasksMib_Subgraph for model in task.value.models)) # for model in all_models: # # Get all columns for this model across benchmarks # model_cols = [ # f"{task.value.benchmark}_{model}" # for task in TasksMib_Subgraph # if model in task.value.models # and f"{task.value.benchmark}_{model}" in dataframe.columns # ] # if model_cols: # # Use a descriptive group name as the key # group_name = f"Model: {model}" # selection_mapping[group_name] = model_cols # print(f"\n{group_name} maps to:", model_cols) # # The selection options are the group names # selection_options = list(selection_mapping.keys()) # print("\nSelection options:", selection_options) # return Leaderboard( # value=dataframe, # datatype=[c.type for c in fields(AutoEvalColumn_mib_subgraph)], # select_columns=SelectColumns( # default_selection=selection_options, # Show all groups by default # label="Select Benchmark or Model Groups:" # ), # search_columns=["Method"], # hide_columns=[], # interactive=False, # ) def init_leaderboard_mib_causalgraph(dataframe, track): # print("Debugging column issues:") # print("\nActual DataFrame columns:") # print(dataframe.columns.tolist()) # print("\nExpected columns for Leaderboard:") expected_cols = [c.name for c in fields(AutoEvalColumn_mib_causalgraph)] # print(expected_cols) # print("\nMissing columns:") missing_cols = [col for col in expected_cols if col not in dataframe.columns] # print(missing_cols) # print("\nSample of DataFrame content:") # print(dataframe.head().to_string()) return Leaderboard( value=dataframe, datatype=[c.type for c in fields(AutoEvalColumn_mib_causalgraph)], select_columns=SelectColumns( default_selection=[c.name for c in fields(AutoEvalColumn_mib_causalgraph) if c.displayed_by_default], cant_deselect=[c.name for c in fields(AutoEvalColumn_mib_causalgraph) if c.never_hidden], label="Select Columns to Display:", ), search_columns=["Method"], hide_columns=[c.name for c in fields(AutoEvalColumn_mib_causalgraph) if c.hidden], bool_checkboxgroup_label="Hide models", interactive=False, ) def init_leaderboard_mib_causalgraph(dataframe, track): # print("Debugging column issues:") # print("\nActual DataFrame columns:") # print(dataframe.columns.tolist()) # Create only necessary columns return Leaderboard( value=dataframe, datatype=[c.type for c in fields(AutoEvalColumn_mib_causalgraph)], select_columns=SelectColumns( default_selection=["Method"], # Start with just Method column cant_deselect=["Method"], # Method column should always be visible label="Select Columns to Display:", ), search_columns=["Method"], hide_columns=[], bool_checkboxgroup_label="Hide models", interactive=False, ) def init_leaderboard(dataframe, track): if dataframe is None or dataframe.empty: raise ValueError("Leaderboard DataFrame is empty or None.") # filter for correct track dataframe = dataframe.loc[dataframe["Track"] == track] # print(f"\n\n\n dataframe is {dataframe}\n\n\n") return Leaderboard( value=dataframe, datatype=[c.type for c in fields(AutoEvalColumn)], select_columns=SelectColumns( default_selection=[c.name for c in fields(AutoEvalColumn) if c.displayed_by_default], cant_deselect=[c.name for c in fields(AutoEvalColumn) if c.never_hidden], label="Select Columns to Display:", ), search_columns=[AutoEvalColumn.model.name], hide_columns=[c.name for c in fields(AutoEvalColumn) if c.hidden], bool_checkboxgroup_label="Hide models", interactive=False, ) def process_json(temp_file): if temp_file is None: return {} # Handle file upload try: file_path = temp_file.name if file_path.endswith('.gz'): with gzip.open(file_path, 'rt') as f: data = json.load(f) else: with open(file_path, 'r') as f: data = json.load(f) except Exception as e: raise gr.Error(f"Error processing file: {str(e)}") gr.Markdown("Upload successful!") return data demo = gr.Blocks(css=custom_css) with demo: gr.HTML(TITLE) gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text") with gr.Tabs(elem_classes="tab-buttons") as tabs: # with gr.TabItem("Strict", elem_id="strict-benchmark-tab-table", id=0): # leaderboard = init_leaderboard(LEADERBOARD_DF, "strict") # with gr.TabItem("Strict-small", elem_id="strict-small-benchmark-tab-table", id=1): # leaderboard = init_leaderboard(LEADERBOARD_DF, "strict-small") # with gr.TabItem("Multimodal", elem_id="multimodal-benchmark-tab-table", id=2): # leaderboard = init_leaderboard(LEADERBOARD_DF_MULTIMODAL, "multimodal") # with gr.TabItem("📝 About", elem_id="llm-benchmark-tab-table", id=4): # gr.Markdown(LLM_BENCHMARKS_TEXT, elem_classes="markdown-text") # with gr.TabItem("👶 Submit", elem_id="llm-benchmark-tab-table", id=5): # with gr.Column(): # with gr.Row(): # gr.Markdown(EVALUATION_QUEUE_TEXT, elem_classes="markdown-text") # with gr.TabItem("Subgraph", elem_id="subgraph", id=0): # leaderboard = init_leaderboard_mib_subgraph(LEADERBOARD_DF_MIB_SUBGRAPH, "Subgraph") with gr.TabItem("Subgraph", elem_id="subgraph", id=0): # Add description for filters gr.Markdown(""" ### Filtering Options Use the dropdown menus below to filter results by specific tasks or models. You can combine filters to see specific task-model combinations. """) leaderboard = init_leaderboard_mib_subgraph(LEADERBOARD_DF_MIB_SUBGRAPH, "Subgraph") # Then modify the Causal Graph tab section with gr.TabItem("Causal Graph", elem_id="causalgraph", id=1): with gr.Tabs() as causalgraph_tabs: with gr.TabItem("Detailed View", id=0): leaderboard_detailed = init_leaderboard_mib_causalgraph( LEADERBOARD_DF_MIB_CAUSALGRAPH_DETAILED, "Causal Graph" ) with gr.TabItem("Aggregated View", id=1): leaderboard_aggregated = init_leaderboard_mib_causalgraph( LEADERBOARD_DF_MIB_CAUSALGRAPH_AGGREGATED, "Causal Graph" ) with gr.TabItem("Intervention Averaged", id=2): leaderboard_averaged = init_leaderboard_mib_causalgraph( LEADERBOARD_DF_MIB_CAUSALGRAPH_AVERAGED, "Causal Graph" ) # with gr.Row(): # with gr.Accordion("📙 Citation", open=False): # citation_button = gr.Textbox( # value=CITATION_BUTTON_TEXT, # label=CITATION_BUTTON_LABEL, # lines=20, # elem_id="citation-button", # show_copy_button=True, # ) scheduler = BackgroundScheduler() scheduler.add_job(restart_space, "interval", seconds=1800) scheduler.start() demo.launch(share=True, ssr_mode=False)