Spaces:
Paused
Paused
hunun
/
attached_assets
/Pasted-import-os-import-uuid-import-json-from-datetime-import-datetime-from-flask-import-Flask-render--1753191891664_1753191891665.txt
| import os | |
| import uuid | |
| import json | |
| from datetime import datetime | |
| from flask import Flask, render_template, request, jsonify, session, redirect, url_for | |
| from flask_cors import CORS | |
| import logging | |
| # Configure logging | |
| logging.basicConfig(level=logging.DEBUG) | |
| app = Flask(__name__) | |
| app.secret_key = os.environ.get("SESSION_SECRET", "whatsapp-clone-secret-key") | |
| CORS(app) | |
| # In-memory storage | |
| users = {} # user_id: {name, email, unique_id, online, last_seen} | |
| conversations = {} # conversation_id: {type: 'private'/'group', participants: [], messages: []} | |
| user_conversations = {} # user_id: [conversation_ids] | |
| def generate_unique_id(): | |
| """Generate a unique 8-character ID for users""" | |
| return str(uuid.uuid4())[:8].upper() | |
| def get_current_user(): | |
| """Get current user from session""" | |
| user_id = session.get('user_id') | |
| return users.get(user_id) if user_id else None | |
| @app.route('/') | |
| def landing(): | |
| """Landing page""" | |
| if 'user_id' in session and session['user_id'] in users: | |
| return redirect(url_for('chat')) | |
| return render_template('landing.html') | |
| @app.route('/register') | |
| def register_page(): | |
| """Registration page""" | |
| return render_template('register.html') | |
| @app.route('/api/register', methods=['POST']) | |
| def register(): | |
| """Register a new user""" | |
| try: | |
| data = request.get_json() | |
| name = data.get('name', '').strip() | |
| email = data.get('email', '').strip() | |
| if not name or not email: | |
| return jsonify({'success': False, 'message': 'Name and email are required'}) | |
| # Check if email already exists | |
| for user_id, user_data in users.items(): | |
| if user_data['email'] == email: | |
| return jsonify({'success': False, 'message': 'Email already registered'}) | |
| # Create new user | |
| user_id = str(uuid.uuid4()) | |
| unique_id = generate_unique_id() | |
| users[user_id] = { | |
| 'name': name, | |
| 'email': email, | |
| 'unique_id': unique_id, | |
| 'online': True, | |
| 'last_seen': datetime.now().isoformat(), | |
| 'user_id': user_id | |
| } | |
| user_conversations[user_id] = [] | |
| # Set session | |
| session['user_id'] = user_id | |
| return jsonify({ | |
| 'success': True, | |
| 'user': { | |
| 'user_id': user_id, | |
| 'name': name, | |
| 'email': email, | |
| 'unique_id': unique_id | |
| } | |
| }) | |
| except Exception as e: | |
| logging.error(f"Registration error: {e}") | |
| return jsonify({'success': False, 'message': 'Registration failed'}) | |
| @app.route('/chat') | |
| def chat(): | |
| """Main chat interface""" | |
| if 'user_id' not in session or session['user_id'] not in users: | |
| return redirect(url_for('landing')) | |
| user = users[session['user_id']] | |
| return render_template('chat.html', user=user) | |
| @app.route('/settings') | |
| def settings(): | |
| """Settings page""" | |
| if 'user_id' not in session or session['user_id'] not in users: | |
| return redirect(url_for('landing')) | |
| user = users[session['user_id']] | |
| return render_template('settings.html', user=user) | |
| @app.route('/api/find_user', methods=['POST']) | |
| def find_user(): | |
| """Find user by unique ID""" | |
| try: | |
| data = request.get_json() | |
| unique_id = data.get('unique_id', '').strip().upper() | |
| if not unique_id: | |
| return jsonify({'success': False, 'message': 'Unique ID is required'}) | |
| # Find user by unique_id | |
| for user_id, user_data in users.items(): | |
| if user_data['unique_id'] == unique_id: | |
| return jsonify({ | |
| 'success': True, | |
| 'user': { | |
| 'user_id': user_id, | |
| 'name': user_data['name'], | |
| 'unique_id': user_data['unique_id'], | |
| 'online': user_data['online'] | |
| } | |
| }) | |
| return jsonify({'success': False, 'message': 'User not found'}) | |
| except Exception as e: | |
| logging.error(f"Find user error: {e}") | |
| return jsonify({'success': False, 'message': 'Search failed'}) | |
| @app.route('/api/start_conversation', methods=['POST']) | |
| def start_conversation(): | |
| """Start a private conversation or create a group""" | |
| try: | |
| if 'user_id' not in session: | |
| return jsonify({'success': False, 'message': 'Not authenticated'}) | |
| current_user_id = session['user_id'] | |
| data = request.get_json() | |
| conversation_type = data.get('type') # 'private' or 'group' | |
| if conversation_type == 'private': | |
| target_user_id = data.get('target_user_id') | |
| if not target_user_id or target_user_id not in users: | |
| return jsonify({'success': False, 'message': 'Target user not found'}) | |
| # Check if conversation already exists | |
| for conv_id, conv_data in conversations.items(): | |
| if (conv_data['type'] == 'private' and | |
| set(conv_data['participants']) == {current_user_id, target_user_id}): | |
| return jsonify({'success': True, 'conversation_id': conv_id}) | |
| # Create new private conversation | |
| conv_id = str(uuid.uuid4()) | |
| conversations[conv_id] = { | |
| 'type': 'private', | |
| 'participants': [current_user_id, target_user_id], | |
| 'messages': [], | |
| 'created_at': datetime.now().isoformat() | |
| } | |
| # Add to user conversations | |
| if current_user_id not in user_conversations: | |
| user_conversations[current_user_id] = [] | |
| if target_user_id not in user_conversations: | |
| user_conversations[target_user_id] = [] | |
| user_conversations[current_user_id].append(conv_id) | |
| user_conversations[target_user_id].append(conv_id) | |
| return jsonify({'success': True, 'conversation_id': conv_id}) | |
| elif conversation_type == 'group': | |
| group_name = data.get('group_name', '').strip() | |
| participant_ids = data.get('participant_ids', []) | |
| if not group_name: | |
| return jsonify({'success': False, 'message': 'Group name is required'}) | |
| if len(participant_ids) < 2 or len(participant_ids) > 9: | |
| return jsonify({'success': False, 'message': 'Groups must have 3-10 members (including you)'}) | |
| # Add current user to participants | |
| all_participants = list(set([current_user_id] + participant_ids)) | |
| # Validate all participants exist | |
| for pid in all_participants: | |
| if pid not in users: | |
| return jsonify({'success': False, 'message': f'User {pid} not found'}) | |
| # Create group conversation | |
| conv_id = str(uuid.uuid4()) | |
| conversations[conv_id] = { | |
| 'type': 'group', | |
| 'name': group_name, | |
| 'participants': all_participants, | |
| 'messages': [], | |
| 'created_at': datetime.now().isoformat(), | |
| 'created_by': current_user_id | |
| } | |
| # Add to all participants' conversations | |
| for pid in all_participants: | |
| if pid not in user_conversations: | |
| user_conversations[pid] = [] | |
| user_conversations[pid].append(conv_id) | |
| return jsonify({'success': True, 'conversation_id': conv_id}) | |
| return jsonify({'success': False, 'message': 'Invalid conversation type'}) | |
| except Exception as e: | |
| logging.error(f"Start conversation error: {e}") | |
| return jsonify({'success': False, 'message': 'Failed to start conversation'}) | |
| @app.route('/api/conversations') | |
| def get_conversations(): | |
| """Get user's conversations""" | |
| try: | |
| if 'user_id' not in session: | |
| return jsonify({'success': False, 'message': 'Not authenticated'}) | |
| current_user_id = session['user_id'] | |
| user_convs = user_conversations.get(current_user_id, []) | |
| result = [] | |
| for conv_id in user_convs: | |
| if conv_id in conversations: | |
| conv = conversations[conv_id] | |
| # Get conversation info | |
| conv_info = { | |
| 'id': conv_id, | |
| 'type': conv['type'], | |
| 'participants': [] | |
| } | |
| if conv['type'] == 'private': | |
| # For private chat, get the other user's info | |
| other_user_id = next(pid for pid in conv['participants'] if pid != current_user_id) | |
| other_user = users.get(other_user_id, {}) | |
| conv_info['name'] = other_user.get('name', 'Unknown User') | |
| conv_info['online'] = other_user.get('online', False) | |
| else: | |
| # For group chat | |
| conv_info['name'] = conv.get('name', 'Group Chat') | |
| conv_info['online'] = True # Groups are always "online" | |
| # Get participant info | |
| for pid in conv['participants']: | |
| if pid in users: | |
| user_data = users[pid] | |
| conv_info['participants'].append({ | |
| 'user_id': pid, | |
| 'name': user_data['name'], | |
| 'unique_id': user_data['unique_id'], | |
| 'online': user_data['online'] | |
| }) | |
| # Get last message | |
| if conv['messages']: | |
| last_msg = conv['messages'][-1] | |
| conv_info['last_message'] = { | |
| 'content': last_msg['content'], | |
| 'timestamp': last_msg['timestamp'], | |
| 'sender_name': users.get(last_msg['sender_id'], {}).get('name', 'Unknown') | |
| } | |
| else: | |
| conv_info['last_message'] = None | |
| result.append(conv_info) | |
| return jsonify({'success': True, 'conversations': result}) | |
| except Exception as e: | |
| logging.error(f"Get conversations error: {e}") | |
| return jsonify({'success': False, 'message': 'Failed to load conversations'}) | |
| @app.route('/api/messages/<conversation_id>') | |
| def get_messages(conversation_id): | |
| """Get messages for a conversation""" | |
| try: | |
| if 'user_id' not in session: | |
| return jsonify({'success': False, 'message': 'Not authenticated'}) | |
| current_user_id = session['user_id'] | |
| if conversation_id not in conversations: | |
| return jsonify({'success': False, 'message': 'Conversation not found'}) | |
| conv = conversations[conversation_id] | |
| # Check if user is participant | |
| if current_user_id not in conv['participants']: | |
| return jsonify({'success': False, 'message': 'Access denied'}) | |
| # Format messages | |
| messages = [] | |
| for msg in conv['messages']: | |
| sender = users.get(msg['sender_id'], {}) | |
| messages.append({ | |
| 'id': msg['id'], | |
| 'content': msg['content'], | |
| 'sender_id': msg['sender_id'], | |
| 'sender_name': sender.get('name', 'Unknown'), | |
| 'timestamp': msg['timestamp'], | |
| 'status': msg.get('status', 'sent'), | |
| 'seen_by': msg.get('seen_by', []) | |
| }) | |
| return jsonify({'success': True, 'messages': messages}) | |
| except Exception as e: | |
| logging.error(f"Get messages error: {e}") | |
| return jsonify({'success': False, 'message': 'Failed to load messages'}) | |
| @app.route('/api/send_message', methods=['POST']) | |
| def send_message(): | |
| """Send a message""" | |
| try: | |
| if 'user_id' not in session: | |
| return jsonify({'success': False, 'message': 'Not authenticated'}) | |
| current_user_id = session['user_id'] | |
| data = request.get_json() | |
| conversation_id = data.get('conversation_id') | |
| content = data.get('content', '').strip() | |
| if not conversation_id or not content: | |
| return jsonify({'success': False, 'message': 'Conversation ID and content required'}) | |
| if conversation_id not in conversations: | |
| return jsonify({'success': False, 'message': 'Conversation not found'}) | |
| conv = conversations[conversation_id] | |
| # Check if user is participant | |
| if current_user_id not in conv['participants']: | |
| return jsonify({'success': False, 'message': 'Access denied'}) | |
| # Create message | |
| message = { | |
| 'id': str(uuid.uuid4()), | |
| 'content': content, | |
| 'sender_id': current_user_id, | |
| 'timestamp': datetime.now().isoformat(), | |
| 'status': 'sent', | |
| 'seen_by': [current_user_id] # Sender has seen the message | |
| } | |
| conversations[conversation_id]['messages'].append(message) | |
| return jsonify({'success': True, 'message': message}) | |
| except Exception as e: | |
| logging.error(f"Send message error: {e}") | |
| return jsonify({'success': False, 'message': 'Failed to send message'}) | |
| @app.route('/api/mark_seen', methods=['POST']) | |
| def mark_seen(): | |
| """Mark messages as seen""" | |
| try: | |
| if 'user_id' not in session: | |
| return jsonify({'success': False, 'message': 'Not authenticated'}) | |
| current_user_id = session['user_id'] | |
| data = request.get_json() | |
| conversation_id = data.get('conversation_id') | |
| if not conversation_id or conversation_id not in conversations: | |
| return jsonify({'success': False, 'message': 'Invalid conversation'}) | |
| conv = conversations[conversation_id] | |
| # Mark all messages as seen by current user | |
| for message in conv['messages']: | |
| if current_user_id not in message.get('seen_by', []): | |
| message['seen_by'].append(current_user_id) | |
| return jsonify({'success': True}) | |
| except Exception as e: | |
| logging.error(f"Mark seen error: {e}") | |
| return jsonify({'success': False, 'message': 'Failed to mark as seen'}) | |
| @app.route('/api/update_status', methods=['POST']) | |
| def update_status(): | |
| """Update user online status""" | |
| try: | |
| if 'user_id' not in session: | |
| return jsonify({'success': False, 'message': 'Not authenticated'}) | |
| current_user_id = session['user_id'] | |
| data = request.get_json() | |
| online = data.get('online', True) | |
| if current_user_id in users: | |
| users[current_user_id]['online'] = online | |
| users[current_user_id]['last_seen'] = datetime.now().isoformat() | |
| return jsonify({'success': True}) | |
| except Exception as e: | |
| logging.error(f"Update status error: {e}") | |
| return jsonify({'success': False, 'message': 'Failed to update status'}) | |
| @app.route('/logout') | |
| def logout(): | |
| """Logout user""" | |
| if 'user_id' in session: | |
| user_id = session['user_id'] | |
| if user_id in users: | |
| users[user_id]['online'] = False | |
| users[user_id]['last_seen'] = datetime.now().isoformat() | |
| session.pop('user_id', None) | |
| return redirect(url_for('landing')) | |
| if __name__ == '__main__': | |
| app.run(host='0.0.0.0', port=5000, debug=True) | |