|
import io, math, json, gzip, textwrap |
|
import numpy as np |
|
import pandas as pd |
|
import gradio as gr |
|
|
|
from typing import Dict, Any |
|
|
|
|
|
def shannon_entropy_from_counts(counts: np.ndarray) -> float: |
|
counts = counts.astype(float) |
|
total = counts.sum() |
|
if total <= 0: |
|
return 0.0 |
|
p = counts / total |
|
p = p[p > 0] |
|
return float(-(p * np.log2(p)).sum()) |
|
|
|
def numeric_binned_entropy(series: pd.Series, bins: int = 32): |
|
x = series.dropna().astype(float).values |
|
if x.size == 0: |
|
return 0.0, 0 |
|
try: |
|
qs = np.linspace(0, 1, bins + 1) |
|
edges = np.unique(np.nanpercentile(x, qs * 100)) |
|
if len(edges) < 2: |
|
edges = np.unique(x) |
|
hist, _ = np.histogram(x, bins=edges) |
|
except Exception: |
|
hist, _ = np.histogram(x, bins=bins) |
|
H = shannon_entropy_from_counts(hist) |
|
k = np.count_nonzero(hist) |
|
return H, max(k, 1) |
|
|
|
def categorical_entropy(series: pd.Series): |
|
x = series.dropna().astype(str).values |
|
if x.size == 0: |
|
return 0.0, 0 |
|
vals, counts = np.unique(x, return_counts=True) |
|
H = shannon_entropy_from_counts(counts) |
|
return H, len(vals) |
|
|
|
def monotone_runs_and_entropy(series: pd.Series): |
|
x = series.dropna().values |
|
n = len(x) |
|
if n <= 1: |
|
return 1, 0.0 |
|
runs = [1] |
|
for i in range(1, n): |
|
if x[i] >= x[i-1]: |
|
runs[-1] += 1 |
|
else: |
|
runs.append(1) |
|
run_lengths = np.array(runs, dtype=float) |
|
H = shannon_entropy_from_counts(run_lengths) |
|
return len(runs), H |
|
|
|
def sortedness_score(series: pd.Series) -> float: |
|
x = series.dropna().values |
|
if len(x) <= 1: |
|
return 1.0 |
|
return float(np.mean(np.diff(x) >= 0)) |
|
|
|
def gzip_compress_ratio_from_bytes(b: bytes) -> float: |
|
if len(b) == 0: |
|
return 1.0 |
|
out = io.BytesIO() |
|
with gzip.GzipFile(fileobj=out, mode="wb") as f: |
|
f.write(b) |
|
compressed = out.getvalue() |
|
return len(compressed) / len(b) |
|
|
|
def dataframe_gzip_ratio(df: pd.DataFrame, max_rows: int = 20000) -> float: |
|
s = df.sample(min(len(df), max_rows), random_state=0) if len(df) > max_rows else df |
|
raw = s.to_csv(index=False).encode("utf-8", errors="ignore") |
|
return gzip_compress_ratio_from_bytes(raw) |
|
|
|
def pareto_maxima_count(points: np.ndarray) -> int: |
|
if points.shape[1] < 2 or points.shape[0] == 0: |
|
return 0 |
|
P = points[:, :2] |
|
order = np.lexsort((-P[:, 1], -P[:, 0])) |
|
best_y = -np.inf |
|
count = 0 |
|
for idx in order: |
|
y = P[idx, 1] |
|
if y >= best_y: |
|
count += 1 |
|
best_y = y |
|
return int(count) |
|
|
|
def kd_entropy(points: np.ndarray, max_leaf: int = 128, axis: int = 0) -> float: |
|
n = points.shape[0] |
|
if n == 0: |
|
return 0.0 |
|
if n <= max_leaf: |
|
return 0.0 |
|
d = points.shape[1] |
|
vals = points[:, axis] |
|
med = np.median(vals) |
|
left = points[vals <= med] |
|
right = points[vals > med] |
|
pL = len(left) / n |
|
pR = len(right) / n |
|
H_here = 0.0 |
|
for p in (pL, pR): |
|
if p > 0: |
|
H_here += -p * math.log(p, 2) |
|
next_axis = (axis + 1) % d |
|
return H_here + kd_entropy(left, max_leaf, next_axis) + kd_entropy(right, max_leaf, next_axis) |
|
|
|
def normalize(value: float, max_value: float) -> float: |
|
if max_value <= 0: |
|
return 0.0 |
|
v = max(0.0, min(1.0, value / max_value)) |
|
return float(v) |
|
|
|
def compute_metrics(df: pd.DataFrame): |
|
report = {} |
|
n_rows, n_cols = df.shape |
|
report["shape"] = {"rows": int(n_rows), "cols": int(n_cols)} |
|
|
|
|
|
types = {} |
|
for c in df.columns: |
|
s = df[c] |
|
if pd.api.types.is_numeric_dtype(s): |
|
types[c] = "numeric" |
|
elif pd.api.types.is_datetime64_any_dtype(s) or "date" in str(s.dtype).lower(): |
|
types[c] = "datetime" |
|
else: |
|
types[c] = "categorical" |
|
report["column_types"] = types |
|
|
|
missing = df.isna().mean().to_dict() |
|
dup_ratio = float((len(df) - len(df.drop_duplicates())) / max(1, len(df))) |
|
report["missing_fraction_per_column"] = {k: float(v) for k, v in missing.items()} |
|
report["duplicate_row_fraction"] = dup_ratio |
|
|
|
col_stats = {} |
|
for c in df.columns: |
|
s = df[c] |
|
if types[c] == "numeric": |
|
H, k = numeric_binned_entropy(s) |
|
runs, Hruns = monotone_runs_and_entropy(s) |
|
sorted_frac = sortedness_score(s) |
|
col_stats[c] = { |
|
"entropy_binned_bits": float(H), |
|
"active_bins": int(k), |
|
"monotone_runs": int(runs), |
|
"run_entropy_bits": float(Hruns), |
|
"sortedness_fraction": float(sorted_frac), |
|
} |
|
else: |
|
H, k = categorical_entropy(s) |
|
col_stats[c] = {"entropy_bits": float(H), "unique_values": int(k)} |
|
report["per_column"] = col_stats |
|
|
|
try: |
|
gzip_ratio = dataframe_gzip_ratio(df) |
|
except Exception: |
|
gzip_ratio = 1.0 |
|
report["gzip_compression_ratio"] = float(gzip_ratio) |
|
|
|
num_cols = [c for c, t in types.items() if t == "numeric"] |
|
if len(num_cols) >= 2: |
|
X = df[num_cols].select_dtypes(include=[np.number]).values.astype(float) |
|
X = X[~np.isnan(X).any(axis=1)] |
|
if X.shape[0] >= 3: |
|
pts2 = X[:, :2] |
|
report["pareto_maxima_2d"] = int(pareto_maxima_count(pts2)) |
|
try: |
|
H_kd = kd_entropy(pts2, max_leaf=128, axis=0) |
|
except Exception: |
|
H_kd = 0.0 |
|
report["kd_partition_entropy_bits"] = float(H_kd) |
|
else: |
|
report["pareto_maxima_2d"] = 0 |
|
report["kd_partition_entropy_bits"] = 0.0 |
|
else: |
|
report["pareto_maxima_2d"] = 0 |
|
report["kd_partition_entropy_bits"] = 0.0 |
|
|
|
max_bits = math.log2(max(2, n_rows)) |
|
he_parts = [] |
|
he_parts.append(1.0 - max(0.0, min(1.0, report["gzip_compression_ratio"]))) |
|
num_run_entropies = [] |
|
for c in df.columns: |
|
st = col_stats.get(c, {}) |
|
if "run_entropy_bits" in st: |
|
num_run_entropies.append(st["run_entropy_bits"]) |
|
if num_run_entropies: |
|
mean_run_H = float(np.mean(num_run_entropies)) |
|
he_parts.append(1.0 - normalize(mean_run_H, max_bits)) |
|
H_kd = report.get("kd_partition_entropy_bits", 0.0) |
|
if H_kd is not None: |
|
he_parts.append(1.0 - normalize(float(H_kd), max_bits)) |
|
if he_parts: |
|
HE = float(np.mean([max(0.0, min(1.0, v)) for v in he_parts])) |
|
else: |
|
HE = 0.0 |
|
report["harvestable_energy_score"] = HE |
|
|
|
return report |
|
|
|
def explain_report(report: Dict[str, Any]) -> str: |
|
lines = [] |
|
r, c = report["shape"]["rows"], report["shape"]["cols"] |
|
lines.append(f"**Dataset shape:** {r} rows × {c} columns.") |
|
g = report.get("gzip_compression_ratio", None) |
|
if g is not None: |
|
lines.append(f"**Global compressibility (gzip ratio):** {g:.3f}. Lower = more structure.") |
|
he = report.get("harvestable_energy_score", 0.0) |
|
he_pct = int(100 * he) |
|
lines.append(f"**Harvestable Energy (0–100):** ~{he_pct}. Higher = more exploitable order.") |
|
pm = report.get("pareto_maxima_2d", None) |
|
if pm is not None: |
|
lines.append(f"**2D Pareto maxima (first two numeric cols):** {pm}.") |
|
Hkd = report.get("kd_partition_entropy_bits", None) |
|
if Hkd is not None: |
|
lines.append(f"**Range-partition entropy (kd approx):** {Hkd:.3f} bits.") |
|
lines.append("\\n**Column-level:**") |
|
for c, st in report.get("per_column", {}).items(): |
|
m = report["missing_fraction_per_column"].get(c, 0.0) |
|
if "entropy_binned_bits" in st: |
|
lines.append(f"- **{c}** (numeric): missing {m:.1%}, binned entropy {st['entropy_binned_bits']:.2f} bits, " |
|
f"{st['monotone_runs']} runs (run-entropy {st['run_entropy_bits']:.2f} bits), " |
|
f"sortedness {st['sortedness_fraction']:.2f}.") |
|
elif "entropy_bits" in st: |
|
lines.append(f"- **{c}** (categorical): missing {m:.1%}, entropy {st['entropy_bits']:.2f} bits, " |
|
f"{st['unique_values']} unique.") |
|
else: |
|
lines.append(f"- **{c}**: missing {m:.1%}.") |
|
lines.append("\\n**Tips:** Higher energy and lower entropies often allow near-linear algorithms (run-aware sorts, hull scans, envelope merges).") |
|
return "\\n".join(lines) |
|
|
|
def analyze(file): |
|
if file is None: |
|
return "Please upload a CSV.", "" |
|
try: |
|
df = pd.read_csv(file.name) |
|
except Exception as e: |
|
return f"Failed to read CSV: {e}", "" |
|
report = compute_metrics(df) |
|
md = explain_report(report) |
|
return json.dumps(report, indent=2), md |
|
|
|
with gr.Blocks(title="Dataset Energy & Entropy Analyzer") as demo: |
|
gr.Markdown("# Dataset Energy & Entropy Analyzer\nUpload a CSV to compute dataset structure metrics (entropy, runs, compressibility, kd-entropy) and an overall **Harvestable Energy** score.") |
|
with gr.Row(): |
|
inp = gr.File(file_types=[".csv"], label="CSV file") |
|
with gr.Row(): |
|
btn = gr.Button("Analyze", variant="primary") |
|
with gr.Row(): |
|
json_out = gr.Code(label="Raw report (JSON)", language="json") |
|
md_out = gr.Markdown() |
|
btn.click(analyze, inputs=inp, outputs=[json_out, md_out]) |
|
|
|
if __name__ == "__main__": |
|
demo.launch() |