Spaces:
Paused
Paused
| import os | |
| import secrets | |
| import requests | |
| from fastapi import FastAPI, Depends, HTTPException, Header, Request | |
| from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, func | |
| from sqlalchemy.ext.declarative import declarative_base | |
| from sqlalchemy.orm import sessionmaker, Session | |
| from pydantic import BaseModel | |
| from dotenv import load_dotenv | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.staticfiles import StaticFiles | |
# Local import: only this configuration block needs URL escaping.
from urllib.parse import quote as _url_quote

# Load environment variables from a .env file, if one is present.
load_dotenv()

# Settings for the MySQL connection and for the upstream ("main") API.
MYSQL_USER = os.getenv("MYSQL_USER")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD")
MYSQL_HOST = os.getenv("MYSQL_HOST")
MYSQL_DB = os.getenv("MYSQL_DB")
MAIN_API_KEY = os.getenv("MAIN_API_KEY")
MAIN_API_URL = os.getenv("MAIN_API_URL", "https://api.typegpt.net/v1/chat/completions")
MODEL_NAME = os.getenv("MODEL_NAME", "Image-Generator")

# Fail fast with a readable message instead of building a URL that contains
# the literal text "None" for any unset setting.
_missing_mysql_settings = [
    name
    for name, value in (
        ("MYSQL_USER", MYSQL_USER),
        ("MYSQL_PASSWORD", MYSQL_PASSWORD),
        ("MYSQL_HOST", MYSQL_HOST),
        ("MYSQL_DB", MYSQL_DB),
    )
    if value is None
]
if _missing_mysql_settings:
    raise RuntimeError(
        "Missing required environment variables: " + ", ".join(_missing_mysql_settings)
    )

# Percent-encode the credentials so characters such as "@", ":" or "/" in the
# user name or password cannot be mistaken for URL delimiters.  Credentials
# made only of unreserved characters produce exactly the same URL as before.
DATABASE_URL = (
    f"mysql+pymysql://{_url_quote(MYSQL_USER, safe='')}:{_url_quote(MYSQL_PASSWORD, safe='')}"
    f"@{MYSQL_HOST}/{MYSQL_DB}"
)

# SQLAlchemy setup: engine, session factory and declarative base class.
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
| # Updated User model with credits field | |
class User(Base):
    """ORM model mapped to the ``users`` table.

    Each row holds the login name, the stored password value, the API key
    used to authenticate requests, an admin flag and a credit balance.
    """

    __tablename__ = "users"

    # Integer primary key.
    id = Column(Integer, index=True, primary_key=True)
    # Login name; must be unique and present.
    username = Column(String(50), nullable=False, unique=True, index=True)
    # Stored password value (at most 128 characters).
    hashed_password = Column(String(128), nullable=False)
    # Per-user API key; unique and indexed because lookups filter on it.
    api_key = Column(String(64), nullable=False, unique=True, index=True)
    # Admin flag; the ORM fills in False when no value is given.
    is_admin = Column(Boolean, default=False)
    # Credit balance; the ORM fills in 0 when no value is given.
    credits = Column(Integer, default=0)
    # Creation timestamp, assigned by the database server via now().
    created_at = Column(DateTime(timezone=True), server_default=func.now())
# Create every table registered on Base's metadata that does not exist yet.
Base.metadata.create_all(bind=engine)

# The FastAPI application object.
app = FastAPI(title="API Key Platform")

# Serve the local "static" directory under /static, and load Jinja2
# templates from the local "templates" directory.
app.mount(
    "/static",
    StaticFiles(directory="static"),
    name="static",
)
templates = Jinja2Templates(directory="templates")
| # Dependency: Database session | |
def get_db():
    """Yield a database session and close it once the consumer is done.

    Written as a generator so it can be used as a dependency: the session
    is handed out at the ``yield`` and is closed afterwards, whether or not
    the consumer raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Always close the session, even on error.
        session.close()
| # Utility: Generate a unique API key | |
def generate_api_key() -> str:
    """Return a new random API key.

    The key is 16 cryptographically strong random bytes rendered as a
    32-character lowercase hexadecimal string.
    """
    random_bytes = secrets.token_bytes(16)
    return random_bytes.hex()
| # Pydantic models | |
class UserCreate(BaseModel):
    """Request body for creating a user: a username and a password."""

    # Desired login name.
    username: str
    # Password as supplied by the client.
    password: str
class UserOut(BaseModel):
    """Serialized view of a user.

    Exposes id, username, API key, admin flag and credit balance; the
    stored password is not among the fields.
    """

    id: int
    username: str
    api_key: str
    is_admin: bool
    credits: int

    class Config:
        # Let pydantic read field values from object attributes (ORM rows)
        # rather than requiring a dict.
        orm_mode = True
| # Dependency: Validate API key from header and return current user | |
def get_current_user(x_api_key: str = Header(...), db: Session = Depends(get_db)) -> User:
    """Resolve the caller from the API key supplied as a required header.

    Args:
        x_api_key: API key taken from the request headers.
        db: Database session used for the lookup.

    Returns:
        The ``User`` whose ``api_key`` equals ``x_api_key``.

    Raises:
        HTTPException: 401 when no user has that API key.
    """
    matching_user = db.query(User).filter(User.api_key == x_api_key).first()
    if matching_user:
        return matching_user
    raise HTTPException(status_code=401, detail="Invalid API Key")
| # --- Endpoints --- | |
| # 1. User Registration (generates a unique API key) | |
def register(user: UserCreate, db: Session = Depends(get_db)):
    """Create a new non-admin user with a freshly generated API key.

    The password is never stored in plaintext: it is hashed with salted
    PBKDF2-HMAC-SHA256 and stored as
    ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>`` (118 characters,
    which fits the 128-character ``hashed_password`` column).

    Args:
        user: Requested username and password.
        db: Database session.

    Returns:
        The persisted ``User`` row, refreshed from the database.

    Raises:
        HTTPException: 400 when the username is already taken, including
            when a concurrent registration wins the race.
    """
    import hashlib

    from sqlalchemy.exc import IntegrityError

    if db.query(User).filter(User.username == user.username).first():
        raise HTTPException(status_code=400, detail="Username already exists")

    # Salted, iterated hash; salt and iteration count are stored with the
    # digest so the value can be verified later.
    iterations = 600_000
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256", user.password.encode("utf-8"), salt, iterations
    )
    password_record = f"pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}"

    new_user = User(
        username=user.username,
        hashed_password=password_record,
        api_key=generate_api_key(),
        is_admin=False,
    )
    db.add(new_user)
    try:
        db.commit()
    except IntegrityError as exc:
        # The unique constraint fired after the pre-check passed (e.g. two
        # simultaneous registrations); report it like the pre-check does.
        db.rollback()
        raise HTTPException(status_code=400, detail="Username already exists") from exc
    db.refresh(new_user)
    return new_user
| # 2. User Panel: Get current user info | |
def read_user_me(current_user: User = Depends(get_current_user)):
    """Return the user resolved from the caller's API key, unchanged."""
    me = current_user
    return me
