|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import * as fs from 'fs'; |
|
import * as http from 'http'; |
|
import * as https from 'https'; |
|
import * as path from 'path'; |
|
import { crashlogger, ProcessManager, Streams, Repl } from '../lib'; |
|
import { IPTools } from './ip-tools'; |
|
import { type ChannelID, extractChannelMessages } from '../sim/battle'; |
|
|
|
/** Local shorthand for the process manager's stream-worker type used throughout this file. */
type StreamWorker = ProcessManager.StreamWorker; |
|
|
|
/**
 * Main-process facade over the socket worker processes.
 *
 * Every frame exchanged with a worker is one string: its first character
 * selects the operation and the remaining fields are newline-separated.
 */
export const Sockets = new class {
	/**
	 * Consumes the frames a worker writes to its stream and forwards each
	 * recognised one to `Users`. Unrecognised frame types are ignored.
	 */
	async onSpawn(worker: StreamWorker) {
		const workerid = worker.workerid;
		for await (const frame of worker.stream) {
			const kind = frame.charAt(0);
			if (kind === '*') {
				// *socketid\nip\nprotocol
				worker.load++;
				const fields = frame.slice(1).split('\n');
				Users.socketConnect(worker, workerid, fields[0], fields[1], fields[2]);
			} else if (kind === '!') {
				// !socketid
				worker.load--;
				Users.socketDisconnect(worker, workerid, frame.slice(1));
			} else if (kind === '<') {
				// <socketid\nmessage
				const split = frame.indexOf('\n');
				// Two-argument substr is kept on purpose: it yields '' when the
				// frame has no newline, which slice(1, split) would not.
				const socketid = frame.substr(1, split - 1);
				const message = frame.slice(split + 1);
				Users.socketReceive(worker, workerid, socketid, message);
			}
		}
	}

	/** Tells `Users` to drop every socket that belonged to an unspawned worker. */
	onUnspawn(this: void, worker: StreamWorker) {
		Users.socketDisconnectAll(worker, worker.workerid);
	}

	/**
	 * Resolves the port / bind address / worker count, stores them on
	 * `Config`, and spawns the socket workers.
	 *
	 * @param port When a valid number, overrides `Config.port` and clears
	 *   `Config.ssl`. Otherwise `Config.port` is used and the optional
	 *   `cloud-env` module may override the address and port.
	 * @param bindAddress Address to bind; stored in `Config.bindaddress` when defined.
	 * @param workerCount Number of workers; defaults to `Config.workers`, or 1.
	 */
	listen(port?: number, bindAddress?: string, workerCount?: number) {
		if (port === undefined || isNaN(port)) {
			port = Config.port;

			// cloud-env is optional: if it cannot be loaded, keep the values above.
			try {
				const cloudenv = (require as any)('cloud-env');
				bindAddress = cloudenv.get('IP', bindAddress);
				port = cloudenv.get('PORT', port);
			} catch {}
		} else {
			Config.port = port;
			Config.ssl = null;
		}

		if (bindAddress !== undefined) Config.bindaddress = bindAddress;
		if (port !== undefined) Config.port = port;

		let count = workerCount;
		if (count === undefined) {
			count = Config.workers !== undefined ? Config.workers : 1;
		}

		// Handed to the workers as environment variables.
		PM.env = { PSPORT: Config.port, PSBINDADDR: Config.bindaddress || '0.0.0.0', PSNOSSL: Config.ssl ? 0 : 1 };
		PM.subscribeSpawn(worker => void this.onSpawn(worker));
		PM.subscribeUnspawn(this.onUnspawn);

		PM.spawn(count);
	}

	/** Sends `message` to one socket (`>socketid\nmessage`). */
	socketSend(worker: StreamWorker, socketid: string, message: string) {
		const frame = `>${socketid}\n${message}`;
		void worker.stream.write(frame);
	}

	/** Asks the worker to disconnect one socket (`!socketid`). */
	socketDisconnect(worker: StreamWorker, socketid: string) {
		const frame = `!${socketid}`;
		void worker.stream.write(frame);
	}

	/** Writes a room broadcast frame (`#roomid\nmessage`) to every worker. */
	roomBroadcast(roomid: RoomID, message: string) {
		const frame = `#${roomid}\n${message}`;
		for (const worker of PM.workers) {
			void worker.stream.write(frame);
		}
	}

	/** Adds a socket to a room on its worker (`+roomid\nsocketid`). */
	roomAdd(worker: StreamWorker, roomid: RoomID, socketid: string) {
		const frame = `+${roomid}\n${socketid}`;
		void worker.stream.write(frame);
	}

	/** Removes a socket from a room on its worker (`-roomid\nsocketid`). */
	roomRemove(worker: StreamWorker, roomid: RoomID, socketid: string) {
		const frame = `-${roomid}\n${socketid}`;
		void worker.stream.write(frame);
	}

	/** Writes a channel broadcast frame (`:roomid\nmessage`) to every worker. */
	channelBroadcast(roomid: RoomID, message: string) {
		const frame = `:${roomid}\n${message}`;
		for (const worker of PM.workers) {
			void worker.stream.write(frame);
		}
	}

	/** Moves a socket to a channel within a room (`.roomid\nchannelid\nsocketid`). */
	channelMove(worker: StreamWorker, roomid: RoomID, channelid: ChannelID, socketid: string) {
		const frame = `.${roomid}\n${channelid}\n${socketid}`;
		void worker.stream.write(frame);
	}

	/** Sends code for the worker to evaluate (`$query`). */
	eval(worker: StreamWorker, query: string) {
		const frame = `$${query}`;
		void worker.stream.write(frame);
	}
};
|
|
|
/**
 * Worker-side stream: owns this process's HTTP(S) and SockJS servers and
 * translates between client connections and the frames exchanged over the
 * stream.
 *
 * Frames written to this stream are dispatched by their first character to
 * `receivers`; frames pushed from it (`*`, `<`, `!`) report connections,
 * client messages and disconnections.
 */
