|
import os |
|
import io |
|
import streamlit as st |
|
from groq import Groq |
|
import soundfile as sf |
|
import google.generativeai as genai |
|
from audiorecorder import audiorecorder |
|
|
|
|
|
# Read the required API keys from the environment.
groq_api_key = os.getenv('groqwhisper')
gemini_api_key = os.getenv('geminiapi')

# Abort early when either credential is absent — nothing below can work.
if not (groq_api_key and gemini_api_key):
    st.error("Bitte setze die Umgebungsvariablen 'groqwhisper' und 'geminiapi'")
    st.stop()

# Initialise the Whisper (Groq) and Gemini backends.
groq_client = Groq(api_key=groq_api_key)
genai.configure(api_key=gemini_api_key)
|
|
|
|
|
# Sampling settings for Gemini: fairly low temperature for focused answers,
# plain-text output capped at 8192 tokens.
generation_config = dict(
    temperature=0.4,
    top_p=0.95,
    top_k=40,
    max_output_tokens=8192,
    response_mime_type="text/plain",
)

# Single chat model instance used for the whole app.
model = genai.GenerativeModel(
    model_name="gemini-2.0-flash-exp",
    generation_config=generation_config,
)
|
# NOTE(review): the original rendered an extra st.audio_input("Click to
# record") here whose value was never used (it was shadowed by the recorder
# further down), leaving a dead duplicate widget in the UI — removed.

# One Gemini chat session per browser session; it carries the model-side
# conversation history across Streamlit reruns.
if "chat_session" not in st.session_state:
    st.session_state.chat_session = model.start_chat(history=[])

# Parallel (role, text) list used only to render the visible transcript.
if "display_history" not in st.session_state:
    st.session_state.display_history = []
|
|
|
def process_audio(audio_data):
    """Transcribe a recording with Groq Whisper.

    Parameters
    ----------
    audio_data : tuple
        Pair ``(sample_rate, samples)`` as produced by ``soundfile``.

    Returns
    -------
    str
        The German transcript, or a string starting with ``"Fehler:"`` when
        transcription failed (the caller checks for that prefix).
    """
    try:
        sample_rate, samples = audio_data
        with io.BytesIO() as wav_buffer:
            # Re-encode the raw samples as a WAV file in memory so the API
            # receives a proper audio container.
            sf.write(wav_buffer, samples, sample_rate, format='WAV')
            wav_buffer.seek(0)
            result = groq_client.audio.transcriptions.create(
                file=("recording.wav", wav_buffer.read(), "audio/wav"),
                model="whisper-large-v3-turbo",
                prompt="transcribe",
                language="de",
                response_format="json",
                temperature=0.0,
            )
        return result.text
    except Exception as e:
        # Error protocol: return a marker string instead of raising, so the
        # Streamlit caller can surface it via st.error().
        return f"Fehler: {str(e)}"
|
|
|
|
|
# NOTE(review): st.set_page_config() must be the *first* Streamlit command in
# a script run, but other st.* calls can execute before this point, which
# makes the call raise StreamlitAPIException at runtime.  Guard it so the app
# keeps working; ideally this call should be moved to the top of the file.
try:
    st.set_page_config(
        page_title="Gemini Chatbot mit Spracheingabe",
        page_icon="🤖"
    )
except Exception:
    # Already past the first command on this rerun — fall back to defaults.
    pass

st.title("Gemini Chatbot 🎤+📝")
|
|
|
|
|
# Replay the stored conversation so it survives Streamlit reruns.
for entry in st.session_state.display_history:
    role, text = entry
    st.chat_message(role).markdown(text)
|
|
|
|
|
audio_bytes = st.audio_input("Sprachnachricht aufnehmen")
if audio_bytes:
    try:
        audio_content = audio_bytes.getvalue()
        # st.audio_input keeps its value across reruns, so without this guard
        # the same recording would be transcribed and answered again after
        # every st.rerun(), looping forever.
        if st.session_state.get("last_audio") != audio_content:
            with io.BytesIO(audio_content) as wav_io:
                samples, sample_rate = sf.read(wav_io)
            # Mix any multi-channel recording (not just 2-channel stereo)
            # down to mono before transcription.
            if samples.ndim > 1:
                samples = samples.mean(axis=1)

            with st.spinner("Transkription..."):
                transcription = process_audio((sample_rate, samples))

            if transcription:
                if transcription.startswith("Fehler:"):
                    # Transcription failure — leave the guard unset so the
                    # user can retry the same recording.
                    st.error(transcription)
                else:
                    st.session_state.last_audio = audio_content
                    st.session_state.display_history.append(("user", transcription))
                    full_prompt = f"{transcription}\nAntworte immer auf Deutsch"
                    response = st.session_state.chat_session.send_message(full_prompt)
                    response_text = response.candidates[0].content.parts[0].text if response.candidates else "Keine Antwort"
                    st.session_state.display_history.append(("assistant", response_text))
                    st.rerun()
    except Exception as e:
        st.error(f"Audioprocessing fehlgeschlagen: {str(e)}")
|
|
|
|
|
user_input = st.text_input("Schreibe deine Frage:", key="user_input")
# st.text_input keeps its value across reruns; remember the last prompt we
# answered so st.rerun() does not re-submit the same message in an endless
# loop.  (Tradeoff: sending the identical text twice in a row requires
# editing the field — acceptable for this guard.)
if user_input and user_input != st.session_state.get("last_text_prompt"):
    st.session_state.last_text_prompt = user_input
    st.session_state.display_history.append(("user", user_input))
    full_prompt = f"{user_input}\nAntworte immer auf Deutsch"
    response = st.session_state.chat_session.send_message(full_prompt)
    response_text = response.candidates[0].content.parts[0].text if response.candidates else "Keine Antwort"
    st.session_state.display_history.append(("assistant", response_text))
    st.rerun()