import os
import librosa
from transformers import pipeline
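# BIO tag map: the NER model emits class ids as LABEL_<i>; this dict
# translates them to readable tags (B- = beginning of entity, I- = inside)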
labels = {0: 'O',
          1: 'B-DATE',
          2: 'B-EVENT',
          3: 'B-LOC',
          4: 'B-ORG',
          5: 'B-PER',
          6: 'I-DATE',
          7: 'I-EVENT',
          8: 'I-LOC',
          9: 'I-ORG',
          10: 'I-PER'}
class AudioSpeechNERPipeline:
    def __init__(self,
                 stt_model_name='abduaziz/whisper-small-uz',
                 ner_model_name='abduaziz/bert-ner-uz',
                 stt_language='uz'):
        # Initialize Speech-to-Text pipeline with timestamp support
        self.stt_pipeline = pipeline(
            task="automatic-speech-recognition",
            model=stt_model_name,
            return_timestamps=True  # Enable timestamp support
        )
        # Initialize NER pipeline
        self.ner_pipeline = pipeline(
            task="ner",
            model=ner_model_name
        )
    def chunk_audio(self, audio_path, chunk_duration=30):
        """
        Chunk long audio files into 30-second segments
        """
        # Load audio file resampled to 16 kHz (the rate Whisper expects)
        audio, sample_rate = librosa.load(audio_path, sr=16000)
        # Calculate chunk size in samples
        chunk_samples = chunk_duration * sample_rate
        # Create chunks; the last chunk may be shorter than chunk_duration
        chunks = []
        for start in range(0, len(audio), chunk_samples):
            chunk = audio[start:start + chunk_samples]
            chunks.append({
                'array': chunk,
                'sampling_rate': sample_rate
            })
        return chunks
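
    # Note: the chunks above are non-overlapping, so a word that straddles a
    # 30-second boundary can be split across two transcriptions.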
    def transcribe_audio(self, audio_path):
        """
        Handle audio transcription for files longer than 30 seconds
        """
        # Check audio length
        audio, sample_rate = librosa.load(audio_path, sr=16000)
        # If audio is longer than 30 seconds, chunk it
        if len(audio) / sample_rate > 30:
            audio_chunks = self.chunk_audio(audio_path)
            transcriptions = []
            for chunk in audio_chunks:
                # Transcribe each chunk
                chunk_transcription = self.stt_pipeline(chunk)
                transcriptions.append(chunk_transcription['text'])
            # Combine chunk transcriptions into one string
            full_transcription = " ".join(transcriptions)
        else:
            # Process short files in a single pass
            full_transcription = self.stt_pipeline({
                'array': audio,
                'sampling_rate': sample_rate
            })['text']
        return full_transcription
    def process_audio(self, audio_path):
        # Transcribe audio
        transcription = self.transcribe_audio(audio_path)
        # Extract named entities from the transcription
        entities = self.ner_pipeline(transcription)
        return {
            'filename': os.path.basename(audio_path),
            'transcription': transcription,
            'entities': entities
        }
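
# Illustrative usage of the class above (assumed file name "sample.wav";
# both models are downloaded from the Hugging Face Hub on first run):
#     pipe = AudioSpeechNERPipeline()
#     result = pipe.process_audio("sample.wav")
#     print(result['transcription'], result['entities'])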
def create_ner_html(entities):
    """
    Create HTML representation of named entities
    """
    if not entities:
        return "No named entities found."
    html = "<div style='background-color:#f0f0f0; padding:10px; border-radius:5px;'>"
    html += "<h3>Named Entities:</h3>"
    html += "<table style='width:100%; border-collapse:collapse;'>"
    html += "<tr><th style='border:1px solid #ddd; padding:8px;'>Word</th><th style='border:1px solid #ddd; padding:8px;'>Entity Type</th></tr>"
    for entity in entities:
        # Convert the model's LABEL_<i> id into a readable BIO tag
        new_entity = labels[int(entity['entity'].split("_")[-1])]
        html += f"<tr>" \
                f"<td style='border:1px solid #ddd; padding:8px;'>{entity['word']}</td>" \
                f"<td style='border:1px solid #ddd; padding:8px;'>{new_entity}</td>" \
                f"</tr>"
    html += "</table></div>"
    return html
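
# For reference, the transformers "ner" pipeline yields dicts shaped roughly
# like this (illustrative values, not real model output):
#     {'entity': 'LABEL_3', 'score': 0.99, 'word': 'Toshkent',
#      'start': 0, 'end': 8}
# which create_ner_html renders as the table row ('Toshkent', 'B-LOC').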
def process_audio_pipeline(audio):
    """
    Gradio interface function to process audio
    """
    # Initialize pipeline (avoid naming it `pipeline`, which would shadow
    # transformers.pipeline imported above)
    audio_ner = AudioSpeechNERPipeline()
    try:
        # Process the audio; process_audio returns a dict, not a tuple
        result = audio_ner.process_audio(audio)
        # Create HTML for entities
        entities_html = create_ner_html(result['entities'])
        return result['transcription'], entities_html
    except Exception as e:
        return f"Error processing audio: {e}", ""