"""Flask + Socket.IO backend for a voice "impostor" party game.

Players join a game, record spoken answers to a generated question, and one
player's voice is cloned to speak an AI-written answer. Players then vote on
who the impostor is.

External services used:
    * Mistral chat completions -- question and impostor-answer text.
    * ElevenLabs -- voice cloning and text-to-speech.
"""
import asyncio
import json
import logging
import os
import random
import uuid
from collections import Counter
from datetime import datetime
from typing import Optional, Tuple

import requests
from dotenv import load_dotenv
from flask import Flask, jsonify, render_template, request
from flask_socketio import SocketIO, emit, join_room
from werkzeug.utils import secure_filename

# Load environment variables from .env first so every setting below sees them.
load_dotenv()
MISTRAL_API_KEY = os.getenv('MISTRAL_API_KEY')
ELEVENLABS_API_KEY = os.getenv('ELEVENLABS_API_KEY')
# The Mistral chat completions endpoint requires a model name in the payload.
MISTRAL_MODEL = os.getenv('MISTRAL_MODEL', 'mistral-small-latest')

MISTRAL_CHAT_URL = 'https://api.mistral.ai/v1/chat/completions'
ELEVENLABS_ADD_VOICE_URL = 'https://api.elevenlabs.io/v1/voices/add'
ELEVENLABS_TTS_URL = 'https://api.elevenlabs.io/v1/text-to-speech'

TEMP_DIR = 'temp'  # Directory for uploaded recordings and generated audio
MAX_PLAYERS = 5    # A game accepts at most this many players

# Initialize Flask and configure core settings
app = Flask(__name__)
# Prefer a configured key so sessions survive restarts and are shared between
# worker processes; fall back to a random key when none is configured.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY') or os.urandom(24)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # Limit file uploads to 16MB

# Initialize SocketIO with CORS support for development
socketio = SocketIO(app, cors_allowed_origins="*")

# Configure logging to track application behavior
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Used when the Mistral API answers with a non-200 status.
FALLBACK_QUESTIONS = [
    "What is your favorite childhood memory?",
    "What's the most interesting place you've ever visited?",
    "What's a skill you'd love to master and why?",
    "What's the best piece of advice you've ever received?",
]

# Used when the impostor answer cannot be generated for any reason.
FALLBACK_IMPOSTOR_ANSWER = (
    "I have an interesting story about that, "
    "but I'd need more time to explain it properly."
)

# Prompt that encourages interesting, personal questions.
QUESTION_PROMPT = (
    "Generate an engaging personal question for a social game. "
    "The question should:\n"
    "1. Encourage creative and unique responses\n"
    "2. Be open-ended but not too philosophical\n"
    "3. Be answerable in 15-30 seconds\n"
    "4. Be appropriate for all ages\n"
    "5. Spark interesting conversation\n"
    "Generate only the question, without any additional text."
)


class GameState:
    """Manages the state of all active game sessions."""

    # Games whose start time is older than this many seconds are removed.
    INACTIVE_THRESHOLD_SECONDS = 7200  # 2 hours

    def __init__(self):
        """Initialize the game state manager."""
        self.games = {}  # Maps game_id -> per-game state dict
        # Kept for backward compatibility; nothing in this module reads it.
        self.cleanup_interval = 3600

    def create_game(self):
        """Create a new game session and return its unique identifier.

        Expired games are purged first so the registry does not grow without
        bound (nothing else in this module triggers the cleanup).
        """
        self.cleanup_inactive_games()
        game_id = str(uuid.uuid4())
        self.games[game_id] = {
            'players': [],
            'current_phase': 'setup',
            'recordings': {},
            'impostor': None,
            'votes': {},
            'question': None,
            'impostor_answer': None,
            'modified_recording': None,
            'round_number': 1,
            'start_time': datetime.now().isoformat(),
            'completed_rounds': [],
            'score': {'impostor_wins': 0, 'player_wins': 0},
        }
        return game_id

    def cleanup_inactive_games(self):
        """Remove game sessions that started more than 2 hours ago."""
        now = datetime.now()
        expired = [
            game_id
            for game_id, game in self.games.items()
            if (now - datetime.fromisoformat(game['start_time'])).total_seconds()
            > self.INACTIVE_THRESHOLD_SECONDS
        ]
        for game_id in expired:
            del self.games[game_id]


