import argparse
import datetime
import json
import os
import time
import hashlib
import uuid
import traceback

import logging
# Configure the root logger before the remaining imports run.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
# Only let WARNING and above through for the "http" and "httpx" loggers.
logging.getLogger("http").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)

import spaces
import gradio as gr

from conversation_public import default_conversation, conv_templates, SeparatorStyle

# Access token taken from the environment; None when TOKEN_FROM_SECRET is unset.
auth_token = os.environ.get("TOKEN_FROM_SECRET")

##########################################
# Audio part
##########################################
from huggingface_hub import snapshot_download
# Fetch the speech tokenizer repository into ./emova_speech_tokenizer.
# NOTE(review): the two imports right below reference a package with that
# directory name, so presumably the download has to happen first -- keep this
# statement order.
snapshot_download(repo_id="Emova-ollm/emova_speech_tokenizer", local_dir='./emova_speech_tokenizer', token=auth_token)
from emova_speech_tokenizer.emova_speech_tokenizer.speech_utils import get_S2U_ckpt_config_path, load_S2U_model, s2u_extract_unit_demo
from emova_speech_tokenizer.emova_speech_tokenizer.speech_utils import load_condition_centroid, get_U2S_config_checkpoint_file, load_U2S_model, synthesis

####################
# S2U
####################
# `reduced` is forwarded to s2u_extract_unit_demo by the S2U wrappers in this file.
reduced = True
reduced_mark = 'reduced' if reduced else 'unreduced'
unit_type = '40ms_multilingual_8888'
language = 'English'
s2u_model_name = 'SPIRAL-FSQ-CTC'

ckpt_path, config_path = get_S2U_ckpt_config_path(unit_type, language)
# The S2U model is loaded once at import time and moved to the GPU.
s2u_model = load_S2U_model(ckpt_path, config_path, s2u_model_name).cuda()

####################
# U2S
####################
# The two dicts returned here are indexed by a "condition" string when a reply
# is synthesised.
condition2style_centroid_file = "./speech_tokenization/condition_style_centroid/condition2style_centroid.txt"
condition2style_centroid_file_dict, condition2style_centroid_embedding_dict = load_condition_centroid(condition2style_centroid_file)

# `unit_type` and `language` are deliberately rebound here: the U2S side uses a
# different unit type / language than the S2U side above.
unit_type = '40ms_multilingual_8888_xujing_cosyvoice_FT'
language = 'Chinese'
model_config_file, model_checkpoint_file = get_U2S_config_checkpoint_file(unit_type, language)
net_g, hps = load_U2S_model(model_config_file, model_checkpoint_file, unit_type)
net_g = net_g.cuda()
####################
# task format
####################
# NOTE: the "follwing" misspelling is part of the prompt text sent to the model
# and is left untouched on purpose -- presumably the model was tuned on these
# exact strings; confirm before correcting the spelling.
asr_format = "Please recognize the text corresponding to the follwing speech.\n"
tts_format = "Please synthesize the speech corresponding to the follwing text.\n"
# Raw string: the "\n" sequences below are literal backslash-n characters.
# The full-width comma before "assistant response speech" was mis-decoded in the
# reviewed source and is restored here.
chat_format = r'Please recognize the texts, emotion and pitch from the user question speech units and provide the texts, emotion, pitch and speech units for the assistant response. \nEmotion should be chosen from ["neutral", "happy", "sad", "angry", "surprised", "disgusted", "fearful"]. \nPitch should be chosen from ["low", "normal", "high"].\nYour output should be in json format.\nAn output example is:\n{"user question text": "", "user question emotion": "", "user question pitch": "", "assistant response text": "", "assistant response emotion": "", "assistant response pitch": ""，"assistant response speech": ""}\n\nuser question speech:'


@spaces.GPU(duration=10)
def s2u_asr(text, audio_file):
    """Build the ASR prompt: ``asr_format`` followed by the units extracted from *audio_file*.

    ``text`` is accepted only so that all ``mode2func`` entries share one
    signature; it is not used.
    """
    return asr_format + s2u_extract_unit_demo(s2u_model, audio_file, model_name=s2u_model_name, reduced=reduced)


