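"""OrderLens — Data Interpreter.

Gradio app: upload a CSV and get readable structure metrics (entropy,
compressibility, sortedness, kd-partition complexity) with plain-language
guidance, plus a compact JSON shape summary for downstream models.
"""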
import io, math, json, gzip
import numpy as np
import pandas as pd
import gradio as gr
# -------------------------------
# Core metric helpers
# -------------------------------
def shannon_entropy_from_counts(counts: np.ndarray) -> float:
counts = counts.astype(float)
total = counts.sum()
if total <= 0:
return 0.0
p = counts / total
p = p[p > 0]
return float(-(p * np.log2(p)).sum())
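# Illustrative sanity checks (not executed by the app): four equally frequent
# bins carry 2 bits; a single non-empty bin carries 0 bits.
#   shannon_entropy_from_counts(np.array([5, 5, 5, 5]))   # -> 2.0
#   shannon_entropy_from_counts(np.array([10, 0, 0, 0]))  # -> 0.0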
def numeric_binned_entropy(series: pd.Series, bins: int = 32):
x = series.dropna().astype(float).values
if x.size == 0:
return 0.0, 0
    try:
        # Quantile-based edges adapt bin widths to the data's spread; coincident
        # quantiles (e.g. near-constant columns) collapse under np.unique.
        qs = np.linspace(0, 1, bins + 1)
        edges = np.unique(np.nanpercentile(x, qs * 100))
        if len(edges) < 2:
            # Degenerate column: force the fixed-width fallback below.
            raise ValueError("not enough distinct quantile edges")
        hist, _ = np.histogram(x, bins=edges)
    except Exception:
        hist, _ = np.histogram(x, bins=bins)
H = shannon_entropy_from_counts(hist)
k = np.count_nonzero(hist)
return H, max(k, 1)
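# Illustrative behavior (assumed, not executed by the app): a constant column
# collapses to one bin, while a well-spread column approaches log2(bins) bits
# thanks to the quantile-based edges.
#   numeric_binned_entropy(pd.Series([7.0] * 100))  # -> (0.0, 1)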
def categorical_entropy(series: pd.Series):
x = series.dropna().astype(str).values
if x.size == 0:
return 0.0, 0
vals, counts = np.unique(x, return_counts=True)
H = shannon_entropy_from_counts(counts)
return H, len(vals)
def monotone_runs_and_entropy(series: pd.Series):
x = series.dropna().values
n = len(x)
if n <= 1:
return 1, 0.0
runs = [1]
for i in range(1, n):
if x[i] >= x[i-1]:
runs[-1] += 1
else:
runs.append(1)
    # Treat the run lengths as a frequency distribution: one long monotone run
    # yields 0 bits, many short runs yield high bits.
    run_lengths = np.array(runs, dtype=float)
    H = shannon_entropy_from_counts(run_lengths)
return len(runs), H
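# Worked example (illustrative): [1, 2, 3, 2, 3, 4] splits into two ascending
# runs of length 3, so the run-length distribution [3, 3] has 1 bit of entropy.
#   monotone_runs_and_entropy(pd.Series([1, 2, 3, 2, 3, 4]))  # -> (2, 1.0)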
def sortedness_score(series: pd.Series) -> float:
x = series.dropna().values
if len(x) <= 1:
return 1.0
return float(np.mean(np.diff(x) >= 0))
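# Worked example (illustrative): the score is the fraction of adjacent pairs
# that are non-decreasing.
#   sortedness_score(pd.Series([1, 2, 3, 4]))  # -> 1.0
#   sortedness_score(pd.Series([4, 3, 2, 1]))  # -> 0.0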
def gzip_compress_ratio_from_bytes(b: bytes) -> float:
if len(b) == 0:
return 1.0
out = io.BytesIO()
with gzip.GzipFile(fileobj=out, mode="wb") as f:
f.write(b)
compressed = out.getvalue()
return len(compressed) / len(b)
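# Illustrative expectation (not executed by the app): highly repetitive bytes
# compress far below 1.0, while random or already-compressed bytes land near
# (or slightly above) 1.0 because of gzip's fixed header overhead.
#   gzip_compress_ratio_from_bytes(b"abc" * 10000)  # << 1.0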
def dataframe_gzip_ratio(df: pd.DataFrame, max_rows: int = 20000) -> float:
s = df.sample(min(len(df), max_rows), random_state=0) if len(df) > max_rows else df
raw = s.to_csv(index=False).encode("utf-8", errors="ignore")
return gzip_compress_ratio_from_bytes(raw)
def pareto_maxima_count(points: np.ndarray) -> int:
if points.shape[1] < 2 or points.shape[0] == 0:
return 0
P = points[:, :2]
    # Sweep points in order of decreasing x (ties: decreasing y); a point is on
    # the 2-D Pareto frontier iff its y is at least the best y seen so far.
    order = np.lexsort((-P[:, 1], -P[:, 0]))
best_y = -np.inf
count = 0
for idx in order:
y = P[idx, 1]
if y >= best_y:
count += 1
best_y = y
return int(count)
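# Worked example (illustrative): of (3,1), (2,2), (1,3), (0,0), the first three
# are mutually non-dominated, so the 2-D Pareto frontier has 3 points.
#   pareto_maxima_count(np.array([[3, 1], [2, 2], [1, 3], [0, 0]]))  # -> 3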
def kd_entropy(points: np.ndarray, max_leaf: int = 128, axis: int = 0) -> float:
    """Sum of median-split entropies over a kd-style recursive partition.

    Balanced, shallow partitions accumulate few bits; scattered data that needs
    many uneven cuts accumulates more.
    """
    n = points.shape[0]
    if n == 0 or n <= max_leaf:
        return 0.0
vals = points[:, axis]
med = np.median(vals)
left = points[vals <= med]
right = points[vals > med]
pL = len(left) / n
pR = len(right) / n
H_here = 0.0
for p in (pL, pR):
if p > 0:
H_here += -p * math.log(p, 2)
next_axis = (axis + 1) % points.shape[1]
return H_here + kd_entropy(left, max_leaf, next_axis) + kd_entropy(right, max_leaf, next_axis)
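# Rough intuition (illustrative, assumes distinct coordinates): 256 points that
# split 128/128 at the first median stop after one balanced cut, contributing
# about 1 bit; data needing many uneven cuts accumulates more bits.
#   kd_entropy(np.random.default_rng(0).random((256, 2)))  # ~1.0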
def normalize(value: float, max_value: float) -> float:
if max_value <= 0:
return 0.0
v = max(0.0, min(1.0, value / max_value))
return float(v)
# -------------------------------
# Scoring + interpretations
# -------------------------------
def grade_band(value: float, thresholds: list, labels: list):
"""Generic banding helper: thresholds ascending; returns (label_idx, label)."""
for i, t in enumerate(thresholds):
if value <= t:
return i, labels[i]
return len(labels)-1, labels[-1]
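# Worked example (illustrative): the first threshold the value does not exceed
# picks the band.
#   grade_band(0.5, [0.15, 0.35, 0.6, 0.85],
#              ["Excellent", "High", "Moderate", "Low", "Very Low"])  # -> (2, "Moderate")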
def interpret_report(report: dict) -> dict:
"""Produce human-friendly interpretations with color badges and advice."""
r, c = report["shape"]["rows"], report["shape"]["cols"]
max_bits = math.log2(max(2, r))
# Harvestable Energy (0..1)
he = report.get("harvestable_energy_score", 0.0)
he_pct = round(100 * he)
he_idx, he_label = grade_band(1.0 - he, [0.15, 0.35, 0.6, 0.85], # invert so higher is better
["Excellent", "High", "Moderate", "Low", "Very Low"])
he_color = ["#10b981", "#34d399", "#f59e0b", "#f97316", "#ef4444"][he_idx]
# Gzip ratio (lower is better)
gz = report.get("gzip_compression_ratio", 1.0)
gz_idx, gz_label = grade_band(gz, [0.45, 0.7, 0.9, 1.1], ["Highly compressible", "Compressible", "Some structure", "Low structure", "Unstructured"])
gz_color = ["#10b981", "#34d399", "#f59e0b", "#f97316", "#ef4444"][gz_idx]
# kd-entropy (lower is better). Normalize by log2(n)
Hkd = float(report.get("kd_partition_entropy_bits", 0.0))
Hkd_norm = normalize(Hkd, max_bits)
kd_idx, kd_label = grade_band(Hkd_norm, [0.15, 0.3, 0.5, 0.75], ["Simple spatial blocks", "Moderately simple", "Mixed", "Complex", "Highly complex"])
kd_color = ["#10b981", "#34d399", "#f59e0b", "#f97316", "#ef4444"][kd_idx]
# Run-entropy / Sortedness aggregation for numeric columns
per_col = report.get("per_column", {})
run_H = []
sorted_fracs = []
for col, st in per_col.items():
if "run_entropy_bits" in st:
run_H.append(st["run_entropy_bits"])
sorted_fracs.append(st.get("sortedness_fraction", 0.0))
if run_H:
runH_mean = float(np.mean(run_H))
runH_norm = normalize(runH_mean, max_bits)
sort_mean = float(np.mean(sorted_fracs)) if sorted_fracs else 0.0
else:
runH_norm = 1.0
sort_mean = 0.0
run_idx, run_label = grade_band(runH_norm, [0.15, 0.3, 0.5, 0.75], ["Long smooth runs", "Mostly smooth", "Mixed runs", "Choppy", "Highly choppy"])
run_color = ["#10b981", "#34d399", "#f59e0b", "#f97316", "#ef4444"][run_idx]
sort_idx, sort_label = grade_band(1.0 - sort_mean, [0.15, 0.3, 0.5, 0.75], ["Highly sorted", "Mostly sorted", "Partially sorted", "Barely sorted", "Unsorted"])
sort_color = ["#10b981", "#34d399", "#f59e0b", "#f97316", "#ef4444"][sort_idx]
# Duplicate rows
dup = report.get("duplicate_row_fraction", 0.0)
dup_idx, dup_label = grade_band(dup, [0.01, 0.05, 0.15, 0.3], ["Clean", "Light dups", "Moderate dups", "High dups", "Very high dups"])
dup_color = ["#10b981", "#34d399", "#f59e0b", "#f97316", "#ef4444"][dup_idx]
    # Recommendations (simple rule-based). These are rendered through gr.HTML,
    # so emphasis uses <b> tags rather than Markdown bold.
    recs = []
    if he >= 0.7:
        recs.append("Leverage <b>adaptive algorithms</b> (TimSort-style merges, linear hull/skyline passes) for near-linear performance.")
    elif he >= 0.4:
        recs.append("Consider <b>light preprocessing</b> (bucketing, dedupe) to unlock more adaptive speedups.")
    else:
        recs.append("Expect <b>near worst-case costs</b>; use robust algorithms and consider feature engineering/cleaning.")
    if gz <= 0.7:
        recs.append("Data is <b>highly compressible</b> → try dictionary/columnar encoding and caching to cut memory/IO.")
    elif gz >= 1.0:
        recs.append("Data is <b>hard to compress</b> → prioritize dimensionality reduction or noise filtering.")
    if runH_norm <= 0.3 or sort_mean >= 0.7:
        recs.append("Columns show <b>long monotone runs</b> → merges and single-pass scans will be efficient.")
    else:
        recs.append("Columns are <b>choppy</b> → batch/aggregate before sorting to reduce comparisons.")
    if Hkd_norm <= 0.3:
        recs.append("Spatial structure is <b>simple</b> → kd/quad trees will be shallow; range queries will be fast.")
    elif Hkd_norm >= 0.6:
        recs.append("Spatial structure is <b>complex</b> → consider clustering/tiling before building indexes.")
    if dup >= 0.05:
        recs.append("De-duplicate rows to lower entropy and improve compression & joins.")
# Summary verdict
verdict = ["Outstanding structure for fast algorithms.",
"Strong latent order; plenty of speed to harvest.",
"Mixed: some order present; moderate gains possible.",
"Low order; focus on cleaning and feature engineering.",
"Chaotic: assume worst-case runtimes."][he_idx]
return {
"he": {"pct": he_pct, "label": he_label, "color": he_color},
"gzip": {"value": gz, "label": gz_label, "color": gz_color},
"kd": {"value": Hkd, "label": kd_label, "color": kd_color},
"runs": {"value": runH_norm, "label": run_label, "color": run_color},
"sorted": {"value": sort_mean, "label": sort_label, "color": sort_color},
"dup": {"value": dup, "label": dup_label, "color": dup_color},
"verdict": verdict,
"recs": recs[:6]
}
# -------------------------------
# Compute metrics
# -------------------------------
def compute_metrics(df: pd.DataFrame) -> dict:
report = {}
n_rows, n_cols = df.shape
report["shape"] = {"rows": int(n_rows), "cols": int(n_cols)}
# Types
types = {}
for c in df.columns:
s = df[c]
if pd.api.types.is_numeric_dtype(s):
types[c] = "numeric"
elif pd.api.types.is_datetime64_any_dtype(s) or "date" in str(s.dtype).lower():
types[c] = "datetime"
else:
types[c] = "categorical"
report["column_types"] = types
missing = df.isna().mean().to_dict()
dup_ratio = float((len(df) - len(df.drop_duplicates())) / max(1, len(df)))
report["missing_fraction_per_column"] = {k: float(v) for k, v in missing.items()}
report["duplicate_row_fraction"] = dup_ratio
col_stats = {}
for c in df.columns:
s = df[c]
if types[c] == "numeric":
H, k = numeric_binned_entropy(s)
runs, Hruns = monotone_runs_and_entropy(s)
sorted_frac = sortedness_score(s)
col_stats[c] = {
"entropy_binned_bits": float(H),
"active_bins": int(k),
"monotone_runs": int(runs),
"run_entropy_bits": float(Hruns),
"sortedness_fraction": float(sorted_frac),
"min": float(np.nanmin(s.values)) if s.dropna().shape[0] else None,
"max": float(np.nanmax(s.values)) if s.dropna().shape[0] else None,
"mean": float(np.nanmean(s.values)) if s.dropna().shape[0] else None,
"std": float(np.nanstd(s.values)) if s.dropna().shape[0] else None,
}
elif types[c] == "datetime":
try:
sd = pd.to_datetime(s, errors="coerce")
min_dt = sd.min()
max_dt = sd.max()
col_stats[c] = {
"entropy_bits": 0.0,
"unique_values": int(sd.nunique(dropna=True)),
"min_datetime": None if pd.isna(min_dt) else min_dt.isoformat(),
"max_datetime": None if pd.isna(max_dt) else max_dt.isoformat(),
}
except Exception:
col_stats[c] = {"entropy_bits": 0.0, "unique_values": int(s.nunique(dropna=True))}
else:
H, k = categorical_entropy(s)
            # Top-5 categories; drop missing values first so NaN doesn't surface
            # as a literal "nan" category.
            vc = s.dropna().astype(str).value_counts().head(5)
            top5 = [{"value": str(idx), "count": int(cnt)} for idx, cnt in vc.items()]
col_stats[c] = {"entropy_bits": float(H), "unique_values": int(k), "top_values": top5}
report["per_column"] = col_stats
try:
gzip_ratio = dataframe_gzip_ratio(df)
except Exception:
gzip_ratio = 1.0
report["gzip_compression_ratio"] = float(gzip_ratio)
num_cols = [c for c, t in types.items() if t == "numeric"]
if len(num_cols) >= 2:
X = df[num_cols].select_dtypes(include=[np.number]).values.astype(float)
X = X[~np.isnan(X).any(axis=1)]
if X.shape[0] >= 3:
pts2 = X[:, :2]
report["pareto_maxima_2d"] = int(pareto_maxima_count(pts2))
try:
H_kd = kd_entropy(pts2, max_leaf=128, axis=0)
except Exception:
H_kd = 0.0
report["kd_partition_entropy_bits"] = float(H_kd)
else:
report["pareto_maxima_2d"] = 0
report["kd_partition_entropy_bits"] = 0.0
else:
report["pareto_maxima_2d"] = 0
report["kd_partition_entropy_bits"] = 0.0
# Harvestable Energy
max_bits = math.log2(max(2, n_rows))
he_parts = []
he_parts.append(1.0 - max(0.0, min(1.0, report["gzip_compression_ratio"])))
num_run_entropies = []
for c in df.columns:
st = col_stats.get(c, {})
if "run_entropy_bits" in st:
num_run_entropies.append(st["run_entropy_bits"])
if num_run_entropies:
mean_run_H = float(np.mean(num_run_entropies))
he_parts.append(1.0 - normalize(mean_run_H, max_bits))
H_kd = report.get("kd_partition_entropy_bits", 0.0)
if H_kd is not None:
he_parts.append(1.0 - normalize(float(H_kd), max_bits))
if he_parts:
HE = float(np.mean([max(0.0, min(1.0, v)) for v in he_parts]))
else:
HE = 0.0
report["harvestable_energy_score"] = HE
return report
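# Minimal smoke test (illustrative, not run by the app): a clean, fully sorted
# frame should earn a noticeably higher harvestable-energy score than shuffled
# noise of the same size.
#   demo_df = pd.DataFrame({"x": np.arange(1000), "y": np.arange(1000) * 2.0})
#   compute_metrics(demo_df)["harvestable_energy_score"]  # relatively high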
# -------------------------------
# Dataset shape summary for other models
# -------------------------------
def dataset_shape_summary(df: pd.DataFrame, report: dict, max_examples: int = 3) -> dict:
"""Compact JSON describing the dataset schema, ranges, and examples for LLM ingestion."""
cols = []
for name, t in report["column_types"].items():
col_info = {"name": name, "type": t}
per = report["per_column"].get(name, {})
if t == "numeric":
col_info.update({
"min": per.get("min"),
"max": per.get("max"),
"mean": per.get("mean"),
"std": per.get("std"),
"missing_frac": report["missing_fraction_per_column"].get(name, 0.0)
})
elif t == "datetime":
col_info.update({
"min": per.get("min_datetime"),
"max": per.get("max_datetime"),
"missing_frac": report["missing_fraction_per_column"].get(name, 0.0)
})
else: # categorical or other
col_info.update({
"unique_values": per.get("unique_values"),
"top_values": per.get("top_values", []),
"missing_frac": report["missing_fraction_per_column"].get(name, 0.0)
})
cols.append(col_info)
# few example rows (stringified to be safe)
examples = df.head(max_examples).astype(str).to_dict(orient="records")
shape = {
"n_rows": report["shape"]["rows"],
"n_cols": report["shape"]["cols"],
"columns": cols,
"duplicates_fraction": report.get("duplicate_row_fraction", 0.0),
"gzip_compression_ratio": report.get("gzip_compression_ratio", None),
"harvestable_energy_score": report.get("harvestable_energy_score", None),
"examples": examples
}
return shape
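# Illustrative output sketch (abbreviated; all values are placeholders):
#   {"n_rows": 1000, "n_cols": 2,
#    "columns": [{"name": "x", "type": "numeric", "min": 0.0, "max": 999.0,
#                 "mean": 499.5, "std": 288.7, "missing_frac": 0.0}, ...],
#    "duplicates_fraction": 0.0, "gzip_compression_ratio": 0.35,
#    "harvestable_energy_score": 0.7, "examples": [{"x": "0", "y": "0.0"}, ...]}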
# -------------------------------
# UI rendering helpers
# -------------------------------
def badge(text: str, color: str) -> str:
return f"<span style='background:{color};color:white;padding:6px 10px;border-radius:999px;font-weight:600'>{text}</span>"
def metric_card(title: str, value: str, badge_html: str) -> str:
return f"""
<div style="flex:1;min-width:220px;border:1px solid #e5e7eb;border-radius:14px;padding:14px 16px;">
<div style="font-size:14px;color:#6b7280;margin-bottom:8px">{title}</div>
<div style="font-size:22px;font-weight:700;margin-bottom:10px">{value}</div>
{badge_html}
</div>
"""
def render_dashboard(report: dict, interp: dict) -> str:
he = interp["he"]
gz = interp["gzip"]
kd = interp["kd"]
runs = interp["runs"]
sortb = interp["sorted"]
dup = interp["dup"]
cards = []
cards.append(metric_card("Harvestable Energy", f"{he['pct']} / 100", badge(he['label'], he['color'])))
cards.append(metric_card("Compressibility (gzip)", f"{gz['value']:.3f}", badge(gz['label'], gz['color'])))
cards.append(metric_card("Range-Partition Entropy (kd bits)", f"{kd['value']:.3f}", badge(kd['label'], kd['color'])))
cards.append(metric_card("Run-Entropy (avg, normalized)", f"{runs['value']:.2f}", badge(runs['label'], runs['color'])))
cards.append(metric_card("Sortedness (avg fraction)", f"{sortb['value']:.2f}", badge(sortb['label'], sortb['color'])))
cards.append(metric_card("Duplicate Rows (fraction)", f"{dup['value']:.2f}", badge(dup['label'], dup['color'])))
grid = "<div style='display:flex;flex-wrap:wrap;gap:12px'>" + "".join(cards) + "</div>"
verdict = f"<div style='margin-top:12px;padding:14px 16px;background:#f9fafb;border:1px solid #e5e7eb;border-radius:14px'><b>Verdict:</b> {interp['verdict']}</div>"
return grid + verdict
def render_recs(interp: dict) -> str:
lis = "".join([f"<li>{r}</li>" for r in interp["recs"]])
return f"<ul>{lis}</ul>"
def render_columns(report: dict) -> str:
rows = []
for c, st in report.get("per_column", {}).items():
miss = report["missing_fraction_per_column"].get(c, 0.0)
if "entropy_binned_bits" in st:
rows.append(f"<tr><td><b>{c}</b> (num)</td><td>{miss:.1%}</td><td>{st['entropy_binned_bits']:.2f}</td><td>{st['monotone_runs']}</td><td>{st['run_entropy_bits']:.2f}</td><td>{st['sortedness_fraction']:.2f}</td></tr>")
elif "entropy_bits" in st:
rows.append(f"<tr><td><b>{c}</b> (cat)</td><td>{miss:.1%}</td><td>{st['entropy_bits']:.2f}</td><td>-</td><td>-</td><td>-</td></tr>")
else:
rows.append(f"<tr><td><b>{c}</b></td><td>{miss:.1%}</td><td>-</td><td>-</td><td>-</td><td>-</td></tr>")
header = "<tr><th>Column</th><th>Missing</th><th>Entropy</th><th>Monotone Runs</th><th>Run-Entropy</th><th>Sortedness</th></tr>"
table = "<table style='width:100%;border-collapse:collapse'>" + header + "".join(rows) + "</table>"
table = table.replace("<tr>", "<tr style='border-bottom:1px solid #e5e7eb'>")
table = table.replace("<th>", "<th style='text-align:left;padding:8px 6px;color:#374151'>")
table = table.replace("<td>", "<td style='padding:8px 6px;color:#111827'>")
return table
# -------------------------------
# Gradio app
# -------------------------------
def analyze(file):
    if file is None:
        return "{}", "Please upload a CSV.", "", "", "{}"
    try:
        # gr.File may hand back a tempfile-like object or a plain path string
        # depending on the Gradio version; accept both.
        path = file.name if hasattr(file, "name") else file
        df = pd.read_csv(path)
    except Exception as e:
        return "{}", f"Failed to read CSV: {e}", "", "", "{}"
report = compute_metrics(df)
interp = interpret_report(report)
shape = dataset_shape_summary(df, report, max_examples=3)
report_json = json.dumps(report, indent=2)
dashboard_html = render_dashboard(report, interp)
recs_html = render_recs(interp)
cols_html = render_columns(report)
shape_json = json.dumps(shape, indent=2)
return report_json, dashboard_html, recs_html, cols_html, shape_json
with gr.Blocks(title="OrderLens — Data Interpreter") as demo:
    gr.Markdown("# OrderLens — Data Interpreter")
gr.Markdown("Upload a CSV and get **readable** structure metrics with plain-language guidance.")
with gr.Row():
inp = gr.File(file_types=[".csv"], label="CSV file")
btn = gr.Button("Analyze", variant="primary")
gr.Markdown("---")
gr.Markdown("### Dashboard") # color-coded cards + verdict
dash = gr.HTML()
gr.Markdown("### Recommendations") # actionable tips
recs = gr.HTML()
gr.Markdown("### Column Details") # per-column table
cols = gr.HTML()
gr.Markdown("### Dataset Shape Summary (JSON)") # compact schema for other models
shape_out = gr.Code(label="Shape", language="json")
gr.Markdown("### Raw report (JSON)") # API-friendly
json_out = gr.Code(label="Report", language="json")
btn.click(analyze, inputs=inp, outputs=[json_out, dash, recs, cols, shape_out])
if __name__ == "__main__":
demo.launch()