Spaces:
Sleeping
Sleeping
from flask import Flask, render_template, request, jsonify | |
from flask_socketio import SocketIO, emit, join_room, leave_room | |
import os | |
import requests | |
import json | |
import uuid | |
from datetime import datetime | |
from dotenv import load_dotenv | |
import logging | |
from werkzeug.utils import secure_filename | |
import random | |
import asyncio | |
# Initialize Flask and configure core settings | |
app = Flask(__name__) | |
app.config['SECRET_KEY'] = os.urandom(24) | |
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 | |
# Initialize SocketIO with CORS support and logging | |
socketio = SocketIO(app, cors_allowed_origins="*", logger=True, engineio_logger=True, async_mode='eventlet') | |
# Load environment variables | |
load_dotenv() | |
MISTRAL_API_KEY = os.getenv('MISTRAL_API_KEY') | |
ELEVENLABS_API_KEY = os.getenv('ELEVENLABS_API_KEY') | |
# Configure logging with more detailed format | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
) | |
logger = logging.getLogger(__name__) | |
class GameState: | |
"""Manages the state of all active game sessions.""" | |
def __init__(self): | |
self.games = {} | |
self.cleanup_interval = 3600 | |
def create_game(self): | |
"""Creates a new game session with proper initialization.""" | |
try: | |
game_id = str(uuid.uuid4()) | |
self.games[game_id] = { | |
'players': [], | |
'current_phase': 'setup', | |
'recordings': {}, | |
'impostor': None, | |
'votes': {}, | |
'question': None, | |
'impostor_answer': None, | |
'modified_recording': None, | |
'round_number': 1, | |
'start_time': datetime.now().isoformat(), | |
'completed_rounds': [], | |
'score': {'impostor_wins': 0, 'player_wins': 0}, | |
'room': game_id # Add room for socket management | |
} | |
logger.info(f"Successfully created game with ID: {game_id}") | |
return game_id | |
except Exception as e: | |
logger.error(f"Error creating game: {str(e)}") | |
raise | |
def get_game(self, game_id): | |
"""Safely retrieves a game by ID.""" | |
game = self.games.get(game_id) | |
if not game: | |
logger.error(f"Game not found: {game_id}") | |
raise ValueError("Game not found") | |
return game | |
def cleanup_inactive_games(self): | |
"""Removes inactive game sessions.""" | |
current_time = datetime.now() | |
for game_id, game in list(self.games.items()): | |
start_time = datetime.fromisoformat(game['start_time']) | |
if (current_time - start_time).total_seconds() > 7200: # 2 hours | |
del self.games[game_id] | |
logger.info(f"Cleaned up inactive game: {game_id}") | |
# Initialize global game state | |
game_state = GameState() | |
def home(): | |
"""Serves the main game page.""" | |
return render_template('index.html') | |
def handle_connect(): | |
"""Handles client connection.""" | |
logger.info(f"Client connected: {request.sid}") | |
emit('connection_success', {'status': 'connected'}) | |
def handle_disconnect(): | |
"""Handles client disconnection.""" | |
logger.info(f"Client disconnected: {request.sid}") | |
def handle_create_game(): | |
"""Handles game creation request.""" | |
try: | |
game_id = game_state.create_game() | |
join_room(game_id) # Create socket room for the game | |
logger.info(f"Created and joined game room: {game_id}") | |
emit('game_created', { | |
'gameId': game_id, | |
'status': 'success' | |
}) | |
except Exception as e: | |
logger.error(f"Error in game creation: {str(e)}") | |
emit('game_error', { | |
'error': 'Failed to create game', | |
'details': str(e) | |
}) | |
def handle_join_game(data): | |
"""Handles player joining a game.""" | |
try: | |
game_id = data.get('gameId') | |
player_name = data.get('playerName') | |
if not game_id or not player_name: | |
raise ValueError("Missing game ID or player name") | |
game = game_state.get_game(game_id) | |
# Validate player count | |
if len(game['players']) >= 5: | |
raise ValueError("Game is full") | |
# Add player to game | |
player_id = len(game['players']) + 1 | |
player = { | |
'id': player_id, | |
'name': player_name, | |
'socket_id': request.sid | |
} | |
game['players'].append(player) | |
# Join socket room | |
join_room(game_id) | |
logger.info(f"Player {player_name} (ID: {player_id}) joined game {game_id}") | |
# Broadcast to all players in the game | |
emit('player_joined', { | |
'playerId': player_id, | |
'playerName': player_name, | |
'status': 'success' | |
}, room=game_id) | |
except Exception as e: | |
error_msg = str(e) | |
logger.error(f"Error in handle_join_game: {error_msg}") | |
emit('game_error', {'error': error_msg}) | |
async def start_game(): | |
"""Initializes a new game round.""" | |
try: | |
data = request.get_json() | |
game_id = data.get('gameId') | |
if not game_id: | |
raise ValueError("Missing game ID") | |
game = game_state.get_game(game_id) | |
# Validate player count | |
if len(game['players']) < 3: | |
raise ValueError("Need at least 3 players to start") | |
# Generate question using Mistral AI | |
question = await generate_question() | |
game['question'] = question | |
game['current_phase'] = 'recording' | |
logger.info(f"Started game {game_id} with question: {question}") | |
# Notify all players in the game room | |
socketio.emit('round_started', { | |
'question': question | |
}, room=game_id) | |
return jsonify({ | |
'status': 'success', | |
'question': question | |
}) | |
except Exception as e: | |
error_msg = str(e) | |
logger.error(f"Error starting game: {error_msg}") | |
return jsonify({ | |
'status': 'error', | |
'error': error_msg | |
}), 500 | |
async def generate_question(): | |
"""Generates an engaging question using Mistral AI.""" | |
try: | |
headers = { | |
'Authorization': f'Bearer {MISTRAL_API_KEY}', | |
'Content-Type': 'application/json' | |
} | |
payload = { | |
'messages': [{ | |
'role': 'user', | |
'content': '''Generate an engaging personal question for a social game. | |
The question should: | |
1. Encourage creative and unique responses | |
2. Be open-ended but not too philosophical | |
3. Be answerable in 15-30 seconds | |
4. Be appropriate for all ages | |
5. Spark interesting conversation | |
Generate only the question, without any additional text.''' | |
}] | |
} | |
response = requests.post( | |
'https://api.mistral.ai/v1/chat/completions', | |
headers=headers, | |
json=payload, | |
timeout=10 | |
) | |
if response.status_code == 200: | |
question = response.json()['choices'][0]['message']['content'].strip() | |
logger.info(f"Generated question: {question}") | |
return question | |
logger.error(f"Mistral API error: {response.status_code}") | |
return random.choice([ | |
"What's your favorite childhood memory?", | |
"What's the most interesting place you've ever visited?", | |
"What's a skill you'd love to master and why?", | |
"What's the best piece of advice you've ever received?" | |
]) | |
except Exception as e: | |
logger.error(f"Error generating question: {str(e)}") | |
return "What is your favorite memory from your childhood?" | |
async def submit_recording(): | |
"""Handles voice recording submissions.""" | |
try: | |
game_id = request.form.get('gameId') | |
player_id = request.form.get('playerId') | |
audio_file = request.files.get('audio') | |
if not all([game_id, player_id, audio_file]): | |
raise ValueError("Missing required data") | |
game = game_state.get_game(game_id) | |
# Save the recording | |
filename = secure_filename(f"recording_{game_id}_{player_id}.wav") | |
filepath = os.path.join('temp', filename) | |
audio_file.save(filepath) | |
game['recordings'][player_id] = filepath | |
logger.info(f"Saved recording for player {player_id} in game {game_id}") | |
# Notify all players about the new recording | |
socketio.emit('recording_submitted', { | |
'playerId': player_id, | |
'status': 'success' | |
}, room=game_id) | |
return jsonify({'status': 'success'}) | |
except Exception as e: | |
error_msg = str(e) | |
logger.error(f"Error submitting recording: {error_msg}") | |
return jsonify({ | |
'status': 'error', | |
'error': error_msg | |
}), 500 | |
def handle_vote(data): | |
"""Processes player votes and determines round outcome.""" | |
try: | |
game_id = data.get('gameId') | |
voter_id = data.get('voterId') | |
vote_for = data.get('voteFor') | |
if not all([game_id, voter_id, vote_for]): | |
raise ValueError("Missing vote data") | |
game = game_state.get_game(game_id) | |
game['votes'][voter_id] = vote_for | |
# Check if all players have voted | |
if len(game['votes']) == len(game['players']): | |
# Calculate results | |
votes_count = {} | |
for vote in game['votes'].values(): | |
votes_count[vote] = votes_count.get(vote, 0) + 1 | |
most_voted = max(votes_count.items(), key=lambda x: x[1])[0] | |
# Update scores | |
if most_voted == game['impostor']: | |
game['score']['player_wins'] += 1 | |
result = 'players_win' | |
else: | |
game['score']['impostor_wins'] += 1 | |
result = 'impostor_wins' | |
# Store round results | |
game['completed_rounds'].append({ | |
'round_number': game['round_number'], | |
'impostor': game['impostor'], | |
'votes': game['votes'].copy(), | |
'most_voted': most_voted, | |
'result': result | |
}) | |
# Emit results to all players | |
emit('round_result', { | |
'impostor': game['impostor'], | |
'most_voted': most_voted, | |
'votes': game['votes'], | |
'score': game['score'], | |
'result': result | |
}, room=game_id) | |
logger.info(f"Round completed for game {game_id}. Result: {result}") | |
except Exception as e: | |
error_msg = str(e) | |
logger.error(f"Error processing vote: {error_msg}") | |
emit('game_error', {'error': error_msg}) | |
if __name__ == '__main__': | |
# Create temporary directory for recordings | |
os.makedirs('temp', exist_ok=True) | |
# Start the server | |
logger.info("Starting server...") | |
socketio.run(app, host='0.0.0.0', port=7860, debug=True) |