Spaces:
Running
Running
import streamlit as st | |
import asyncio | |
import websockets | |
import uuid | |
from datetime import datetime | |
import os | |
import random | |
import time | |
import hashlib | |
from PIL import Image | |
import glob | |
import base64 | |
import io | |
import streamlit.components.v1 as components | |
import edge_tts | |
from audio_recorder_streamlit import audio_recorder | |
import nest_asyncio | |
import re | |
import pytz | |
import anthropic | |
import openai | |
from PyPDF2 import PdfReader | |
import threading | |
import json | |
import zipfile | |
from gradio_client import Client | |
from dotenv import load_dotenv | |
from streamlit_marquee import streamlit_marquee | |
from collections import defaultdict, Counter | |
import pandas as pd | |
# Patch asyncio so event loops can be nested
nest_asyncio.apply()

# Page configuration (wide layout, app title and icon)
st.set_page_config(
    page_title="Colorado Sim: Mountains, Memes, & Mayhem πποΈ",
    page_icon="π¦",
    layout="wide",
)
# Static configuration for the character-driven sim
Site_Name = "Colorado Sim Chat ποΈβ¨"
START_ROOM = "Rocky Mountain Hub π²"

# Character display name -> edge-tts voice name used for that character
FUN_USERNAMES = dict(
    [
        ("Trailblazer Tim π", "en-US-GuyNeural"),  # Adventurous hiker
        ("Meme Queen Mia π", "en-US-JennyNeural"),  # Meme enthusiast
        ("Elk Whisperer Eve π¦", "en-GB-SoniaNeural"),  # Wildlife lover
        ("Tech Titan Tara πΎ", "en-AU-NatashaNeural"),  # Tech-savvy coder
        ("Ski Guru Sam β·οΈ", "en-CA-ClaraNeural"),  # Skiing pro
        ("Cosmic Camper Cal π ", "en-US-AriaNeural"),  # Stargazing storyteller
        ("Rasta Ranger Rick π", "en-GB-RyanNeural"),  # Chill outdoorsman
        ("Boulder Bro Ben πͺ¨", "en-AU-WilliamNeural"),  # Rock-climbing dude
    ]
)

# Distinct voices referenced by the characters (built from a set, so unordered)
EDGE_TTS_VOICES = list({*FUN_USERNAMES.values()})

# File extension -> emoji shown next to download links
FILE_EMOJIS = dict(md="π", mp3="π΅", png="πΌοΈ", mp4="π₯", zip="π¦")
# Directories and well-known file locations
CHAT_DIR = "chat_logs"
AUDIO_DIR = "audio_logs"
AUDIO_CACHE_DIR = "audio_cache"
STATE_FILE = "user_state.txt"
CHAT_FILE = os.path.join(CHAT_DIR, "sim_chat.md")

# Create every working directory up front; existing ones are left alone
for d in (CHAT_DIR, AUDIO_DIR, AUDIO_CACHE_DIR):
    os.makedirs(d, exist_ok=True)
# API Keys
load_dotenv()


def _get_api_key(name):
    """Return the API key called *name*, or "" when it is not configured.

    The process environment (including values loaded from ``.env`` above) wins.
    Streamlit secrets are consulted only when the variable is unset, so a
    missing secrets file cannot break start-up when the key is already in the
    environment.
    """
    value = os.getenv(name)
    if value is not None:
        return value
    try:
        return st.secrets.get(name, "")
    except Exception:
        # Optional-config boundary: accessing st.secrets can raise when no
        # secrets file exists; treat that the same as "no key configured".
        return ""


anthropic_key = _get_api_key('ANTHROPIC_API_KEY')
openai_api_key = _get_api_key('OPENAI_API_KEY')
openai_client = openai.OpenAI(api_key=openai_api_key)
# Timestamp Helper
def format_timestamp_prefix(username=""):
    """Return a ``YYYYMMDD_HHMMSS-by-<username>`` prefix in US/Central time.

    Callers in this file use the prefix as the start of a file name, so path
    separators and NUL characters in *username* are replaced with ``_``. This
    keeps a crafted name such as ``../../x`` from escaping the working
    directory. Names without those characters are returned unchanged.
    """
    central = pytz.timezone('US/Central')
    now = datetime.now(central)
    safe_username = re.sub(r'[\\/\x00]', '_', username)
    return f"{now.strftime('%Y%m%d_%H%M%S')}-by-{safe_username}"
