# EMOVA-demo / app.py
# Last commit: "reformulate demo interface" (4722d2a) by KaiChen1998; file size 23.1 kB.
import argparse
import datetime
import json
import os
import time
import hashlib
import uuid
import traceback
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
import spaces
import gradio as gr
from conversation_public import default_conversation, conv_templates, SeparatorStyle
# Hugging Face access token read from the TOKEN_FROM_SECRET environment variable
# (None when the variable is unset); it is passed to every hub download in this file.
auth_token = os.environ.get("TOKEN_FROM_SECRET")
##########################################
# Audio part
##########################################
# The speech tokenizer repository is downloaded into ./speech at import time.
# The `speech.speech_utils` imports are placed after the download, so they presumably
# rely on the downloaded files being present -- keep this statement order.
from huggingface_hub import snapshot_download
snapshot_download(repo_id="Emova-ollm/emova_speech_tokenizer", local_dir='./speech', token=auth_token)
from speech.speech_utils import s2u_extract_unit_demo, get_ckpt_config_path, load_model
from speech.speech_utils import load_condition_centroid, get_config_checkpoint_file, load_U2S_model, synthesis
####################
# S2U
# (model used by s2u_asr / s2u_chat to turn an audio file into unit text)
####################
# `reduced` is forwarded unchanged to s2u_extract_unit_demo by s2u_asr and s2u_chat.
reduced=True
# NOTE(review): reduced_mark is not referenced anywhere else in the visible part of this file.
reduced_mark = 'reduced' if reduced else 'unreduced'
unit_type = '40ms_multilingual_8888'
language = 'English'
s2u_model_name = 'SPIRAL-FSQ-CTC'
ckpt_path, config_path = get_ckpt_config_path(unit_type, language)
s2u_model = load_model(ckpt_path, config_path, s2u_model_name)
####################
# U2S
# (model and style centroids used by http_bot to synthesize the response audio)
####################
condition2style_centroid_file = "./speech/condition_style_centroid/condition2style_centroid.txt"
# Two dicts keyed by a condition string; http_bot looks them up with keys of the form
# 'gender-..._emotion-..._speed-..._pitch-...'.
condition2style_centroid_file_dict, condition2style_centroid_embedding_dict = load_condition_centroid(condition2style_centroid_file)
# unit_type and language are rebound here for the U2S model; the S2U values above
# have already been consumed by get_ckpt_config_path at this point.
unit_type = '40ms_multilingual_8888_xujing_cosyvoice_FT'
language = 'Chinese'
model_config_file, model_checkpoint_file = get_config_checkpoint_file(unit_type, language)
net_g, hps = load_U2S_model(model_config_file, model_checkpoint_file, unit_type)
####################
# task format
####################
# Instruction prefixes that s2u_asr / u2s_tts / s2u_chat prepend to the user's turn.
# http_bot also detects speech-output mode by testing whether tts_format or chat_format
# occurs in the final prompt, so these exact strings double as mode markers.
# NOTE(review): "follwing" is misspelled, but this text is sent to the model verbatim;
# do not correct it without confirming the model does not depend on the exact wording.
asr_format = "Please recognize the text corresponding to the follwing speech.\n"
tts_format = "Please synthesize the speech corresponding to the follwing text.\n"
# Raw string: every "\n" below is a literal backslash followed by "n", not a newline.
chat_format = r'Please recognize the texts, emotion and pitch from the user question speech units and provide the texts, emotion, pitch and speech units for the assistant response. \nEmotion should be chosen from ["neutral", "happy", "sad", "angry", "surprised", "disgusted", "fearful"]. \nPitch should be chosen from ["low", "normal", "high"].\nYour output should be in json format.\nAn output example is:\n{"user question text": "", "user question emotion": "", "user question pitch": "", "assistant response text": "", "assistant response emotion": "", "assistant response pitch": "","assistant response speech": ""}\n\nuser question speech:'
@spaces.GPU(duration=15)
def s2u_asr(text, audio_file):
    """Return the ASR prompt for ``audio_file``.

    The prompt is ``asr_format`` followed by whatever ``s2u_extract_unit_demo``
    returns for the audio. ``text`` is not used; the parameter exists so every
    entry of ``mode2func`` can be called with the same ``(text, audio_file)`` pair.
    """
    speech_units = s2u_extract_unit_demo(
        s2u_model,
        audio_file,
        model_name=s2u_model_name,
        reduced=reduced,
    )
    return asr_format + speech_units
