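# Gradio dashboard for inference-benchmarker results: loads benchmark data from
# a Parquet file (optionally built from a directory of JSON results via
# parse_results.build_results) and renders a summary table plus per-metric plots.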
from contextlib import ExitStack
from dataclasses import dataclass
from typing import List, Optional
import click
import os
import gradio as gr
import pandas as pd
import traceback
import glob
import json
from parse_results import build_results
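# Display settings for one metric plot. `percentiles` lists the percentile
# suffixes available for that metric; it stays None for metrics that are only
# reported as a single value.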
@dataclass
class PlotConfig:
x_title: str
y_title: str
title: str
percentiles: Optional[List[float]] = None
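# Debug helper: reports whether a path exists, plus its size and the parent
# directory listing, to make missing-file problems easier to diagnose in logs.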
def check_file_exists(path, label=""):
if os.path.exists(path):
print(f"✅ {label} file exists: {path}")
print(f" File size: {os.path.getsize(path)} bytes")
print(f" Absolute path: {os.path.abspath(path)}")
else:
print(f"❌ {label} file NOT found: {path}")
print(f" Current working directory: {os.getcwd()}")
print(f" Directory contents: {os.listdir(os.path.dirname(path) if os.path.dirname(path) else '.')}")
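# Builds and launches the Gradio UI. When invoked from main() below,
# from_results_dir is None and `datasource` already points at the Parquet file.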
def run(from_results_dir, datasource, port):
print(f"💡 Debug - from_results_dir: {from_results_dir}")
print(f"💡 Debug - datasource: {datasource}")
print(f"💡 Debug - current directory: {os.getcwd()}")
css = '''
.summary span {
font-size: 10px;
padding-top:0;
padding-bottom:0;
}
'''
summary_desc = '''
## Summary
This table shows the average of the metrics for each model and QPS rate.
The metrics are:
* Inter token latency (ITL): Time to generate each new output token for a user querying the system.
It translates to the “speed” perceived by the end user. We aim for at least 300 words per minute (average reading speed), so ITL < 150 ms.
* Time to First Token (TTFT): Time the user has to wait before seeing the first token of their answer.
Lower waiting times are essential for real-time interactions, less so for offline workloads.
* End-to-end latency: The overall time the system took to generate the full response to the user.
* Throughput: The number of tokens per second the system can generate across all requests.
* Successful requests: The number of requests the system was able to honor in the benchmark timeframe.
* Error rate: The percentage of requests that ended in error because the system could not process them in time or failed to process them.
'''
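# State shared with the nested Gradio callback functions defined below.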
df_bench = pd.DataFrame()
line_plots_bench = []
column_mappings = {'inter_token_latency_ms_p90': 'ITL P90 (ms)', 'time_to_first_token_ms_p90': 'TTFT P90 (ms)',
'e2e_latency_ms_p90': 'E2E P90 (ms)', 'token_throughput_secs': 'Throughput (tokens/s)',
'successful_requests': 'Successful requests', 'error_rate': 'Error rate (%)', 'model': 'Model',
'rate': 'QPS', 'run_id': 'Run ID'}
default_df = pd.DataFrame.from_dict(
{"rate": [1, 2], "inter_token_latency_ms_p90": [10, 20],
"version": ["default", "default"],
"model": ["default", "default"]})
def load_demo(model_bench, percentiles):
return update_bench(model_bench, percentiles)
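# Swaps each plot's metric column to the selected percentile (falling back to 0
# when that column is missing), filters df_bench to the selected model, and
# recomputes the summary table.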
def update_bench(model, percentiles):
res = []
for plot in line_plots_bench:
if plot['config'].percentiles:
k = plot['metric'] + '_' + str(percentiles)
df_bench[plot['metric']] = df_bench[k] if k in df_bench.columns else 0
res.append(df_bench[(df_bench['model'] == model)])
return res + [summary_table()]
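# Aggregates df_bench per (model, run_id, rate) and formats the headline
# metrics for the summary table shown above the plots.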
def summary_table() -> pd.DataFrame:
data = df_bench.groupby(['model', 'run_id', 'rate']).agg(
{'inter_token_latency_ms_p90': 'mean', 'time_to_first_token_ms_p90': 'mean',
'e2e_latency_ms_p90': 'mean', 'token_throughput_secs': 'mean',
'successful_requests': 'mean', 'error_rate': 'mean'}).reset_index()
data = data[
['run_id', 'model', 'rate', 'inter_token_latency_ms_p90', 'time_to_first_token_ms_p90',
'e2e_latency_ms_p90',
'token_throughput_secs']]
for metric in ['inter_token_latency_ms_p90', 'time_to_first_token_ms_p90', 'e2e_latency_ms_p90',
'token_throughput_secs']:
data[metric] = data[metric].apply(lambda x: f"{x:.2f}")
data = data.rename(
columns=column_mappings)
return data
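# Loads the Parquet results and keeps only constant-arrival-rate benchmark
# runs, dropping the warmup and throughput passes.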
def load_bench_results(source) -> pd.DataFrame:
data = pd.read_parquet(source)
# remove warmup and throughput
data = data[(data['id'] != 'warmup') & (data['id'] != 'throughput')]
# only keep constant rate
data = data[data['executor_type'] == 'ConstantArrivalRate']
return data
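# Called when a region is selected on any plot: zooms every plot to the chosen
# QPS range by returning updated x/y limits for each LinePlot.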
def select_region(selection: gr.SelectData, model):
min_w, max_w = selection.index
data = df_bench[(df_bench['model'] == model) & (df_bench['rate'] >= min_w) & (
df_bench['rate'] <= max_w)]
res = []
for plot in line_plots_bench:
# find the y values for the selected region
metric = plot["metric"]
y_min = data[metric].min()
y_max = data[metric].max()
res.append(gr.LinePlot(x_lim=[min_w, max_w], y_lim=[y_min, y_max]))
return res
def reset_region():
res = []
for _ in line_plots_bench:
res.append(gr.LinePlot(x_lim=None, y_lim=None))
return res
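# Dispatches on the datasource scheme: file:// and bare paths are checked and
# read locally, while s3:// URLs are handed to the loader unchanged (pandas can
# read them when the optional S3 dependencies, e.g. s3fs, are available).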
def load_datasource(datasource, fn):
print(f"💡 Debug - load_datasource called with: {datasource}")
if datasource.startswith('file://'):
local_path = datasource[len('file://'):]
print(f"💡 Debug - Extracted local path: {local_path}")
check_file_exists(local_path, "Local")
return fn(local_path)
elif datasource.startswith('s3://'):
return fn(datasource)
else:
# If no scheme is provided, assume it's a local path.
print(f"💡 Debug - Using path as-is: {datasource}")
check_file_exists(datasource, "Direct")
return fn(datasource)
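# Resolve which Parquet file to load: either build it from a results directory
# with build_results(), or fall back to the provided datasource as-is.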
parquet_file_to_load = None
if from_results_dir is not None:
# If from_results_dir is specified, results are built into 'benchmarks.parquet'
# within that directory.
output_filename = 'benchmarks.parquet'
print(f"💡 Debug - Building results from directory: {from_results_dir}")
# Check if results directory exists
check_file_exists(from_results_dir, "Results directory")
# Create absolute path for results directory
abs_results_dir = os.path.abspath(from_results_dir)
print(f"💡 Debug - Absolute results directory: {abs_results_dir}")
# Create the results directory if it doesn't exist
if not os.path.exists(abs_results_dir):
print(f"💡 Debug - Creating results directory: {abs_results_dir}")
os.makedirs(abs_results_dir, exist_ok=True)
# Call build_results with absolute paths
full_output_path = os.path.join(abs_results_dir, output_filename)
print(f"💡 Debug - Expected output path: {full_output_path}")
build_results(abs_results_dir, output_filename, None)
# Check if the file was created
check_file_exists(full_output_path, "Generated parquet")
# The file to load is now in from_results_dir/output_filename
parquet_file_to_load = full_output_path
else:
# If not building from results_dir, use the provided datasource directly.
parquet_file_to_load = datasource
print(f"💡 Debug - Final parquet_file_to_load: {parquet_file_to_load}")
# Load data
try:
df_bench = load_datasource(parquet_file_to_load, load_bench_results)
print(f"✅ Successfully loaded data with {len(df_bench)} rows")
except Exception as e:
print(f"❌ Error loading data: {str(e)}")
print(f"Stack trace: {traceback.format_exc()}")
# Create a minimal DataFrame to prevent further errors
df_bench = pd.DataFrame({
"model": ["error"],
"run_id": ["error"],
"rate": [0],
"inter_token_latency_ms_p90": [0],
"time_to_first_token_ms_p90": [0],
"e2e_latency_ms_p90": [0],
"token_throughput_secs": [0],
"successful_requests": [0],
"error_rate": [0]
})
# Define metrics
metrics = {
"inter_token_latency_ms": PlotConfig(title="Inter Token Latency (lower is better)", x_title="QPS",
y_title="Time (ms)", percentiles=[0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]),
"time_to_first_token_ms": PlotConfig(title="TTFT (lower is better)", x_title="QPS",
y_title="Time (ms)", percentiles=[0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]),
"e2e_latency_ms": PlotConfig(title="End to End Latency (lower is better)", x_title="QPS",
y_title="Time (ms)", percentiles=[0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]),
"token_throughput_secs": PlotConfig(title="Request Output Throughput (higher is better)", x_title="QPS",
y_title="Tokens/s"),
"successful_requests": PlotConfig(title="Successful requests (higher is better)", x_title="QPS",
y_title="Count"),
"error_rate": PlotConfig(title="Error rate", x_title="QPS", y_title="%"),
"prompt_tokens": PlotConfig(title="Prompt tokens", x_title="QPS", y_title="Count"),
"decoded_tokens": PlotConfig(title="Decoded tokens", x_title="QPS", y_title="Count")
}
models = df_bench["model"].unique()
run_ids = df_bench["run_id"].unique()
# get all available percentiles
percentiles = set()
for k, v in metrics.items():
if v.percentiles:
percentiles.update(v.percentiles)
percentiles = map(lambda p: f'p{int(float(p) * 100)}', percentiles)
percentiles = sorted(list(percentiles))
percentiles.append('avg')
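# Build the UI: summary table, model/percentile selectors, and one line plot
# per metric, laid out two per row.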
with gr.Blocks(css=css, title="Inference Benchmarker") as demo:
with gr.Row():
gr.Markdown("# Inference-benchmarker 🤗\n## Benchmark results")
with gr.Row():
gr.Markdown(summary_desc)
with gr.Row():
table = gr.DataFrame(
pd.DataFrame(),
elem_classes=["summary"],
)
with gr.Row():
details_desc = gr.Markdown("## Details")
with gr.Row():
model = gr.Dropdown(list(models), label="Select model", value=models[0])
with gr.Row():
percentiles_bench = gr.Radio(percentiles, label="", value="avg")
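# ExitStack opens a fresh gr.Row() every two plots and closes the previous
# one, producing a two-column layout for the metric plots.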
i = 0
with ExitStack() as stack:
for k, v in metrics.items():
if i % 2 == 0:
stack.close()
gs = stack.enter_context(gr.Row())
line_plots_bench.append(
{"component": gr.LinePlot(default_df, label=f'{v.title}', x="rate", y=k,
y_title=v.y_title, x_title=v.x_title,
color="run_id"
),
"model": model.value,
"metric": k,
"config": v
},
)
i += 1
for component in [model, percentiles_bench]:
component.change(update_bench, [model, percentiles_bench],
[item["component"] for item in line_plots_bench] + [table])
gr.on([plot["component"].select for plot in line_plots_bench], select_region, [model],
outputs=[item["component"] for item in line_plots_bench])
gr.on([plot["component"].double_click for plot in line_plots_bench], reset_region, None,
outputs=[item["component"] for item in line_plots_bench])
demo.load(load_demo, [model, percentiles_bench],
[item["component"] for item in line_plots_bench] + [table])
demo.launch(server_port=port, server_name="0.0.0.0")
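# CLI entry point. Thanks to auto_envvar_prefix='DASHBOARD', click options can
# also be set via environment variables (e.g. DASHBOARD_PORT for --port);
# DASHBOARD_FROM_RESULTS_DIR is handled explicitly inside main().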
@click.command()
@click.option('--from-results-dir', 'cli_from_results_dir', default=None, help='Load inference-benchmarker results from this directory. Overrides DASHBOARD_FROM_RESULTS_DIR.')
@click.option('--datasource', 'cli_datasource', default='file://benchmarks.parquet', help='Load this Parquet file directly if not building from a results directory.')
@click.option('--port', default=7860, help='Port to run the dashboard')
def main(cli_from_results_dir, cli_datasource, port):
print("===== Starting Application =====")
# print(f"Environment variables: {os.environ}")
# Determine the directory from which to process JSON results
# Priority: 1. CLI option, 2. Env Var, 3. Default to 'results' dir
processing_dir = cli_from_results_dir
if processing_dir is None:
env_var_value = os.environ.get('DASHBOARD_FROM_RESULTS_DIR')
if env_var_value:
print(f"Using environment variable DASHBOARD_FROM_RESULTS_DIR='{env_var_value}' for processing.")
processing_dir = env_var_value
elif os.path.exists('results') and os.path.isdir('results'):
print("No --from-results-dir option or DASHBOARD_FROM_RESULTS_DIR env var set; defaulting to the existing 'results' directory.")
processing_dir = 'results'
else:
print(f"No directory specified for processing (no --from-results-dir, no DASHBOARD_FROM_RESULTS_DIR env var, and 'results' dir not found).")
# processing_dir remains None
path_to_load_by_run_function = None # This will be the path to the .parquet file
if processing_dir:
# A directory for processing JSONs has been determined.
# Use the existing logic to build/fallback and generate benchmarks.parquet.
output_filename = 'benchmarks.parquet'
abs_processing_dir = os.path.abspath(processing_dir)
print(f"💡 Debug - Will process JSONs from directory: {abs_processing_dir}")
check_file_exists(abs_processing_dir, "Source directory for JSONs")
# Ensure the directory exists (it might be 'results' or user-provided)
# build_results might expect the output directory to exist.
if not os.path.exists(abs_processing_dir):
print(f"💡 Debug - Creating directory for processing/output: {abs_processing_dir}")
os.makedirs(abs_processing_dir, exist_ok=True)
# The generated parquet file will be placed inside the abs_processing_dir
generated_parquet_filepath = os.path.join(abs_processing_dir, output_filename)
print(f"💡 Debug - Expected path for generated parquet file: {generated_parquet_filepath}")
try:
build_results(abs_processing_dir, output_filename, None) # output_filename is relative to abs_processing_dir
print("✅ Build results completed using build_results.")
except Exception as e_build:
print(f"❌ Error in build_results: {str(e_build)}")
print(f"Stack trace: {traceback.format_exc()}")
print("⚠️ Attempting fallback method: direct JSON processing")
try:
json_files = glob.glob(os.path.join(abs_processing_dir, "*.json"))
print(f"Found {len(json_files)} JSON files for fallback: {json_files}")
if not json_files:
raise FileNotFoundError("Fallback: No JSON files found in results directory")
combined_data = []
for json_file in json_files:
try:
with open(json_file, 'r') as f:
data = json.load(f)
filename = os.path.basename(json_file)
model_name_parts = filename.split('_')
model_name = f"{model_name_parts[0]}_{model_name_parts[1]}" if len(model_name_parts) > 1 else model_name_parts[0]
if 'benchmarks' in data:
for benchmark in data['benchmarks']:
benchmark['model'] = model_name
benchmark['run_id'] = os.path.splitext(filename)[0]
combined_data.append(benchmark)
else:
print(f"⚠️ Fallback: No 'benchmarks' key in {json_file}")
except Exception as json_err:
print(f"❌ Fallback: Error processing {json_file}: {str(json_err)}")
if combined_data:
df_direct = pd.DataFrame(combined_data)
df_direct.to_parquet(generated_parquet_filepath)
print(f"✅ Created parquet file via fallback method: {generated_parquet_filepath}")
else:
raise ValueError("Fallback: No data could be extracted from JSON files")
except Exception as e_fallback:
print(f"❌ Fallback method failed: {str(e_fallback)}")
print(f"Stack trace: {traceback.format_exc()}")
# After attempting to build/generate, check if the file exists
check_file_exists(generated_parquet_filepath, "Parquet file after build/fallback attempts")
if os.path.exists(generated_parquet_filepath):
path_to_load_by_run_function = generated_parquet_filepath
else:
print(f"❌ CRITICAL: Failed to generate or find parquet file at '{generated_parquet_filepath}' after all attempts.")
# path_to_load_by_run_function remains None here, will be handled below.
# If path_to_load_by_run_function is still None at this point
# (either because processing_dir was not set, or all generation attempts failed),
# default to the original cli_datasource.
if path_to_load_by_run_function is None:
print(f"⚠️ Defaulting to cli_datasource '{cli_datasource}' as parquet generation failed or was skipped.")
path_to_load_by_run_function = cli_datasource
print(f"💡 Final path to be loaded by run() function: '{path_to_load_by_run_function}'")
# Call run(). The first argument (from_results_dir for run()) is None because main handles processing.
# The second argument (datasource for run()) is the actual file path to load.
run(None, path_to_load_by_run_function, port)
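# Example invocations (paths are illustrative):
#   python app.py --from-results-dir ./results
#   DASHBOARD_FROM_RESULTS_DIR=./results python app.py --port 7860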
if __name__ == '__main__':
main(auto_envvar_prefix='DASHBOARD')