import os

import librosa
from transformers import pipeline

# Label indices produced by the NER model, mapped to their BIO entity tags
labels = {0: 'O',
          1: 'B-DATE',
          2: 'B-EVENT',
          3: 'B-LOC',
          4: 'B-ORG',
          5: 'B-PER',
          6: 'I-DATE',
          7: 'I-EVENT',
          8: 'I-LOC',
          9: 'I-ORG',
          10: 'I-PER'}


class AudioSpeechNERPipeline:
    def __init__(self,
                 stt_model_name='abduaziz/whisper-small-uz',
                 ner_model_name='abduaziz/bert-ner-uz',
                 stt_language='uz'):
        # Speech-to-text pipeline (Whisper fine-tuned for Uzbek)
        self.stt_pipeline = pipeline(
            task="automatic-speech-recognition",
            model=stt_model_name,
            return_timestamps=True
        )
        # Token-classification (NER) pipeline
        self.ner_pipeline = pipeline(
            task="ner",
            model=ner_model_name
        )

    def chunk_audio(self, audio_path, chunk_duration=30):
        """
        Chunk long audio files into 30-second segments.
        """
        audio, sample_rate = librosa.load(audio_path, sr=16000)
        chunk_samples = chunk_duration * sample_rate

        chunks = []
        for start in range(0, len(audio), chunk_samples):
            chunk = audio[start:start + chunk_samples]
            chunks.append({
                'array': chunk,
                'sampling_rate': 16000
            })

        return chunks

    def transcribe_audio(self, audio_path):
        """
        Handle audio transcription for files longer than 30 seconds.
        """
        audio, sample_rate = librosa.load(audio_path, sr=16000)

        if len(audio) / sample_rate > 30:
            audio_chunks = self.chunk_audio(audio_path)
            transcriptions = []
            for chunk in audio_chunks:
                chunk_transcription = self.stt_pipeline(chunk)
                transcriptions.append(chunk_transcription['text'])
            full_transcription = " ".join(transcriptions)
        else:
            full_transcription = self.stt_pipeline({
                'array': audio,
                'sampling_rate': 16000
            })['text']

        return full_transcription

    def process_audio(self, audio_path):
        # Transcribe the audio, then run NER over the resulting text
        transcription = self.transcribe_audio(audio_path)
        entities = self.ner_pipeline(transcription)

        return {
            'filename': os.path.basename(audio_path),
            'transcription': transcription,
            'entities': entities
        }


def create_ner_html(entities):
    """
    Create an HTML representation of the named entities.
    """
    if not entities:
        return "No named entities found."

    html = "<div style='background-color:#f0f0f0; padding:10px; border-radius:5px;'>"
    html += "<h3>Named Entities:</h3>"
    html += "<table style='width:100%; border-collapse:collapse;'>"
    html += "<tr><th style='border:1px solid #ddd; padding:8px;'>Word</th><th style='border:1px solid #ddd; padding:8px;'>Entity Type</th></tr>"

    for entity in entities:
        # The NER pipeline emits generic labels such as 'LABEL_3';
        # map the numeric suffix back to its BIO tag via `labels`.
        new_entity = labels[int(entity['entity'].split("_")[-1])]
        html += f"<tr>" \
                f"<td style='border:1px solid #ddd; padding:8px;'>{entity['word']}</td>" \
                f"<td style='border:1px solid #ddd; padding:8px;'>{new_entity}</td>" \
                f"</tr>"

    html += "</table></div>"
    return html


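# For reference, each item returned by the Hugging Face "ner" pipeline is a
# dict along the lines of the illustrative (made-up) example below, which is
# why create_ner_html() parses the numeric suffix of the 'entity' field:
#
#   {'entity': 'LABEL_3', 'score': 0.99, 'index': 4,
#    'word': 'Toshkent', 'start': 12, 'end': 20}
#
# Here labels[3] == 'B-LOC', so the word is rendered as a location.

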
def process_audio_pipeline(audio):
    """
    Gradio interface function to process audio.
    """
    # Local name chosen so it does not shadow transformers.pipeline
    ner_audio_pipeline = AudioSpeechNERPipeline()

    try:
        # process_audio() returns a dict, so read its fields explicitly
        # rather than tuple-unpacking the result
        result = ner_audio_pipeline.process_audio(audio)
        entities_html = create_ner_html(result['entities'])
        return result['transcription'], entities_html
    except Exception as e:
        return f"Error processing audio: {str(e)}", ""
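

# A minimal usage sketch (an assumption, not part of the original snippet):
# wiring process_audio_pipeline into a Gradio app. The widget choices, labels,
# and title below are illustrative.
if __name__ == "__main__":
    import gradio as gr

    demo = gr.Interface(
        fn=process_audio_pipeline,
        inputs=gr.Audio(type="filepath"),       # hand the uploaded file's path to the pipeline
        outputs=[
            gr.Textbox(label="Transcription"),  # raw speech-to-text output
            gr.HTML(label="Named Entities"),    # table built by create_ner_html
        ],
        title="Uzbek Speech-to-Text + NER",
    )
    demo.launch()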