import sherpa_onnx import numpy as np import ujson from pydub import AudioSegment from datetime import datetime as dt recognizer = sherpa_onnx.OfflineRecognizer.from_nemo_ctc( model = "GigaAMv2_ctc_public.onnx", tokens = "tokens.txt", feature_dim=64, num_threads=8, sample_rate=8000, decoding_method="greedy_search", provider="CUDA", debug = False, ) def get_np_array_samples_float32(audio_bytes: bytes, sample_width: int = 2) -> np.ndarray: """ Преобразует аудио в байтах в массив float32. :param audio_bytes: Аудиоданные в байтах. :param sample_width: Размер одного сэмпла в байтах (обычно 2 для 16-битного аудио). :return: Массив numpy с данными в формате float32. """ # Определяем тип данных на основе sample_width dtype = np.int16 if sample_width == 2 else np.int32 # Преобразуем байты в массив numpy samples = np.frombuffer(audio_bytes, dtype=dtype) samples_float32 = samples.astype(np.float32) samples_float32 = samples_float32 / 32768 return samples_float32 def simple_recognise(audio_data, ) -> dict: """ :param audio_data: Аудиоданные в формате Audiosegment (puDub). """ stream = None stream = recognizer.create_stream() audio_data = audio_data.set_frame_rate(8000) audio_data = audio_data[:] # cut here to 15-18 secs audio_data = audio_data.split_to_mono()[0] # only mono allowed # перевод в семплы для распознавания. samples = get_np_array_samples_float32(audio_data.raw_data, audio_data.sample_width) print(f'Audio length - {audio_data.duration_seconds} secs.') # передали аудиофрагмент на распознавание stream.accept_waveform(sample_rate=audio_data.frame_rate, waveform=samples) recognizer.decode_stream(stream) result = ujson.loads(str(stream.result)) return result def process_gigaam_asr(input_json, time_shift = 0.0): """ Собираем токены в слова дополнительных вычислений не производим. :param input_json: json - результат работы stream.result :param input_json: time_shift - так как на вход логично будут приходить чанки, то для каждого чанка передаём его начало от начала записи. """ # Парсим JSON data = input_json # Формируем шаблон результата result = {"data": {"result": [], "text": ""}} # Собираем слова из токенов words = [] current_word = "" start_time, end_time = 0.0, 0.0 for i, token in enumerate(data['tokens']): if token != ' ': if current_word == "": start_time = round((data['timestamps'][i]+time_shift), 3) current_word += token end_time = round((data['timestamps'][i]+time_shift), 3) else: if current_word != "": words.append({'word': current_word, 'start': start_time, 'end': end_time}) current_word = "" # Добавляем последнее слово, если оно есть if current_word != "": words.append({'word': current_word, 'start': start_time, 'end': end_time}) # Формируем итоговый массив result['data'] = { 'result': [{'start': word['start'], 'end': word['end'], 'word': word['word']} for word in words], 'text': data['text'] } return result if __name__ == '__main__': file_path = "example.wav" sound = AudioSegment.from_file(str(file_path)) time_start = dt.now() asr_res = simple_recognise(audio_data=sound) res_w_word_timestamp = process_gigaam_asr(asr_res) print( f'Work time = {(dt.now()-time_start).total_seconds()}\n' ) print(res_w_word_timestamp)