Spaces:
Sleeping
Sleeping
from flask import Flask, request, jsonify | |
from flask_cors import CORS | |
from diffusers import StableDiffusionPipeline, StableDiffusionXLPipeline, UniPCMultistepScheduler | |
import torch | |
import os | |
from PIL import Image | |
import base64 | |
import time | |
import logging | |
from huggingface_hub import list_repo_files | |
import psutil | |
# Disable GPU detection (remove these lines if GPU is available) | |
os.environ["CUDA_VISIBLE_DEVICES"] = "" | |
os.environ["CUDA_DEVICE_ORDER"] = "" | |
os.environ["TORCH_CUDA_ARCH_LIST"] = "" | |
torch.set_default_device("cpu") | |
app = Flask(__name__, static_folder='static') | |
CORS(app) | |
# Configure logging | |
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
# Log device and memory info | |
logger.info(f"Device in use: {torch.device('cpu')}") | |
logger.info(f"Available memory: {psutil.virtual_memory().available / (1024 ** 3):.2f} GB") | |
# Model cache | |
model_cache = {} | |
model_paths = { | |
"ssd-1b": "segmind/SSD-1B", # Using segmind/SSD-1B for testing | |
"sd-v1-5": "remiai3/stable-diffusion-v1-5" | |
} | |
# Image ratio to dimensions (optimized for CPU, multiple of 8) | |
ratio_to_dims = { | |
"1:1": (512, 512), # Default for SSD-1B | |
"3:4": (384, 512), | |
"16:9": (512, 288) | |
} | |
def load_model(model_id): | |
if model_id not in model_cache: | |
logger.info(f"Loading model {model_id} from {model_paths[model_id]}") | |
logger.info(f"HF_TOKEN present: {os.getenv('HF_TOKEN') is not None}") | |
try: | |
# Log repository files for debugging | |
repo_files = list_repo_files(model_paths[model_id], token=os.getenv("HF_TOKEN")) | |
logger.info(f"Files in {model_paths[model_id]}: {repo_files}") | |
# Choose pipeline based on model | |
pipe_class = StableDiffusionXLPipeline if model_id == "ssd-1b" else StableDiffusionPipeline | |
pipe = pipe_class.from_pretrained( | |
model_paths[model_id], | |
torch_dtype=torch.float32, | |
use_auth_token=os.getenv("HF_TOKEN"), | |
use_safetensors=True, | |
low_cpu_mem_usage=True | |
) | |
# Use UniPCMultistepScheduler for SSD-1B, DPMSolver for SD-v1-5 | |
scheduler = UniPCMultistepScheduler.from_config(pipe.scheduler.config) if model_id == "ssd-1b" else DPMSolverMultistepScheduler.from_config(pipe.scheduler.config) | |
pipe.scheduler = scheduler | |
pipe.enable_attention_slicing() | |
pipe.to(torch.device("cpu")) # Change to "cuda" if GPU is available | |
model_cache[model_id] = pipe | |
logger.info(f"Model {model_id} loaded successfully") | |
except Exception as e: | |
logger.error(f"Error loading model {model_id}: {str(e)}") | |
raise | |
return model_cache[model_id] | |
def index(): | |
return app.send_static_file('index.html') | |
def serve_assets(filename): | |
return app.send_static_file(os.path.join('assets', filename)) | |
def generate(): | |
try: | |
data = request.json | |
model_id = data.get('model', 'ssd-1b') | |
prompt = data.get('prompt', '') | |
ratio = data.get('ratio', '1:1') | |
num_images = min(int(data.get('num_images', 1)), 4) | |
# Handle guidance_scale with explicit type conversion | |
guidance_scale_raw = data.get('guidance_scale', 7.5) | |
logger.info(f"Raw guidance_scale: {guidance_scale_raw} (type: {type(guidance_scale_raw)})") | |
try: | |
guidance_scale = float(guidance_scale_raw) | |
guidance_scale = min(max(guidance_scale, 1.0), 20.0) # Clamp between 1.0 and 20.0 | |
except (ValueError, TypeError): | |
logger.error(f"Invalid guidance_scale value: {guidance_scale_raw}") | |
return jsonify({"error": "guidance_scale must be a valid number"}), 400 | |
# Log input parameters | |
logger.info(f"Generating with model: {model_id}, prompt: {prompt}, ratio: {ratio}, num_images: {num_images}, guidance_scale: {guidance_scale}") | |
if not prompt: | |
return jsonify({"error": "Prompt is required"}), 400 | |
if len(prompt) > 512: | |
return jsonify({"error": "Prompt is too long (max 512 characters)"}), 400 | |
if model_id == 'ssd-1b' and num_images > 1: | |
return jsonify({"error": "SSD-1B allows only 1 image per generation"}), 400 | |
if model_id == 'ssd-1b' and ratio != '1:1': | |
return jsonify({"error": "SSD-1B supports only 1:1 ratio"}), 400 | |
if model_id == 'sd-v1-5' and len(prompt.split()) > 77: | |
return jsonify({"error": "Prompt exceeds 77 tokens for Stable Diffusion v1.5"}), 400 | |
width, height = ratio_to_dims.get(ratio, (512, 512)) | |
if width % 8 != 0 or height % 8 != 0: | |
return jsonify({"error": "Width and height must be multiples of 8"}), 400 | |
# Log memory before generation | |
logger.info(f"Memory before generation: {psutil.virtual_memory().available / (1024 ** 3):.2f} GB") | |
pipe = load_model(model_id) | |
pipe.to(torch.device("cpu")) # Change to "cuda" if GPU is available | |
images = [] | |
num_inference_steps = 30 if model_id == 'ssd-1b' else 30 # Unified steps for stability | |
try: | |
for _ in range(num_images): | |
image = pipe( | |
prompt=prompt, | |
height=height, | |
width=width, | |
num_inference_steps=num_inference_steps, | |
guidance_scale=guidance_scale | |
).images[0] | |
images.append(image) | |
except IndexError as e: | |
logger.error(f"IndexError during generation: {str(e)}") | |
return jsonify({"error": f"Generation failed due to invalid index access: {str(e)}"}), 500 | |
except Exception as e: | |
logger.error(f"Unexpected error during generation: {str(e)}") | |
return jsonify({"error": f"Generation failed: {str(e)}"}), 500 | |
output_dir = "outputs" | |
os.makedirs(output_dir, exist_ok=True) | |
image_urls = [] | |
for i, img in enumerate(images): | |
img_path = os.path.join(output_dir, f"generated_{int(time.time())}_{i}.png") | |
img.save(img_path) | |
with open(img_path, "rb") as f: | |
img_data = base64.b64encode(f.read()).decode('utf-8') | |
image_urls.append(f"data:image/png;base64,{img_data}") | |
os.remove(img_path) | |
logger.info(f"Generation successful, returning {len(image_urls)} images") | |
return jsonify({"images": image_urls}) | |
except Exception as e: | |
logger.error(f"Image generation failed: {str(e)}") | |
return jsonify({"error": f"Image generation failed: {str(e)}"}), 500 | |
if __name__ == '__main__': | |
app.run(host='0.0.0.0', port=7860) |