import streamlit as st
import os
import json
import wave
import numpy as np
from datetime import timedelta
import base64
from io import BytesIO, StringIO
import tempfile
# Page configuration — must be the first Streamlit call in the script.
st.set_page_config(
    page_title="ASR Annotation Tool",
    page_icon="🎤",
    layout="wide",  # wide layout gives the waveform component more horizontal room
    initial_sidebar_state="expanded"
)
# Initialize session state: seed each key with its default only on the
# first run, so values survive Streamlit reruns and page switches.
_SESSION_DEFAULTS = {
    'annotation_type': None,   # "single_speaker" | "multi_speaker"
    'audio_file': None,        # raw uploaded audio bytes
    'transcript': "",          # full transcript text
    'segments': [],            # list of {start, end, speaker_id} dicts
    'current_page': "home",    # page selector consumed by main()
    'audio_duration': 0,       # duration in seconds (0 = unknown/unparsed)
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default
def get_audio_duration(audio_file):
    """Return the duration of a WAV file in seconds.

    Parameters:
        audio_file: a filename or file-like object accepted by ``wave.open``.

    Returns:
        float: duration in seconds, or 0 if the data cannot be parsed as
        WAV (non-WAV uploads such as mp3/flac/m4a land here; callers treat
        0 as "unknown duration").
    """
    try:
        with wave.open(audio_file, 'rb') as wav_file:
            frames = wav_file.getnframes()
            sample_rate = wav_file.getframerate()
            # Guard against a zero sample rate in a malformed header,
            # which would otherwise raise ZeroDivisionError.
            if not sample_rate:
                return 0
            return frames / float(sample_rate)
    # Narrow except instead of a bare `except:`, which also swallowed
    # KeyboardInterrupt/SystemExit and hid real bugs.
    except (wave.Error, EOFError, OSError, ValueError):
        return 0
def format_time(seconds):
    """Format a duration in seconds as an ``HH:MM:SS.mmm`` string."""
    # Route through timedelta so the input is normalized the same way
    # regardless of int/float type.
    total = timedelta(seconds=seconds).total_seconds()
    whole = int(total)
    # Milliseconds come from the truncated fractional part.
    millis = int((total - whole) * 1000)
    hrs = whole // 3600
    mins, secs = divmod(whole % 3600, 60)
    return f"{hrs:02d}:{mins:02d}:{secs:02d}.{millis:03d}"
def create_audio_player_html(audio_data, audio_id="audio_player"):
    """Create HTML audio player with controls.

    Parameters:
        audio_data: raw audio bytes to embed.
        audio_id: DOM id for the ``<audio>`` element (default "audio_player").

    Returns:
        str: HTML snippet with a native ``<audio controls>`` element whose
        source is a base64 data URI.

    Fix: the original template was an empty string, so ``audio_base64`` was
    computed but never rendered and the component showed nothing.
    """
    audio_base64 = base64.b64encode(audio_data).decode()
    html = f"""
    <audio id="{audio_id}" controls style="width: 100%;">
        <source src="data:audio/wav;base64,{audio_base64}" type="audio/wav">
        Your browser does not support the audio element.
    </audio>
    """
    return html
def create_waveform_html(audio_data, segments=None):
    """Create interactive waveform with region selection.

    Parameters:
        audio_data: raw audio bytes to load into the waveform.
        segments: optional list of ``{start, end, ...}`` dicts rendered as
            initial regions.

    Returns:
        str: self-contained HTML/JS snippet using wavesurfer.js (loaded from
        a CDN) with drag-to-select regions and a live segment list.

    Fix: the original template rendered only the literal text "Segments:",
    leaving ``audio_base64`` and ``segments_json`` unused.
    """
    audio_base64 = base64.b64encode(audio_data).decode()
    segments_json = json.dumps(segments or [])
    # Doubled braces below are literal JS braces inside the f-string.
    html = f"""
    <div id="waveform"></div>
    <div style="margin-top: 8px;">
        <button onclick="wavesurfer.playPause()">Play / Pause</button>
    </div>
    <div id="segments">Segments:</div>
    <script src="https://unpkg.com/wavesurfer.js@6.6.4/dist/wavesurfer.min.js"></script>
    <script src="https://unpkg.com/wavesurfer.js@6.6.4/dist/plugin/wavesurfer.regions.min.js"></script>
    <script>
        var wavesurfer = WaveSurfer.create({{
            container: '#waveform',
            waveColor: '#4F8BF9',
            progressColor: '#1F4E9F',
            plugins: [WaveSurfer.regions.create({{ dragSelection: true }})]
        }});
        wavesurfer.load('data:audio/wav;base64,{audio_base64}');
        var initialSegments = {segments_json};
        wavesurfer.on('ready', function() {{
            initialSegments.forEach(function(seg) {{
                wavesurfer.addRegion({{ start: seg.start, end: seg.end }});
            }});
        }});
        function refreshList() {{
            var el = document.getElementById('segments');
            el.innerHTML = 'Segments:';
            Object.values(wavesurfer.regions.list).forEach(function(r, i) {{
                el.innerHTML += '<div>' + (i + 1) + ': ' +
                    r.start.toFixed(2) + 's - ' + r.end.toFixed(2) + 's</div>';
            }});
        }}
        wavesurfer.on('region-created', refreshList);
        wavesurfer.on('region-updated', refreshList);
        wavesurfer.on('region-removed', refreshList);
    </script>
    """
    return html
def generate_srt(segments, transcript):
    """Generate SRT format from segments and transcript.

    Note: ``transcript`` is currently unused; each cue carries placeholder
    text — real text is attached on the assignment page.
    """
    cues = []
    for index, segment in enumerate(segments, start=1):
        start_stamp = format_srt_time(segment['start'])
        end_stamp = format_srt_time(segment['end'])
        # Placeholder caption (simplified - in real app you'd need better text matching)
        caption = f"{segment['speaker_id']}: [Segment {index} text]"
        cues.append(f"{index}\n{start_stamp} --> {end_stamp}\n{caption}\n\n")
    return "".join(cues)
def format_srt_time(seconds):
    """Format a time in seconds as SRT's ``HH:MM:SS,mmm`` (comma separator)."""
    whole = int(seconds)
    hrs, rem = divmod(whole, 3600)
    mins, secs = divmod(rem, 60)
    # Milliseconds are truncated, not rounded, matching SRT convention here.
    millis = int((seconds % 1) * 1000)
    return f"{hrs:02d}:{mins:02d}:{secs:02d},{millis:03d}"
def get_download_link(content, filename, label="Download file"):
    """Generate an HTML download link for text content.

    Parameters:
        content: text to offer for download (encoded as UTF-8).
        filename: suggested filename for the browser's save dialog.
        label: visible link text.

    Returns:
        str: an ``<a>`` element with a base64 ``data:`` URI; render with
        ``st.markdown(..., unsafe_allow_html=True)``.

    Fix: the original returned only ``label`` — ``b64`` was computed but
    never used, so clicking the "link" downloaded nothing.
    """
    b64 = base64.b64encode(content.encode()).decode()
    href = f'<a href="data:text/plain;base64,{b64}" download="{filename}">{label}</a>'
    return href
# Main App Layout
def main():
    """Render the app shell: title, progressive sidebar navigation, and routing."""
    st.title("🎤 ASR Annotation Tool")
    st.markdown("Simple tool for transcribing, segmenting, and annotating audio for ASR dataset creation.")
    # Sidebar: navigation buttons unlock progressively as each step's
    # prerequisites (audio, transcript, segments) are met.
    with st.sidebar:
        st.header("Navigation")
        if st.button("🏠 Home", use_container_width=True):
            st.session_state.current_page = "home"
        if st.session_state.audio_file and st.session_state.annotation_type:
            if st.button("📝 Transcription", use_container_width=True):
                st.session_state.current_page = "transcription"
            if st.session_state.annotation_type == "multi_speaker" and st.session_state.transcript:
                if st.button("🎯 Segmentation", use_container_width=True):
                    st.session_state.current_page = "segmentation"
                if st.session_state.segments:
                    if st.button("📊 Assignment", use_container_width=True):
                        st.session_state.current_page = "assignment"
    # Dispatch to the renderer for the currently selected page.
    page_renderers = {
        "home": show_home_page,
        "transcription": show_transcription_page,
        "segmentation": show_segmentation_page,
        "assignment": show_assignment_page,
    }
    renderer = page_renderers.get(st.session_state.current_page)
    if renderer is not None:
        renderer()
def show_home_page():
    """Home page - annotation type selection and file upload.

    Writes ``annotation_type``, ``audio_file`` (raw bytes) and
    ``audio_duration`` into session state, then offers navigation to the
    transcription page.
    """
    # Annotation type selection
    st.subheader("1. Select Annotation Type")
    annotation_type = st.radio(
        "How many speakers are in your audio?",
        ["single_speaker", "multi_speaker"],
        format_func=lambda x: "Single Speaker (Simple ASR)" if x == "single_speaker" else "Multi Speaker (Diarization)",
        key="annotation_type_radio"
    )
    # Mirror the widget value into the app-level session key read by main().
    st.session_state.annotation_type = annotation_type
    # File upload with better error handling
    st.subheader("2. Upload Audio File")
    # Add file size warning for Hugging Face Spaces
    st.info("💡 **Tip for Hugging Face Spaces:** Large files (>10MB) may fail to upload. Try smaller audio files or compress your audio if you encounter issues.")
    uploaded_file = st.file_uploader(
        "Choose an audio file",
        type=['wav', 'mp3', 'flac', 'm4a'],
        help="Supported formats: WAV, MP3, FLAC, M4A"
    )
    if uploaded_file is not None:
        # Keep the raw bytes in session state so other pages can re-embed them.
        st.session_state.audio_file = uploaded_file.read()
        # Save temporary file to get duration.
        # NOTE(review): suffix is always '.wav' even for mp3/flac/m4a uploads,
        # and get_audio_duration only parses WAV — non-WAV files will report
        # duration 0. Confirm whether that is intended.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tmp_file:
            tmp_file.write(st.session_state.audio_file)
        st.session_state.audio_duration = get_audio_duration(tmp_file.name)
        # delete=False above means we must remove the temp file ourselves.
        os.unlink(tmp_file.name)
        st.success(f"✅ Audio file uploaded successfully!")
        st.info(f"Duration: {format_time(st.session_state.audio_duration)}")
        # Show audio player
        st.subheader("Audio Preview")
        audio_html = create_audio_player_html(st.session_state.audio_file)
        st.components.v1.html(audio_html, height=120)
        # Continue button
        if st.button("Continue to Transcription →", type="primary"):
            st.session_state.current_page = "transcription"
            st.rerun()
def show_transcription_page():
    """Transcription page - text annotation.

    Shows the audio player, a transcript text area that auto-saves into
    session state, transcription guidelines, and the next-step actions
    (download / segmentation / finish, depending on annotation type).
    """
    st.header("📝 Text Transcription")
    # Guard: this page is unusable without an uploaded file.
    if not st.session_state.audio_file:
        st.error("Please upload an audio file first!")
        return
    # Audio player
    st.subheader("Audio Player")
    audio_html = create_audio_player_html(st.session_state.audio_file)
    st.components.v1.html(audio_html, height=120)
    # Transcription area with callback for auto-saving
    st.subheader("Transcript")

    def update_transcript():
        """Callback function to update transcript in session state.

        Copies the widget value (key 'transcript_input') into the
        app-level 'transcript' key so it survives page switches.
        """
        st.session_state.transcript = st.session_state.transcript_input

    # NOTE(review): the return value is unused; persistence happens via the
    # on_change callback above.
    transcript = st.text_area(
        "Write your transcription here:",
        value=st.session_state.transcript,
        height=300,
        help="Check the guidelines below to help you transcribe accurately.",
        key="transcript_input",
        on_change=update_transcript
    )
    # Guidelines reminder
    with st.expander("📋 Transcription Guidelines"):
        st.markdown("""
        **Key Guidelines:**
        - Transcribe EXACTLY what is said. No shortening or paraphrasing. (e.g saying "I do like" and "I'd like" are different)
        - Use standard punctuation and capitalization (tip: Get punctuation from natural pauses in dialogue).
        - Write numbers as digits.
        - Ignore unclear speech or marked as [unclear] or [inaudible].
        - For multi-speaker: transcribe all audible speech without identifying speakers.
        """)
    # Action buttons: download always available once there is text; the
    # third column depends on single- vs multi-speaker mode.
    col1, col2, col3 = st.columns(3)
    with col1:
        if st.session_state.transcript.strip():
            download_link = get_download_link(st.session_state.transcript, "transcript.txt", "💾 Download Transcript")
            st.markdown(download_link, unsafe_allow_html=True)
        else:
            # Disabled placeholder keeps the layout stable with no transcript yet.
            st.button("💾 Download Transcript", disabled=True)
    with col2:
        # Multi-speaker flow continues to segmentation.
        if st.session_state.annotation_type == "multi_speaker" and st.session_state.transcript.strip():
            if st.button("🎯 Continue to Segmentation →"):
                st.session_state.current_page = "segmentation"
                st.rerun()
    with col3:
        # Single-speaker flow ends here with a final download.
        if st.session_state.annotation_type == "single_speaker" and st.session_state.transcript.strip():
            if st.button("✅ Finish Annotation"):
                st.balloons()
                st.success("🎉 Single speaker annotation completed!")
                download_link = get_download_link(st.session_state.transcript, "transcript.txt", "📥 Download Final Transcript")
                st.markdown(download_link, unsafe_allow_html=True)
def show_segmentation_page():
    """Segmentation page - audio region selection.

    Renders the interactive waveform, a manual segment-entry form, the list
    of current segments (with per-segment delete), and navigation to the
    assignment page. Segments are dicts {start, end, speaker_id} stored in
    ``st.session_state.segments``.
    """
    st.header("🎯 Audio Segmentation")
    # Guard: cannot segment without audio.
    if not st.session_state.audio_file:
        st.error("Please upload an audio file first!")
        return
    st.info("Click and drag on the waveform to create segments. Resize by dragging edges, remove with ✕ button.")
    # Interactive waveform
    waveform_html = create_waveform_html(st.session_state.audio_file)
    st.components.v1.html(waveform_html, height=500)
    # Manual segment addition
    st.subheader("Manual Segment Addition")
    st.info("After having segmented the wav using our wav surfer, you can manually add segments here. Don't hesitate to replay and pause.")
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        # max_value clamps inputs to the detected audio duration.
        start_time = st.number_input("Start (seconds)", min_value=0.0, max_value=st.session_state.audio_duration, step=0.1)
    with col2:
        end_time = st.number_input("End (seconds)", min_value=0.0, max_value=st.session_state.audio_duration, step=0.1)
    with col3:
        speaker_id = st.text_input("Speaker ID", value="SPK001")
    with col4:
        if st.button("➕ Add Segment"):
            # Reject empty or inverted ranges before storing.
            if start_time < end_time:
                new_segment = {
                    "start": start_time,
                    "end": end_time,
                    "speaker_id": speaker_id
                }
                st.session_state.segments.append(new_segment)
                st.success("Segment added!")
                st.rerun()
            else:
                st.error("End time must be greater than start time!")
    # Current segments display
    if st.session_state.segments:
        st.subheader("Current Segments")
        for i, segment in enumerate(st.session_state.segments):
            col1, col2 = st.columns([4, 1])
            with col1:
                st.write(f"**Segment {i+1}:** {segment['speaker_id']} | {segment['start']:.2f}s - {segment['end']:.2f}s")
            with col2:
                # Safe despite mutating during iteration: the button click
                # triggers an immediate rerun, so the loop never resumes.
                if st.button("🗑️", key=f"remove_{i}"):
                    st.session_state.segments.pop(i)
                    st.rerun()
    # Continue button
    if st.session_state.segments:
        if st.button("📊 Continue to Assignment →", type="primary"):
            st.session_state.current_page = "assignment"
            st.rerun()
def show_assignment_page():
    """Assignment page - text-to-segment mapping and final export.

    For each stored segment, collects a text snippet via a keyed text area,
    builds ``assigned_segments`` (segment dict + "text"), and offers SRT
    preview plus transcript/SRT downloads.
    """
    st.header("📊 Text-Segment Assignment")
    # Guard: nothing to assign without segments.
    if not st.session_state.segments:
        st.error("Please create segments first!")
        return
    st.info("Assign portions of your text transcript to each audio segment to create the final annotation.")
    # Display transcript
    st.subheader("Original Transcript")
    st.text_area("Reference transcript:", value=st.session_state.transcript, height=150, disabled=True)
    # Segment assignment
    st.subheader("Segment Text Assignment")
    assigned_segments = []
    for i, segment in enumerate(st.session_state.segments):
        st.write(f"**Segment {i+1}:** {segment['speaker_id']} ({segment['start']:.2f}s - {segment['end']:.2f}s)")
        # Per-segment widget key so Streamlit keeps each text area's state.
        segment_text = st.text_area(
            f"Text for segment {i+1}:",
            key=f"segment_text_{i}",
            height=100,
            help="Copy and paste the relevant portion of the text transcript for this segment"
        )
        # Copy the segment and attach the assigned text.
        assigned_segments.append({
            **segment,
            "text": segment_text
        })
        st.divider()
    # Preview SRT
    if st.button("🔍 Preview SRT"):
        srt_preview = generate_srt_with_text(assigned_segments)
        st.subheader("SRT Preview")
        st.code(srt_preview, language="text")
    # Final save
    st.subheader("Download Final Annotation")
    col1, col2 = st.columns(2)
    with col1:
        # Create enhanced transcript with speaker labels
        enhanced_transcript = create_speaker_transcript(assigned_segments)
        download_transcript = get_download_link(enhanced_transcript, "final_transcript.txt", "💾 Download Transcript")
        st.markdown(download_transcript, unsafe_allow_html=True)
    with col2:
        srt_content = generate_srt_with_text(assigned_segments)
        download_srt = get_download_link(srt_content, "final_transcript.srt", "💾 Download SRT")
        st.markdown(download_srt, unsafe_allow_html=True)
    if st.button("🎉 Finish Annotation", type="primary"):
        st.balloons()
        st.success("🎉 Yihawww or Youhouuuu Multi-speaker annotation completed!")
        # Final downloads (duplicates of the links above, shown as a summary).
        st.subheader("Download your files:")
        download_transcript = get_download_link(enhanced_transcript, "final_transcript.txt", "📥 Download Transcript")
        download_srt = get_download_link(srt_content, "final_transcript.srt", "📥 Download SRT")
        st.markdown(download_transcript, unsafe_allow_html=True)
        st.markdown(download_srt, unsafe_allow_html=True)
    if st.button("🔄 Back to Segmentation"):
        st.session_state.current_page = "segmentation"
        st.rerun()
def generate_srt_with_text(segments):
    """Generate SRT with actual text content.

    Each segment dict supplies 'start', 'end', 'speaker_id' and optionally
    'text'; segments with no assigned text get a placeholder caption.
    """
    entries = []
    for index, segment in enumerate(segments, start=1):
        body = segment.get('text', '').strip() or f"[Segment {index} - No text assigned]"
        entries.append(
            f"{index}\n"
            f"{format_srt_time(segment['start'])} --> {format_srt_time(segment['end'])}\n"
            f"{segment['speaker_id']}: {body}\n\n"
        )
    return "".join(entries)
def create_speaker_transcript(segments):
    """Create a speaker-labeled transcript ordered by segment start time.

    Segments whose 'text' is empty (after stripping) are omitted. Entries
    are joined with blank lines.
    """
    ordered = sorted(segments, key=lambda seg: seg['start'])
    labeled = [
        f"{seg['speaker_id']}: {seg.get('text', '').strip()}"
        for seg in ordered
        if seg.get('text', '').strip()
    ]
    return "\n\n".join(labeled)
# Entry point: render the app when executed directly (e.g. `streamlit run app.py`).
if __name__ == "__main__":
    main()