import io, math, json, gzip, textwrap

import numpy as np
import pandas as pd
import gradio as gr
from typing import Dict, Any

# --- (Functions below are minimal clones to keep the Gradio app standalone) ---


def shannon_entropy_from_counts(counts: np.ndarray) -> float:
    """Shannon entropy (bits) of a discrete distribution given as raw counts."""
    counts = counts.astype(float)
    total = counts.sum()
    if total <= 0:
        return 0.0
    p = counts / total
    p = p[p > 0]
    return float(-(p * np.log2(p)).sum())


def numeric_binned_entropy(series: pd.Series, bins: int = 32):
    """Entropy of a numeric column after quantile binning; returns (bits, active bins)."""
    x = series.dropna().astype(float).values
    if x.size == 0:
        return 0.0, 0
    try:
        qs = np.linspace(0, 1, bins + 1)
        edges = np.unique(np.nanpercentile(x, qs * 100))
        if len(edges) < 2:
            edges = np.unique(x)
        hist, _ = np.histogram(x, bins=edges)
    except Exception:
        hist, _ = np.histogram(x, bins=bins)
    H = shannon_entropy_from_counts(hist)
    k = np.count_nonzero(hist)
    return H, max(k, 1)


def categorical_entropy(series: pd.Series):
    """Entropy of a categorical column; returns (bits, number of unique values)."""
    x = series.dropna().astype(str).values
    if x.size == 0:
        return 0.0, 0
    vals, counts = np.unique(x, return_counts=True)
    H = shannon_entropy_from_counts(counts)
    return H, len(vals)


def monotone_runs_and_entropy(series: pd.Series):
    """Count maximal non-decreasing runs and the entropy of the run-length distribution."""
    x = series.dropna().values
    n = len(x)
    if n <= 1:
        return 1, 0.0
    runs = [1]
    for i in range(1, n):
        if x[i] >= x[i - 1]:
            runs[-1] += 1
        else:
            runs.append(1)
    run_lengths = np.array(runs, dtype=float)
    H = shannon_entropy_from_counts(run_lengths)
    return len(runs), H


def sortedness_score(series: pd.Series) -> float:
    """Fraction of adjacent pairs that are in non-decreasing order."""
    x = series.dropna().values
    if len(x) <= 1:
        return 1.0
    return float(np.mean(np.diff(x) >= 0))


def gzip_compress_ratio_from_bytes(b: bytes) -> float:
    if len(b) == 0:
        return 1.0
    out = io.BytesIO()
    with gzip.GzipFile(fileobj=out, mode="wb") as f:
        f.write(b)
    compressed = out.getvalue()
    return len(compressed) / len(b)


def dataframe_gzip_ratio(df: pd.DataFrame, max_rows: int = 20000) -> float:
    """Gzip ratio of the CSV serialization of (a sample of) the DataFrame."""
    s = df.sample(min(len(df), max_rows), random_state=0) if len(df) > max_rows else df
    raw = s.to_csv(index=False).encode("utf-8", errors="ignore")
    return gzip_compress_ratio_from_bytes(raw)


def pareto_maxima_count(points: np.ndarray) -> int:
    """Number of 2D Pareto-maximal points (staircase scan after sorting by x descending)."""
    if points.shape[1] < 2 or points.shape[0] == 0:
        return 0
    P = points[:, :2]
    order = np.lexsort((-P[:, 1], -P[:, 0]))
    best_y = -np.inf
    count = 0
    for idx in order:
        y = P[idx, 1]
        if y >= best_y:
            count += 1
            best_y = y
    return int(count)


def kd_entropy(points: np.ndarray, max_leaf: int = 128, axis: int = 0) -> float:
    """Entropy (bits) accumulated by recursive median splits, cycling through axes."""
    n = points.shape[0]
    if n == 0:
        return 0.0
    if n <= max_leaf:
        return 0.0
    d = points.shape[1]
    vals = points[:, axis]
    med = np.median(vals)
    left = points[vals <= med]
    right = points[vals > med]
    if len(left) == n or len(right) == n:
        # Degenerate split (median equals the maximum on this axis): stop here
        # to avoid infinite recursion on constant or heavily tied columns.
        return 0.0
    pL = len(left) / n
    pR = len(right) / n
    H_here = 0.0
    for p in (pL, pR):
        if p > 0:
            H_here += -p * math.log(p, 2)
    next_axis = (axis + 1) % d
    return H_here + kd_entropy(left, max_leaf, next_axis) + kd_entropy(right, max_leaf, next_axis)


def normalize(value: float, max_value: float) -> float:
    if max_value <= 0:
        return 0.0
    v = max(0.0, min(1.0, value / max_value))
    return float(v)


def compute_metrics(df: pd.DataFrame):
    report = {}
    n_rows, n_cols = df.shape
    report["shape"] = {"rows": int(n_rows), "cols": int(n_cols)}

    # Column types
    types = {}
    for c in df.columns:
        s = df[c]
        if pd.api.types.is_numeric_dtype(s):
            types[c] = "numeric"
        elif pd.api.types.is_datetime64_any_dtype(s) or "date" in str(s.dtype).lower():
            types[c] = "datetime"
        else:
            types[c] = "categorical"
    report["column_types"] = types

    # Missing values and duplicate rows
    missing = df.isna().mean().to_dict()
    dup_ratio = float((len(df) - len(df.drop_duplicates())) / max(1, len(df)))
    report["missing_fraction_per_column"] = {k: float(v) for k, v in missing.items()}
report["duplicate_row_fraction"] = dup_ratio col_stats = {} for c in df.columns: s = df[c] if types[c] == "numeric": H, k = numeric_binned_entropy(s) runs, Hruns = monotone_runs_and_entropy(s) sorted_frac = sortedness_score(s) col_stats[c] = { "entropy_binned_bits": float(H), "active_bins": int(k), "monotone_runs": int(runs), "run_entropy_bits": float(Hruns), "sortedness_fraction": float(sorted_frac), } else: H, k = categorical_entropy(s) col_stats[c] = {"entropy_bits": float(H), "unique_values": int(k)} report["per_column"] = col_stats try: gzip_ratio = dataframe_gzip_ratio(df) except Exception: gzip_ratio = 1.0 report["gzip_compression_ratio"] = float(gzip_ratio) num_cols = [c for c, t in types.items() if t == "numeric"] if len(num_cols) >= 2: X = df[num_cols].select_dtypes(include=[np.number]).values.astype(float) X = X[~np.isnan(X).any(axis=1)] if X.shape[0] >= 3: pts2 = X[:, :2] report["pareto_maxima_2d"] = int(pareto_maxima_count(pts2)) try: H_kd = kd_entropy(pts2, max_leaf=128, axis=0) except Exception: H_kd = 0.0 report["kd_partition_entropy_bits"] = float(H_kd) else: report["pareto_maxima_2d"] = 0 report["kd_partition_entropy_bits"] = 0.0 else: report["pareto_maxima_2d"] = 0 report["kd_partition_entropy_bits"] = 0.0 max_bits = math.log2(max(2, n_rows)) he_parts = [] he_parts.append(1.0 - max(0.0, min(1.0, report["gzip_compression_ratio"]))) num_run_entropies = [] for c in df.columns: st = col_stats.get(c, {}) if "run_entropy_bits" in st: num_run_entropies.append(st["run_entropy_bits"]) if num_run_entropies: mean_run_H = float(np.mean(num_run_entropies)) he_parts.append(1.0 - normalize(mean_run_H, max_bits)) H_kd = report.get("kd_partition_entropy_bits", 0.0) if H_kd is not None: he_parts.append(1.0 - normalize(float(H_kd), max_bits)) if he_parts: HE = float(np.mean([max(0.0, min(1.0, v)) for v in he_parts])) else: HE = 0.0 report["harvestable_energy_score"] = HE return report def explain_report(report: Dict[str, Any]) -> str: lines = [] r, c = report["shape"]["rows"], report["shape"]["cols"] lines.append(f"**Dataset shape:** {r} rows × {c} columns.") g = report.get("gzip_compression_ratio", None) if g is not None: lines.append(f"**Global compressibility (gzip ratio):** {g:.3f}. Lower = more structure.") he = report.get("harvestable_energy_score", 0.0) he_pct = int(100 * he) lines.append(f"**Harvestable Energy (0–100):** ~{he_pct}. 
    pm = report.get("pareto_maxima_2d", None)
    if pm is not None:
        lines.append(f"**2D Pareto maxima (first two numeric cols):** {pm}.")
    Hkd = report.get("kd_partition_entropy_bits", None)
    if Hkd is not None:
        lines.append(f"**Range-partition entropy (kd approx):** {Hkd:.3f} bits.")
    lines.append("\n**Column-level:**")
    for c, st in report.get("per_column", {}).items():
        m = report["missing_fraction_per_column"].get(c, 0.0)
        if "entropy_binned_bits" in st:
            lines.append(
                f"- **{c}** (numeric): missing {m:.1%}, binned entropy {st['entropy_binned_bits']:.2f} bits, "
                f"{st['monotone_runs']} runs (run-entropy {st['run_entropy_bits']:.2f} bits), "
                f"sortedness {st['sortedness_fraction']:.2f}."
            )
        elif "entropy_bits" in st:
            lines.append(
                f"- **{c}** (categorical): missing {m:.1%}, entropy {st['entropy_bits']:.2f} bits, "
                f"{st['unique_values']} unique."
            )
        else:
            lines.append(f"- **{c}**: missing {m:.1%}.")
    lines.append(
        "\n**Tips:** Higher energy and lower entropies often allow near-linear algorithms "
        "(run-aware sorts, hull scans, envelope merges)."
    )
    return "\n".join(lines)


def analyze(file):
    if file is None:
        return "Please upload a CSV.", ""
    # gr.File may deliver a filepath string or a tempfile-like object depending on the Gradio version.
    path = getattr(file, "name", file)
    try:
        df = pd.read_csv(path)
    except Exception as e:
        return f"Failed to read CSV: {e}", ""
    report = compute_metrics(df)
    md = explain_report(report)
    return json.dumps(report, indent=2), md


with gr.Blocks(title="Dataset Energy & Entropy Analyzer") as demo:
    gr.Markdown(
        "# Dataset Energy & Entropy Analyzer\n"
        "Upload a CSV to compute dataset structure metrics (entropy, runs, compressibility, "
        "kd-entropy) and an overall **Harvestable Energy** score."
    )
    with gr.Row():
        inp = gr.File(file_types=[".csv"], label="CSV file")
    with gr.Row():
        btn = gr.Button("Analyze", variant="primary")
    with gr.Row():
        json_out = gr.Code(label="Raw report (JSON)", language="json")
        md_out = gr.Markdown()
    btn.click(analyze, inputs=inp, outputs=[json_out, md_out])

if __name__ == "__main__":
    demo.launch()
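
# --- Usage sketch (illustrative only, not part of the app) ---
# A minimal example of running the metrics without the Gradio UI. The column names
# "x" and "label" below are hypothetical placeholders, not anything the code above assumes;
# a fully sorted numeric column like "x" should push the energy score toward the high end.
#
#   df = pd.DataFrame({"x": np.arange(1000), "label": ["a", "b"] * 500})
#   report = compute_metrics(df)
#   print(json.dumps(report, indent=2))
#   print(explain_report(report))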