import hashlib
import json
import os
import random
import secrets
import string

from flask import Flask, render_template_string, request, redirect, url_for, session
from flask_socketio import SocketIO, join_room, leave_room, emit
app = Flask(__name__)
# Prefer a secret supplied through the SECRET_KEY environment variable so the
# session-signing key does not have to live in the source. The literal below is
# only a development fallback and must be replaced for real deployments.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your-very-secret-key-here')
socketio = SocketIO(app)
# Paths to the JSON files used as storage (inside the application root).
ROOMS_DB = os.path.join(app.root_path, 'rooms.json')
USERS_DB = os.path.join(app.root_path, 'users.json')
# Loading and saving JSON (with error handling)
def load_json(file_path, default=None):
    """Load and return the JSON document stored at ``file_path``.

    Args:
        file_path: Path of the JSON file to read.
        default: Value returned when the file does not exist or cannot be
            read or parsed. When omitted (or ``None``), a *new* empty dict is
            created on every call, so separate callers never share one
            mutable object.

    Returns:
        The parsed JSON content, or ``default`` on a missing/unreadable file.
    """
    if default is None:
        # A literal ``{}`` in the signature would be one shared object handed
        # to every caller; build a fresh dict per call instead.
        default = {}
    try:
        if os.path.exists(file_path):
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        return default
    except (OSError, ValueError) as e:
        # OSError covers missing/unreadable files; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError for corrupt content.
        print(f"Error loading JSON from {file_path}: {e}")
        return default
def save_json(file_path, data):
    """Serialize ``data`` as JSON into ``file_path``, overwriting the file.

    The output is indented by four spaces and non-ASCII characters are written
    as-is (UTF-8). An ``OSError`` while opening or writing is reported on
    stdout and otherwise ignored; nothing is returned.
    """
    dump_options = {'indent': 4, 'ensure_ascii': False}
    try:
        with open(file_path, mode='w', encoding='utf-8') as out:
            json.dump(data, out, **dump_options)
    except OSError as err:
        print(f"Error saving JSON to {file_path}: {err}")
# In-memory copies of the two stores. Each call gets its own fresh fallback
# dict so that `rooms` and `users` can never end up being the same object when
# neither file exists yet.
rooms = load_json(ROOMS_DB, {})
users = load_json(USERS_DB, {})
def generate_token():
    """Return a random 15-character room token made of ASCII letters and digits.

    Knowing a token is what lets a logged-in user open a room, so the
    characters are drawn with ``secrets`` (OS-level CSPRNG) instead of the
    predictable ``random`` module.
    """
    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(15))
def hash_password(password):
    """Return the hex-encoded SHA-256 digest of ``password`` (UTF-8 encoded).

    NOTE(review): this is a single unsalted SHA-256 pass. Switching to a
    salted KDF would be safer but would invalidate the hashes already stored,
    so it needs a migration rather than an in-place change.
    """
    digest = hashlib.sha256()
    digest.update(password.encode('utf-8'))
    return digest.hexdigest()
# Login / registration page
@app.route('/', methods=['GET', 'POST'])
def index():
    """Serve the landing page and process the login/registration form.

    A user who already has ``username`` in the session is redirected to the
    dashboard. On POST the ``action`` form field selects the operation:

    * ``register`` - requires a non-empty username and password; stores the
      password hash, persists the user store and logs the user in. Returns
      400 when the fields are missing or the username is taken.
    * ``login`` - logs the user in when the stored hash matches, otherwise
      returns 401.

    Any other request renders the page template.
    """
    if 'username' in session:
        return redirect(url_for('dashboard'))
    if request.method == 'POST':
        action = request.form.get('action')
        username = request.form.get('username')
        password = request.form.get('password')
        if action == 'register':
            # Missing form fields arrive as None; hashing None would raise and
            # an empty/None username must never become an account key.
            if not username or not password:
                return "Имя пользователя и пароль обязательны", 400
            if username in users:
                return "Пользователь уже существует", 400
            users[username] = hash_password(password)
            save_json(USERS_DB, users)
            session['username'] = username
            return redirect(url_for('dashboard'))
        elif action == 'login':
            # `password is not None` guards hash_password against a missing
            # field; a missing username simply fails the membership test.
            if (
                username in users
                and password is not None
                and users[username] == hash_password(password)
            ):
                session['username'] = username
                return redirect(url_for('dashboard'))
            return "Неверный логин или пароль", 401
    return render_template_string('''
Видеоконференция
Видеоконференция
''')
# Dashboard
@app.route('/dashboard', methods=['GET', 'POST'])
def dashboard():
    """Show the dashboard and handle its "create" / "join" room form.

    Requires a logged-in session; otherwise redirects to the index page.
    ``create`` registers a new room owned by the current user, persists the
    room store and redirects into the room. ``join`` redirects into an
    existing room that still has free capacity, or answers 404.
    """
    if 'username' not in session:
        return redirect(url_for('index'))

    if request.method == 'POST':
        requested = request.form.get('action')

        if requested == 'create':
            new_token = generate_token()
            rooms[new_token] = {
                'users': [],
                'max_users': 5,
                'admin': session['username'],
                'video_url': '',
                'video_state': {'playing': False, 'time': 0},
            }
            save_json(ROOMS_DB, rooms)
            return redirect(url_for('room', token=new_token))

        if requested == 'join':
            wanted = request.form.get('token')
            # Reject unknown rooms and rooms that have reached max_users.
            if wanted not in rooms or len(rooms[wanted]['users']) >= rooms[wanted]['max_users']:
                return "Комната не найдена или переполнена", 404
            return redirect(url_for('room', token=wanted))

    return render_template_string('''
Панель управления
Добро пожаловать, {{ session['username'] }}
''', session=session)
# Log out
@app.route('/logout', methods=['POST'])
def logout():
    """Drop ``username`` from the session (if present) and go back to the index page."""
    session.pop('username', None)
    landing_page = url_for('index')
    return redirect(landing_page)
