team_16trial / app.py
dcrey7's picture
Update app.py
f5365fb verified
raw
history blame
11.6 kB
from flask import Flask, render_template, request, jsonify
from flask_socketio import SocketIO, emit, join_room, leave_room
import os
import requests
import json
import uuid
from datetime import datetime
from dotenv import load_dotenv
import logging
from werkzeug.utils import secure_filename
import random
import asyncio
# Initialize Flask and configure core settings
app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(24)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024
# Initialize SocketIO with CORS support and logging
socketio = SocketIO(app, cors_allowed_origins="*", logger=True, engineio_logger=True, async_mode='eventlet')
# Load environment variables
load_dotenv()
MISTRAL_API_KEY = os.getenv('MISTRAL_API_KEY')
ELEVENLABS_API_KEY = os.getenv('ELEVENLABS_API_KEY')
# Configure logging with more detailed format
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class GameState:
"""Manages the state of all active game sessions."""
def __init__(self):
self.games = {}
self.cleanup_interval = 3600
def create_game(self):
"""Creates a new game session with proper initialization."""
try:
game_id = str(uuid.uuid4())
self.games[game_id] = {
'players': [],
'current_phase': 'setup',
'recordings': {},
'impostor': None,
'votes': {},
'question': None,
'impostor_answer': None,
'modified_recording': None,
'round_number': 1,
'start_time': datetime.now().isoformat(),
'completed_rounds': [],
'score': {'impostor_wins': 0, 'player_wins': 0},
'room': game_id # Add room for socket management
}
logger.info(f"Successfully created game with ID: {game_id}")
return game_id
except Exception as e:
logger.error(f"Error creating game: {str(e)}")
raise
def get_game(self, game_id):
"""Safely retrieves a game by ID."""
game = self.games.get(game_id)
if not game:
logger.error(f"Game not found: {game_id}")
raise ValueError("Game not found")
return game
def cleanup_inactive_games(self):
"""Removes inactive game sessions."""
current_time = datetime.now()
for game_id, game in list(self.games.items()):
start_time = datetime.fromisoformat(game['start_time'])
if (current_time - start_time).total_seconds() > 7200: # 2 hours
del self.games[game_id]
logger.info(f"Cleaned up inactive game: {game_id}")
# Initialize global game state
game_state = GameState()
@app.route('/')
def home():
"""Serves the main game page."""
return render_template('index.html')
@socketio.on('connect')
def handle_connect():
"""Handles client connection."""
logger.info(f"Client connected: {request.sid}")
emit('connection_success', {'status': 'connected'})
@socketio.on('disconnect')
def handle_disconnect():
"""Handles client disconnection."""
logger.info(f"Client disconnected: {request.sid}")
@socketio.on('create_game')
def handle_create_game():
"""Handles game creation request."""
try:
game_id = game_state.create_game()
join_room(game_id) # Create socket room for the game
logger.info(f"Created and joined game room: {game_id}")
emit('game_created', {
'gameId': game_id,
'status': 'success'
})
except Exception as e:
logger.error(f"Error in game creation: {str(e)}")
emit('game_error', {
'error': 'Failed to create game',
'details': str(e)
})
@socketio.on('join_game')
def handle_join_game(data):
"""Handles player joining a game."""
try:
game_id = data.get('gameId')
player_name = data.get('playerName')
if not game_id or not player_name:
raise ValueError("Missing game ID or player name")
game = game_state.get_game(game_id)
# Validate player count
if len(game['players']) >= 5:
raise ValueError("Game is full")
# Add player to game
player_id = len(game['players']) + 1
player = {
'id': player_id,
'name': player_name,
'socket_id': request.sid
}
game['players'].append(player)
# Join socket room
join_room(game_id)
logger.info(f"Player {player_name} (ID: {player_id}) joined game {game_id}")
# Broadcast to all players in the game
emit('player_joined', {
'playerId': player_id,
'playerName': player_name,
'status': 'success'
}, room=game_id)
except Exception as e:
error_msg = str(e)
logger.error(f"Error in handle_join_game: {error_msg}")
emit('game_error', {'error': error_msg})
@app.route('/api/start_game', methods=['POST'])
async def start_game():
"""Initializes a new game round."""
try:
data = request.get_json()
game_id = data.get('gameId')
if not game_id:
raise ValueError("Missing game ID")
game = game_state.get_game(game_id)
# Validate player count
if len(game['players']) < 3:
raise ValueError("Need at least 3 players to start")
# Generate question using Mistral AI
question = await generate_question()
game['question'] = question
game['current_phase'] = 'recording'
logger.info(f"Started game {game_id} with question: {question}")
# Notify all players in the game room
socketio.emit('round_started', {
'question': question
}, room=game_id)
return jsonify({
'status': 'success',
'question': question
})
except Exception as e:
error_msg = str(e)
logger.error(f"Error starting game: {error_msg}")
return jsonify({
'status': 'error',
'error': error_msg
}), 500
async def generate_question():
"""Generates an engaging question using Mistral AI."""
try:
headers = {
'Authorization': f'Bearer {MISTRAL_API_KEY}',
'Content-Type': 'application/json'
}
payload = {
'messages': [{
'role': 'user',
'content': '''Generate an engaging personal question for a social game.
The question should:
1. Encourage creative and unique responses
2. Be open-ended but not too philosophical
3. Be answerable in 15-30 seconds
4. Be appropriate for all ages
5. Spark interesting conversation
Generate only the question, without any additional text.'''
}]
}
response = requests.post(
'https://api.mistral.ai/v1/chat/completions',
headers=headers,
json=payload,
timeout=10
)
if response.status_code == 200:
question = response.json()['choices'][0]['message']['content'].strip()
logger.info(f"Generated question: {question}")
return question
logger.error(f"Mistral API error: {response.status_code}")
return random.choice([
"What's your favorite childhood memory?",
"What's the most interesting place you've ever visited?",
"What's a skill you'd love to master and why?",
"What's the best piece of advice you've ever received?"
])
except Exception as e:
logger.error(f"Error generating question: {str(e)}")
return "What is your favorite memory from your childhood?"
@app.route('/api/submit_recording', methods=['POST'])
async def submit_recording():
"""Handles voice recording submissions."""
try:
game_id = request.form.get('gameId')
player_id = request.form.get('playerId')
audio_file = request.files.get('audio')
if not all([game_id, player_id, audio_file]):
raise ValueError("Missing required data")
game = game_state.get_game(game_id)
# Save the recording
filename = secure_filename(f"recording_{game_id}_{player_id}.wav")
filepath = os.path.join('temp', filename)
audio_file.save(filepath)
game['recordings'][player_id] = filepath
logger.info(f"Saved recording for player {player_id} in game {game_id}")
# Notify all players about the new recording
socketio.emit('recording_submitted', {
'playerId': player_id,
'status': 'success'
}, room=game_id)
return jsonify({'status': 'success'})
except Exception as e:
error_msg = str(e)
logger.error(f"Error submitting recording: {error_msg}")
return jsonify({
'status': 'error',
'error': error_msg
}), 500
@socketio.on('submit_vote')
def handle_vote(data):
"""Processes player votes and determines round outcome."""
try:
game_id = data.get('gameId')
voter_id = data.get('voterId')
vote_for = data.get('voteFor')
if not all([game_id, voter_id, vote_for]):
raise ValueError("Missing vote data")
game = game_state.get_game(game_id)
game['votes'][voter_id] = vote_for
# Check if all players have voted
if len(game['votes']) == len(game['players']):
# Calculate results
votes_count = {}
for vote in game['votes'].values():
votes_count[vote] = votes_count.get(vote, 0) + 1
most_voted = max(votes_count.items(), key=lambda x: x[1])[0]
# Update scores
if most_voted == game['impostor']:
game['score']['player_wins'] += 1
result = 'players_win'
else:
game['score']['impostor_wins'] += 1
result = 'impostor_wins'
# Store round results
game['completed_rounds'].append({
'round_number': game['round_number'],
'impostor': game['impostor'],
'votes': game['votes'].copy(),
'most_voted': most_voted,
'result': result
})
# Emit results to all players
emit('round_result', {
'impostor': game['impostor'],
'most_voted': most_voted,
'votes': game['votes'],
'score': game['score'],
'result': result
}, room=game_id)
logger.info(f"Round completed for game {game_id}. Result: {result}")
except Exception as e:
error_msg = str(e)
logger.error(f"Error processing vote: {error_msg}")
emit('game_error', {'error': error_msg})
if __name__ == '__main__':
# Create temporary directory for recordings
os.makedirs('temp', exist_ok=True)
# Start the server
logger.info("Starting server...")
socketio.run(app, host='0.0.0.0', port=7860, debug=True)