File size: 9,302 Bytes
b469f9b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
edcd8c8
3551b24
 
d7a9790
b469f9b
5717ef0
b469f9b
edcd8c8
26aa82d
edcd8c8
 
b469f9b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d7a9790
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b469f9b
 
05c5acf
d7a9790
 
b469f9b
 
 
 
 
05c5acf
b469f9b
 
05c5acf
b469f9b
 
05c5acf
b469f9b
 
05c5acf
b469f9b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c761ff8
b30b471
 
 
 
 
 
 
 
 
 
b469f9b
 
 
 
 
b30b471
b469f9b
 
b30b471
 
b469f9b
 
b30b471
b469f9b
f666dcd
4ef1dbe
f666dcd
 
 
b30b471
b469f9b
 
 
f666dcd
 
 
 
 
b30b471
b469f9b
 
 
b30b471
f666dcd
b30b471
 
 
 
 
 
 
 
 
 
 
 
 
b469f9b
 
 
 
 
b30b471
 
b469f9b
 
b30b471
 
b469f9b
b30b471
b469f9b
b30b471
b469f9b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import gradio as gr
import time
import psutil
import tracemalloc
import gc
import pandas as pd
import dask.dataframe as dd
import polars as pl
import duckdb
import seaborn as sns
import matplotlib.pyplot as plt
import io
import os
from PIL import Image
import numpy as np
import matplotlib
import wandb
from datasets import load_dataset

# Load dataset once at the start to avoid redundant requests
# dataset = load_dataset("Chendi/NYC_TAXI_FARE_CLEANED")

# Authenticate with Weights & Biases; the API key is read from the environment.
wandb.login(key=os.getenv("WANDB_API_KEY"))
wandb.init(project="billion-row-analysis", name="benchmarking")
# Download the NYC taxi dataset from the Hugging Face Hub and cache it to a
# local parquet file so the benchmarks below measure disk reads, not network.
dataset = load_dataset("AnnsKhan/jan_2024_nyc", split="train")
parquet_path = "jan_2024.parquet"
if not os.path.exists(parquet_path):
     dataset.to_pandas().to_parquet(parquet_path)  # Save to disk
# Selects dask as modin's backend.
# NOTE(review): modin is never imported in this file — confirm this env var is still needed.
os.environ["MODIN_ENGINE"] = "dask"

# Initialize FastAPI app
app = FastAPI()

# Allow cross-origin browser requests so any frontend can reach the API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # NOTE(review): wildcard origins combined with allow_credentials=True is rejected by browsers per the CORS spec — confirm intent
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Performance measurement function
def measure_performance(load_function, *args):
    """Run ``load_function(*args)`` and measure its resource usage.

    Returns a 5-tuple:
        (result, elapsed_seconds, cpu_delta_pct, rss_delta_pct, peak_traced_pct)

    - elapsed_seconds: wall time of the load call only (measured with the
      monotonic ``perf_counter``; the original used ``time.time`` and also
      included the two blocking 1-second CPU sampling windows in the total).
    - cpu_delta_pct / rss_delta_pct: before/after deltas, clamped at 0.
    - peak_traced_pct: peak Python-level allocations (tracemalloc) as a
      percentage of total system RAM; C-extension allocations are not traced.
    """
    gc.collect()  # start from a clean heap so earlier garbage isn't attributed to this load
    tracemalloc.start()

    process = psutil.Process()  # reuse one handle instead of constructing it twice
    total_memory = psutil.virtual_memory().total  # total system memory, for percentages

    start_cpu = psutil.cpu_percent(interval=1)  # NOTE: blocks ~1s to sample system CPU
    start_memory = process.memory_info().rss / total_memory * 100  # RSS as % of RAM

    # Time only the load itself, excluding the CPU sampling sleeps.
    start_time = time.perf_counter()
    data = load_function(*args)
    end_time = time.perf_counter()

    end_memory = process.memory_info().rss / total_memory * 100
    end_cpu = psutil.cpu_percent(interval=1)  # blocks another ~1s

    _, peak_memory = tracemalloc.get_traced_memory()
    tracemalloc.stop()

    peak_memory_percentage = peak_memory / total_memory * 100

    return (
        data,
        end_time - start_time,
        max(end_cpu - start_cpu, 0),
        max(end_memory - start_memory, 0),
        peak_memory_percentage,
    )