# The URL must carry the token as a path variable: the view takes ``token`` as
# a required argument and callers build the URL with url_for('room', token=...).
@app.route('/room/<token>')
def room(token):
    """Render the page of the room identified by ``token``.

    Redirects to the index page when nobody is logged in and to the dashboard
    when the token is unknown. The template receives the token, the session,
    whether the current user is the room's admin, and the room's stored video
    URL and playback state.

    NOTE(review): the template body below looks like it has lost its HTML
    markup; it is kept exactly as found - confirm against the original file.
    """
    if 'username' not in session:
        return redirect(url_for('index'))
    if token not in rooms:
        return redirect(url_for('dashboard'))
    is_admin = rooms[token]['admin'] == session['username']
    video_url = rooms[token]['video_url']
    video_state = rooms[token]['video_state']
    return render_template_string('''
Комната {{ token }}
Комната: {{ token }}
{% if video_url %}
{% endif %}
{% if is_admin %}
{% endif %}
''', token=token, session=session, is_admin=is_admin, video_url=video_url, video_state=video_state)
# WebSocket events
@socketio.on('join')
def handle_join(data):
    """Add the sender to a room and announce it to the room's members.

    Expects ``data`` to carry ``token`` (room id) and ``username``. On success
    the sender joins the Socket.IO room, is recorded in the room's user list
    (persisted), receives the current video state and user list, and everyone
    in the room gets a ``user_joined`` event. On failure the sender receives
    an ``error_message`` event.

    A user who is already listed in the room is always let back in, even when
    the room is at capacity, so a reconnect does not lock them out.

    NOTE(review): ``username`` is taken from the client payload as-is; it is
    not checked against the logged-in session here.
    """
    token = data.get('token')
    username = data.get('username')
    print(f"User {username} joining room {token}")

    # Malformed payloads would otherwise raise or register a None/empty user.
    if not isinstance(token, str) or not isinstance(username, str) or not username:
        emit('error_message', {'message': 'Invalid join request'}, to=request.sid)
        return

    room_info = rooms.get(token)
    if room_info is None:
        emit('error_message', {'message': 'Room is full or does not exist'}, to=request.sid)
        return

    members = room_info['users']
    already_member = username in members
    # Capacity only limits *new* members; existing ones may rejoin.
    if not already_member and len(members) >= room_info['max_users']:
        emit('error_message', {'message': 'Room is full or does not exist'}, to=request.sid)
        return

    join_room(token)
    if not already_member:
        members.append(username)
        save_json(ROOMS_DB, rooms)
    emit('initial_video_state', {'state': room_info['video_state']}, to=request.sid)
    # Broadcast to everyone (including the new user)
    emit('user_joined', {'username': username, 'users': members}, room=token)
    # Send the list of users directly to the new user
    emit('init_users', {'users': members}, to=request.sid)
@socketio.on('leave')
def handle_leave(data):
    """Remove a user from a room and notify the remaining members.

    Expects ``data`` to carry ``token`` and ``username``. Nothing happens when
    the room is unknown or the user is not listed in it. When the leaving user
    is the room's admin, the whole room entry is deleted. The room store is
    persisted and a ``user_left`` event with the remaining user list is sent
    to the room.

    NOTE(review): ``username`` comes from the client payload and is not
    checked against the session, so any client can trigger this for any user.
    """
    token = data['token']
    username = data['username']
    print(f"User {username} leaving room {token}")
    if token in rooms and username in rooms[token]['users']:
        leave_room(token)
        # Keep a reference to the list: the room entry may be deleted below,
        # and the notification still needs the remaining users.
        remaining_users = rooms[token]['users']
        remaining_users.remove(username)
        if rooms[token]['admin'] == username:
            del rooms[token]
        save_json(ROOMS_DB, rooms)
        emit('user_left', {'username': username, 'users': remaining_users}, room=token)
@socketio.on('signal')
def handle_signal(data):
    """Relay a ``signal`` payload unchanged to the sender's room.

    The payload is emitted to the room named by ``data['token']``; the sender
    itself is excluded from the broadcast.
    """
    target_room = data['token']
    emit('signal', data, room=target_room, include_self=False)
@socketio.on('admin_mute')
def handle_admin_mute(data):
    """Broadcast an ``admin_muted`` event for ``targetUser`` to the room.

    Expects ``data`` to carry ``token`` and ``targetUser``. The event is only
    sent when the logged-in session user is the admin of that room. The
    client-supplied ``byUser`` field is deliberately not used for this check,
    because any client could put the admin's name there.
    """
    token = data['token']
    target_user = data['targetUser']
    # Authorize against the server-side session, not the request payload.
    requester = session.get('username')
    if requester is not None and token in rooms and rooms[token].get('admin') == requester:
        emit('admin_muted', {'targetUser': target_user}, room=token)
@socketio.on('set_video_url')
def handle_set_video_url(data):
token = data['token']
video_url = data['video_url']
print(f"Setting video URL for room {token} to {video_url}")
if token in rooms and rooms[token]['admin'] == session.get('username'):
# Sanitize the URL (basic example - prevent script injection)
if '