Spaces:
Running
Running
from __future__ import annotations | |
import json | |
import uuid | |
from datetime import datetime | |
from pathlib import Path | |
from typing import List, Dict, Any | |
import gradio as gr | |
from huggingface_hub import CommitScheduler, snapshot_download | |
# ------------------------------ | |
# Config | |
# ------------------------------ | |
DATASET_REPO_ID = "hugging-science/dataset-quest-index" | |
COMMIT_EVERY_MIN = 2 | |
LOCAL_SUBMISSIONS_DIR = Path("submissions") | |
LOCAL_SUBMISSIONS_DIR.mkdir(parents=True, exist_ok=True) | |
LOCAL_FILE = LOCAL_SUBMISSIONS_DIR / f"records_{uuid.uuid4().hex}.jsonl" | |
scheduler = CommitScheduler( | |
repo_id=DATASET_REPO_ID, | |
repo_type="dataset", | |
folder_path=LOCAL_SUBMISSIONS_DIR, | |
path_in_repo="data", | |
every=COMMIT_EVERY_MIN, | |
) | |
# ------------------------------ | |
# Utilities | |
# ------------------------------ | |
def _now_iso() -> str: | |
return datetime.utcnow().replace(microsecond=0).isoformat() + "Z" | |
def read_all_records() -> List[Dict[str, Any]]: | |
records: List[Dict[str, Any]] = [] | |
local_files = sorted(LOCAL_SUBMISSIONS_DIR.glob("*.jsonl")) | |
sources = list(local_files) | |
if not sources: | |
try: | |
snap_dir = Path(snapshot_download( | |
repo_id=DATASET_REPO_ID, | |
repo_type="dataset", | |
allow_patterns="data/*.jsonl" | |
)) | |
hub_data_dir = snap_dir / "data" | |
sources = sorted(hub_data_dir.glob("*.jsonl")) | |
except Exception: | |
# If snapshot fails (e.g., offline), we just return empty | |
sources = [] | |
for p in sources: | |
try: | |
with p.open("r", encoding="utf-8") as f: | |
for line in f: | |
line = line.strip() | |
if not line: | |
continue | |
try: | |
records.append(json.loads(line)) | |
except Exception: | |
pass | |
except FileNotFoundError: | |
pass | |
return records | |
def append_record(record: Dict[str, Any]) -> None: | |
LOCAL_FILE.parent.mkdir(parents=True, exist_ok=True) | |
with LOCAL_FILE.open("a", encoding="utf-8") as f: | |
f.write(json.dumps(record, ensure_ascii=False) + "\n") | |
def filter_records(records: List[Dict[str, Any]], field: str | None, search: str | None) -> List[Dict[str, Any]]: | |
def match(rec: Dict[str, Any]) -> bool: | |
ok = True | |
if field and field != "All": | |
ok = ok and (rec.get("field") == field) | |
if search: | |
s = search.lower() | |
hay = " ".join( | |
str(rec.get(k, "")) for k in ["dataset_name", "dataset_url", "description", "user", "field"] | |
).lower() | |
ok = ok and (s in hay) | |
return ok | |
return [r for r in records if match(r)] | |
# ------------------------------ | |
# App logic | |
# ------------------------------ | |
SIZE_UNITS = ["KB", "MB", "GB", "TB"] | |
def submit_entry( | |
dataset_name: str, | |
dataset_url: str, | |
description: str, | |
size_value: float, | |
size_unit: str, | |
field: str, | |
profile: gr.OAuthProfile | None, | |
): | |
errors = [] | |
if not dataset_name.strip(): | |
errors.append("Dataset name is required.") | |
if not dataset_url.strip() or not dataset_url.startswith(("http://", "https://", "https://huggingface.co/")): | |
errors.append("Dataset URL must be an http(s) link.") | |
if size_value is None or size_value < 0: | |
errors.append("Approximate size must be a non-negative number.") | |
if not field.strip(): | |
errors.append("Please provide a field.") | |
# Check for existing dataset URL and name | |
existing_records = read_all_records() | |
for record in existing_records: | |
if record.get("dataset_url", "").strip().lower() == dataset_url.strip().lower(): | |
errors.append(f"Dataset URL already exists: {record.get('dataset_url')}") | |
if record.get("dataset_name", "").strip().lower() == dataset_name.strip().lower(): | |
errors.append(f"Dataset name already exists: {record.get('dataset_name')}") | |
if errors: | |
return gr.update(value=f"Submission failed:\n- " + "\n- ".join(errors), visible=True), gr.update(visible=False) | |
user_display = profile.name if profile else "anonymous" | |
user_handle = getattr(profile, "preferred_username", None) if profile else None | |
record = { | |
"id": uuid.uuid4().hex, | |
"created_at": _now_iso(), | |
"dataset_name": dataset_name.strip(), | |
"dataset_url": dataset_url.strip(), | |
"description": description.strip(), | |
"approx_size": float(size_value), | |
"size_unit": size_unit, | |
"field": field.strip(), | |
"user": user_handle or user_display, | |
} | |
append_record(record) | |
ok = f"Thanks, {user_display}. Your entry has been saved locally and will sync to the Hub within ~{COMMIT_EVERY_MIN} minutes." | |
updated = read_all_records() | |
rows = [ | |
[r["dataset_name"], f'<a href="{r["dataset_url"]}" target="_blank">{r["dataset_url"]}</a>', r["description"], f"{r['approx_size']} {r['size_unit']}", r["field"], r["user"], r["created_at"]] | |
for r in updated | |
] | |
return gr.update(value=ok, visible=True), rows | |
def refresh_table(field: str, search: str): | |
data = read_all_records() | |
data = filter_records(data, field, search) | |
rows = [ | |
[r["dataset_name"], f'<a href="{r["dataset_url"]}" target="_blank">{r["dataset_url"]}</a>', r["description"], f"{r['approx_size']} {r['size_unit']}", r["field"], r["user"], r["created_at"]] | |
for r in data | |
] | |
return rows | |
# ------------------------------ | |
# UI | |
# ------------------------------ | |
with gr.Blocks(title="Community Dataset Index", css=".wrap {margin: 0 auto}", fill_width=True) as demo: | |
gr.Markdown("# Community Dataset Index\nContribute datasets with a short description. Sign in to record your HF username.") | |
gr.LoginButton() | |
with gr.Row(elem_classes=["wrap"]): | |
with gr.Column(scale=1): | |
gr.Markdown("### Submit a dataset") | |
name = gr.Textbox(label="Dataset name", placeholder="e.g. The Pile") | |
url = gr.Textbox(label="Dataset URL (HF, website or paper)", placeholder="https://huggingface.co/datasets/... or https://...") | |
desc = gr.Textbox(label="Short description", lines=4) | |
with gr.Row(): | |
size_val = gr.Number(label="Approx. size", minimum=0, value=0) | |
size_unit = gr.Dropdown(SIZE_UNITS, value="GB", label="Unit") | |
field = gr.Textbox(label="Field (e.g. PDEs, multi-omics, single-cell, catalysts, etc.)") | |
submit = gr.Button("Submit", variant="primary") | |
notice = gr.Markdown(visible=False) | |
with gr.Column(scale=2): | |
gr.Markdown("### Browse & filter") | |
with gr.Row(): | |
field_filter = gr.Textbox(label="Field filter (leave blank for all)") | |
search = gr.Textbox(label="Search", placeholder="Search name, URL, description, user…") | |
refresh = gr.Button("Refresh") | |
table = gr.Dataframe( | |
headers=["Name", "URL", "Description", "Size", "Field", "User", "Created"], | |
datatype=["str", "html", "str", "str", "str", "str", "str"], | |
interactive=False, | |
wrap=True, | |
show_fullscreen_button=True, | |
) | |
submit.click( | |
submit_entry, | |
inputs=[name, url, desc, size_val, size_unit, field], | |
outputs=[notice, table], | |
show_progress="minimal", | |
) | |
refresh.click(refresh_table, inputs=[field_filter, search], outputs=table) | |
field_filter.change(refresh_table, inputs=[field_filter, search], outputs=table) | |
search.submit(refresh_table, inputs=[field_filter, search], outputs=table) | |
demo.load(lambda: refresh_table("", ""), inputs=None, outputs=table) | |
if __name__ == "__main__": | |
demo.launch(ssr_mode=False) | |