# Phone Specifications API — FastAPI backend exposing the PhoneDB / GSMArena scrapers.
# Standard library
import asyncio
import json
import logging
import os
import re
import tempfile
from datetime import datetime
from typing import Any, Dict, List, Optional

# Third-party
import uvicorn
from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field

# Import your scrapers
from app3 import PhoneDBScraper, GSMArenaScraperAlternative
# Configure logging
# Module-level logger named after this module, shared by all handlers below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Create FastAPI app
app = FastAPI(
    title="Phone Specifications API",
    description="API for scraping phone specifications from PhoneDB and GSMArena",
    version="1.0.0"
)
# Add CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True is
# rejected by browsers for credentialed requests — confirm whether
# credentials are actually needed here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, specify allowed origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Mount static files for frontend
# Mounted only if the directory exists, so the API also runs without assets.
if os.path.exists("static"):
    app.mount("/static", StaticFiles(directory="static"), name="static")
# Pydantic models | |
class PhoneSearchRequest(BaseModel):
    """Request body for a single-phone search."""
    # Free-text phone name, passed straight through to the scraper.
    phone_name: str
    # Backend to use: "phonedb" or "gsmarena"; any other value falls back
    # to whichever scraper is available (see search_phone).
    source: str = "gsmarena"  # "phonedb" or "gsmarena"
class MultiplePhoneSearchRequest(BaseModel):
    """Request body for batch phone searches (sync endpoint and background jobs)."""
    # Names to scrape, in order.
    phone_names: List[str]
    # Backend to use; same fallback rules as PhoneSearchRequest.source.
    source: str = "gsmarena"
class PhoneSpecification(BaseModel):
    """Shape of one scraped phone record.

    NOTE(review): not referenced as a response_model in the visible code —
    presumably documents the dict the scrapers return; confirm.
    """
    name: str
    brand: str
    images: List[str]
    specifications: Dict[str, Any]
    source_url: str
class ApiResponse(BaseModel):
    """Standard envelope for every endpoint's response."""
    success: bool
    message: str
    data: Optional[Any] = None
    # Bug fix: the previous `= datetime.now().isoformat()` default was
    # evaluated once at class-definition time, so every response carried the
    # server's start-up timestamp. default_factory gives a fresh value per
    # instance.
    timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())
# Global scrapers
# Populated by startup_event(); stay None if initialization fails, and the
# endpoints check for None before use.
phonedb_scraper = None
gsmarena_scraper = None
# NOTE(review): this handler was never registered in the copy reviewed, so the
# scrapers were never initialized; the startup decorator has been restored.
@app.on_event("startup")
async def startup_event():
    """Initialize the global scraper instances when the app starts.

    Failures are logged rather than raised so the API still comes up;
    endpoints then report the scrapers as unavailable.
    """
    global phonedb_scraper, gsmarena_scraper
    try:
        phonedb_scraper = PhoneDBScraper()
        gsmarena_scraper = GSMArenaScraperAlternative()
        logger.info("Scrapers initialized successfully")
    except Exception as e:
        logger.error(f"Error initializing scrapers: {e}")
# Routes | |
# NOTE(review): route decorator appears to have been stripped — handler was
# unreachable. Path "/" inferred from the fallback landing page; confirm.
@app.get("/", response_class=HTMLResponse)
async def read_root():
    """Serve the main HTML page, or a minimal landing page if no template exists."""
    try:
        with open("templates/index.html", "r", encoding="utf-8") as f:
            return HTMLResponse(content=f.read())
    except FileNotFoundError:
        # No bundled frontend — point the visitor at the interactive docs.
        return HTMLResponse(content="""
        <html>
        <head><title>Phone Specs API</title></head>
        <body>
        <h1>Phone Specifications API</h1>
        <p>API is running! Visit <a href="/docs">/docs</a> for API documentation.</p>
        </body>
        </html>
        """)
# NOTE(review): route decorator appears to have been stripped — path guessed;
# confirm against the frontend.
@app.get("/api/health")
async def health_check():
    """Report liveness plus which scrapers initialized successfully."""
    return ApiResponse(
        success=True,
        message="API is healthy",
        data={"status": "running", "scrapers": {"phonedb": phonedb_scraper is not None, "gsmarena": gsmarena_scraper is not None}}
    )
