from flask import Flask, render_template, request, jsonify |
from flask_socketio import SocketIO, emit, join_room, leave_room |
import os |
import requests |
import json |
import uuid |
from datetime import datetime |
from dotenv import load_dotenv |
import logging |
from werkzeug.utils import secure_filename |
import random |
import asyncio |
# Load variables from a local .env file first so that every os.getenv() call
# below (and anywhere else at import time) sees them.
load_dotenv()

# API key for the Mistral chat-completions endpoint; None when the variable
# is not set in the environment.
MISTRAL_API_KEY = os.getenv('MISTRAL_API_KEY')

app = Flask(__name__)
# Use a stable key from the environment when one is provided, so sessions
# survive restarts and are shared between workers; otherwise fall back to a
# random per-process key.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY') or os.urandom(24)
# Upper bound on accepted request bodies: 16 MiB.
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024
socketio = SocketIO(
    app,
    cors_allowed_origins="*",
    logger=True,
    engineio_logger=True,
    async_mode='eventlet',
)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
class GameState:
    """Manages the state of all active game sessions.

    Sessions are kept in memory in ``self.games``, a dict mapping a UUID4
    string to that game's state dict.
    """

    def __init__(self):
        self.games = {}
        # Interval in seconds; not read by any method of this class.
        self.cleanup_interval = 3600

    def create_game(self):
        """Create a new game session and return its ID.

        Returns:
            The UUID4 string under which the new game is stored.
        """
        game_id = str(uuid.uuid4())
        self.games[game_id] = {
            'players': [],
            'current_phase': 'setup',
            'recordings': {},
            'impostor': None,
            'votes': {},
            'question': None,
            'impostor_answer': None,
            'modified_recording': None,
            'round_number': 1,
            'start_time': datetime.now().isoformat(),
            'completed_rounds': [],
            'score': {'impostor_wins': 0, 'player_wins': 0},
            'room': game_id,
        }
        logger.info(f"Successfully created game with ID: {game_id}")
        return game_id

    def get_game(self, game_id):
        """Return the state dict of the game stored under ``game_id``.

        Raises:
            ValueError: If no game is stored under ``game_id``.
        """
        game = self.games.get(game_id)
        if game is None:
            logger.error(f"Game not found: {game_id}")
            raise ValueError("Game not found")
        return game

    def cleanup_inactive_games(self, max_age_seconds=7200):
        """Remove game sessions older than ``max_age_seconds``.

        Age is measured from the game's ``start_time``, which is set when the
        game is created.

        Args:
            max_age_seconds: Games whose age exceeds this many seconds are
                removed. Defaults to 7200 (two hours).

        Returns:
            The number of games removed.
        """
        now = datetime.now()
        expired = []
        for game_id, game in self.games.items():
            try:
                started = datetime.fromisoformat(game['start_time'])
            except (KeyError, TypeError, ValueError):
                # One malformed entry must not abort the whole sweep.
                logger.warning(f"Skipping game with invalid start_time: {game_id}")
                continue
            if (now - started).total_seconds() > max_age_seconds:
                expired.append(game_id)
        # Delete after the scan so the dict is not mutated while iterating.
        for game_id in expired:
            del self.games[game_id]
            logger.info(f"Cleaned up inactive game: {game_id}")
        return len(expired)
game_state = GameState() |
@app.route('/')
def home():
    """Render and return the game's landing page (``index.html``)."""
    page = render_template('index.html')
    return page
@socketio.on('connect')
def handle_connect():
    """Log a new Socket.IO connection and acknowledge it to the client."""
    sid = request.sid
    logger.info(f"Client connected: {sid}")
    ack = dict(status='connected')
    emit('connection_success', ack)
@socketio.on('disconnect')
def handle_disconnect():
    """Log that a Socket.IO client has disconnected."""
    sid = request.sid
    logger.info(f"Client disconnected: {sid}")
@socketio.on('create_game')
def handle_create_game():
    """Create a game, put the requesting socket in its room, and report back.

    Emits ``game_created`` with the new game ID to the caller on success, or
    ``game_error`` with the failure details if anything goes wrong.
    """
    try:
        room_id = game_state.create_game()
        join_room(room_id)
        logger.info(f"Created and joined game room: {room_id}")
        created_payload = dict(gameId=room_id, status='success')
        emit('game_created', created_payload)
    except Exception as exc:
        details = str(exc)
        logger.error(f"Error in game creation: {details}")
        failure_payload = dict(error='Failed to create game', details=details)
        emit('game_error', failure_payload)
