"""FastAPI service exposing phone-specification scraping endpoints.

Wraps the PhoneDB and GSMArena scrapers from ``app3`` behind a JSON API:
synchronous single/multiple search endpoints, a JSON file export endpoint,
and simple in-memory background scraping jobs with status polling.
"""

from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, FileResponse
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
import uvicorn
import asyncio
import json
import os
import tempfile
import uuid
from datetime import datetime
import logging

# Import your scrapers (project-local module)
from app3 import PhoneDBScraper, GSMArenaScraperAlternative

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create FastAPI app
app = FastAPI(
    title="Phone Specifications API",
    description="API for scraping phone specifications from PhoneDB and GSMArena",
    version="1.0.0",
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, specify allowed origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount static files for frontend (only when the directory exists)
if os.path.exists("static"):
    app.mount("/static", StaticFiles(directory="static"), name="static")


# ---------------------------------------------------------------------------
# Pydantic models
# ---------------------------------------------------------------------------

class PhoneSearchRequest(BaseModel):
    # Phone to look up and which site to scrape it from.
    phone_name: str
    source: str = "gsmarena"  # "phonedb" or "gsmarena"


class MultiplePhoneSearchRequest(BaseModel):
    phone_names: List[str]
    source: str = "gsmarena"


class PhoneSpecification(BaseModel):
    # Shape of a single scraped phone record.
    name: str
    brand: str
    images: List[str]
    specifications: Dict[str, Any]
    source_url: str


class ApiResponse(BaseModel):
    """Uniform envelope for every endpoint's response."""
    success: bool
    message: str
    data: Optional[Any] = None
    # BUG FIX: a plain `datetime.now().isoformat()` default is evaluated once
    # at class-definition time, so every response would carry the process
    # startup timestamp. default_factory yields a fresh value per instance.
    timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())


# Global scraper instances, populated on startup (None until then / on failure)
phonedb_scraper = None
gsmarena_scraper = None


def _choose_scraper(source: str):
    """Return the scraper for *source*, falling back to any available one.

    Centralizes the selection logic that was duplicated across endpoints.
    Returns ``None`` when no scraper initialized successfully.
    """
    normalized = (source or "").lower()
    if normalized == "phonedb" and phonedb_scraper:
        return phonedb_scraper
    if normalized == "gsmarena" and gsmarena_scraper:
        return gsmarena_scraper
    # Default: prefer GSMArena, then PhoneDB.
    return gsmarena_scraper or phonedb_scraper


@app.on_event("startup")
async def startup_event():
    """Initialize scrapers on startup (best-effort: API stays up on failure)."""
    global phonedb_scraper, gsmarena_scraper
    try:
        phonedb_scraper = PhoneDBScraper()
        gsmarena_scraper = GSMArenaScraperAlternative()
        logger.info("Scrapers initialized successfully")
    except Exception as e:
        logger.error(f"Error initializing scrapers: {e}")


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

@app.get("/", response_class=HTMLResponse)
async def read_root():
    """Serve the main HTML page, with an inline fallback when it is missing."""
    try:
        with open("templates/index.html", "r", encoding="utf-8") as f:
            return HTMLResponse(content=f.read())
    except FileNotFoundError:
        # NOTE(review): the original inline fallback markup was garbled in the
        # source file; reconstructed here from its visible text fragments.
        return HTMLResponse(content="""
        <!DOCTYPE html>
        <html>
        <head><title>Phone Specs API</title></head>
        <body>
            <h1>Phone Specifications API</h1>
            <p>API is running! Visit /docs for API documentation.</p>
        </body>
        </html>
        """)


@app.get("/health")
async def health_check():
    """Health check endpooint reporting which scrapers are available."""
    return ApiResponse(
        success=True,
        message="API is healthy",
        data={
            "status": "running",
            "scrapers": {
                "phonedb": phonedb_scraper is not None,
                "gsmarena": gsmarena_scraper is not None,
            },
        },
    )


@app.post("/api/search", response_model=ApiResponse)
async def search_phone(request: PhoneSearchRequest):
    """Search for a single phone and return its specifications.

    Raises 503 when no scraper is available, 500 on scraper errors.
    """
    try:
        logger.info(f"Searching for phone: {request.phone_name} using {request.source}")

        scraper = _choose_scraper(request.source)
        if scraper is None:
            raise HTTPException(status_code=503, detail="No scrapers available")

        # Run the blocking scraper in a worker thread so the event loop stays free.
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(
            None, scraper.scrape_phone_by_name, request.phone_name
        )

        if result:
            return ApiResponse(
                success=True,
                message=f"Successfully found specifications for {result['name']}",
                data=result,
            )
        return ApiResponse(
            success=False,
            message=f"No results found for {request.phone_name}",
            data=None,
        )
    except HTTPException:
        # BUG FIX: without this, the generic handler below converted the
        # deliberate 503 above into a 500.
        raise
    except Exception as e:
        logger.error(f"Error searching for phone {request.phone_name}: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/search/multiple", response_model=ApiResponse)
