import gc
import io
import os
import time
import tracemalloc

import dask.dataframe as dd
import duckdb
import gradio as gr
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import polars as pl
import psutil
import seaborn as sns
import wandb
from datasets import load_dataset
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from PIL import Image
# Load dataset once at the start to avoid redundant requests | |
# dataset = load_dataset("Chendi/NYC_TAXI_FARE_CLEANED") | |
# Authenticate with Weights & Biases using the key from the environment, then
# open the run that the wandb.log calls in this file report to.
wandb.login(key=os.environ.get("WANDB_API_KEY"))
wandb.init(project="billion-row-analysis", name="benchmarking")

# Fetch the "train" split and write it to a local Parquet file the first time
# the script runs; the loader functions below all read from this path.
dataset = load_dataset("AnnsKhan/jan_2024_nyc", split="train")
parquet_path = "jan_2024.parquet"
if not os.path.exists(parquet_path):
    dataset.to_pandas().to_parquet(parquet_path)

# NOTE(review): no Modin import is visible in this file; confirm this setting
# is still needed.
os.environ["MODIN_ENGINE"] = "dask"
# Create the FastAPI application and attach a fully permissive CORS policy.
# NOTE(review): FastAPI's CORS documentation says wildcard origins, methods and
# headers cannot be combined with allow_credentials=True; confirm this
# configuration is intended before exposing any authenticated route.
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
# Performance measurement function
def measure_performance(load_function, *args):
    """Run ``load_function(*args)`` and report time, CPU and memory figures.

    Args:
        load_function: Callable that performs the load being measured.
        *args: Positional arguments forwarded to ``load_function``.

    Returns:
        tuple: ``(data, elapsed_seconds, cpu_delta, memory_delta, peak_memory)``

        * ``data`` - whatever ``load_function`` returned.
        * ``elapsed_seconds`` - wall-clock duration of the call itself.
        * ``cpu_delta`` - system-wide CPU percentage sampled after the call
          minus the percentage sampled before it, floored at 0.
        * ``memory_delta`` - growth of this process's RSS across the call, as
          a percentage of total system memory, floored at 0.
        * ``peak_memory`` - peak size traced by ``tracemalloc`` during the
          call, as a percentage of total system memory.

    Raises:
        Whatever ``load_function`` raises; tracing is stopped first.
    """
    gc.collect()
    process = psutil.Process()
    total_memory = psutil.virtual_memory().total

    # Each cpu_percent(interval=1) call blocks for one second, so both samples
    # are taken outside the timed window. Timing them (as before) added about
    # two seconds to every reported load time.
    # NOTE(review): the CPU figure compares utilisation just before and just
    # after the load, not utilisation during it; confirm that is what the
    # "CPU Load (%)" chart is meant to show.
    start_cpu = psutil.cpu_percent(interval=1)
    start_memory = process.memory_info().rss / total_memory * 100

    tracemalloc.start()
    try:
        start_time = time.perf_counter()
        data = load_function(*args)
        elapsed = time.perf_counter() - start_time
        # tracemalloc traces memory blocks allocated by Python; allocations
        # made natively by the loading libraries are presumably not included.
        _, peak_memory = tracemalloc.get_traced_memory()
    finally:
        # Always stop tracing, even when the loader raises, so a failed run
        # does not leave tracemalloc enabled for the rest of the process.
        tracemalloc.stop()

    end_memory = process.memory_info().rss / total_memory * 100
    end_cpu = psutil.cpu_percent(interval=1)
    peak_memory_percentage = peak_memory / total_memory * 100

    return (
        data,
        elapsed,
        max(end_cpu - start_cpu, 0),
        max(end_memory - start_memory, 0),
        peak_memory_percentage,
    )
# # Data loading functions | |
# def load_data_python_vectorized(): | |
# df = dataset["train"].to_pandas() | |
# num_cols = df.select_dtypes(include=['number']).columns | |
# np_data = {col: df[col].to_numpy() for col in num_cols} | |
# return np_data | |
# def load_data_pandas(): | |
# return dataset["train"].to_pandas() | |
# def load_data_dask(): | |
# return dd.from_pandas(dataset["train"].to_pandas(), npartitions=10) | |
# def load_data_polars(): | |
# return pl.from_pandas(dataset["train"].to_pandas()) | |
# def load_data_duckdb(): | |
# return duckdb.from_df(dataset["train"].to_pandas()) | |
# Data loading functions | |
def load_data_python_vectorized():
    """Read the Parquet file with pandas and expose numeric columns as arrays.

    Returns:
        dict: column name -> NumPy array, one entry for every column that
        ``select_dtypes(include=['number'])`` selects.
    """
    frame = pd.read_parquet(parquet_path)
    numeric_columns = frame.select_dtypes(include=['number']).columns
    # One array per numeric column, ready for vectorized NumPy operations.
    return {name: frame[name].to_numpy() for name in numeric_columns}