@spaces.GPU(duration=10)
def s2u_chat(text, audio_file):
    """Build the spoken-dialogue prompt: ``chat_format`` followed by the units extracted from *audio_file*.

    ``text`` is not used (kept for the shared ``mode2func`` signature).
    """
    return chat_format + s2u_extract_unit_demo(s2u_model, audio_file, model_name=s2u_model_name, reduced=reduced)


def u2s_tts(text, audio_file):
    """Build the TTS prompt: ``tts_format`` followed by *text*.

    ``audio_file`` is not used (kept for the shared ``mode2func`` signature).
    """
    return tts_format + text


# Dispatch table from audio mode name to the prompt builder for that mode.
mode2func = dict(
    asr=s2u_asr,
    chat=s2u_chat,
    tts=u2s_tts,
)

##########################################
# LLM part
# TODO: 1) change model 2) change arguments
##########################################
import torch
from transformers import AutoModel, AutoProcessor, TextIteratorStreamer
from threading import Thread

model_name = "Emova-ollm/emova_llama3_1-8b"
model = AutoModel.from_pretrained(
    model_name,
    torch_dtype=torch.bfloat16,
    use_flash_attn=True,
    low_cpu_mem_usage=True,
    trust_remote_code=True,
    token=auth_token).eval().cuda()
processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True, token=auth_token)
# One module-level streamer is shared by every request handled by http_bot.
streamer = TextIteratorStreamer(processor.tokenizer, skip_prompt=True, skip_special_tokens=True, timeout=15)


def stream_response(model, inputs, streamer, prompt, gen_kwargs):
    """Run ``model.generate`` in a background thread and yield the growing text.

    Args:
        model: object whose ``generate`` method is called with ``streamer``,
            ``**inputs`` and ``**gen_kwargs``.
        inputs: mapping of processor outputs forwarded to ``generate``.
        streamer: iterable that produces newly generated text pieces.
        prompt: prompt string; every yielded value starts with it.
        gen_kwargs: extra keyword arguments for ``generate``.

    Yields:
        ``prompt`` plus all text pieces received so far, once per piece.
    """
    thread = Thread(target=model.generate, kwargs=dict(
        streamer=streamer,
        **inputs,
        **gen_kwargs
    ))
    thread.start()

    generated_text = prompt
    for new_text in streamer:
        generated_text += new_text
        yield generated_text


##########################################
# Gradio part
##########################################
no_change_btn = gr.Button()
enable_btn = gr.Button(interactive=True)
disable_btn = gr.Button(interactive=False)

server_error_msg = "**NETWORK ERROR DUE TO HIGH TRAFFIC. PLEASE REGENERATE OR REFRESH THIS PAGE.**"
server_oom_msg = "**OUT OF GPU MEMORY DETECTED. PLEASE DECREASE THE MAX OUTPUT TOKENS AND REGENERATE.**"

# NOTE(review): in the reviewed source this literal is empty, which makes the
# `'' not in text` test in add_text always False (the marker is never appended)
# and makes the later slice drop a real character of the user's text.
# "<image>" is assumed here -- TODO confirm against the templates in
# conversation_public.
IMAGE_PLACEHOLDER = "<image>"

# NOTE(review): the logging calls in the functions below had lost their call
# head in the reviewed source (only the message literal and ")" were left);
# they are reconstructed as logging.info(...) -- confirm against the original.


def load_demo_refresh_model_list():
    """Return a fresh copy of ``default_conversation`` for a newly loaded page."""
    logging.info("load_demo.")
    state = default_conversation.copy()
    return state


