|
|
|
import hmac
import os
from collections import defaultdict

from flask import Flask, abort, render_template, request, send_from_directory
from flask_socketio import SocketIO, emit
|
|
|
app = Flask(__name__)

# Never ship a hard-coded signing key: take it from the environment and fall
# back to a random per-process key so a known placeholder is never in use.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or os.urandom(32).hex()

# NOTE(review): "*" accepts Socket.IO connections from any origin. Kept as-is
# because the deployment origin is not visible here; restrict it if possible.
socketio = SocketIO(app, cors_allowed_origins="*")

# Optional shared password required by the 'join' event. Either spelling of
# the environment variable is accepted; an empty value means "no password".
SPACE_PASSWORD = os.environ.get('password') or os.environ.get('PASSWORD') or None
|
|
|
|
|
|
|
|
|
class GoGame:
    """Board state and simplified rules for a two-player game of Go.

    Colors are the strings ``'black'`` and ``'white'``; empty points are
    ``None``. Captures and the suicide rule are enforced; ko is not.
    Scoring is a simplification: stones on the board plus opposing stones
    captured, with komi added to white.
    """

    _COLORS = ('black', 'white')
    _NEIGHBOR_OFFSETS = ((0, 1), (1, 0), (0, -1), (-1, 0))

    def __init__(self, size=13, komi=6.5):
        """Create an empty ``size`` x ``size`` board with black to move.

        Args:
            size: Number of points along each edge of the board.
            komi: Points added to white's final score.
        """
        self.size = size
        self.komi = komi
        self.board = [[None for _ in range(size)] for _ in range(size)]
        self.current_player = 'black'
        # captured[color] counts stones OF that color removed from the board.
        self.captured = {'black': 0, 'white': 0}
        # Consecutive passes; two in a row end the game.
        self.passes = 0
        self.game_over = False
        self.scores = {'black': 0, 'white': 0}
        # Games won per color; survives reset() unless told otherwise.
        self.game_history = {'black': 0, 'white': 0}
        # (x, y) of the most recent accepted move, or None.
        self.last_move = None

    @staticmethod
    def _opponent(color):
        """Return the color opposing ``color``."""
        return 'white' if color == 'black' else 'black'

    def _on_board(self, x, y):
        """Return True when (x, y) lies inside the board."""
        return 0 <= x < self.size and 0 <= y < self.size

    def _neighbors(self, x, y):
        """Yield the on-board points orthogonally adjacent to (x, y)."""
        for dx, dy in self._NEIGHBOR_OFFSETS:
            nx, ny = x + dx, y + dy
            if self._on_board(nx, ny):
                yield nx, ny

    def place_stone(self, x, y, color):
        """Try to play ``color`` at (x, y).

        Returns:
            True when the move was accepted (the turn passes to the
            opponent), False when it was rejected. A rejected move leaves
            every piece of game state untouched.
        """
        if self.game_over or color not in self._COLORS:
            return False
        if not self._on_board(x, y) or self.board[x][y] is not None:
            return False

        opponent = self._opponent(color)
        self.board[x][y] = color
        removed = self.check_captures(x, y, opponent)

        # Captures are resolved first, so a stone that frees its own
        # liberties by capturing is legal. If it still has none, nothing
        # was captured and the move is suicide: undo the placement.
        if not self.has_liberties(x, y, color):
            self.board[x][y] = None
            return False

        # Commit bookkeeping only once the move is known to be legal, so a
        # rejected move cannot leave a stale last_move behind.
        self.captured[opponent] += len(removed)
        self.last_move = (x, y)
        self.current_player = opponent
        self.passes = 0
        return True

    def check_captures(self, x, y, opponent):
        """Remove ``opponent`` groups next to (x, y) that have no liberties.

        Returns:
            List of (x, y) points that were cleared.
        """
        captured = []
        for nx, ny in self._neighbors(x, y):
            if self.board[nx][ny] == opponent and not self.has_liberties(nx, ny, opponent):
                captured.extend(self.remove_group(nx, ny, opponent))
        return captured

    def has_liberties(self, x, y, color):
        """Return True if the ``color`` group reached from (x, y) touches an empty point."""
        seen = {(x, y)}
        pending = [(x, y)]
        # Iterative flood fill: no recursion limit to worry about on large
        # boards.
        while pending:
            cx, cy = pending.pop()
            for point in self._neighbors(cx, cy):
                occupant = self.board[point[0]][point[1]]
                if occupant is None:
                    return True
                if occupant == color and point not in seen:
                    seen.add(point)
                    pending.append(point)
        return False

    def remove_group(self, x, y, color):
        """Clear the connected ``color`` group containing (x, y).

        Returns:
            List of the (x, y) points that were cleared; empty when (x, y)
            is off the board or does not hold ``color``.
        """
        if not self._on_board(x, y) or self.board[x][y] != color:
            return []
        removed = []
        pending = [(x, y)]
        self.board[x][y] = None
        while pending:
            cx, cy = pending.pop()
            removed.append((cx, cy))
            for nx, ny in self._neighbors(cx, cy):
                if self.board[nx][ny] == color:
                    # Clear on discovery so a point is never queued twice.
                    self.board[nx][ny] = None
                    pending.append((nx, ny))
        return removed

    def pass_turn(self):
        """Pass the current player's turn; two consecutive passes end the game."""
        if self.game_over:
            return
        self.passes += 1
        self.current_player = self._opponent(self.current_player)
        if self.passes >= 2:
            self.end_game()

    def resign(self, player_color):
        """End the game by resignation of ``player_color``.

        Returns:
            The winning color. If the game was already over nothing is
            recorded again, but the opposing color is still returned so
            existing callers keep working.
        """
        winner = self._opponent(player_color)
        if not self.game_over:
            self.game_history[winner] += 1
            self.game_over = True
        return winner

    def end_game(self):
        """Score the position, record the winner and mark the game over.

        Each side scores its stones on the board plus the opposing stones
        it captured; white additionally receives komi. White wins unless
        black's score is strictly greater. Calling this on a finished game
        is a no-op so a result is never counted twice.
        """
        if self.game_over:
            return
        stones = {'black': 0, 'white': 0}
        for row in self.board:
            for cell in row:
                if cell in stones:
                    stones[cell] += 1

        # captured[color] holds stones of that color that were removed, so
        # each side is credited with the *other* side's losses.
        black_score = stones['black'] + self.captured['white']
        white_score = stones['white'] + self.captured['black'] + self.komi

        # Keep komi's fraction: truncating it could publish a tie while the
        # recorded winner is white.
        self.scores['black'] = black_score
        self.scores['white'] = white_score

        if black_score > white_score:
            self.game_history['black'] += 1
        else:
            self.game_history['white'] += 1
        self.game_over = True

    def reset(self, keep_history=True):
        """Reset board/state; preserve per-color history by default."""
        hist = self.game_history if keep_history else {'black': 0, 'white': 0}
        self.board = [[None for _ in range(self.size)] for _ in range(self.size)]
        self.current_player = 'black'
        self.captured = {'black': 0, 'white': 0}
        self.passes = 0
        self.game_over = False
        self.scores = {'black': 0, 'white': 0}
        self.last_move = None
        self.game_history = hist