def load_data_pandas():
    """Read the Parquet file at ``parquet_path`` into a pandas DataFrame."""
    frame = pd.read_parquet(parquet_path)
    return frame
def load_data_dask():
    """Open the Parquet file at ``parquet_path`` with ``dask.dataframe``.

    NOTE(review): Dask evaluates lazily, so presumably no row data is read
    here; verify before comparing this timing with the eager loaders.
    """
    dask_frame = dd.read_parquet(parquet_path)
    return dask_frame
def load_data_polars():
    """Read the Parquet file at ``parquet_path`` with ``polars.read_parquet``."""
    polars_frame = pl.read_parquet(parquet_path)
    return polars_frame
def load_data_duckdb():
    """Open the Parquet file at ``parquet_path`` with ``duckdb.read_parquet``.

    NOTE(review): DuckDB relations are presumably evaluated lazily, so this
    may not read any rows; verify before comparing timings.
    """
    relation = duckdb.read_parquet(parquet_path)
    return relation
# Benchmark targets as (loader callable, display name) pairs; run_benchmark
# iterates over them in this order.
loaders = [
    (load_data_pandas, "Pandas"),
    (load_data_dask, "Dask"),
    (load_data_polars, "Polars"),
    (load_data_duckdb, "DuckDB"),
    (load_data_python_vectorized, "Python Vectorized"),
]
def run_benchmark():
    """Benchmark every entry in ``loaders`` and chart the results.

    Each loader is run through ``measure_performance``. Its metrics are logged
    to Weights & Biases and collected into a table, which is then drawn as a
    2x2 grid of bar charts (one chart per metric).

    Returns:
        tuple: ``(markdown_table, image_array)`` on success, where
        ``image_array`` is the rendered chart as a NumPy array. If any loader
        raised, ``(error_text, None)`` with one line per failing loader and
        no chart.
    """
    benchmark_results = []
    error_messages = []
    for loader, lib_name in loaders:
        try:
            data, load_time, cpu_load, mem_load, peak_mem_load = measure_performance(loader)
            # Release the loaded dataset immediately. Keeping it referenced
            # until the next iteration would leave it resident while the next
            # loader is being measured and distort that loader's memory figures.
            del data
            metrics = {
                "Library": lib_name,
                "Load Time (s)": load_time,
                "CPU Load (%)": cpu_load,
                "Memory Load (%)": mem_load,
                "Peak Memory (%)": peak_mem_load,
            }
            # Log a copy so the row stored for the table stays independent of
            # whatever the logger does with its argument.
            wandb.log(dict(metrics))
            benchmark_results.append(metrics)
        except Exception as e:
            error_messages.append(f"{lib_name} Error: {str(e)}")

    if error_messages:
        return '\n'.join(error_messages), None

    benchmark_df = pd.DataFrame(benchmark_results)
    sns.set(style="whitegrid")
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    try:
        fig.suptitle("Benchmark Results", fontsize=16)
        metric_columns = (
            "Load Time (s)",
            "CPU Load (%)",
            "Memory Load (%)",
            "Peak Memory (%)",
        )
        # axes.flat walks the grid row by row: [0,0], [0,1], [1,0], [1,1].
        for metric, ax in zip(metric_columns, axes.flat):
            sns.barplot(x="Library", y=metric, data=benchmark_df, ax=ax)
        buf = io.BytesIO()
        fig.savefig(buf, format='png')
    finally:
        # Close the figure whether or not plotting succeeded; otherwise every
        # benchmark run leaves another open figure in pyplot's registry.
        plt.close(fig)

    # Rewind before reading: savefig leaves the stream position at the end.
    buf.seek(0)
    image = Image.open(buf)
    wandb.log({"Benchmark Results": wandb.Image(image)})
    image_array = np.array(image)
    return benchmark_df.to_markdown(), image_array  # Return NumPy array
