Spaces:
Running
on
Zero
Running
on
Zero
import os | |
import shlex | |
import subprocess | |
import tempfile | |
import traceback | |
from pathlib import Path | |
# --- Install / fetch runtime deps & assets --- | |
os.system("pip install -r requirements.txt") | |
# Download token2wav assets | |
os.system("wget https://huggingface.co/stepfun-ai/Step-Audio-2-mini/resolve/main/token2wav/campplus.onnx -P token2wav") | |
os.system("wget https://huggingface.co/stepfun-ai/Step-Audio-2-mini/resolve/main/token2wav/flow.pt -P token2wav") | |
os.system("wget https://huggingface.co/stepfun-ai/Step-Audio-2-mini/resolve/main/token2wav/flow.yaml -P token2wav") | |
os.system("wget https://huggingface.co/stepfun-ai/Step-Audio-2-mini/resolve/main/token2wav/hift.pt -P token2wav") | |
# Hugging Face token (optional) | |
hf_token = os.getenv("HF_TOKEN", None) | |
if hf_token is not None: | |
os.environ["HF_TOKEN"] = hf_token | |
import spaces | |
import gradio as gr | |
# ----------------------- | |
# Utility helpers | |
# ----------------------- | |
def save_tmp_audio(audio_bytes: bytes, cache_dir: str) -> str: | |
"""Save raw wav bytes to a temporary file and return path.""" | |
os.makedirs(cache_dir, exist_ok=True) | |
with tempfile.NamedTemporaryFile(dir=cache_dir, delete=False, suffix=".wav") as temp_audio: | |
temp_audio.write(audio_bytes) | |
return temp_audio.name | |
def add_message(chatbot, history, mic, text): | |
"""Append user text or audio to the chat + history.""" | |
if not mic and not text: | |
return chatbot, history, "Input is empty" | |
if text: | |
chatbot.append({"role": "user", "content": text}) | |
history.append({"role": "human", "content": text}) | |
elif mic and Path(mic).exists(): | |
chatbot.append({"role": "user", "content": {"path": mic}}) | |
history.append({"role": "human", "content": [{"type": "audio", "audio": mic}]}) | |
print(f"{history=}") | |
return chatbot, history, None | |
def reset_state(system_prompt: str): | |
"""Reset chat to a single system message.""" | |
return [], [{"role": "system", "content": system_prompt}] | |
# ----------------------- | |
# Lazy model loading inside the GPU worker | |
# ----------------------- | |
_MODEL = None | |
_TOK2WAV = None | |
def _get_models(model_path: str): | |
""" | |
Lazily load heavy, non-picklable models INSIDE the worker process | |
and cache them in module globals for reuse. | |
""" | |
global _MODEL, _TOK2WAV | |
if _MODEL is None or _TOK2WAV is None: | |
# Import here so the objects are constructed in the worker | |
from stepaudio2 import StepAudio2 | |
from token2wav import Token2wav | |
_MODEL = StepAudio2(model_path) | |
_TOK2WAV = Token2wav("token2wav") | |
return _MODEL, _TOK2WAV | |
# ----------------------- | |
# Inference | |
# ----------------------- | |
def predict(chatbot, history, prompt_wav_path, cache_dir, model_path="Step-Audio-2-mini"): | |
""" | |
Run generation on GPU worker. All args must be picklable (strings, lists, dicts). | |
Heavy models are created via _get_models() inside this process. | |
`prompt_wav_path` is the CURRENT reference audio to condition on (can be user upload). | |
""" | |
try: | |
audio_model, token2wav = _get_models(model_path) | |
history.append({ | |
"role": "assistant", | |
"content": [{"type": "text", "text": "<tts_start>"}], | |
"eot": False | |
}) | |
tokens, text, audio_tokens = audio_model( | |
history, | |
max_new_tokens=4096, | |
temperature=0.7, | |
repetition_penalty=1.05, | |
do_sample=True, | |
) | |
print(f"predict text={text!r}") | |
# Convert tokens -> waveform bytes using token2wav with the *selected* prompt | |
prompt_path = prompt_wav_path if (prompt_wav_path and Path(prompt_wav_path).exists()) else None | |
audio_bytes = token2wav(audio_tokens, prompt_path) | |
# Persist to temp .wav for the UI | |
audio_path = save_tmp_audio(audio_bytes, cache_dir) | |
# Append assistant audio message | |
chatbot.append({"role": "assistant", "content": {"path": audio_path}}) | |
history[-1]["content"].append({"type": "token", "token": tokens}) | |
history[-1]["eot"] = True | |
except Exception: | |
print(traceback.format_exc()) | |
gr.Warning("Some error happened, please try again.") | |
return chatbot, history | |
# ----------------------- | |
# UI | |
# ----------------------- | |
def _launch_demo(args): | |
with gr.Blocks(delete_cache=(86400, 86400)) as demo: | |
gr.Markdown("""<center><font size=8>Step Audio 2 Demo</font></center>""") | |
with gr.Row(): | |
system_prompt = gr.Textbox( | |
label="System Prompt", | |
value=( | |
"你的名字叫做小跃,是由阶跃星辰公司训练出来的语音大模型。\n" | |
"你情感细腻,观察能力强,擅长分析用户的内容,并作出善解人意的回复," | |
"说话的过程中时刻注意用户的感受,富有同理心,提供多样的情绪价值。\n" | |
"今天是2025年8月29日,星期五\n" | |
"请用默认女声与用户交流。" | |
), | |
lines=2, | |
) | |
chatbot = gr.Chatbot( | |
elem_id="chatbot", | |
min_height=800, | |
type="messages", | |
) | |
# Initialize history with current system prompt value | |
history = gr.State([{"role": "system", "content": system_prompt.value}]) | |
# NEW: keep track of the *current* prompt wav path (defaults to bundled voice) | |
current_prompt_wav = gr.State(args.prompt_wav) | |
mic = gr.Audio(type="filepath", label="🎤 Speak (optional)") | |
text = gr.Textbox(placeholder="Enter message ...", label="💬 Text") | |
with gr.Row(): | |
clean_btn = gr.Button("🧹 Clear History (清除历史)") | |
regen_btn = gr.Button("🤔️ Regenerate (重试)") | |
submit_btn = gr.Button("🚀 Submit") | |
def on_submit(chatbot_val, history_val, mic_val, text_val, current_prompt): | |
chatbot2, history2, error = add_message(chatbot_val, history_val, mic_val, text_val) | |
if error: | |
gr.Warning(error) | |
# keep state intact | |
return chatbot2, history2, None, None, current_prompt | |
# Choose prompt: prefer latest user mic if present, else stick to remembered prompt | |
prompt_path = mic_val if (mic_val and Path(mic_val).exists()) else current_prompt | |
chatbot2, history2 = predict( | |
chatbot2, history2, | |
prompt_path, | |
args.cache_dir, | |
model_path=args.model_path, | |
) | |
# Clear inputs; remember the prompt we actually used | |
new_prompt_state = prompt_path | |
return chatbot2, history2, None, None, new_prompt_state | |
submit_btn.click( | |
fn=on_submit, | |
inputs=[chatbot, history, mic, text, current_prompt_wav], | |
outputs=[chatbot, history, mic, text, current_prompt_wav], | |
concurrency_limit=4, | |
concurrency_id="gpu_queue", | |
) | |
def on_clean(system_prompt_text, _default_prompt): | |
# Reset chat and also reset the remembered prompt back to default | |
new_chatbot, new_history = reset_state(system_prompt_text) | |
return new_chatbot, new_history, _default_prompt | |
clean_btn.click( | |
fn=on_clean, | |
inputs=[system_prompt, current_prompt_wav], | |
outputs=[chatbot, history, current_prompt_wav], | |
) | |
def on_regenerate(chatbot_val, history_val, current_prompt): | |
# Drop last assistant turn(s) to regenerate | |
while chatbot_val and chatbot_val[-1]["role"] == "assistant": | |
chatbot_val.pop() | |
while history_val and history_val[-1]["role"] == "assistant": | |
print(f"discard {history_val[-1]}") | |
history_val.pop() | |
return predict( | |
chatbot_val, history_val, | |
current_prompt, # use the remembered prompt for regen | |
args.cache_dir, | |
model_path=args.model_path, | |
) | |
regen_btn.click( | |
fn=on_regenerate, | |
inputs=[chatbot, history, current_prompt_wav], | |
outputs=[chatbot, history], | |
concurrency_id="gpu_queue", | |
) | |
demo.queue().launch( | |
server_port=args.server_port, | |
server_name=args.server_name, | |
) | |
# ----------------------- | |
# Entrypoint | |
# ----------------------- | |
if __name__ == "__main__": | |
from argparse import ArgumentParser | |
parser = ArgumentParser() | |
parser.add_argument("--model-path", type=str, default="Step-Audio-2-mini", help="Model path.") | |
parser.add_argument("--server-port", type=int, default=7860, help="Demo server port.") | |
parser.add_argument("--server-name", type=str, default="0.0.0.0", help="Demo server name.") | |
parser.add_argument("--prompt-wav", type=str, default="assets/default_female.wav", help="Prompt wave for the assistant.") | |
parser.add_argument("--cache-dir", type=str, default="/tmp/stepaudio2", help="Cache directory.") | |
args = parser.parse_args() | |
os.environ["GRADIO_TEMP_DIR"] = args.cache_dir | |
Path(args.cache_dir).mkdir(parents=True, exist_ok=True) | |
_launch_demo(args) | |