Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import gradio as gr | |
from gradio_client import Client, handle_file | |
from google import genai | |
import os | |
from typing import Optional, List, Tuple, Union | |
from huggingface_hub import whoami | |
from PIL import Image | |
from io import BytesIO | |
import tempfile | |
import ffmpeg | |
# --- Google Gemini API Configuration --- | |
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "") | |
if not GOOGLE_API_KEY: | |
raise ValueError("GOOGLE_API_KEY environment variable not set.") | |
client = genai.Client(api_key=os.environ.get("GOOGLE_API_KEY")) | |
GEMINI_MODEL_NAME = 'gemini-2.5-flash-image-preview' | |
def verify_pro_status(token: Optional[Union[gr.OAuthToken, str]]) -> bool: | |
"""Verifies if the user is a Hugging Face PRO user or part of an enterprise org.""" | |
if not token: | |
return False | |
if isinstance(token, gr.OAuthToken): | |
token_str = token.token | |
elif isinstance(token, str): | |
token_str = token | |
else: | |
return False | |
try: | |
user_info = whoami(token=token_str) | |
return ( | |
user_info.get("isPro", False) or | |
any(org.get("isEnterprise", False) for org in user_info.get("orgs", [])) | |
) | |
except Exception as e: | |
print(f"Could not verify user's PRO/Enterprise status: {e}") | |
return False | |
def _extract_image_data_from_response(response) -> Optional[bytes]: | |
"""Helper to extract image data from the model's response.""" | |
if hasattr(response, 'candidates') and response.candidates: | |
for part in response.candidates[0].content.parts: | |
if hasattr(part, 'inline_data') and hasattr(part.inline_data, 'data'): | |
return part.inline_data.data | |
return None | |
def _get_video_info(video_path: str) -> Tuple[float, Tuple[int, int]]: | |
"""Instantly gets the framerate and (width, height) of a video using ffprobe.""" | |
probe = ffmpeg.probe(video_path) | |
video_stream = next((s for s in probe['streams'] if s['codec_type'] == 'video'), None) | |
if not video_stream: | |
raise ValueError("No video stream found in the file.") | |
framerate = eval(video_stream['avg_frame_rate']) | |
resolution = (int(video_stream['width']), int(video_stream['height'])) | |
return framerate, resolution | |
def _resize_image(image_path: str, target_size: Tuple[int, int]) -> str: | |
"""Resizes an image to a target size and saves it to a new temp file.""" | |
with Image.open(image_path) as img: | |
if img.size == target_size: | |
return image_path | |
resized_img = img.resize(target_size, Image.Resampling.LANCZOS) | |
suffix = os.path.splitext(image_path)[1] or ".png" | |
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file: | |
resized_img.save(tmp_file.name) | |
return tmp_file.name | |
def _trim_first_frame_fast(video_path: str) -> str: | |
"""Removes exactly the first frame of a video without re-encoding.""" | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_output_file: | |
output_path = tmp_output_file.name | |
try: | |
framerate, _ = _get_video_info(video_path) | |
if framerate == 0: raise ValueError("Framerate cannot be zero.") | |
start_time = 1 / framerate | |
( | |
ffmpeg | |
.input(video_path, ss=start_time) | |
.output(output_path, c='copy', avoid_negative_ts='make_zero') | |
.run(overwrite_output=True, quiet=True) | |
) | |
return output_path | |
except Exception as e: | |
raise RuntimeError(f"FFmpeg trim error: {e}") | |
def _combine_videos_simple(video1_path: str, video2_path: str) -> str: | |
"""Combines two videos using the fast concat demuxer.""" | |
with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix=".txt") as tmp_list_file: | |
tmp_list_file.write(f"file '{os.path.abspath(video1_path)}'\n") | |
tmp_list_file.write(f"file '{os.path.abspath(video2_path)}'\n") | |
list_file_path = tmp_list_file.name | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_output_file: | |
output_path = tmp_output_file.name | |
try: | |
( | |
ffmpeg | |
.input(list_file_path, format='concat', safe=0) | |
.output(output_path, c='copy') | |
.run(overwrite_output=True, quiet=True) | |
) | |
return output_path | |
except ffmpeg.Error as e: | |
raise RuntimeError(f"FFmpeg combine error: {e.stderr.decode()}") | |
finally: | |
if os.path.exists(list_file_path): | |
os.remove(list_file_path) | |
def _generate_video_segment(input_image_path: str, output_image_path: str, prompt: str, token: str) -> str: | |
"""Generates a single video segment using the external service.""" | |
video_client = Client("multimodalart/wan-2-2-first-last-frame", hf_token=token) | |
result = video_client.predict( | |
start_image_pil=handle_file(input_image_path), | |
end_image_pil=handle_file(output_image_path), | |
prompt=prompt, api_name="/generate_video" | |
) | |
return result[0]["video"] | |
def unified_image_generator(prompt: str, images: Optional[List[str]], previous_video_path: Optional[str], last_frame_path: Optional[str], manual_token: str, oauth_token: Optional[gr.OAuthToken]) -> tuple: | |
if not (verify_pro_status(oauth_token) or verify_pro_status(manual_token)): raise gr.Error("Access Denied.") | |
try: | |
contents = [Image.open(image_path[0]) for image_path in images] if images else [] | |
contents.append(prompt) | |
response = client.models.generate_content(model=GEMINI_MODEL_NAME, contents=contents) | |
image_data = _extract_image_data_from_response(response) | |
if not image_data: raise gr.Error("No image data in response") | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp: | |
Image.open(BytesIO(image_data)).save(tmp.name) | |
output_path = tmp.name | |
can_create_video = bool(images and len(images) == 1) | |
can_extend_video = False | |
if can_create_video and previous_video_path and last_frame_path: | |
# The crucial check for continuity | |
if images[0][0] == last_frame_path: | |
can_extend_video = True | |
return (output_path, gr.update(visible=can_create_video), gr.update(visible=can_extend_video), gr.update(visible=False)) | |
except Exception as e: | |
raise gr.Error(f"Image generation failed: {e}. Rephrase your prompt to make image generation explicit and try again") | |
def create_new_video(input_image_gallery: List[str], prompt_input: str, output_image: str, oauth_token: Optional[gr.OAuthToken]) -> tuple: | |
if not verify_pro_status(oauth_token): raise gr.Error("Access Denied.") | |
if not input_image_gallery or not output_image: raise gr.Error("Input/output images required.") | |
try: | |
new_segment_path = _generate_video_segment(input_image_gallery[0][0], output_image, prompt_input, oauth_token.token) | |
return new_segment_path, new_segment_path, output_image | |
except Exception as e: | |
raise gr.Error(f"Video creation failed: {e}") | |
def extend_existing_video(input_image_gallery: List[str], prompt_input: str, output_image: str, previous_video_path: str, oauth_token: Optional[gr.OAuthToken]) -> tuple: | |
if not verify_pro_status(oauth_token): raise gr.Error("Access Denied.") | |
if not previous_video_path: raise gr.Error("No previous video to extend.") | |
if not input_image_gallery or not output_image: raise gr.Error("Input/output images required.") | |
try: | |
_, target_resolution = _get_video_info(previous_video_path) | |
resized_input_path = _resize_image(input_image_gallery[0][0], target_resolution) | |
resized_output_path = _resize_image(output_image, target_resolution) | |
new_segment_path = _generate_video_segment(resized_input_path, resized_output_path, prompt_input, oauth_token.token) | |
trimmed_segment_path = _trim_first_frame_fast(new_segment_path) | |
final_video_path = _combine_videos_simple(previous_video_path, trimmed_segment_path) | |
return final_video_path, final_video_path, output_image | |
except Exception as e: | |
raise gr.Error(f"Video extension failed: {e}") | |
css = ''' | |
#sub_title{margin-top: -35px !important} | |
.tab-wrapper{margin-bottom: -33px !important} | |
.tabitem{padding: 0px !important} | |
.fillable{max-width: 980px !important} | |
.dark .progress-text {color: white} | |
.logo-dark{display: none} | |
.dark .logo-dark{display: block !important} | |
.dark .logo-light{display: none} | |
.grid-container img{object-fit: contain} | |
.grid-container {display: grid;grid-template-columns: repeat(2, 1fr)} | |
.grid-container:has(> .gallery-item:only-child) {grid-template-columns: 1fr} | |
#wan_ad p{text-align: center;padding: .5em} | |
''' | |
with gr.Blocks(theme=gr.themes.Citrus(), css=css) as demo: | |
gr.HTML(''' | |
<img class="logo-dark" src='https://huggingface.co/spaces/multimodalart/nano-banana/resolve/main/nano_banana_pros.png' style='margin: 0 auto; max-width: 650px' /> | |
<img class="logo-light" src='https://huggingface.co/spaces/multimodalart/nano-banana/resolve/main/nano_banana_pros_light.png' style='margin: 0 auto; max-width: 650px' /> | |
''') | |
gr.HTML("<h3 style='text-align:center'>Hugging Face PRO users can use Google's Nano Banana (Gemini 2.5 Flash Image Preview) on this Space. <a href='http://huggingface.co/subscribe/pro?source=nana_banana' target='_blank'>Subscribe to PRO</a></h3>", elem_id="sub_title") | |
pro_message = gr.Markdown(visible=False) | |
main_interface = gr.Column(visible=False) | |
previous_video_state = gr.State(None) | |
last_frame_of_video_state = gr.State(None) | |
with main_interface: | |
with gr.Row(): | |
with gr.Column(scale=1): | |
image_input_gallery = gr.Gallery(label="Upload one or more images here. Leave empty for text-to-image", file_types=["image"], height="auto") | |
prompt_input = gr.Textbox(label="Prompt", placeholder="Turns this photo into a masterpiece") | |
generate_button = gr.Button("Generate", variant="primary") | |
with gr.Column(scale=1): | |
output_image = gr.Image(label="Output", interactive=False, elem_id="output", type="filepath") | |
use_image_button = gr.Button("♻️ Use this Image for Next Edit", variant="primary") | |
with gr.Row(): | |
create_video_button = gr.Button("Create video between the two images 🎥", variant="secondary", visible=False) | |
extend_video_button = gr.Button("Extend existing video with new scene 🎞️", variant="secondary", visible=False) | |
with gr.Group(visible=False) as video_group: | |
video_output = gr.Video(label="Generated Video", show_download_button=True, autoplay=True) | |
gr.Markdown("Generate more with [Wan 2.2 first-last-frame](https://huggingface.co/spaces/multimodalart/wan-2-2-first-last-frame)", elem_id="wan_ad") | |
manual_token = gr.Textbox("Manual Token (to use with the API)", visible=False) | |
gr.Markdown("<h2 style='text-align: center'>Thank you for being a PRO! 🤗</h2>") | |
login_button = gr.LoginButton() | |
gr.on( | |
triggers=[generate_button.click, prompt_input.submit], | |
fn=unified_image_generator, | |
inputs=[prompt_input, image_input_gallery, previous_video_state, last_frame_of_video_state, manual_token], | |
outputs=[output_image, create_video_button, extend_video_button, video_group] | |
) | |
use_image_button.click( | |
fn=lambda img: ( | |
[img] if img else None, None, gr.update(visible=False), | |
gr.update(visible=False), gr.update(visible=False) | |
), | |
inputs=[output_image], | |
outputs=[image_input_gallery, output_image, create_video_button, extend_video_button, video_group] | |
) | |
create_video_button.click( | |
fn=lambda: gr.update(visible=True), outputs=[video_group] | |
).then( | |
fn=create_new_video, | |
inputs=[image_input_gallery, prompt_input, output_image], | |
outputs=[video_output, previous_video_state, last_frame_of_video_state], | |
) | |
extend_video_button.click( | |
fn=lambda: gr.update(visible=True), outputs=[video_group] | |
).then( | |
fn=extend_existing_video, | |
inputs=[image_input_gallery, prompt_input, output_image, previous_video_state], | |
outputs=[video_output, previous_video_state, last_frame_of_video_state], | |
) | |
def control_access(profile: Optional[gr.OAuthProfile] = None, oauth_token: Optional[gr.OAuthToken] = None): | |
if not profile: return gr.update(visible=False), gr.update(visible=False) | |
if verify_pro_status(oauth_token): return gr.update(visible=True), gr.update(visible=False) | |
else: | |
message = ( | |
"## ✨ Exclusive Access for PRO Users\n\n" | |
"Thank you for your interest! This app is available exclusively for our Hugging Face **PRO** members.\n\n" | |
"To unlock this and many other cool stuff, please consider upgrading your account.\n\n" | |
"### [**Become a PRO Today!**](http://huggingface.co/subscribe/pro?source=nana_banana)" | |
) | |
return gr.update(visible=False), gr.update(visible=True, value=message) | |
demo.load(control_access, inputs=None, outputs=[main_interface, pro_message]) | |
if __name__ == "__main__": | |
demo.queue(max_size=None, default_concurrency_limit=None).launch(show_error=True) |