# # Data loading functions
# def load_data_python_vectorized():
#     df = dataset["train"].to_pandas()
#     num_cols = df.select_dtypes(include=['number']).columns
#     np_data = {col: df[col].to_numpy() for col in num_cols}
#     return np_data

# def load_data_pandas():
#     return dataset["train"].to_pandas()

# def load_data_dask():
#     return dd.from_pandas(dataset["train"].to_pandas(), npartitions=10)

# def load_data_polars():
#     return pl.from_pandas(dataset["train"].to_pandas())

# def load_data_duckdb():
#     return duckdb.from_df(dataset["train"].to_pandas())

# Data loading functions
def load_data_python_vectorized():
    """Read the cached parquet file and expose each numeric column as a NumPy array."""
    frame = pd.read_parquet(parquet_path)
    numeric_columns = frame.select_dtypes(include=['number']).columns
    # Dict of column name -> ndarray, suitable for vectorized NumPy operations.
    return {column: frame[column].to_numpy() for column in numeric_columns}

def load_data_pandas():
    """Load the cached parquet file as a pandas DataFrame."""
    frame = pd.read_parquet(parquet_path)
    return frame

def load_data_dask():
    """Load the cached parquet file lazily as a Dask DataFrame."""
    lazy_frame = dd.read_parquet(parquet_path)
    return lazy_frame

def load_data_polars():
    """Load the cached parquet file eagerly as a Polars DataFrame."""
    frame = pl.read_parquet(parquet_path)
    return frame

def load_data_duckdb():
    """Open the cached parquet file as a DuckDB relation (lazy scan)."""
    relation = duckdb.read_parquet(parquet_path)
    return relation

# Loaders list: (loader_callable, display_name) pairs consumed by run_benchmark().
loaders = [
    (load_data_pandas, "Pandas"),
    (load_data_dask, "Dask"),
    (load_data_polars, "Polars"),
    (load_data_duckdb, "DuckDB"),
    (load_data_python_vectorized, "Python Vectorized"),
]

def run_benchmark():
    """Benchmark every registered loader and plot the results.

    Returns:
        (markdown_table, image_array) on success, or
        (error_text, None) if any loader raised — note successful results are
        discarded in that case (existing behavior, preserved).
    """
    benchmark_results = []
    error_messages = []

    for loader, lib_name in loaders:
        try:
            # The loaded data itself is not needed here — only the metrics.
            _, load_time, cpu_load, mem_load, peak_mem_load = measure_performance(loader)

            # Log metrics to Weights & Biases
            wandb.log({
                "Library": lib_name,
                "Load Time (s)": load_time,
                "CPU Load (%)": cpu_load,
                "Memory Load (%)": mem_load,
                "Peak Memory (%)": peak_mem_load
            })

            benchmark_results.append({
                "Library": lib_name,
                "Load Time (s)": load_time,
                "CPU Load (%)": cpu_load,
                "Memory Load (%)": mem_load,
                "Peak Memory (%)": peak_mem_load
            })

        except Exception as e:
            error_messages.append(f"{lib_name} Error: {str(e)}")

    if error_messages:
        return '\n'.join(error_messages), None

    benchmark_df = pd.DataFrame(benchmark_results)

    sns.set(style="whitegrid")
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    fig.suptitle("Benchmark Results", fontsize=16)

    sns.barplot(x="Library", y="Load Time (s)", data=benchmark_df, ax=axes[0, 0])
    sns.barplot(x="Library", y="CPU Load (%)", data=benchmark_df, ax=axes[0, 1])
    sns.barplot(x="Library", y="Memory Load (%)", data=benchmark_df, ax=axes[1, 0])
    sns.barplot(x="Library", y="Peak Memory (%)", data=benchmark_df, ax=axes[1, 1])

    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    # FIX: close the figure; the original leaked one figure per button click
    # (with the Agg backend, open figures accumulate in memory).
    plt.close(fig)
    buf.seek(0)

    # Convert plot to an image and log it to wandb
    image = Image.open(buf)
    wandb.log({"Benchmark Results": wandb.Image(image)})

    image_array = np.array(image)

    return benchmark_df.to_markdown(), image_array  # Return NumPy array