export class ServerStream extends Streams.ObjectReadWriteStream<string> {
	/** socketid -> connection, for every open socket. */
	sockets = new Map<string, import('sockjs').Connection>();

	/** roomid -> (socketid -> connection) for the sockets in that room. */
	rooms = new Map<RoomID, Map<string, import('sockjs').Connection>>();

	/** roomid -> (socketid -> channel). Sockets without an entry are treated as channel 0. */
	roomChannels = new Map<RoomID, Map<string, ChannelID>>();

	server: http.Server;
	serverSsl: https.Server | null;
	/** Incremented for each accepted connection; its decimal string is the socket id. */
	socketCounter = 0;

	/** Returns whether `ip` is one of the configured proxy IPs (always false when none are configured). */
	isTrustedProxyIp: (ip: string) => boolean;

	/** Handlers for frames written to this stream, keyed by the frame's first character. */
	receivers: { [k: string]: (this: ServerStream, data: string) => void } = {
		'$'(data) {
			// $code
			// NOTE(review): evaluates whatever is written to this stream; this is
			// only safe as long as the writer of the stream is trusted.
			eval(data.slice(1));
		},
		'!'(data) {
			// !socketid — destroy the socket and forget it everywhere
			const socketid = data.slice(1);
			const socket = this.sockets.get(socketid);
			if (!socket) return;
			socket.destroy();
			this.forgetSocket(socketid);
		},
		'>'(data) {
			// >socketid\nmessage — message to a single socket
			const nlLoc = data.indexOf('\n');
			const socketid = data.slice(1, nlLoc);
			const socket = this.sockets.get(socketid);
			if (!socket) return;
			socket.write(data.slice(nlLoc + 1));
		},
		'#'(data) {
			// #roomid\nmessage — message to every socket in a room;
			// an empty roomid addresses every connected socket
			const nlLoc = data.indexOf('\n');
			const roomid = data.slice(1, nlLoc) as RoomID;
			const room = roomid ? this.rooms.get(roomid) : this.sockets;
			if (!room) return;
			const message = data.slice(nlLoc + 1);
			for (const curSocket of room.values()) curSocket.write(message);
		},
		'+'(data) {
			// +roomid\nsocketid — add a socket to a room, creating the room if needed
			const nlLoc = data.indexOf('\n');
			const socketid = data.slice(nlLoc + 1);
			const socket = this.sockets.get(socketid);
			if (!socket) return;
			const roomid = data.slice(1, nlLoc) as RoomID;
			let room = this.rooms.get(roomid);
			if (!room) {
				room = new Map();
				this.rooms.set(roomid, room);
			}
			room.set(socketid, socket);
		},
		'-'(data) {
			// -roomid\nsocketid — remove a socket from a room; drop the room once empty
			const nlLoc = data.indexOf('\n');
			const roomid = data.slice(1, nlLoc) as RoomID;
			const room = this.rooms.get(roomid);
			if (!room) return;
			const socketid = data.slice(nlLoc + 1);
			room.delete(socketid);
			const roomChannel = this.roomChannels.get(roomid);
			if (roomChannel) roomChannel.delete(socketid);
			if (!room.size) {
				this.rooms.delete(roomid);
				if (roomChannel) this.roomChannels.delete(roomid);
			}
		},
		'.'(data) {
			// .roomid\nchannelid\nsocketid — set a socket's channel within a room
			const nlLoc = data.indexOf('\n');
			const roomid = data.slice(1, nlLoc) as RoomID;
			const nlLoc2 = data.indexOf('\n', nlLoc + 1);
			const channelid = Number(data.slice(nlLoc + 1, nlLoc2)) as ChannelID;
			const socketid = data.slice(nlLoc2 + 1);

			let roomChannel = this.roomChannels.get(roomid);
			if (!roomChannel) {
				roomChannel = new Map();
				this.roomChannels.set(roomid, roomChannel);
			}
			if (channelid === 0) {
				// channel 0 is represented by the absence of an entry
				roomChannel.delete(socketid);
			} else {
				roomChannel.set(socketid, channelid);
			}
		},
		':'(data) {
			// :roomid\nmessage — per-channel message to every socket in a room
			const nlLoc = data.indexOf('\n');
			const roomid = data.slice(1, nlLoc) as RoomID;
			const room = this.rooms.get(roomid);
			if (!room) return;

			// Joined message per channel, filled in the first time a channel is needed.
			const messages: [string | null, string | null, string | null, string | null, string | null] = [
				null, null, null, null, null,
			];
			const message = data.slice(nlLoc + 1);
			const channelMessages = extractChannelMessages(message, [0, 1, 2, 3, 4]);
			const roomChannel = this.roomChannels.get(roomid);
			for (const [curSocketid, curSocket] of room) {
				const channelid = roomChannel?.get(curSocketid) || 0;
				let rendered = messages[channelid];
				if (rendered === null) {
					rendered = channelMessages[channelid].join('\n');
					messages[channelid] = rendered;
				}
				curSocket.write(rendered);
			}
		},
	};