# Session State Init
def init_session_state():
    """Populate ``st.session_state`` with defaults for keys that are missing.

    Keys that already exist keep their current value.
    """
    marquee_defaults = {
        "background": "#1E1E1E",
        "color": "#FFFFFF",
        "font-size": "14px",
        "animationDuration": "20s",
        "width": "100%",
        "lineHeight": "35px",
    }
    defaults = {
        'server_running': False,
        'server_task': None,
        'active_connections': {},
        'last_chat_update': 0,
        'message_text': "",
        'audio_cache': {},
        'transcript_history': [],
        'last_transcript': "",
        'tts_voice': "en-US-AriaNeural",
        'chat_history': [],
        'marquee_settings': marquee_defaults,
        'username': None,
        'autosend': True,
        'autosearch': True,
        'last_message': "",
        'mp3_files': {},
        'timer_start': time.time(),
        'auto_refresh': True,
        'refresh_rate': 10,
    }
    for key in defaults:
        if key in st.session_state:
            continue
        st.session_state[key] = defaults[key]
# Marquee Helpers
def update_marquee_settings_ui():
    """Render the sidebar marquee controls and store their values.

    Writes ``background``, ``color``, ``font-size`` and ``animationDuration``
    into ``st.session_state['marquee_settings']``.
    """
    st.sidebar.markdown("### π― Marquee Settings")
    left_col, right_col = st.sidebar.columns(2)
    with left_col:
        background = st.color_picker("π¨ Background", "#1E1E1E")
        st.session_state['marquee_settings']['background'] = background
        text_color = st.color_picker("βοΈ Text", "#FFFFFF")
        st.session_state['marquee_settings']['color'] = text_color
    with right_col:
        font_px = st.slider('π Size', 10, 24, 14)
        st.session_state['marquee_settings']['font-size'] = f"{font_px}px"
        duration_s = st.slider('β±οΈ Speed', 1, 20, 20)
        st.session_state['marquee_settings']['animationDuration'] = f"{duration_s}s"
def display_marquee(text, settings, key_suffix=""):
    """Show *text* in a scrolling marquee, cut to 280 characters plus "...".

    *settings* is forwarded to ``streamlit_marquee`` as keyword arguments and
    *key_suffix* is appended to the component key.
    """
    if len(text) > 280:
        content = text[:280] + "..."
    else:
        content = text
    streamlit_marquee(content=content, key=f"marquee_{key_suffix}", **settings)
    st.write("")
# Text & File Helpers
def clean_text_for_tts(text):
    """Return *text* prepared for speech synthesis.

    Markdown punctuation (``# * ! [ ]``) is removed first and whitespace is
    collapsed afterwards. Stripping in that order means removed markup cannot
    leave double spaces behind, and markup-only input falls back to
    ``"No text"`` instead of a blank string. The result is capped at 200
    characters.
    """
    without_markup = re.sub(r'[#*!\[\]]+', '', text)
    normalized = ' '.join(without_markup.split())
    # rstrip: the 200-character cut may land right after a word boundary.
    return normalized[:200].rstrip() or "No text"
def clean_text_for_filename(text):
    """Return a lower-case, underscore-joined slug of *text* (max 50 chars).

    Characters other than word characters, whitespace and ``-`` are dropped.
    """
    lowered = text.lower()
    kept = re.sub(r'[^\w\s-]', '', lowered)
    words = kept.split()
    slug = '_'.join(words)
    return slug[:50]