# NOTE(review): route decorator appears to have been stripped — path guessed;
# confirm against the frontend.
@app.post("/api/search")
async def search_phone(request: PhoneSearchRequest):
    """Scrape specifications for a single phone.

    Returns an ApiResponse whose `data` is the scraper's result dict, or
    success=False when nothing was found.

    Raises:
        HTTPException 503: no scraper is available.
        HTTPException 500: unexpected scraper error.
    """
    try:
        logger.info(f"Searching for phone: {request.phone_name} using {request.source}")
        # Choose scraper based on the requested source.
        if request.source.lower() == "phonedb" and phonedb_scraper:
            scraper = phonedb_scraper
        elif request.source.lower() == "gsmarena" and gsmarena_scraper:
            scraper = gsmarena_scraper
        elif gsmarena_scraper:
            # Unknown or unavailable source: fall back to whatever exists,
            # preferring GSMArena.
            scraper = gsmarena_scraper
        elif phonedb_scraper:
            scraper = phonedb_scraper
        else:
            raise HTTPException(status_code=503, detail="No scrapers available")
        # Run the blocking scrape in a worker thread so the event loop stays
        # responsive. get_running_loop() replaces the deprecated
        # get_event_loop() inside coroutines.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            scraper.scrape_phone_by_name,
            request.phone_name
        )
        if result:
            return ApiResponse(
                success=True,
                message=f"Successfully found specifications for {result['name']}",
                data=result
            )
        return ApiResponse(
            success=False,
            message=f"No results found for {request.phone_name}",
            data=None
        )
    except HTTPException:
        # Bug fix: previously the 503 above was swallowed by the generic
        # handler below and re-raised as a 500.
        raise
    except Exception as e:
        logger.error(f"Error searching for phone {request.phone_name}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# NOTE(review): route decorator appears to have been stripped — path guessed;
# confirm against the frontend.
@app.post("/api/search/multiple")
async def search_multiple_phones(request: MultiplePhoneSearchRequest):
    """Scrape specifications for several phones in one synchronous call.

    `data` holds the per-phone results plus success/total counts; `success`
    is True when at least one phone was scraped.

    Raises:
        HTTPException 503: no scraper is available.
        HTTPException 500: unexpected scraper error.
    """
    try:
        logger.info(f"Searching for {len(request.phone_names)} phones using {request.source}")
        # Choose scraper (same fallback rules as search_phone).
        if request.source.lower() == "phonedb" and phonedb_scraper:
            scraper = phonedb_scraper
        elif request.source.lower() == "gsmarena" and gsmarena_scraper:
            scraper = gsmarena_scraper
        elif gsmarena_scraper:
            scraper = gsmarena_scraper
        elif phonedb_scraper:
            scraper = phonedb_scraper
        else:
            raise HTTPException(status_code=503, detail="No scrapers available")
        # Run the blocking batch scrape off the event loop.
        loop = asyncio.get_running_loop()
        results = await loop.run_in_executor(
            None,
            scraper.scrape_multiple_phones,
            request.phone_names
        )
        success_count = len(results) if results else 0
        total_count = len(request.phone_names)
        return ApiResponse(
            success=success_count > 0,
            message=f"Successfully scraped {success_count}/{total_count} phones",
            data={
                "phones": results,
                "success_count": success_count,
                "total_count": total_count
            }
        )
    except HTTPException:
        # Bug fix: keep the intentional 503 instead of converting it to 500.
        raise
    except Exception as e:
        logger.error(f"Error searching for multiple phones: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# NOTE(review): route decorator appears to have been stripped — path guessed;
# confirm against the frontend.
@app.get("/api/sources")
async def get_available_sources():
    """List the scraping backends that initialized successfully."""
    sources = []
    if phonedb_scraper:
        sources.append({
            "id": "phonedb",
            "name": "PhoneDB",
            "description": "PhoneDB.net database",
            "available": True
        })
    if gsmarena_scraper:
        sources.append({
            "id": "gsmarena",
            "name": "GSMArena",
            "description": "GSMArena.com database",
            "available": True
        })
    return ApiResponse(
        success=True,
        message="Available sources retrieved",
        data=sources
    )
# NOTE(review): route decorator appears to have been stripped — path guessed;
# confirm against the frontend.
@app.get("/api/export/{phone_name}")
async def export_phone_data(phone_name: str, source: str = "gsmarena"):
    """Scrape a phone and return its specifications as a downloadable JSON file.

    Raises:
        HTTPException 503: scraper not available.
        HTTPException 404: phone not found.
        HTTPException 500: unexpected error.
    """
    try:
        # Choose scraper; anything other than phonedb falls back to GSMArena.
        if source.lower() == "phonedb" and phonedb_scraper:
            scraper = phonedb_scraper
        else:
            scraper = gsmarena_scraper
        if not scraper:
            raise HTTPException(status_code=503, detail="Scraper not available")
        # Run the blocking scrape off the event loop.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            scraper.scrape_phone_by_name,
            phone_name
        )
        if not result:
            raise HTTPException(status_code=404, detail="Phone not found")
        # Bug fix: the temp path was a hard-coded literal ("/tmp/(unknown)"),
        # so every export overwrote the same file. Build the path from the
        # phone name, sanitized so a user-supplied name (e.g. "../../etc/x")
        # cannot escape the temp directory, and use the platform temp dir.
        safe_name = re.sub(r"[^A-Za-z0-9._-]", "_", phone_name) or "phone"
        filename = f"{safe_name}_specs.json"
        filepath = os.path.join(tempfile.gettempdir(), filename)
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(result, f, indent=2, ensure_ascii=False)
        return FileResponse(
            filepath,
            media_type='application/json',
            filename=filename
        )
    except HTTPException:
        # Bug fix: preserve the intentional 404/503 instead of converting to 500.
        raise
    except Exception as e:
        logger.error(f"Error exporting phone data: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# Background tasks for long-running scraping jobs
# In-memory job registry: job_id -> status dict. Lost on restart and not
# shared across workers — suitable for a single-process deployment only.
background_jobs = {}
# NOTE(review): route decorator appears to have been stripped — path guessed;
# confirm against the frontend.
@app.post("/api/scrape/background")
async def start_background_scraping(request: MultiplePhoneSearchRequest, background_tasks: BackgroundTasks):
    """Start a background scraping job; poll progress via its job_id.

    Returns the new job_id in `data`; the actual work runs in
    run_background_scraping after this response is sent.
    """
    # Bug fix: include microseconds so two requests within the same second
    # don't collide on the same job_id (the old format was second-granular).
    job_id = f"job_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"
    # Initialize job status before scheduling so a status poll never 404s
    # for a job that was just started.
    background_jobs[job_id] = {
        "status": "started",
        "progress": 0,
        "total": len(request.phone_names),
        "results": [],
        "started_at": datetime.now().isoformat()
    }
    # Add background task (runs after the response is returned).
    background_tasks.add_task(
        run_background_scraping,
        job_id,
        request.phone_names,
        request.source
    )
    return ApiResponse(
        success=True,
        message="Background scraping job started",
        data={"job_id": job_id}
    )
# NOTE(review): route decorator appears to have been stripped — path guessed;
# confirm against the frontend.
@app.get("/api/scrape/status/{job_id}")
async def get_scraping_status(job_id: str):
    """Return the live status dict of a background scraping job.

    Raises:
        HTTPException 404: unknown job_id.
    """
    if job_id not in background_jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    return ApiResponse(
        success=True,
        message="Job status retrieved",
        data=background_jobs[job_id]
    )
async def run_background_scraping(job_id: str, phone_names: List[str], source: str):
    """Worker coroutine for a background scraping job.

    Mutates the shared background_jobs[job_id] entry in place so that status
    polling reflects live progress; per-phone failures are logged and skipped.
    """
    job = background_jobs[job_id]
    try:
        # Pick the scraper the same way the synchronous endpoints do:
        # anything other than an available phonedb falls back to GSMArena.
        scraper = phonedb_scraper if (source.lower() == "phonedb" and phonedb_scraper) else gsmarena_scraper
        if not scraper:
            job["status"] = "failed"
            job["error"] = "Scraper not available"
            return
        job["status"] = "running"
        collected = []
        for done_count, current_name in enumerate(phone_names):
            try:
                # Publish progress before starting the next phone.
                job["progress"] = done_count
                job["current_phone"] = current_name
                # Run the blocking scrape off the event loop.
                loop = asyncio.get_event_loop()
                phone = await loop.run_in_executor(
                    None,
                    scraper.scrape_phone_by_name,
                    current_name
                )
                if phone:
                    collected.append(phone)
                # Be polite to the upstream site between requests.
                await asyncio.sleep(2)
            except Exception as e:
                logger.error(f"Error scraping {current_name} in background job: {e}")
                continue
        # All names processed — record the final state.
        job["status"] = "completed"
        job["progress"] = len(phone_names)
        job["results"] = collected
        job["completed_at"] = datetime.now().isoformat()
    except Exception as e:
        job["status"] = "failed"
        job["error"] = str(e)
        logger.error(f"Background job {job_id} failed: {e}")
if __name__ == "__main__":
    # Dev entry point. "main:app" assumes this file is saved as main.py —
    # TODO confirm the filename matches; reload=True is for development only.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=7860,
        reload=True,
        log_level="info"
    )