team_16trial / app.py
dcrey7's picture
Upload 9 files
e30257d verified
raw
history blame
14.6 kB
from flask import Flask, render_template, request, jsonify
from flask_socketio import SocketIO, emit
import os
import requests
import json
import uuid
from datetime import datetime
from dotenv import load_dotenv
import logging
from werkzeug.utils import secure_filename
import random
import asyncio
# Initialize Flask and configure core settings
app = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(24) # Generate a random secret key for security
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # Limit file uploads to 16MB
# Initialize SocketIO with CORS support for development
socketio = SocketIO(app, cors_allowed_origins="*")
# Load environment variables from .env file
load_dotenv()
MISTRAL_API_KEY = os.getenv('MISTRAL_API_KEY')
ELEVENLABS_API_KEY = os.getenv('ELEVENLABS_API_KEY')
# Configure logging to track application behavior
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GameState:
"""Manages the state of all active game sessions."""
def __init__(self):
"""Initialize the game state manager."""
self.games = {} # Dictionary to store all active games
self.cleanup_interval = 3600 # Cleanup inactive games every hour
def create_game(self):
"""Create a new game session with a unique identifier."""
game_id = str(uuid.uuid4())
self.games[game_id] = {
'players': [],
'current_phase': 'setup',
'recordings': {},
'impostor': None,
'votes': {},
'question': None,
'impostor_answer': None,
'modified_recording': None,
'round_number': 1,
'start_time': datetime.now().isoformat(),
'completed_rounds': [],
'score': {'impostor_wins': 0, 'player_wins': 0}
}
return game_id
def cleanup_inactive_games(self):
"""Remove inactive game sessions older than 2 hours."""
current_time = datetime.now()
inactive_threshold = 7200 # 2 hours in seconds
for game_id, game in list(self.games.items()):
start_time = datetime.fromisoformat(game['start_time'])
if (current_time - start_time).total_seconds() > inactive_threshold:
del self.games[game_id]
# Initialize global game state
game_state = GameState()
async def generate_question():
"""Generate an engaging question using Mistral AI."""
try:
headers = {
'Authorization': f'Bearer {MISTRAL_API_KEY}',
'Content-Type': 'application/json'
}
# Craft a prompt that encourages interesting, personal questions
payload = {
'messages': [{
'role': 'user',
'content': '''Generate an engaging personal question for a social game.
The question should:
1. Encourage creative and unique responses
2. Be open-ended but not too philosophical
3. Be answerable in 15-30 seconds
4. Be appropriate for all ages
5. Spark interesting conversation
Generate only the question, without any additional text.'''
}]
}
response = requests.post(
'https://api.mistral.ai/v1/chat/completions',
headers=headers,
json=payload,
timeout=10 # Set timeout to handle slow responses
)
if response.status_code == 200:
question = response.json()['choices'][0]['message']['content'].strip()
logger.info(f"Generated question: {question}")
return question
else:
logger.error(f"Mistral API error: {response.status_code}")
# Fallback questions if API fails
fallback_questions = [
"What is your favorite childhood memory?",
"What's the most interesting place you've ever visited?",
"What's a skill you'd love to master and why?",
"What's the best piece of advice you've ever received?"
]
return random.choice(fallback_questions)
except Exception as e:
logger.error(f"Error generating question: {str(e)}")
return "What is your favorite memory from your childhood?"
async def generate_impostor_answer(question):
"""Generate a convincing impostor response using Mistral AI."""
try:
headers = {
'Authorization': f'Bearer {MISTRAL_API_KEY}',
'Content-Type': 'application/json'
}
# Craft a detailed prompt for generating a natural response
prompt = f'''Given the question "{question}", generate a detailed and convincing personal response.
The response should:
1. Sound natural and conversational
2. Be 2-3 sentences long
3. Include specific details to sound authentic
4. Be suitable for text-to-speech conversion
5. Avoid complex words or punctuation that might affect voice synthesis
Generate only the response, without any additional context.'''
payload = {
'messages': [{
'role': 'user',
'content': prompt
}]
}
response = requests.post(
'https://api.mistral.ai/v1/chat/completions',
headers=headers,
json=payload,
timeout=10
)
if response.status_code == 200:
answer = response.json()['choices'][0]['message']['content'].strip()
logger.info(f"Generated impostor answer: {answer}")
return answer
else:
logger.error(f"Mistral API error generating answer: {response.status_code}")
return "I have an interesting story about that, but I'd need more time to explain it properly."
except Exception as e:
logger.error(f"Error generating impostor answer: {str(e)}")
return "I have an interesting story about that, but I'd need more time to explain it properly."
async def clone_voice(audio_file):
"""Clone a voice using ElevenLabs API."""
try:
headers = {
'xi-api-key': ELEVENLABS_API_KEY
}
with open(audio_file, 'rb') as f:
files = {
'files': ('recording.wav', f, 'audio/wav')
}
response = requests.post(
'https://api.elevenlabs.io/v1/voices/add',
headers=headers,
files=files,
timeout=30
)
if response.status_code == 200:
voice_id = response.json().get('voice_id')
logger.info(f"Successfully cloned voice: {voice_id}")
return voice_id
else:
logger.error(f"ElevenLabs voice cloning error: {response.status_code}")
return None
except Exception as e:
logger.error(f"Error cloning voice: {str(e)}")
return None
async def generate_cloned_speech(voice_id, text):
"""Generate speech using ElevenLabs with a cloned voice."""
try:
headers = {
'xi-api-key': ELEVENLABS_API_KEY,
'Content-Type': 'application/json'
}
payload = {
'text': text,
'voice_settings': {
'stability': 0.75,
'similarity_boost': 0.75
}
}
response = requests.post(
f'https://api.elevenlabs.io/v1/text-to-speech/{voice_id}',
headers=headers,
json=payload,
timeout=30
)
if response.status_code == 200:
filename = f"temp/impostor_audio_{uuid.uuid4()}.mp3"
with open(filename, 'wb') as f:
f.write(response.content)
logger.info(f"Generated cloned speech: {filename}")
return filename
else:
logger.error(f"ElevenLabs speech generation error: {response.status_code}")
return None
except Exception as e:
logger.error(f"Error generating cloned speech: {str(e)}")
return None
@app.route('/')
def home():
"""Serve the main game page."""
return render_template('index.html')
@app.route('/api/start_game', methods=['POST'])
async def start_game():
"""Initialize a new game session."""
try:
data = request.get_json()
game_id = data.get('game_id')
if game_id not in game_state.games:
return jsonify({'error': 'Game not found'}), 404
game = game_state.games[game_id]
# Generate question for the round
question = await generate_question()
game['question'] = question
game['current_phase'] = 'recording'
return jsonify({
'status': 'success',
'question': question
})
except Exception as e:
logger.error(f"Error starting game: {str(e)}")
return jsonify({'error': 'Internal server error'}), 500
@app.route('/api/submit_recording', methods=['POST'])
async def submit_recording():
"""Handle voice recording submissions."""
try:
game_id = request.form.get('game_id')
player_id = request.form.get('player_id')
audio_file = request.files.get('audio')
if not all([game_id, player_id, audio_file]):
return jsonify({'error': 'Missing required data'}), 400
if game_id not in game_state.games:
return jsonify({'error': 'Game not found'}), 404
# Save the recording
filename = secure_filename(f"recording_{game_id}_{player_id}.wav")
filepath = os.path.join('temp', filename)
audio_file.save(filepath)
game_state.games[game_id]['recordings'][player_id] = filepath
return jsonify({'status': 'success'})
except Exception as e:
logger.error(f"Error submitting recording: {str(e)}")
return jsonify({'error': 'Internal server error'}), 500
@app.route('/api/process_impostor', methods=['POST'])
async def process_impostor():
"""Handle the complete impostor voice generation process."""
try:
data = request.get_json()
game_id = data.get('game_id')
impostor_id = data.get('impostor_id')
if game_id not in game_state.games:
return jsonify({'error': 'Game not found'}), 404
game = game_state.games[game_id]
# Generate impostor's answer
impostor_answer = await generate_impostor_answer(game['question'])
game['impostor_answer'] = impostor_answer
# Get impostor's original recording
original_recording = game['recordings'].get(impostor_id)
if not original_recording:
return jsonify({'error': 'Original recording not found'}), 404
# Clone voice
voice_id = await clone_voice(original_recording)
if not voice_id:
return jsonify({'error': 'Voice cloning failed'}), 500
# Generate modified speech
audio_file = await generate_cloned_speech(voice_id, impostor_answer)
if not audio_file:
return jsonify({'error': 'Speech generation failed'}), 500
game['modified_recording'] = audio_file
return jsonify({
'status': 'success',
'audio_url': audio_file
})
except Exception as e:
logger.error(f"Error processing impostor: {str(e)}")
return jsonify({'error': 'Internal server error'}), 500
@socketio.on('join_game')
def handle_join_game(data):
"""Handle a player joining the game."""
game_id = data.get('game_id')
player_name = data.get('player_name')
if game_id in game_state.games:
game = game_state.games[game_id]
if len(game['players']) < 5:
player_id = len(game['players']) + 1
game['players'].append({
'id': player_id,
'name': player_name
})
emit('player_joined', {
'player_id': player_id,
'player_name': player_name
}, broadcast=True, room=game_id)
@socketio.on('submit_vote')
def handle_vote(data):
"""Process player votes and determine round outcome."""
game_id = data.get('game_id')
voter_id = data.get('voter_id')
vote_for = data.get('vote_for')
if game_id in game_state.games:
game = game_state.games[game_id]
game['votes'][voter_id] = vote_for
# Check if all players have voted
if len(game['votes']) == len(game['players']):
# Calculate results
votes_count = {}
for vote in game['votes'].values():
votes_count[vote] = votes_count.get(vote, 0) + 1
most_voted = max(votes_count.items(), key=lambda x: x[1])[0]
# Update scores
if most_voted == game['impostor']:
game['score']['player_wins'] += 1
else:
game['score']['impostor_wins'] += 1
# Store round results
game['completed_rounds'].append({
'round_number': game['round_number'],
'impostor': game['impostor'],
'votes': game['votes'].copy(),
'most_voted': most_voted
})
# Emit results
emit('round_result', {
'impostor': game['impostor'],
'most_voted': most_voted,
'votes': game['votes'],
'score': game['score']
}, broadcast=True, room=game_id)
if __name__ == '__main__':
# Create temporary directory for recordings if it doesn't exist
os.makedirs('temp', exist_ok=True)
# Start the server
socketio.run(app, host='0.0.0.0', port=7860, debug=True)