# Select the non-interactive Agg backend so figures are rendered off-screen
# into buffers rather than shown in a window.
matplotlib.use("Agg")
def explore_dataset():
    """Summarise the Parquet dataset and render a 2x2 overview figure.

    The frame is read from ``parquet_path``, float64 columns are downcast to
    float32, and frames longer than one million rows are reduced to a
    reproducible 50% sample. A per-column summary and the figure are logged to
    Weights & Biases.

    Returns:
        tuple: ``(summary_markdown, image_array)`` on success, where
        ``image_array`` is the rendered figure as a NumPy array. On any
        exception, ``("Error loading data: <message>", None)``.
    """
    try:
        df = pd.read_parquet(parquet_path)

        # Convert float64 columns to float32 to reduce memory usage.
        for col in df.select_dtypes(include=['float64']).columns:
            df[col] = df[col].astype('float32')

        # For frames above one million rows, work on a 50% sample; the fixed
        # random_state makes the sample reproducible.
        if len(df) > 1_000_000:
            df = df.sample(frac=0.5, random_state=42)

        # Per-column summary: describe() statistics plus missing/unique counts.
        summary = df.describe(include='all').T
        summary["missing_values"] = df.isnull().sum()
        summary["unique_values"] = df.nunique()
        summary_text = summary.to_markdown()
        wandb.log({"Dataset Summary": wandb.Html(summary_text)})

        fig, axes = plt.subplots(2, 2, figsize=(16, 10))
        try:
            fig.suptitle("Dataset Overview", fontsize=16)

            # Top-left: number of columns per dtype.
            data_types = df.dtypes.value_counts()
            sns.barplot(x=data_types.index.astype(str), y=data_types.values, ax=axes[0, 0])
            axes[0, 0].set_title("Column Count by Data Type by AnnsKhan")
            axes[0, 0].set_ylabel("Count")
            axes[0, 0].set_xlabel("Column Type")

            num_cols = df.select_dtypes(include=['number']).columns
            if len(num_cols) > 0:
                # Top-right: mean of every numeric column.
                mean_values = df[num_cols].mean()
                sns.barplot(x=mean_values.index, y=mean_values.values, ax=axes[0, 1])
                axes[0, 1].set_title("Mean Values of Numeric Columns")
                axes[0, 1].set_xlabel("Column Name")
                axes[0, 1].tick_params(axis='x', rotation=45)
                # Log all means in one call so they share a single W&B step
                # instead of advancing the step once per column.
                wandb.log({f"Mean Values/{col}": mean_val for col, mean_val in mean_values.items()})

                # Bottom-left: histogram of the first numeric column. The title
                # names the column actually plotted rather than assuming it is
                # the pick-up location ID.
                selected_col = num_cols[0]
                sns.histplot(df[selected_col], bins=30, kde=True, ax=axes[1, 0])
                axes[1, 0].set_title(f"Distribution of {selected_col}")
                axes[1, 0].set_xlabel(selected_col)
                axes[1, 0].set_ylabel("Frequency")

            # Bottom-right: correlation heatmap (needs at least two numeric columns).
            if len(num_cols) > 1:
                corr = df[num_cols].corr()
                sns.heatmap(corr, annot=True, cmap="coolwarm", fmt=".2f", linewidths=0.5, ax=axes[1, 1])
                axes[1, 1].set_title("Correlation Heatmap")

            # Save figure to buffer.
            buf = io.BytesIO()
            fig.tight_layout()
            fig.savefig(buf, format='png', bbox_inches='tight')
        finally:
            # Close the figure on the error path too, so a failed plot does
            # not leave it open.
            plt.close(fig)

        # Rewind before reading: savefig leaves the stream position at the end.
        buf.seek(0)
        image = Image.open(buf)
        image_array = np.array(image)
        wandb.log({"Dataset Overview": wandb.Image(image)})
        return summary_text, image_array
    except Exception as e:
        # UI boundary: report the failure as text instead of raising.
        return f"Error loading data: {str(e)}", None
# Gradio interface setup
def gradio_interface():
    """Build the Gradio Blocks UI with an explore section and a benchmark section.

    Each section has a button, a text box and an image. Clicking the button
    runs the matching function and fills the text box and image with its two
    return values.

    Returns:
        The assembled ``gr.Blocks`` object; it is not launched here.
    """
    def run_and_plot():
        # Yields (results text, chart image) for the benchmark section.
        return run_benchmark()

    def explore_data():
        # Yields (summary text, overview image) for the explore section.
        return explore_dataset()

    with gr.Blocks() as demo:
        gr.Markdown("## Explore Dataset")
        explore_button = gr.Button("Explore Data")
        summary_text = gr.Textbox(label="Dataset Summary")
        explore_image = gr.Image(label="Feature Distributions")
        # Without this binding the button does nothing.
        explore_button.click(explore_data, outputs=[summary_text, explore_image])

        gr.Markdown("## Benchmarking Different Data Loading Libraries")
        run_button = gr.Button("Run Benchmark")
        result_text = gr.Textbox(label="Benchmark Results")
        plot_image = gr.Image(label="Performance Graph")
        run_button.click(run_and_plot, outputs=[result_text, plot_image])
    return demo
# Build the interface and run the Gradio app. share=False means no public
# share link is requested; local access is sufficient (e.g. from VS Code).
demo = gradio_interface()
demo.launch(share=False)