def generate_filename(prompt, username, file_type="md"):
    """Build ``<timestamp prefix>-<8 hex chars of md5(prompt)>.<file_type>``."""
    prefix = format_timestamp_prefix(username)
    digest = hashlib.md5(prompt.encode()).hexdigest()
    short_digest = digest[:8]
    return f"{prefix}-{short_digest}.{file_type}"
def create_file(prompt, username, file_type="md"):
    """Write *prompt* to a freshly named UTF-8 file and return that name."""
    target = generate_filename(prompt, username, file_type)
    with open(target, mode='w', encoding='utf-8') as handle:
        handle.write(prompt)
    return target
def get_download_link(file, file_type="mp3"):
    """Return an HTML ``<a>`` tag that downloads *file* via a base64 data URI.

    Args:
        file: Path of the file to embed.
        file_type: Key used to pick the MIME type and emoji; unknown types use
            ``application/octet-stream`` and the label ``Download``.

    The file name is HTML-escaped before it is interpolated. The link is
    rendered with ``unsafe_allow_html=True`` and file names in this app embed
    the username, so an unescaped name could inject markup.
    """
    from html import escape  # local import: `html` is not imported at module level

    with open(file, "rb") as f:
        b64 = base64.b64encode(f.read()).decode()
    mime_types = {"mp3": "audio/mpeg", "png": "image/png", "md": "text/markdown"}
    mime = mime_types.get(file_type, "application/octet-stream")
    label = FILE_EMOJIS.get(file_type, "Download")
    name = escape(os.path.basename(file), quote=True)
    return f'<a href="data:{mime};base64,{b64}" download="{name}">{label} Download {name}</a>'
def save_username(username):
    """Persist *username* to ``STATE_FILE``.

    The file is written as UTF-8 explicitly: character names contain emoji,
    which the platform default encoding may be unable to represent.
    """
    with open(STATE_FILE, 'w', encoding='utf-8') as f:
        f.write(username)


def load_username():
    """Return the username stored in ``STATE_FILE``, or ``None`` if absent.

    Reads with the same UTF-8 encoding that ``save_username`` writes.
    """
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE, 'r', encoding='utf-8') as f:
            return f.read().strip()
    return None
# Audio Processing
async def async_edge_tts_generate(text, voice, username):
    """Synthesize *text* with *voice* and return the MP3 path, or ``None``.

    Results are cached in ``st.session_state['audio_cache']``. The cache key
    is built from the cleaned text that is actually spoken, so two messages
    that only share their opening characters no longer map to the same audio
    file. A cached path is reused only while the file still exists.

    Returns:
        The generated (or cached) file name, or ``None`` when no non-empty
        file was produced.
    """
    text = clean_text_for_tts(text)
    cache = st.session_state['audio_cache']
    cache_key = f"{text}_{voice}"
    cached = cache.get(cache_key)
    if cached and os.path.exists(cached):
        return cached
    filename = f"{format_timestamp_prefix(username)}-{hashlib.md5(text.encode()).hexdigest()[:8]}.mp3"
    communicate = edge_tts.Communicate(text, voice)
    await communicate.save(filename)
    if os.path.exists(filename) and os.path.getsize(filename) > 0:
        cache[cache_key] = filename
        return filename
    return None
def play_and_download_audio(file_path):
    """Show an audio player and a download link for *file_path*.

    Does nothing when *file_path* is empty or the file does not exist.
    """
    if not file_path or not os.path.exists(file_path):
        return
    st.audio(file_path)
    link_html = get_download_link(file_path)
    st.markdown(link_html, unsafe_allow_html=True)