def regenerate(state, image_process_mode):
    """Clear the last assistant reply so that it can be generated again.

    If the preceding user message is a tuple/list, its third element is replaced
    with *image_process_mode*.

    Returns:
        ``(state, chatbot, "", None, None)`` followed by two disabled buttons.
    """
    logging.info("regenerate.")
    state.messages[-1][-1] = None
    prev_human_msg = state.messages[-2]
    if isinstance(prev_human_msg[1], (tuple, list)):
        prev_human_msg[1] = (*prev_human_msg[1][:2], image_process_mode, *prev_human_msg[1][3:])
    state.skip_next = False
    return (state, state.to_gradio_chatbot_public(), "", None, None) + (disable_btn,) * 2


def clear_history():
    """Reset the conversation.

    Returns:
        ``(state, chatbot, "", None)``, two disabled buttons, then ``None``
        (seven values, matching the outputs wired to the Clear button).
    """
    logging.info("clear_history.")
    state = default_conversation.copy()
    return (state, state.to_gradio_chatbot_public(), "", None) + (disable_btn,) * 2 + (None,)


def _strip_image_suffix(text):
    """Return *text* without a trailing newline + image placeholder, if present."""
    suffix = "\n" + IMAGE_PLACEHOLDER
    if text.endswith(suffix):
        return text[:-len(suffix)]
    return text


############
# Show prompt in the chatbot
# Input: [state, textbox, imagebox, image_process_mode, audio_input, audio_mode]
# Return: [state, chatbot, textbox, imagebox, audio_input] + btn_list
############
def add_text(state, text, image, image_process_mode, audio_input, audio_mode):
    """Append the user's turn (text, optional image, optional audio) to the conversation.

    Args:
        state: current conversation object.
        text: content of the textbox.
        image: uploaded image or ``None``.
        image_process_mode: preprocessing mode stored alongside the image.
        audio_input: path of the recorded/uploaded audio or ``None``.
        audio_mode: key of ``mode2func`` ("asr", "chat" or "tts").

    Returns:
        ``(state, chatbot, "", None, None)`` followed by two button updates.
    """
    ############
    # Input legality checking
    ############
    logging.info(f"add_text. len: {len(text)}")
    if len(text) <= 0 and image is None and audio_input is None:
        state.skip_next = True
        return (state, state.to_gradio_chatbot_public(), "", None, None) + (no_change_btn,) * 2

    ############
    # Re-initialize if having conducted audio conversations
    ############
    had_audio_turn = any(
        isinstance(msg, tuple) and msg[-1] is not None
        for _, msg in state.messages[state.offset:]
    )
    if had_audio_turn:
        state = default_conversation.copy()

    ############
    # Deal with image inputs
    ############
    if image is not None:
        if IMAGE_PLACEHOLDER not in text:
            text = text + '\n' + IMAGE_PLACEHOLDER
        text = (text, image, image_process_mode, None)
        state = default_conversation.copy()

    ############
    # Deal with audio inputs
    ############
    if audio_input is not None or audio_mode == 'tts':
        if isinstance(text, tuple):
            if audio_mode == 'chat':
                prompt = mode2func[audio_mode](_strip_image_suffix(text[0]), audio_input)
                text = (prompt + "\n" + IMAGE_PLACEHOLDER, text[1], text[2], audio_input)
            elif audio_mode == 'tts':
                prompt = mode2func[audio_mode](_strip_image_suffix(text[0]), audio_input)
                text = (prompt, None, None, None)
            else:
                # Unchanged from the original: the whole tuple is handed over as `text`.
                prompt = mode2func[audio_mode](text, audio_input)
                text = (prompt, None, None, audio_input)
        else:
            prompt = mode2func[audio_mode](text, audio_input)
            text = (prompt, None, None, audio_input)
        state = default_conversation.copy()

    state.append_message(state.roles[0], text)
    state.append_message(state.roles[1], None)
    state.skip_next = False
    return (state, state.to_gradio_chatbot_public(), "", None, None) + (disable_btn,) * 2