|
|
|
|
|
|
|
|
|
# The single shared game every connected client plays on.
game = GoGame(size=13)

# Socket.IO session id -> username supplied in a successful 'join'.
sid_to_username = {}

# Which username currently holds each color (None while unclaimed).
current_player_user = dict.fromkeys(('black', 'white'))

# Number of games won, keyed by username.
wins_by_user = defaultdict(int)
|
|
|
|
|
|
|
|
|
def winner_username_for_color(color):
    """Return the username holding ``color``, or None if nobody does."""
    if color in current_player_user:
        return current_player_user[color]
    return None
|
|
|
def snapshot():
    """Build the full-state payload sent with the 'init' event.

    The board, score and history containers are the game's live objects,
    not copies.
    """
    state = dict(
        board_size=game.size,
        current_player=game.current_player,
        scores=game.scores,
        game_history=game.game_history,
        wins_by_user=dict(wins_by_user),
        board=game.board,
        last_move=game.last_move,
        captured=game.captured,
        passes=game.passes,
        game_over=game.game_over,
    )
    return state
|
|
|
def broadcast_colors():
    """Emit a 'colors' event announcing who holds black and white."""
    holders = {side: current_player_user[side] for side in ('black', 'white')}
    socketio.emit('colors', holders)
|
|
|
def require_auth():
    """Return True if the calling socket has joined; otherwise emit an error.

    A socket counts as authenticated once its session id is recorded in
    ``sid_to_username``. When it is not, a 'Not authenticated' error event
    is sent to the caller and False is returned.
    """
    if request.sid in sid_to_username:
        return True
    emit('error', {'message': 'Not authenticated'})
    return False