async def search_multiple_phones(request: MultiplePhoneSearchRequest):
    """Search for several phones in one request; reports per-batch counts."""
    try:
        logger.info(f"Searching for {len(request.phone_names)} phones using {request.source}")

        scraper = _choose_scraper(request.source)
        if scraper is None:
            raise HTTPException(status_code=503, detail="No scrapers available")

        # Run scraping in a worker thread to avoid blocking the event loop.
        loop = asyncio.get_event_loop()
        results = await loop.run_in_executor(
            None, scraper.scrape_multiple_phones, request.phone_names
        )

        success_count = len(results) if results else 0
        total_count = len(request.phone_names)

        return ApiResponse(
            success=success_count > 0,
            message=f"Successfully scraped {success_count}/{total_count} phones",
            data={
                "phones": results,
                "success_count": success_count,
                "total_count": total_count,
            },
        )
    except HTTPException:
        # BUG FIX: preserve the deliberate 503 instead of re-reporting as 500.
        raise
    except Exception as e:
        logger.error(f"Error searching for multiple phones: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/sources")
async def get_available_sources():
    """List the scraping sources that initialized successfully."""
    sources = []
    if phonedb_scraper:
        sources.append({
            "id": "phonedb",
            "name": "PhoneDB",
            "description": "PhoneDB.net database",
            "available": True,
        })
    if gsmarena_scraper:
        sources.append({
            "id": "gsmarena",
            "name": "GSMArena",
            "description": "GSMArena.com database",
            "available": True,
        })
    return ApiResponse(
        success=True,
        message="Available sources retrieved",
        data=sources,
    )


@app.get("/api/export/{phone_name}")
async def export_phone_data(phone_name: str, source: str = "gsmarena"):
    """Scrape one phone and return its specs as a downloadable JSON file."""
    try:
        scraper = _choose_scraper(source)
        if not scraper:
            raise HTTPException(status_code=503, detail="Scraper not available")

        # Get phone data off the event loop thread.
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(
            None, scraper.scrape_phone_by_name, phone_name
        )
        if not result:
            raise HTTPException(status_code=404, detail="Phone not found")

        # SECURITY: phone_name comes from the URL path; strip anything that
        # could traverse out of the temp directory before using it as a
        # filename component.
        safe_name = "".join(c if c.isalnum() else "_" for c in phone_name)
        filename = f"{safe_name}_specs.json"
        # BUG FIX: the original wrote to a constant literal path, never
        # interpolating the filename, so concurrent exports clobbered each
        # other. Also use the platform temp dir instead of hard-coded /tmp.
        filepath = os.path.join(tempfile.gettempdir(), filename)
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2, ensure_ascii=False)

        return FileResponse(
            filepath,
            media_type="application/json",
            filename=filename,
        )
    except HTTPException:
        # BUG FIX: previously the 404/503 raised above were swallowed by the
        # generic handler below and re-reported as 500s.
        raise
    except Exception as e:
        logger.error(f"Error exporting phone data: {e}")
        raise HTTPException(status_code=500, detail=str(e))


# ---------------------------------------------------------------------------
# Background scraping jobs (in-memory only; state is lost on restart)
# ---------------------------------------------------------------------------

background_jobs = {}


@app.post("/api/scrape/background")
async def start_background_scraping(request: MultiplePhoneSearchRequest,
                                    background_tasks: BackgroundTasks):
    """Start a background scraping job and return its job id for polling."""
    # BUG FIX: a second-resolution timestamp alone collides for concurrent
    # requests; a short uuid suffix makes ids unique.
    job_id = f"job_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"

    # Initialize job status before scheduling so a fast poll never 404s.
    background_jobs[job_id] = {
        "status": "started",
        "progress": 0,
        "total": len(request.phone_names),
        "results": [],
        "started_at": datetime.now().isoformat(),
    }

    background_tasks.add_task(
        run_background_scraping, job_id, request.phone_names, request.source
    )

    return ApiResponse(
        success=True,
        message="Background scraping job started",
        data={"job_id": job_id},
    )


@app.get("/api/scrape/status/{job_id}")
async def get_scraping_status(job_id: str):
    """Get the current status record of a background scraping job."""
    if job_id not in background_jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    return ApiResponse(
        success=True,
        message="Job status retrieved",
        data=background_jobs[job_id],
    )


async def run_background_scraping(job_id: str, phone_names: List[str], source: str):
    """Worker coroutine for a background job; mutates ``background_jobs[job_id]``."""
    try:
        scraper = _choose_scraper(source)
        if not scraper:
            background_jobs[job_id]["status"] = "failed"
            background_jobs[job_id]["error"] = "Scraper not available"
            return

        background_jobs[job_id]["status"] = "running"
        results = []
        loop = asyncio.get_event_loop()
        for i, phone_name in enumerate(phone_names):
            try:
                # Expose progress so /api/scrape/status can report it live.
                background_jobs[job_id]["progress"] = i
                background_jobs[job_id]["current_phone"] = phone_name

                result = await loop.run_in_executor(
                    None, scraper.scrape_phone_by_name, phone_name
                )
                if result:
                    results.append(result)

                # Be polite to the scraped site: delay between requests.
                await asyncio.sleep(2)
            except Exception as e:
                # One bad phone must not abort the whole batch.
                logger.error(f"Error scraping {phone_name} in background job: {e}")
                continue

        background_jobs[job_id]["status"] = "completed"
        background_jobs[job_id]["progress"] = len(phone_names)
        background_jobs[job_id]["results"] = results
        background_jobs[job_id]["completed_at"] = datetime.now().isoformat()
    except Exception as e:
        background_jobs[job_id]["status"] = "failed"
        background_jobs[job_id]["error"] = str(e)
        logger.error(f"Background job {job_id} failed: {e}")


if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=7860,
        reload=True,
        log_level="info",
    )