async def save_chat_entry(username, message, voice, is_markdown=False):
    """Record a chat message: write it to disk, speak it, and broadcast it.

    Args:
        username: Name shown as the author of the entry.
        message: Message body. Blank messages and an exact repeat of the
            previous message (``st.session_state.last_transcript``) are
            skipped.
        voice: Voice name passed to the TTS helper and shown in the entry.
        is_markdown: When true the body is wrapped in a ``markdown`` fence.

    Returns:
        ``(md_file, audio_file)`` for a saved entry, or ``(None, None)`` when
        the message was skipped.
    """
    if not message.strip() or message == st.session_state.last_transcript:
        return None, None
    central = pytz.timezone('US/Central')
    timestamp = datetime.now(central).strftime("%Y-%m-%d %H:%M:%S")
    header = f"[{timestamp}] {username} ({voice}):"
    if is_markdown:
        entry = f"{header}\n```markdown\n{message}\n```"
    else:
        entry = f"{header} {message}"
    md_file = create_file(entry, username, "md")
    # Entries contain emoji; write UTF-8 explicitly instead of relying on the
    # platform default encoding.
    with open(CHAT_FILE, 'a', encoding='utf-8') as f:
        f.write(f"{entry}\n")
    audio_file = await async_edge_tts_generate(message, voice, username)
    await broadcast_message(f"{username}|{message}", "chat")
    st.session_state.last_chat_update = time.time()
    st.session_state.chat_history.append(entry)
    st.session_state.last_transcript = message
    return md_file, audio_file
async def load_chat():
    """Return the non-blank lines of the chat log, de-duplicated in order.

    The log is created with a welcome header when it does not exist yet.
    The file is opened as UTF-8 because the log contains emoji; undecodable
    bytes from an older log are replaced rather than raising.
    """
    if not os.path.exists(CHAT_FILE):
        with open(CHAT_FILE, 'a', encoding='utf-8') as f:
            f.write(f"# {START_ROOM} Chat\n\nWelcome to the Colorado Sim! ποΈ\n")
    with open(CHAT_FILE, 'r', encoding='utf-8', errors='replace') as f:
        content = f.read().strip()
    lines = content.split('\n')
    # dict.fromkeys keeps the first occurrence of each line, in file order.
    return list(dict.fromkeys(line for line in lines if line.strip()))
# Claude and ArXiv Search
async def perform_claude_search(query, username):
    """Send *query* to Claude, log the reply to the chat, and return it.

    Returns:
        ``(md_file, audio_file, result)``: the two values returned by
        ``save_chat_entry`` followed by Claude's reply text.
    """
    claude = anthropic.Anthropic(api_key=anthropic_key)
    user_turn = {"role": "user", "content": query}
    reply = claude.messages.create(
        model="claude-3-sonnet-20240229",
        max_tokens=1000,
        messages=[user_turn],
    )
    result = reply.content[0].text
    speaker_voice = FUN_USERNAMES.get(username, "en-US-AriaNeural")
    md_file, audio_file = await save_chat_entry(
        username, f"Claude says: {result}", speaker_voice, True
    )
    return md_file, audio_file, result
async def perform_arxiv_search(query, username, claude_result=None):
    """Run the ArXiv search Space on *query* plus a Claude answer and log it.

    When *claude_result* is not supplied, Claude is asked *query* first. The
    query and that answer are sent together to the Gradio Space, and the first
    element of its response is saved to the chat.

    Returns:
        ``(md_file, audio_file)`` as returned by ``save_chat_entry``.
    """
    if claude_result is None:
        claude = anthropic.Anthropic(api_key=anthropic_key)
        reply = claude.messages.create(
            model="claude-3-sonnet-20240229",
            max_tokens=1000,
            messages=[{"role": "user", "content": query}],
        )
        claude_result = reply.content[0].text
    rag_client = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern")
    prediction = rag_client.predict(
        f"{query}\n\n{claude_result}",
        10,
        "Semantic Search",
        "mistralai/Mixtral-8x7B-Instruct-v0.1",
        api_name="/update_with_rag_md",
    )
    refs = prediction[0]
    result = f"π ArXiv Results:\n\n{refs}"
    speaker_voice = FUN_USERNAMES.get(username, "en-US-AriaNeural")
    md_file, audio_file = await save_chat_entry(username, result, speaker_voice, True)
    return md_file, audio_file
