Spaces:
Runtime error
Runtime error
import argparse | |
import datetime | |
import json | |
import os | |
import time | |
import hashlib | |
import uuid | |
import traceback | |
import logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S') | |
import spaces | |
import gradio as gr | |
from conversation_public import default_conversation, conv_templates, SeparatorStyle | |
auth_token = os.environ.get("TOKEN_FROM_SECRET") | |
########################################## | |
# Audio part | |
########################################## | |
from huggingface_hub import snapshot_download | |
snapshot_download(repo_id="Emova-ollm/emova_speech_tokenizer", local_dir='./speech', token=auth_token) | |
from speech.speech_utils import s2u_extract_unit_demo, get_ckpt_config_path, load_model | |
from speech.speech_utils import load_condition_centroid, get_config_checkpoint_file, load_U2S_model, synthesis | |
#################### | |
# S2U | |
#################### | |
reduced=True | |
reduced_mark = 'reduced' if reduced else 'unreduced' | |
unit_type = '40ms_multilingual_8888' | |
language = 'English' | |
s2u_model_name = 'SPIRAL-FSQ-CTC' | |
ckpt_path, config_path = get_ckpt_config_path(unit_type, language) | |
s2u_model = load_model(ckpt_path, config_path, s2u_model_name) | |
#################### | |
# U2S | |
#################### | |
condition2style_centroid_file = "./speech/condition_style_centroid/condition2style_centroid.txt" | |
condition2style_centroid_file_dict, condition2style_centroid_embedding_dict = load_condition_centroid(condition2style_centroid_file) | |
unit_type = '40ms_multilingual_8888_xujing_cosyvoice_FT' | |
language = 'Chinese' | |
model_config_file, model_checkpoint_file = get_config_checkpoint_file(unit_type, language) | |
net_g, hps = load_U2S_model(model_config_file, model_checkpoint_file, unit_type) | |
#################### | |
# task format | |
#################### | |
asr_format = "Please recognize the text corresponding to the follwing speech.\n" | |
tts_format = "Please synthesize the speech corresponding to the follwing text.\n" | |
chat_format = r'Please recognize the texts, emotion and pitch from the user question speech units and provide the texts, emotion, pitch and speech units for the assistant response. \nEmotion should be chosen from ["neutral", "happy", "sad", "angry", "surprised", "disgusted", "fearful"]. \nPitch should be chosen from ["low", "normal", "high"].\nYour output should be in json format.\nAn output example is:\n{"user question text": "", "user question emotion": "", "user question pitch": "", "assistant response text": "", "assistant response emotion": "", "assistant response pitch": "","assistant response speech": ""}\n\nuser question speech:' | |
def s2u_asr(text, audio_file): | |
return asr_format + s2u_extract_unit_demo(s2u_model, audio_file, model_name=s2u_model_name, reduced=reduced) | |
def s2u_chat(text, audio_file): | |
return chat_format + s2u_extract_unit_demo(s2u_model, audio_file, model_name=s2u_model_name, reduced=reduced) | |
def u2s_tts(text, audio_file): | |
return tts_format + text | |
mode2func = dict( | |
asr=s2u_asr, | |
chat=s2u_chat, | |
tts=u2s_tts, | |
) | |
########################################## | |
# LLM part | |
########################################## | |
import torch | |
from transformers import AutoModel, AutoProcessor, TextIteratorStreamer | |
from threading import Thread | |
model_name = "Emova-ollm/emova_llama3_1-8b" | |
model = AutoModel.from_pretrained( | |
model_name, | |
torch_dtype=torch.bfloat16, | |
use_flash_attn=True, | |
low_cpu_mem_usage=True, | |
trust_remote_code=True, | |
token=auth_token).eval().cuda() | |
processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True, token=auth_token) | |
streamer = TextIteratorStreamer(processor.tokenizer, skip_prompt=True, skip_special_tokens=True, timeout=15) | |
def stream_response(model, inputs, streamer, prompt, gen_kwargs): | |
thread = Thread(target=model.generate, kwargs=dict( | |
streamer=streamer, | |
**inputs, | |
**gen_kwargs | |
)) | |
thread.start() | |
generated_text = prompt | |
for new_text in streamer: | |
generated_text += new_text | |
yield generated_text | |
########################################## | |
# Gradio part | |
########################################## | |
no_change_btn = gr.Button() | |
enable_btn = gr.Button(interactive=True) | |
disable_btn = gr.Button(interactive=False) | |
server_error_msg = "**NETWORK ERROR DUE TO HIGH TRAFFIC. PLEASE REGENERATE OR REFRESH THIS PAGE.**" | |
def load_demo_refresh_model_list(): | |
logging.info(f"load_demo.") | |
state = default_conversation.copy() | |
return state | |
def regenerate(state, image_process_mode): | |
logging.info(f"regenerate.") | |
state.messages[-1][-1] = None | |
prev_human_msg = state.messages[-2] | |
if type(prev_human_msg[1]) in (tuple, list): | |
prev_human_msg[1] = (*prev_human_msg[1][:2], image_process_mode, *prev_human_msg[1][3:]) | |
state.skip_next = False | |
return (state, state.to_gradio_chatbot_public(), "", None, None) + (disable_btn,) * 2 | |
def clear_history(): | |
logging.info(f"clear_history.") | |
state = default_conversation.copy() | |
return (state, state.to_gradio_chatbot_public(), "", None) + (disable_btn,) * 2 + (None,) | |
############ | |
# Show prompt in the chatbot | |
# Input: [state, textbox, imagebox, image_process_mode, audio_input, audio_mode] | |
# Return: [state, chatbot, textbox, imagebox, audio_input] + btn_list | |
############ | |
def add_text(state, text, image, image_process_mode, audio_input, audio_mode): | |
############ | |
# Input legality checking | |
############ | |
logging.info(f"add_text. len: {len(text)}") | |
if len(text) <= 0 and image is None and audio_input is None: | |
state.skip_next = True | |
return (state, state.to_gradio_chatbot_public(), "", None, None) + (no_change_btn,) * 2 | |
############ | |
# Re-initialize if having conducted audio conversations | |
############ | |
for i, (role, msg) in enumerate(state.messages[state.offset:]): | |
if isinstance(msg, tuple) and msg[-1] is not None: | |
state = default_conversation.copy() | |
break | |
############ | |
# Deal with image inputs | |
############ | |
if image is not None: | |
if '<image>' not in text: | |
text = text + '\n<image>' | |
text = (text, image, image_process_mode, None) | |
state = default_conversation.copy() | |
############ | |
# Deal with audio inputs | |
############ | |
if audio_input is not None or audio_mode == 'tts': | |
if isinstance(text, tuple): | |
if audio_mode == 'chat': | |
prompt = mode2func[audio_mode](text[0][:-len("\n<image>")], audio_input) | |
text = (prompt + "\n<image>", text[1], text[2], audio_input) | |
elif audio_mode == 'tts': | |
prompt = mode2func[audio_mode](text[0][:-len("\n<image>")], audio_input) | |
text = (prompt, None, None, None) | |
else: | |
prompt = mode2func[audio_mode](text, audio_input) | |
text = (prompt, None, None, audio_input) | |
else: | |
prompt = mode2func[audio_mode](text, audio_input) | |
text = (prompt, None, None, audio_input) | |
state = default_conversation.copy() | |
state.append_message(state.roles[0], text) | |
state.append_message(state.roles[1], None) | |
state.skip_next = False | |
logging.info(str(state.messages)) | |
return (state, state.to_gradio_chatbot_public(), "", None, None) + (disable_btn,) * 2 | |
############ | |
# Get response | |
# Input: [state, temperature, top_p, max_output_tokens, speaker] | |
# Return: [state, chatbot] + btn_list | |
############ | |
def http_bot(state, temperature, top_p, max_new_tokens, speaker): | |
logging.info(f"http_bot.") | |
if state.skip_next: | |
yield (state, state.to_gradio_chatbot_public()) + (no_change_btn,) * 2 | |
return | |
if len(state.messages) == state.offset + 2: | |
# First round of conversation | |
if "llama3" in model_name.lower(): | |
template_name = 'llama3_demo' | |
elif "qwen2" in model_name.lower(): | |
template_name = 'qwen2_demo' | |
else: | |
template_name = "default" | |
new_state = conv_templates[template_name].copy() | |
new_state.append_message(new_state.roles[0], state.messages[-2][1]) | |
new_state.append_message(new_state.roles[1], None) | |
state = new_state | |
# Construct prompt | |
prompt = state.get_prompt() | |
all_images = state.get_images(return_pil=True) | |
# Make requests | |
pload = { | |
"model": model_name, | |
"prompt": prompt, | |
"temperature": float(temperature), | |
"top_p": float(top_p), | |
"max_new_tokens": int(max_new_tokens), | |
"stop": state.sep if state.sep_style in [SeparatorStyle.SINGLE, SeparatorStyle.MPT] else state.sep2, | |
"images": f'List of {len(state.get_images())} images: {all_images}', | |
} | |
logging.info(f"==== request ====\n{pload}") | |
# Process inputs | |
inputs = processor(text=[prompt], images=all_images if len(all_images) > 0 else None, return_tensors="pt") | |
inputs.to(model.device) | |
if len(all_images) > 0: | |
inputs['pixel_values'] = inputs['pixel_values'].to(model.dtype) | |
# Process hyperparameters | |
temperature = float(pload.get("temperature", 1.0)) | |
top_p = float(pload.get("top_p", 1.0)) | |
stop_str = pload.get("stop", None) | |
do_sample = True if temperature > 0.001 else False | |
max_context_length = getattr(model.config, 'max_position_embeddings', 2048) | |
max_new_tokens = int(pload.get("max_new_tokens", 256)) | |
max_new_tokens = min(max_new_tokens, max_context_length - inputs['input_ids'].shape[1]) | |
gen_kwargs = dict( | |
do_sample=do_sample, | |
temperature=temperature, | |
top_p=top_p, | |
max_new_tokens=max_new_tokens, | |
use_cache=True, | |
) | |
if max_new_tokens < 1: | |
state.messages[-1][-1] = "Exceeds max token length. Please start a new conversation, thanks." | |
yield (state, state.to_gradio_chatbot_public()) + (disable_btn,) * 2 | |
return | |
state.messages[-1][-1] = "▌" | |
yield (state, state.to_gradio_chatbot_public()) + (disable_btn,) * 2 | |
# Stream output | |
try: | |
for generated_text in stream_response(model, inputs, streamer, prompt, gen_kwargs): | |
output = generated_text[len(prompt):].strip() | |
if tts_format not in prompt and chat_format not in prompt: | |
state.messages[-1][-1] = output + "▌" | |
else: | |
state.messages[-1][-1] = "▌" | |
# state.messages[-1][-1] = "[😁 GENERATING AUDIO {}%...]".format(round(output.count("<|speech_") / max_new_tokens * 100, 1)) + "\n" + output + "▌" | |
yield (state, state.to_gradio_chatbot_public()) + (disable_btn,) * 2 | |
except Exception as e: | |
os.system("nvidia-smi") | |
logging.info(traceback.print_exc()) | |
state.messages[-1][-1] = server_error_msg | |
yield (state, state.to_gradio_chatbot_public()) + (enable_btn,) * 2 | |
return | |
################ | |
# decode output to audio | |
################ | |
temp_file = None | |
if tts_format in prompt or chat_format in prompt: | |
try: | |
try: | |
if output.startswith("{"): | |
if output.endswith("|>"): | |
output += "\"}" | |
elif output.endswith("\""): | |
output += "}" | |
info_dict = json.loads(output) | |
content_unit = info_dict['assistant response speech'].replace('<|speech_', '').replace('|>', ' ').strip() | |
emotion = info_dict['assistant response emotion'] if hasattr(info_dict, 'assistant response emotion') else "neutral" | |
speed = info_dict['assistant response speed'] if hasattr(info_dict, 'assistant response speed') else "normal" | |
pitch = info_dict['assistant response pitch'] if hasattr(info_dict, 'assistant response pitch') else "normal" | |
gender = speaker.lower() if speaker else 'female' | |
except: | |
content_unit = output.replace('<|speech_', '').replace('|>', ' ').strip() | |
emotion = 'neutral' | |
speed = "normal" | |
pitch = "normal" | |
gender = speaker.lower() if speaker else 'female' | |
condition = f'gender-{gender}_emotion-{emotion}_speed-{speed}_pitch-{pitch}' | |
style_centroid_file = condition2style_centroid_file_dict[condition] | |
style_centroid_embedding = condition2style_centroid_embedding_dict[condition].cuda() | |
logging.info(condition) | |
id = str(uuid.uuid4()) | |
os.makedirs("./demo_audio", exist_ok=True) | |
synthesis(content_unit, style_centroid_embedding, hps, net_g, f"./demo_audio/{id}_temp_audio.wav") | |
temp_file = f"./demo_audio/{id}_temp_audio.wav" | |
except Exception as e: | |
os.system("nvidia-smi") | |
logging.info(traceback.print_exc()) | |
state.messages[-1][-1] = state.messages[-1][-1][:-1] | |
if tts_format in prompt or chat_format in prompt: | |
if temp_file is not None: | |
state.messages[-1][-1] = (output, temp_file) | |
yield (state, state.to_gradio_chatbot_public()) + (enable_btn,) * 2 | |
else: | |
state.messages[-1][-1] = server_error_msg | |
yield (state, state.to_gradio_chatbot_public()) + (enable_btn,) * 2 | |
else: | |
yield (state, state.to_gradio_chatbot_public()) + (enable_btn,) * 2 | |
if temp_file is not None: | |
os.system("rm {}".format(temp_file)) | |
logging.info(f"{output}") | |
############ | |
# Layout Markdown | |
############ | |
title_markdown = (""" | |
<div style="display: flex; align-items: center; padding: 20px; border-radius: 10px; background-color: #f0f0f0;"> | |
<div style="margin-left: 20px; margin-right: 40px;"> | |
<img src="https://emova-ollm.github.io/static/images/icons/emova.png" alt="Icon" style="width: 100px; height: 100px; border-radius: 10px;"> | |
</div> | |
<div> | |
<h1 style="margin: 0;">EMOVA: Empowering Language Models to See, Hear and Speak with Vivid Emotions</h1> | |
<h2 style="margin: 10px 0;">📃 <a href="https://arxiv.org/abs/2409.18042" style="font-weight: 300;">Paper</a> | 💻 <a href="https://github.com/emova-ollm/EMOVA" style="font-weight: 300;">Code</a> | 🤗 <a href="https://huggingface.co/Emova-ollm" style="font-weight: 300;">HuggingFace</a> | 🌐 <a href="https://emova-ollm.github.io/" style="font-weight: 300;">Website</a></h2> | |
<p style="margin: 20px 0;"> | |
<strong>1. To chat with EMOVA, upload images, enter texts or record audios and then do not forget to <mark>Click 💬 Chat Button</mark> ^v^!</strong><br/> | |
<strong>2. Heighten the <code>Max output tokens</code> if necessary to talk longer with EMOVA.</strong> | |
</p> | |
</div> | |
</div> | |
""") | |
tos_markdown = (""" | |
## Terms of use | |
By using this service, users are required to agree to the following terms: | |
The service is a research preview intended for non-commercial use only. It only provides limited safety measures and may generate offensive content. It must not be used for any illegal, harmful, violent, racist, or sexual purposes. The service may collect user dialogue data for future research. | |
For an optimal experience, please use desktop computers for this demo, as mobile devices may compromise its quality. | |
""") | |
learn_more_markdown = (""" | |
## License | |
The service is a research preview intended for non-commercial use only, subject to the model [License](https://github.com/facebookresearch/llama/blob/main/MODEL_CARD.md) of LLaMA and [Privacy Practices](https://chrome.google.com/webstore/detail/sharegpt-share-your-chatg/daiacboceoaocpibfodeljbdfacokfjb) of ShareGPT. Please contact us if you find any potential violation. | |
## Acknowledgement | |
The service is built upon [LLaVA](https://github.com/haotian-liu/LLaVA/). We thanks the authors for open-sourcing the wonderful code. | |
## Citation | |
<pre><code>@article{chen2024emova, | |
title={Emova: Empowering language models to see, hear and speak with vivid emotions}, | |
author={Chen, Kai and Gou, Yunhao and Huang, Runhui and Liu, Zhili and Tan, Daxin and Xu, Jing and Wang, Chunwei and Zhu, Yi and Zeng, Yihan and Yang, Kuo and others}, | |
journal={arXiv preprint arXiv:2409.18042}, | |
year={2024} | |
}</code></pre> | |
""") | |
block_css = """ | |
#buttons button { | |
min-width: min(120px,100%); | |
} | |
.message-row img { | |
margin: 0px !important; | |
} | |
.avatar-container img { | |
padding: 0px !important; | |
} | |
""" | |
############ | |
# Layout Demo | |
############ | |
def build_demo(embed_mode): | |
textbox = gr.Textbox(label="Text", show_label=False, placeholder="Enter text or record audio in the right and then click 💬 Chat to talk with me ^v^", container=False, scale=6) | |
audio_input = gr.Audio(label="Audio", sources=["microphone", "upload"], type="filepath", max_length=10, show_download_button=True, waveform_options=dict(sample_rate=16000), scale=2) | |
with gr.Blocks(title="EMOVA", theme=gr.themes.Default(), css=block_css) as demo: | |
state = gr.State() | |
if not embed_mode: | |
gr.HTML(title_markdown) | |
############## | |
# Chatbot | |
############## | |
with gr.Row(equal_height=True): | |
with gr.Column(scale=1): | |
imagebox = gr.Image(type="pil", label="Image") | |
image_process_mode = gr.Radio( | |
["Crop", "Resize", "Pad", "Default"], | |
value="Default", | |
label="Preprocess for non-square image", visible=False) | |
############## | |
# Parameters | |
############## | |
with gr.Accordion("Parameters", open=True) as parameter_row: | |
temperature = gr.Slider(minimum=0.0, maximum=1.0, value=0.2, step=0.1, interactive=True, label="Temperature") | |
top_p = gr.Slider(minimum=0.0, maximum=1.0, value=0.7, step=0.1, interactive=True, label="Top P") | |
max_output_tokens = gr.Slider(minimum=0, maximum=4096, value=2048, step=32, interactive=True, label="Max output tokens") | |
speaker = gr.Radio(["Female", "Male"], value="Female", label="Speaker") | |
with gr.Column(scale=8): | |
chatbot = gr.Chatbot( | |
elem_id="chatbot", | |
label="EMOVA Chatbot", | |
layout="bubble", | |
avatar_images=["examples/user_avator.png", "examples/icon_256.png"] | |
) | |
with gr.Row(equal_height=True): | |
textbox.render() | |
audio_input.render() | |
with gr.Row(elem_id="buttons") as button_row: | |
submit_btn = gr.Button(value="💬 Chat", variant="primary") | |
#stop_btn = gr.Button(value="⏹️ Stop Generation", interactive=False) | |
regenerate_btn = gr.Button(value="🔄 Regenerate", interactive=False) | |
clear_btn = gr.Button(value="🗑️ Clear", interactive=False) | |
############## | |
# Examples | |
############## | |
with gr.Row(): | |
with gr.Column(scale=9): | |
gr.Examples(examples=[ | |
["./examples/emo-speech/what_is_your_name.wav"], | |
["./examples/emo-speech/I_am_so_sad.wav"], | |
["./examples/emo-speech/parent.wav"], | |
["./examples/emo-speech/wedding(CH).wav"], | |
], inputs=[audio_input], label='Audio Examples (Click to load the examples~)') | |
with gr.Row(equal_height=True): | |
gr.Examples(examples=[ | |
["./examples/image-text/example_1.png", "Why is this image funny?"], | |
["./examples/image-text/example_2.png", "First please perform reasoning, and think step by step to provide best answer to the following question:\n\nWhat is the original price for pork belly before discount?"], | |
["./examples/image-text/example_3.png", "Convert this table to markdown format."], | |
], inputs=[imagebox, textbox], label='Image Examples') | |
gr.Examples(examples=[ | |
["./examples/emo-speech/write_a_poem.jfif", "./examples/emo-speech/write_a_poem.wav"], | |
["./examples/emo-speech/I_am_happy_get_my_offer.webp", "./examples/emo-speech/I_am_happy_get_my_offer.wav"], | |
["./examples/structure-speech/names_of_main_actors.jpg", "./examples/structure-speech/names_of_main_actors.wav"], | |
], inputs=[imagebox, audio_input], label='Omni Examples 1') | |
gr.Examples(examples=[ | |
["./examples/structure-speech/how_to_save_water.png", "./examples/structure-speech/how_to_save_water.wav"], | |
["./examples/structure-speech/internet_coverage.png", "./examples/structure-speech/internet_coverage.wav"], | |
["./examples/structure-speech/how_to_use_website.PNG", "./examples/structure-speech/how_to_use_website.wav"], | |
], inputs=[imagebox, audio_input], label='Omni Examples 2') | |
if not embed_mode: | |
gr.Markdown(tos_markdown) | |
gr.Markdown(learn_more_markdown) | |
# Register listeners | |
btn_list = [regenerate_btn, clear_btn] | |
regenerate_btn.click( | |
regenerate, | |
[state, image_process_mode], | |
[state, chatbot, textbox, imagebox, audio_input] + btn_list | |
).then( | |
http_bot, | |
[state, temperature, top_p, max_output_tokens, speaker], | |
[state, chatbot] + btn_list, | |
) | |
clear_btn.click( | |
clear_history, | |
None, | |
[state, chatbot, textbox, imagebox] + btn_list + [audio_input], | |
queue=False | |
) | |
# probably mean press enter | |
textbox.submit( | |
add_text, | |
[state, textbox, imagebox, image_process_mode, audio_input, gr.Number(value='chat', visible=False)], | |
[state, chatbot, textbox, imagebox, audio_input] + btn_list, | |
queue=False | |
).then( | |
http_bot, | |
[state, temperature, top_p, max_output_tokens, speaker], | |
[state, chatbot] + btn_list, | |
) | |
submit_btn.click( | |
add_text, | |
[state, textbox, imagebox, image_process_mode, audio_input, gr.Number(value='chat', visible=False)], | |
[state, chatbot, textbox, imagebox, audio_input] + btn_list | |
).then( | |
http_bot, | |
[state, temperature, top_p, max_output_tokens, speaker], | |
[state, chatbot] + btn_list, | |
) | |
############## | |
# Demo loading | |
############## | |
demo.load( | |
load_demo_refresh_model_list, | |
None, | |
[state], | |
queue=False | |
) | |
return demo | |
parser = argparse.ArgumentParser() | |
parser.add_argument("--share", action="store_true") | |
parser.add_argument("--embed", action="store_true") | |
args = parser.parse_args() | |
demo = build_demo(args.embed) | |
demo.queue( | |
max_size=10, | |
api_open=False | |
).launch( | |
favicon_path="./examples/icon_256.png", | |
allowed_paths=["/"], | |
share=args.share | |
) | |