@socketio.on('join_game')
def handle_join_game(data):
    """Add the requesting socket to a game as a new player.

    Args:
        data: Event payload; a dict with ``gameId`` and ``playerName``.

    Emits ``player_joined`` to the whole game room on success. Emits
    ``game_error`` to the caller when the payload is invalid, the game does
    not exist, the game already has five players, or this socket has already
    joined the game.
    """
    try:
        if not isinstance(data, dict):
            raise ValueError("Missing game ID or player name")
        game_id = data.get('gameId')
        player_name = data.get('playerName')
        if isinstance(player_name, str):
            # A whitespace-only name is treated the same as a missing one.
            player_name = player_name.strip()
        if not game_id or not player_name:
            raise ValueError("Missing game ID or player name")
        game = game_state.get_game(game_id)
        players = game['players']
        # A repeated join from the same connection (e.g. a double click)
        # must not consume additional seats.
        if any(p.get('socket_id') == request.sid for p in players):
            raise ValueError("Player already joined this game")
        if len(players) >= 5:
            raise ValueError("Game is full")
        # Highest existing id + 1 stays unique even if the list ever shrinks.
        player_id = max((p['id'] for p in players), default=0) + 1
        players.append({
            'id': player_id,
            'name': player_name,
            'socket_id': request.sid,
        })
        join_room(game_id)
        logger.info(f"Player {player_name} (ID: {player_id}) joined game {game_id}")
        emit('player_joined', {
            'playerId': player_id,
            'playerName': player_name,
            'status': 'success',
        }, room=game_id)
    except Exception as e:
        error_msg = str(e)
        logger.error(f"Error in handle_join_game: {error_msg}")
        emit('game_error', {'error': error_msg})
@app.route('/api/start_game', methods=['POST'])
async def start_game():
    """Start a round for an existing game and broadcast its question.

    Expects a JSON object body containing ``gameId``. On success the game's
    question is set, its phase becomes ``'recording'`` and ``round_started``
    is emitted to the game room.

    Returns:
        ``{'status': 'success', 'question': ...}`` on success.
        ``{'status': 'error', 'error': ...}`` with HTTP 400 when the request
        is invalid (missing/unknown game ID, fewer than 3 players) and HTTP
        500 for unexpected failures.
    """
    try:
        # silent=True yields None instead of raising on a missing or
        # malformed JSON body, so it is reported as a plain client error.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            raise ValueError("Missing game ID")
        game_id = data.get('gameId')
        if not game_id:
            raise ValueError("Missing game ID")
        game = game_state.get_game(game_id)
        if len(game['players']) < 3:
            raise ValueError("Need at least 3 players to start")
        question = await generate_question()
        game['question'] = question
        game['current_phase'] = 'recording'
        logger.info(f"Started game {game_id} with question: {question}")
        socketio.emit('round_started', {'question': question}, room=game_id)
        return jsonify({'status': 'success', 'question': question})
    except ValueError as e:
        # Validation failures are the caller's fault, not a server error.
        error_msg = str(e)
        logger.error(f"Error starting game: {error_msg}")
        return jsonify({'status': 'error', 'error': error_msg}), 400
    except Exception as e:
        error_msg = str(e)
        logger.error(f"Error starting game: {error_msg}")
        return jsonify({'status': 'error', 'error': error_msg}), 500
async def generate_question():
    """Return a question for the round, generated by Mistral AI when possible.

    The API key is taken from a module-level ``MISTRAL_API_KEY`` if one is
    defined, otherwise from the ``MISTRAL_API_KEY`` environment variable.
    The model defaults to ``mistral-small-latest`` and can be overridden with
    the ``MISTRAL_MODEL`` environment variable.

    Returns:
        The generated question, or one of a few built-in questions when no
        key is configured, the API call fails, or the reply is empty. This
        function never raises.
    """
    fallback_questions = [
        "What's your favorite childhood memory?",
        "What's the most interesting place you've ever visited?",
        "What's a skill you'd love to master and why?",
        "What's the best piece of advice you've ever received?",
    ]
    # Looking the name up dynamically avoids a NameError when the module
    # does not define the constant.
    api_key = globals().get('MISTRAL_API_KEY') or os.getenv('MISTRAL_API_KEY')
    if not api_key:
        logger.warning("MISTRAL_API_KEY is not configured; using a fallback question")
        return random.choice(fallback_questions)

    prompt = (
        "Generate an engaging personal question for a social game.\n"
        "The question should:\n"
        "1. Encourage creative and unique responses\n"
        "2. Be open-ended but not too philosophical\n"
        "3. Be answerable in 15-30 seconds\n"
        "4. Be appropriate for all ages\n"
        "5. Spark interesting conversation\n"
        "Generate only the question, without any additional text."
    )
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json',
    }
    payload = {
        # The chat-completions endpoint requires a model identifier.
        'model': os.getenv('MISTRAL_MODEL', 'mistral-small-latest'),
        'messages': [{'role': 'user', 'content': prompt}],
    }
    try:
        response = requests.post(
            'https://api.mistral.ai/v1/chat/completions',
            headers=headers,
            json=payload,
            timeout=10,
        )
        if response.status_code == 200:
            question = response.json()['choices'][0]['message']['content'].strip()
            if question:
                logger.info(f"Generated question: {question}")
                return question
        logger.error(f"Mistral API error: {response.status_code}")
    except Exception as e:
        # Deliberately broad: a network, decoding or response-shape problem
        # must never prevent the round from starting.
        logger.error(f"Error generating question: {str(e)}")
    return random.choice(fallback_questions)