# WebSocket Handling
async def websocket_handler(websocket, path=None):
    """Serve one websocket client in the ``chat`` room.

    Args:
        websocket: The connection object supplied by ``websockets``.
        path: Request path. Optional so the handler works both with library
            versions that pass ``(websocket, path)`` and with versions that
            pass only ``websocket``. It is not used.

    Text frames of the form ``username|content`` are saved as chat entries;
    other frames are ignored. The connection is unregistered and the
    departure is announced however the connection ends, because iterating the
    socket finishes without raising on a normal close.

    NOTE(review): this coroutine runs on the server thread started by
    ``start_websocket_server``; confirm that ``st.session_state`` is usable
    from that thread in the deployed Streamlit version.
    """
    client_id = str(uuid.uuid4())
    room_id = "chat"
    room = st.session_state.active_connections.setdefault(room_id, {})
    room[client_id] = websocket
    # The session default for 'username' is None, so use `or`, not a .get() default.
    username = st.session_state.get('username') or random.choice(list(FUN_USERNAMES.keys()))
    await save_chat_entry("System π", f"{username} has joined {START_ROOM}!", "en-US-AriaNeural")
    try:
        async for message in websocket:
            # Binary frames arrive as bytes; only text frames carry chat lines.
            if isinstance(message, str) and '|' in message:
                username, content = message.split('|', 1)
                voice = FUN_USERNAMES.get(username, "en-US-AriaNeural")
                await save_chat_entry(username, content, voice)
    except websockets.ConnectionClosed:
        # Abnormal close: nothing more to read; clean-up happens below.
        pass
    finally:
        # Unregister first so the departure broadcast skips the closed socket.
        st.session_state.active_connections.get(room_id, {}).pop(client_id, None)
        await save_chat_entry("System π", f"{username} has left {START_ROOM}!", "en-US-AriaNeural")
async def broadcast_message(message, room_id):
    """Send *message* to every connection registered under *room_id*.

    Connections that turn out to be closed are removed from the room.
    Unknown or empty rooms are a no-op.
    """
    room = st.session_state.active_connections.get(room_id)
    if not room:
        return
    disconnected = []
    # Iterate over a snapshot: other handlers can add or remove connections
    # while this coroutine is suspended in `await ws.send(...)`, and mutating
    # a dict during iteration raises RuntimeError.
    for client_id, ws in list(room.items()):
        try:
            await ws.send(message)
        except websockets.ConnectionClosed:
            disconnected.append(client_id)
    for client_id in disconnected:
        room.pop(client_id, None)
async def run_websocket_server():
    """Start the websocket server on 0.0.0.0:8765 and wait until it closes.

    Returns immediately when ``server_running`` is already set in the session
    state; otherwise sets it once the server is listening.
    """
    already_running = st.session_state.get('server_running', False)
    if already_running:
        return
    ws_server = await websockets.serve(websocket_handler, '0.0.0.0', 8765)
    st.session_state['server_running'] = True
    await ws_server.wait_closed()
def start_websocket_server():
    """Thread target: run the websocket server on a new event loop."""
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    server_coro = run_websocket_server()
    event_loop.run_until_complete(server_coro)
# Sim-Oriented Mad Libs
def generate_mad_libs_story(username, inputs):
    """Fill the Colorado story template with *username* and the 20 *inputs*.

    *inputs* must contain every key used by the template; a missing key
    raises ``KeyError``. The story is returned without surrounding whitespace.
    """
    w = inputs  # short alias to keep the template lines readable
    story_lines = [
        f"In the wilds of Colorado, {username} stumbled upon {w['plural_noun']}.",
        f"These {w['adjective1']} critters were {w['verb_ing']} all over the Rockies,",
        f"freaking out climbers like {w['celebrity']}.",
        f"Holding a {w['object']}, {username} felt {w['emotion']} as a {w['animal']}",
        f"{w['verb_past']} past, yelling {w['exclamation']} near {w['place']}.",
        f"Later, munching {w['food']} with a {w['adjective2']} {w['body_part']},",
        f"{username} looped β{w['song']}β {w['number']} times,",
        f"deciding to {w['verb_present']} as a {w['occupation']}",
        f"in a {w['color']} {w['noun']}βall ending with a {w['sound']}!",
    ]
    return "\n".join(story_lines).strip()