@spaces.GPU(duration=15)
def s2u_chat(text, audio_file):
    """Return the spoken-dialogue prompt for ``audio_file``.

    The prompt is ``chat_format`` followed by whatever ``s2u_extract_unit_demo``
    returns for the audio. ``text`` is not used; the parameter exists so every
    entry of ``mode2func`` can be called with the same ``(text, audio_file)`` pair.
    """
    speech_units = s2u_extract_unit_demo(
        s2u_model,
        audio_file,
        model_name=s2u_model_name,
        reduced=reduced,
    )
    return chat_format + speech_units
def u2s_tts(text, audio_file):
    """Return the text-to-speech prompt: ``tts_format`` followed by ``text``.

    ``audio_file`` is not used; the parameter exists so every entry of
    ``mode2func`` can be called with the same ``(text, audio_file)`` pair.
    """
    prompt = tts_format
    prompt = prompt + text
    return prompt
# Maps the audio mode name received by add_text to the function that builds the prompt.
mode2func = {
    "asr": s2u_asr,
    "chat": s2u_chat,
    "tts": u2s_tts,
}
##########################################
# LLM part
##########################################
import torch
from transformers import AutoModel, AutoProcessor, TextIteratorStreamer
from threading import Thread
# Hub id of the checkpoint; http_bot also inspects this name ("llama3" / "qwen2")
# to choose the conversation template.
model_name = "Emova-ollm/emova_llama3_1-8b"
# Loaded once at import time in bfloat16, switched to eval mode and moved to the GPU.
# trust_remote_code=True means the model class comes from code shipped in the hub repo.
model = AutoModel.from_pretrained(
model_name,
torch_dtype=torch.bfloat16,
use_flash_attn=True,
low_cpu_mem_usage=True,
trust_remote_code=True,
token=auth_token).eval().cuda()
processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True, token=auth_token)
# NOTE(review): this single module-level streamer is the one http_bot passes to
# stream_response for every request, so it is shared across requests -- confirm that
# requests can never overlap, or create one streamer per request.
streamer = TextIteratorStreamer(processor.tokenizer, skip_prompt=True, skip_special_tokens=True, timeout=15)
def stream_response(model, inputs, streamer, prompt, gen_kwargs):
    """Run ``model.generate`` in a background thread and yield the growing text.

    Args:
        model: object whose ``generate`` method is called with ``streamer``,
            the entries of ``inputs`` and the entries of ``gen_kwargs`` as
            keyword arguments.
        inputs: mapping of model inputs, expanded into keyword arguments.
        streamer: iterable that produces text pieces while generation runs.
        prompt: string every yielded value starts with.
        gen_kwargs: mapping of extra generation keyword arguments.

    Yields:
        ``prompt`` plus all pieces received so far, once per piece.
    """
    generate_kwargs = dict(streamer=streamer, **inputs, **gen_kwargs)
    worker = Thread(target=model.generate, kwargs=generate_kwargs)
    worker.start()

    text_so_far = prompt
    for piece in streamer:
        text_so_far = text_so_far + piece
        yield text_so_far