@app.route('/api/submit_recording', methods=['POST'])
async def submit_recording():
    """Store a player's uploaded voice recording for a game.

    Expects multipart form data with ``gameId``, ``playerId`` and an
    ``audio`` file. The file is saved under ``temp/`` and its path recorded
    in the game's ``recordings`` under the player ID; ``recording_submitted``
    is then emitted to the game room.

    Returns:
        ``{'status': 'success'}`` on success.
        ``{'status': 'error', 'error': ...}`` with HTTP 400 when the request
        is invalid (missing fields, unknown game) and HTTP 500 for
        unexpected failures such as the file not being writable.
    """
    try:
        game_id = request.form.get('gameId')
        player_id = request.form.get('playerId')
        audio_file = request.files.get('audio')
        if not all([game_id, player_id, audio_file]):
            raise ValueError("Missing required data")
        game = game_state.get_game(game_id)
        # secure_filename strips path separators from the client-supplied
        # IDs, so the result cannot escape the upload directory.
        filename = secure_filename(f"recording_{game_id}_{player_id}.wav")
        # The directory is otherwise only created when the module is run as
        # a script; make sure it exists however the app is served.
        os.makedirs('temp', exist_ok=True)
        filepath = os.path.join('temp', filename)
        audio_file.save(filepath)
        game['recordings'][player_id] = filepath
        logger.info(f"Saved recording for player {player_id} in game {game_id}")
        socketio.emit('recording_submitted', {
            'playerId': player_id,
            'status': 'success',
        }, room=game_id)
        return jsonify({'status': 'success'})
    except ValueError as e:
        # Validation failures are the caller's fault, not a server error.
        error_msg = str(e)
        logger.error(f"Error submitting recording: {error_msg}")
        return jsonify({'status': 'error', 'error': error_msg}), 400
    except Exception as e:
        error_msg = str(e)
        logger.error(f"Error submitting recording: {error_msg}")
        return jsonify({'status': 'error', 'error': error_msg}), 500
@socketio.on('submit_vote')
def handle_vote(data):
    """Record a vote and, once every player has voted, resolve the round.

    Args:
        data: Event payload; a dict with ``gameId``, ``voterId`` and
            ``voteFor``.

    When the number of recorded votes reaches the number of players, the
    most-voted ID is compared with the game's impostor, the score and
    ``completed_rounds`` are updated, the per-round vote state is reset and
    ``round_result`` is emitted to the game room. Any failure is reported to
    the caller via ``game_error``.
    """
    try:
        if not isinstance(data, dict):
            raise ValueError("Missing vote data")
        game_id = data.get('gameId')
        voter_id = data.get('voterId')
        vote_for = data.get('voteFor')
        if not all([game_id, voter_id, vote_for]):
            raise ValueError("Missing vote data")
        game = game_state.get_game(game_id)
        game['votes'][voter_id] = vote_for
        # ">=" rather than "==": the round must still resolve if the number
        # of distinct voter IDs ever exceeds the number of players.
        if len(game['votes']) >= len(game['players']):
            round_votes = dict(game['votes'])
            tally = {}
            for choice in round_votes.values():
                tally[choice] = tally.get(choice, 0) + 1
            # On a tie, the candidate that received its first vote earliest
            # wins (max returns the first maximal key).
            most_voted = max(tally, key=tally.get)
            # NOTE(review): nothing in the code visible here assigns
            # game['impostor']; while it is None this comparison never
            # matches. Confirm where the impostor is meant to be chosen.
            if most_voted == game['impostor']:
                game['score']['player_wins'] += 1
                result = 'players_win'
            else:
                game['score']['impostor_wins'] += 1
                result = 'impostor_wins'
            game['completed_rounds'].append({
                'round_number': game['round_number'],
                'impostor': game['impostor'],
                'votes': round_votes,
                'most_voted': most_voted,
                'result': result,
            })
            # Reset per-round state so the next round starts with an empty
            # ballot instead of resolving on its first vote with stale data.
            game['votes'] = {}
            game['round_number'] += 1
            emit('round_result', {
                'impostor': game['impostor'],
                'most_voted': most_voted,
                'votes': round_votes,
                'score': game['score'],
                'result': result,
            }, room=game_id)
            logger.info(f"Round completed for game {game_id}. Result: {result}")
    except Exception as e:
        error_msg = str(e)
        logger.error(f"Error processing vote: {error_msg}")
        emit('game_error', {'error': error_msg})
if __name__ == '__main__':
    # Script entry point: make sure the directory used for uploaded
    # recordings exists, then start the Socket.IO server on port 7860.
    os.makedirs('temp', exist_ok=True)
    logger.info("Starting server...")
    run_options = dict(host='', port=7860, debug=True)
    socketio.run(app, **run_options)