def _parse_speech_output(output):
    """Extract speech units and style attributes from the model output.

    The output is first treated as JSON (a missing closing quote/brace is
    appended when the text starts with "{"). If that fails, the whole text is
    treated as a plain sequence of speech tokens with the default style.

    Returns:
        ``(output, content_unit, emotion, speed, pitch)`` where ``output`` is the
        possibly repaired text.
    """
    try:
        if output.startswith("{"):
            if output.endswith("|>"):
                output += "\"}"
            elif output.endswith("\""):
                output += "}"
        info_dict = json.loads(output)
        content_unit = info_dict['assistant response speech'].replace('<|speech_', '').replace('|>', ' ').strip()
        # dict.get instead of hasattr(): hasattr() on a dict never sees its keys,
        # so the parsed style used to be discarded in favour of the defaults.
        emotion = info_dict.get('assistant response emotion', "neutral")
        speed = info_dict.get('assistant response speed', "normal")
        pitch = info_dict.get('assistant response pitch', "normal")
    except (ValueError, KeyError, TypeError, AttributeError):
        content_unit = output.replace('<|speech_', '').replace('|>', ' ').strip()
        emotion = 'neutral'
        speed = "normal"
        pitch = "normal"
    return output, content_unit, emotion, speed, pitch


############
# Get response
# Input: [state, temperature, top_p, max_output_tokens, speaker]
# Return: [state, chatbot] + btn_list
############
@spaces.GPU
def http_bot(state, temperature, top_p, max_new_tokens, speaker):
    """Generate the assistant reply for the last user turn, streaming updates.

    Args:
        state: current conversation object.
        temperature: sampling temperature; sampling is off at 0.001 or below.
        top_p: nucleus sampling parameter.
        max_new_tokens: requested upper bound of generated tokens; it is capped
            by the remaining context length.
        speaker: "Female"/"Male" (lower-cased for the style condition);
            "female" is used when empty.

    Yields:
        ``(state, chatbot, regenerate_btn_update, clear_btn_update)`` tuples.
    """
    logging.info("http_bot.")

    if state.skip_next:
        yield (state, state.to_gradio_chatbot_public()) + (no_change_btn,) * 2
        return

    if len(state.messages) == state.offset + 2:
        # First round of conversation
        if "llama3" in model_name.lower():
            template_name = 'llama3_demo'
        elif "qwen2" in model_name.lower():
            template_name = 'qwen2_demo'
        else:
            template_name = "default"

        new_state = conv_templates[template_name].copy()
        new_state.append_message(new_state.roles[0], state.messages[-2][1])
        new_state.append_message(new_state.roles[1], None)
        state = new_state

    # Construct prompt
    prompt = state.get_prompt()
    all_images = state.get_images(return_pil=True)

    # Make requests
    pload = {
        "model": model_name,
        "prompt": prompt,
        "temperature": float(temperature),
        "top_p": float(top_p),
        "max_new_tokens": int(max_new_tokens),
        "stop": state.sep if state.sep_style in [SeparatorStyle.SINGLE, SeparatorStyle.MPT] else state.sep2,
        "images": f'List of {len(state.get_images())} images: {all_images}',
    }
    logging.info(f"==== request ====\n{pload}")

    # Process inputs
    inputs = processor(text=[prompt], images=all_images if len(all_images) > 0 else None, return_tensors="pt")
    if len(all_images) > 0:
        inputs['pixel_values'] = inputs['pixel_values'].to(model.dtype)

    # TODO
    # Process hyperparameters
    temperature = float(pload.get("temperature", 1.0))
    top_p = float(pload.get("top_p", 1.0))
    stop_str = pload.get("stop", None)
    do_sample = temperature > 0.001
    max_context_length = getattr(model.config, 'max_position_embeddings', 2048)
    max_new_tokens = int(pload.get("max_new_tokens", 256))
    max_new_tokens = min(max_new_tokens, max_context_length - inputs['input_ids'].shape[1])
    gen_kwargs = dict(
        do_sample=do_sample,
        temperature=temperature,
        top_p=top_p,
        max_new_tokens=max_new_tokens,
        use_cache=True,
    )

    if max_new_tokens < 1:
        state.messages[-1][-1] = "Exceeds max token length. Please start a new conversation, thanks."
        # Re-enable the buttons: with both disabled the user could not clear the
        # chat, which is what the message asks for.
        yield (state, state.to_gradio_chatbot_public()) + (enable_btn,) * 2
        return

    state.messages[-1][-1] = "▌"
    yield (state, state.to_gradio_chatbot_public()) + (disable_btn,) * 2

    # Speech tasks only show the cursor while streaming; the units are decoded
    # to audio once generation is finished.
    is_speech_task = tts_format in prompt or chat_format in prompt
    # Defined up front so the code after the loop works even if nothing is streamed.
    output = ""

    # Stream output
    try:
        for generated_text in stream_response(model, inputs, streamer, prompt, gen_kwargs):
            output = generated_text[len(prompt):].strip()
            if not is_speech_task:
                state.messages[-1][-1] = output + "▌"
            else:
                state.messages[-1][-1] = "▌"
                # state.messages[-1][-1] = "[😁 GENERATING AUDIO {}%...]".format(round(output.count("<|speech_") / max_new_tokens * 100, 1)) + "\n" + output + "▌"
            yield (state, state.to_gradio_chatbot_public()) + (disable_btn,) * 2
    except Exception:
        logging.exception("Generation failed.")
        os.system("nvidia-smi")
        state.messages[-1][-1] = server_error_msg
        yield (state, state.to_gradio_chatbot_public()) + (enable_btn,) * 2
        return

    ################
    # decode output to audio
    ################
    temp_file = None
    if is_speech_task:
        try:
            output, content_unit, emotion, speed, pitch = _parse_speech_output(output)
            gender = speaker.lower() if speaker else 'female'

            condition = f'gender-{gender}_emotion-{emotion}_speed-{speed}_pitch-{pitch}'
            if condition not in condition2style_centroid_embedding_dict:
                # Unknown style combination: fall back to the style that was
                # always used before the parsed attributes were honoured.
                logging.warning("Unknown style condition %s; using the neutral style.", condition)
                condition = f'gender-{gender}_emotion-neutral_speed-normal_pitch-normal'
            style_centroid_embedding = condition2style_centroid_embedding_dict[condition].cuda()

            audio_id = str(uuid.uuid4())
            os.makedirs("./demo_audio", exist_ok=True)
            synthesis(content_unit, style_centroid_embedding, hps, net_g, f"./demo_audio/{audio_id}_temp_audio.wav")
            temp_file = f"./demo_audio/{audio_id}_temp_audio.wav"
        except Exception:
            logging.exception("Speech synthesis failed.")
            os.system("nvidia-smi")

    # Drop the trailing cursor character.
    state.messages[-1][-1] = state.messages[-1][-1][:-1]
    if is_speech_task:
        if temp_file is not None:
            state.messages[-1][-1] = (output, temp_file)
        else:
            state.messages[-1][-1] = server_oom_msg
    yield (state, state.to_gradio_chatbot_public()) + (enable_btn,) * 2

    if temp_file is not None:
        try:
            os.remove(temp_file)
        except OSError:
            logging.warning("Could not remove temporary audio file %s", temp_file)
    logging.info(f"{output}")