# Select the non-interactive Agg backend so figures render without a display.
# NOTE(review): pyplot is already used by run_benchmark above; backend
# selection normally belongs before any plotting — confirm ordering is safe here.
matplotlib.use("Agg")
def explore_dataset():
    """Summarize and visualize the cached parquet dataset.

    Returns:
        (summary_markdown, image_array) on success, or
        ("Error loading data: ...", None) on failure.

    Side effects: logs the summary table and the overview figure to wandb.
    """
    try:
        df = pd.read_parquet(parquet_path)

        # Convert float64 columns to float32 to reduce memory usage
        for col in df.select_dtypes(include=['float64']).columns:
            df[col] = df[col].astype('float32')

        # If the dataset is too large, work on a 50% sample.
        # (FIX: the original comment said 10%, but the code samples frac=0.5.)
        if len(df) > 1_000_000:
            df = df.sample(frac=0.5, random_state=42)

        # Generate dataset summary (stats computed on the sampled frame)
        summary = df.describe(include='all').T
        summary["missing_values"] = df.isnull().sum()
        summary["unique_values"] = df.nunique()
        summary_text = summary.to_markdown()

        # Log dataset summary as text in Weights & Biases
        wandb.log({"Dataset Summary": wandb.Html(summary_text)})

        # Prepare for visualization: 2x2 grid of overview plots
        fig, axes = plt.subplots(2, 2, figsize=(16, 10))
        fig.suptitle("Dataset Overview", fontsize=16)

        # Plot data type distribution
        data_types = df.dtypes.value_counts()
        sns.barplot(x=data_types.index.astype(str), y=data_types.values, ax=axes[0, 0])
        axes[0, 0].set_title("Column Count by Data Type by AnnsKhan")
        axes[0, 0].set_ylabel("Count")
        axes[0, 0].set_xlabel("Column Type")

        num_cols = df.select_dtypes(include=['number']).columns
        if len(num_cols) > 0:
            # Plot mean values of numeric columns
            mean_values = df[num_cols].mean()
            sns.barplot(x=mean_values.index, y=mean_values.values, ax=axes[0, 1])
            axes[0, 1].set_title("Mean Values of Numeric Columns")
            axes[0, 1].set_xlabel("Column Name")
            axes[0, 1].tick_params(axis='x', rotation=45)

            # Log mean values to Weights & Biases
            for col, mean_val in mean_values.items():
                wandb.log({f"Mean Values/{col}": mean_val})

            # Plot histogram for the first numeric column
            # (FIX: merged with the previous identical guard; dropped a
            # pointless f-prefix on the constant title string.)
            selected_col = num_cols[0]
            sns.histplot(df[selected_col], bins=30, kde=True, ax=axes[1, 0])
            axes[1, 0].set_title("Distribution of pick-up locations ID")
            axes[1, 0].set_xlabel(selected_col)
            axes[1, 0].set_ylabel("Frequency")

        # Plot correlation heatmap
        if len(num_cols) > 1:
            corr = df[num_cols].corr()
            sns.heatmap(corr, annot=True, cmap="coolwarm", fmt=".2f", linewidths=0.5, ax=axes[1, 1])
            axes[1, 1].set_title("Correlation Heatmap")

        # Save figure to buffer and release it
        buf = io.BytesIO()
        plt.tight_layout()
        plt.savefig(buf, format='png', bbox_inches='tight')
        plt.close(fig)
        buf.seek(0)

        # Convert figure to NumPy array
        image = Image.open(buf)
        image_array = np.array(image)

        # Log image to Weights & Biases
        wandb.log({"Dataset Overview": wandb.Image(image)})

        return summary_text, image_array

    except Exception as e:
        return f"Error loading data: {str(e)}", None

    
    # Gradio interface setup
def gradio_interface():
    """Build and return the Gradio Blocks UI.

    Wires the explore and benchmark buttons directly to explore_dataset()
    and run_benchmark(); both already return (text, image) pairs, so no
    pass-through wrappers are needed.
    """
    with gr.Blocks() as demo:
        gr.Markdown("## Explore Dataset")
        explore_button = gr.Button("Explore Data")
        summary_text = gr.Textbox(label="Dataset Summary")
        explore_image = gr.Image(label="Feature Distributions")
        explore_button.click(explore_dataset, outputs=[summary_text, explore_image])

        gr.Markdown("## Benchmarking Different Data Loading Libraries")

        run_button = gr.Button("Run Benchmark")
        result_text = gr.Textbox(label="Benchmark Results")
        plot_image = gr.Image(label="Performance Graph")

        run_button.click(run_benchmark, outputs=[result_text, plot_image])
    return demo

# Build the UI once at import time, then serve it.
demo = gradio_interface()

# Run the Gradio app (blocking call; serves on localhost only)
demo.launch(share=False)  # No need for share=True in VS Code, local access is sufficient