|
import sherpa_onnx |
|
import numpy as np |
|
import ujson |
|
from pydub import AudioSegment |
|
from datetime import datetime as dt |
|
|
|
|
|
recognizer = sherpa_onnx.OfflineRecognizer.from_nemo_ctc( |
|
model = "GigaAMv2_ctc_public.onnx", |
|
tokens = "tokens.txt", |
|
feature_dim=64, |
|
num_threads=8, |
|
sample_rate=8000, |
|
decoding_method="greedy_search", |
|
provider="CUDA", |
|
debug = False, |
|
) |
|
|
|
def get_np_array_samples_float32(audio_bytes: bytes, sample_width: int = 2) -> np.ndarray: |
|
""" |
|
Преобразует аудио в байтах в массив float32. |
|
:param audio_bytes: Аудиоданные в байтах. |
|
:param sample_width: Размер одного сэмпла в байтах (обычно 2 для 16-битного аудио). |
|
:return: Массив numpy с данными в формате float32. |
|
""" |
|
|
|
|
|
dtype = np.int16 if sample_width == 2 else np.int32 |
|
|
|
|
|
samples = np.frombuffer(audio_bytes, dtype=dtype) |
|
samples_float32 = samples.astype(np.float32) |
|
samples_float32 = samples_float32 / 32768 |
|
|
|
return samples_float32 |
|
|
|
|
|
def simple_recognise(audio_data, ) -> dict: |
|
""" |
|
:param audio_data: Аудиоданные в формате Audiosegment (puDub). |
|
""" |
|
|
|
stream = None |
|
stream = recognizer.create_stream() |
|
audio_data = audio_data.set_frame_rate(8000) |
|
audio_data = audio_data[:] |
|
audio_data = audio_data.split_to_mono()[0] |
|
|
|
|
|
samples = get_np_array_samples_float32(audio_data.raw_data, audio_data.sample_width) |
|
|
|
print(f'Audio length - {audio_data.duration_seconds} secs.') |
|
|
|
|
|
stream.accept_waveform(sample_rate=audio_data.frame_rate, waveform=samples) |
|
recognizer.decode_stream(stream) |
|
|
|
result = ujson.loads(str(stream.result)) |
|
|
|
return result |
|
|
|
def process_gigaam_asr(input_json, time_shift = 0.0): |
|
""" |
|
Собираем токены в слова дополнительных вычислений не производим. |
|
:param input_json: json - результат работы stream.result |
|
:param input_json: time_shift - так как на вход логично будут приходить чанки, |
|
то для каждого чанка передаём его начало от начала записи. |
|
""" |
|
|
|
data = input_json |
|
|
|
|
|
result = {"data": {"result": [], "text": ""}} |
|
|
|
|
|
words = [] |
|
current_word = "" |
|
start_time, end_time = 0.0, 0.0 |
|
|
|
for i, token in enumerate(data['tokens']): |
|
if token != ' ': |
|
if current_word == "": |
|
start_time = round((data['timestamps'][i]+time_shift), 3) |
|
current_word += token |
|
end_time = round((data['timestamps'][i]+time_shift), 3) |
|
else: |
|
if current_word != "": |
|
words.append({'word': current_word, 'start': start_time, 'end': end_time}) |
|
current_word = "" |
|
|
|
|
|
if current_word != "": |
|
words.append({'word': current_word, 'start': start_time, 'end': end_time}) |
|
|
|
|
|
result['data'] = { |
|
'result': [{'start': word['start'], 'end': word['end'], 'word': word['word']} for word in words], |
|
'text': data['text'] |
|
} |
|
return result |
|
|
|
if __name__ == '__main__': |
|
file_path = "example.wav" |
|
sound = AudioSegment.from_file(str(file_path)) |
|
|
|
time_start = dt.now() |
|
asr_res = simple_recognise(audio_data=sound) |
|
res_w_word_timestamp = process_gigaam_asr(asr_res) |
|
print( |
|
f'Work time = {(dt.now()-time_start).total_seconds()}\n' |
|
) |
|
print(res_w_word_timestamp) |