import gradio as gr
from gradio_client import Client, handle_file
from google import genai
import os
from typing import Optional, List, Tuple, Union
from huggingface_hub import whoami
from PIL import Image
from io import BytesIO
import tempfile
import ffmpeg
# --- Google Gemini API Configuration ---
# The key is read once from the environment; importing this module fails
# immediately when it is missing or empty so the app never starts unconfigured.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") or ""
if GOOGLE_API_KEY == "":
    raise ValueError("GOOGLE_API_KEY environment variable not set.")

# Shared Gemini client used by the image generation handler.
client = genai.Client(api_key=GOOGLE_API_KEY)

# Model identifier passed to every generate_content call.
GEMINI_MODEL_NAME = "gemini-2.5-flash-image-preview"
def verify_pro_status(token: Optional[Union[gr.OAuthToken, str]]) -> bool:
    """Tell whether a token belongs to a Hugging Face PRO or enterprise user.

    ``token`` may be a Gradio OAuth token object or a raw token string. A
    missing/empty token, an unsupported type, or a failed ``whoami`` lookup
    all result in ``False``.
    """
    if not token:
        return False

    # Normalize both accepted token shapes to the raw string whoami() expects.
    if isinstance(token, str):
        raw_token = token
    elif isinstance(token, gr.OAuthToken):
        raw_token = token.token
    else:
        return False

    try:
        profile = whoami(token=raw_token)
        pro_flag = profile.get("isPro", False)
        if pro_flag:
            return pro_flag
        # Not personally PRO: membership in any enterprise org also qualifies.
        return any(
            org.get("isEnterprise", False) for org in profile.get("orgs", [])
        )
    except Exception as e:
        # Best effort: a lookup failure is reported and treated as "not PRO".
        print(f"Could not verify user's PRO/Enterprise status: {e}")
        return False
def _extract_image_data_from_response(response) -> Optional[bytes]:
    """Return the first non-empty inline image payload in a model response.

    Only the first candidate is inspected. Every level of the response
    (``candidates``, ``content``, ``parts``, ``inline_data``, ``data``) is
    treated as optional, so a response with no content (for example a
    blocked or text-only answer) yields ``None`` instead of raising.

    Args:
        response: Object returned by the model call; expected to expose
            ``candidates[0].content.parts``.

    Returns:
        The raw bytes of the first part carrying inline data, or ``None``
        when no part has any.
    """
    candidates = getattr(response, "candidates", None)
    if not candidates:
        return None

    content = getattr(candidates[0], "content", None)
    parts = getattr(content, "parts", None) or []

    for part in parts:
        inline_data = getattr(part, "inline_data", None)
        data = getattr(inline_data, "data", None)
        # Skip text parts and empty blobs; keep looking for a real payload.
        if data:
            return data
    return None
def _parse_frame_rate(rate: str) -> float:
    """Convert an ffprobe rational string such as ``"30000/1001"`` to a float.

    A plain number without a slash is accepted as well. A zero denominator
    (ffprobe can report ``"0/0"`` when the rate is unknown) yields ``0.0``
    rather than raising, so callers can test for an unknown rate.

    Raises:
        ValueError: If either side of the rational is not numeric.
    """
    numerator, _, denominator = str(rate).partition("/")
    try:
        num = float(numerator)
        den = float(denominator) if denominator else 1.0
    except ValueError as exc:
        raise ValueError(f"Unrecognized frame rate: {rate!r}") from exc
    return num / den if den else 0.0


def _get_video_info(video_path: str) -> Tuple[float, Tuple[int, int]]:
    """Probe a video file for its average framerate and resolution.

    Args:
        video_path: Path of the video to inspect with ffprobe.

    Returns:
        ``(framerate, (width, height))`` for the first video stream. The
        framerate is ``0.0`` when ffprobe reports a zero denominator.

    Raises:
        ValueError: If the file has no video stream or the reported frame
            rate cannot be parsed.
    """
    probe = ffmpeg.probe(video_path)
    video_stream = next(
        (s for s in probe['streams'] if s['codec_type'] == 'video'), None
    )
    if not video_stream:
        raise ValueError("No video stream found in the file.")
    # Parsed numerically on purpose: the value originates from a media file
    # that may be user supplied, so it must never be passed to eval().
    framerate = _parse_frame_rate(video_stream['avg_frame_rate'])
    resolution = (int(video_stream['width']), int(video_stream['height']))
    return framerate, resolution
