import io, math, json, gzip

import numpy as np
import pandas as pd
import gradio as gr


def shannon_entropy_from_counts(counts: np.ndarray) -> float:
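    """Shannon entropy (bits) of the empirical distribution implied by a vector of counts."""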
    counts = counts.astype(float)
    total = counts.sum()
    if total <= 0:
        return 0.0
    p = counts / total
    p = p[p > 0]
    return float(-(p * np.log2(p)).sum())


def numeric_binned_entropy(series: pd.Series, bins: int = 32):
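    """Entropy (bits) of a numeric column after quantile binning; returns (entropy_bits, non_empty_bins)."""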
    x = series.dropna().astype(float).values
    if x.size == 0:
        return 0.0, 0
    try:
        # Quantile-based bin edges adapt to the data's distribution.
        qs = np.linspace(0, 1, bins + 1)
        edges = np.unique(np.nanpercentile(x, qs * 100))
        if len(edges) < 2:
            edges = np.unique(x)
        hist, _ = np.histogram(x, bins=edges)
    except Exception:
        # Fall back to equal-width bins if the quantile edges are degenerate.
        hist, _ = np.histogram(x, bins=bins)
    H = shannon_entropy_from_counts(hist)
    k = np.count_nonzero(hist)
    return H, max(k, 1)


def categorical_entropy(series: pd.Series):
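    """Entropy (bits) of a categorical column's value distribution; returns (entropy_bits, unique_values)."""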
    x = series.dropna().astype(str).values
    if x.size == 0:
        return 0.0, 0
    vals, counts = np.unique(x, return_counts=True)
    H = shannon_entropy_from_counts(counts)
    return H, len(vals)


def monotone_runs_and_entropy(series: pd.Series):
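    """Split the column into non-decreasing runs; returns (run_count, entropy in bits of how rows are spread across runs)."""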
    x = series.dropna().values
    n = len(x)
    if n <= 1:
        return 1, 0.0
    runs = [1]
    for i in range(1, n):
        if x[i] >= x[i - 1]:
            runs[-1] += 1
        else:
            runs.append(1)
    run_lengths = np.array(runs, dtype=float)
    H = shannon_entropy_from_counts(run_lengths)
    return len(runs), H


def sortedness_score(series: pd.Series) -> float:
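    """Fraction of consecutive value pairs that are non-decreasing (1.0 means fully sorted ascending)."""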
    x = series.dropna().values
    if len(x) <= 1:
        return 1.0
    return float(np.mean(np.diff(x) >= 0))


def gzip_compress_ratio_from_bytes(b: bytes) -> float:
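    """Ratio of gzip-compressed size to raw size (lower means more compressible)."""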
    if len(b) == 0:
        return 1.0
    out = io.BytesIO()
    with gzip.GzipFile(fileobj=out, mode="wb") as f:
        f.write(b)
    compressed = out.getvalue()
    return len(compressed) / len(b)


def dataframe_gzip_ratio(df: pd.DataFrame, max_rows: int = 20000) -> float:
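    """Gzip compression ratio of the DataFrame serialized as CSV; large frames are sampled down to max_rows rows first."""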
    s = df.sample(min(len(df), max_rows), random_state=0) if len(df) > max_rows else df
    raw = s.to_csv(index=False).encode("utf-8", errors="ignore")
    return gzip_compress_ratio_from_bytes(raw)


def pareto_maxima_count(points: np.ndarray) -> int:
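    """Count the 2-D Pareto-maximal (non-dominated) points among the first two columns via a descending-x sweep."""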
    if points.shape[1] < 2 or points.shape[0] == 0:
        return 0
    P = points[:, :2]
    # Sort by x descending (ties broken by y descending); a point lies on the
    # maxima staircase if its y is at least the best y seen so far.
    order = np.lexsort((-P[:, 1], -P[:, 0]))
    best_y = -np.inf
    count = 0
    for idx in order:
        y = P[idx, 1]
        if y >= best_y:
            count += 1
            best_y = y
    return int(count)


def kd_entropy(points: np.ndarray, max_leaf: int = 128, axis: int = 0) -> float:
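    """Entropy (bits) accumulated from recursive median splits (kd-tree style); recursion stops once a node holds at most max_leaf points."""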
    n = points.shape[0]
    if n == 0:
        return 0.0
    if n <= max_leaf:
        return 0.0
    vals = points[:, axis]
    med = np.median(vals)
    left = points[vals <= med]
    right = points[vals > med]
    if len(left) == 0 or len(right) == 0:
        # Degenerate split (median equals the max on this axis); stop here to
        # avoid unbounded recursion on duplicate-heavy data.
        return 0.0
    pL = len(left) / n
    pR = len(right) / n
    H_here = 0.0
    for p in (pL, pR):
        if p > 0:
            H_here += -p * math.log(p, 2)
    next_axis = (axis + 1) % points.shape[1]
    return H_here + kd_entropy(left, max_leaf, next_axis) + kd_entropy(right, max_leaf, next_axis)


def normalize(value: float, max_value: float) -> float:
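    """Clamp value / max_value into the range [0, 1]."""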
    if max_value <= 0:
        return 0.0
    v = max(0.0, min(1.0, value / max_value))
    return float(v)


def grade_band(value: float, thresholds: list, labels: list):
    """Generic banding helper: thresholds ascending; returns (label_idx, label)."""
    for i, t in enumerate(thresholds):
        if value <= t:
            return i, labels[i]
    return len(labels) - 1, labels[-1]


def interpret_report(report: dict) -> dict:
    """Produce human-friendly interpretations with color badges and advice."""
    r, c = report["shape"]["rows"], report["shape"]["cols"]
    max_bits = math.log2(max(2, r))
    palette = ["#10b981", "#34d399", "#f59e0b", "#f97316", "#ef4444"]

    he = report.get("harvestable_energy_score", 0.0)
    he_pct = round(100 * he)
    he_idx, he_label = grade_band(1.0 - he, [0.15, 0.35, 0.6, 0.85],
                                  ["Excellent", "High", "Moderate", "Low", "Very Low"])
    he_color = palette[he_idx]

    gz = report.get("gzip_compression_ratio", 1.0)
    gz_idx, gz_label = grade_band(gz, [0.45, 0.7, 0.9, 1.1],
                                  ["Highly compressible", "Compressible", "Some structure", "Low structure", "Unstructured"])
    gz_color = palette[gz_idx]

    Hkd = float(report.get("kd_partition_entropy_bits", 0.0))
    Hkd_norm = normalize(Hkd, max_bits)
    kd_idx, kd_label = grade_band(Hkd_norm, [0.15, 0.3, 0.5, 0.75],
                                  ["Simple spatial blocks", "Moderately simple", "Mixed", "Complex", "Highly complex"])
    kd_color = palette[kd_idx]

    per_col = report.get("per_column", {})
    run_H = []
    sorted_fracs = []
    for col, st in per_col.items():
        if "run_entropy_bits" in st:
            run_H.append(st["run_entropy_bits"])
            sorted_fracs.append(st.get("sortedness_fraction", 0.0))
    if run_H:
        runH_mean = float(np.mean(run_H))
        runH_norm = normalize(runH_mean, max_bits)
        sort_mean = float(np.mean(sorted_fracs)) if sorted_fracs else 0.0
    else:
        runH_norm = 1.0
        sort_mean = 0.0

    run_idx, run_label = grade_band(runH_norm, [0.15, 0.3, 0.5, 0.75],
                                    ["Long smooth runs", "Mostly smooth", "Mixed runs", "Choppy", "Highly choppy"])
    run_color = palette[run_idx]

    sort_idx, sort_label = grade_band(1.0 - sort_mean, [0.15, 0.3, 0.5, 0.75],
                                      ["Highly sorted", "Mostly sorted", "Partially sorted", "Barely sorted", "Unsorted"])
    sort_color = palette[sort_idx]

    dup = report.get("duplicate_row_fraction", 0.0)
    dup_idx, dup_label = grade_band(dup, [0.01, 0.05, 0.15, 0.3],
                                    ["Clean", "Light dups", "Moderate dups", "High dups", "Very high dups"])
    dup_color = palette[dup_idx]

    # Recommendations are rendered through gr.HTML, so use <b> tags rather than markdown bold.
    recs = []
    if he >= 0.7:
        recs.append("Leverage <b>adaptive algorithms</b> (TimSort-style merges, linear hull/skyline passes) for near-linear performance.")
    elif he >= 0.4:
        recs.append("Consider <b>light preprocessing</b> (bucketing, dedupe) to unlock more adaptive speedups.")
    else:
        recs.append("Expect <b>near worst-case costs</b>; use robust algorithms and consider feature engineering/cleaning.")

    if gz <= 0.7:
        recs.append("Data is <b>highly compressible</b>: try dictionary/columnar encoding and caching to cut memory/IO.")
    elif gz >= 1.0:
        recs.append("Data is <b>hard to compress</b>: prioritize dimensionality reduction or noise filtering.")

    if runH_norm <= 0.3 or sort_mean >= 0.7:
        recs.append("Columns show <b>long monotone runs</b>: merges and single-pass scans will be efficient.")
    else:
        recs.append("Columns are <b>choppy</b>: batch/aggregate before sorting to reduce comparisons.")

    if Hkd_norm <= 0.3:
        recs.append("Spatial structure is <b>simple</b>: kd/quad trees will be shallow; range queries will be fast.")
    elif Hkd_norm >= 0.6:
        recs.append("Spatial structure is <b>complex</b>: consider clustering/tiling before building indexes.")

    if dup >= 0.05:
        recs.append("De-duplicate rows to lower entropy and improve compression & joins.")

    verdict = ["Outstanding structure for fast algorithms.",
               "Strong latent order; plenty of speed to harvest.",
               "Mixed: some order present; moderate gains possible.",
               "Low order; focus on cleaning and feature engineering.",
               "Chaotic: assume worst-case runtimes."][he_idx]

    return {
        "he": {"pct": he_pct, "label": he_label, "color": he_color},
        "gzip": {"value": gz, "label": gz_label, "color": gz_color},
        "kd": {"value": Hkd, "label": kd_label, "color": kd_color},
        "runs": {"value": runH_norm, "label": run_label, "color": run_color},
        "sorted": {"value": sort_mean, "label": sort_label, "color": sort_color},
        "dup": {"value": dup, "label": dup_label, "color": dup_color},
        "verdict": verdict,
        "recs": recs[:6]
    }


def compute_metrics(df: pd.DataFrame) -> dict:
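    """Build the structure report: per-column entropies, sortedness, duplicates,
    gzip ratio, 2-D Pareto/kd metrics, and the overall harvestable-energy score."""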
    report = {}
    n_rows, n_cols = df.shape
    report["shape"] = {"rows": int(n_rows), "cols": int(n_cols)}

    # Classify each column as numeric, datetime, or categorical.
    types = {}
    for c in df.columns:
        s = df[c]
        if pd.api.types.is_numeric_dtype(s):
            types[c] = "numeric"
        elif pd.api.types.is_datetime64_any_dtype(s) or "date" in str(s.dtype).lower():
            types[c] = "datetime"
        else:
            types[c] = "categorical"
    report["column_types"] = types

    missing = df.isna().mean().to_dict()
    dup_ratio = float((len(df) - len(df.drop_duplicates())) / max(1, len(df)))
    report["missing_fraction_per_column"] = {k: float(v) for k, v in missing.items()}
    report["duplicate_row_fraction"] = dup_ratio

    # Per-column structure metrics.
    col_stats = {}
    for c in df.columns:
        s = df[c]
        if types[c] == "numeric":
            H, k = numeric_binned_entropy(s)
            runs, Hruns = monotone_runs_and_entropy(s)
            sorted_frac = sortedness_score(s)
            col_stats[c] = {
                "entropy_binned_bits": float(H),
                "active_bins": int(k),
                "monotone_runs": int(runs),
                "run_entropy_bits": float(Hruns),
                "sortedness_fraction": float(sorted_frac),
                "min": float(np.nanmin(s.values)) if s.dropna().shape[0] else None,
                "max": float(np.nanmax(s.values)) if s.dropna().shape[0] else None,
                "mean": float(np.nanmean(s.values)) if s.dropna().shape[0] else None,
                "std": float(np.nanstd(s.values)) if s.dropna().shape[0] else None,
            }
        elif types[c] == "datetime":
            try:
                sd = pd.to_datetime(s, errors="coerce")
                min_dt = sd.min()
                max_dt = sd.max()
                col_stats[c] = {
                    "entropy_bits": 0.0,
                    "unique_values": int(sd.nunique(dropna=True)),
                    "min_datetime": None if pd.isna(min_dt) else min_dt.isoformat(),
                    "max_datetime": None if pd.isna(max_dt) else max_dt.isoformat(),
                }
            except Exception:
                col_stats[c] = {"entropy_bits": 0.0, "unique_values": int(s.nunique(dropna=True))}
        else:
            H, k = categorical_entropy(s)
            # Drop NaN before casting to str so "nan" does not show up as a top value.
            vc = s.dropna().astype(str).value_counts().head(5)
            top5 = [{"value": str(idx), "count": int(cnt)} for idx, cnt in vc.items()]
            col_stats[c] = {"entropy_bits": float(H), "unique_values": int(k), "top_values": top5}
    report["per_column"] = col_stats

    # Whole-table compressibility.
    try:
        gzip_ratio = dataframe_gzip_ratio(df)
    except Exception:
        gzip_ratio = 1.0
    report["gzip_compression_ratio"] = float(gzip_ratio)

    # 2-D spatial metrics on the first two numeric columns.
    num_cols = [c for c, t in types.items() if t == "numeric"]
    if len(num_cols) >= 2:
        X = df[num_cols].select_dtypes(include=[np.number]).values.astype(float)
        X = X[~np.isnan(X).any(axis=1)]
        if X.shape[0] >= 3:
            pts2 = X[:, :2]
            report["pareto_maxima_2d"] = int(pareto_maxima_count(pts2))
            try:
                H_kd = kd_entropy(pts2, max_leaf=128, axis=0)
            except Exception:
                H_kd = 0.0
            report["kd_partition_entropy_bits"] = float(H_kd)
        else:
            report["pareto_maxima_2d"] = 0
            report["kd_partition_entropy_bits"] = 0.0
    else:
        report["pareto_maxima_2d"] = 0
        report["kd_partition_entropy_bits"] = 0.0

    # Harvestable-energy score: mean of (1 - gzip ratio), (1 - normalized run entropy),
    # and (1 - normalized kd entropy), each clamped to [0, 1].
    max_bits = math.log2(max(2, n_rows))
    he_parts = []
    he_parts.append(1.0 - max(0.0, min(1.0, report["gzip_compression_ratio"])))
    num_run_entropies = []
    for c in df.columns:
        st = col_stats.get(c, {})
        if "run_entropy_bits" in st:
            num_run_entropies.append(st["run_entropy_bits"])
    if num_run_entropies:
        mean_run_H = float(np.mean(num_run_entropies))
        he_parts.append(1.0 - normalize(mean_run_H, max_bits))
    H_kd = report.get("kd_partition_entropy_bits", 0.0)
    if H_kd is not None:
        he_parts.append(1.0 - normalize(float(H_kd), max_bits))
    if he_parts:
        HE = float(np.mean([max(0.0, min(1.0, v)) for v in he_parts]))
    else:
        HE = 0.0
    report["harvestable_energy_score"] = HE

    return report


def dataset_shape_summary(df: pd.DataFrame, report: dict, max_examples: int = 3) -> dict:
    """Compact JSON describing the dataset schema, ranges, and examples for LLM ingestion."""
    cols = []
    for name, t in report["column_types"].items():
        col_info = {"name": name, "type": t}
        per = report["per_column"].get(name, {})
        if t == "numeric":
            col_info.update({
                "min": per.get("min"),
                "max": per.get("max"),
                "mean": per.get("mean"),
                "std": per.get("std"),
                "missing_frac": report["missing_fraction_per_column"].get(name, 0.0)
            })
        elif t == "datetime":
            col_info.update({
                "min": per.get("min_datetime"),
                "max": per.get("max_datetime"),
                "missing_frac": report["missing_fraction_per_column"].get(name, 0.0)
            })
        else:
            col_info.update({
                "unique_values": per.get("unique_values"),
                "top_values": per.get("top_values", []),
                "missing_frac": report["missing_fraction_per_column"].get(name, 0.0)
            })
        cols.append(col_info)

    examples = df.head(max_examples).astype(str).to_dict(orient="records")

    shape = {
        "n_rows": report["shape"]["rows"],
        "n_cols": report["shape"]["cols"],
        "columns": cols,
        "duplicates_fraction": report.get("duplicate_row_fraction", 0.0),
        "gzip_compression_ratio": report.get("gzip_compression_ratio", None),
        "harvestable_energy_score": report.get("harvestable_energy_score", None),
        "examples": examples
    }
    return shape


def badge(text: str, color: str) -> str:
    return f"<span style='background:{color};color:white;padding:6px 10px;border-radius:999px;font-weight:600'>{text}</span>"


def metric_card(title: str, value: str, badge_html: str) -> str:
    return f"""
    <div style="flex:1;min-width:220px;border:1px solid #e5e7eb;border-radius:14px;padding:14px 16px;">
      <div style="font-size:14px;color:#6b7280;margin-bottom:8px">{title}</div>
      <div style="font-size:22px;font-weight:700;margin-bottom:10px">{value}</div>
      {badge_html}
    </div>
    """


def render_dashboard(report: dict, interp: dict) -> str:
    he = interp["he"]
    gz = interp["gzip"]
    kd = interp["kd"]
    runs = interp["runs"]
    sortb = interp["sorted"]
    dup = interp["dup"]

    cards = []
    cards.append(metric_card("Harvestable Energy", f"{he['pct']} / 100", badge(he['label'], he['color'])))
    cards.append(metric_card("Compressibility (gzip)", f"{gz['value']:.3f}", badge(gz['label'], gz['color'])))
    cards.append(metric_card("Range-Partition Entropy (kd bits)", f"{kd['value']:.3f}", badge(kd['label'], kd['color'])))
    cards.append(metric_card("Run-Entropy (avg, normalized)", f"{runs['value']:.2f}", badge(runs['label'], runs['color'])))
    cards.append(metric_card("Sortedness (avg fraction)", f"{sortb['value']:.2f}", badge(sortb['label'], sortb['color'])))
    cards.append(metric_card("Duplicate Rows (fraction)", f"{dup['value']:.2f}", badge(dup['label'], dup['color'])))

    grid = "<div style='display:flex;flex-wrap:wrap;gap:12px'>" + "".join(cards) + "</div>"
    verdict = f"<div style='margin-top:12px;padding:14px 16px;background:#f9fafb;border:1px solid #e5e7eb;border-radius:14px'><b>Verdict:</b> {interp['verdict']}</div>"
    return grid + verdict


def render_recs(interp: dict) -> str:
    lis = "".join([f"<li>{r}</li>" for r in interp["recs"]])
    return f"<ul>{lis}</ul>"


def render_columns(report: dict) -> str:
    rows = []
    for c, st in report.get("per_column", {}).items():
        miss = report["missing_fraction_per_column"].get(c, 0.0)
        if "entropy_binned_bits" in st:
            rows.append(f"<tr><td><b>{c}</b> (num)</td><td>{miss:.1%}</td><td>{st['entropy_binned_bits']:.2f}</td><td>{st['monotone_runs']}</td><td>{st['run_entropy_bits']:.2f}</td><td>{st['sortedness_fraction']:.2f}</td></tr>")
        elif "entropy_bits" in st:
            rows.append(f"<tr><td><b>{c}</b> (cat)</td><td>{miss:.1%}</td><td>{st['entropy_bits']:.2f}</td><td>-</td><td>-</td><td>-</td></tr>")
        else:
            rows.append(f"<tr><td><b>{c}</b></td><td>{miss:.1%}</td><td>-</td><td>-</td><td>-</td><td>-</td></tr>")
    header = "<tr><th>Column</th><th>Missing</th><th>Entropy</th><th>Monotone Runs</th><th>Run-Entropy</th><th>Sortedness</th></tr>"
    table = "<table style='width:100%;border-collapse:collapse'>" + header + "".join(rows) + "</table>"
    table = table.replace("<tr>", "<tr style='border-bottom:1px solid #e5e7eb'>")
    table = table.replace("<th>", "<th style='text-align:left;padding:8px 6px;color:#374151'>")
    table = table.replace("<td>", "<td style='padding:8px 6px;color:#111827'>")
    return table


def analyze(file):
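    """Gradio handler: read the uploaded CSV and return (report JSON, dashboard HTML,
    recommendations HTML, column table HTML, shape summary JSON)."""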
    if file is None:
        return "{}", "Please upload a CSV.", "", "", "{}"
    try:
        # gr.File may hand back a path string or a tempfile-like object depending on the Gradio version.
        path = file if isinstance(file, str) else file.name
        df = pd.read_csv(path)
    except Exception as e:
        return "{}", f"Failed to read CSV: {e}", "", "", "{}"

    report = compute_metrics(df)
    interp = interpret_report(report)
    shape = dataset_shape_summary(df, report, max_examples=3)

    report_json = json.dumps(report, indent=2)
    dashboard_html = render_dashboard(report, interp)
    recs_html = render_recs(interp)
    cols_html = render_columns(report)
    shape_json = json.dumps(shape, indent=2)

    return report_json, dashboard_html, recs_html, cols_html, shape_json


with gr.Blocks(title="OrderLens - Data Interpreter") as demo:
    gr.Markdown("# OrderLens - Data Interpreter")
    gr.Markdown("Upload a CSV and get **readable** structure metrics with plain-language guidance.")
    with gr.Row():
        inp = gr.File(file_types=[".csv"], label="CSV file")
        btn = gr.Button("Analyze", variant="primary")
    gr.Markdown("---")
    gr.Markdown("### Dashboard")
    dash = gr.HTML()
    gr.Markdown("### Recommendations")
    recs = gr.HTML()
    gr.Markdown("### Column Details")
    cols = gr.HTML()
    gr.Markdown("### Dataset Shape Summary (JSON)")
    shape_out = gr.Code(label="Shape", language="json")
    gr.Markdown("### Raw report (JSON)")
    json_out = gr.Code(label="Report", language="json")

    btn.click(analyze, inputs=inp, outputs=[json_out, dash, recs, cols, shape_out])


if __name__ == "__main__":
    demo.launch()