import os
import random
import uuid
import json
import time
import asyncio
import re
from threading import Thread

import gradio as gr
import spaces
import torch
import numpy as np
from PIL import Image
import edge_tts
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TextIteratorStreamer,
    Qwen2VLForConditionalGeneration,
    AutoProcessor,
)
from transformers.image_utils import load_image
from diffusers import StableDiffusionXLPipeline, EulerAncestralDiscreteScheduler

MAX_MAX_NEW_TOKENS = 2048
DEFAULT_MAX_NEW_TOKENS = 1024
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096"))

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Load the text-only model and tokenizer for chat generation.
model_id = "prithivMLmods/FastThink-0.5B-Tiny"
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map="auto",
    torch_dtype=torch.bfloat16,
)
model.eval()

# TTS voices and the processor/model for multimodal (vision) chat.
TTS_VOICES = [
    "en-US-JennyNeural",  # @tts1
    "en-US-GuyNeural",    # @tts2
]

MODEL_ID_VL = "prithivMLmods/Qwen2-VL-OCR-2B-Instruct"
processor = AutoProcessor.from_pretrained(MODEL_ID_VL, trust_remote_code=True)
model_m = Qwen2VLForConditionalGeneration.from_pretrained(
    MODEL_ID_VL,
    trust_remote_code=True,
    torch_dtype=torch.float16,
).to("cuda").eval()

# A helper function to convert text to speech via Edge TTS.
async def text_to_speech(text: str, voice: str, output_file="output.mp3"):
    communicate = edge_tts.Communicate(text, voice)
    await communicate.save(output_file)
    return output_file

def clean_chat_history(chat_history):
    """Keep only plain-text messages, dropping file/image entries from the history."""
    cleaned = []
    for msg in chat_history:
        if isinstance(msg, dict) and isinstance(msg.get("content"), str):
            cleaned.append(msg)
    return cleaned

# Restricted-words check (if any are configured via environment variables).
bad_words = json.loads(os.getenv("BAD_WORDS", "[]"))
bad_words_negative = json.loads(os.getenv("BAD_WORDS_NEGATIVE", "[]"))
default_negative = os.getenv("default_negative", "")

def check_text(prompt, negative=""):
    for word in bad_words:
        if word in prompt:
            return True
    for word in bad_words_negative:
        if word in negative:
            return True
    return False

MAX_SEED = np.iinfo(np.int32).max

# Use the same random-seed function for both text and image generation.
def randomize_seed_fn(seed: int, randomize_seed: bool) -> int:
    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    return seed

CACHE_EXAMPLES = torch.cuda.is_available() and os.getenv("CACHE_EXAMPLES", "0") == "1"
MAX_IMAGE_SIZE = int(os.getenv("MAX_IMAGE_SIZE", "2048"))
USE_TORCH_COMPILE = os.getenv("USE_TORCH_COMPILE", "0") == "1"
ENABLE_CPU_OFFLOAD = os.getenv("ENABLE_CPU_OFFLOAD", "0") == "1"

# Set dtype based on device: half precision for CUDA, float32 otherwise.
dtype = torch.float16 if device.type == "cuda" else torch.float32

# Load image generation pipelines for the three model choices.
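# NOTE: all three SDXL pipelines are kept resident at once; on a single GPU
# this needs on the order of 20 GB of VRAM in fp16 (an estimate). Setting
# ENABLE_CPU_OFFLOAD=1 trades speed for memory.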
if torch.cuda.is_available():
    # Lightning 5 model
    pipe = StableDiffusionXLPipeline.from_pretrained(
        "SG161222/RealVisXL_V5.0_Lightning",
        torch_dtype=dtype,
        use_safetensors=True,
        add_watermarker=False,
    ).to(device)
    pipe.text_encoder = pipe.text_encoder.half()
    if ENABLE_CPU_OFFLOAD:
        pipe.enable_model_cpu_offload()
    else:
        pipe.to(device)
        print("Loaded RealVisXL_V5.0_Lightning on Device!")
    if USE_TORCH_COMPILE:
        pipe.unet = torch.compile(pipe.unet, mode="reduce-overhead", fullgraph=True)
        print("Model RealVisXL_V5.0_Lightning Compiled!")

    # Lightning 4 model
    pipe2 = StableDiffusionXLPipeline.from_pretrained(
        "SG161222/RealVisXL_V4.0_Lightning",
        torch_dtype=dtype,
        use_safetensors=True,
        add_watermarker=False,
    ).to(device)
    pipe2.text_encoder = pipe2.text_encoder.half()
    if ENABLE_CPU_OFFLOAD:
        pipe2.enable_model_cpu_offload()
    else:
        pipe2.to(device)
        print("Loaded RealVisXL_V4.0_Lightning on Device!")
    if USE_TORCH_COMPILE:
        pipe2.unet = torch.compile(pipe2.unet, mode="reduce-overhead", fullgraph=True)
        print("Model RealVisXL_V4.0_Lightning Compiled!")

    # Turbo v3 model
    pipe3 = StableDiffusionXLPipeline.from_pretrained(
        "SG161222/RealVisXL_V3.0_Turbo",
        torch_dtype=dtype,
        use_safetensors=True,
        add_watermarker=False,
    ).to(device)
    pipe3.text_encoder = pipe3.text_encoder.half()
    if ENABLE_CPU_OFFLOAD:
        pipe3.enable_model_cpu_offload()
    else:
        pipe3.to(device)
        print("Loaded RealVisXL_V3.0_Turbo on Device!")
    if USE_TORCH_COMPILE:
        pipe3.unet = torch.compile(pipe3.unet, mode="reduce-overhead", fullgraph=True)
        print("Model RealVisXL_V3.0_Turbo Compiled!")
else:
    pipe = StableDiffusionXLPipeline.from_pretrained(
        "SG161222/RealVisXL_V5.0_Lightning",
        torch_dtype=dtype,
        use_safetensors=True,
        add_watermarker=False,
    ).to(device)
    pipe2 = StableDiffusionXLPipeline.from_pretrained(
        "SG161222/RealVisXL_V4.0_Lightning",
        torch_dtype=dtype,
        use_safetensors=True,
        add_watermarker=False,
    ).to(device)
    pipe3 = StableDiffusionXLPipeline.from_pretrained(
        "SG161222/RealVisXL_V3.0_Turbo",
        torch_dtype=dtype,
        use_safetensors=True,
        add_watermarker=False,
    ).to(device)
    print("Running on CPU; models loaded in float32.")

# Define available model choices and their mapping.
DEFAULT_MODEL = "Lightning 5"
MODEL_CHOICES = [DEFAULT_MODEL, "Lightning 4", "Turbo v3"]
models = {
    "Lightning 5": pipe,
    "Lightning 4": pipe2,
    "Turbo v3": pipe3,
}

def generate_image_grid(prompt: str, seed: int, grid_size: str, width: int, height: int,
                        guidance_scale: float, randomize_seed: bool, model_choice: str):
    if check_text(prompt, ""):
        raise ValueError("Prompt contains restricted words.")
    seed = int(randomize_seed_fn(seed, randomize_seed))
    generator = torch.Generator(device=device).manual_seed(seed)

    # Define supported grid sizes.
    grid_sizes = {
        "2x1": (2, 1),
        "1x2": (1, 2),
        "2x2": (2, 2),
        "1x1": (1, 1),
    }
    grid_size_tuple = grid_sizes.get(grid_size, (1, 1))
    num_images = grid_size_tuple[0] * grid_size_tuple[1]

    options = {
        "prompt": prompt,
        "negative_prompt": default_negative,
        "width": width,
        "height": height,
        "guidance_scale": guidance_scale,
        "num_inference_steps": 30,
        "generator": generator,
        "num_images_per_prompt": num_images,
        "output_type": "pil",
    }
    if device.type == "cuda":
        torch.cuda.empty_cache()

    selected_pipe = models.get(model_choice, pipe)
    images = selected_pipe(**options).images

    # Create a grid image.
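    # Images are pasted row-major: image i lands at column (i % cols) and
    # row (i // cols), so the grid fills left-to-right, top-to-bottom.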
    grid_img = Image.new("RGB", (width * grid_size_tuple[0], height * grid_size_tuple[1]))
    for i, img in enumerate(images[:num_images]):
        grid_img.paste(img, ((i % grid_size_tuple[0]) * width, (i // grid_size_tuple[0]) * height))

    unique_name = str(uuid.uuid4()) + ".png"
    grid_img.save(unique_name)
    return [unique_name], seed

# -----------------------------
# Main generate() Function
# -----------------------------
@spaces.GPU
def generate(
    input_dict: dict,
    chat_history: list[dict],
    max_new_tokens: int = DEFAULT_MAX_NEW_TOKENS,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
):
    text = input_dict["text"]
    files = input_dict.get("files", [])
    lower_text = text.lower().strip()

    # Check whether the prompt is an image generation command using model flags.
    if (lower_text.startswith("@lightningv5")
            or lower_text.startswith("@lightningv4")
            or lower_text.startswith("@turbov3")):
        # Determine the model choice based on the flag.
        model_choice = None
        if "@lightningv5" in lower_text:
            model_choice = "Lightning 5"
        elif "@lightningv4" in lower_text:
            model_choice = "Lightning 4"
        elif "@turbov3" in lower_text:
            model_choice = "Turbo v3"

        # Parse the grid size flag, e.g. "@2x2".
        grid_match = re.search(r"@(\d+x\d+)", lower_text)
        grid_size = grid_match.group(1) if grid_match else "1x1"

        # Remove the model and grid flags from the prompt.
        prompt_clean = re.sub(r"@lightningv5", "", text, flags=re.IGNORECASE)
        prompt_clean = re.sub(r"@lightningv4", "", prompt_clean, flags=re.IGNORECASE)
        prompt_clean = re.sub(r"@turbov3", "", prompt_clean, flags=re.IGNORECASE)
        prompt_clean = re.sub(r"@\d+x\d+", "", prompt_clean, flags=re.IGNORECASE)
        prompt_clean = prompt_clean.strip().strip('"')

        # Default parameters for image generation.
        width = 1024
        height = 1024
        guidance_scale = 6.0
        seed_val = 0
        randomize_seed = True

        yield "Generating image grid..."
        image_paths, used_seed = generate_image_grid(
            prompt_clean, seed_val, grid_size, width, height,
            guidance_scale, randomize_seed, model_choice,
        )
        yield gr.Image(image_paths[0])
        return

    # Otherwise, handle text/chat (and TTS) generation.
    tts_prefix = "@tts"
    is_tts = any(text.strip().lower().startswith(f"{tts_prefix}{i}") for i in range(1, 3))
    voice_index = next((i for i in range(1, 3) if text.strip().lower().startswith(f"{tts_prefix}{i}")), None)
    if is_tts and voice_index:
        voice = TTS_VOICES[voice_index - 1]
        text = text.replace(f"{tts_prefix}{voice_index}", "").strip()
        conversation = [{"role": "user", "content": text}]
    else:
        voice = None
        text = text.replace(tts_prefix, "").strip()
        conversation = clean_chat_history(chat_history)
        conversation.append({"role": "user", "content": text})

    if files:
        # Multimodal branch: route image + text queries to the Qwen2-VL model.
        images = [load_image(f) for f in files]
        messages = [{
            "role": "user",
            "content": [
                *[{"type": "image", "image": image} for image in images],
                {"type": "text", "text": text},
            ],
        }]
        prompt = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        inputs = processor(text=[prompt], images=images, return_tensors="pt", padding=True).to("cuda")
        streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True)
        generation_kwargs = {**inputs, "streamer": streamer, "max_new_tokens": max_new_tokens}
        thread = Thread(target=model_m.generate, kwargs=generation_kwargs)
        thread.start()

        buffer = ""
        yield "Thinking..."
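        # Stream partial output to the chat UI as it arrives; the "<|im_end|>"
        # end-of-turn marker is stripped from the visible buffer.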
        for new_text in streamer:
            buffer += new_text
            buffer = buffer.replace("<|im_end|>", "")
            time.sleep(0.01)
            yield buffer
    else:
        input_ids = tokenizer.apply_chat_template(conversation, add_generation_prompt=True, return_tensors="pt")
        if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
            input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
            gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
        input_ids = input_ids.to(model.device)
        streamer = TextIteratorStreamer(tokenizer, timeout=20.0, skip_prompt=True, skip_special_tokens=True)
        generation_kwargs = {
            "input_ids": input_ids,
            "streamer": streamer,
            "max_new_tokens": max_new_tokens,
            "do_sample": True,
            "top_p": top_p,
            "top_k": top_k,
            "temperature": temperature,
            "num_beams": 1,
            "repetition_penalty": repetition_penalty,
        }
        t = Thread(target=model.generate, kwargs=generation_kwargs)
        t.start()

        outputs = []
        for new_text in streamer:
            outputs.append(new_text)
            yield "".join(outputs)

        final_response = "".join(outputs)
        yield final_response

        if is_tts and voice:
            output_file = asyncio.run(text_to_speech(final_response, voice))
            yield gr.Audio(output_file, autoplay=True)

DESCRIPTION = """
# IMAGINEO 4K ⚡
"""

css = '''
h1 {
    text-align: center;
    display: block;
}
#duplicate-button {
    margin: auto;
    color: #fff;
    background: #1565c0;
    border-radius: 100vh;
}
'''

demo = gr.ChatInterface(
    fn=generate,
    additional_inputs=[
        gr.Slider(label="Max new tokens", minimum=1, maximum=MAX_MAX_NEW_TOKENS, step=1, value=DEFAULT_MAX_NEW_TOKENS),
        gr.Slider(label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.6),
        gr.Slider(label="Top-p (nucleus sampling)", minimum=0.05, maximum=1.0, step=0.05, value=0.9),
        gr.Slider(label="Top-k", minimum=1, maximum=1000, step=1, value=50),
        gr.Slider(label="Repetition penalty", minimum=1.0, maximum=2.0, step=0.05, value=1.2),
    ],
    examples=[
        ["@tts1 Who is Nikola Tesla, and why did he die?"],
        ['@lightningv5 @2x2 "Chocolate dripping from a donut against a yellow background, in the style of brocore, hyper-realistic"'],
        ['@lightningv4 @1x1 "A serene landscape with mountains"'],
        ['@turbov3 @2x1 "Abstract art, colorful and vibrant"'],
        ["Write a Python function to check if a number is prime."],
        ["@tts2 What causes rainbows to form?"],
    ],
    cache_examples=False,
    type="messages",
    description=DESCRIPTION,
    css=css,
    fill_height=True,
    textbox=gr.MultimodalTextbox(label="Query Input", file_types=["image"], file_count="multiple"),
    stop_btn="Stop Generation",
    multimodal=True,
)

if __name__ == "__main__":
    demo.queue(max_size=20).launch(share=True)
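# Usage notes (a sketch; the filename "app.py" and the dependency list are
# assumptions inferred from the imports above):
#   $ pip install gradio spaces torch numpy pillow diffusers transformers edge-tts
#   $ python app.py
# Prompt flags understood by generate():
#   @lightningv5 / @lightningv4 / @turbov3  -> select the image model
#   @2x1, @1x2, @2x2                        -> optional grid size (default 1x1)
#   @tts1 / @tts2                           -> speak the reply via Edge TTS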