# Initialize global game state
game_state = GameState()


def _ensure_temp_dir():
    """Create the audio directory if needed (the app may not run via __main__)."""
    os.makedirs(TEMP_DIR, exist_ok=True)


def _json_body():
    """Return the request's JSON object, or {} when it is missing or not an object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _mistral_chat(prompt: str) -> Tuple[int, Optional[str]]:
    """Send a single-turn chat prompt to Mistral.

    Returns:
        (status_code, content): content is the stripped reply text on HTTP 200
        and None for any other status.

    Raises:
        Whatever ``requests`` or the response parsing raises; callers decide
        how to fall back.
    """
    # NOTE(review): requests.post blocks the calling thread; confirm this is
    # acceptable for the server the async views are deployed on.
    response = requests.post(
        MISTRAL_CHAT_URL,
        headers={
            'Authorization': f'Bearer {MISTRAL_API_KEY}',
            'Content-Type': 'application/json',
        },
        json={
            'model': MISTRAL_MODEL,
            'messages': [{'role': 'user', 'content': prompt}],
        },
        timeout=10,  # Set timeout to handle slow responses
    )
    if response.status_code != 200:
        return response.status_code, None
    content = response.json()['choices'][0]['message']['content'].strip()
    return response.status_code, content


async def generate_question():
    """Generate an engaging question using Mistral AI.

    Returns the generated question, a random fallback question when the API
    answers with a non-200 status, or a fixed fallback when the call raises.
    """
    try:
        status, question = _mistral_chat(QUESTION_PROMPT)
    except Exception:
        # Deliberate best-effort boundary: the game must always get a question.
        logger.exception("Error generating question")
        return "What is your favorite memory from your childhood?"

    if question is None:
        logger.error("Mistral API error: %s", status)
        return random.choice(FALLBACK_QUESTIONS)

    logger.info("Generated question: %s", question)
    return question


async def generate_impostor_answer(question):
    """Generate a convincing impostor response to ``question`` using Mistral AI.

    Returns the generated answer, or a fixed fallback sentence on any failure.
    """
    prompt = (
        f'Given the question "{question}", generate a detailed and convincing '
        "personal response. The response should:\n"
        "1. Sound natural and conversational\n"
        "2. Be 2-3 sentences long\n"
        "3. Include specific details to sound authentic\n"
        "4. Be suitable for text-to-speech conversion\n"
        "5. Avoid complex words or punctuation that might affect voice synthesis\n"
        "Generate only the response, without any additional context."
    )
    try:
        status, answer = _mistral_chat(prompt)
    except Exception:
        # Deliberate best-effort boundary: always return something speakable.
        logger.exception("Error generating impostor answer")
        return FALLBACK_IMPOSTOR_ANSWER

    if answer is None:
        logger.error("Mistral API error generating answer: %s", status)
        return FALLBACK_IMPOSTOR_ANSWER

    logger.info("Generated impostor answer: %s", answer)
    return answer


async def clone_voice(audio_file):
    """Clone a voice using the ElevenLabs API.

    Args:
        audio_file: Path to the recording to upload as the voice sample.

    Returns:
        The new voice id, or None when the upload or the API call fails.
    """
    try:
        with open(audio_file, 'rb') as sample:
            response = requests.post(
                ELEVENLABS_ADD_VOICE_URL,
                headers={'xi-api-key': ELEVENLABS_API_KEY},
                # The add-voice endpoint requires a name for the new voice.
                data={'name': f'impostor_clone_{uuid.uuid4().hex[:8]}'},
                files={'files': ('recording.wav', sample, 'audio/wav')},
                timeout=30,
            )
        if response.status_code != 200:
            logger.error("ElevenLabs voice cloning error: %s", response.status_code)
            return None
        voice_id = response.json().get('voice_id')
        logger.info("Successfully cloned voice: %s", voice_id)
        return voice_id
    except Exception:
        # Deliberate best-effort boundary: callers treat None as failure.
        logger.exception("Error cloning voice")
        return None


async def generate_cloned_speech(voice_id, text):
    """Generate speech with ElevenLabs using a cloned voice.

    Args:
        voice_id: Voice identifier returned by ``clone_voice``.
        text: Text to synthesize.

    Returns:
        Path of the written MP3 file under ``temp/``, or None on failure.
    """
    try:
        response = requests.post(
            f'{ELEVENLABS_TTS_URL}/{voice_id}',
            headers={
                'xi-api-key': ELEVENLABS_API_KEY,
                'Content-Type': 'application/json',
            },
            json={
                'text': text,
                'voice_settings': {
                    'stability': 0.75,
                    'similarity_boost': 0.75,
                },
            },
            timeout=30,
        )
        if response.status_code != 200:
            logger.error(
                "ElevenLabs speech generation error: %s", response.status_code
            )
            return None

        _ensure_temp_dir()
        filename = f"{TEMP_DIR}/impostor_audio_{uuid.uuid4()}.mp3"
        with open(filename, 'wb') as out:
            out.write(response.content)
        logger.info("Generated cloned speech: %s", filename)
        return filename
    except Exception:
        # Deliberate best-effort boundary: callers treat None as failure.
        logger.exception("Error generating cloned speech")
        return None


@app.route('/')
def home():
    """Serve the main game page."""
    return render_template('index.html')


@app.route('/api/start_game', methods=['POST'])
async def start_game():
    """Start a round, creating the game session when none is specified.

    JSON body: ``{"game_id": <optional str>}``. Without ``game_id`` a new game
    is created (no other route in this module creates one); an unknown
    ``game_id`` yields 404. The response carries the game id and the question.
    """
    try:
        data = _json_body()
        game_id = data.get('game_id')
        if game_id is None:
            game_id = game_state.create_game()
        elif game_id not in game_state.games:
            return jsonify({'error': 'Game not found'}), 404

        game = game_state.games[game_id]

        # Generate question for the round
        question = await generate_question()
        game['question'] = question
        game['current_phase'] = 'recording'

        return jsonify({
            'status': 'success',
            'game_id': game_id,
            'question': question,
        })
    except Exception:
        logger.exception("Error starting game")
        return jsonify({'error': 'Internal server error'}), 500


@app.route('/api/submit_recording', methods=['POST'])
async def submit_recording():
    """Handle voice recording submissions.

    Expects multipart form fields ``game_id`` and ``player_id`` plus an
    ``audio`` file. The file is saved under ``temp/`` and its path is stored
    in the game's ``recordings`` keyed by the (string) ``player_id``.
    """
    try:
        game_id = request.form.get('game_id')
        player_id = request.form.get('player_id')
        audio_file = request.files.get('audio')

        if not all([game_id, player_id, audio_file]):
            return jsonify({'error': 'Missing required data'}), 400
        if game_id not in game_state.games:
            return jsonify({'error': 'Game not found'}), 404

        # Save the recording
        _ensure_temp_dir()
        filename = secure_filename(f"recording_{game_id}_{player_id}.wav")
        filepath = os.path.join(TEMP_DIR, filename)
        audio_file.save(filepath)

        game_state.games[game_id]['recordings'][player_id] = filepath
        return jsonify({'status': 'success'})
    except Exception:
        logger.exception("Error submitting recording")
        return jsonify({'error': 'Internal server error'}), 500


@app.route('/api/process_impostor', methods=['POST'])
async def process_impostor():
    """Handle the complete impostor voice generation process.

    JSON body: ``{"game_id": ..., "impostor_id": ...}``. Records the impostor
    on the game, generates an answer, clones the impostor's recorded voice and
    synthesizes the answer with it.
    """
    try:
        data = _json_body()
        game_id = data.get('game_id')
        impostor_id = data.get('impostor_id')

        if game_id not in game_state.games:
            return jsonify({'error': 'Game not found'}), 404
        game = game_state.games[game_id]

        # Recordings are keyed by the form-supplied player_id, which is always
        # a string, while a JSON impostor_id may arrive as a number.
        original_recording = game['recordings'].get(str(impostor_id))
        if not original_recording:
            return jsonify({'error': 'Original recording not found'}), 404

        # Remember who the impostor is so votes can be scored against it.
        game['impostor'] = impostor_id

        # Generate impostor's answer
        impostor_answer = await generate_impostor_answer(game['question'])
        game['impostor_answer'] = impostor_answer

        # Clone voice
        voice_id = await clone_voice(original_recording)
        if not voice_id:
            return jsonify({'error': 'Voice cloning failed'}), 500

        # Generate modified speech
        audio_file = await generate_cloned_speech(voice_id, impostor_answer)
        if not audio_file:
            return jsonify({'error': 'Speech generation failed'}), 500

        game['modified_recording'] = audio_file
        # NOTE(review): audio_url is a filesystem path under temp/; no route in
        # this file serves it -- confirm how the client is expected to fetch it.
        return jsonify({
            'status': 'success',
            'audio_url': audio_file,
        })
    except Exception:
        logger.exception("Error processing impostor")
        return jsonify({'error': 'Internal server error'}), 500


@socketio.on('join_game')
def handle_join_game(data):
    """Handle a player joining the game.

    Adds the player to the game (up to ``MAX_PLAYERS``), subscribes the socket
    to the game's room and announces the new player to that room.
    """
    if not isinstance(data, dict):
        logger.warning("join_game ignored: payload is not an object")
        return

    game_id = data.get('game_id')
    player_name = data.get('player_name')

    game = game_state.games.get(game_id)
    if game is None:
        logger.warning("join_game ignored: unknown game %s", game_id)
        return
    if len(game['players']) >= MAX_PLAYERS:
        logger.warning("join_game ignored: game %s is full", game_id)
        return

    player_id = len(game['players']) + 1
    game['players'].append({'id': player_id, 'name': player_name})

    # Without joining the room, emits addressed to room=game_id reach nobody.
    join_room(game_id)
    emit('player_joined', {
        'player_id': player_id,
        'player_name': player_name,
    }, broadcast=True, room=game_id)


@socketio.on('submit_vote')
def handle_vote(data):
    """Process player votes and determine the round outcome.

    Once every player has voted, the most-voted player is compared with the
    impostor, the score is updated, the round is archived and announced, and
    the vote table is reset for the next round.
    """
    if not isinstance(data, dict):
        logger.warning("submit_vote ignored: payload is not an object")
        return

    game_id = data.get('game_id')
    voter_id = data.get('voter_id')
    vote_for = data.get('vote_for')

    game = game_state.games.get(game_id)
    if game is None:
        logger.warning("submit_vote ignored: unknown game %s", game_id)
        return

    game['votes'][voter_id] = vote_for

    # Wait until all players have voted.
    if len(game['votes']) != len(game['players']):
        return

    # Snapshot the votes so the archived/emitted data is unaffected by the reset.
    round_votes = dict(game['votes'])
    tally = Counter(round_votes.values())
    # Ties resolve to the candidate that received its first vote earliest.
    most_voted = max(tally, key=tally.get)

    # Compare as strings: ids may arrive as numbers or strings from clients.
    impostor = game['impostor']
    impostor_caught = impostor is not None and str(most_voted) == str(impostor)
    if impostor_caught:
        game['score']['player_wins'] += 1
    else:
        game['score']['impostor_wins'] += 1

    # Store round results
    game['completed_rounds'].append({
        'round_number': game['round_number'],
        'impostor': impostor,
        'votes': round_votes,
        'most_voted': most_voted,
    })

    # Emit results
    emit('round_result', {
        'impostor': impostor,
        'most_voted': most_voted,
        'votes': round_votes,
        'score': game['score'],
    }, broadcast=True, room=game_id)

    # Start the next round with a clean vote table; otherwise every further
    # vote would re-score the finished round.
    game['votes'] = {}
    game['round_number'] += 1


if __name__ == '__main__':
    # Create temporary directory for recordings if it doesn't exist
    os.makedirs(TEMP_DIR, exist_ok=True)

    # The interactive debugger allows code execution, so it is opt-in rather
    # than always on while listening on all interfaces.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in {
        '1', 'true', 'yes', 'on',
    }

    # Start the server
    socketio.run(app, host='0.0.0.0', port=7860, debug=debug_enabled)