/** * Connections * Pokemon Showdown - http://pokemonshowdown.com/ * * Abstraction layer for multi-process SockJS connections. * * This file handles all the communications between the users' * browsers, the networking processes, and users.ts in the * main process. * * @license MIT */ import * as fs from 'fs'; import * as http from 'http'; import * as https from 'https'; import * as path from 'path'; import { crashlogger, ProcessManager, Streams, Repl } from '../lib'; import { IPTools } from './ip-tools'; import { type ChannelID, extractChannelMessages } from '../sim/battle'; type StreamWorker = ProcessManager.StreamWorker; export const Sockets = new class { async onSpawn(worker: StreamWorker) { const id = worker.workerid; for await (const data of worker.stream) { switch (data.charAt(0)) { case '*': { // *socketid, ip, protocol // connect worker.load++; const [socketid, ip, protocol] = data.substr(1).split('\n'); Users.socketConnect(worker, id, socketid, ip, protocol); break; } case '!': { // !socketid // disconnect worker.load--; const socketid = data.substr(1); Users.socketDisconnect(worker, id, socketid); break; } case '<': { // void this.onSpawn(worker)); PM.subscribeUnspawn(this.onUnspawn); PM.spawn(workerCount); } socketSend(worker: StreamWorker, socketid: string, message: string) { void worker.stream.write(`>${socketid}\n${message}`); } socketDisconnect(worker: StreamWorker, socketid: string) { void worker.stream.write(`!${socketid}`); } roomBroadcast(roomid: RoomID, message: string) { for (const worker of PM.workers) { void worker.stream.write(`#${roomid}\n${message}`); } } roomAdd(worker: StreamWorker, roomid: RoomID, socketid: string) { void worker.stream.write(`+${roomid}\n${socketid}`); } roomRemove(worker: StreamWorker, roomid: RoomID, socketid: string) { void worker.stream.write(`-${roomid}\n${socketid}`); } channelBroadcast(roomid: RoomID, message: string) { for (const worker of PM.workers) { void worker.stream.write(`:${roomid}\n${message}`); } } channelMove(worker: StreamWorker, roomid: RoomID, channelid: ChannelID, socketid: string) { void worker.stream.write(`.${roomid}\n${channelid}\n${socketid}`); } eval(worker: StreamWorker, query: string) { void worker.stream.write(`$${query}`); } }; export class ServerStream extends Streams.ObjectReadWriteStream { /** socketid:Connection */ sockets = new Map(); /** roomid:socketid:Connection */ rooms = new Map>(); /** roomid:socketid:channelid */ roomChannels = new Map>(); server: http.Server; serverSsl: https.Server | null; socketCounter = 0; isTrustedProxyIp: (ip: string) => boolean; receivers: { [k: string]: (this: ServerStream, data: string) => void } = { '$'(data) { // $code // eslint-disable-next-line no-eval eval(data.substr(1)); }, '!'(data) { // !socketid // destroy const socketid = data.substr(1); const socket = this.sockets.get(socketid); if (!socket) return; socket.destroy(); this.sockets.delete(socketid); for (const [curRoomid, curRoom] of this.rooms) { curRoom.delete(socketid); const roomChannel = this.roomChannels.get(curRoomid); if (roomChannel) roomChannel.delete(socketid); if (!curRoom.size) { this.rooms.delete(curRoomid); if (roomChannel) this.roomChannels.delete(curRoomid); } } }, '>'(data) { // >socketid, message // message to single connection const nlLoc = data.indexOf('\n'); const socketid = data.substr(1, nlLoc - 1); const socket = this.sockets.get(socketid); if (!socket) return; const message = data.substr(nlLoc + 1); socket.write(message); }, '#'(data) { // #roomid, message // message to all connections in room // #, message // message to all connections const nlLoc = data.indexOf('\n'); const roomid = data.substr(1, nlLoc - 1) as RoomID; const room = roomid ? this.rooms.get(roomid) : this.sockets; if (!room) return; const message = data.substr(nlLoc + 1); for (const curSocket of room.values()) curSocket.write(message); }, '+'(data) { // +roomid, socketid // join room with connection const nlLoc = data.indexOf('\n'); const socketid = data.substr(nlLoc + 1); const socket = this.sockets.get(socketid); if (!socket) return; const roomid = data.substr(1, nlLoc - 1) as RoomID; let room = this.rooms.get(roomid); if (!room) { room = new Map(); this.rooms.set(roomid, room); } room.set(socketid, socket); }, '-'(data) { // -roomid, socketid // leave room with connection const nlLoc = data.indexOf('\n'); const roomid = data.slice(1, nlLoc) as RoomID; const room = this.rooms.get(roomid); if (!room) return; const socketid = data.slice(nlLoc + 1); room.delete(socketid); const roomChannel = this.roomChannels.get(roomid); if (roomChannel) roomChannel.delete(socketid); if (!room.size) { this.rooms.delete(roomid); if (roomChannel) this.roomChannels.delete(roomid); } }, '.'(data) { // .roomid, channelid, socketid // move connection to different channel in room const nlLoc = data.indexOf('\n'); const roomid = data.slice(1, nlLoc) as RoomID; const nlLoc2 = data.indexOf('\n', nlLoc + 1); const channelid = Number(data.slice(nlLoc + 1, nlLoc2)) as ChannelID; const socketid = data.slice(nlLoc2 + 1); let roomChannel = this.roomChannels.get(roomid); if (!roomChannel) { roomChannel = new Map(); this.roomChannels.set(roomid, roomChannel); } if (channelid === 0) { roomChannel.delete(socketid); } else { roomChannel.set(socketid, channelid); } }, ':'(data) { // :roomid, message // message to a room, splitting `|split` by channel const nlLoc = data.indexOf('\n'); const roomid = data.slice(1, nlLoc) as RoomID; const room = this.rooms.get(roomid); if (!room) return; const messages: [string | null, string | null, string | null, string | null, string | null] = [ null, null, null, null, null, ]; const message = data.substr(nlLoc + 1); const channelMessages = extractChannelMessages(message, [0, 1, 2, 3, 4]); const roomChannel = this.roomChannels.get(roomid); for (const [curSocketid, curSocket] of room) { const channelid = roomChannel?.get(curSocketid) || 0; if (!messages[channelid]) messages[channelid] = channelMessages[channelid].join('\n'); curSocket.write(messages[channelid]); } }, }; constructor(config: { port: number, bindaddress?: string, ssl?: typeof Config.ssl, wsdeflate?: typeof Config.wsdeflate, proxyip?: typeof Config.proxyip, customhttpresponse?: typeof Config.customhttpresponse, disablenodestatic?: boolean, }) { super(); if (!config.bindaddress) config.bindaddress = '0.0.0.0'; this.isTrustedProxyIp = config.proxyip ? IPTools.checker(config.proxyip) : () => false; // Static HTTP server // This handles the custom CSS and custom avatar features, and also // redirects yourserver:8001 to yourserver-8001.psim.us // It's optional if you don't need these features. this.server = http.createServer(); this.serverSsl = null; if (config.ssl) { let key; try { key = path.resolve(__dirname, config.ssl.options.key); if (!fs.statSync(key).isFile()) throw new Error(); try { key = fs.readFileSync(key); } catch (e: any) { crashlogger( new Error(`Failed to read the configured SSL private key PEM file:\n${e.stack}`), `Socket process ${process.pid}` ); } } catch { console.warn('SSL private key config values will not support HTTPS server option values in the future. Please set it to use the absolute path of its PEM file.'); key = config.ssl.options.key; } let cert; try { cert = path.resolve(__dirname, config.ssl.options.cert); if (!fs.statSync(cert).isFile()) throw new Error(); try { cert = fs.readFileSync(cert); } catch (e: any) { crashlogger( new Error(`Failed to read the configured SSL certificate PEM file:\n${e.stack}`), `Socket process ${process.pid}` ); } } catch { console.warn('SSL certificate config values will not support HTTPS server option values in the future. Please set it to use the absolute path of its PEM file.'); cert = config.ssl.options.cert; } if (key && cert) { try { // In case there are additional SSL config settings besides the key and cert... this.serverSsl = https.createServer({ ...config.ssl.options, key, cert }); } catch (e: any) { crashlogger(new Error(`The SSL settings are misconfigured:\n${e.stack}`), `Socket process ${process.pid}`); } } } // Static server try { if (config.disablenodestatic) throw new Error("disablenodestatic"); const StaticServer: typeof import('node-static').Server = require('node-static').Server; const roomidRegex = /^\/(?:[A-Za-z0-9][A-Za-z0-9-]*)\/?$/; const cssServer = new StaticServer('./config'); const avatarServer = new StaticServer('./config/avatars'); const staticServer = new StaticServer('./server/static'); const staticRequestHandler = (req: http.IncomingMessage, res: http.ServerResponse) => { // console.log(`static rq: ${req.socket.remoteAddress}:${req.socket.remotePort} -> ${req.socket.localAddress}:${req.socket.localPort} - ${req.method} ${req.url} ${req.httpVersion} - ${req.rawHeaders.join('|')}`); req.resume(); req.addListener('end', () => { if (config.customhttpresponse?.(req, res)) { return; } let server = staticServer; if (req.url) { if (req.url === '/custom.css') { server = cssServer; } else if (req.url.startsWith('/avatars/')) { req.url = req.url.substr(8); server = avatarServer; } else if (roomidRegex.test(req.url)) { req.url = '/'; } } server.serve(req, res, e => { if (e && (e as any).status === 404) { staticServer.serveFile('404.html', 404, {}, req, res); } }); }); }; this.server.on('request', staticRequestHandler); if (this.serverSsl) this.serverSsl.on('request', staticRequestHandler); } catch (e: any) { if (e.message === 'disablenodestatic') { console.log('node-static is disabled'); } else { console.log('Could not start node-static - try `npm install` if you want to use it'); } } // SockJS server // This is the main server that handles users connecting to our server // and doing things on our server. const sockjs: typeof import('sockjs') = (require as any)('sockjs'); const options: import('sockjs').ServerOptions & { faye_server_options?: { [key: string]: any } } = { sockjs_url: `//play.pokemonshowdown.com/js/lib/sockjs-1.4.0-nwjsfix.min.js`, prefix: '/showdown', log(severity: string, message: string) { if (severity === 'error') console.log(`ERROR: ${message}`); }, }; if (config.wsdeflate !== null) { try { const deflate = (require as any)('permessage-deflate').configure(config.wsdeflate); options.faye_server_options = { extensions: [deflate] }; } catch { crashlogger( new Error("Dependency permessage-deflate is not installed or is otherwise unaccessable. No message compression will take place until server restart."), "Sockets" ); } } const server = sockjs.createServer(options); process.once('disconnect', () => this.cleanup()); process.once('exit', () => this.cleanup()); // this is global so it can be hotpatched if necessary server.on('connection', connection => this.onConnection(connection)); server.installHandlers(this.server, {}); this.server.listen(config.port, config.bindaddress); console.log(`Worker ${PM.workerid} now listening on ${config.bindaddress}:${config.port}`); if (this.serverSsl) { server.installHandlers(this.serverSsl, {}); // @ts-expect-error if appssl exists, then `config.ssl` must also exist this.serverSsl.listen(config.ssl.port, config.bindaddress); // @ts-expect-error if appssl exists, then `config.ssl` must also exist console.log(`Worker ${PM.workerid} now listening for SSL on port ${config.ssl.port}`); } console.log(`Test your server at http://${config.bindaddress === '0.0.0.0' ? 'localhost' : config.bindaddress}:${config.port}`); } /** * Clean up any remaining connections on disconnect. If this isn't done, * the process will not exit until any remaining connections have been destroyed. * Afterwards, the worker process will die on its own */ cleanup() { for (const socket of this.sockets.values()) { try { socket.destroy(); } catch {} } this.sockets.clear(); this.rooms.clear(); this.roomChannels.clear(); this.server.close(); if (this.serverSsl) this.serverSsl.close(); // Let the server(s) finish closing. setImmediate(() => process.exit(0)); } onConnection(socket: import('sockjs').Connection) { // For reasons that are not entirely clear, SockJS sometimes triggers // this event with a null `socket` argument. if (!socket) return; if (!socket.remoteAddress) { // SockJS sometimes fails to be able to cache the IP, port, and // address from connection request headers. try { socket.destroy(); } catch {} return; } const socketid = `${++this.socketCounter}`; this.sockets.set(socketid, socket); let socketip = socket.remoteAddress; if (this.isTrustedProxyIp(socketip)) { const ips = (socket.headers['x-forwarded-for'] || '').split(',').reverse(); for (const ip of ips) { const proxy = ip.trim(); if (!this.isTrustedProxyIp(proxy)) { socketip = proxy; break; } } } this.push(`*${socketid}\n${socketip}\n${socket.protocol}`); socket.on('data', message => { // drop empty messages (DDoS?) if (!message) return; // drop messages over 100KB if (message.length > (100 * 1024)) { socket.write(`|popup|Your message must be below 100KB`); console.log(`Dropping client message ${message.length / 1024} KB...`); console.log(message.slice(0, 160)); return; } // drop legacy JSON messages if (typeof message !== 'string' || message.startsWith('{')) return; // drop blank messages (DDoS?) const pipeIndex = message.indexOf('|'); if (pipeIndex < 0 || pipeIndex === message.length - 1) return; this.push(`<${socketid}\n${message}`); }); socket.once('close', () => { this.push(`!${socketid}`); this.sockets.delete(socketid); for (const room of this.rooms.values()) room.delete(socketid); }); } _write(data: string) { // console.log('worker received: ' + data); const receiver = this.receivers[data.charAt(0)]; if (receiver) receiver.call(this, data); } } /********************************************************* * Process manager *********************************************************/ export const PM = new ProcessManager.RawProcessManager({ module, setupChild: () => new ServerStream(Config), isCluster: true, }); if (!PM.isParentProcess) { // This is a child process! global.Config = (require as any)('./config-loader').Config; if (Config.crashguard) { // graceful crash - allow current battles to finish before restarting process.on('uncaughtException', err => { crashlogger(err, `Socket process ${PM.workerid} (${process.pid})`); }); process.on('unhandledRejection', err => { crashlogger(err as any || {}, `Socket process ${PM.workerid} (${process.pid}) Promise`); }); } if (Config.sockets) { try { require.resolve('node-oom-heapdump'); } catch (e: any) { if (e.code !== 'MODULE_NOT_FOUND') throw e; // should never happen throw new Error( 'node-oom-heapdump is not installed, but it is a required dependency if Config.ofesockets is set to true! ' + 'Run npm install node-oom-heapdump and restart the server.' ); } // Create a heapdump if the process runs out of memory. (global as any).nodeOomHeapdump = (require as any)('node-oom-heapdump')({ addTimestamp: true, }); } // setup worker if (process.env.PSPORT) Config.port = +process.env.PSPORT; if (process.env.PSBINDADDR) Config.bindaddress = process.env.PSBINDADDR; if (process.env.PSNOSSL && parseInt(process.env.PSNOSSL)) Config.ssl = null; // eslint-disable-next-line no-eval Repl.start(`sockets-${PM.workerid}-${process.pid}`, cmd => eval(cmd)); }