TuringsSolutions committed on
Commit a7284df · verified · 1 Parent(s): 37072b5

Create app.py

Files changed (1)
  1. app.py +259 -0
app.py ADDED
@@ -0,0 +1,259 @@
+ import io, math, json, gzip
+ import numpy as np
+ import pandas as pd
+ import gradio as gr
+
+ from typing import Dict, Any
+
+ # --- (Functions below are minimal clones to keep the Gradio app standalone) ---
+ def shannon_entropy_from_counts(counts: np.ndarray) -> float:
+     counts = counts.astype(float)
+     total = counts.sum()
+     if total <= 0:
+         return 0.0
+     p = counts / total
+     p = p[p > 0]
+     return float(-(p * np.log2(p)).sum())
+
+ def numeric_binned_entropy(series: pd.Series, bins: int = 32):
+     x = series.dropna().astype(float).values
+     if x.size == 0:
+         return 0.0, 0
+     try:
+         qs = np.linspace(0, 1, bins + 1)
+         edges = np.unique(np.nanpercentile(x, qs * 100))
+         if len(edges) < 2:
+             edges = np.unique(x)
+         hist, _ = np.histogram(x, bins=edges)
+     except Exception:
+         hist, _ = np.histogram(x, bins=bins)
+     H = shannon_entropy_from_counts(hist)
+     k = np.count_nonzero(hist)
+     return H, max(k, 1)
+
+ def categorical_entropy(series: pd.Series):
+     x = series.dropna().astype(str).values
+     if x.size == 0:
+         return 0.0, 0
+     vals, counts = np.unique(x, return_counts=True)
+     H = shannon_entropy_from_counts(counts)
+     return H, len(vals)
+
+ def monotone_runs_and_entropy(series: pd.Series):
+     x = series.dropna().values
+     n = len(x)
+     if n <= 1:
+         return 1, 0.0
+     runs = [1]
+     for i in range(1, n):
+         if x[i] >= x[i - 1]:
+             runs[-1] += 1
+         else:
+             runs.append(1)
+     run_lengths = np.array(runs, dtype=float)
+     H = shannon_entropy_from_counts(run_lengths)
+     return len(runs), H
+
+ def sortedness_score(series: pd.Series) -> float:
+     x = series.dropna().values
+     if len(x) <= 1:
+         return 1.0
+     return float(np.mean(np.diff(x) >= 0))
+
+ def gzip_compress_ratio_from_bytes(b: bytes) -> float:
+     if len(b) == 0:
+         return 1.0
+     out = io.BytesIO()
+     with gzip.GzipFile(fileobj=out, mode="wb") as f:
+         f.write(b)
+     compressed = out.getvalue()
+     return len(compressed) / len(b)
+
+ def dataframe_gzip_ratio(df: pd.DataFrame, max_rows: int = 20000) -> float:
+     s = df.sample(min(len(df), max_rows), random_state=0) if len(df) > max_rows else df
+     raw = s.to_csv(index=False).encode("utf-8", errors="ignore")
+     return gzip_compress_ratio_from_bytes(raw)
+
+ def pareto_maxima_count(points: np.ndarray) -> int:
+     if points.shape[1] < 2 or points.shape[0] == 0:
+         return 0
+     P = points[:, :2]
+     # Sweep in descending x (ties broken by descending y); a point is a
+     # Pareto maximum if its y is at least the best y seen so far.
+     order = np.lexsort((-P[:, 1], -P[:, 0]))
+     best_y = -np.inf
+     count = 0
+     for idx in order:
+         y = P[idx, 1]
+         if y >= best_y:
+             count += 1
+             best_y = y
+     return int(count)
+
+ def kd_entropy(points: np.ndarray, max_leaf: int = 128, axis: int = 0) -> float:
+     n = points.shape[0]
+     if n == 0:
+         return 0.0
+     if n <= max_leaf:
+         return 0.0
+     d = points.shape[1]
+     vals = points[:, axis]
+     if vals.min() == vals.max():
+         # This axis is constant. If every point is identical no split is
+         # possible, so stop to avoid infinite recursion; otherwise move on
+         # to the next axis.
+         if np.all(points == points[0]):
+             return 0.0
+         return kd_entropy(points, max_leaf, (axis + 1) % d)
+     med = np.median(vals)
+     left_mask = vals <= med
+     if left_mask.all():
+         # Degenerate median split (median equals the max); fall back to the
+         # midpoint of the range so both halves are non-empty.
+         left_mask = vals <= (vals.min() + vals.max()) / 2.0
+     left = points[left_mask]
+     right = points[~left_mask]
+     pL = len(left) / n
+     pR = len(right) / n
+     H_here = 0.0
+     for p in (pL, pR):
+         if p > 0:
+             H_here += -p * math.log(p, 2)
+     next_axis = (axis + 1) % d
+     return H_here + kd_entropy(left, max_leaf, next_axis) + kd_entropy(right, max_leaf, next_axis)
+
+ def normalize(value: float, max_value: float) -> float:
+     if max_value <= 0:
+         return 0.0
+     v = max(0.0, min(1.0, value / max_value))
+     return float(v)
+
+ def compute_metrics(df: pd.DataFrame):
+     report = {}
+     n_rows, n_cols = df.shape
+     report["shape"] = {"rows": int(n_rows), "cols": int(n_cols)}
+
+     # Column types
+     types = {}
+     for c in df.columns:
+         s = df[c]
+         if pd.api.types.is_numeric_dtype(s):
+             types[c] = "numeric"
+         elif pd.api.types.is_datetime64_any_dtype(s) or "date" in str(s.dtype).lower():
+             types[c] = "datetime"
+         else:
+             types[c] = "categorical"
+     report["column_types"] = types
+
+     # Missingness and duplicate rows
+     missing = df.isna().mean().to_dict()
+     dup_ratio = float((len(df) - len(df.drop_duplicates())) / max(1, len(df)))
+     report["missing_fraction_per_column"] = {k: float(v) for k, v in missing.items()}
+     report["duplicate_row_fraction"] = dup_ratio
+
+     # Per-column entropy statistics
+     col_stats = {}
+     for c in df.columns:
+         s = df[c]
+         if types[c] == "numeric":
+             H, k = numeric_binned_entropy(s)
+             runs, Hruns = monotone_runs_and_entropy(s)
+             sorted_frac = sortedness_score(s)
+             col_stats[c] = {
+                 "entropy_binned_bits": float(H),
+                 "active_bins": int(k),
+                 "monotone_runs": int(runs),
+                 "run_entropy_bits": float(Hruns),
+                 "sortedness_fraction": float(sorted_frac),
+             }
+         else:
+             H, k = categorical_entropy(s)
+             col_stats[c] = {"entropy_bits": float(H), "unique_values": int(k)}
+     report["per_column"] = col_stats
+
+     # Global compressibility
+     try:
+         gzip_ratio = dataframe_gzip_ratio(df)
+     except Exception:
+         gzip_ratio = 1.0
+     report["gzip_compression_ratio"] = float(gzip_ratio)
+
+     # Multivariate structure on the first two numeric columns
+     num_cols = [c for c, t in types.items() if t == "numeric"]
+     if len(num_cols) >= 2:
+         X = df[num_cols].select_dtypes(include=[np.number]).values.astype(float)
+         X = X[~np.isnan(X).any(axis=1)]
+         if X.shape[0] >= 3:
+             pts2 = X[:, :2]
+             report["pareto_maxima_2d"] = int(pareto_maxima_count(pts2))
+             try:
+                 H_kd = kd_entropy(pts2, max_leaf=128, axis=0)
+             except Exception:
+                 H_kd = 0.0
+             report["kd_partition_entropy_bits"] = float(H_kd)
+         else:
+             report["pareto_maxima_2d"] = 0
+             report["kd_partition_entropy_bits"] = 0.0
+     else:
+         report["pareto_maxima_2d"] = 0
+         report["kd_partition_entropy_bits"] = 0.0
+
+     # Harvestable Energy: mean of (1 - normalized) structure signals
+     max_bits = math.log2(max(2, n_rows))
+     he_parts = []
+     he_parts.append(1.0 - max(0.0, min(1.0, report["gzip_compression_ratio"])))
+     num_run_entropies = []
+     for c in df.columns:
+         st = col_stats.get(c, {})
+         if "run_entropy_bits" in st:
+             num_run_entropies.append(st["run_entropy_bits"])
+     if num_run_entropies:
+         mean_run_H = float(np.mean(num_run_entropies))
+         he_parts.append(1.0 - normalize(mean_run_H, max_bits))
+     H_kd = report.get("kd_partition_entropy_bits", 0.0)
+     if H_kd is not None:
+         he_parts.append(1.0 - normalize(float(H_kd), max_bits))
+     if he_parts:
+         HE = float(np.mean([max(0.0, min(1.0, v)) for v in he_parts]))
+     else:
+         HE = 0.0
+     report["harvestable_energy_score"] = HE
+
+     return report
+
+ def explain_report(report: Dict[str, Any]) -> str:
+     lines = []
+     r, c = report["shape"]["rows"], report["shape"]["cols"]
+     lines.append(f"**Dataset shape:** {r} rows × {c} columns.")
+     g = report.get("gzip_compression_ratio", None)
+     if g is not None:
+         lines.append(f"**Global compressibility (gzip ratio):** {g:.3f}. Lower = more structure.")
+     he = report.get("harvestable_energy_score", 0.0)
+     he_pct = int(100 * he)
+     lines.append(f"**Harvestable Energy (0–100):** ~{he_pct}. Higher = more exploitable order.")
+     pm = report.get("pareto_maxima_2d", None)
+     if pm is not None:
+         lines.append(f"**2D Pareto maxima (first two numeric cols):** {pm}.")
+     Hkd = report.get("kd_partition_entropy_bits", None)
+     if Hkd is not None:
+         lines.append(f"**Range-partition entropy (kd approx):** {Hkd:.3f} bits.")
+     lines.append("\n**Column-level:**")
+     for col, st in report.get("per_column", {}).items():
+         m = report["missing_fraction_per_column"].get(col, 0.0)
+         if "entropy_binned_bits" in st:
+             lines.append(f"- **{col}** (numeric): missing {m:.1%}, binned entropy {st['entropy_binned_bits']:.2f} bits, "
+                          f"{st['monotone_runs']} runs (run-entropy {st['run_entropy_bits']:.2f} bits), "
+                          f"sortedness {st['sortedness_fraction']:.2f}.")
+         elif "entropy_bits" in st:
+             lines.append(f"- **{col}** (categorical): missing {m:.1%}, entropy {st['entropy_bits']:.2f} bits, "
+                          f"{st['unique_values']} unique.")
+         else:
+             lines.append(f"- **{col}**: missing {m:.1%}.")
+     lines.append("\n**Tips:** Higher energy and lower entropies often allow near-linear algorithms (run-aware sorts, hull scans, envelope merges).")
+     return "\n".join(lines)
+
236
+ def analyze(file):
237
+ if file is None:
238
+ return "Please upload a CSV.", ""
239
+ try:
240
+ df = pd.read_csv(file.name)
241
+ except Exception as e:
242
+ return f"Failed to read CSV: {e}", ""
243
+ report = compute_metrics(df)
244
+ md = explain_report(report)
245
+ return json.dumps(report, indent=2), md
246
+
+ with gr.Blocks(title="Dataset Energy & Entropy Analyzer") as demo:
+     gr.Markdown("# Dataset Energy & Entropy Analyzer\nUpload a CSV to compute dataset structure metrics (entropy, runs, compressibility, kd-entropy) and an overall **Harvestable Energy** score.")
+     with gr.Row():
+         inp = gr.File(file_types=[".csv"], label="CSV file")
+     with gr.Row():
+         btn = gr.Button("Analyze", variant="primary")
+     with gr.Row():
+         json_out = gr.Code(label="Raw report (JSON)", language="json")
+         md_out = gr.Markdown()
+     btn.click(analyze, inputs=inp, outputs=[json_out, md_out])
+
+ if __name__ == "__main__":
+     demo.launch()
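
A minimal way to sanity-check the metrics without launching the UI is to call compute_metrics and explain_report directly (a sketch, assuming app.py above is importable as a module; the column names are made up for illustration):

    import numpy as np
    import pandas as pd
    from app import compute_metrics, explain_report  # assumes this file is saved as app.py

    rng = np.random.default_rng(0)
    df = pd.DataFrame({
        "sorted_id": np.arange(1000),                 # fully ordered: sortedness 1.0, one monotone run
        "noise": rng.normal(size=1000),               # unstructured: high binned entropy
        "label": rng.choice(list("abc"), size=1000),  # low-cardinality categorical
    })
    report = compute_metrics(df)
    print(f"Harvestable Energy: {report['harvestable_energy_score']:.2f}")
    print(explain_report(report))

The ordered column should pull the energy score up, while the noise column pushes the gzip ratio and entropies toward their maxima.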