import gradio as gr
import httpx
import os
import json
import inspect
import aiofiles
import asyncio
import tempfile

# Shared async HTTP client with connection pooling for all outbound requests.
limits = httpx.Limits(max_connections=100, max_keepalive_connections=20)
HTTP_CLIENT = httpx.AsyncClient(
    http2=False,
    limits=limits,
    timeout=httpx.Timeout(30.0, pool=10.0),
)


# --- SRT Generation Functions ---
def format_srt_time(total_seconds):
    """Convert seconds to an SRT timestamp (HH:MM:SS,mmm)."""
    # Work in integer milliseconds so the fractional part can never round up
    # to 1000 and produce an invalid timestamp like "00:00:01,1000".
    total_ms = round(total_seconds * 1000)
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    seconds, milliseconds = divmod(remainder, 1000)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}"


def generate_srt_file(json_data):
    """Build a temporary .srt file from a transcription JSON that has segments."""
    if not json_data or not json_data.get("segments"):
        print("No segments to convert to SRT.")
        return None
    srt_content = ""
    for index, segment in enumerate(json_data["segments"]):
        sequence = index + 1
        start_time = format_srt_time(segment['start'])
        end_time = format_srt_time(segment['end'])
        text = segment['text'].strip()
        srt_content += f"{sequence}\n{start_time} --> {end_time}\n{text}\n\n"
    try:
        with tempfile.NamedTemporaryFile(
            mode='w+', delete=False, suffix='.srt', encoding='utf-8'
        ) as temp_file:
            temp_file.write(srt_content)
            print(f"Temporary SRT file created at: {temp_file.name}")
            return temp_file.name
    except Exception as e:
        print(f"Error creating SRT file: {e}")
        return None


# --- Tools for Chatbot ---
def get_city_info(city: str):
    """Demo tool: return canned facts about a couple of cities as JSON."""
    if "paris" in city.lower():
        return json.dumps({
            "population": "2.1 million",
            "monument": "Eiffel Tower",
            "fact": "Paris is known as the 'City of Light'.",
        })
    elif "tokyo" in city.lower():
        return json.dumps({
            "population": "14 million",
            "monument": "Tokyo Tower",
            "fact": "Tokyo is the largest metropolitan area in the world.",
        })
    else:
        return json.dumps({"error": f"Sorry, I don't have information about {city}."})


available_tools = {"get_city_info": get_city_info}

tools_schema = [
    {
        "type": "function",
        "function": {
            "name": "get_city_info",
            "description": "Get information about a specific city.",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "The name of the city, e.g., 'Paris'.",
                    }
                },
                "required": ["city"],
            },
        },
    }
]


async def upload_file_to_public_service(filepath: str):
    """Upload a local file to uguu.se and return its public URL, or None on failure."""
    if not filepath:
        return None
    url = "https://uguu.se/upload"
    try:
        async with aiofiles.open(filepath, 'rb') as f:
            content = await f.read()
        files = {'files[]': (os.path.basename(filepath), content)}
        response = await HTTP_CLIENT.post(url, files=files, timeout=30.0)
        response.raise_for_status()
        result = response.json()
        if "files" in result and result["files"] and "url" in result["files"][0]:
            return result["files"][0]["url"]
        print(f"Upload API response error: {result}")
        return None
    except httpx.HTTPStatusError as e:
        print(f"HTTP error during upload: {e.response.status_code} - {e.response.text}")
        return None
    except httpx.RequestError as e:
        print(f"Connection error during upload: {e}")
        return None
    except (IOError, FileNotFoundError) as e:
        print(f"File read error: {e}")
        return None
    except (KeyError, IndexError) as e:
        print(f"Unexpected JSON response structure: {e}")
        return None
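
# For reference, a successful uguu.se response as parsed above looks roughly
# like this; only files[0]["url"] is relied upon, the other fields are
# illustrative assumptions, not an API guarantee:
#
#   {"success": true, "files": [{"name": "clip.mp3", "url": "https://a.uguu.se/abcd.mp3"}]}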

async def handle_api_call(api_key, messages, model_chat):
    """POST a chat-completion request (with tools enabled) to the Mistral API."""
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    api_url = "https://api.mistral.ai/v1/chat/completions"
    json_data = {
        "model": model_chat,
        "messages": messages,
        "tools": tools_schema,
        "tool_choice": "auto",
    }
    return await HTTP_CLIENT.post(api_url, headers=headers, json=json_data, timeout=60.0)


async def handle_chat_submission(api_key, user_message, chat_history_api, audio_url, model_chat):
    """Single-audio chat flow (not wired into the UI below; on_submit supersedes it)."""
    if not api_key:
        user_content = [{"type": "text", "text": user_message}] if user_message else []
        updated_history = chat_history_api + [
            {"role": "user", "content": user_content},
            {"role": "assistant", "content": "Error: API key not configured."},
        ]
        return updated_history, ""

    current_user_content = []
    if audio_url:
        current_user_content.append(
            {"type": "input_audio", "input_audio": {"data": audio_url, "format": "mp3"}}
        )
    if user_message:
        current_user_content.append({"type": "text", "text": user_message})
    if not current_user_content:
        return chat_history_api, ""

    chat_history_api.append({"role": "user", "content": current_user_content})

    try:
        response = await handle_api_call(api_key, chat_history_api, model_chat)
        if response.status_code != 200:
            error_msg = response.json().get("message", response.text)
            chat_history_api.append({"role": "assistant", "content": f"API error: {error_msg}"})
            return chat_history_api, ""
        assistant_message = response.json()['choices'][0]['message']
    except httpx.HTTPStatusError as e:
        error_msg = e.response.json().get("message", e.response.text)
        chat_history_api.append({"role": "assistant", "content": f"API error: {error_msg}"})
        return chat_history_api, ""
    except httpx.RequestError as e:
        chat_history_api.append({"role": "assistant", "content": f"Connection error: {e}"})
        return chat_history_api, ""

    if assistant_message.get("tool_calls"):
        chat_history_api.append(assistant_message)
        tool_call = assistant_message["tool_calls"][0]
        function_name = tool_call['function']['name']
        function_args = json.loads(tool_call['function']['arguments'])
        if function_name in available_tools:
            tool_call_id = tool_call['id']
            function_to_call = available_tools[function_name]
            # Support both sync and async tool implementations.
            if inspect.iscoroutinefunction(function_to_call):
                tool_output = await function_to_call(**function_args)
            else:
                tool_output = function_to_call(**function_args)
            chat_history_api.append({
                "role": "tool",
                "tool_call_id": tool_call_id,
                "content": tool_output,
            })
            try:
                second_response = await handle_api_call(api_key, chat_history_api, model_chat)
                if second_response.status_code != 200:
                    error_msg = second_response.json().get("message", second_response.text)
                    chat_history_api.append(
                        {"role": "assistant", "content": f"API error after tool call: {error_msg}"}
                    )
                else:
                    chat_history_api.append(second_response.json()['choices'][0]['message'])
            except httpx.HTTPStatusError as e:
                error_msg = e.response.json().get("message", e.response.text)
                chat_history_api.append(
                    {"role": "assistant", "content": f"API error after tool call: {error_msg}"}
                )
            except httpx.RequestError as e:
                chat_history_api.append(
                    {"role": "assistant", "content": f"Connection error after tool call: {e}"}
                )
        else:
            chat_history_api.append(
                {"role": "assistant", "content": f"Error: Unknown tool '{function_name}'."}
            )
    else:
        chat_history_api.append(assistant_message)

    return chat_history_api, ""
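
# For reference, the multimodal user message built by the handlers in this file
# has the following shape (structure taken from the code above; the URL and the
# text are illustrative):
#
#   {"role": "user", "content": [
#       {"type": "input_audio", "input_audio": {"data": "https://.../clip.mp3", "format": "mp3"}},
#       {"type": "text", "text": "What is said in this recording?"},
#   ]}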

def format_history_for_display(api_history):
    """Map the raw API message history to Gradio's 'messages' chat format."""
    display_messages = []
    for msg in api_history:
        if msg['role'] == 'user':
            text_content = ""
            has_audio = False
            if isinstance(msg.get('content'), list):
                text_part = next(
                    (part['text'] for part in msg['content'] if part['type'] == 'text'),
                    None,
                )
                if text_part:
                    text_content = text_part
                if any(part['type'] == 'input_audio' for part in msg['content']):
                    has_audio = True
            elif isinstance(msg.get('content'), str):
                text_content = msg['content']
            display_content = f"🎤 {text_content}" if has_audio else text_content
            if display_content:
                display_messages.append({"role": "user", "content": display_content})
        elif msg['role'] == 'assistant':
            if msg.get("tool_calls"):
                tool_call = msg["tool_calls"][0]
                func_name = tool_call['function']['name']
                func_args = tool_call['function']['arguments']
                tool_display_content = (
                    f"⚙️ *Calling tool `{func_name}` with arguments: `{func_args}`...*"
                )
                display_messages.append({"role": "assistant", "content": tool_display_content})
            elif msg.get('content'):
                display_messages.append({"role": "assistant", "content": msg['content']})
    return display_messages


async def transcribe_audio(api_key, source_type, audio_file_path, audio_url,
                           add_timestamps, model_transcription):
    if not api_key:
        return {"error": "Please enter your API key first."}
    headers = {"Authorization": f"Bearer {api_key}"}
    api_url = "https://api.mistral.ai/v1/audio/transcriptions"
    try:
        # Multipart form fields: a (None, value) tuple marks a plain form value.
        payload = {'model': (None, model_transcription)}
        if add_timestamps:
            payload['timestamp_granularities'] = (None, 'segment')
        if source_type == "Upload a file":
            if not audio_file_path:
                return {"error": "Please upload an audio file."}
            async with aiofiles.open(audio_file_path, "rb") as f:
                content = await f.read()
            payload['file'] = (os.path.basename(audio_file_path), content, "audio/mpeg")
            response = await HTTP_CLIENT.post(api_url, headers=headers, files=payload, timeout=120.0)
        elif source_type == "Use a URL":
            if not audio_url:
                return {"error": "Please provide an audio URL."}
            payload['file_url'] = (None, audio_url)
            response = await HTTP_CLIENT.post(api_url, headers=headers, files=payload, timeout=120.0)
        else:
            return {"error": "Invalid source type."}
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        try:
            await e.response.aread()
            details = e.response.json().get("message", e.response.text)
        except Exception:
            details = e.response.text
        return {"error": f"API Error {e.response.status_code}", "details": details}
    except httpx.RequestError as e:
        return {"error": "Connection error", "details": str(e)}
    except IOError as e:
        return {"error": "File reading error", "details": str(e)}
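
# For reference, a simplified transcription response with timestamps.
# generate_srt_file() relies only on the "segments" list and its
# "start"/"end"/"text" fields; the values here are illustrative:
#
#   {
#       "text": "Hello world.",
#       "segments": [{"start": 0.0, "end": 1.5, "text": "Hello world."}],
#   }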

async def run_transcription_and_update_ui(api_key, source, file_path, url,
                                          timestamps, model_transcription):
    # Disable the button and show a status line while the request runs.
    yield {
        transcription_button: gr.update(value="⏳ Transcription in progress...", interactive=False),
        transcription_status: gr.update(value="*Starting transcription...*", visible=True),
        transcription_output: gr.update(visible=False),
        download_zone: gr.update(visible=False),
        download_file_output: gr.update(value=None),
    }

    json_result = await transcribe_audio(api_key, source, file_path, url, timestamps, model_transcription)
    has_segments = isinstance(json_result, dict) and json_result.get("segments")
    is_error = isinstance(json_result, dict) and "error" in json_result

    if is_error:
        error_title = json_result.get("error", "Unknown error")
        error_details = json_result.get("details", "No details available.")
        yield {
            transcription_status: gr.update(value=f"### ❌ {error_title}\n\n*_{error_details}_*", visible=True),
            transcription_button: gr.update(value="▶️ Start transcription", interactive=True),
        }
    elif has_segments:
        yield {
            transcription_status: gr.update(value="### ✔️ Transcription complete!", visible=True),
            transcription_output: gr.update(value=json_result, visible=True),
            download_zone: gr.update(visible=True),
            transcription_button: gr.update(value="▶️ Start transcription", interactive=True),
        }
    else:
        text_result = json_result.get('text', "No text detected.")
        yield {
            transcription_status: gr.update(value=f"### ⚠️ Partial result\n\n_{text_result}_", visible=True),
            transcription_output: gr.update(value=json_result, visible=True),
            download_zone: gr.update(visible=False),
            transcription_button: gr.update(value="▶️ Start transcription", interactive=True),
        }


theme = gr.themes.Origin(
    primary_hue="orange",
    secondary_hue="gray",
    neutral_hue="zinc",
    text_size="md",
    spacing_size="md",
    radius_size="xxl",
    font=("Inter", "IBM Plex Sans", "ui-sans-serif", "system-ui", "sans-serif"),
).set(
    body_background_fill="#f7f8fa",
    block_background_fill="#fff",
    block_shadow="0 4px 24px 0 #0001, 0 1.5px 4px 0 #0001",
    block_border_width="1px",
    block_border_color="#ececec",
    button_primary_background_fill="#223a5e",
    button_primary_background_fill_hover="#1a2c47",
    button_primary_text_color="#fff",
    input_border_color="#e5e7eb",
    input_border_color_focus="#223a5e",
    input_background_fill="#fafbfc",
    input_shadow="0 0 0 2px #223a5e22",
)

custom_css = """
.gradio-container label, .gradio-container .gr-button, .gradio-container .gr-button span, .gradio-container a {
    color: #FF6F3C !important;
}
.gradio-container .gr-button {
    border-color: #FF6F3C !important;
    background: #FF6F3C !important;
    color: #fff !important;
}
.gradio-container .gr-button:not([disabled]):hover {
    background: #fff !important;
    color: #FF6F3C !important;
    border: 2px solid #FF6F3C !important;
}
.gradio-container .gr-box, .gradio-container .gr-block {
    background: #f7f8fa !important;
}
.gradio-container .gr-input, .gradio-container .gr-textbox, .gradio-container .gr-text-input, .gradio-container .gr-file, .gradio-container .gr-audio {
    background: #f7f8fa !important;
    color: #223a5e !important;
    border: 1.2px solid #FF6F3C !important;
}
.gradio-container .gr-input::placeholder, .gradio-container .gr-textbox::placeholder, .gradio-container .gr-text-input::placeholder {
    color: #888 !important;
}
.gradio-container .gr-markdown, .gradio-container .gr-markdown p {
    color: #888 !important;
}
"""

with gr.Blocks(theme=theme, title="Voxtral Pro", css=custom_css) as demo:
    gr.Markdown(
        """
        # Voxtral Pro
        The all-in-one AI assistant for audio, text & productivity.
        Fast and powerful.
        """
    )

    api_history_state = gr.State([])
    api_key_state = gr.State()

    # Dropdowns for model selection
    model_choices = ["voxtral-mini-2507", "voxtral-small-2507"]
    chat_model_state = gr.State("voxtral-mini-2507")
    transcription_model_state = gr.State("voxtral-mini-2507")

    with gr.Accordion("🔑 API Key Configuration", open=True):
        with gr.Row():
            api_key_input = gr.Textbox(
                label="Mistral API Key",
                placeholder="Enter your API key here...",
                type="password",
                scale=6,
            )
            chat_model_dropdown = gr.Dropdown(
                choices=model_choices, value="voxtral-mini-2507", label="Chat Model", scale=2
            )
            transcription_model_dropdown = gr.Dropdown(
                choices=model_choices, value="voxtral-mini-2507", label="Transcription Model", scale=2
            )
            save_api_key_button = gr.Button("Save Key", scale=1)
        api_key_status = gr.Markdown(value="*Please save your API key to use the application.*")
        gr.Markdown(
            "🔒 Security: Your API key is stored only in your browser session memory and is "
            "never sent to any server except Mistral's API. It is not saved or shared anywhere else.",
            elem_id="api-key-security-info",
        )

    with gr.Tabs():
        with gr.TabItem("💬 Multimodal Chatbot"):
            gr.Markdown("### Chat with text and audio files at any time.")
            chatbot_display = gr.Chatbot(
                label="Conversation",
                height=500,
                # NOTE: machine-specific absolute path; replace with a file that
                # exists in your deployment (or None) for the assistant avatar.
                avatar_images=(None, "/Users/hasanbasbunar/voxtral-gradio/c29ca011-87ff-45b0-8236-08d629812732.svg"),
                type="messages",
            )
            with gr.Row():
                audio_input_files = gr.File(
                    label="Drag and drop your audio files here",
                    file_count="multiple",
                    file_types=["audio"],
                    elem_id="upload-box",
                    scale=2,
                    height=100,
                )
                user_textbox = gr.Textbox(
                    label="Your message",
                    placeholder="Type your message here...",
                    lines=2,
                    scale=6,
                    elem_id="user-message-box",
                )
                mic_input = gr.Audio(
                    label="Voice recording",
                    sources=["microphone"],
                    type="filepath",
                    elem_classes="voice-recorder",
                    scale=2,
                )
            send_button = gr.Button("Send", variant="primary")
            clear_button = gr.Button("🗑️ Clear conversation", variant="secondary")

        with gr.TabItem("🎙️ Audio Transcription"):
            gr.Markdown("### Transcribe an audio file and export the result.")
            with gr.Row(variant="panel"):
                with gr.Column(scale=1):
                    gr.Markdown("#### 1. Audio Source")
                    source_type_transcription = gr.Radio(
                        ["Upload a file", "Use a URL"], label="Source type", value="Upload a file"
                    )
                    audio_file_input = gr.Audio(type="filepath", label="Audio file", visible=True)
                    audio_url_input = gr.Textbox(
                        label="Audio file URL", placeholder="https://.../audio.mp3", visible=False
                    )
                    gr.Markdown("#### 2. Options")
                    timestamp_checkbox = gr.Checkbox(label="Include timestamps (for .SRT)", value=True)
                    transcription_button = gr.Button("▶️ Start transcription", variant="primary")
                with gr.Column(scale=2):
                    gr.Markdown("#### 3. Results")
                    transcription_status = gr.Markdown(visible=False)
                    transcription_output = gr.JSON(label="Raw transcription data", visible=False)
                    with gr.Group(visible=False) as download_zone:
                        download_srt_button = gr.Button("💾 Download .srt file", variant="secondary")
                        download_file_output = gr.File(label="Your file is ready:", interactive=False)

    def save_key(api_key):
        return api_key, "✅ API key saved."
    save_api_key_button.click(
        fn=save_key,
        inputs=[api_key_input],
        outputs=[api_key_state, api_key_status],
    )

    # Dropdown logic: mirror each dropdown's value into session State so the
    # async handlers receive a plain string.
    def update_chat_model(model):
        return model

    def update_transcription_model(model):
        return model

    chat_model_dropdown.change(
        fn=update_chat_model, inputs=[chat_model_dropdown], outputs=[chat_model_state]
    )
    transcription_model_dropdown.change(
        fn=update_transcription_model,
        inputs=[transcription_model_dropdown],
        outputs=[transcription_model_state],
    )
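    # The same State-mirroring could be wired with an inline lambda; a
    # hypothetical one-liner equivalent of the handlers above:
    #   chat_model_dropdown.change(lambda m: m, chat_model_dropdown, chat_model_state)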

    async def on_submit(api_key, user_msg, api_history, uploaded_files, mic_file, chat_model):
        # 1. Check for an API key
        if not api_key:
            api_history.append({"role": "user", "content": user_msg or "..."})
            api_history.append({"role": "assistant", "content": "Error: Please configure your API key."})
            yield api_history, format_history_for_display(api_history), "", None, None
            return

        # 2. Collect all audio file paths
        all_filepaths = []
        if uploaded_files:
            # Depending on the Gradio version, gr.File yields tempfile-like
            # objects (with .name) or plain path strings; handle both.
            all_filepaths.extend(
                p.name if hasattr(p, "name") else p for p in uploaded_files
            )
        if mic_file:
            all_filepaths.append(mic_file)

        # 3. Upload files in parallel and show a loading state
        audio_urls_to_send = []
        if all_filepaths:
            audio_count = len(all_filepaths)
            api_history.append({"role": "user", "content": user_msg or ""})  # Placeholder for display
            api_history.append({
                "role": "assistant",
                "content": f"⏳ *Uploading {audio_count} audio file{'s' if audio_count > 1 else ''}...*",
            })
            yield api_history, format_history_for_display(api_history), user_msg, None, None

            upload_tasks = [upload_file_to_public_service(path) for path in all_filepaths]
            uploaded_urls = await asyncio.gather(*upload_tasks)
            audio_urls_to_send = [url for url in uploaded_urls if url]

            api_history.pop()  # Remove the loading message
            if len(audio_urls_to_send) != audio_count:
                api_history.append({
                    "role": "assistant",
                    "content": f"Error: Failed to upload {audio_count - len(audio_urls_to_send)} file(s).",
                })
                yield api_history, format_history_for_display(api_history), user_msg, None, None
                return

        # 4. Construct the user message for the API
        current_user_content = []
        for url in audio_urls_to_send:
            current_user_content.append({"type": "input_audio", "input_audio": {"data": url}})
        if user_msg:
            current_user_content.append({"type": "text", "text": user_msg})
        if not current_user_content:
            yield api_history, format_history_for_display(api_history), "", None, None
            return

        # If we added a placeholder, replace it; otherwise append.
        if all_filepaths:
            api_history[-1] = {"role": "user", "content": current_user_content}
        else:
            api_history.append({"role": "user", "content": current_user_content})

        # 5. Call the API and handle tool calls
        try:
            response = await handle_api_call(api_key, api_history, chat_model)
            response.raise_for_status()
            assistant_message = response.json()['choices'][0]['message']
            api_history.append(assistant_message)

            if assistant_message.get("tool_calls"):
                tool_call = assistant_message["tool_calls"][0]
                function_name = tool_call['function']['name']
                if function_name in available_tools:
                    function_args = json.loads(tool_call['function']['arguments'])
                    tool_output = available_tools[function_name](**function_args)
                    api_history.append({
                        "role": "tool",
                        "tool_call_id": tool_call['id'],
                        "content": tool_output,
                    })
                    second_response = await handle_api_call(api_key, api_history, chat_model)
                    second_response.raise_for_status()
                    final_message = second_response.json()['choices'][0]['message']
                    api_history.append(final_message)
        except Exception as e:
            error_details = str(e)
            if isinstance(e, httpx.HTTPStatusError):
                error_details = e.response.text
            api_history.append({"role": "assistant", "content": f"API Error: {error_details}"})

        # 6. Final UI update
        yield api_history, format_history_for_display(api_history), "", None, None

    chat_inputs = [api_key_state, user_textbox, api_history_state, audio_input_files, mic_input, chat_model_state]
    chat_outputs = [api_history_state, chatbot_display, user_textbox, audio_input_files, mic_input]

    send_button.click(fn=on_submit, inputs=chat_inputs, outputs=chat_outputs)
    user_textbox.submit(fn=on_submit, inputs=chat_inputs, outputs=chat_outputs)

    def clear_chat():
        return [], [], "", None, None

    clear_button.click(fn=clear_chat, outputs=chat_outputs)

    all_transcription_outputs = [
        transcription_button,
        transcription_status,
        transcription_output,
        download_zone,
        download_file_output,
    ]
    transcription_button.click(
        fn=run_transcription_and_update_ui,
        inputs=[api_key_state, source_type_transcription, audio_file_input,
                audio_url_input, timestamp_checkbox, transcription_model_state],
        outputs=all_transcription_outputs,
    )
    download_srt_button.click(
        fn=generate_srt_file,
        inputs=[transcription_output],
        outputs=[download_file_output],
    )

    def toggle_transcription_inputs(source_type):
        return (
            gr.update(visible=source_type == "Upload a file"),
            gr.update(visible=source_type == "Use a URL"),
        )

    source_type_transcription.change(
        fn=toggle_transcription_inputs,
        inputs=source_type_transcription,
        outputs=[audio_file_input, audio_url_input],
    )

if __name__ == "__main__":
    demo.queue(default_concurrency_limit=20, max_size=40)
    demo.launch(debug=False, max_threads=20)
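
# Illustrative sanity check for the SRT timestamp helper above (not executed
# by the app; expected value computed by hand):
#   >>> format_srt_time(3661.5)
#   '01:01:01,500'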