# Main Interface
def main():
    """Render the whole app: header, marquee, the three action tabs, sidebar.

    Also starts the background websocket server thread when none is running.

    NOTE(review): the source this was recovered from had lost its
    indentation; block nesting below was reconstructed from the statement
    order and should be checked against the deployed file.
    """
    tab_chat = "π€ Chat & Voice"
    tab_research = "π Research"
    tab_mad_libs = "β¨ Mad Libs Sim"
    system_user = "System π"
    default_voice = "en-US-AriaNeural"

    init_session_state()

    # Restore the persisted character if it is still a known one; otherwise
    # pick one at random, announce it, and persist the choice.
    saved_username = load_username()
    if saved_username and saved_username in FUN_USERNAMES:
        st.session_state.username = saved_username
    if not st.session_state.username:
        st.session_state.username = random.choice(list(FUN_USERNAMES.keys()))
        st.session_state.tts_voice = FUN_USERNAMES[st.session_state.username]
        asyncio.run(save_chat_entry(system_user, f"{st.session_state.username} has joined {START_ROOM}!", default_voice))
        save_username(st.session_state.username)

    st.title(f"{Site_Name} - {st.session_state.username}")
    st.subheader("Chat, Search, or Create a Colorado Sim Story! ππ²")
    update_marquee_settings_ui()

    # Load the log once for both the marquee and the chat tab; nothing writes
    # to it between those two uses.
    chat_lines = asyncio.run(load_chat())
    chat_text = " ".join([line.split(": ")[-1] for line in chat_lines if ": " in line])
    display_marquee(f"ποΈ {START_ROOM} | {st.session_state.username} | {chat_text}", st.session_state['marquee_settings'], "welcome")

    tab_main = st.radio("Action:", [tab_chat, tab_research, tab_mad_libs], horizontal=True)
    st.checkbox("Autosend Chat", key="autosend")
    st.checkbox("Autosearch Research", key="autosearch")

    if tab_main == tab_chat:
        st.subheader(f"{START_ROOM} Chat π¬")
        with st.container():
            st.code("\n".join(f"{i+1}. {line}" for i, line in enumerate(chat_lines)), language="python")
        message = st.text_input(f"Message as {st.session_state.username}", key="message_input")
        # Only act on a message that differs from the last one handled, so a
        # rerun with the same text in the box does not resend it.
        if message and message != st.session_state.last_message:
            st.session_state.last_message = message
            col_send, col_claude, col_arxiv = st.columns([1, 1, 1])
            with col_send:
                if st.session_state.autosend or st.button("Send π"):
                    voice = FUN_USERNAMES.get(st.session_state.username, default_voice)
                    md_file, audio_file = asyncio.run(save_chat_entry(st.session_state.username, message, voice, True))
                    if audio_file:
                        play_and_download_audio(audio_file)
                    st.rerun()
            with col_claude:
                if st.button("π§ Claude"):
                    md_file, audio_file, _ = asyncio.run(perform_claude_search(message, st.session_state.username))
                    if audio_file:
                        play_and_download_audio(audio_file)
                    st.rerun()
            with col_arxiv:
                if st.button("π ArXiv"):
                    md_file, audio_file = asyncio.run(perform_arxiv_search(message, st.session_state.username))
                    if audio_file:
                        play_and_download_audio(audio_file)
                    st.rerun()
    elif tab_main == tab_research:
        st.subheader("π Research with Claude & ArXiv")
        q = st.text_input("Query:", key="research_query")
        # NOTE(review): with autosearch on, this branch runs both searches on
        # every rerun while the query box is non-empty.
        if q and (st.session_state.autosearch or st.button("π Run")):
            st.session_state.last_message = q
            md_file_claude, audio_file_claude, claude_result = asyncio.run(perform_claude_search(q, st.session_state.username))
            if audio_file_claude:
                play_and_download_audio(audio_file_claude)
            md_file_arxiv, audio_file_arxiv = asyncio.run(perform_arxiv_search(q, st.session_state.username, claude_result))
            if audio_file_arxiv:
                play_and_download_audio(audio_file_arxiv)
    elif tab_main == tab_mad_libs:
        st.subheader("Create Your Colorado Sim Story!")
        # (input key, widget label, placeholder) in display order per column.
        left_fields = [
            ('plural_noun', "1. Plural Noun", "e.g., 'tacos'"),
            ('adjective1', "2. Adjective", "e.g., 'spicy'"),
            ('verb_ing', "3. Verb ending in -ing", "e.g., 'yeeting'"),
            ('celebrity', "4. Celebrity Name", "e.g., 'Elon Musk'"),
            ('object', "5. Random Object", "e.g., 'toaster'"),
            ('emotion', "6. Emotion", "e.g., 'salty'"),
            ('animal', "7. Animal", "e.g., 'elk'"),
            ('verb_past', "8. Verb (past tense)", "e.g., 'yeeted'"),
            ('place', "9. Place", "e.g., 'Rockies'"),
            ('exclamation', "10. Exclamation", "e.g., 'Bruh!'"),
        ]
        right_fields = [
            ('food', "11. Food Item", "e.g., 'pizza'"),
            ('adjective2', "12. Adjective", "e.g., 'cringe'"),
            ('body_part', "13. Body Part", "e.g., 'elbow'"),
            ('number', "14. Number", "e.g., '69'"),
            ('song', "15. Song Title", "e.g., 'Sweet Caroline'"),
            ('verb_present', "16. Verb (present tense)", "e.g., 'slay'"),
            ('occupation', "17. Occupation", "e.g., 'influencer'"),
            ('color', "18. Color", "e.g., 'teal'"),
            ('noun', "19. Noun", "e.g., 'chaos'"),
            ('sound', "20. Silly Sound Effect", "e.g., 'Boop!'"),
        ]
        col1, col2 = st.columns(2)
        inputs = {}
        for column, fields in ((col1, left_fields), (col2, right_fields)):
            with column:
                for field_key, label, placeholder in fields:
                    inputs[field_key] = st.text_input(label, placeholder=placeholder)
        if st.button("Generate Sim Story! π"):
            story = generate_mad_libs_story(st.session_state.username, inputs)
            st.markdown("### Your Colorado Sim Adventure ποΈ")
            st.write(story)
            voice = FUN_USERNAMES.get(st.session_state.username, default_voice)
            md_file, audio_file = asyncio.run(save_chat_entry(st.session_state.username, story, voice, True))
            if audio_file:
                play_and_download_audio(audio_file)

    # Sidebar
    st.sidebar.subheader("Character Settings")
    character_names = list(FUN_USERNAMES.keys())
    new_username = st.sidebar.selectbox("Switch Character", character_names, index=character_names.index(st.session_state.username))
    if new_username != st.session_state.username:
        asyncio.run(save_chat_entry(system_user, f"{st.session_state.username} switched to {new_username}", default_voice))
        st.session_state.username = new_username
        st.session_state.tts_voice = FUN_USERNAMES[new_username]
        save_username(st.session_state.username)
        st.rerun()

    st.sidebar.subheader("Chat History")
    # Re-read the log: the tabs above may have appended entries.
    chat_content = asyncio.run(load_chat())
    with st.sidebar.expander("Recent Logs"):
        st.code("\n".join(f"{i+1}. {line}" for i, line in enumerate(chat_content[-10:])), language="python")

    # Start the websocket server thread once per session. `server_running` is
    # only set from inside the worker thread, so also check whether the thread
    # stored in the session is still alive; otherwise every rerun could start
    # another thread that tries to bind the same port.
    server_thread = st.session_state.get('server_task')
    thread_alive = server_thread is not None and server_thread.is_alive()
    if not st.session_state.get('server_running', False) and not thread_alive:
        st.session_state.server_task = threading.Thread(target=start_websocket_server, daemon=True)
        st.session_state.server_task.start()


if __name__ == "__main__":
    main()