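"""Gradio demo for pyannote/speech-separation-ami-1.0.

Separates an uploaded recording into one audio file per speaker and returns
the diarization in RTTM format. The model is gated on the HuggingFace Hub,
so users must log in with their HuggingFace account before processing audio.
"""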
import io

import torchaudio
import gradio as gr
from pyannote.audio import Pipeline
from pyannote.audio.pipelines.utils.hook import ProgressHook
import scipy.io.wavfile

# Maximum number of per-speaker output slots in the UI
MAX_SPEAKERS = 10

# Global variable to store the logged-in user's HuggingFace token
HUGGINGFACE_ACCESS_TOKEN = None


def perform_separation(audio_file_path: str):
    global HUGGINGFACE_ACCESS_TOKEN

    if not HUGGINGFACE_ACCESS_TOKEN:
        return [], "Please log in with your HuggingFace account first."

    # Instantiate the (gated) pipeline with the user's access token
    try:
        pipeline = Pipeline.from_pretrained(
            "pyannote/speech-separation-ami-1.0",
            use_auth_token=HUGGINGFACE_ACCESS_TOKEN,
        )
    except Exception as e:
        return [], f"Error loading pipeline: {str(e)}"

    # torchaudio.load returns a (channels, num_samples) float tensor and the sample rate
    waveform, sample_rate = torchaudio.load(audio_file_path)

    # Run the pipeline
    with ProgressHook() as hook:
        diarization, sources = pipeline(
            {"waveform": waveform, "sample_rate": sample_rate}, hook=hook
        )
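    # The pipeline returns the diarization as a pyannote.core.Annotation
    # (who speaks when) and `sources`, whose columns hold the separated
    # waveform of each speaker.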

    # Save each separated source to disk as <speaker_label>.wav
    output_file_paths = []
    number_of_separated_sources = sources.data.shape[1]
    for s, speaker in enumerate(diarization.labels()):
        # The diarization may contain more labels than separated channels
        if s >= number_of_separated_sources:
            break

        output_file_path = f"{speaker}.wav"
        scipy.io.wavfile.write(output_file_path, sample_rate, sources.data[:, s])
        output_file_paths.append(output_file_path)

    # Serialize the diarization to RTTM (one "SPEAKER ..." line per segment)
    rttm_buffer = io.StringIO()
    diarization.write_rttm(rttm_buffer)
    rttm_content = rttm_buffer.getvalue()

    return output_file_paths, rttm_content


def gradio_wrapper(audio_file_path: str):
    if not HUGGINGFACE_ACCESS_TOKEN:
        return [None] * MAX_SPEAKERS + [
            "Please log in with your HuggingFace account first."
        ]

    if not audio_file_path:
        return [None] * MAX_SPEAKERS + ["Please provide an audio file first."]

    output_file_paths, rttm_content = perform_separation(audio_file_path)
    # Pad with None so there is exactly one value per Audio output component
    padding = [None] * (MAX_SPEAKERS - len(output_file_paths))
    return output_file_paths + padding + [rttm_content]


def login(profile: gr.OAuthProfile | None, oauth_token: gr.OAuthToken | None):
    global HUGGINGFACE_ACCESS_TOKEN

    if profile is not None and oauth_token is not None:
        # User is authenticated via HuggingFace OAuth; keep their token
        # so the gated pipeline can be loaded on their behalf
        HUGGINGFACE_ACCESS_TOKEN = oauth_token.token
        return f"Welcome, {profile.username}! You are now logged in."
    return "Please log in with your HuggingFace account to use this app."


with gr.Blocks() as demo:
    gr.Markdown("## Speech Separation and Diarization")
    gr.Markdown("Please log in with your HuggingFace account to use this app.")

    # gr.LoginButton triggers the HuggingFace OAuth flow on Spaces
    gr.LoginButton()
    login_status = gr.Markdown()

    with gr.Row():
        input_audio = gr.Audio(label="Input Audio", type="filepath")

    with gr.Row():
        submit_button = gr.Button("Process Audio")

    outputs = []
    for i in range(MAX_SPEAKERS):
        outputs.append(gr.Audio(label=f"Speaker {i + 1}", type="filepath"))

    rttm_output = gr.Textbox(label="RTTM Output")

    demo.load(login, inputs=None, outputs=login_status)
    submit_button.click(
        gradio_wrapper, inputs=[input_audio], outputs=outputs + [rttm_output]
    )

# HuggingFace OAuth is enabled via `hf_oauth: true` in the Space's README
# metadata, not through launch(); the `auth=` parameter only accepts
# username/password credentials or a validator callable.
demo.launch()