from __future__ import annotations
import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Any
import gradio as gr
from huggingface_hub import CommitScheduler, snapshot_download
# ------------------------------
# Config
# ------------------------------
DATASET_REPO_ID = "hugging-science/dataset-quest-index"
COMMIT_EVERY_MIN = 2
LOCAL_SUBMISSIONS_DIR = Path("submissions")
LOCAL_SUBMISSIONS_DIR.mkdir(parents=True, exist_ok=True)
# Each app instance writes to its own uuid-suffixed file, so concurrent
# replicas never overwrite each other's submissions.
LOCAL_FILE = LOCAL_SUBMISSIONS_DIR / f"records_{uuid.uuid4().hex}.jsonl"
scheduler = CommitScheduler(
    repo_id=DATASET_REPO_ID,
    repo_type="dataset",
    folder_path=LOCAL_SUBMISSIONS_DIR,
    path_in_repo="data",
    every=COMMIT_EVERY_MIN,
)
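# CommitScheduler runs in a background thread: every COMMIT_EVERY_MIN minutes
# it commits any new or changed files under `folder_path` to `path_in_repo`
# in the dataset repo, so submissions persist without blocking the UI.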
# ------------------------------
# Utilities
# ------------------------------
def _now_iso() -> str:
    """UTC timestamp at second resolution, e.g. "2025-01-01T12:00:00Z"."""
    # datetime.utcnow() is deprecated; use an aware UTC time instead.
    return datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None).isoformat() + "Z"
def read_all_records() -> List[Dict[str, Any]]:
    """Read every JSONL record, preferring local files over a Hub snapshot."""
    records: List[Dict[str, Any]] = []
    local_files = sorted(LOCAL_SUBMISSIONS_DIR.glob("*.jsonl"))
    sources = list(local_files)
    if not sources:
        try:
            snap_dir = Path(snapshot_download(
                repo_id=DATASET_REPO_ID,
                repo_type="dataset",
                allow_patterns="data/*.jsonl",
            ))
            hub_data_dir = snap_dir / "data"
            sources = sorted(hub_data_dir.glob("*.jsonl"))
        except Exception:
            # If the snapshot fails (e.g., offline), just return empty.
            sources = []
    for p in sources:
        try:
            with p.open("r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        records.append(json.loads(line))
                    except json.JSONDecodeError:
                        # Skip malformed lines rather than failing the whole read.
                        pass
        except FileNotFoundError:
            pass
    return records
def append_record(record: Dict[str, Any]) -> None:
    LOCAL_FILE.parent.mkdir(parents=True, exist_ok=True)
    with LOCAL_FILE.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")
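# A stored line looks like this (values illustrative):
# {"id": "3f2a9c...", "created_at": "2025-01-01T12:00:00Z",
#  "dataset_name": "The Pile", "dataset_url": "https://example.com/the-pile",
#  "description": "...", "approx_size": 825.0, "size_unit": "GB",
#  "field": "language modeling", "user": "anonymous"}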
def filter_records(records: List[Dict[str, Any]], field: str | None, search: str | None) -> List[Dict[str, Any]]:
    def match(rec: Dict[str, Any]) -> bool:
        ok = True
        if field and field != "All":
            ok = ok and (rec.get("field") == field)
        if search:
            s = search.lower()
            hay = " ".join(
                str(rec.get(k, "")) for k in ["dataset_name", "dataset_url", "description", "user", "field"]
            ).lower()
            ok = ok and (s in hay)
        return ok
    return [r for r in records if match(r)]
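# e.g. filter_records(records, "PDEs", "pile") keeps records whose field is
# exactly "PDEs" and whose name/URL/description/user/field text contains "pile".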
# ------------------------------
# App logic
# ------------------------------
SIZE_UNITS = ["KB", "MB", "GB", "TB"]
def submit_entry(
    dataset_name: str,
    dataset_url: str,
    description: str,
    size_value: float,
    size_unit: str,
    field: str,
    profile: gr.OAuthProfile | None,
):
    errors = []
    dataset_url = dataset_url.strip()
    if not dataset_name.strip():
        errors.append("Dataset name is required.")
    if not dataset_url.startswith(("http://", "https://")):
        errors.append("Dataset URL must be an http(s) link.")
    if size_value is None or size_value < 0:
        errors.append("Approximate size must be a non-negative number.")
    if not field.strip():
        errors.append("Please provide a field.")
    # Reject duplicates by URL or name (case-insensitive).
    existing_records = read_all_records()
    for record in existing_records:
        if record.get("dataset_url", "").strip().lower() == dataset_url.lower():
            errors.append(f"Dataset URL already exists: {record.get('dataset_url')}")
        if record.get("dataset_name", "").strip().lower() == dataset_name.strip().lower():
            errors.append(f"Dataset name already exists: {record.get('dataset_name')}")
    if errors:
        return gr.update(value="Submission failed:\n- " + "\n- ".join(errors), visible=True), gr.update(visible=False)
    user_display = profile.name if profile else "anonymous"
    # gr.OAuthProfile exposes the HF handle as `username`.
    user_handle = getattr(profile, "username", None) if profile else None
    record = {
        "id": uuid.uuid4().hex,
        "created_at": _now_iso(),
        "dataset_name": dataset_name.strip(),
        "dataset_url": dataset_url,
        "description": description.strip(),
        "approx_size": float(size_value),
        "size_unit": size_unit,
        "field": field.strip(),
        "user": user_handle or user_display,
    }
    append_record(record)
    ok = f"Thanks, {user_display}. Your entry has been saved locally and will sync to the Hub within ~{COMMIT_EVERY_MIN} minutes."
    updated = read_all_records()
    rows = [
        [r["dataset_name"], f'<a href="{r["dataset_url"]}" target="_blank">{r["dataset_url"]}</a>', r["description"], f"{r['approx_size']} {r['size_unit']}", r["field"], r["user"], r["created_at"]]
        for r in updated
    ]
    return gr.update(value=ok, visible=True), rows
def refresh_table(field: str, search: str):
    data = read_all_records()
    data = filter_records(data, field, search)
    rows = [
        [r["dataset_name"], f'<a href="{r["dataset_url"]}" target="_blank">{r["dataset_url"]}</a>', r["description"], f"{r['approx_size']} {r['size_unit']}", r["field"], r["user"], r["created_at"]]
        for r in data
    ]
    return rows
# ------------------------------
# UI
# ------------------------------
with gr.Blocks(title="Community Dataset Index", css=".wrap {margin: 0 auto}", fill_width=True) as demo:
    gr.Markdown("# Community Dataset Index\nContribute datasets with a short description. Sign in to record your HF username.")
    gr.LoginButton()
    with gr.Row(elem_classes=["wrap"]):
        with gr.Column(scale=1):
            gr.Markdown("### Submit a dataset")
            name = gr.Textbox(label="Dataset name", placeholder="e.g. The Pile")
            url = gr.Textbox(label="Dataset URL (HF, website or paper)", placeholder="https://huggingface.co/datasets/... or https://...")
            desc = gr.Textbox(label="Short description", lines=4)
            with gr.Row():
                size_val = gr.Number(label="Approx. size", minimum=0, value=0)
                size_unit = gr.Dropdown(SIZE_UNITS, value="GB", label="Unit")
            field = gr.Textbox(label="Field (e.g. PDEs, multi-omics, single-cell, catalysts, etc.)")
            submit = gr.Button("Submit", variant="primary")
            notice = gr.Markdown(visible=False)
        with gr.Column(scale=2):
            gr.Markdown("### Browse & filter")
            with gr.Row():
                field_filter = gr.Textbox(label="Field filter (leave blank for all)")
                search = gr.Textbox(label="Search", placeholder="Search name, URL, description, user…")
                refresh = gr.Button("Refresh")
            table = gr.Dataframe(
                headers=["Name", "URL", "Description", "Size", "Field", "User", "Created"],
                datatype=["str", "html", "str", "str", "str", "str", "str"],
                interactive=False,
                wrap=True,
                show_fullscreen_button=True,
            )
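            # The "html" datatype lets the URL column render as a clickable link.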
    submit.click(
        submit_entry,
        inputs=[name, url, desc, size_val, size_unit, field],
        outputs=[notice, table],
        show_progress="minimal",
    )
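    # Note: `profile` is not listed in `inputs`; Gradio injects the
    # gr.OAuthProfile value automatically based on the type annotation.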
    refresh.click(refresh_table, inputs=[field_filter, search], outputs=table)
    field_filter.change(refresh_table, inputs=[field_filter, search], outputs=table)
    search.submit(refresh_table, inputs=[field_filter, search], outputs=table)
    demo.load(lambda: refresh_table("", ""), inputs=None, outputs=table)
if __name__ == "__main__":
    demo.launch(ssr_mode=False)