def _resize_image(image_path: str, target_size: Tuple[int, int]) -> str:
    """Return the path of an image that has exactly ``target_size``.

    If the image at ``image_path`` already has the requested
    ``(width, height)``, that same path is returned. Otherwise a Lanczos
    resampled copy is written to a new temporary file (not deleted
    automatically) that keeps the source extension, or ``.png`` when the
    source has none, and the temporary file's path is returned.
    """
    with Image.open(image_path) as source:
        if source.size == target_size:
            return image_path

        scaled = source.resize(target_size, Image.Resampling.LANCZOS)
        _, extension = os.path.splitext(image_path)
        with tempfile.NamedTemporaryFile(
            delete=False, suffix=extension or ".png"
        ) as handle:
            destination = handle.name
            scaled.save(destination)
        return destination
def _trim_first_frame_fast(video_path: str) -> str:
    """Drop the leading frame of a video using a stream copy (no re-encode).

    The input is opened with a seek of one frame duration (``1 / framerate``)
    and its streams are copied into a new temporary ``.mp4`` file.

    NOTE(review): with ``c='copy'`` ffmpeg cannot cut inside a GOP, so the
    actual start may snap to a keyframe rather than exactly the second
    frame - confirm this is acceptable for the generated segments.

    Args:
        video_path: Path of the video whose first frame should be removed.

    Returns:
        Path of the trimmed temporary file; the caller owns and deletes it.

    Raises:
        RuntimeError: If probing or the ffmpeg run fails, or the framerate
            is reported as zero. The original exception is chained.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_output_file:
        output_path = tmp_output_file.name
    try:
        framerate, _ = _get_video_info(video_path)
        if framerate == 0:
            raise ValueError("Framerate cannot be zero.")
        start_time = 1 / framerate
        (
            ffmpeg
            .input(video_path, ss=start_time)
            .output(output_path, c='copy', avoid_negative_ts='make_zero')
            .run(overwrite_output=True, quiet=True)
        )
    except Exception as e:
        # Do not leave the placeholder output file behind on failure.
        if os.path.exists(output_path):
            os.remove(output_path)
        # ffmpeg errors carry the useful diagnostics in their captured
        # stderr; str(e) alone only says "see stderr output for detail".
        stderr = getattr(e, "stderr", None)
        detail = stderr.decode(errors="replace") if isinstance(stderr, bytes) and stderr else e
        raise RuntimeError(f"FFmpeg trim error: {detail}") from e
    return output_path
def _concat_list_entry(path: str) -> str:
    """Build one ``file '...'`` line for an ffmpeg concat list file.

    The path is made absolute and any single quote in it is written as
    ``'\\''`` (close quote, escaped quote, reopen quote) so that file names
    containing apostrophes do not break the list syntax.
    """
    escaped = os.path.abspath(path).replace("'", "'\\''")
    return f"file '{escaped}'\n"


def _combine_videos_simple(video1_path: str, video2_path: str) -> str:
    """Concatenate two videos with ffmpeg's concat demuxer (stream copy).

    A temporary list file naming both inputs is written, handed to ffmpeg,
    and always removed afterwards.

    Args:
        video1_path: Video that comes first in the result.
        video2_path: Video appended after the first one.

    Returns:
        Path of the combined temporary ``.mp4``; the caller owns it.

    Raises:
        RuntimeError: If ffmpeg fails; the message carries ffmpeg's stderr
            and the original error is chained.
    """
    with tempfile.NamedTemporaryFile(
        delete=False, mode='w', suffix=".txt", encoding="utf-8"
    ) as tmp_list_file:
        tmp_list_file.write(_concat_list_entry(video1_path))
        tmp_list_file.write(_concat_list_entry(video2_path))
        list_file_path = tmp_list_file.name

    output_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_output_file:
            output_path = tmp_output_file.name
        (
            ffmpeg
            .input(list_file_path, format='concat', safe=0)
            .output(output_path, c='copy')
            .run(overwrite_output=True, quiet=True)
        )
        return output_path
    except ffmpeg.Error as e:
        # Remove the unusable output placeholder before reporting the error.
        if output_path and os.path.exists(output_path):
            os.remove(output_path)
        detail = e.stderr.decode(errors="replace") if e.stderr else str(e)
        raise RuntimeError(f"FFmpeg combine error: {detail}") from e
    finally:
        if os.path.exists(list_file_path):
            os.remove(list_file_path)
def _generate_video_segment(input_image_path: str, output_image_path: str, prompt: str, token: str) -> str:
    """Request one video segment from the remote first/last-frame Space.

    The two image paths are sent as the start and end frames together with
    the prompt, authenticated with ``token``. The ``"video"`` entry of the
    first element of the prediction result is returned.
    """
    space_client = Client("multimodalart/wan-2-2-first-last-frame", hf_token=token)
    request_kwargs = {
        "start_image_pil": handle_file(input_image_path),
        "end_image_pil": handle_file(output_image_path),
        "prompt": prompt,
        "api_name": "/generate_video",
    }
    prediction = space_client.predict(**request_kwargs)
    first_output = prediction[0]
    return first_output["video"]
def unified_image_generator(prompt: str, images: Optional[List[str]], previous_video_path: Optional[str], last_frame_path: Optional[str], manual_token: str, oauth_token: Optional[gr.OAuthToken]) -> tuple:
    """Generate an image with Gemini from a prompt and optional input images.

    Access is granted when either the OAuth token or the manually supplied
    token passes ``verify_pro_status``.

    Args:
        prompt: Text instruction appended after the input images.
        images: Optional gallery items; each item is indexed with ``[0]``
            to obtain the image path.
        previous_video_path: Path of the video created so far, if any.
        last_frame_path: Path of the image that ended the previous video.
        manual_token: Raw token string entered by the user.
        oauth_token: OAuth token of the logged-in user, if any.

    Returns:
        A 4-tuple: the generated image path, an update making one component
        visible only when exactly one input image was given, an update
        making another visible only when that single input image is the
        previous video's last frame, and an update hiding a fourth
        component. (Presumably the create/extend video controls - the
        wiring is outside this function.)

    Raises:
        gr.Error: On denied access or any failure during generation.
    """
    if not (verify_pro_status(oauth_token) or verify_pro_status(manual_token)):
        raise gr.Error("Access Denied.")

    # Track every opened input image so its file handle can be released
    # once the request has completed, whether it succeeded or not.
    opened_images = []
    try:
        for gallery_item in images or []:
            opened_images.append(Image.open(gallery_item[0]))
        contents = [*opened_images, prompt]

        response = client.models.generate_content(model=GEMINI_MODEL_NAME, contents=contents)
        image_data = _extract_image_data_from_response(response)
        if not image_data:
            raise gr.Error("No image data in response")

        with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp:
            with Image.open(BytesIO(image_data)) as generated:
                generated.save(tmp.name)
            output_path = tmp.name

        can_create_video = bool(images and len(images) == 1)
        # The crucial check for continuity: extending is only offered when
        # the single input image is the last frame of the previous video.
        can_extend_video = bool(
            can_create_video
            and previous_video_path
            and last_frame_path
            and images[0][0] == last_frame_path
        )
        return (output_path, gr.update(visible=can_create_video), gr.update(visible=can_extend_video), gr.update(visible=False))
    except Exception as e:
        raise gr.Error(f"Image generation failed: {e}. Rephrase your prompt to make image generation explicit and try again") from e
    finally:
        for opened in opened_images:
            opened.close()
def create_new_video(input_image_gallery: List[str], prompt_input: str, output_image: str, oauth_token: Optional[gr.OAuthToken]) -> tuple:
    """Create a fresh video going from the first gallery image to ``output_image``.

    Requires a verified PRO/enterprise OAuth token and both images. Returns
    the new segment path twice followed by ``output_image``; any failure
    while generating is surfaced as a ``gr.Error``.
    """
    if not verify_pro_status(oauth_token):
        raise gr.Error("Access Denied.")

    images_missing = not input_image_gallery or not output_image
    if images_missing:
        raise gr.Error("Input/output images required.")

    try:
        # Gallery items are indexed with [0] to obtain the file path.
        start_frame_path = input_image_gallery[0][0]
        segment_path = _generate_video_segment(
            start_frame_path, output_image, prompt_input, oauth_token.token
        )
    except Exception as e:
        raise gr.Error(f"Video creation failed: {e}")
    return segment_path, segment_path, output_image
def extend_existing_video(input_image_gallery: List[str], prompt_input: str, output_image: str, previous_video_path: str, oauth_token: Optional[gr.OAuthToken]) -> tuple:
    """Append a newly generated segment to an existing video.

    Both frames are resized to the previous video's resolution, a new
    segment is generated between them, its first frame is trimmed, and the
    result is concatenated after the previous video. Intermediate temporary
    files created here are removed before returning.

    Args:
        input_image_gallery: Gallery items; the first item's ``[0]`` entry
            is used as the start frame path.
        prompt_input: Prompt describing the new segment.
        output_image: Path of the end frame image.
        previous_video_path: Video to extend.
        oauth_token: OAuth token of the logged-in user.

    Returns:
        The combined video path twice, followed by ``output_image``.

    Raises:
        gr.Error: On denied access, missing inputs, or any processing error.
    """
    if not verify_pro_status(oauth_token):
        raise gr.Error("Access Denied.")
    if not previous_video_path:
        raise gr.Error("No previous video to extend.")
    if not input_image_gallery or not output_image:
        raise gr.Error("Input/output images required.")

    # Intermediate files produced by this call; deleted in ``finally``.
    temp_paths = []
    try:
        _, target_resolution = _get_video_info(previous_video_path)

        source_image_path = input_image_gallery[0][0]
        resized_input_path = _resize_image(source_image_path, target_resolution)
        # _resize_image returns the original path when no resize was needed;
        # only a genuinely new file may be deleted afterwards.
        if resized_input_path != source_image_path:
            temp_paths.append(resized_input_path)

        resized_output_path = _resize_image(output_image, target_resolution)
        if resized_output_path != output_image:
            temp_paths.append(resized_output_path)

        new_segment_path = _generate_video_segment(resized_input_path, resized_output_path, prompt_input, oauth_token.token)
        trimmed_segment_path = _trim_first_frame_fast(new_segment_path)
        temp_paths.append(trimmed_segment_path)

        final_video_path = _combine_videos_simple(previous_video_path, trimmed_segment_path)
        return final_video_path, final_video_path, output_image
    except Exception as e:
        raise gr.Error(f"Video extension failed: {e}") from e
    finally:
        for temp_path in temp_paths:
            try:
                os.remove(temp_path)
            except OSError:
                # Cleanup is best effort and must not mask the real outcome.
                pass
# Custom stylesheet handed to gr.Blocks(css=css) below. It tightens the
# spacing around the sub-title and tabs, caps the page width, swaps the
# light/dark logo variants according to the active theme, lays gallery items
# out in a two-column grid (one column when there is a single item), and
# centers the text of the #wan_ad element.
css = '''
#sub_title{margin-top: -35px !important}
.tab-wrapper{margin-bottom: -33px !important}
.tabitem{padding: 0px !important}
.fillable{max-width: 980px !important}
.dark .progress-text {color: white}
.logo-dark{display: none}
.dark .logo-dark{display: block !important}
.dark .logo-light{display: none}
.grid-container img{object-fit: contain}
.grid-container {display: grid;grid-template-columns: repeat(2, 1fr)}
.grid-container:has(> .gallery-item:only-child) {grid-template-columns: 1fr}
#wan_ad p{text-align: center;padding: .5em}
'''
with gr.Blocks(theme=gr.themes.Citrus(), css=css) as demo:
gr.HTML('''
''')
gr.HTML("