import io, math, json, gzip

import numpy as np
import pandas as pd
import gradio as gr

# -------------------------------
# Core metric helpers
# -------------------------------

def shannon_entropy_from_counts(counts: np.ndarray) -> float:
    """Shannon entropy (bits) of a discrete distribution given raw counts."""
    counts = counts.astype(float)
    total = counts.sum()
    if total <= 0:
        return 0.0
    p = counts / total
    p = p[p > 0]
    return float(-(p * np.log2(p)).sum())

def numeric_binned_entropy(series: pd.Series, bins: int = 32):
    """Entropy (bits) of a numeric column over quantile bins; returns (entropy, active_bins)."""
    x = series.dropna().astype(float).values
    if x.size == 0:
        return 0.0, 0
    try:
        qs = np.linspace(0, 1, bins + 1)
        edges = np.unique(np.nanpercentile(x, qs * 100))
        if len(edges) < 2:
            edges = np.unique(x)
        hist, _ = np.histogram(x, bins=edges)
    except Exception:
        hist, _ = np.histogram(x, bins=bins)
    H = shannon_entropy_from_counts(hist)
    k = np.count_nonzero(hist)
    return H, max(k, 1)

def categorical_entropy(series: pd.Series):
    """Entropy (bits) over category frequencies; returns (entropy, n_unique)."""
    x = series.dropna().astype(str).values
    if x.size == 0:
        return 0.0, 0
    vals, counts = np.unique(x, return_counts=True)
    H = shannon_entropy_from_counts(counts)
    return H, len(vals)

def monotone_runs_and_entropy(series: pd.Series):
    """Count non-decreasing runs and the entropy (bits) of the run-length distribution."""
    x = series.dropna().values
    n = len(x)
    if n <= 1:
        return 1, 0.0
    runs = [1]
    for i in range(1, n):
        if x[i] >= x[i - 1]:
            runs[-1] += 1
        else:
            runs.append(1)
    run_lengths = np.array(runs, dtype=float)
    H = shannon_entropy_from_counts(run_lengths)
    return len(runs), H

def sortedness_score(series: pd.Series) -> float:
    """Fraction of adjacent pairs that are non-decreasing (1.0 = fully sorted)."""
    x = series.dropna().values
    if len(x) <= 1:
        return 1.0
    return float(np.mean(np.diff(x) >= 0))

def gzip_compress_ratio_from_bytes(b: bytes) -> float:
    if len(b) == 0:
        return 1.0
    out = io.BytesIO()
    with gzip.GzipFile(fileobj=out, mode="wb") as f:
        f.write(b)
    compressed = out.getvalue()
    return len(compressed) / len(b)

def dataframe_gzip_ratio(df: pd.DataFrame, max_rows: int = 20000) -> float:
    """gzip ratio of the CSV serialization (sampled for large frames); lower means more structure."""
    s = df.sample(min(len(df), max_rows), random_state=0) if len(df) > max_rows else df
    raw = s.to_csv(index=False).encode("utf-8", errors="ignore")
    return gzip_compress_ratio_from_bytes(raw)

def pareto_maxima_count(points: np.ndarray) -> int:
    """Number of 2-D Pareto-maximal points (skyline size) over the first two columns."""
    if points.shape[1] < 2 or points.shape[0] == 0:
        return 0
    P = points[:, :2]
    order = np.lexsort((-P[:, 1], -P[:, 0]))  # sort by x descending, then y descending
    best_y = -np.inf
    count = 0
    for idx in order:
        y = P[idx, 1]
        if y >= best_y:
            count += 1
            best_y = y
    return int(count)

def kd_entropy(points: np.ndarray, max_leaf: int = 128, axis: int = 0) -> float:
    """Entropy (bits) accumulated over recursive median splits (kd-tree style partitioning)."""
    n = points.shape[0]
    if n == 0:
        return 0.0
    if n <= max_leaf:
        return 0.0
    if np.all(points == points[0]):
        # All remaining points are identical: a median split makes no progress,
        # so stop here instead of recursing until the recursion limit.
        return 0.0
    vals = points[:, axis]
    med = np.median(vals)
    left = points[vals <= med]
    right = points[vals > med]
    pL = len(left) / n
    pR = len(right) / n
    H_here = 0.0
    for p in (pL, pR):
        if p > 0:
            H_here += -p * math.log(p, 2)
    next_axis = (axis + 1) % points.shape[1]
    return H_here + kd_entropy(left, max_leaf, next_axis) + kd_entropy(right, max_leaf, next_axis)

def normalize(value: float, max_value: float) -> float:
    if max_value <= 0:
        return 0.0
    v = max(0.0, min(1.0, value / max_value))
    return float(v)

# -------------------------------
# Scoring + interpretations
# -------------------------------

def grade_band(value: float, thresholds: list, labels: list):
    """Generic banding helper: thresholds ascending; returns (label_idx, label)."""
    for i, t in enumerate(thresholds):
        if value <= t:
            return i, labels[i]
    return len(labels) - 1, labels[-1]
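# A quick worked example of the banding logic above, using the same thresholds and labels
# that interpret_report passes for the Harvestable Energy band (the input value 0.2 is
# illustrative only): 0.2 exceeds the first threshold (0.15) but not the second (0.35),
# so the helper lands in band index 1.
assert grade_band(0.2, [0.15, 0.35, 0.6, 0.85],
                  ["Excellent", "High", "Moderate", "Low", "Very Low"]) == (1, "High")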
def interpret_report(report: dict) -> dict:
    """Produce human-friendly interpretations with color badges and advice."""
    r, c = report["shape"]["rows"], report["shape"]["cols"]
    max_bits = math.log2(max(2, r))

    # Harvestable Energy (0..1)
    he = report.get("harvestable_energy_score", 0.0)
    he_pct = round(100 * he)
    he_idx, he_label = grade_band(1.0 - he,  # invert so higher is better
                                  [0.15, 0.35, 0.6, 0.85],
                                  ["Excellent", "High", "Moderate", "Low", "Very Low"])
    he_color = ["#10b981", "#34d399", "#f59e0b", "#f97316", "#ef4444"][he_idx]

    # Gzip ratio (lower is better)
    gz = report.get("gzip_compression_ratio", 1.0)
    gz_idx, gz_label = grade_band(gz, [0.45, 0.7, 0.9, 1.1],
                                  ["Highly compressible", "Compressible", "Some structure",
                                   "Low structure", "Unstructured"])
    gz_color = ["#10b981", "#34d399", "#f59e0b", "#f97316", "#ef4444"][gz_idx]

    # kd-entropy (lower is better). Normalize by log2(n).
    Hkd = float(report.get("kd_partition_entropy_bits", 0.0))
    Hkd_norm = normalize(Hkd, max_bits)
    kd_idx, kd_label = grade_band(Hkd_norm, [0.15, 0.3, 0.5, 0.75],
                                  ["Simple spatial blocks", "Moderately simple", "Mixed",
                                   "Complex", "Highly complex"])
    kd_color = ["#10b981", "#34d399", "#f59e0b", "#f97316", "#ef4444"][kd_idx]

    # Run-entropy / sortedness aggregation for numeric columns
    per_col = report.get("per_column", {})
    run_H = []
    sorted_fracs = []
    for col, st in per_col.items():
        if "run_entropy_bits" in st:
            run_H.append(st["run_entropy_bits"])
            sorted_fracs.append(st.get("sortedness_fraction", 0.0))
    if run_H:
        runH_mean = float(np.mean(run_H))
        runH_norm = normalize(runH_mean, max_bits)
        sort_mean = float(np.mean(sorted_fracs)) if sorted_fracs else 0.0
    else:
        runH_norm = 1.0
        sort_mean = 0.0
    run_idx, run_label = grade_band(runH_norm, [0.15, 0.3, 0.5, 0.75],
                                    ["Long smooth runs", "Mostly smooth", "Mixed runs",
                                     "Choppy", "Highly choppy"])
    run_color = ["#10b981", "#34d399", "#f59e0b", "#f97316", "#ef4444"][run_idx]
    sort_idx, sort_label = grade_band(1.0 - sort_mean, [0.15, 0.3, 0.5, 0.75],
                                      ["Highly sorted", "Mostly sorted", "Partially sorted",
                                       "Barely sorted", "Unsorted"])
    sort_color = ["#10b981", "#34d399", "#f59e0b", "#f97316", "#ef4444"][sort_idx]

    # Duplicate rows
    dup = report.get("duplicate_row_fraction", 0.0)
    dup_idx, dup_label = grade_band(dup, [0.01, 0.05, 0.15, 0.3],
                                    ["Clean", "Light dups", "Moderate dups",
                                     "High dups", "Very high dups"])
    dup_color = ["#10b981", "#34d399", "#f59e0b", "#f97316", "#ef4444"][dup_idx]

    # Recommendations (simple rule-based)
    recs = []
    if he >= 0.7:
        recs.append("Leverage **adaptive algorithms** (TimSort-style merges, linear hull/skyline passes) for near-linear performance.")
    elif he >= 0.4:
        recs.append("Consider **light preprocessing** (bucketing, dedupe) to unlock more adaptive speedups.")
    else:
        recs.append("Expect **near worst-case costs**; use robust algorithms and consider feature engineering/cleaning.")
    if gz <= 0.7:
        recs.append("Data is **highly compressible** → try dictionary/columnar encoding and caching to cut memory/IO.")
    elif gz >= 1.0:
        recs.append("Data is **hard to compress** → prioritize dimensionality reduction or noise filtering.")
    if runH_norm <= 0.3 or sort_mean >= 0.7:
        recs.append("Columns show **long monotone runs** → merges and single-pass scans will be efficient.")
    else:
        recs.append("Columns are **choppy** → batch/aggregate before sorting to reduce comparisons.")
    if Hkd_norm <= 0.3:
        recs.append("Spatial structure is **simple** → kd/quad trees will be shallow; range queries will be fast.")
    elif Hkd_norm >= 0.6:
        recs.append("Spatial structure is **complex** → consider clustering/tiling before building indexes.")
    if dup >= 0.05:
        recs.append("De-duplicate rows to lower entropy and improve compression & joins.")

    # Summary verdict
    verdict = ["Outstanding structure for fast algorithms.",
               "Strong latent order; plenty of speed to harvest.",
               "Mixed: some order present; moderate gains possible.",
               "Low order; focus on cleaning and feature engineering.",
               "Chaotic: assume worst-case runtimes."][he_idx]

    return {
        "he": {"pct": he_pct, "label": he_label, "color": he_color},
        "gzip": {"value": gz, "label": gz_label, "color": gz_color},
        "kd": {"value": Hkd, "label": kd_label, "color": kd_color},
        "runs": {"value": runH_norm, "label": run_label, "color": run_color},
        "sorted": {"value": sort_mean, "label": sort_label, "color": sort_color},
        "dup": {"value": dup, "label": dup_label, "color": dup_color},
        "verdict": verdict,
        "recs": recs[:6],
    }

# -------------------------------
# Compute metrics
# -------------------------------

def compute_metrics(df: pd.DataFrame) -> dict:
    report = {}
    n_rows, n_cols = df.shape
    report["shape"] = {"rows": int(n_rows), "cols": int(n_cols)}

    # Types
    types = {}
    for c in df.columns:
        s = df[c]
        if pd.api.types.is_numeric_dtype(s):
            types[c] = "numeric"
        elif pd.api.types.is_datetime64_any_dtype(s) or "date" in str(s.dtype).lower():
            types[c] = "datetime"
        else:
            types[c] = "categorical"
    report["column_types"] = types

    missing = df.isna().mean().to_dict()
    dup_ratio = float((len(df) - len(df.drop_duplicates())) / max(1, len(df)))
    report["missing_fraction_per_column"] = {k: float(v) for k, v in missing.items()}
    report["duplicate_row_fraction"] = dup_ratio

    col_stats = {}
    for c in df.columns:
        s = df[c]
        if types[c] == "numeric":
            H, k = numeric_binned_entropy(s)
            runs, Hruns = monotone_runs_and_entropy(s)
            sorted_frac = sortedness_score(s)
            col_stats[c] = {
                "entropy_binned_bits": float(H),
                "active_bins": int(k),
                "monotone_runs": int(runs),
                "run_entropy_bits": float(Hruns),
                "sortedness_fraction": float(sorted_frac),
                "min": float(np.nanmin(s.values)) if s.dropna().shape[0] else None,
                "max": float(np.nanmax(s.values)) if s.dropna().shape[0] else None,
                "mean": float(np.nanmean(s.values)) if s.dropna().shape[0] else None,
                "std": float(np.nanstd(s.values)) if s.dropna().shape[0] else None,
            }
        elif types[c] == "datetime":
            try:
                sd = pd.to_datetime(s, errors="coerce")
                min_dt = sd.min()
                max_dt = sd.max()
                col_stats[c] = {
                    "entropy_bits": 0.0,
                    "unique_values": int(sd.nunique(dropna=True)),
                    "min_datetime": None if pd.isna(min_dt) else min_dt.isoformat(),
                    "max_datetime": None if pd.isna(max_dt) else max_dt.isoformat(),
                }
            except Exception:
                col_stats[c] = {"entropy_bits": 0.0, "unique_values": int(s.nunique(dropna=True))}
        else:
            H, k = categorical_entropy(s)
            # top-5 categories
            vc = s.astype(str).value_counts(dropna=True).head(5)
            top5 = [{"value": str(idx), "count": int(cnt)} for idx, cnt in vc.items()]
            col_stats[c] = {"entropy_bits": float(H), "unique_values": int(k), "top_values": top5}
    report["per_column"] = col_stats

    try:
        gzip_ratio = dataframe_gzip_ratio(df)
    except Exception:
        gzip_ratio = 1.0
    report["gzip_compression_ratio"] = float(gzip_ratio)

    num_cols = [c for c, t in types.items() if t == "numeric"]
    if len(num_cols) >= 2:
        X = df[num_cols].select_dtypes(include=[np.number]).values.astype(float)
        X = X[~np.isnan(X).any(axis=1)]
        if X.shape[0] >= 3:
            pts2 = X[:, :2]
            report["pareto_maxima_2d"] = int(pareto_maxima_count(pts2))
            try:
                H_kd = kd_entropy(pts2, max_leaf=128, axis=0)
            except Exception:
                H_kd = 0.0
            report["kd_partition_entropy_bits"] = float(H_kd)
        else:
            report["pareto_maxima_2d"] = 0
            report["kd_partition_entropy_bits"] = 0.0
    else:
        report["pareto_maxima_2d"] = 0
        report["kd_partition_entropy_bits"] = 0.0

    # Harvestable Energy
    max_bits = math.log2(max(2, n_rows))
    he_parts = []
    he_parts.append(1.0 - max(0.0, min(1.0, report["gzip_compression_ratio"])))
    num_run_entropies = []
    for c in df.columns:
        st = col_stats.get(c, {})
        if "run_entropy_bits" in st:
            num_run_entropies.append(st["run_entropy_bits"])
    if num_run_entropies:
        mean_run_H = float(np.mean(num_run_entropies))
        he_parts.append(1.0 - normalize(mean_run_H, max_bits))
    H_kd = report.get("kd_partition_entropy_bits", 0.0)
    if H_kd is not None:
        he_parts.append(1.0 - normalize(float(H_kd), max_bits))
    if he_parts:
        HE = float(np.mean([max(0.0, min(1.0, v)) for v in he_parts]))
    else:
        HE = 0.0
    report["harvestable_energy_score"] = HE
    return report

# -------------------------------
# Dataset shape summary for other models
# -------------------------------

def dataset_shape_summary(df: pd.DataFrame, report: dict, max_examples: int = 3) -> dict:
    """Compact JSON describing the dataset schema, ranges, and examples for LLM ingestion."""
    cols = []
    for name, t in report["column_types"].items():
        col_info = {"name": name, "type": t}
        per = report["per_column"].get(name, {})
        if t == "numeric":
            col_info.update({
                "min": per.get("min"),
                "max": per.get("max"),
                "mean": per.get("mean"),
                "std": per.get("std"),
                "missing_frac": report["missing_fraction_per_column"].get(name, 0.0),
            })
        elif t == "datetime":
            col_info.update({
                "min": per.get("min_datetime"),
                "max": per.get("max_datetime"),
                "missing_frac": report["missing_fraction_per_column"].get(name, 0.0),
            })
        else:  # categorical or other
            col_info.update({
                "unique_values": per.get("unique_values"),
                "top_values": per.get("top_values", []),
                "missing_frac": report["missing_fraction_per_column"].get(name, 0.0),
            })
        cols.append(col_info)

    # few example rows (stringified to be safe)
    examples = df.head(max_examples).astype(str).to_dict(orient="records")

    shape = {
        "n_rows": report["shape"]["rows"],
        "n_cols": report["shape"]["cols"],
        "columns": cols,
        "duplicates_fraction": report.get("duplicate_row_fraction", 0.0),
        "gzip_compression_ratio": report.get("gzip_compression_ratio", None),
        "harvestable_energy_score": report.get("harvestable_energy_score", None),
        "examples": examples,
    }
    return shape
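# Illustrative sketch (not wired into the Gradio UI below): the metric pipeline can also be
# driven programmatically. The sample frame and the helper name `_example_programmatic_use`
# are invented for demonstration; everything it calls is defined above in this file.
def _example_programmatic_use():
    rng = np.random.default_rng(0)
    sample = pd.DataFrame({
        "price": np.sort(rng.normal(100.0, 15.0, 500)),   # fully sorted numeric column
        "qty": rng.integers(1, 20, 500),                   # unsorted numeric column
        "category": rng.choice(["a", "b", "c"], 500),      # low-cardinality categorical
    })
    rep = compute_metrics(sample)
    interp = interpret_report(rep)
    shape = dataset_shape_summary(sample, rep)
    print(json.dumps({
        "harvestable_energy_score": rep["harvestable_energy_score"],
        "gzip_compression_ratio": rep["gzip_compression_ratio"],
        "verdict": interp["verdict"],
        "n_cols": shape["n_cols"],
    }, indent=2))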
# -------------------------------
# UI rendering helpers
# -------------------------------

def badge(text: str, color: str) -> str:
    # Small colored pill used inside the metric cards.
    return (
        f"<span style='background:{color};color:#fff;padding:2px 10px;"
        f"border-radius:999px;font-size:0.85em'>{text}</span>"
    )

def metric_card(title: str, value: str, badge_html: str) -> str:
    return f"""
    <div style='border:1px solid #e5e7eb;border-radius:10px;padding:12px'>
      <div style='font-size:0.85em;color:#6b7280'>{title}</div>
      <div style='font-size:1.3em;font-weight:600;margin:4px 0'>{value}</div>
      {badge_html}
    </div>
    """

def render_dashboard(report: dict, interp: dict) -> str:
    he = interp["he"]
    gz = interp["gzip"]
    kd = interp["kd"]
    runs = interp["runs"]
    sortb = interp["sorted"]
    dup = interp["dup"]
    cards = []
    cards.append(metric_card("Harvestable Energy", f"{he['pct']} / 100", badge(he["label"], he["color"])))
    cards.append(metric_card("Compressibility (gzip)", f"{gz['value']:.3f}", badge(gz["label"], gz["color"])))
    cards.append(metric_card("Range-Partition Entropy (kd bits)", f"{kd['value']:.3f}", badge(kd["label"], kd["color"])))
    cards.append(metric_card("Run-Entropy (avg, normalized)", f"{runs['value']:.2f}", badge(runs["label"], runs["color"])))
    cards.append(metric_card("Sortedness (avg fraction)", f"{sortb['value']:.2f}", badge(sortb["label"], sortb["color"])))
    cards.append(metric_card("Duplicate Rows (fraction)", f"{dup['value']:.2f}", badge(dup["label"], dup["color"])))
    grid = (
        "<div style='display:grid;grid-template-columns:repeat(3,minmax(0,1fr));gap:10px'>"
        + "".join(cards)
        + "</div>"
    )
    verdict = (
        "<div style='margin-top:12px;padding:10px;border-radius:8px;background:#f3f4f6'>"
        f"<b>Verdict:</b> {interp['verdict']}</div>"
    )
    return grid + verdict

def render_recs(interp: dict) -> str:
    items = []
    for r in interp["recs"]:
        # The recommendation strings use markdown-style **bold**; convert pairs to <b> tags
        # so they render correctly inside a gr.HTML component.
        while "**" in r:
            r = r.replace("**", "<b>", 1).replace("**", "</b>", 1)
        items.append(f"<li style='margin:4px 0'>• {r}</li>")
    return "<ul style='list-style:none;margin:0;padding:0'>" + "".join(items) + "</ul>"

def render_columns(report: dict) -> str:
    rows = []
    for c, st in report.get("per_column", {}).items():
        miss = report["missing_fraction_per_column"].get(c, 0.0)
        if "entropy_binned_bits" in st:
            rows.append(
                f"<tr><td>{c} (num)</td><td>{miss:.1%}</td>"
                f"<td>{st['entropy_binned_bits']:.2f}</td><td>{st['monotone_runs']}</td>"
                f"<td>{st['run_entropy_bits']:.2f}</td><td>{st['sortedness_fraction']:.2f}</td></tr>"
            )
        elif "entropy_bits" in st:
            rows.append(
                f"<tr><td>{c} (cat)</td><td>{miss:.1%}</td>"
                f"<td>{st['entropy_bits']:.2f}</td><td>-</td><td>-</td><td>-</td></tr>"
            )
        else:
            rows.append(
                f"<tr><td>{c}</td><td>{miss:.1%}</td><td>-</td><td>-</td><td>-</td><td>-</td></tr>"
            )
    header = (
        "<tr><th>Column</th><th>Missing</th><th>Entropy</th>"
        "<th>Monotone Runs</th><th>Run-Entropy</th><th>Sortedness</th></tr>"
    )
    table = "<table style='width:100%;border-collapse:collapse'>" + header + "".join(rows) + "</table>"
    table = table.replace("<th>", "<th style='text-align:left;padding:4px 8px;border-bottom:2px solid #d1d5db'>")
    table = table.replace("<td>", "<td style='padding:4px 8px;border-bottom:1px solid #e5e7eb'>")
    return table

# -------------------------------
# Gradio app
# -------------------------------

def analyze(file):
    if file is None:
        return "{}", "Please upload a CSV.", "", "", "{}"
    try:
        # gr.File may hand over a path string or a tempfile-like object depending on Gradio version.
        path = file if isinstance(file, str) else file.name
        df = pd.read_csv(path)
    except Exception as e:
        return "{}", f"Failed to read CSV: {e}", "", "", "{}"
    report = compute_metrics(df)
    interp = interpret_report(report)
    shape = dataset_shape_summary(df, report, max_examples=3)
    report_json = json.dumps(report, indent=2)
    dashboard_html = render_dashboard(report, interp)
    recs_html = render_recs(interp)
    cols_html = render_columns(report)
    shape_json = json.dumps(shape, indent=2)
    return report_json, dashboard_html, recs_html, cols_html, shape_json

with gr.Blocks(title="OrderLens — Data Interpreter") as demo:
    gr.Markdown("# OrderLens — Data Interpreter")
    gr.Markdown("Upload a CSV and get **readable** structure metrics with plain-language guidance.")
    with gr.Row():
        inp = gr.File(file_types=[".csv"], label="CSV file")
        btn = gr.Button("Analyze", variant="primary")
    gr.Markdown("---")
    gr.Markdown("### Dashboard")  # color-coded cards + verdict
    dash = gr.HTML()
    gr.Markdown("### Recommendations")  # actionable tips
    recs = gr.HTML()
    gr.Markdown("### Column Details")  # per-column table
    cols = gr.HTML()
    gr.Markdown("### Dataset Shape Summary (JSON)")  # compact schema for other models
    shape_out = gr.Code(label="Shape", language="json")
    gr.Markdown("### Raw report (JSON)")  # API-friendly
    json_out = gr.Code(label="Report", language="json")
    btn.click(analyze, inputs=inp, outputs=[json_out, dash, recs, cols, shape_out])

if __name__ == "__main__":
    demo.launch()