##########################################
# Gradio part
##########################################
# Button values returned by the event handlers for the two buttons in btn_list
# (regenerate and clear): leave as is, make interactive, or make non-interactive.
no_change_btn = gr.Button()
enable_btn = gr.Button(interactive=True)
disable_btn = gr.Button(interactive=False)
# Shown as the assistant message when generation or audio synthesis fails in http_bot.
server_error_msg = "**NETWORK ERROR DUE TO HIGH TRAFFIC. PLEASE REGENERATE OR REFRESH THIS PAGE.**"
def load_demo_refresh_model_list():
    """Return a fresh copy of ``default_conversation`` as the initial session state."""
    logging.info("load_demo.")
    return default_conversation.copy()
def regenerate(state, image_process_mode):
    """Reset the last assistant reply so that http_bot generates it again.

    Args:
        state: conversation object; its ``messages`` list is modified in place.
        image_process_mode: value written into slot 2 of the previous user
            message when that message is a tuple or list.

    Returns:
        ``(state, chatbot, textbox, imagebox, audio_input)`` followed by two
        button updates. When there is no complete user/assistant pair to
        regenerate, ``state.skip_next`` is set and the buttons are left unchanged.
    """
    logging.info("regenerate.")

    # Nothing to regenerate without one user turn plus one assistant turn;
    # indexing messages[-2] would otherwise raise or touch a template message.
    if len(state.messages) - state.offset < 2:
        state.skip_next = True
        return (state, state.to_gradio_chatbot_public(), "", None, None) + (no_change_btn,) * 2

    state.messages[-1][-1] = None
    prev_human_msg = state.messages[-2]
    if isinstance(prev_human_msg[1], (tuple, list)):
        # Keep the first two slots, replace slot 2, keep everything after it.
        prev_human_msg[1] = (*prev_human_msg[1][:2], image_process_mode, *prev_human_msg[1][3:])
    state.skip_next = False
    return (state, state.to_gradio_chatbot_public(), "", None, None) + (disable_btn,) * 2
def clear_history():
    """Start a new conversation and blank the input widgets.

    Returns:
        ``(state, chatbot, textbox, imagebox)``, two disabled-button updates,
        then ``None`` for the audio input.
    """
    logging.info("clear_history.")
    fresh_state = default_conversation.copy()
    chatbot_view = fresh_state.to_gradio_chatbot_public()
    button_updates = (disable_btn, disable_btn)
    return (fresh_state, chatbot_view, "", None) + button_updates + (None,)
############
# Show prompt in the chatbot
# Input: [state, textbox, imagebox, image_process_mode, audio_input, audio_mode]
# Return: [state, chatbot, textbox, imagebox, audio_input] + btn_list
############
def add_text(state, text, image, image_process_mode, audio_input, audio_mode):
    """Append the user's turn (text, optional image, optional audio) to the conversation.

    Args:
        state: current conversation object; replaced by a fresh copy of
            ``default_conversation`` whenever an image or audio input is used,
            or when the history already contains an audio turn.
        text: content of the textbox.
        image: uploaded image or ``None``.
        image_process_mode: stored next to the image in the user message.
        audio_input: recorded/uploaded audio or ``None``.
        audio_mode: key into ``mode2func`` selecting how the prompt is built.

    Returns:
        ``(state, chatbot, textbox, imagebox, audio_input)`` followed by two
        button updates. With no input at all, ``state.skip_next`` is set so the
        following http_bot call does nothing.
    """
    image_tag = "\n<image>"

    ############
    # Input legality checking
    ############
    logging.info(f"add_text. len: {len(text)}")
    if len(text) <= 0 and image is None and audio_input is None:
        state.skip_next = True
        return (state, state.to_gradio_chatbot_public(), "", None, None) + (no_change_btn,) * 2

    ############
    # Re-initialize if having conducted audio conversations
    ############
    had_audio_turn = any(
        isinstance(msg, tuple) and msg[-1] is not None
        for _, msg in state.messages[state.offset:]
    )
    if had_audio_turn:
        state = default_conversation.copy()

    ############
    # Deal with image inputs
    ############
    if image is not None:
        if '<image>' not in text:
            text = text + image_tag
        text = (text, image, image_process_mode, None)
        state = default_conversation.copy()

    ############
    # Deal with audio inputs
    ############
    if audio_input is not None or audio_mode == 'tts':
        if isinstance(text, tuple):
            if audio_mode in ('chat', 'tts'):
                # Strip the image tag only when it really is the suffix: if the
                # user typed '<image>' themselves nothing was appended above, and
                # an unconditional slice would cut off the end of their text.
                user_text = text[0]
                if user_text.endswith(image_tag):
                    user_text = user_text[:-len(image_tag)]
                prompt = mode2func[audio_mode](user_text, audio_input)
                if audio_mode == 'chat':
                    text = (prompt + image_tag, text[1], text[2], audio_input)
                else:
                    text = (prompt, None, None, None)
            else:
                prompt = mode2func[audio_mode](text, audio_input)
                text = (prompt, None, None, audio_input)
        else:
            prompt = mode2func[audio_mode](text, audio_input)
            text = (prompt, None, None, audio_input)
        state = default_conversation.copy()

    state.append_message(state.roles[0], text)
    state.append_message(state.roles[1], None)
    state.skip_next = False
    logging.info(str(state.messages))
    return (state, state.to_gradio_chatbot_public(), "", None, None) + (disable_btn,) * 2