	/**
	 * Creates the HTTP server (and the HTTPS server when `config.ssl` is
	 * usable), optionally attaches static file serving, installs the SockJS
	 * handlers and starts listening.
	 *
	 * Note that a missing `config.bindaddress` is filled in on the passed
	 * object itself.
	 */
	constructor(config: {
		port: number,
		bindaddress?: string,
		ssl?: typeof Config.ssl,
		wsdeflate?: typeof Config.wsdeflate,
		proxyip?: typeof Config.proxyip,
		customhttpresponse?: typeof Config.customhttpresponse,
		disablenodestatic?: boolean,
	}) {
		super();
		if (!config.bindaddress) config.bindaddress = '0.0.0.0';

		this.isTrustedProxyIp = config.proxyip ? IPTools.checker(config.proxyip) : () => false;

		// HTTP(S) servers
		this.server = http.createServer();
		this.serverSsl = null;
		if (config.ssl) {
			const key = ServerStream.loadPem(config.ssl.options.key, 'private key');
			const cert = ServerStream.loadPem(config.ssl.options.cert, 'certificate');

			if (key && cert) {
				try {
					// Any other configured HTTPS options are passed through unchanged.
					this.serverSsl = https.createServer({ ...config.ssl.options, key, cert });
				} catch (e: any) {
					crashlogger(new Error(`The SSL settings are misconfigured:\n${e.stack}`), `Socket process ${process.pid}`);
				}
			}
		}

		// Static file serving (optional: skipped when disabled or when
		// node-static cannot be loaded)
		try {
			if (config.disablenodestatic) throw new Error("disablenodestatic");
			const StaticServer: typeof import('node-static').Server = require('node-static').Server;
			const roomidRegex = /^\/(?:[A-Za-z0-9][A-Za-z0-9-]*)\/?$/;
			const cssServer = new StaticServer('./config');
			const avatarServer = new StaticServer('./config/avatars');
			const staticServer = new StaticServer('./server/static');
			const staticRequestHandler = (req: http.IncomingMessage, res: http.ServerResponse) => {
				// Discard the request body, then respond once it has ended.
				req.resume();
				req.addListener('end', () => {
					if (config.customhttpresponse?.(req, res)) {
						return;
					}

					let fileServer = staticServer;
					if (req.url) {
						if (req.url === '/custom.css') {
							fileServer = cssServer;
						} else if (req.url.startsWith('/avatars/')) {
							// strip "/avatars" but keep the slash that follows it
							req.url = req.url.slice(8);
							fileServer = avatarServer;
						} else if (roomidRegex.test(req.url)) {
							// room-style paths are served the index page
							req.url = '/';
						}
					}

					fileServer.serve(req, res, e => {
						if (e && (e as any).status === 404) {
							staticServer.serveFile('404.html', 404, {}, req, res);
						}
					});
				});
			};

			this.server.on('request', staticRequestHandler);
			if (this.serverSsl) this.serverSsl.on('request', staticRequestHandler);
		} catch (e: any) {
			if (e.message === 'disablenodestatic') {
				console.log('node-static is disabled');
			} else {
				console.log('Could not start node-static - try `npm install` if you want to use it');
			}
		}

		// SockJS server
		const sockjs: typeof import('sockjs') = (require as any)('sockjs');
		const options: import('sockjs').ServerOptions & { faye_server_options?: { [key: string]: any } } = {
			sockjs_url: `//play.pokemonshowdown.com/js/lib/sockjs-1.4.0-nwjsfix.min.js`,
			prefix: '/showdown',
			log(severity: string, message: string) {
				if (severity === 'error') console.log(`ERROR: ${message}`);
			},
		};

		if (config.wsdeflate !== null) {
			try {
				const deflate = (require as any)('permessage-deflate').configure(config.wsdeflate);
				options.faye_server_options = { extensions: [deflate] };
			} catch {
				crashlogger(
					new Error("Dependency permessage-deflate is not installed or is otherwise unaccessable. No message compression will take place until server restart."),
					"Sockets"
				);
			}
		}

		const sockjsServer = sockjs.createServer(options);

		process.once('disconnect', () => this.cleanup());
		process.once('exit', () => this.cleanup());

		sockjsServer.on('connection', connection => this.onConnection(connection));
		sockjsServer.installHandlers(this.server, {});
		this.server.listen(config.port, config.bindaddress);
		console.log(`Worker ${PM.workerid} now listening on ${config.bindaddress}:${config.port}`);

		// `serverSsl` is only ever created when `config.ssl` is set; testing both
		// lets the compiler see that `config.ssl` is defined here.
		if (this.serverSsl && config.ssl) {
			sockjsServer.installHandlers(this.serverSsl, {});
			this.serverSsl.listen(config.ssl.port, config.bindaddress);
			console.log(`Worker ${PM.workerid} now listening for SSL on port ${config.ssl.port}`);
		}

		console.log(`Test your server at http://${config.bindaddress === '0.0.0.0' ? 'localhost' : config.bindaddress}:${config.port}`);
	}

