|
|
""" |
|
|
Lanternfly Field Capture Space - Resilient GPS (V9) |
|
|
A Gradio app for capturing photos with GPS coordinates and saving to Hugging Face datasets. |
|
|
|
|
|
This version incorporates all debugging fixes: safe handling of empty input, |
|
|
resilient component selection, and relaxed GPS timeout settings. |
|
|
""" |
|
|
|
|
|
import gradio as gr |
|
|
import os |
|
|
import json |
|
|
import uuid |
|
|
from datetime import datetime |
|
|
from PIL import Image |
|
|
from huggingface_hub import HfApi, hf_hub_download, create_repo, file_exists, upload_file |
|
|
import io |
|
|
import time |
|
|
import requests |
|
|
|
|
|
|
|
|
|
|
|
HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HF_TOKEN_SPACE") |
|
|
DATASET_REPO = os.getenv("DATASET_REPO", "rlogh/lanternfly-data") |
|
|
|
|
|
|
|
|
api = None |
|
|
if HF_TOKEN and DATASET_REPO: |
|
|
try: |
|
|
|
|
|
api = HfApi(token=HF_TOKEN) |
|
|
create_repo(DATASET_REPO, repo_type="dataset", exist_ok=True, token=HF_TOKEN) |
|
|
print("β
Hugging Face credentials found - dataset saving enabled") |
|
|
except Exception as e: |
|
|
print(f"β οΈ Error initializing HF API: {e}") |
|
|
api = None |
|
|
else: |
|
|
print("β οΈ Running in test mode - no HF credentials (dataset saving disabled)") |
|
|
|
|
|
|
|
|
METADATA_PATH = "metadata/entries.jsonl" |
|
|
IMAGES_DIR = "images" |
|
|
|
|
|
|
|
|
|
|
|
def get_current_time(): |
|
|
"""Get current timestamp in ISO format""" |
|
|
return datetime.now().isoformat() |
|
|
|
|
|
def handle_time_capture(): |
|
|
"""Handle time capture and return status message and timestamp.""" |
|
|
timestamp = get_current_time() |
|
|
status_msg = f"π **Time Captured**: {timestamp}" |
|
|
return status_msg, timestamp |
|
|
|
|
|
def _append_jsonl_in_repo(new_row: dict) -> None: |
|
|
"""Appends a JSON line to metadata/entries.jsonl in the dataset repo.""" |
|
|
if not api: return |
|
|
|
|
|
buf = io.BytesIO() |
|
|
existing_lines = [] |
|
|
|
|
|
for i in range(3): |
|
|
try: |
|
|
if file_exists(DATASET_REPO, METADATA_PATH, repo_type="dataset", token=HF_TOKEN): |
|
|
local_path = hf_hub_download( |
|
|
repo_id=DATASET_REPO, filename=METADATA_PATH, |
|
|
repo_type="dataset", token=HF_TOKEN |
|
|
) |
|
|
with open(local_path, "r", encoding="utf-8") as f: |
|
|
existing_lines = f.read().splitlines() |
|
|
break |
|
|
except Exception as e: |
|
|
if i == 2: raise e |
|
|
time.sleep(1 * (i + 1)) |
|
|
|
|
|
existing_lines.append(json.dumps(new_row, ensure_ascii=False)) |
|
|
data = "\n".join(existing_lines).encode("utf-8") |
|
|
buf.write(data); buf.seek(0) |
|
|
|
|
|
upload_file( |
|
|
path_or_fileobj=buf, |
|
|
path_in_repo=METADATA_PATH, |
|
|
repo_id=DATASET_REPO, |
|
|
repo_type="dataset", |
|
|
token=HF_TOKEN, |
|
|
commit_message=f"Append 1 entry at {datetime.now().isoformat()}Z", |
|
|
) |
|
|
|
|
|
def _save_image_to_repo(pil_img: Image.Image, dest_rel_path: str) -> None: |
|
|
"""Uploads a PIL image into the dataset repo.""" |
|
|
if not api: return |
|
|
|
|
|
img_bytes = io.BytesIO() |
|
|
pil_img.save(img_bytes, format="JPEG", quality=90) |
|
|
img_bytes.seek(0) |
|
|
|
|
|
upload_file( |
|
|
path_or_fileobj=img_bytes, |
|
|
path_in_repo=dest_rel_path, |
|
|
repo_id=DATASET_REPO, |
|
|
repo_type="dataset", |
|
|
token=HF_TOKEN, |
|
|
commit_message=f"Upload image {dest_rel_path}", |
|
|
) |
|
|
|
|
|
def handle_gps_location(json_str): |
|
|
""" |
|
|
Handles GPS location data from JavaScript. |
|
|
Includes V8 fix to prevent crash on empty initial input. |
|
|
""" |
|
|
|
|
|
if not json_str: |
|
|
return gr.NoAction(), gr.NoAction(), gr.NoAction(), gr.NoAction(), gr.NoAction() |
|
|
|
|
|
try: |
|
|
data = json.loads(json_str) |
|
|
|
|
|
if 'error' in data: |
|
|
|
|
|
error_map = { |
|
|
1: "Permission Denied (Check browser settings)", |
|
|
2: "Position Unavailable (Poor signal/network)", |
|
|
3: "Timeout Expired (Fix took too long)", |
|
|
0: "Geolocation not supported", |
|
|
'N/A': "Unknown Geolocation Error" |
|
|
} |
|
|
error_code = data.get('code', 'N/A') |
|
|
error_msg = error_map.get(error_code, data.get('error', 'Unknown Error')) |
|
|
|
|
|
gr.Warning(f"GPS Error: Code {error_code} ({error_msg})") |
|
|
|
|
|
status_msg = f"β **GPS Error (Code {error_code})**: {error_msg}" |
|
|
return status_msg, "N/A", "N/A", "N/A", get_current_time() |
|
|
|
|
|
lat = str(data.get('latitude', '')) |
|
|
lon = str(data.get('longitude', '')) |
|
|
accuracy = str(data.get('accuracy', '')) |
|
|
timestamp = str(data.get('timestamp', '')) |
|
|
|
|
|
try: |
|
|
acc_display = f"{float(accuracy):.1f}" |
|
|
except ValueError: |
|
|
acc_display = "N/A" |
|
|
|
|
|
status_msg = f"β
**GPS Captured**: {lat[:8]}, {lon[:8]} (accuracy: {acc_display}m)" |
|
|
return status_msg, lat, lon, accuracy, timestamp |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
status_msg = f"β **Error**: Failed to process GPS JSON: {str(e)}" |
|
|
gr.Error(status_msg) |
|
|
return status_msg, "Error", "Error", "Error", "Error" |
|
|
|
|
|
|
|
|
def get_gps_js(): |
|
|
"""JavaScript for robust, manually-triggered GPS capture.""" |
|
|
return """ |
|
|
() => { |
|
|
// Find the specific hidden textarea within the component container |
|
|
const container = document.querySelector('#hidden_gps_input'); |
|
|
let textarea = null; |
|
|
|
|
|
if (container) { |
|
|
// Find the actual textarea element inside the Gradio wrapper |
|
|
textarea = container.querySelector('textarea'); |
|
|
} |
|
|
|
|
|
if (!textarea) { |
|
|
console.error("DEBUG: Fatal: Hidden GPS textbox cannot be found."); |
|
|
return; |
|
|
} |
|
|
|
|
|
if (!navigator.geolocation) { |
|
|
console.error("DEBUG: Geolocation not supported by browser."); |
|
|
textarea.value = JSON.stringify({error: "Geolocation not supported", code: 0}); |
|
|
textarea.dispatchEvent(new Event('input', { bubbles: true })); |
|
|
return; |
|
|
} |
|
|
|
|
|
console.log("DEBUG: Starting Geolocation request (60s timeout, low accuracy preferred)."); |
|
|
|
|
|
navigator.geolocation.getCurrentPosition( |
|
|
function(position) { |
|
|
console.log("DEBUG: Geolocation SUCCESS.", position.coords); |
|
|
const data = { |
|
|
latitude: position.coords.latitude, |
|
|
longitude: position.coords.longitude, |
|
|
accuracy: position.coords.accuracy, |
|
|
timestamp: new Date(position.timestamp).toISOString() |
|
|
}; |
|
|
|
|
|
textarea.value = JSON.stringify(data); |
|
|
textarea.dispatchEvent(new Event('input', { bubbles: true })); |
|
|
}, |
|
|
function(err) { |
|
|
// Pass the error code back to Python |
|
|
console.error(`DEBUG: Geolocation FAILURE. Code: ${err.code}, Message: ${err.message}`); |
|
|
textarea.value = JSON.stringify({ error: err.message, code: err.code }); |
|
|
textarea.dispatchEvent(new Event('input', { bubbles: true })); |
|
|
}, |
|
|
// Options: enableHighAccuracy: false for faster fix, maximumAge for caching, 60s timeout |
|
|
{ enableHighAccuracy: false, timeout: 60000, maximumAge: 5000 } |
|
|
); |
|
|
} |
|
|
""" |
|
|
|
|
|
def save_to_dataset(image, lat, lon, accuracy_m, device_ts): |
|
|
"""Save image and metadata to Hugging Face dataset""" |
|
|
try: |
|
|
if image is None: |
|
|
return "β **Error**: No image captured. Please take a photo first.", "" |
|
|
if lat == "N/A" or lon == "N/A": |
|
|
return "β **Error**: GPS coordinates missing. Please click 'Get GPS' first.", "" |
|
|
|
|
|
|
|
|
if not api: |
|
|
server_ts = datetime.now().isoformat() |
|
|
img_id = str(uuid.uuid4()) |
|
|
timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
row = { |
|
|
"id": img_id, "image": f"test_{timestamp_str}_{img_id[:8]}.jpg", |
|
|
"latitude": float(lat) if lat != 'N/A' else None, "longitude": float(lon) if lon != 'N/A' else None, |
|
|
"accuracy_m": accuracy_m, "device_timestamp": device_ts, |
|
|
"server_timestamp_utc": server_ts, "notes": "" |
|
|
} |
|
|
status = f"π **Test Mode**: Data validated successfully! Sample {img_id[:8]}" |
|
|
preview = json.dumps(row, indent=2) |
|
|
return status, preview |
|
|
|
|
|
|
|
|
sample_id = str(uuid.uuid4()) |
|
|
timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
image_rel_path = f"{IMAGES_DIR}/lanternfly_{timestamp_str}_{sample_id[:8]}.jpg" |
|
|
|
|
|
_save_image_to_repo(image, image_rel_path) |
|
|
|
|
|
server_ts_utc = datetime.now().isoformat() + "Z" |
|
|
|
|
|
row = { |
|
|
"id": sample_id, "image": image_rel_path, |
|
|
"latitude": float(lat), "longitude": float(lon), |
|
|
"accuracy_m": float(accuracy_m), |
|
|
"device_timestamp": device_ts, |
|
|
"server_timestamp_utc": server_ts_utc, |
|
|
"location": f"{lat}, {lon}", |
|
|
"notes": "" |
|
|
} |
|
|
|
|
|
_append_jsonl_in_repo(row) |
|
|
|
|
|
status = ( |
|
|
"β
**Success!** Saved to dataset!\n\n" |
|
|
f"- Image: `{image_rel_path}`\n" |
|
|
f"- Lat/Lon: {row['latitude']}, {row['longitude']} (Β±{row['accuracy_m']} m)" |
|
|
) |
|
|
preview = json.dumps(row, indent=2) |
|
|
return status, preview |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"β **Critical Save Error**: {str(e)}" |
|
|
gr.Error(error_msg) |
|
|
return error_msg, "" |
|
|
|
|
|
|
|
|
|
|
|
with gr.Blocks(title="Lanternfly Field Capture") as app: |
|
|
gr.Markdown("# π¦ Lanternfly Field Capture (Resilient GPS)") |
|
|
gr.Markdown("Click **'π Get GPS'** to capture location. **You must allow location permission** in your browser.") |
|
|
|
|
|
|
|
|
hidden_gps_input = gr.Textbox(visible=False, elem_id="hidden_gps_input") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(scale=1): |
|
|
|
|
|
camera = gr.Image( |
|
|
streaming=False, |
|
|
height=380, |
|
|
label="π· Capture or Upload Photo", |
|
|
type="pil", |
|
|
sources=["webcam", "upload"] |
|
|
) |
|
|
|
|
|
|
|
|
gr.Markdown("### π Location Capture") |
|
|
|
|
|
|
|
|
gps_btn = gr.Button("π Get GPS", variant="primary") |
|
|
|
|
|
|
|
|
time_btn = gr.Button("π Get Current Time", variant="secondary") |
|
|
|
|
|
|
|
|
save_btn = gr.Button("πΎ Save Photo and Data to Dataset", variant="stop") |
|
|
|
|
|
with gr.Column(scale=1): |
|
|
|
|
|
status = gr.Markdown("π **Ready to capture data...**") |
|
|
|
|
|
|
|
|
gr.Markdown("### Captured Data") |
|
|
with gr.Row(): |
|
|
lat_box = gr.Textbox(label="Latitude", interactive=True, value="N/A") |
|
|
lon_box = gr.Textbox(label="Longitude", interactive=True, value="N/A") |
|
|
with gr.Row(): |
|
|
accuracy_box = gr.Textbox(label="Accuracy (meters)", interactive=True, value="N/A") |
|
|
device_ts_box = gr.Textbox(label="Device Timestamp", interactive=True, value="N/A") |
|
|
|
|
|
|
|
|
preview = gr.JSON(label="Preview Data Payload", visible=True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gps_btn.click( |
|
|
fn=None, |
|
|
inputs=[], |
|
|
outputs=[], |
|
|
js=get_gps_js() |
|
|
) |
|
|
|
|
|
|
|
|
hidden_gps_input.change( |
|
|
fn=handle_gps_location, |
|
|
inputs=[hidden_gps_input], |
|
|
outputs=[status, lat_box, lon_box, accuracy_box, device_ts_box] |
|
|
) |
|
|
|
|
|
|
|
|
time_btn.click( |
|
|
fn=handle_time_capture, |
|
|
inputs=[], |
|
|
outputs=[status, device_ts_box] |
|
|
) |
|
|
|
|
|
|
|
|
save_btn.click( |
|
|
fn=save_to_dataset, |
|
|
inputs=[camera, lat_box, lon_box, accuracy_box, device_ts_box], |
|
|
outputs=[status, preview] |
|
|
) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
app.launch(share=True) |
|
|
|