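"""Dataset Energy & Entropy Analyzer (Gradio app).

Upload a CSV and the app computes structure metrics (per-column entropy,
monotone runs, sortedness, gzip compressibility, 2D Pareto maxima, and a
kd-style partition entropy) plus an aggregate "Harvestable Energy" score.
"""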
import io, math, json, gzip, textwrap
import numpy as np
import pandas as pd
import gradio as gr
from typing import Dict, Any
# --- (Functions below are minimal clones to keep the Gradio app standalone) ---
def shannon_entropy_from_counts(counts: np.ndarray) -> float:
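    """Shannon entropy (bits) of a discrete distribution given raw counts.

    Example: counts [1, 1, 2] -> probabilities [0.25, 0.25, 0.5] -> 1.5 bits.
    """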
    counts = counts.astype(float)
    total = counts.sum()
    if total <= 0:
        return 0.0
    p = counts / total
    p = p[p > 0]
    return float(-(p * np.log2(p)).sum())
def numeric_binned_entropy(series: pd.Series, bins: int = 32):
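    """Entropy (bits) of a numeric column after quantile binning.

    Bin edges come from percentiles, falling back to equal-width bins on
    failure. Returns (entropy_bits, number of non-empty bins, at least 1).
    """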
    x = series.dropna().astype(float).values
    if x.size == 0:
        return 0.0, 0
    try:
        qs = np.linspace(0, 1, bins + 1)
        edges = np.unique(np.nanpercentile(x, qs * 100))
        if len(edges) < 2:
            edges = np.unique(x)
        hist, _ = np.histogram(x, bins=edges)
    except Exception:
        hist, _ = np.histogram(x, bins=bins)
    H = shannon_entropy_from_counts(hist)
    k = np.count_nonzero(hist)
    return H, max(k, 1)
def categorical_entropy(series: pd.Series):
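    """Entropy (bits) over value frequencies of a categorical column.

    Returns (entropy_bits, unique_value_count).
    """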
    x = series.dropna().astype(str).values
    if x.size == 0:
        return 0.0, 0
    vals, counts = np.unique(x, return_counts=True)
    H = shannon_entropy_from_counts(counts)
    return H, len(vals)
def monotone_runs_and_entropy(series: pd.Series):
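    """Split a column into maximal non-decreasing runs.

    Returns (run_count, entropy_bits), where the entropy weights each run by
    its length, so a few long runs score low and many short runs score high.
    """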
    x = series.dropna().values
    n = len(x)
    if n <= 1:
        return 1, 0.0
    runs = [1]
    for i in range(1, n):
        if x[i] >= x[i-1]:
            runs[-1] += 1
        else:
            runs.append(1)
    run_lengths = np.array(runs, dtype=float)
    H = shannon_entropy_from_counts(run_lengths)
    return len(runs), H
def sortedness_score(series: pd.Series) -> float:
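    """Fraction of adjacent pairs that are non-decreasing (1.0 = fully sorted ascending)."""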
    x = series.dropna().values
    if len(x) <= 1:
        return 1.0
    return float(np.mean(np.diff(x) >= 0))
def gzip_compress_ratio_from_bytes(b: bytes) -> float:
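    """Gzip-compressed size divided by raw size; lower means more redundancy/structure."""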
    if len(b) == 0:
        return 1.0
    out = io.BytesIO()
    with gzip.GzipFile(fileobj=out, mode="wb") as f:
        f.write(b)
    compressed = out.getvalue()
    return len(compressed) / len(b)
def dataframe_gzip_ratio(df: pd.DataFrame, max_rows: int = 20000) -> float:
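    """Gzip ratio of the DataFrame serialized to CSV; samples max_rows rows when the frame is larger."""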
    s = df.sample(min(len(df), max_rows), random_state=0) if len(df) > max_rows else df
    raw = s.to_csv(index=False).encode("utf-8", errors="ignore")
    return gzip_compress_ratio_from_bytes(raw)
def pareto_maxima_count(points: np.ndarray) -> int:
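    """Approximate count of 2D Pareto-maximal ('staircase') points.

    Uses only the first two columns and a single sweep in descending x order,
    keeping points whose y is at least the best y seen so far.
    """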
    if points.shape[1] < 2 or points.shape[0] == 0:
        return 0
    P = points[:, :2]
    order = np.lexsort((-P[:, 1], -P[:, 0]))
    best_y = -np.inf
    count = 0
    for idx in order:
        y = P[idx, 1]
        if y >= best_y:
            count += 1
            best_y = y
    return int(count)
def kd_entropy(points: np.ndarray, max_leaf: int = 128, axis: int = 0) -> float:
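    """Entropy (bits) accumulated over recursive median splits, kd-tree style.

    Axes alternate at each level; recursion stops once a node holds at most
    max_leaf points.
    """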
    n = points.shape[0]
    if n == 0:
        return 0.0
    if n <= max_leaf:
        return 0.0
    if np.all(points == points[0]):
        # All remaining points are identical: no split can separate them,
        # so stop here instead of recursing without making progress.
        return 0.0
    d = points.shape[1]
    vals = points[:, axis]
    med = np.median(vals)
    left = points[vals <= med]
    right = points[vals > med]
    pL = len(left) / n
    pR = len(right) / n
    H_here = 0.0
    for p in (pL, pR):
        if p > 0:
            H_here += -p * math.log(p, 2)
    next_axis = (axis + 1) % d
    return H_here + kd_entropy(left, max_leaf, next_axis) + kd_entropy(right, max_leaf, next_axis)
def normalize(value: float, max_value: float) -> float:
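    """Clamp value / max_value into [0, 1]."""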
    if max_value <= 0:
        return 0.0
    v = max(0.0, min(1.0, value / max_value))
    return float(v)
def compute_metrics(df: pd.DataFrame):
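    """Build the full structure report for a DataFrame.

    Covers shape, inferred column types, missingness, duplicate rows,
    per-column entropies/runs/sortedness, a gzip compressibility ratio,
    2D Pareto maxima and kd-partition entropy over the first two numeric
    columns, and an aggregate harvestable_energy_score in [0, 1].
    """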
    report = {}
    n_rows, n_cols = df.shape
    report["shape"] = {"rows": int(n_rows), "cols": int(n_cols)}

    # Types
    types = {}
    for c in df.columns:
        s = df[c]
        if pd.api.types.is_numeric_dtype(s):
            types[c] = "numeric"
        elif pd.api.types.is_datetime64_any_dtype(s) or "date" in str(s.dtype).lower():
            types[c] = "datetime"
        else:
            types[c] = "categorical"
    report["column_types"] = types

    missing = df.isna().mean().to_dict()
    dup_ratio = float((len(df) - len(df.drop_duplicates())) / max(1, len(df)))
    report["missing_fraction_per_column"] = {k: float(v) for k, v in missing.items()}
    report["duplicate_row_fraction"] = dup_ratio

    col_stats = {}
    for c in df.columns:
        s = df[c]
        if types[c] == "numeric":
            H, k = numeric_binned_entropy(s)
            runs, Hruns = monotone_runs_and_entropy(s)
            sorted_frac = sortedness_score(s)
            col_stats[c] = {
                "entropy_binned_bits": float(H),
                "active_bins": int(k),
                "monotone_runs": int(runs),
                "run_entropy_bits": float(Hruns),
                "sortedness_fraction": float(sorted_frac),
            }
        else:
            H, k = categorical_entropy(s)
            col_stats[c] = {"entropy_bits": float(H), "unique_values": int(k)}
    report["per_column"] = col_stats

    try:
        gzip_ratio = dataframe_gzip_ratio(df)
    except Exception:
        gzip_ratio = 1.0
    report["gzip_compression_ratio"] = float(gzip_ratio)

    num_cols = [c for c, t in types.items() if t == "numeric"]
    if len(num_cols) >= 2:
        X = df[num_cols].select_dtypes(include=[np.number]).values.astype(float)
        X = X[~np.isnan(X).any(axis=1)]
        if X.shape[0] >= 3:
            pts2 = X[:, :2]
            report["pareto_maxima_2d"] = int(pareto_maxima_count(pts2))
            try:
                H_kd = kd_entropy(pts2, max_leaf=128, axis=0)
            except Exception:
                H_kd = 0.0
            report["kd_partition_entropy_bits"] = float(H_kd)
        else:
            report["pareto_maxima_2d"] = 0
            report["kd_partition_entropy_bits"] = 0.0
    else:
        report["pareto_maxima_2d"] = 0
        report["kd_partition_entropy_bits"] = 0.0

    max_bits = math.log2(max(2, n_rows))
    he_parts = []
    he_parts.append(1.0 - max(0.0, min(1.0, report["gzip_compression_ratio"])))
    num_run_entropies = []
    for c in df.columns:
        st = col_stats.get(c, {})
        if "run_entropy_bits" in st:
            num_run_entropies.append(st["run_entropy_bits"])
    if num_run_entropies:
        mean_run_H = float(np.mean(num_run_entropies))
        he_parts.append(1.0 - normalize(mean_run_H, max_bits))
    H_kd = report.get("kd_partition_entropy_bits", 0.0)
    if H_kd is not None:
        he_parts.append(1.0 - normalize(float(H_kd), max_bits))
    if he_parts:
        HE = float(np.mean([max(0.0, min(1.0, v)) for v in he_parts]))
    else:
        HE = 0.0
    report["harvestable_energy_score"] = HE
    return report
def explain_report(report: Dict[str, Any]) -> str:
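    """Render the metrics report dict as a short Markdown summary for the UI."""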
    lines = []
    r, c = report["shape"]["rows"], report["shape"]["cols"]
    lines.append(f"**Dataset shape:** {r} rows × {c} columns.")
    g = report.get("gzip_compression_ratio", None)
    if g is not None:
        lines.append(f"**Global compressibility (gzip ratio):** {g:.3f}. Lower = more structure.")
    he = report.get("harvestable_energy_score", 0.0)
    he_pct = int(100 * he)
    lines.append(f"**Harvestable Energy (0–100):** ~{he_pct}. Higher = more exploitable order.")
    pm = report.get("pareto_maxima_2d", None)
    if pm is not None:
        lines.append(f"**2D Pareto maxima (first two numeric cols):** {pm}.")
    Hkd = report.get("kd_partition_entropy_bits", None)
    if Hkd is not None:
        lines.append(f"**Range-partition entropy (kd approx):** {Hkd:.3f} bits.")
    lines.append("\n**Column-level:**")
    for c, st in report.get("per_column", {}).items():
        m = report["missing_fraction_per_column"].get(c, 0.0)
        if "entropy_binned_bits" in st:
            lines.append(f"- **{c}** (numeric): missing {m:.1%}, binned entropy {st['entropy_binned_bits']:.2f} bits, "
                         f"{st['monotone_runs']} runs (run-entropy {st['run_entropy_bits']:.2f} bits), "
                         f"sortedness {st['sortedness_fraction']:.2f}.")
        elif "entropy_bits" in st:
            lines.append(f"- **{c}** (categorical): missing {m:.1%}, entropy {st['entropy_bits']:.2f} bits, "
                         f"{st['unique_values']} unique.")
        else:
            lines.append(f"- **{c}**: missing {m:.1%}.")
    lines.append("\n**Tips:** Higher energy and lower entropies often allow near-linear algorithms (run-aware sorts, hull scans, envelope merges).")
    return "\n".join(lines)
def analyze(file):
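    """Gradio callback: read the uploaded CSV and return (JSON report, Markdown summary)."""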
    if file is None:
        return "Please upload a CSV.", ""
    # gr.File may pass a plain filepath string or a tempfile-like object with
    # a .name attribute, depending on the Gradio version; accept both.
    path = file if isinstance(file, str) else file.name
    try:
        df = pd.read_csv(path)
    except Exception as e:
        return f"Failed to read CSV: {e}", ""
    report = compute_metrics(df)
    md = explain_report(report)
    return json.dumps(report, indent=2), md
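# --- Gradio UI ---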
with gr.Blocks(title="Dataset Energy & Entropy Analyzer") as demo:
    gr.Markdown("# Dataset Energy & Entropy Analyzer\nUpload a CSV to compute dataset structure metrics (entropy, runs, compressibility, kd-entropy) and an overall **Harvestable Energy** score.")
    with gr.Row():
        inp = gr.File(file_types=[".csv"], label="CSV file")
    with gr.Row():
        btn = gr.Button("Analyze", variant="primary")
    with gr.Row():
        json_out = gr.Code(label="Raw report (JSON)", language="json")
        md_out = gr.Markdown()
    btn.click(analyze, inputs=inp, outputs=[json_out, md_out])
if __name__ == "__main__":
    demo.launch()