	/**
	 * Resolves one SSL option that is expected to be a path to a PEM file.
	 *
	 * @param source The configured value, resolved relative to this module's directory.
	 * @param label Human-readable name of the option, used in log messages.
	 * @returns The file's contents when it can be read. If the value does not
	 *   resolve to a regular file, a warning is printed and `source` is
	 *   returned unchanged. If the file exists but reading it fails, the
	 *   failure is logged and the resolved path is returned.
	 */
	private static loadPem(source: string, label: 'private key' | 'certificate'): string | Buffer {
		try {
			const file = path.resolve(__dirname, source);
			if (!fs.statSync(file).isFile()) throw new Error();
			try {
				return fs.readFileSync(file);
			} catch (e: any) {
				crashlogger(
					new Error(`Failed to read the configured SSL ${label} PEM file:\n${e.stack}`),
					`Socket process ${process.pid}`
				);
				return file;
			}
		} catch {
			console.warn(`SSL ${label} config values will not support HTTPS server option values in the future. Please set it to use the absolute path of its PEM file.`);
			return source;
		}
	}

	/**
	 * Removes every record of `socketid`: its entry in `sockets`, its
	 * membership in each room, and its channel in each of those rooms. Rooms
	 * left empty are dropped together with their channel maps. Safe to call
	 * more than once for the same id.
	 */
	private forgetSocket(socketid: string) {
		this.sockets.delete(socketid);
		for (const [curRoomid, curRoom] of this.rooms) {
			curRoom.delete(socketid);
			const roomChannel = this.roomChannels.get(curRoomid);
			if (roomChannel) roomChannel.delete(socketid);
			if (!curRoom.size) {
				this.rooms.delete(curRoomid);
				if (roomChannel) this.roomChannels.delete(curRoomid);
			}
		}
	}

	/**
	 * Destroys every socket, clears all bookkeeping, closes the servers and
	 * schedules the process to exit.
	 */
	cleanup() {
		for (const socket of this.sockets.values()) {
			try {
				socket.destroy();
			} catch {}
		}
		this.sockets.clear();
		this.rooms.clear();
		this.roomChannels.clear();

		this.server.close();
		if (this.serverSsl) this.serverSsl.close();

		// Exit on the next turn of the event loop rather than synchronously.
		setImmediate(() => process.exit(0));
	}

