|
|
import cv2 |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
import os |
|
|
import threading |
|
|
import time |
|
|
import urllib.parse |
|
|
import zipfile |
|
|
import shutil |
|
|
from fastapi import FastAPI, UploadFile, File, HTTPException, Form |
|
|
from fastapi.responses import JSONResponse |
|
|
import json |
|
|
import io |
|
|
from pathlib import Path |
|
|
from huggingface_hub import HfApi, hf_hub_download |
|
|
import asyncio |
|
|
import uvicorn |
|
|
from typing import Optional, Dict, Tuple, List |
|
|
import aiohttp |
|
|
from urllib.parse import urlparse |
|
|
|
|
|
app = FastAPI( |
|
|
title="Cursor Detection and Tracking Server", |
|
|
description="Processes images to detect cursors and uploads results to dataset" |
|
|
) |
|
|
|
|
|
|
|
|
HF_TOKEN = os.getenv("HF_TOKEN", "") |
|
|
HF_DATASET_ID = os.getenv("HF_DATASET_ID", "Fred808/BG3") |
|
|
HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "Fred808/data") |
|
|
HF_STATE_FILE = os.getenv("HF_STATE_FILE", "processing_state_cursors2.json") |
|
|
TEMP_DATASET_DIR = Path("temp_cursor_detection") |
|
|
TEMP_DATASET_DIR.mkdir(exist_ok=True) |
|
|
|
|
|
def _get_zip_file_list_from_hf() -> list: |
|
|
"""Return sorted list of zip file paths from HF_DATASET_ID.""" |
|
|
try: |
|
|
api = HfApi(token=HF_TOKEN) |
|
|
files = api.list_repo_files(repo_id=HF_DATASET_ID, repo_type="dataset") |
|
|
zip_files = sorted([f for f in files if f.startswith('frames_zips/') and f.lower().endswith('.zip')]) |
|
|
print(f"[DATASET] Found {len(zip_files)} zip files in {HF_DATASET_ID}.") |
|
|
return zip_files |
|
|
except Exception as e: |
|
|
print(f"[DATASET] Error listing HF dataset files: {e}") |
|
|
return [] |
|
|
|
|
|
def _download_and_extract_zip(repo_path: str) -> Optional[Path]: |
|
|
"""Download zip from HF dataset and extract into a temp subfolder.""" |
|
|
try: |
|
|
zip_local = hf_hub_download(repo_id=HF_DATASET_ID, filename=repo_path, repo_type="dataset", token=HF_TOKEN) |
|
|
zip_name = Path(repo_path).name |
|
|
extract_dir = TEMP_DATASET_DIR / zip_name.replace('.zip','') |
|
|
if extract_dir.exists(): |
|
|
shutil.rmtree(extract_dir) |
|
|
extract_dir.mkdir(parents=True, exist_ok=True) |
|
|
with zipfile.ZipFile(zip_local, 'r') as z: |
|
|
z.extractall(extract_dir) |
|
|
try: |
|
|
os.remove(zip_local) |
|
|
except Exception: |
|
|
pass |
|
|
return extract_dir |
|
|
except Exception as e: |
|
|
print(f"[DATASET] Error downloading/extracting {repo_path}: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
CURSOR_TEMPLATES: Dict[str, np.ndarray] = {} |
|
|
CURSOR_TEMPLATES_DIR = Path("cursors") |
|
|
|
|
|
|
|
|
|
|
|
def to_rgb(img: np.ndarray) -> Optional[np.ndarray]: |
|
|
"""Converts image to BGR format (3 channels). Handles None input.""" |
|
|
if img is None: |
|
|
return None |
|
|
if len(img.shape) == 2: |
|
|
return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR) |
|
|
if img.shape[2] == 4: |
|
|
return cv2.cvtColor(img, cv2.COLOR_BGRA2BGR) |
|
|
return img |
|
|
|
|
|
def get_mask_from_alpha(template_img: np.ndarray) -> Optional[np.ndarray]: |
|
|
"""Extracts a mask from the alpha channel of a 4-channel image.""" |
|
|
if template_img is not None and len(template_img.shape) == 3 and template_img.shape[2] == 4: |
|
|
return (template_img[:, :, 3] > 0).astype(np.uint8) * 255 |
|
|
return None |
|
|
|
|
|
def detect_cursor_in_frame_multi( |
|
|
frame: np.ndarray, |
|
|
cursor_templates: Dict[str, np.ndarray], |
|
|
threshold: float = 0.8 |
|
|
) -> Tuple[Optional[Tuple[int, int]], float, Optional[str]]: |
|
|
""" |
|
|
Detects the best matching cursor template in a single frame. |
|
|
Returns (position, confidence, template_name). |
|
|
""" |
|
|
best_pos = None |
|
|
best_conf = -1.0 |
|
|
best_template_name = None |
|
|
frame_rgb = to_rgb(frame) |
|
|
|
|
|
if frame_rgb is None: |
|
|
return None, -1.0, None |
|
|
|
|
|
for template_name, cursor_template in cursor_templates.items(): |
|
|
template_rgb = to_rgb(cursor_template) |
|
|
mask = get_mask_from_alpha(cursor_template) |
|
|
|
|
|
if template_rgb is None or template_rgb.shape[2] != frame_rgb.shape[2]: |
|
|
continue |
|
|
|
|
|
if template_rgb.shape[0] > frame_rgb.shape[0] or template_rgb.shape[1] > frame_rgb.shape[1]: |
|
|
continue |
|
|
|
|
|
try: |
|
|
result = cv2.matchTemplate(frame_rgb, template_rgb, cv2.TM_CCOEFF_NORMED, mask=mask) |
|
|
except Exception: |
|
|
continue |
|
|
|
|
|
_, max_val, _, max_loc = cv2.minMaxLoc(result) |
|
|
|
|
|
if max_val > best_conf: |
|
|
best_conf = max_val |
|
|
if max_val >= threshold: |
|
|
cursor_w, cursor_h = template_rgb.shape[1], template_rgb.shape[0] |
|
|
cursor_x = max_loc[0] + cursor_w // 2 |
|
|
cursor_y = max_loc[1] + cursor_h // 2 |
|
|
best_pos = (cursor_x, cursor_y) |
|
|
best_template_name = template_name |
|
|
|
|
|
if best_conf >= threshold: |
|
|
return best_pos, best_conf, best_template_name |
|
|
return None, best_conf, None |
|
|
|
|
|
async def download_image_from_url(url: str) -> bytes: |
|
|
"""Download image from URL and return as bytes.""" |
|
|
async with aiohttp.ClientSession() as session: |
|
|
async with session.get(url) as response: |
|
|
if response.status != 200: |
|
|
raise HTTPException( |
|
|
status_code=400, |
|
|
detail=f"Failed to fetch image from URL. Status code: {response.status}" |
|
|
) |
|
|
return await response.read() |
|
|
|
|
|
def load_cursor_templates(): |
|
|
"""Loads all cursor templates from the specified directory.""" |
|
|
global CURSOR_TEMPLATES |
|
|
if CURSOR_TEMPLATES: |
|
|
print("Templates already loaded.") |
|
|
return |
|
|
|
|
|
print(f"Loading cursor templates from: {CURSOR_TEMPLATES_DIR}") |
|
|
|
|
|
if not CURSOR_TEMPLATES_DIR.is_dir(): |
|
|
print(f"Error: Template directory not found at {CURSOR_TEMPLATES_DIR}") |
|
|
return |
|
|
|
|
|
for template_file in CURSOR_TEMPLATES_DIR.glob('*.png'): |
|
|
template_img = cv2.imread(str(template_file), cv2.IMREAD_UNCHANGED) |
|
|
if template_img is not None: |
|
|
CURSOR_TEMPLATES[template_file.name] = template_img |
|
|
else: |
|
|
print(f"[WARN] Could not load template: {template_file.name}") |
|
|
|
|
|
if not CURSOR_TEMPLATES: |
|
|
print(f"FATAL: No cursor templates found in: {CURSOR_TEMPLATES_DIR}") |
|
|
else: |
|
|
print(f"Successfully loaded {len(CURSOR_TEMPLATES)} templates.") |
|
|
|
|
|
|
|
|
|
|
|
def _load_hf_state() -> dict: |
|
|
"""Download the HF state file from the dataset and return parsed JSON.""" |
|
|
default = {"next_download_index": 0, "file_states": {}} |
|
|
try: |
|
|
api = HfApi(token=HF_TOKEN) |
|
|
files = api.list_repo_files(repo_id=HF_DATASET_ID, repo_type="dataset") |
|
|
if HF_STATE_FILE not in files: |
|
|
print(f"[DATASET] State file not found in {HF_DATASET_ID}. Using default state.") |
|
|
return default |
|
|
|
|
|
hf_hub_download(repo_id=HF_DATASET_ID, filename=HF_STATE_FILE, repo_type="dataset", token=HF_TOKEN, local_dir=TEMP_DATASET_DIR) |
|
|
p = TEMP_DATASET_DIR / HF_STATE_FILE |
|
|
with p.open('r', encoding='utf-8') as f: |
|
|
data = json.load(f) |
|
|
|
|
|
if "file_states" not in data or not isinstance(data["file_states"], dict): |
|
|
data["file_states"] = {} |
|
|
if "next_download_index" not in data: |
|
|
data["next_download_index"] = 0 |
|
|
return data |
|
|
except Exception as e: |
|
|
print(f"[DATASET] Failed to load HF state: {e}") |
|
|
return default |
|
|
|
|
|
def _upload_hf_state(state: dict) -> bool: |
|
|
"""Upload the HF state file to the dataset.""" |
|
|
try: |
|
|
p = TEMP_DATASET_DIR / HF_STATE_FILE |
|
|
with p.open('w', encoding='utf-8') as f: |
|
|
json.dump(state, f, indent=2) |
|
|
|
|
|
api = HfApi(token=HF_TOKEN) |
|
|
api.upload_file( |
|
|
path_or_fileobj=str(p), |
|
|
path_in_repo=HF_STATE_FILE, |
|
|
repo_id=HF_DATASET_ID, |
|
|
repo_type="dataset", |
|
|
commit_message=f"Update processing state: next_index={state.get('next_download_index')}" |
|
|
) |
|
|
print(f"[DATASET] Uploaded state to {HF_DATASET_ID}.") |
|
|
return True |
|
|
except Exception as e: |
|
|
print(f"[DATASET] Failed to upload HF state: {e}") |
|
|
return False |
|
|
|
|
|
def _lock_file_for_processing(image_name: str, state: dict) -> bool: |
|
|
"""Attempt to mark image as 'processing' and upload state to establish lock.""" |
|
|
print(f"[DATASET] Attempting to lock {image_name}...") |
|
|
state.setdefault('file_states', {}) |
|
|
state['file_states'][image_name] = 'processing' |
|
|
if _upload_hf_state(state): |
|
|
print(f"[DATASET] Locked {image_name}.") |
|
|
return True |
|
|
else: |
|
|
state['file_states'].pop(image_name, None) |
|
|
return False |
|
|
|
|
|
def _unlock_file_as_processed(image_name: str, state: dict, next_index: int) -> bool: |
|
|
"""Mark as processed and update next index, upload state.""" |
|
|
print(f"[DATASET] Marking {image_name} as processed...") |
|
|
state.setdefault('file_states', {}) |
|
|
state['file_states'][image_name] = 'processed' |
|
|
state['next_download_index'] = next_index |
|
|
return _upload_hf_state(state) |
|
|
|
|
|
|
|
|
|
|
|
def _upload_cursor_results(zip_name: str, results: dict) -> bool: |
|
|
"""Upload cursor detection results JSON to output dataset.""" |
|
|
try: |
|
|
filename = Path(zip_name).with_suffix('.json').name |
|
|
content = json.dumps(results, indent=2, ensure_ascii=False).encode('utf-8') |
|
|
api = HfApi(token=HF_TOKEN) |
|
|
api.upload_file( |
|
|
path_or_fileobj=io.BytesIO(content), |
|
|
path_in_repo=f"cursor_results/{filename}", |
|
|
repo_id=HF_OUTPUT_DATASET_ID, |
|
|
repo_type="dataset", |
|
|
commit_message=f"Cursor detection results for {zip_name}" |
|
|
) |
|
|
print(f"[DATASET] Uploaded results for {zip_name} to {HF_OUTPUT_DATASET_ID}.") |
|
|
return True |
|
|
except Exception as e: |
|
|
print(f"[DATASET] Failed to upload results for {zip_name}: {e}") |
|
|
return False |
|
|
|
|
|
class DatasetProgress: |
|
|
"""Track dataset processing progress""" |
|
|
def __init__(self): |
|
|
self.current_image = None |
|
|
self.total_images = 0 |
|
|
self.processed_images = 0 |
|
|
self.status = "idle" |
|
|
self.error = None |
|
|
self.start_time = None |
|
|
|
|
|
def to_dict(self): |
|
|
return { |
|
|
"status": self.status, |
|
|
"current_image": self.current_image, |
|
|
"progress": f"{self.processed_images}/{self.total_images}" if self.total_images else "0/0", |
|
|
"elapsed": time.time() - self.start_time if self.start_time else 0, |
|
|
"error": self.error |
|
|
} |
|
|
|
|
|
|
|
|
dataset_progress = DatasetProgress() |
|
|
|
|
|
async def process_image(image_path: Path, threshold: float = 0.8) -> dict: |
|
|
"""Process a single image and return cursor detection results.""" |
|
|
try: |
|
|
|
|
|
frame = cv2.imread(str(image_path), cv2.IMREAD_UNCHANGED) |
|
|
if frame is None: |
|
|
raise ValueError(f"Could not read image: {image_path}") |
|
|
|
|
|
print(f"[DETECT] Processing image {image_path.name}, shape: {frame.shape}") |
|
|
|
|
|
|
|
|
pos, conf, template_name = detect_cursor_in_frame_multi(frame, CURSOR_TEMPLATES, threshold) |
|
|
|
|
|
|
|
|
if pos is not None: |
|
|
print(f"[DETECT] Found cursor in {image_path.name} at {pos} using template {template_name} (conf: {conf:.3f})") |
|
|
else: |
|
|
print(f"[DETECT] No cursor found in {image_path.name} (best conf: {conf:.3f})") |
|
|
|
|
|
|
|
|
confidence = float(conf) |
|
|
if confidence == float('inf') or confidence == float('-inf'): |
|
|
confidence = 1.0 if confidence > 0 else 0.0 |
|
|
|
|
|
return { |
|
|
'cursor_active': pos is not None, |
|
|
'x': pos[0] if pos else None, |
|
|
'y': pos[1] if pos else None, |
|
|
'confidence': confidence, |
|
|
'template': template_name, |
|
|
'image_shape': list(frame.shape) |
|
|
} |
|
|
except Exception as e: |
|
|
print(f"[ERROR] Failed to process {image_path.name}: {str(e)}") |
|
|
raise ValueError(f"Error processing image {image_path.name}: {str(e)}") |
|
|
|
|
|
async def dataset_task(start_index: int = 1): |
|
|
"""Main dataset processing loop for processing zip files.""" |
|
|
global dataset_progress |
|
|
|
|
|
dataset_progress = DatasetProgress() |
|
|
dataset_progress.status = "starting" |
|
|
dataset_progress.start_time = time.time() |
|
|
|
|
|
print(f"[DATASET] Starting dataset task from index {start_index}...") |
|
|
|
|
|
if not CURSOR_TEMPLATES: |
|
|
err = "No cursor templates loaded" |
|
|
dataset_progress.status = "error" |
|
|
dataset_progress.error = err |
|
|
print(f"[DATASET] {err}") |
|
|
return False |
|
|
|
|
|
try: |
|
|
state = await asyncio.to_thread(_load_hf_state) |
|
|
zip_list = await asyncio.to_thread(_get_zip_file_list_from_hf) |
|
|
|
|
|
if not zip_list: |
|
|
err = "No zip files found in dataset" |
|
|
dataset_progress.status = "error" |
|
|
dataset_progress.error = err |
|
|
print(f"[DATASET] {err}") |
|
|
return False |
|
|
|
|
|
dataset_progress.total_images = len(zip_list) |
|
|
dataset_progress.status = "processing" |
|
|
|
|
|
if start_index < 1: |
|
|
start_index = 1 |
|
|
|
|
|
for idx in range(start_index-1, len(zip_list)): |
|
|
try: |
|
|
zip_path = zip_list[idx] |
|
|
zip_name = Path(zip_path).name |
|
|
print(f"[DATASET] Processing zip {idx + 1}/{len(zip_list)}: {zip_name}") |
|
|
|
|
|
file_state = state.get('file_states', {}).get(zip_name) |
|
|
if file_state == 'processed': |
|
|
print(f"[DATASET] Skipping {zip_name}: already processed.") |
|
|
dataset_progress.processed_images += 1 |
|
|
continue |
|
|
if file_state == 'processing': |
|
|
print(f"[DATASET] Skipping {zip_name}: currently processing by another worker.") |
|
|
continue |
|
|
|
|
|
|
|
|
locked = await asyncio.to_thread(_lock_file_for_processing, zip_name, state) |
|
|
if not locked: |
|
|
print(f"[DATASET] Could not lock {zip_name}; skipping.") |
|
|
continue |
|
|
|
|
|
try: |
|
|
|
|
|
print(f"[DATASET] Downloading and extracting {zip_name}...") |
|
|
extract_dir = await asyncio.to_thread(_download_and_extract_zip, zip_path) |
|
|
if not extract_dir: |
|
|
print(f"[DATASET] Failed to download/extract {zip_name}; marking failed.") |
|
|
state['file_states'][zip_name] = 'failed' |
|
|
await asyncio.to_thread(_upload_hf_state, state) |
|
|
continue |
|
|
|
|
|
|
|
|
image_paths = [p for p in extract_dir.rglob('*') if p.is_file() and p.suffix.lower() in ('.jpg','.jpeg','.png')] |
|
|
print(f"[DATASET] Found {len(image_paths)} images in {zip_name}") |
|
|
|
|
|
|
|
|
results = [] |
|
|
print(f"[DATASET] Starting cursor detection on {len(image_paths)} images...") |
|
|
|
|
|
for i, image_path in enumerate(image_paths, 1): |
|
|
try: |
|
|
|
|
|
print(f"[DATASET] Processing image {i}/{len(image_paths)}: {image_path.name}") |
|
|
image_result = await process_image(image_path) |
|
|
|
|
|
|
|
|
image_result['image_name'] = image_path.name |
|
|
image_result['image_path'] = str(image_path.relative_to(extract_dir)) |
|
|
|
|
|
results.append(image_result) |
|
|
|
|
|
|
|
|
if image_result['cursor_active']: |
|
|
print(f"[DATASET] ✓ Found cursor in {image_path.name}") |
|
|
else: |
|
|
print(f"[DATASET] ✗ No cursor found in {image_path.name}") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"[DATASET] Error processing {image_path.name}: {e}") |
|
|
continue |
|
|
|
|
|
|
|
|
zip_results = { |
|
|
'zip_name': zip_name, |
|
|
'zip_path': zip_path, |
|
|
'total_images': len(image_paths), |
|
|
'processed_images': len(results), |
|
|
'results': results |
|
|
} |
|
|
|
|
|
|
|
|
uploaded = await asyncio.to_thread(_upload_cursor_results, zip_name, zip_results) |
|
|
|
|
|
if uploaded: |
|
|
next_index = idx + 2 |
|
|
ok = await asyncio.to_thread(_unlock_file_as_processed, zip_name, state, next_index) |
|
|
if not ok: |
|
|
print(f"[DATASET] Warning: processed but failed to update state for {zip_name}.") |
|
|
dataset_progress.processed_images += 1 |
|
|
print(f"[DATASET] Successfully processed {zip_name}") |
|
|
else: |
|
|
print(f"[DATASET] Failed to upload results for {zip_name}") |
|
|
state['file_states'][zip_name] = 'failed' |
|
|
await asyncio.to_thread(_upload_hf_state, state) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"[DATASET] Error processing zip {zip_name}: {e}") |
|
|
state['file_states'][zip_name] = 'failed' |
|
|
await asyncio.to_thread(_upload_hf_state, state) |
|
|
continue |
|
|
finally: |
|
|
|
|
|
try: |
|
|
if extract_dir and extract_dir.exists(): |
|
|
shutil.rmtree(extract_dir) |
|
|
except Exception as e: |
|
|
print(f"[DATASET] Warning: Failed to clean up {extract_dir}: {e}") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"[DATASET] Error in zip processing loop: {e}") |
|
|
continue |
|
|
|
|
|
print(f"[DATASET] Task completed. Processed {dataset_progress.processed_images}/{len(zip_list)} zip files.") |
|
|
dataset_progress.status = "completed" |
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
err = f"Error in main processing loop: {str(e)}" |
|
|
dataset_progress.status = "error" |
|
|
dataset_progress.error = err |
|
|
print(f"[DATASET] {err}") |
|
|
return False |
|
|
|
|
|
@app.on_event("startup") |
|
|
async def startup_event(): |
|
|
"""Load templates when the application starts.""" |
|
|
if not CURSOR_TEMPLATES_DIR.exists(): |
|
|
print(f"Creating cursor templates directory: {CURSOR_TEMPLATES_DIR}") |
|
|
CURSOR_TEMPLATES_DIR.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
if not list(CURSOR_TEMPLATES_DIR.glob('*.png')): |
|
|
print("WARNING: No cursor template files found in cursors directory!") |
|
|
print(f"Please add cursor template PNG files to: {CURSOR_TEMPLATES_DIR}") |
|
|
print("The server will start but cursor detection will not work without templates.") |
|
|
|
|
|
load_cursor_templates() |
|
|
|
|
|
@app.post('/start_dataset') |
|
|
async def start_dataset(start_index: int = Form(1)): |
|
|
"""Trigger dataset processing in background.""" |
|
|
try: |
|
|
if dataset_progress and dataset_progress.status in ("starting", "processing"): |
|
|
return JSONResponse( |
|
|
status_code=400, |
|
|
content={ |
|
|
"status": "error", |
|
|
"error": "Dataset processing already running", |
|
|
"progress": dataset_progress.to_dict() |
|
|
} |
|
|
) |
|
|
|
|
|
if not CURSOR_TEMPLATES: |
|
|
return JSONResponse( |
|
|
status_code=503, |
|
|
content={ |
|
|
"status": "error", |
|
|
"error": "Cursor templates not loaded. Please ensure templates are available." |
|
|
} |
|
|
) |
|
|
|
|
|
import asyncio as _asyncio |
|
|
_asyncio.create_task(dataset_task(start_index)) |
|
|
return JSONResponse(content={ |
|
|
"status": "started", |
|
|
"start_index": start_index, |
|
|
"message": "Dataset processing started. Check /status endpoint for progress." |
|
|
}) |
|
|
except Exception as e: |
|
|
return JSONResponse(status_code=500, content={"status": "error", "error": str(e)}) |
|
|
|
|
|
@app.get('/dataset_status') |
|
|
async def get_dataset_status(): |
|
|
"""Get current dataset processing status and progress.""" |
|
|
if not dataset_progress: |
|
|
return {"status": "idle"} |
|
|
return dataset_progress.to_dict() |
|
|
|
|
|
@app.post("/track_cursor") |
|
|
async def track_cursor_endpoint( |
|
|
file: UploadFile = File(...), |
|
|
threshold: float = Form(0.8) |
|
|
): |
|
|
"""Process a single uploaded image and return cursor detection results.""" |
|
|
if not CURSOR_TEMPLATES: |
|
|
raise HTTPException( |
|
|
status_code=503, |
|
|
detail="Cursor templates are not loaded." |
|
|
) |
|
|
|
|
|
try: |
|
|
|
|
|
temp_file = TEMP_DATASET_DIR / "temp_image" |
|
|
temp_file.parent.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
content = await file.read() |
|
|
with open(temp_file, 'wb') as f: |
|
|
f.write(content) |
|
|
|
|
|
|
|
|
results = await process_image(temp_file, threshold) |
|
|
|
|
|
|
|
|
try: |
|
|
os.remove(temp_file) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
return JSONResponse(content=results) |
|
|
except Exception as e: |
|
|
raise HTTPException( |
|
|
status_code=500, |
|
|
detail=f"Error processing image: {str(e)}" |
|
|
) |
|
|
|
|
|
@app.post("/track_cursor_url") |
|
|
async def track_cursor_url_endpoint( |
|
|
image_url: str = Form(...), |
|
|
threshold: float = Form(0.8) |
|
|
): |
|
|
"""Process an image from URL and return cursor detection results.""" |
|
|
if not CURSOR_TEMPLATES: |
|
|
raise HTTPException( |
|
|
status_code=503, |
|
|
detail="Cursor templates are not loaded." |
|
|
) |
|
|
|
|
|
try: |
|
|
parsed_url = urlparse(image_url) |
|
|
if not all([parsed_url.scheme, parsed_url.netloc]): |
|
|
raise HTTPException( |
|
|
status_code=400, |
|
|
detail="Invalid URL provided" |
|
|
) |
|
|
|
|
|
content = await download_image_from_url(image_url) |
|
|
results = await process_image(content, threshold) |
|
|
results['source_url'] = image_url |
|
|
return JSONResponse(content=results) |
|
|
|
|
|
except Exception as e: |
|
|
raise HTTPException( |
|
|
status_code=500, |
|
|
detail=f"An error occurred while processing the image: {str(e)}" |
|
|
) |
|
|
|
|
|
|
|
|
port = int(os.environ.get("PORT", 7860)) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
import uvicorn |
|
|
uvicorn.run(app, host="0.0.0.0", port=port, timeout_keep_alive=75) |