|
|
|
|
|
|
|
|
|
@app.route('/')
def index():
    """Render the ``index.html`` template for the site root."""
    page = render_template('index.html')
    return page
|
|
|
# Suffixes of server-side files that must never be handed to a client.
_BLOCKED_SUFFIXES = (
    '.py', '.pyc', '.pyo', '.env', '.ini', '.cfg', '.toml',
    '.yaml', '.yml', '.pem', '.key', '.db', '.sqlite', '.sh',
)


@app.route('/<path:path>')
def static_files(path):
    """Serve a client asset from the application directory.

    The application directory also holds the server's own source and
    configuration, so requests for hidden files/directories, bytecode
    caches and server-side file types are answered with 404 instead of
    being served.

    Args:
        path: Request path relative to the application directory.
    """
    segments = path.split('/')
    touches_hidden = any(
        segment.startswith('.') or segment == '__pycache__'
        for segment in segments
    )
    if touches_hidden or path.lower().endswith(_BLOCKED_SUFFIXES):
        abort(404)
    return send_from_directory('.', path)
|
|
|
|
|
|
|
|
|
@socketio.on('connect')
def on_connect():
    """Log the session id of a newly connected socket."""
    session_id = request.sid
    print('Client connected', session_id)
|
|
|
@socketio.on('disconnect')
def on_disconnect():
    """Forget the disconnected socket's username and log the event.

    Only the session-to-username entry is dropped; any color the user had
    claimed stays claimed.
    """
    print(
        'Client disconnected',
        request.sid,
        sid_to_username.pop(request.sid, None),
    )
|
|
|
|
|
@socketio.on('join')
def on_join(data=None):
    """Authenticate a socket and send it the current game state.

    Expects a dict payload with 'username' and, when a space password is
    configured, 'password'. On success the socket's session id is bound to
    the username, the caller receives 'init', and the color holders are
    broadcast. On failure an 'error' event is sent to the caller.

    Args:
        data: Event payload; anything that is not a dict is treated as empty.
    """
    payload = data if isinstance(data, dict) else {}
    username = payload.get('username')
    provided_password = payload.get('password')

    if SPACE_PASSWORD:
        # Compare as bytes in constant time; a missing or non-string
        # password can never match the (non-empty) configured one.
        supplied = provided_password if isinstance(provided_password, str) else ''
        matches = hmac.compare_digest(
            supplied.encode('utf-8'), SPACE_PASSWORD.encode('utf-8')
        )
        if not matches:
            emit('error', {'message': 'Invalid password'})
            return

    # Usernames are later used as dictionary keys and shown to other
    # players, so insist on a non-blank string.
    if not isinstance(username, str) or not username.strip():
        emit('error', {'message': 'Username required'})
        return

    # NOTE(review): usernames are not checked for uniqueness, so two sockets
    # may join under the same name.
    sid_to_username[request.sid] = username
    emit('init', snapshot())
    broadcast_colors()
|
|
|
@socketio.on('claim_color')
def on_claim_color(data=None):
    """Let the calling user claim 'black' or 'white'.

    The claim is made for the username bound to this socket at join time;
    a 'username' field in the payload is ignored so one client cannot claim
    a color on another user's behalf. A user may hold only one color, and
    an already-held color cannot be taken over.

    Args:
        data: Event payload; expects a dict with a 'color' key.
    """
    if not require_auth():
        return
    payload = data if isinstance(data, dict) else {}
    username = sid_to_username[request.sid]
    color = payload.get('color')
    if color not in ('black', 'white'):
        emit('error', {'message': 'Bad color'})
        return
    other = 'white' if color == 'black' else 'black'

    if current_player_user.get(other) == username:
        emit('error', {'message': 'You already claimed the other color'})
        return

    holder = current_player_user.get(color)
    if holder is not None and holder != username:
        emit('error', {'message': f'{color} already taken'})
        return

    # Either the color is free or the caller already holds it; in both
    # cases the caller ends up as the holder and everyone is told.
    current_player_user[color] = username
    broadcast_colors()