	/**
	 * Registers a new client connection, reports it with a
	 * `*socketid\nip\nprotocol` frame, and wires up its `data` and `close`
	 * events.
	 */
	onConnection(socket: import('sockjs').Connection) {
		// The handler can be invoked without a usable connection object.
		if (!socket) return;

		if (!socket.remoteAddress) {
			// Without a remote address the connection cannot be attributed; drop it.
			try {
				socket.destroy();
			} catch {}
			return;
		}

		const socketid = `${++this.socketCounter}`;
		this.sockets.set(socketid, socket);

		let socketip = socket.remoteAddress;
		if (this.isTrustedProxyIp(socketip)) {
			// Walk X-Forwarded-For from the nearest hop outwards and take the
			// first address that is not itself a trusted proxy.
			const forwarded = (socket.headers['x-forwarded-for'] || '').split(',').reverse();
			for (const entry of forwarded) {
				const candidate = entry.trim();
				// A missing header (or a stray comma) produces an empty element;
				// skip it so an empty string is never adopted as the client IP.
				if (!candidate) continue;
				if (!this.isTrustedProxyIp(candidate)) {
					socketip = candidate;
					break;
				}
			}
		}

		this.push(`*${socketid}\n${socketip}\n${socket.protocol}`);

		socket.on('data', message => {
			// ignore empty messages
			if (!message) return;

			// refuse oversized messages and tell the client why
			if (message.length > (100 * 1024)) {
				socket.write(`|popup|Your message must be below 100KB`);
				console.log(`Dropping client message ${message.length / 1024} KB...`);
				console.log(message.slice(0, 160));
				return;
			}

			// ignore non-string messages and messages starting with '{'
			if (typeof message !== 'string' || message.startsWith('{')) return;

			// ignore messages with no '|' or with nothing after their first '|'
			const pipeIndex = message.indexOf('|');
			if (pipeIndex < 0 || pipeIndex === message.length - 1) return;

			this.push(`<${socketid}\n${message}`);
		});

		socket.once('close', () => {
			this.push(`!${socketid}`);
			// Same cleanup as the '!' receiver, so channel entries and emptied
			// rooms do not outlive the socket.
			this.forgetSocket(socketid);
		});
	}

	/** Dispatches a frame to the receiver for its first character; unknown frame types are ignored. */
	_write(data: string) {
		this.receivers[data.charAt(0)]?.call(this, data);
	}
}
|
|
|
|
|
|
|
|
|
|
|
/**
 * Process manager for the socket workers. `setupChild` builds the
 * `ServerStream` from the global `Config` at the time it is invoked.
 */
export const PM = new ProcessManager.RawProcessManager({
	module: module,
	isCluster: true,
	setupChild() {
		return new ServerStream(Config);
	},
});
|
|
|
// Bootstrap that runs only when this module is loaded outside the parent process.
if (!PM.isParentProcess) {
	// Load the configuration into the global scope of this process.
	global.Config = (require as any)('./config-loader').Config;

	if (Config.crashguard) {
		// Log uncaught errors and unhandled rejections through crashlogger.
		process.on('uncaughtException', err => {
			crashlogger(err, `Socket process ${PM.workerid} (${process.pid})`);
		});
		process.on('unhandledRejection', err => {
			crashlogger(err as any || {}, `Socket process ${PM.workerid} (${process.pid}) Promise`);
		});
	}

	// NOTE(review): this guard tests `Config.sockets`, while the error text
	// below refers to `Config.ofesockets` — confirm which name is intended.
	if (Config.sockets) {
		try {
			require.resolve('node-oom-heapdump');
		} catch (e: any) {
			if (e.code === 'MODULE_NOT_FOUND') {
				throw new Error(
					'node-oom-heapdump is not installed, but it is a required dependency if Config.ofesockets is set to true! ' +
					'Run npm install node-oom-heapdump and restart the server.'
				);
			}
			throw e;
		}

		// Keep the configured instance reachable from the global object.
		(global as any).nodeOomHeapdump = (require as any)('node-oom-heapdump')({
			addTimestamp: true,
		});
	}

	// Apply the overrides passed through the environment (see `PM.env`).
	const { PSPORT, PSBINDADDR, PSNOSSL } = process.env;
	if (PSPORT) Config.port = Number(PSPORT);
	if (PSBINDADDR) Config.bindaddress = PSBINDADDR;
	if (PSNOSSL && parseInt(PSNOSSL)) Config.ssl = null;

	// Start a REPL for this process; commands are evaluated in this scope.
	Repl.start(`sockets-${PM.workerid}-${process.pid}`, cmd => eval(cmd));
}
|
|