############
# Layout Markdown
############
title_markdown = ("""

EMOVA: Empowering Language Models to See, Hear and Speak with Vivid Emotions

📃 Paper | 💻 Code | 🤗 HuggingFace | 🌐 Website

1. To chat with EMOVA, upload images, enter texts or record audios and then do not forget to Click 💬 Chat Button ^v^!
2. Heighten the Max output tokens if necessary to talk longer with EMOVA.

""")

tos_markdown = ("""
## Terms of use
By using this service, users are required to agree to the following terms:
The service is a research preview intended for non-commercial use only. It only provides limited safety measures and may generate offensive content. It must not be used for any illegal, harmful, violent, racist, or sexual purposes. The service may collect user dialogue data for future research. For an optimal experience, please use desktop computers for this demo, as mobile devices may compromise its quality.
""")

# NOTE(review): the link targets and the opening/closing lines of the BibTeX
# entry are absent in the reviewed source; the visible text is kept as is --
# restore the missing parts from the original file.
learn_more_markdown = ("""
## License
The service is a research preview intended for non-commercial use only, subject to the model [License]( of Qwen and [Privacy Practices]( of ShareGPT. Please contact us if you find any potential violation.

## Acknowledgement
The service is built upon [LLaVA]( We thanks the authors for open-sourcing the wonderful code.

## Citation
  title={Emova: Empowering language models to see, hear and speak with vivid emotions},
  author={Chen, Kai and Gou, Yunhao and Huang, Runhui and Liu, Zhili and Tan, Daxin and Xu, Jing and Wang, Chunwei and Zhu, Yi and Zeng, Yihan and Yang, Kuo and others},
  journal={arXiv preprint arXiv:2409.18042},
""")