| # 3. Admin Panel: List all users (admin-only) | |
def list_users(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Return every user row; only admins may call this.

    Raises:
        HTTPException: 403 when the caller is not an admin.
    """
    if current_user.is_admin:
        return db.query(User).all()
    raise HTTPException(status_code=403, detail="Not authorized")
| # 4. Proxy Endpoint to access the main API | |
class RequestPayload(BaseModel):
    """Request body for the proxy endpoint: a single text prompt."""

    # Prompt text forwarded to the main API.
    prompt: str
def generate_image(payload: RequestPayload, current_user: User = Depends(get_current_user)):
    """Forward the prompt to the main API and return its JSON response.

    The request is authenticated with ``MAIN_API_KEY`` and sent to
    ``MAIN_API_URL`` with the model set to ``MODEL_NAME``.
    ``current_user`` is not read; depending on it only forces a valid API
    key.

    Args:
        payload: Body carrying the prompt.
        current_user: The authenticated caller.

    Returns:
        The decoded JSON body of the main API's response.

    Raises:
        HTTPException: 504 when the main API times out; 502 when it cannot
            be reached or returns a body that is not JSON; otherwise the
            main API's own status code when that is not 200.
    """
    headers = {
        "Authorization": f"Bearer {MAIN_API_KEY}",
        "Content-Type": "application/json",
    }
    data = {
        "model": MODEL_NAME,
        "prompt": payload.prompt,
    }
    try:
        # Without a timeout, requests waits indefinitely and a stalled
        # upstream would tie up this worker forever.
        # (connect timeout, read timeout) in seconds.
        response = requests.post(
            MAIN_API_URL, json=data, headers=headers, timeout=(5, 120)
        )
    except requests.Timeout as exc:
        raise HTTPException(status_code=504, detail="Main API timed out") from exc
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail="Could not reach main API") from exc

    if response.status_code != 200:
        raise HTTPException(status_code=response.status_code, detail="Error from main API")

    try:
        return response.json()
    except ValueError as exc:
        # A 200 response whose body is not valid JSON.
        raise HTTPException(status_code=502, detail="Invalid response from main API") from exc
| # 5. New endpoint for users to test their API key | |
def test_api(current_user: User = Depends(get_current_user)):
    """Confirm the caller's API key works and report username and credits."""
    status = {
        "message": "API is working",
        "username": current_user.username,
        "credits": current_user.credits,
    }
    return status
| # 6. New endpoint for admin to add credits to a user account | |
class CreditPayload(BaseModel):
    """Request body for granting credits to a user."""

    # Username of the account to credit.
    username: str
    # Number of credits to add to that account's balance.
    credits: int
def add_credit(payload: CreditPayload, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Add credits to a user's balance; only admins may call this.

    Args:
        payload: Target username and the number of credits to add.
        current_user: The authenticated caller; must be an admin.
        db: Database session.

    Returns:
        A dict with a ``message`` describing the amount added and the new
        total.

    Raises:
        HTTPException: 403 when the caller is not an admin; 404 when no
            user has the given username.
    """
    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Not authorized")

    user = db.query(User).filter(User.username == payload.username).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # The credits column is nullable, so a row may hold NULL (for example
    # one not created through the ORM default); treat that as a zero
    # balance instead of failing on ``None += int``.
    user.credits = (user.credits or 0) + payload.credits
    db.commit()
    db.refresh(user)
    return {"message": f"Added {payload.credits} credits to user {user.username}. Total credits: {user.credits}"}
| # 7. Render Admin Panel UI | |
def admin_ui(request: Request, current_user: User = Depends(get_current_user)):
    """Render ``admin.html`` for admin callers.

    Raises:
        HTTPException: 403 when the caller is not an admin.
    """
    if current_user.is_admin:
        context = {"request": request}
        return templates.TemplateResponse("admin.html", context)
    raise HTTPException(status_code=403, detail="Not authorized")
| # 8. Render User Panel UI | |
def user_ui(request: Request, current_user: User = Depends(get_current_user)):
    """Render ``user.html`` for any caller with a valid API key.

    ``current_user`` is not read; depending on it only enforces
    authentication.
    """
    context = {"request": request}
    return templates.TemplateResponse("user.html", context)