############
# Get response
# Input: [state, temperature, top_p, max_output_tokens, speaker]
# Return: [state, chatbot] + btn_list
############
def _parse_speech_output(output, speaker):
    """Extract the speech units and voice style from a speech-mode model output.

    Returns ``(output, content_unit, gender, emotion, speed, pitch)`` where
    ``output`` is the input text with a missing JSON terminator appended when
    one could be inferred. If the text is not usable JSON, the whole text is
    treated as speech units and the style falls back to neutral/normal.
    """
    gender = speaker.lower() if speaker else 'female'

    # Generation may stop before the JSON object is closed; close it if possible.
    if output.startswith("{"):
        if output.endswith("|>"):
            output += "\"}"
        elif output.endswith("\""):
            output += "}"

    try:
        info_dict = json.loads(output)
        content_unit = info_dict['assistant response speech'].replace('<|speech_', '').replace('|>', ' ').strip()
        # dict.get, not hasattr: hasattr() on a dict never sees its keys, which
        # silently discarded the emotion/speed/pitch chosen by the model.
        emotion = info_dict.get('assistant response emotion', 'neutral')
        speed = info_dict.get('assistant response speed', 'normal')
        pitch = info_dict.get('assistant response pitch', 'normal')
    except Exception:
        # Deliberate best effort: any malformed output is read as raw speech units.
        content_unit = output.replace('<|speech_', '').replace('|>', ' ').strip()
        emotion, speed, pitch = 'neutral', 'normal', 'normal'

    return output, content_unit, gender, emotion, speed, pitch


