/** | |
* Connections | |
* Pokemon Showdown - | |
* | |
* Abstraction layer for multi-process SockJS connections. | |
* | |
* This file handles all the communications between the users' | |
* browsers, the networking processes, and users.ts in the | |
* main process. | |
* | |
* @license MIT | |
*/ | |
import * as fs from 'fs'; | |
import * as http from 'http'; | |
import * as https from 'https'; | |
import * as path from 'path'; | |
import { crashlogger, ProcessManager, Streams, Repl } from '../lib'; | |
import { IPTools } from './ip-tools'; | |
import { type ChannelID, extractChannelMessages } from '../sim/battle'; | |
type StreamWorker = ProcessManager.StreamWorker; | |
export const Sockets = new class { | |
async onSpawn(worker: StreamWorker) { | |
const id = worker.workerid; | |
for await (const data of { | |
switch (data.charAt(0)) { | |
case '*': { | |
// *socketid, ip, protocol | |
// connect | |
worker.load++; | |
const [socketid, ip, protocol] = data.substr(1).split('\n'); | |
Users.socketConnect(worker, id, socketid, ip, protocol); | |
break; | |
} | |
case '!': { | |
// !socketid | |
// disconnect | |
worker.load--; | |
const socketid = data.substr(1); | |
Users.socketDisconnect(worker, id, socketid); | |
break; | |
} | |
case '<': { | |
// <socketid, message | |
// message | |
const idx = data.indexOf('\n'); | |
const socketid = data.substr(1, idx - 1); | |
const message = data.substr(idx + 1); | |
Users.socketReceive(worker, id, socketid, message); | |
break; | |
} | |
default: | |
// unhandled | |
} | |
} | |
} | |
onUnspawn(this: void, worker: StreamWorker) { | |
Users.socketDisconnectAll(worker, worker.workerid); | |
} | |
listen(port?: number, bindAddress?: string, workerCount?: number) { | |
if (port !== undefined && !isNaN(port)) { | |
Config.port = port; | |
Config.ssl = null; | |
} else { | |
port = Config.port; | |
// Autoconfigure when running in cloud environments. | |
try { | |
const cloudenv = (require as any)('cloud-env'); | |
bindAddress = cloudenv.get('IP', bindAddress); | |
port = cloudenv.get('PORT', port); | |
} catch {} | |
} | |
if (bindAddress !== undefined) { | |
Config.bindaddress = bindAddress; | |
} | |
if (port !== undefined) { | |
Config.port = port; | |
} | |
if (workerCount === undefined) { | |
workerCount = (Config.workers !== undefined ? Config.workers : 1); | |
} | |
PM.env = { PSPORT: Config.port, PSBINDADDR: Config.bindaddress || '', PSNOSSL: Config.ssl ? 0 : 1 }; | |
PM.subscribeSpawn(worker => void this.onSpawn(worker)); | |
PM.subscribeUnspawn(this.onUnspawn); | |
PM.spawn(workerCount); | |
} | |
socketSend(worker: StreamWorker, socketid: string, message: string) { | |
void`>${socketid}\n${message}`); | |
} | |
socketDisconnect(worker: StreamWorker, socketid: string) { | |
void`!${socketid}`); | |
} | |
roomBroadcast(roomid: RoomID, message: string) { | |
for (const worker of PM.workers) { | |
void`#${roomid}\n${message}`); | |
} | |
} | |
roomAdd(worker: StreamWorker, roomid: RoomID, socketid: string) { | |
void`+${roomid}\n${socketid}`); | |
} | |
roomRemove(worker: StreamWorker, roomid: RoomID, socketid: string) { | |
void`-${roomid}\n${socketid}`); | |
} | |
channelBroadcast(roomid: RoomID, message: string) { | |
for (const worker of PM.workers) { | |
void`:${roomid}\n${message}`); | |
} | |
} | |
channelMove(worker: StreamWorker, roomid: RoomID, channelid: ChannelID, socketid: string) { | |
void`.${roomid}\n${channelid}\n${socketid}`); | |
} | |
eval(worker: StreamWorker, query: string) { | |
void`$${query}`); | |
} | |
}; | |
export class ServerStream extends Streams.ObjectReadWriteStream<string> { | |
/** socketid:Connection */ | |
sockets = new Map<string, import('sockjs').Connection>(); | |
/** roomid:socketid:Connection */ | |
rooms = new Map<RoomID, Map<string, import('sockjs').Connection>>(); | |
/** roomid:socketid:channelid */ | |
roomChannels = new Map<RoomID, Map<string, ChannelID>>(); | |
server: http.Server; | |
serverSsl: https.Server | null; | |
socketCounter = 0; | |
isTrustedProxyIp: (ip: string) => boolean; | |
receivers: { [k: string]: (this: ServerStream, data: string) => void } = { | |
'$'(data) { | |
// $code | |
// eslint-disable-next-line no-eval | |
eval(data.substr(1)); | |
}, | |
'!'(data) { | |
// !socketid | |
// destroy | |
const socketid = data.substr(1); | |
const socket = this.sockets.get(socketid); | |
if (!socket) return; | |
socket.destroy(); | |
this.sockets.delete(socketid); | |
for (const [curRoomid, curRoom] of this.rooms) { | |
curRoom.delete(socketid); | |
const roomChannel = this.roomChannels.get(curRoomid); | |
if (roomChannel) roomChannel.delete(socketid); | |
if (!curRoom.size) { | |
this.rooms.delete(curRoomid); | |
if (roomChannel) this.roomChannels.delete(curRoomid); | |
} | |
} | |
}, | |
'>'(data) { | |
// >socketid, message | |
// message to single connection | |
const nlLoc = data.indexOf('\n'); | |
const socketid = data.substr(1, nlLoc - 1); | |
const socket = this.sockets.get(socketid); | |
if (!socket) return; | |
const message = data.substr(nlLoc + 1); | |
socket.write(message); | |
}, | |
'#'(data) { | |
// #roomid, message | |
// message to all connections in room | |
// #, message | |
// message to all connections | |
const nlLoc = data.indexOf('\n'); | |
const roomid = data.substr(1, nlLoc - 1) as RoomID; | |
const room = roomid ? this.rooms.get(roomid) : this.sockets; | |
if (!room) return; | |
const message = data.substr(nlLoc + 1); | |
for (const curSocket of room.values()) curSocket.write(message); | |
}, | |
'+'(data) { | |
// +roomid, socketid | |
// join room with connection | |
const nlLoc = data.indexOf('\n'); | |
const socketid = data.substr(nlLoc + 1); | |
const socket = this.sockets.get(socketid); | |
if (!socket) return; | |
const roomid = data.substr(1, nlLoc - 1) as RoomID; | |
let room = this.rooms.get(roomid); | |
if (!room) { | |
room = new Map(); | |
this.rooms.set(roomid, room); | |
} | |
room.set(socketid, socket); | |
}, | |
'-'(data) { | |
// -roomid, socketid | |
// leave room with connection | |
const nlLoc = data.indexOf('\n'); | |
const roomid = data.slice(1, nlLoc) as RoomID; | |
const room = this.rooms.get(roomid); | |
if (!room) return; | |
const socketid = data.slice(nlLoc + 1); | |
room.delete(socketid); | |
const roomChannel = this.roomChannels.get(roomid); | |
if (roomChannel) roomChannel.delete(socketid); | |
if (!room.size) { | |
this.rooms.delete(roomid); | |
if (roomChannel) this.roomChannels.delete(roomid); | |
} | |
}, | |
'.'(data) { | |
// .roomid, channelid, socketid | |
// move connection to different channel in room | |
const nlLoc = data.indexOf('\n'); | |
const roomid = data.slice(1, nlLoc) as RoomID; | |
const nlLoc2 = data.indexOf('\n', nlLoc + 1); | |
const channelid = Number(data.slice(nlLoc + 1, nlLoc2)) as ChannelID; | |
const socketid = data.slice(nlLoc2 + 1); | |
let roomChannel = this.roomChannels.get(roomid); | |
if (!roomChannel) { | |
roomChannel = new Map(); | |
this.roomChannels.set(roomid, roomChannel); | |
} | |
if (channelid === 0) { | |
roomChannel.delete(socketid); | |
} else { | |
roomChannel.set(socketid, channelid); | |
} | |
}, | |
':'(data) { | |
// :roomid, message | |
// message to a room, splitting `|split` by channel | |
const nlLoc = data.indexOf('\n'); | |
const roomid = data.slice(1, nlLoc) as RoomID; | |
const room = this.rooms.get(roomid); | |
if (!room) return; | |
const messages: [string | null, string | null, string | null, string | null, string | null] = [ | |
null, null, null, null, null, | |
]; | |
const message = data.substr(nlLoc + 1); | |
const channelMessages = extractChannelMessages(message, [0, 1, 2, 3, 4]); | |
const roomChannel = this.roomChannels.get(roomid); | |
for (const [curSocketid, curSocket] of room) { | |
const channelid = roomChannel?.get(curSocketid) || 0; | |
if (!messages[channelid]) messages[channelid] = channelMessages[channelid].join('\n'); | |
curSocket.write(messages[channelid]); | |
} | |
}, | |
}; | |
constructor(config: { | |
port: number, | |
bindaddress?: string, | |
ssl?: typeof Config.ssl, | |
wsdeflate?: typeof Config.wsdeflate, | |
proxyip?: typeof Config.proxyip, | |
customhttpresponse?: typeof Config.customhttpresponse, | |
disablenodestatic?: boolean, | |
}) { | |
super(); | |
if (!config.bindaddress) config.bindaddress = ''; | |
this.isTrustedProxyIp = config.proxyip ? IPTools.checker(config.proxyip) : () => false; | |
// Static HTTP server | |
// This handles the custom CSS and custom avatar features, and also | |
// redirects yourserver:8001 to | |
// It's optional if you don't need these features. | |
this.server = http.createServer(); | |
this.serverSsl = null; | |
if (config.ssl) { | |
let key; | |
try { | |
key = path.resolve(__dirname, config.ssl.options.key); | |
if (!fs.statSync(key).isFile()) throw new Error(); | |
try { | |
key = fs.readFileSync(key); | |
} catch (e: any) { | |
crashlogger( | |
new Error(`Failed to read the configured SSL private key PEM file:\n${e.stack}`), | |
`Socket process ${}` | |
); | |
} | |
} catch { | |
console.warn('SSL private key config values will not support HTTPS server option values in the future. Please set it to use the absolute path of its PEM file.'); | |
key = config.ssl.options.key; | |
} | |
let cert; | |
try { | |
cert = path.resolve(__dirname, config.ssl.options.cert); | |
if (!fs.statSync(cert).isFile()) throw new Error(); | |
try { | |
cert = fs.readFileSync(cert); | |
} catch (e: any) { | |
crashlogger( | |
new Error(`Failed to read the configured SSL certificate PEM file:\n${e.stack}`), | |
`Socket process ${}` | |
); | |
} | |
} catch { | |
console.warn('SSL certificate config values will not support HTTPS server option values in the future. Please set it to use the absolute path of its PEM file.'); | |
cert = config.ssl.options.cert; | |
} | |
if (key && cert) { | |
try { | |
// In case there are additional SSL config settings besides the key and cert... | |
this.serverSsl = https.createServer({ ...config.ssl.options, key, cert }); | |
} catch (e: any) { | |
crashlogger(new Error(`The SSL settings are misconfigured:\n${e.stack}`), `Socket process ${}`); | |
} | |
} | |
} | |
// Static server | |
try { | |
if (config.disablenodestatic) throw new Error("disablenodestatic"); | |
const StaticServer: typeof import('node-static').Server = require('node-static').Server; | |
const roomidRegex = /^\/(?:[A-Za-z0-9][A-Za-z0-9-]*)\/?$/; | |
const cssServer = new StaticServer('./config'); | |
const avatarServer = new StaticServer('./config/avatars'); | |
const staticServer = new StaticServer('./server/static'); | |
const staticRequestHandler = (req: http.IncomingMessage, res: http.ServerResponse) => { | |
// console.log(`static rq: ${req.socket.remoteAddress}:${req.socket.remotePort} -> ${req.socket.localAddress}:${req.socket.localPort} - ${req.method} ${req.url} ${req.httpVersion} - ${req.rawHeaders.join('|')}`); | |
req.resume(); | |
req.addListener('end', () => { | |
if (config.customhttpresponse?.(req, res)) { | |
return; | |
} | |
let server = staticServer; | |
if (req.url) { | |
if (req.url === '/custom.css') { | |
server = cssServer; | |
} else if (req.url.startsWith('/avatars/')) { | |
req.url = req.url.substr(8); | |
server = avatarServer; | |
} else if (roomidRegex.test(req.url)) { | |
req.url = '/'; | |
} | |
} | |
server.serve(req, res, e => { | |
if (e && (e as any).status === 404) { | |
staticServer.serveFile('404.html', 404, {}, req, res); | |
} | |
}); | |
}); | |
}; | |
this.server.on('request', staticRequestHandler); | |
if (this.serverSsl) this.serverSsl.on('request', staticRequestHandler); | |
} catch (e: any) { | |
if (e.message === 'disablenodestatic') { | |
console.log('node-static is disabled'); | |
} else { | |
console.log('Could not start node-static - try `npm install` if you want to use it'); | |
} | |
} | |
// SockJS server | |
// This is the main server that handles users connecting to our server | |
// and doing things on our server. | |
const sockjs: typeof import('sockjs') = (require as any)('sockjs'); | |
const options: import('sockjs').ServerOptions & { faye_server_options?: { [key: string]: any } } = { | |
sockjs_url: `//`, | |
prefix: '/showdown', | |
log(severity: string, message: string) { | |
if (severity === 'error') console.log(`ERROR: ${message}`); | |
}, | |
}; | |
if (config.wsdeflate !== null) { | |
try { | |
const deflate = (require as any)('permessage-deflate').configure(config.wsdeflate); | |
options.faye_server_options = { extensions: [deflate] }; | |
} catch { | |
crashlogger( | |
new Error("Dependency permessage-deflate is not installed or is otherwise unaccessable. No message compression will take place until server restart."), | |
"Sockets" | |
); | |
} | |
} | |
const server = sockjs.createServer(options); | |
process.once('disconnect', () => this.cleanup()); | |
process.once('exit', () => this.cleanup()); | |
// this is global so it can be hotpatched if necessary | |
server.on('connection', connection => this.onConnection(connection)); | |
server.installHandlers(this.server, {}); | |
this.server.listen(config.port, config.bindaddress); | |
console.log(`Worker ${PM.workerid} now listening on ${config.bindaddress}:${config.port}`); | |
if (this.serverSsl) { | |
server.installHandlers(this.serverSsl, {}); | |
// @ts-expect-error if appssl exists, then `config.ssl` must also exist | |
this.serverSsl.listen(config.ssl.port, config.bindaddress); | |
// @ts-expect-error if appssl exists, then `config.ssl` must also exist | |
console.log(`Worker ${PM.workerid} now listening for SSL on port ${config.ssl.port}`); | |
} | |
console.log(`Test your server at http://${config.bindaddress === '' ? 'localhost' : config.bindaddress}:${config.port}`); | |
} | |
/** | |
* Clean up any remaining connections on disconnect. If this isn't done, | |
* the process will not exit until any remaining connections have been destroyed. | |
* Afterwards, the worker process will die on its own | |
*/ | |
cleanup() { | |
for (const socket of this.sockets.values()) { | |
try { | |
socket.destroy(); | |
} catch {} | |
} | |
this.sockets.clear(); | |
this.rooms.clear(); | |
this.roomChannels.clear(); | |
this.server.close(); | |
if (this.serverSsl) this.serverSsl.close(); | |
// Let the server(s) finish closing. | |
setImmediate(() => process.exit(0)); | |
} | |
onConnection(socket: import('sockjs').Connection) { | |
// For reasons that are not entirely clear, SockJS sometimes triggers | |
// this event with a null `socket` argument. | |
if (!socket) return; | |
if (!socket.remoteAddress) { | |
// SockJS sometimes fails to be able to cache the IP, port, and | |
// address from connection request headers. | |
try { | |
socket.destroy(); | |
} catch {} | |
return; | |
} | |
const socketid = `${++this.socketCounter}`; | |
this.sockets.set(socketid, socket); | |
let socketip = socket.remoteAddress; | |
if (this.isTrustedProxyIp(socketip)) { | |
const ips = (socket.headers['x-forwarded-for'] || '').split(',').reverse(); | |
for (const ip of ips) { | |
const proxy = ip.trim(); | |
if (!this.isTrustedProxyIp(proxy)) { | |
socketip = proxy; | |
break; | |
} | |
} | |
} | |
this.push(`*${socketid}\n${socketip}\n${socket.protocol}`); | |
socket.on('data', message => { | |
// drop empty messages (DDoS?) | |
if (!message) return; | |
// drop messages over 100KB | |
if (message.length > (100 * 1024)) { | |
socket.write(`|popup|Your message must be below 100KB`); | |
console.log(`Dropping client message ${message.length / 1024} KB...`); | |
console.log(message.slice(0, 160)); | |
return; | |
} | |
// drop legacy JSON messages | |
if (typeof message !== 'string' || message.startsWith('{')) return; | |
// drop blank messages (DDoS?) | |
const pipeIndex = message.indexOf('|'); | |
if (pipeIndex < 0 || pipeIndex === message.length - 1) return; | |
this.push(`<${socketid}\n${message}`); | |
}); | |
socket.once('close', () => { | |
this.push(`!${socketid}`); | |
this.sockets.delete(socketid); | |
for (const room of this.rooms.values()) room.delete(socketid); | |
}); | |
} | |
_write(data: string) { | |
// console.log('worker received: ' + data); | |
const receiver = this.receivers[data.charAt(0)]; | |
if (receiver), data); | |
} | |
} | |
/********************************************************* | |
* Process manager | |
*********************************************************/ | |
export const PM = new ProcessManager.RawProcessManager({ | |
module, | |
setupChild: () => new ServerStream(Config), | |
isCluster: true, | |
}); | |
if (!PM.isParentProcess) { | |
// This is a child process! | |
global.Config = (require as any)('./config-loader').Config; | |
if (Config.crashguard) { | |
// graceful crash - allow current battles to finish before restarting | |
process.on('uncaughtException', err => { | |
crashlogger(err, `Socket process ${PM.workerid} (${})`); | |
}); | |
process.on('unhandledRejection', err => { | |
crashlogger(err as any || {}, `Socket process ${PM.workerid} (${}) Promise`); | |
}); | |
} | |
if (Config.sockets) { | |
try { | |
require.resolve('node-oom-heapdump'); | |
} catch (e: any) { | |
if (e.code !== 'MODULE_NOT_FOUND') throw e; // should never happen | |
throw new Error( | |
'node-oom-heapdump is not installed, but it is a required dependency if Config.ofesockets is set to true! ' + | |
'Run npm install node-oom-heapdump and restart the server.' | |
); | |
} | |
// Create a heapdump if the process runs out of memory. | |
(global as any).nodeOomHeapdump = (require as any)('node-oom-heapdump')({ | |
addTimestamp: true, | |
}); | |
} | |
// setup worker | |
if (process.env.PSPORT) Config.port = +process.env.PSPORT; | |
if (process.env.PSBINDADDR) Config.bindaddress = process.env.PSBINDADDR; | |
if (process.env.PSNOSSL && parseInt(process.env.PSNOSSL)) Config.ssl = null; | |
// eslint-disable-next-line no-eval | |
Repl.start(`sockets-${PM.workerid}-${}`, cmd => eval(cmd)); | |
} | |