from contextlib import ExitStack
from dataclasses import dataclass
from typing import List, Optional

import glob
import json
import os
import traceback

import click
import gradio as gr
import pandas as pd

from parse_results import build_results


@dataclass
class PlotConfig:
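    """Axis titles, plot title and optional percentile list for a single metric plot."""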
    x_title: str
    y_title: str
    title: str
    percentiles: Optional[List[float]] = None

def check_file_exists(path, label=""):
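    """Print whether *path* exists, plus its size and absolute path (or the cwd and
    directory listing when it does not), to help debug path issues."""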
    if os.path.exists(path):
        print(f"✅ {label} file exists: {path}")
        print(f"   File size: {os.path.getsize(path)} bytes")
        print(f"   Absolute path: {os.path.abspath(path)}")
    else:
        print(f"❌ {label} file NOT found: {path}")
        print(f"   Current working directory: {os.getcwd()}")
        print(f"   Directory contents: {os.listdir(os.path.dirname(path) if os.path.dirname(path) else '.')}")

def run(from_results_dir, datasource, port):
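    """Load benchmark data (built from a results directory or read from *datasource*)
    and serve the Gradio dashboard on *port*."""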
    print(f"💡 Debug - from_results_dir: {from_results_dir}")
    print(f"💡 Debug - datasource: {datasource}")
    print(f"💡 Debug - current directory: {os.getcwd()}")

    css = '''
    .summary span {
        font-size: 10px;
        padding-top: 0;
        padding-bottom: 0;
    }
    '''

    summary_desc = '''
## Summary
This table shows the average of the metrics for each model and QPS rate.

The metrics are:
* Inter token latency: Time to generate a new output token for each user querying the system.
  It translates as the “speed” perceived by the end-user. We aim for at least 300 words per minute (average reading speed), so ITL < 150ms.
* Time to First Token: Time the user has to wait before seeing the first token of their answer.
  Lower waiting times are essential for real-time interactions, less so for offline workloads.
* End-to-end latency: The overall time the system took to generate the full response to the user.
* Throughput: The number of tokens per second the system can generate across all requests.
* Successful requests: The number of requests the system was able to honor in the benchmark timeframe.
* Error rate: The percentage of requests that ended in error, because the system could not process them in time or failed to process them.
'''

    df_bench = pd.DataFrame()
    line_plots_bench = []
    column_mappings = {'inter_token_latency_ms_p90': 'ITL P90 (ms)', 'time_to_first_token_ms_p90': 'TTFT P90 (ms)',
                       'e2e_latency_ms_p90': 'E2E P90 (ms)', 'token_throughput_secs': 'Throughput (tokens/s)',
                       'successful_requests': 'Successful requests', 'error_rate': 'Error rate (%)', 'model': 'Model',
                       'rate': 'QPS', 'run_id': 'Run ID'}
    default_df = pd.DataFrame.from_dict(
        {"rate": [1, 2], "inter_token_latency_ms_p90": [10, 20],
         "version": ["default", "default"],
         "model": ["default", "default"]})

    def load_demo(model_bench, percentiles):
        return update_bench(model_bench, percentiles)

    def update_bench(model, percentiles):
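        """For the selected percentile, copy each `<metric>_<percentile>` column onto the
        plotted metric column, then return the per-plot data for *model* plus the summary table."""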
        res = []
        for plot in line_plots_bench:
            if plot['config'].percentiles:
                k = plot['metric'] + '_' + str(percentiles)
                df_bench[plot['metric']] = df_bench[k] if k in df_bench.columns else 0
            res.append(df_bench[(df_bench['model'] == model)])

        return res + [summary_table()]

    def summary_table() -> pd.DataFrame:
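        """Average the P90 latency and throughput metrics per model, run and QPS rate,
        format them for display and rename the columns to human-readable labels."""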
        data = df_bench.groupby(['model', 'run_id', 'rate']).agg(
            {'inter_token_latency_ms_p90': 'mean', 'time_to_first_token_ms_p90': 'mean',
             'e2e_latency_ms_p90': 'mean', 'token_throughput_secs': 'mean',
             'successful_requests': 'mean', 'error_rate': 'mean'}).reset_index()
        data = data[
            ['run_id', 'model', 'rate', 'inter_token_latency_ms_p90', 'time_to_first_token_ms_p90',
             'e2e_latency_ms_p90',
             'token_throughput_secs']]
        for metric in ['inter_token_latency_ms_p90', 'time_to_first_token_ms_p90', 'e2e_latency_ms_p90',
                       'token_throughput_secs']:
            data[metric] = data[metric].apply(lambda x: f"{x:.2f}")
        data = data.rename(
            columns=column_mappings)
        return data

    def load_bench_results(source) -> pd.DataFrame:
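        """Read a benchmark parquet file, dropping warmup/throughput entries and keeping
        only constant-arrival-rate runs."""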
        data = pd.read_parquet(source)

        data = data[(data['id'] != 'warmup') & (data['id'] != 'throughput')]

        data = data[data['executor_type'] == 'ConstantArrivalRate']
        return data

    def select_region(selection: gr.SelectData, model):
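        """Zoom every plot to the QPS range selected on one of them."""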
        min_w, max_w = selection.index
        data = df_bench[(df_bench['model'] == model) & (df_bench['rate'] >= min_w) & (
                df_bench['rate'] <= max_w)]
        res = []
        for plot in line_plots_bench:
            metric = plot["metric"]
            y_min = data[metric].min()
            y_max = data[metric].max()
            res.append(gr.LinePlot(x_lim=[min_w, max_w], y_lim=[y_min, y_max]))
        return res

    def reset_region():
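        """Clear the zoom limits on every plot."""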
        res = []
        for _ in line_plots_bench:
            res.append(gr.LinePlot(x_lim=None, y_lim=None))
        return res

    def load_datasource(datasource, fn):
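        """Apply *fn* to *datasource*, resolving `file://` prefixes to local paths and
        passing `s3://` URLs or plain paths through unchanged."""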
        print(f"💡 Debug - load_datasource called with: {datasource}")
        if datasource.startswith('file://'):
            local_path = datasource[len('file://'):]
            print(f"💡 Debug - Extracted local path: {local_path}")
            check_file_exists(local_path, "Local")
            return fn(local_path)
        elif datasource.startswith('s3://'):
            return fn(datasource)
        else:
            print(f"💡 Debug - Using path as-is: {datasource}")
            check_file_exists(datasource, "Direct")
            return fn(datasource)

    parquet_file_to_load = None

    if from_results_dir is not None:
        output_filename = 'benchmarks.parquet'
        print(f"💡 Debug - Building results from directory: {from_results_dir}")
        check_file_exists(from_results_dir, "Results directory")

        abs_results_dir = os.path.abspath(from_results_dir)
        print(f"💡 Debug - Absolute results directory: {abs_results_dir}")
        if not os.path.exists(abs_results_dir):
            print(f"💡 Debug - Creating results directory: {abs_results_dir}")
            os.makedirs(abs_results_dir, exist_ok=True)

        full_output_path = os.path.join(abs_results_dir, output_filename)
        print(f"💡 Debug - Expected output path: {full_output_path}")

        build_results(abs_results_dir, output_filename, None)
        check_file_exists(full_output_path, "Generated parquet")

        parquet_file_to_load = full_output_path
    else:
        parquet_file_to_load = datasource

    print(f"💡 Debug - Final parquet_file_to_load: {parquet_file_to_load}")

    try:
        df_bench = load_datasource(parquet_file_to_load, load_bench_results)
        print(f"✅ Successfully loaded data with {len(df_bench)} rows")
    except Exception as e:
        print(f"❌ Error loading data: {str(e)}")
        print(f"Stack trace: {traceback.format_exc()}")
        df_bench = pd.DataFrame({
            "model": ["error"],
            "run_id": ["error"],
            "rate": [0],
            "inter_token_latency_ms_p90": [0],
            "time_to_first_token_ms_p90": [0],
            "e2e_latency_ms_p90": [0],
            "token_throughput_secs": [0],
            "successful_requests": [0],
            "error_rate": [0]
        })

    metrics = {
        "inter_token_latency_ms": PlotConfig(title="Inter Token Latency (lower is better)", x_title="QPS",
                                             y_title="Time (ms)", percentiles=[0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]),
        "time_to_first_token_ms": PlotConfig(title="TTFT (lower is better)", x_title="QPS",
                                             y_title="Time (ms)", percentiles=[0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]),
        "e2e_latency_ms": PlotConfig(title="End to End Latency (lower is better)", x_title="QPS",
                                     y_title="Time (ms)", percentiles=[0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]),
        "token_throughput_secs": PlotConfig(title="Request Output Throughput (higher is better)", x_title="QPS",
                                            y_title="Tokens/s"),
        "successful_requests": PlotConfig(title="Successful requests (higher is better)", x_title="QPS",
                                          y_title="Count"),
        "error_rate": PlotConfig(title="Error rate", x_title="QPS", y_title="%"),
        "prompt_tokens": PlotConfig(title="Prompt tokens", x_title="QPS", y_title="Count"),
        "decoded_tokens": PlotConfig(title="Decoded tokens", x_title="QPS", y_title="Count")
    }

    models = df_bench["model"].unique()
    run_ids = df_bench["run_id"].unique()

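    # Build the percentile choices shown in the radio selector: every percentile used by
    # any metric, formatted as 'p50'..'p99', plus 'avg' as the default.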
    percentiles = set()
    for k, v in metrics.items():
        if v.percentiles:
            percentiles.update(v.percentiles)
    percentiles = map(lambda p: f'p{int(float(p) * 100)}', percentiles)
    percentiles = sorted(list(percentiles))
    percentiles.append('avg')

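    # Dashboard layout: header, summary table, then per-model detail plots.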
    with gr.Blocks(css=css, title="Inference Benchmarker") as demo:
        with gr.Row():
            gr.Markdown("# Inference-benchmarker 🤗\n## Benchmarks results")
        with gr.Row():
            gr.Markdown(summary_desc)
        with gr.Row():
            table = gr.DataFrame(
                pd.DataFrame(),
                elem_classes=["summary"],
            )
        with gr.Row():
            details_desc = gr.Markdown("## Details")
        with gr.Row():
            model = gr.Dropdown(list(models), label="Select model", value=models[0])
        with gr.Row():
            percentiles_bench = gr.Radio(percentiles, label="", value="avg")
        i = 0
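        # Lay the metric plots out two per row: every other iteration closes the current
        # gr.Row via the ExitStack and opens a new one.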
        with ExitStack() as stack:
            for k, v in metrics.items():
                if i % 2 == 0:
                    stack.close()
                    gs = stack.enter_context(gr.Row())
                line_plots_bench.append(
                    {"component": gr.LinePlot(default_df, label=f'{v.title}', x="rate", y=k,
                                              y_title=v.y_title, x_title=v.x_title,
                                              color="run_id"
                                              ),
                     "model": model.value,
                     "metric": k,
                     "config": v
                     },
                )
                i += 1

        for component in [model, percentiles_bench]:
            component.change(update_bench, [model, percentiles_bench],
                             [item["component"] for item in line_plots_bench] + [table])
        gr.on([plot["component"].select for plot in line_plots_bench], select_region, [model],
              outputs=[item["component"] for item in line_plots_bench])
        gr.on([plot["component"].double_click for plot in line_plots_bench], reset_region, None,
              outputs=[item["component"] for item in line_plots_bench])
        demo.load(load_demo, [model, percentiles_bench],
                  [item["component"] for item in line_plots_bench] + [table])

    demo.launch(server_port=port, server_name="0.0.0.0")


@click.command()
@click.option('--from-results-dir', 'cli_from_results_dir', default=None,
              help='Load inference-benchmarker results from this directory. Overrides DASHBOARD_FROM_RESULTS_DIR.')
@click.option('--datasource', 'cli_datasource', default='file://benchmarks.parquet',
              help='Load this Parquet file directly if not building from a results directory.')
@click.option('--port', default=7860, help='Port to run the dashboard')
def main(cli_from_results_dir, cli_datasource, port):
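    """CLI entry point: resolve where the benchmark results live, build the parquet file,
    then hand it to run() to serve the dashboard."""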
    print("===== Starting Application =====")

    processing_dir = cli_from_results_dir

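    # Resolve the results directory: the --from-results-dir option wins, then the
    # DASHBOARD_FROM_RESULTS_DIR environment variable, then a local 'results' directory.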
    if processing_dir is None:
        env_var_value = os.environ.get('DASHBOARD_FROM_RESULTS_DIR')
        if env_var_value:
            print(f"Using environment variable DASHBOARD_FROM_RESULTS_DIR='{env_var_value}' for processing.")
            processing_dir = env_var_value
        elif os.path.exists('results') and os.path.isdir('results'):
            print("No --from-results-dir option or DASHBOARD_FROM_RESULTS_DIR env var. Defaulting to the existing 'results' directory for processing.")
            processing_dir = 'results'
        else:
            print("No directory specified for processing (no --from-results-dir, no DASHBOARD_FROM_RESULTS_DIR env var, and 'results' dir not found).")

    path_to_load_by_run_function = None

    if processing_dir:
        output_filename = 'benchmarks.parquet'
        abs_processing_dir = os.path.abspath(processing_dir)

        print(f"💡 Debug - Will process JSONs from directory: {abs_processing_dir}")
        check_file_exists(abs_processing_dir, "Source directory for JSONs")

        if not os.path.exists(abs_processing_dir):
            print(f"💡 Debug - Creating directory for processing/output: {abs_processing_dir}")
            os.makedirs(abs_processing_dir, exist_ok=True)

        generated_parquet_filepath = os.path.join(abs_processing_dir, output_filename)
        print(f"💡 Debug - Expected path for generated parquet file: {generated_parquet_filepath}")

        try:
            build_results(abs_processing_dir, output_filename, None)
            print("✅ Build results completed using build_results.")
        except Exception as e_build:
            print(f"❌ Error in build_results: {str(e_build)}")
            print(f"Stack trace: {traceback.format_exc()}")
            print("⚠️ Attempting fallback method: direct JSON processing")
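            # Fallback: read each results JSON directly, tag every benchmark entry with a
            # model name and run_id derived from the filename, and write the combined
            # DataFrame to the expected parquet path.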
            try:
                json_files = glob.glob(os.path.join(abs_processing_dir, "*.json"))
                print(f"Found {len(json_files)} JSON files for fallback: {json_files}")
                if not json_files:
                    raise FileNotFoundError("Fallback: No JSON files found in results directory")

                combined_data = []
                for json_file in json_files:
                    try:
                        with open(json_file, 'r') as f:
                            data = json.load(f)
                        filename = os.path.basename(json_file)
                        model_name_parts = filename.split('_')
                        model_name = f"{model_name_parts[0]}_{model_name_parts[1]}" if len(model_name_parts) > 1 else model_name_parts[0]

                        if 'benchmarks' in data:
                            for benchmark in data['benchmarks']:
                                benchmark['model'] = model_name
                                benchmark['run_id'] = os.path.splitext(filename)[0]
                                combined_data.append(benchmark)
                        else:
                            print(f"⚠️ Fallback: No 'benchmarks' key in {json_file}")
                    except Exception as json_err:
                        print(f"❌ Fallback: Error processing {json_file}: {str(json_err)}")

                if combined_data:
                    df_direct = pd.DataFrame(combined_data)
                    df_direct.to_parquet(generated_parquet_filepath)
                    print(f"✅ Created parquet file via fallback method: {generated_parquet_filepath}")
                else:
                    raise ValueError("Fallback: No data could be extracted from JSON files")
            except Exception as e_fallback:
                print(f"❌ Fallback method failed: {str(e_fallback)}")
                print(f"Stack trace: {traceback.format_exc()}")

        check_file_exists(generated_parquet_filepath, "Parquet file after build/fallback attempts")
        if os.path.exists(generated_parquet_filepath):
            path_to_load_by_run_function = generated_parquet_filepath
        else:
            print(f"❌ CRITICAL: Failed to generate or find parquet file at '{generated_parquet_filepath}' after all attempts.")

    if path_to_load_by_run_function is None:
        print(f"⚠️ Defaulting to cli_datasource '{cli_datasource}' as parquet generation failed or was skipped.")
        path_to_load_by_run_function = cli_datasource

    print(f"💡 Final path to be loaded by run() function: '{path_to_load_by_run_function}'")

    run(None, path_to_load_by_run_function, port)


if __name__ == '__main__':
    main(auto_envvar_prefix='DASHBOARD')