@spaces.GPU(duration=90)
def http_bot(state, temperature, top_p, max_new_tokens, speaker):
    """Generate the assistant reply for the last user turn and stream it to the UI.

    Args:
        state: conversation object; on the first round it is replaced by a copy
            of the template matching ``model_name``.
        temperature: sampling temperature; sampling is disabled at or below 0.001.
        top_p: nucleus sampling parameter.
        max_new_tokens: requested output budget, capped by the remaining context.
        speaker: voice name used for synthesis; 'female' when empty.

    Yields:
        ``(state, chatbot)`` followed by two button updates, once per streamed
        piece and once for the final result. When the prompt contains
        ``tts_format`` or ``chat_format`` the text is hidden while streaming and
        the final message is ``(output, wav_path)``, or ``server_error_msg`` if
        synthesis failed.
    """
    logging.info("http_bot.")

    if state.skip_next:
        yield (state, state.to_gradio_chatbot_public()) + (no_change_btn,) * 2
        return

    if len(state.messages) == state.offset + 2:
        # First round of conversation
        if "llama3" in model_name.lower():
            template_name = 'llama3_demo'
        elif "qwen2" in model_name.lower():
            template_name = 'qwen2_demo'
        else:
            template_name = "default"
        new_state = conv_templates[template_name].copy()
        new_state.append_message(new_state.roles[0], state.messages[-2][1])
        new_state.append_message(new_state.roles[1], None)
        state = new_state

    # Construct prompt
    prompt = state.get_prompt()
    all_images = state.get_images(return_pil=True)
    speech_mode = tts_format in prompt or chat_format in prompt

    # Request summary, used for logging and as the normalized hyperparameters
    pload = {
        "model": model_name,
        "prompt": prompt,
        "temperature": float(temperature),
        "top_p": float(top_p),
        "max_new_tokens": int(max_new_tokens),
        "stop": state.sep if state.sep_style in [SeparatorStyle.SINGLE, SeparatorStyle.MPT] else state.sep2,
        "images": f'List of {len(state.get_images())} images: {all_images}',
    }
    logging.info(f"==== request ====\n{pload}")

    # Process inputs
    inputs = processor(text=[prompt], images=all_images if len(all_images) > 0 else None, return_tensors="pt")
    # NOTE(review): the result of .to() is discarded, so this relies on the call
    # updating `inputs` in place -- confirm against the processor's return type.
    inputs.to(model.device)
    if len(all_images) > 0:
        inputs['pixel_values'] = inputs['pixel_values'].to(model.dtype)

    # Process hyperparameters
    temperature = pload["temperature"]
    top_p = pload["top_p"]
    do_sample = temperature > 0.001
    max_context_length = getattr(model.config, 'max_position_embeddings', 2048)
    max_new_tokens = min(pload["max_new_tokens"], max_context_length - inputs['input_ids'].shape[1])
    gen_kwargs = dict(
        do_sample=do_sample,
        temperature=temperature,
        top_p=top_p,
        max_new_tokens=max_new_tokens,
        use_cache=True,
    )

    if max_new_tokens < 1:
        state.messages[-1][-1] = "Exceeds max token length. Please start a new conversation, thanks."
        # Re-enable the buttons: the message asks the user to start over, which
        # needs the Clear button to be clickable.
        yield (state, state.to_gradio_chatbot_public()) + (enable_btn,) * 2
        return

    state.messages[-1][-1] = "▌"
    yield (state, state.to_gradio_chatbot_public()) + (disable_btn,) * 2

    # Stream output
    output = ""  # stays defined even if the streamer produces nothing
    try:
        for generated_text in stream_response(model, inputs, streamer, prompt, gen_kwargs):
            output = generated_text[len(prompt):].strip()
            if speech_mode:
                # Speech units are not shown while streaming; only the cursor is.
                state.messages[-1][-1] = "▌"
            else:
                state.messages[-1][-1] = output + "▌"
            yield (state, state.to_gradio_chatbot_public()) + (disable_btn,) * 2
    except Exception:
        os.system("nvidia-smi")
        logging.exception("Generation failed.")
        state.messages[-1][-1] = server_error_msg
        yield (state, state.to_gradio_chatbot_public()) + (enable_btn,) * 2
        return

    ################
    # decode output to audio
    ################
    temp_file = None
    if speech_mode:
        try:
            output, content_unit, gender, emotion, speed, pitch = _parse_speech_output(output, speaker)
            condition = f'gender-{gender}_emotion-{emotion}_speed-{speed}_pitch-{pitch}'
            if condition not in condition2style_centroid_embedding_dict:
                # The model may name a style with no centroid; use the neutral voice instead of failing.
                logging.warning(f"Unknown style condition {condition!r}; falling back to neutral.")
                condition = f'gender-{gender}_emotion-neutral_speed-normal_pitch-normal'
            style_centroid_embedding = condition2style_centroid_embedding_dict[condition].cuda()
            logging.info(condition)

            audio_id = str(uuid.uuid4())
            os.makedirs("./demo_audio", exist_ok=True)
            audio_path = f"./demo_audio/{audio_id}_temp_audio.wav"
            synthesis(content_unit, style_centroid_embedding, hps, net_g, audio_path)
            temp_file = audio_path
        except Exception:
            os.system("nvidia-smi")
            logging.exception("Speech synthesis failed.")

    # Drop the trailing cursor character from the displayed message.
    state.messages[-1][-1] = state.messages[-1][-1][:-1]
    if speech_mode:
        if temp_file is not None:
            state.messages[-1][-1] = (output, temp_file)
        else:
            state.messages[-1][-1] = server_error_msg
    yield (state, state.to_gradio_chatbot_public()) + (enable_btn,) * 2

    # The wav file is only needed until the final update has been produced.
    if temp_file is not None:
        try:
            os.remove(temp_file)
        except OSError:
            logging.warning(f"Could not remove temporary audio file {temp_file}")
    logging.info(f"{output}")