block_css = """
#buttons button {
    min-width: min(120px,100%);
}

.message-row img {
    margin: 0px !important;
}

.avatar-container img {
    padding: 0px !important;
}
"""


############
# Layout Demo
############
def build_demo(embed_mode):
    """Build the Gradio Blocks app and register its event listeners.

    Args:
        embed_mode: when true, the title and the terms/licence sections are
            left out.

    Returns:
        The ``gr.Blocks`` instance (not yet launched).
    """
    # NOTE(review): indentation was lost in the reviewed source, so the nesting
    # of the layout containers below is reconstructed -- confirm against the
    # original file.
    textbox = gr.Textbox(label="Text", show_label=False, placeholder="Enter text or record audio in the right and then click 💬 Chat to talk with me ^v^", container=False, scale=6)
    audio_input = gr.Audio(label="Audio", sources=["microphone", "upload"], type="filepath", max_length=10, show_download_button=True, waveform_options=dict(sample_rate=16000), scale=2)

    with gr.Blocks(title="EMOVA", theme=gr.themes.Default(), css=block_css) as demo:
        state = gr.State()

        if not embed_mode:
            gr.HTML(title_markdown)

        ##############
        # Chatbot
        ##############
        with gr.Row(equal_height=True):
            with gr.Column(scale=1):
                imagebox = gr.Image(type="pil", label="Image")
                image_process_mode = gr.Radio(
                    ["Crop", "Resize", "Pad", "Default"],
                    value="Default",
                    label="Preprocess for non-square image", visible=False)

                ##############
                # Parameters
                ##############
                with gr.Accordion("Parameters", open=True) as parameter_row:
                    temperature = gr.Slider(minimum=0.0, maximum=1.0, value=0.2, step=0.1, interactive=True, label="Temperature")
                    top_p = gr.Slider(minimum=0.0, maximum=1.0, value=0.7, step=0.1, interactive=True, label="Top P")
                    max_output_tokens = gr.Slider(minimum=0, maximum=4096, value=2048, step=32, interactive=True, label="Max output tokens")
                    speaker = gr.Radio(["Female", "Male"], value="Female", label="Speaker")

            with gr.Column(scale=8):
                chatbot = gr.Chatbot(
                    elem_id="chatbot",
                    label="EMOVA Chatbot",
                    layout="bubble",
                    avatar_images=["examples/user_avator.png", "examples/icon_256.png"]
                )
                with gr.Row(equal_height=True):
                    textbox.render()
                    audio_input.render()
                with gr.Row(elem_id="buttons") as button_row:
                    submit_btn = gr.Button(value="💬 Chat", variant="primary")
                    #stop_btn = gr.Button(value="⏹️ Stop Generation", interactive=False)
                    regenerate_btn = gr.Button(value="🔄 Regenerate", interactive=False)
                    clear_btn = gr.Button(value="🗑️ Clear", interactive=False)

        ##############
        # Examples
        ##############
        with gr.Row():
            with gr.Column(scale=9):
                gr.Examples(examples=[
                    ["./examples/emo-speech/what_is_your_name.wav"],
                    ["./examples/emo-speech/I_am_so_sad.wav"],
                    ["./examples/emo-speech/parent.wav"],
                    ["./examples/emo-speech/wedding(CH).wav"],
                ], inputs=[audio_input], label='Audio Examples (Click to load the examples~)')

        with gr.Row(equal_height=True):
            gr.Examples(examples=[
                ["./examples/image-text/example_1.png", "Why is this image funny?"],
                ["./examples/image-text/example_2.png", "First please perform reasoning, and think step by step to provide best answer to the following question:\n\nWhat is the original price for pork belly before discount?"],
                ["./examples/image-text/example_3.png", "Convert this table to markdown format."],
            ], inputs=[imagebox, textbox], label='Image Examples')
            gr.Examples(examples=[
                ["./examples/emo-speech/write_a_poem.jfif", "./examples/emo-speech/write_a_poem.wav"],
                ["./examples/emo-speech/I_am_happy_get_my_offer.webp", "./examples/emo-speech/I_am_happy_get_my_offer.wav"],
                ["./examples/structure-speech/names_of_main_actors.jpg", "./examples/structure-speech/names_of_main_actors.wav"],
            ], inputs=[imagebox, audio_input], label='Omni Examples 1')
            gr.Examples(examples=[
                ["./examples/structure-speech/how_to_save_water.png", "./examples/structure-speech/how_to_save_water.wav"],
                ["./examples/structure-speech/internet_coverage.png", "./examples/structure-speech/internet_coverage.wav"],
                ["./examples/structure-speech/how_to_use_website.PNG", "./examples/structure-speech/how_to_use_website.wav"],
            ], inputs=[imagebox, audio_input], label='Omni Examples 2')

        if not embed_mode:
            gr.Markdown(tos_markdown)
            gr.Markdown(learn_more_markdown)

        # Register listeners
        # NOTE(review): the heads of the three button registrations
        # (`regenerate_btn.click(`, `clear_btn.click(`, `submit_btn.click(`) were
        # missing in the reviewed source and are inferred from the handlers and
        # the buttons defined above -- confirm against the original file.
        btn_list = [regenerate_btn, clear_btn]
        regenerate_btn.click(
            regenerate,
            [state, image_process_mode],
            [state, chatbot, textbox, imagebox, audio_input] + btn_list
        ).then(
            http_bot,
            [state, temperature, top_p, max_output_tokens, speaker],
            [state, chatbot] + btn_list,
        )

        clear_btn.click(
            clear_history,
            None,
            [state, chatbot, textbox, imagebox] + btn_list + [audio_input],
            queue=False
        )

        # probably mean press enter
        # The hidden component only carries the fixed audio mode 'chat' into add_text.
        textbox.submit(
            add_text,
            [state, textbox, imagebox, image_process_mode, audio_input, gr.Number(value='chat', visible=False)],
            [state, chatbot, textbox, imagebox, audio_input] + btn_list,
            queue=False
        ).then(
            http_bot,
            [state, temperature, top_p, max_output_tokens, speaker],
            [state, chatbot] + btn_list,
        )

        submit_btn.click(
            add_text,
            [state, textbox, imagebox, image_process_mode, audio_input, gr.Number(value='chat', visible=False)],
            [state, chatbot, textbox, imagebox, audio_input] + btn_list
        ).then(
            http_bot,
            [state, temperature, top_p, max_output_tokens, speaker],
            [state, chatbot] + btn_list,
        )

        ##############
        # Demo loading
        ##############
        demo.load(
            load_demo_refresh_model_list,
            None,
            [state],
            queue=False
        )
    return demo


parser = argparse.ArgumentParser()
parser.add_argument("--share", action="store_true")
parser.add_argument("--embed", action="store_true")
args = parser.parse_args()

demo = build_demo(args.embed)
# NOTE(review): allowed_paths=["/"] lets the server hand out any file readable
# by this process. It is left unchanged because it is unclear which
# directories the UI needs; consider narrowing it (e.g. to the examples and
# demo_audio directories).
demo.queue(
    max_size=10,
    api_open=False
).launch(
    favicon_path="./examples/icon_256.png",
    allowed_paths=["/"],
    share=args.share
)