File size: 4,181 Bytes
cf9d67c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import sherpa_onnx
import numpy as np
import ujson
from pydub import AudioSegment
from datetime import datetime as dt


recognizer = sherpa_onnx.OfflineRecognizer.from_nemo_ctc(
        model = "GigaAMv2_ctc_public.onnx",
        tokens = "tokens.txt",
        feature_dim=64,
        num_threads=8,
        sample_rate=8000,
        decoding_method="greedy_search",
        provider="CUDA",
        debug = False,
    )

def get_np_array_samples_float32(audio_bytes: bytes, sample_width: int = 2) -> np.ndarray:
    """
    Преобразует аудио в байтах в массив float32.
    :param audio_bytes: Аудиоданные в байтах.
    :param sample_width: Размер одного сэмпла в байтах (обычно 2 для 16-битного аудио).
    :return: Массив numpy с данными в формате float32.
    """

    # Определяем тип данных на основе sample_width
    dtype = np.int16 if sample_width == 2 else np.int32

    # Преобразуем байты в массив numpy
    samples = np.frombuffer(audio_bytes, dtype=dtype)
    samples_float32 = samples.astype(np.float32)
    samples_float32 = samples_float32 / 32768

    return samples_float32


def simple_recognise(audio_data, ) -> dict:
    """
    :param audio_data: Аудиоданные в формате Audiosegment (puDub).
    """

    stream = None
    stream = recognizer.create_stream()
    audio_data = audio_data.set_frame_rate(8000)
    audio_data = audio_data[:]  # cut here to 15-18 secs
    audio_data = audio_data.split_to_mono()[0]  # only mono allowed

    # перевод в семплы для распознавания.
    samples = get_np_array_samples_float32(audio_data.raw_data, audio_data.sample_width)

    print(f'Audio length - {audio_data.duration_seconds} secs.')

    # передали аудиофрагмент на распознавание
    stream.accept_waveform(sample_rate=audio_data.frame_rate, waveform=samples)
    recognizer.decode_stream(stream)

    result = ujson.loads(str(stream.result))

    return result

def process_gigaam_asr(input_json, time_shift = 0.0):
    """
        Собираем токены в слова дополнительных вычислений не производим.
        :param input_json: json - результат работы stream.result
        :param input_json: time_shift - так как на вход логично будут приходить чанки,
            то для каждого чанка передаём его начало от начала записи.
    """
    # Парсим JSON
    data = input_json

    # Формируем шаблон результата
    result = {"data": {"result": [], "text": ""}}

    # Собираем слова из токенов
    words = []
    current_word = ""
    start_time, end_time = 0.0, 0.0

    for i, token in enumerate(data['tokens']):
        if token != ' ':
            if current_word == "":
                start_time = round((data['timestamps'][i]+time_shift), 3)
            current_word += token
            end_time = round((data['timestamps'][i]+time_shift), 3)
        else:
            if current_word != "":
                words.append({'word': current_word, 'start': start_time, 'end': end_time})
                current_word = ""

    # Добавляем последнее слово, если оно есть
    if current_word != "":
        words.append({'word': current_word, 'start': start_time, 'end': end_time})

    # Формируем итоговый массив
    result['data'] = {
        'result': [{'start': word['start'], 'end': word['end'], 'word': word['word']} for word in words],
        'text': data['text']
    }
    return result

if __name__ == '__main__':
    file_path = "example.wav"
    sound = AudioSegment.from_file(str(file_path))

    time_start = dt.now()
    asr_res = simple_recognise(audio_data=sound)
    res_w_word_timestamp = process_gigaam_asr(asr_res)
    print(
        f'Work time = {(dt.now()-time_start).total_seconds()}\n'
    )
    print(res_w_word_timestamp)