import argparse
from pathlib import Path

import soundfile as sf
import torch
import torchaudio
from transformers import AutoTokenizer

from model import VoilaAudioAlphaModel, VoilaModel, VoilaAutonomousModel
from spkr import SpeakerEmbedding
from voila_tokenizer import VoilaTokenizer
from tokenize_func import (
    voila_input_format,
    AUDIO_TOKEN_FORMAT,
    DEFAULT_AUDIO_TOKEN,
    DEFAULT_ASSISTANT_TOKEN,
)
def disable_torch_init():
    """
    Disable the redundant torch default initialization to accelerate model creation.
    """
    setattr(torch.nn.Linear, "reset_parameters", lambda self: None)
    setattr(torch.nn.LayerNorm, "reset_parameters", lambda self: None)
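# Note: skipping reset_parameters is safe here only because from_pretrained in
# load_model below overwrites every parameter with checkpoint weights, so the
# skipped random initialization is never observed.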
def load_model(model_name, audio_tokenizer_path):
    """
    Load the Voila model, its text tokenizer, and the audio tokenizer.
    The model class and model_type are picked from the checkpoint name.
    """
    disable_torch_init()
    if "Voila-audio" in model_name:
        model_type = "audio"
        cls = VoilaAudioAlphaModel
    elif "Voila-auto" in model_name:
        model_type = "autonomous"
        cls = VoilaAutonomousModel
    else:
        model_type = "token"
        cls = VoilaModel
    model = cls.from_pretrained(
        model_name,
        torch_dtype=torch.bfloat16,
        use_flash_attention_2=True,
        use_cache=True,
    )
    model = model.cuda()
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer_voila = VoilaTokenizer(model_path=audio_tokenizer_path, device="cuda")
    return model, tokenizer, tokenizer_voila, model_type
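# Usage sketch (checkpoint names are the defaults accepted under __main__ below):
#
#   model, tokenizer, tokenizer_voila, model_type = load_model(
#       "maitrix-org/Voila-chat", "maitrix-org/Voila-Tokenizer")
#   # model_type is "audio", "autonomous", or "token", inferred from the name.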
def is_audio_output_task(task_type):
    return task_type.endswith("ao") or "aiao" in task_type or "tts" in task_type
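# e.g. is_audio_output_task("chat_aiao") -> True (the default task below), and a
# hypothetical text-only task name such as "chat_tito" would return False.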
def eval_model(model, tokenizer, tokenizer_voila, model_type, task_type, history, ref_embs, ref_embs_mask, max_new_tokens=512):
    # step1: resolve the special token ids used to separate audio from text
    num_codebooks = model.config.num_codebooks
    codebook_size = model.config.codebook_size
    AUDIO_MIN_TOKEN_ID = tokenizer.convert_tokens_to_ids(AUDIO_TOKEN_FORMAT.format(0))
    assert isinstance(AUDIO_MIN_TOKEN_ID, int)
    AUDIO_MAX_TOKEN_ID = tokenizer.convert_tokens_to_ids(AUDIO_TOKEN_FORMAT.format(codebook_size*num_codebooks-1))
    assert isinstance(AUDIO_MAX_TOKEN_ID, int)
    AUDIO_TOKEN_ID = tokenizer.convert_tokens_to_ids(DEFAULT_AUDIO_TOKEN)
    assert isinstance(AUDIO_TOKEN_ID, int)
    ASSISTANT_TOKEN_ID = tokenizer.convert_tokens_to_ids(DEFAULT_ASSISTANT_TOKEN)
    assert isinstance(ASSISTANT_TOKEN_ID, int)
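    # Token-id layout, as implied by the range computed above: the vocabulary
    # reserves num_codebooks * codebook_size contiguous audio-token ids, with
    # codebook n owning the block
    #   [AUDIO_MIN_TOKEN_ID + n*codebook_size, AUDIO_MIN_TOKEN_ID + (n+1)*codebook_size - 1],
    # so the per-codebook index is recovered during decoding with
    # (token_id - AUDIO_MIN_TOKEN_ID) % codebook_size.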
    # step2: set infer config
    data_cfg = {
        "input_type": model_type,
        "task_type": task_type,
        "num_codebooks": num_codebooks,
        "codebook_size": codebook_size,
    }
    # step3: infer
    input_ids, audio_datas, audio_data_masks, streaming_user_input_audio_tokens = voila_input_format(history, tokenizer, tokenizer_voila, data_cfg)
    # prepare user_streaming_generator to simulate streaming user input
    def get_input_generator(all_tokens):
        assert all_tokens is not None
        for i in range(len(all_tokens[0])):
            yield all_tokens[:, i]
    if model_type == "autonomous":
        input_generator = get_input_generator(torch.as_tensor(streaming_user_input_audio_tokens).cuda())
        input_ids = [torch.as_tensor([ids]).transpose(1, 2).cuda() for ids in input_ids]  # transpose to [bs, seq, num_codebooks]
        input_ids = torch.cat(input_ids, dim=2)  # concat to [bs, seq, num_codebooks*2]
    else:
        input_ids = torch.as_tensor([input_ids]).transpose(1, 2).cuda()  # transpose to [bs, seq, num_codebooks]
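    # For the autonomous model the last dim packs two streams of num_codebooks
    # channels each; judging from outputs.chunk(2, dim=2)[1] below, the second
    # half is the assistant stream whose continuation is decoded.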
    gen_params = {
        "input_ids": input_ids,
        "ref_embs": ref_embs,
        "ref_embs_mask": ref_embs_mask,
        "max_new_tokens": max_new_tokens,
        "pad_token_id": tokenizer.pad_token_id,
        "eos_token_id": tokenizer.eos_token_id,
        "llm_audio_token_id": AUDIO_TOKEN_ID,
        "min_audio_token_id": AUDIO_MIN_TOKEN_ID,
        "temperature": 0.2,
        "top_k": 50,
        "audio_temperature": 0.8,
        "audio_top_k": 50,
    }
    if model_type == "audio":
        audio_datas = torch.tensor([audio_datas], dtype=torch.bfloat16).cuda()
        audio_data_masks = torch.tensor([audio_data_masks]).cuda()
        gen_params["audio_datas"] = audio_datas
        gen_params["audio_data_masks"] = audio_data_masks
    elif model_type == "autonomous":
        gen_params["input_generator"] = input_generator
        gen_params["llm_assistant_token_id"] = ASSISTANT_TOKEN_ID
print(f"Input str: {tokenizer.decode(input_ids[0, :, 0])}") | |
with torch.inference_mode(): | |
outputs = model.run_generate(**gen_params) | |
if model_type == "autonomous": | |
outputs = outputs.chunk(2, dim=2)[1] | |
outputs = outputs[0].cpu().tolist() | |
predict_outputs = outputs[input_ids.shape[1]:] | |
    # demux generated tokens: audio ids go to per-codebook streams, everything
    # else except EOS is kept as text
    text_outputs = []
    audio_outputs = [[] for _ in range(num_codebooks)]
    for item in predict_outputs:
        if AUDIO_MIN_TOKEN_ID <= item[0] <= AUDIO_MAX_TOKEN_ID:
            for n, at in enumerate(item):
                audio_outputs[n].append((at - AUDIO_MIN_TOKEN_ID) % codebook_size)
        elif item[0] != tokenizer.eos_token_id:
            text_outputs.append(item[0])
    out = {
        'text': tokenizer.decode(text_outputs),
    }
    if is_audio_output_task(task_type):
        audio_values = tokenizer_voila.decode(torch.tensor(audio_outputs).cuda())
        out['audio'] = (audio_values.detach().cpu().numpy(), 16000)
    return out
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--instruction", type=str, default="")
    parser.add_argument("--input-text", type=str, default=None)
    parser.add_argument("--input-audio", type=str, default=None)
    parser.add_argument("--result-path", type=str, default="output")
    parser.add_argument("--ref-audio", type=str, default="examples/test1.mp3")
    parser.add_argument("--model-name", type=str, default="maitrix-org/Voila-chat")
    parser.add_argument("--audio-tokenizer-path", type=str, default="maitrix-org/Voila-Tokenizer")
    parser.add_argument("--task-type", type=str, default="chat_aiao")
    args = parser.parse_args()

    assert args.model_name in [
        "maitrix-org/Voila-audio-alpha",
        "maitrix-org/Voila-base",
        "maitrix-org/Voila-chat",
        "maitrix-org/Voila-autonomous-preview",
    ]
    # step0: Model loading
    model, tokenizer, tokenizer_voila, model_type = load_model(args.model_name, args.audio_tokenizer_path)

    # step1: prepare inputs
    Path(args.result_path).mkdir(exist_ok=True, parents=True)
    history = {
        "instruction": args.instruction,
        "conversations": [],
    }
    if args.input_text is not None:
        history["conversations"].append({"from": "user", "text": args.input_text})
    elif args.input_audio is not None:
        history["conversations"].append({"from": "user", "audio": {"file": args.input_audio}})
    else:
        raise ValueError("Please provide at least one of --input-text and --input-audio")
    history["conversations"].append({"from": "assistant"})
    # step2: encode ref
    ref_embs, ref_embs_mask = None, None
    if is_audio_output_task(args.task_type):
        spkr_model = SpeakerEmbedding(device="cuda")
        wav, sr = torchaudio.load(args.ref_audio)
        ref_embs = spkr_model(wav, sr)
        ref_embs_mask = torch.tensor([1]).cuda()

    out = eval_model(model, tokenizer, tokenizer_voila, model_type, args.task_type, history, ref_embs, ref_embs_mask)
    print(f"Output str: {out['text']}")
    if 'audio' in out:
        wav, sr = out['audio']
        save_name = f"{args.result_path}/out.wav"
        sf.write(save_name, wav, sr)
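# Example invocations (illustrative; assumes this file is saved as infer.py and
# that the default checkpoints above are accessible):
#
#   # text-in / audio-out chat, cloning the voice in examples/test1.mp3:
#   python infer.py --input-text "Hello, who are you?" --task-type chat_aiao
#
#   # audio-in / audio-out chat with an explicit reference voice
#   # (question.wav is a placeholder for your own recording):
#   python infer.py --input-audio question.wav --ref-audio examples/test1.mp3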