############
# Layout Markdown
############
# HTML header (logo, title, project links, usage hints); build_demo renders it with
# gr.HTML unless embed mode is on.
title_markdown = ("""
<div style="display: flex; align-items: center; padding: 20px; border-radius: 10px; background-color: #f0f0f0;">
<div style="margin-left: 20px; margin-right: 40px;">
<img src="https://emova-ollm.github.io/static/images/icons/emova.png" alt="Icon" style="width: 100px; height: 100px; border-radius: 10px;">
</div>
<div>
<h1 style="margin: 0;">EMOVA: Empowering Language Models to See, Hear and Speak with Vivid Emotions</h1>
<h2 style="margin: 10px 0;">📃 <a href="https://arxiv.org/abs/2409.18042" style="font-weight: 300;">Paper</a> | 💻 <a href="https://github.com/emova-ollm/EMOVA" style="font-weight: 300;">Code</a> | 🤗 <a href="https://huggingface.co/Emova-ollm" style="font-weight: 300;">HuggingFace</a> | 🌐 <a href="https://emova-ollm.github.io/" style="font-weight: 300;">Website</a></h2>
<p style="margin: 20px 0;">
<strong>1. To chat with EMOVA, upload images, enter texts or record audios and then do not forget to <mark>Click 💬 Chat Button</mark> ^v^!</strong><br/>
<strong>2. Heighten the <code>Max output tokens</code> if necessary to talk longer with EMOVA.</strong>
</p>
</div>
</div>
""")
# Terms-of-use text; build_demo renders it with gr.Markdown unless embed mode is on.
tos_markdown = ("""
## Terms of use
By using this service, users are required to agree to the following terms:
The service is a research preview intended for non-commercial use only. It only provides limited safety measures and may generate offensive content. It must not be used for any illegal, harmful, violent, racist, or sexual purposes. The service may collect user dialogue data for future research.
For an optimal experience, please use desktop computers for this demo, as mobile devices may compromise its quality.
""")
# License, acknowledgement and citation text; build_demo renders it with gr.Markdown
# unless embed mode is on.
learn_more_markdown = ("""
## License
The service is a research preview intended for non-commercial use only, subject to the model [License](https://github.com/facebookresearch/llama/blob/main/MODEL_CARD.md) of LLaMA and [Privacy Practices](https://chrome.google.com/webstore/detail/sharegpt-share-your-chatg/daiacboceoaocpibfodeljbdfacokfjb) of ShareGPT. Please contact us if you find any potential violation.
## Acknowledgement
The service is built upon [LLaVA](https://github.com/haotian-liu/LLaVA/). We thanks the authors for open-sourcing the wonderful code.
## Citation
<pre><code>@article{chen2024emova,
title={Emova: Empowering language models to see, hear and speak with vivid emotions},
author={Chen, Kai and Gou, Yunhao and Huang, Runhui and Liu, Zhili and Tan, Daxin and Xu, Jing and Wang, Chunwei and Zhu, Yi and Zeng, Yihan and Yang, Kuo and others},
journal={arXiv preprint arXiv:2409.18042},
year={2024}
}</code></pre>
""")
# Custom CSS passed to gr.Blocks in build_demo: minimum width for buttons inside the
# element with id "buttons", and no margin/padding on message and avatar images.
block_css = """
#buttons button {
min-width: min(120px,100%);
}
.message-row img {
margin: 0px !important;
}
.avatar-container img {
padding: 0px !important;
}
"""
############
# Layout Demo
############
def build_demo(embed_mode):
# Build the Gradio Blocks UI, register all event handlers and return the Blocks object.
# embed_mode: when true, the title HTML and the terms/license markdown are not rendered.
# NOTE(review): the indentation of this function was lost in this copy of the file, so
# the exact Row/Column/Accordion nesting cannot be read from here -- confirm against the
# repository before changing the layout.
# The textbox and audio input are created before the Blocks context and are placed into
# the layout later with .render().
textbox = gr.Textbox(label="Text", show_label=False, placeholder="Enter text or record audio in the right and then click 💬 Chat to talk with me ^v^", container=False, scale=6)
audio_input = gr.Audio(label="Audio", sources=["microphone", "upload"], type="filepath", max_length=10, show_download_button=True, waveform_options=dict(sample_rate=16000), scale=2)
with gr.Blocks(title="EMOVA", theme=gr.themes.Default(), css=block_css) as demo:
# Holds the conversation object; it is filled by demo.load at the end of this function.
state = gr.State()
if not embed_mode:
gr.HTML(title_markdown)
##############
# Chatbot
##############
with gr.Row(equal_height=True):
with gr.Column(scale=1):
imagebox = gr.Image(type="pil", label="Image")
# Hidden control: its value ("Default") is still passed to add_text and regenerate.
image_process_mode = gr.Radio(
["Crop", "Resize", "Pad", "Default"],
value="Default",
label="Preprocess for non-square image", visible=False)
##############
# Parameters
# (generation settings and speaker passed to http_bot)
##############
with gr.Accordion("Parameters", open=True) as parameter_row:
temperature = gr.Slider(minimum=0.0, maximum=1.0, value=0.2, step=0.1, interactive=True, label="Temperature")
top_p = gr.Slider(minimum=0.0, maximum=1.0, value=0.7, step=0.1, interactive=True, label="Top P")
max_output_tokens = gr.Slider(minimum=0, maximum=4096, value=2048, step=32, interactive=True, label="Max output tokens")
speaker = gr.Radio(["Female", "Male"], value="Female", label="Speaker")
with gr.Column(scale=8):
chatbot = gr.Chatbot(
elem_id="chatbot",
label="EMOVA Chatbot",
layout="bubble",
avatar_images=["examples/user_avator.png", "examples/icon_256.png"]
)
with gr.Row(equal_height=True):
textbox.render()
audio_input.render()
# elem_id="buttons" is the id targeted by the "#buttons button" rule in block_css.
with gr.Row(elem_id="buttons") as button_row:
submit_btn = gr.Button(value="💬 Chat", variant="primary")
#stop_btn = gr.Button(value="⏹️ Stop Generation", interactive=False)
# Both start non-interactive; the handlers re-enable them through btn_list.
regenerate_btn = gr.Button(value="🔄 Regenerate", interactive=False)
clear_btn = gr.Button(value="🗑️ Clear", interactive=False)
##############
# Examples
# (each example fills the components listed in its `inputs`)
##############
with gr.Row():
with gr.Column(scale=9):
gr.Examples(examples=[
["./examples/emo-speech/what_is_your_name.wav"],
["./examples/emo-speech/I_am_so_sad.wav"],
["./examples/emo-speech/parent.wav"],
["./examples/emo-speech/wedding(CH).wav"],
], inputs=[audio_input], label='Audio Examples (Click to load the examples~)')
with gr.Row(equal_height=True):
gr.Examples(examples=[
["./examples/image-text/example_1.png", "Why is this image funny?"],
["./examples/image-text/example_2.png", "First please perform reasoning, and think step by step to provide best answer to the following question:\n\nWhat is the original price for pork belly before discount?"],
["./examples/image-text/example_3.png", "Convert this table to markdown format."],
], inputs=[imagebox, textbox], label='Image Examples')
gr.Examples(examples=[
["./examples/emo-speech/write_a_poem.jfif", "./examples/emo-speech/write_a_poem.wav"],
["./examples/emo-speech/I_am_happy_get_my_offer.webp", "./examples/emo-speech/I_am_happy_get_my_offer.wav"],
["./examples/structure-speech/names_of_main_actors.jpg", "./examples/structure-speech/names_of_main_actors.wav"],
], inputs=[imagebox, audio_input], label='Omni Examples 1')
gr.Examples(examples=[
["./examples/structure-speech/how_to_save_water.png", "./examples/structure-speech/how_to_save_water.wav"],
["./examples/structure-speech/internet_coverage.png", "./examples/structure-speech/internet_coverage.wav"],
["./examples/structure-speech/how_to_use_website.PNG", "./examples/structure-speech/how_to_use_website.wav"],
], inputs=[imagebox, audio_input], label='Omni Examples 2')
if not embed_mode:
gr.Markdown(tos_markdown)
gr.Markdown(learn_more_markdown)
# Register listeners
# Every generating action is a two-step chain: first a handler that updates the
# conversation and clears the inputs, then http_bot, which streams the reply.
# btn_list is the pair of buttons each handler returns updates for.
btn_list = [regenerate_btn, clear_btn]
regenerate_btn.click(
regenerate,
[state, image_process_mode],
[state, chatbot, textbox, imagebox, audio_input] + btn_list
).then(
http_bot,
[state, temperature, top_p, max_output_tokens, speaker],
[state, chatbot] + btn_list,
)
# Output order matches clear_history's return value: the audio input comes last,
# after the two buttons.
clear_btn.click(
clear_history,
None,
[state, chatbot, textbox, imagebox] + btn_list + [audio_input],
queue=False
)
# Submitting the textbox (pressing Enter) runs the same chain as the Chat button,
# except that this add_text call is registered with queue=False.
# NOTE(review): the audio mode is supplied by a hidden gr.Number whose value is the
# string 'chat'; add_text uses that value as a key into mode2func. Confirm that the
# component delivers the string unchanged, or replace it with a component meant for text.
textbox.submit(
add_text,
[state, textbox, imagebox, image_process_mode, audio_input, gr.Number(value='chat', visible=False)],
[state, chatbot, textbox, imagebox, audio_input] + btn_list,
queue=False
).then(
http_bot,
[state, temperature, top_p, max_output_tokens, speaker],
[state, chatbot] + btn_list,
)
submit_btn.click(
add_text,
[state, textbox, imagebox, image_process_mode, audio_input, gr.Number(value='chat', visible=False)],
[state, chatbot, textbox, imagebox, audio_input] + btn_list
).then(
http_bot,
[state, temperature, top_p, max_output_tokens, speaker],
[state, chatbot] + btn_list,
)
##############
# Demo loading
# (initializes `state` with a fresh conversation when the page loads)
##############
demo.load(
load_demo_refresh_model_list,
None,
[state],
queue=False
)
return demo
# Command-line flags: --share is forwarded to launch(); --embed selects embed mode
# in build_demo.
parser = argparse.ArgumentParser()
for _flag_name in ("--share", "--embed"):
    parser.add_argument(_flag_name, action="store_true")
args = parser.parse_args()

demo = build_demo(args.embed)
demo.queue(max_size=10, api_open=False).launch(
    favicon_path="./examples/icon_256.png",
    # NOTE(review): allowed_paths=["/"] appears to let the server expose any file
    # on the filesystem; confirm whether it can be narrowed to the directories
    # this demo actually serves from.
    allowed_paths=["/"],
    share=args.share,
)