|
|
|
@socketio.on('move')
def on_move(data=None):
    """Play a stone for the calling user at the requested point.

    The mover is the username bound to this socket at join time; a 'player'
    field in the payload is ignored. This also stops a request without a
    'player' from matching a color nobody has claimed. Accepted moves are
    broadcast as 'move'; rejected ones produce an 'error' for the caller.

    Args:
        data: Event payload; expects a dict with integer-like 'x' and 'y'.
    """
    if not require_auth():
        return
    payload = data if isinstance(data, dict) else {}
    try:
        x = int(payload.get('x', -1))
        y = int(payload.get('y', -1))
    except (TypeError, ValueError, OverflowError):
        # Non-numeric coordinates are just an invalid move, not a crash.
        emit('error', {'message': 'Invalid move!'})
        return

    user = sid_to_username[request.sid]
    moving_color = game.current_player
    if user != current_player_user.get(moving_color):
        emit('error', {'message': 'Not your turn!'})
        return

    if not game.place_stone(x, y, moving_color):
        emit('error', {'message': 'Invalid move!'})
        return

    socketio.emit('move', {
        'x': x,
        'y': y,
        'player': moving_color,
        'next_player': game.current_player,
        'captured': game.captured,
    })
|
|
|
@socketio.on('pass')
def on_pass(data=None):
    """Pass the calling user's turn; a second consecutive pass ends the game.

    The passer is the username bound to this socket at join time; a
    'player' field in the payload is ignored. When the pass ends the game,
    the winner's user is credited and 'game_over' is broadcast.

    Args:
        data: Event payload; unused.
    """
    if not require_auth():
        return
    if game.game_over:
        # Without this guard every extra pass would re-award the win and
        # re-broadcast 'game_over'.
        emit('error', {'message': 'Game is already over'})
        return
    user = sid_to_username[request.sid]
    if user != current_player_user.get(game.current_player):
        emit('error', {'message': 'Not your turn!'})
        return

    game.pass_turn()
    socketio.emit('pass', {'next_player': game.current_player})

    if not game.game_over:
        return

    # Same rule the game uses when recording history: black wins only with
    # a strictly greater score, otherwise white does.
    winner_color = 'black' if game.scores['black'] > game.scores['white'] else 'white'
    wuser = winner_username_for_color(winner_color)
    if wuser:
        wins_by_user[wuser] += 1

    socketio.emit('game_over', {
        'scores': game.scores,
        'game_history': game.game_history,
        'wins_by_user': dict(wins_by_user),
    })
|
|
|
@socketio.on('resign')
def on_resign(data=None):
    """Resign the game on behalf of the calling user.

    The resigning player is the username bound to this socket at join time;
    a 'player' field in the payload is ignored, so a request cannot resign
    for someone else or for an unclaimed color. The opponent's user is
    credited with the win and 'resign' is broadcast.

    Args:
        data: Event payload; unused.
    """
    if not require_auth():
        return
    if game.game_over:
        # A finished game must not hand out a second win.
        emit('error', {'message': 'Game is already over'})
        return
    user = sid_to_username[request.sid]

    player_color = next(
        (color for color, uname in current_player_user.items() if uname == user),
        None,
    )
    if not player_color:
        emit('error', {'message': 'You have not claimed a color'})
        return

    winner_color = game.resign(player_color)
    wuser = winner_username_for_color(winner_color)
    if wuser:
        wins_by_user[wuser] += 1

    socketio.emit('resign', {
        'winner': winner_color,
        'scores': game.scores,
        'game_history': game.game_history,
        'wins_by_user': dict(wins_by_user),
    })
|
|
|
@socketio.on('new_game')
def on_new_game(data=None):
    """Start a fresh game, keeping win history, and release both colors.

    Every client receives a new 'init' snapshot followed by the (now empty)
    color assignment.

    Args:
        data: Event payload; unused. Optional so that an event emitted
            without any argument does not fail to dispatch.
    """
    if not require_auth():
        return

    game.reset(keep_history=True)
    for color in ('black', 'white'):
        current_player_user[color] = None
    socketio.emit('init', snapshot())
    broadcast_colors()
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Listen on every interface; the port comes from PORT (default 7860).
    listen_port = int(os.environ.get('PORT', 7860))
    socketio.run(
        app,
        host='0.0.0.0',
        port=listen_port,
        allow_unsafe_werkzeug=True,
    )
|
|