Spaces:
Runtime error
Runtime error
| /* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Duplex$" }] */ | |
| ; | |
| const EventEmitter = require('events'); | |
| const http = require('http'); | |
| const { Duplex } = require('stream'); | |
| const { createHash } = require('crypto'); | |
| const extension = require('./extension'); | |
| const PerMessageDeflate = require('./permessage-deflate'); | |
| const subprotocol = require('./subprotocol'); | |
| const WebSocket = require('./websocket'); | |
| const { GUID, kWebSocket } = require('./constants'); | |
// Shape accepted for the `Sec-WebSocket-Key` request header: exactly 22
// characters from the base64 alphabet followed by the `==` padding.
const keyRegex = /^[+/0-9A-Za-z]{22}==$/;

// Lifecycle states stored in `WebSocketServer#_state`. The numeric order is
// relied upon: any value greater than `RUNNING` means the server is no longer
// accepting upgrades.
const RUNNING = 0;
const CLOSING = 1;
const CLOSED = 2;
/**
 * Class representing a WebSocket server.
 *
 * The server either owns an internally created HTTP server (`port` option),
 * attaches to a user supplied one (`server` option), or runs detached and
 * relies on the user calling `handleUpgrade()` (`noServer` option).
 *
 * @extends EventEmitter
 */
class WebSocketServer extends EventEmitter {
  /**
   * Create a `WebSocketServer` instance.
   *
   * @param {Object} options Configuration options
   * @param {Number} [options.backlog=511] The maximum length of the queue of
   *     pending connections
   * @param {Boolean} [options.clientTracking=true] Specifies whether or not to
   *     track clients
   * @param {Function} [options.handleProtocols] A hook to handle protocols
   * @param {String} [options.host] The hostname where to bind the server
   * @param {Number} [options.maxPayload=104857600] The maximum allowed message
   *     size
   * @param {Boolean} [options.noServer=false] Enable no server mode
   * @param {String} [options.path] Accept only connections matching this path
   * @param {(Boolean|Object)} [options.perMessageDeflate=false] Enable/disable
   *     permessage-deflate
   * @param {Number} [options.port] The port where to bind the server
   * @param {(http.Server|https.Server)} [options.server] A pre-created HTTP/S
   *     server to use
   * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
   *     not to skip UTF-8 validation for text and close messages
   * @param {Function} [options.verifyClient] A hook to reject connections
   * @param {Function} [options.WebSocket=WebSocket] Specifies the `WebSocket`
   *     class to use. It must be the `WebSocket` class or class that extends it
   * @param {Function} [callback] A listener for the `listening` event
   * @throws {TypeError} If not exactly one of the `port`, `server`, or
   *     `noServer` options is specified
   */
  constructor(options, callback) {
    super();

    options = {
      maxPayload: 100 * 1024 * 1024,
      skipUTF8Validation: false,
      perMessageDeflate: false,
      handleProtocols: null,
      clientTracking: true,
      verifyClient: null,
      noServer: false,
      backlog: null, // use default (511 as implemented in net.js)
      server: null,
      host: null,
      path: null,
      port: null,
      WebSocket,
      ...options
    };

    if (
      (options.port == null && !options.server && !options.noServer) ||
      (options.port != null && (options.server || options.noServer)) ||
      (options.server && options.noServer)
    ) {
      throw new TypeError(
        'One and only one of the "port", "server", or "noServer" options ' +
          'must be specified'
      );
    }

    if (options.port != null) {
      //
      // Plain HTTP requests hitting the internally created server are answered
      // with `426 Upgrade Required`.
      //
      this._server = http.createServer((req, res) => {
        const body = http.STATUS_CODES[426];

        res.writeHead(426, {
          'Content-Length': body.length,
          'Content-Type': 'text/plain'
        });
        res.end(body);
      });
      this._server.listen(
        options.port,
        options.host,
        options.backlog,
        callback
      );
    } else if (options.server) {
      this._server = options.server;
    }

    if (this._server) {
      const emitConnection = this.emit.bind(this, 'connection');

      //
      // Forward the relevant HTTP server events and keep the function that
      // detaches the listeners so that `close()` can undo this.
      //
      this._removeListeners = addListeners(this._server, {
        listening: this.emit.bind(this, 'listening'),
        error: this.emit.bind(this, 'error'),
        upgrade: (req, socket, head) => {
          this.handleUpgrade(req, socket, head, emitConnection);
        }
      });
    }

    if (options.perMessageDeflate === true) options.perMessageDeflate = {};
    if (options.clientTracking) {
      this.clients = new Set();
      this._shouldEmitClose = false;
    }

    this.options = options;
    this._state = RUNNING;
  }

  /**
   * Returns the bound address, the address family name, and port of the server
   * as reported by the operating system if listening on an IP socket.
   * If the server is listening on a pipe or UNIX domain socket, the name is
   * returned as a string.
   *
   * @return {(Object|String|null)} The address of the server, or `null` if the
   *     server has no underlying HTTP/S server (for example after `close()`)
   * @throws {Error} If the server is operating in "noServer" mode
   * @public
   */
  address() {
    if (this.options.noServer) {
      throw new Error('The server is operating in "noServer" mode');
    }

    if (!this._server) return null;
    return this._server.address();
  }

  /**
   * Stop the server from accepting new connections and emit the `'close'` event
   * when all existing connections are closed.
   *
   * If the server is already closed, `'close'` is emitted again on the next
   * tick and `cb`, if provided, is called with an `Error`.
   *
   * @param {Function} [cb] A one-time listener for the `'close'` event
   * @public
   */
  close(cb) {
    if (this._state === CLOSED) {
      if (cb) {
        this.once('close', () => {
          cb(new Error('The server is not running'));
        });
      }

      process.nextTick(emitClose, this);
      return;
    }

    if (cb) this.once('close', cb);

    // A close is already in progress; `cb` was registered above.
    if (this._state === CLOSING) return;
    this._state = CLOSING;

    if (this.options.noServer || this.options.server) {
      //
      // The HTTP/S server, if any, is owned by the user: only detach from it.
      //
      if (this._server) {
        this._removeListeners();
        this._removeListeners = this._server = null;
      }

      if (this.clients) {
        if (!this.clients.size) {
          process.nextTick(emitClose, this);
        } else {
          // Defer `'close'` until the last tracked client has closed.
          this._shouldEmitClose = true;
        }
      } else {
        process.nextTick(emitClose, this);
      }
    } else {
      const server = this._server;

      this._removeListeners();
      this._removeListeners = this._server = null;

      //
      // The HTTP/S server was created internally. Close it, and rely on its
      // `'close'` event.
      //
      server.close(() => {
        emitClose(this);
      });
    }
  }

  /**
   * See if a given request should be handled by this server instance.
   *
   * Only the path component of `req.url` (everything before the first `?`) is
   * compared with the `path` option; when no `path` option is set every
   * request is accepted.
   *
   * @param {http.IncomingMessage} req Request object to inspect
   * @return {Boolean} `true` if the request is valid, else `false`
   * @public
   */
  shouldHandle(req) {
    if (this.options.path) {
      const index = req.url.indexOf('?');
      const pathname = index !== -1 ? req.url.slice(0, index) : req.url;

      if (pathname !== this.options.path) return false;
    }

    return true;
  }

  /**
   * Handle a HTTP Upgrade request.
   *
   * Validates the handshake headers, negotiates subprotocols and the
   * permessage-deflate extension, optionally runs the `verifyClient` hook and
   * finally delegates to `completeUpgrade()`. Invalid requests are rejected
   * with an HTTP error response or reported through `'wsClientError'`.
   *
   * @param {http.IncomingMessage} req The request object
   * @param {Duplex} socket The network socket between the server and client
   * @param {Buffer} head The first packet of the upgraded stream
   * @param {Function} cb Callback
   * @public
   */
  handleUpgrade(req, socket, head, cb) {
    socket.on('error', socketOnError);

    const key = req.headers['sec-websocket-key'];
    const upgrade = req.headers.upgrade;
    // Unary plus turns a missing header into `NaN`, which fails the check below.
    const version = +req.headers['sec-websocket-version'];

    if (req.method !== 'GET') {
      const message = 'Invalid HTTP method';
      abortHandshakeOrEmitwsClientError(this, req, socket, 405, message);
      return;
    }

    //
    // The `Upgrade` header may be absent, for example when this public method
    // is called directly with an arbitrary request. Reject the request instead
    // of throwing a `TypeError` while reading the header.
    //
    if (upgrade === undefined || upgrade.toLowerCase() !== 'websocket') {
      const message = 'Invalid Upgrade header';
      abortHandshakeOrEmitwsClientError(this, req, socket, 400, message);
      return;
    }

    if (!key || !keyRegex.test(key)) {
      const message = 'Missing or invalid Sec-WebSocket-Key header';
      abortHandshakeOrEmitwsClientError(this, req, socket, 400, message);
      return;
    }

    if (version !== 8 && version !== 13) {
      const message = 'Missing or invalid Sec-WebSocket-Version header';
      abortHandshakeOrEmitwsClientError(this, req, socket, 400, message);
      return;
    }

    if (!this.shouldHandle(req)) {
      abortHandshake(socket, 400);
      return;
    }

    const secWebSocketProtocol = req.headers['sec-websocket-protocol'];
    let protocols = new Set();

    if (secWebSocketProtocol !== undefined) {
      try {
        protocols = subprotocol.parse(secWebSocketProtocol);
      } catch (err) {
        const message = 'Invalid Sec-WebSocket-Protocol header';
        abortHandshakeOrEmitwsClientError(this, req, socket, 400, message);
        return;
      }
    }

    const secWebSocketExtensions = req.headers['sec-websocket-extensions'];
    const extensions = {};

    if (
      this.options.perMessageDeflate &&
      secWebSocketExtensions !== undefined
    ) {
      const perMessageDeflate = new PerMessageDeflate(
        this.options.perMessageDeflate,
        true,
        this.options.maxPayload
      );

      try {
        const offers = extension.parse(secWebSocketExtensions);

        if (offers[PerMessageDeflate.extensionName]) {
          perMessageDeflate.accept(offers[PerMessageDeflate.extensionName]);
          extensions[PerMessageDeflate.extensionName] = perMessageDeflate;
        }
      } catch (err) {
        const message =
          'Invalid or unacceptable Sec-WebSocket-Extensions header';
        abortHandshakeOrEmitwsClientError(this, req, socket, 400, message);
        return;
      }
    }

    //
    // Optionally call external client verification handler.
    //
    if (this.options.verifyClient) {
      const info = {
        origin:
          req.headers[`${version === 8 ? 'sec-websocket-origin' : 'origin'}`],
        secure: !!(req.socket.authorized || req.socket.encrypted),
        req
      };

      //
      // A hook declared with two parameters is treated as asynchronous and
      // reports its verdict through the callback; otherwise its return value
      // is used.
      //
      if (this.options.verifyClient.length === 2) {
        this.options.verifyClient(info, (verified, code, message, headers) => {
          if (!verified) {
            return abortHandshake(socket, code || 401, message, headers);
          }

          this.completeUpgrade(
            extensions,
            key,
            protocols,
            req,
            socket,
            head,
            cb
          );
        });
        return;
      }

      if (!this.options.verifyClient(info)) return abortHandshake(socket, 401);
    }

    this.completeUpgrade(extensions, key, protocols, req, socket, head, cb);
  }

  /**
   * Upgrade the connection to WebSocket.
   *
   * Writes the `101 Switching Protocols` response, hands the socket over to a
   * new `WebSocket` instance and invokes `cb` with that instance and `req`.
   *
   * @param {Object} extensions The accepted extensions
   * @param {String} key The value of the `Sec-WebSocket-Key` header
   * @param {Set} protocols The subprotocols
   * @param {http.IncomingMessage} req The request object
   * @param {Duplex} socket The network socket between the server and client
   * @param {Buffer} head The first packet of the upgraded stream
   * @param {Function} cb Callback
   * @throws {Error} If called more than once with the same socket
   * @private
   */
  completeUpgrade(extensions, key, protocols, req, socket, head, cb) {
    //
    // Destroy the socket if the client has already sent a FIN packet.
    //
    if (!socket.readable || !socket.writable) return socket.destroy();

    if (socket[kWebSocket]) {
      throw new Error(
        'server.handleUpgrade() was called more than once with the same ' +
          'socket, possibly due to a misconfiguration'
      );
    }

    // The server is closing or closed: refuse new connections.
    if (this._state > RUNNING) return abortHandshake(socket, 503);

    const digest = createHash('sha1')
      .update(key + GUID)
      .digest('base64');

    const headers = [
      'HTTP/1.1 101 Switching Protocols',
      'Upgrade: websocket',
      'Connection: Upgrade',
      `Sec-WebSocket-Accept: ${digest}`
    ];

    const ws = new this.options.WebSocket(null);

    if (protocols.size) {
      //
      // Optionally call external protocol selection handler. Without a
      // handler the first offered subprotocol is selected.
      //
      const protocol = this.options.handleProtocols
        ? this.options.handleProtocols(protocols, req)
        : protocols.values().next().value;

      if (protocol) {
        headers.push(`Sec-WebSocket-Protocol: ${protocol}`);
        ws._protocol = protocol;
      }
    }

    if (extensions[PerMessageDeflate.extensionName]) {
      const params = extensions[PerMessageDeflate.extensionName].params;
      const value = extension.format({
        [PerMessageDeflate.extensionName]: [params]
      });
      headers.push(`Sec-WebSocket-Extensions: ${value}`);
      ws._extensions = extensions;
    }

    //
    // Allow external modification/inspection of handshake headers.
    //
    this.emit('headers', headers, req);

    // The trailing empty element yields the blank line that ends the headers.
    socket.write(headers.concat('\r\n').join('\r\n'));
    socket.removeListener('error', socketOnError);

    ws.setSocket(socket, head, {
      maxPayload: this.options.maxPayload,
      skipUTF8Validation: this.options.skipUTF8Validation
    });

    if (this.clients) {
      this.clients.add(ws);
      ws.on('close', () => {
        this.clients.delete(ws);

        // `close()` is waiting for the last tracked client to go away.
        if (this._shouldEmitClose && !this.clients.size) {
          process.nextTick(emitClose, this);
        }
      });
    }

    cb(ws, req);
  }
}
| module.exports = WebSocketServer; | |
/**
 * Register every <event, listener> pair of `map` on `server` and hand back a
 * function that undoes the registration.
 *
 * @param {EventEmitter} server The event emitter
 * @param {Object.<String, Function>} map The listeners to add
 * @return {Function} A function that will remove the added listeners when
 *     called
 * @private
 */
function addListeners(server, map) {
  for (const [eventName, listener] of Object.entries(map)) {
    server.on(eventName, listener);
  }

  const removeListeners = () => {
    for (const [eventName, listener] of Object.entries(map)) {
      server.removeListener(eventName, listener);
    }
  };

  return removeListeners;
}
/**
 * Mark `server` as closed and notify its `'close'` listeners.
 *
 * The state is updated before the event is emitted, so listeners already
 * observe the `CLOSED` state.
 *
 * @param {EventEmitter} server The event emitter
 * @private
 */
function emitClose(server) {
  server._state = CLOSED;

  server.emit('close');
}
/**
 * `'error'` listener for a socket: destroy the socket the listener is
 * invoked on. The error itself is ignored.
 *
 * @private
 */
function socketOnError() {
  // `this` is the object the listener is called on.
  this.destroy();
}
/**
 * Reject an upgrade request: send a minimal HTTP response on the raw socket
 * and destroy the socket once the response has been flushed.
 *
 * @param {Duplex} socket The socket of the upgrade request
 * @param {Number} code The HTTP response status code
 * @param {String} [message] The HTTP response body; defaults to the standard
 *     reason phrase for `code`
 * @param {Object} [headers] Additional HTTP response headers; they override
 *     the default `Connection`, `Content-Type` and `Content-Length` values
 * @private
 */
function abortHandshake(socket, code, message, headers) {
  //
  // The socket is writable unless the user destroyed or ended it before calling
  // `server.handleUpgrade()` or in the `verifyClient` function, which is a user
  // error. Handling this does not make much sense as the worst that can happen
  // is that some of the data written by the user might be discarded due to the
  // call to `socket.end()` below, which triggers an `'error'` event that in
  // turn causes the socket to be destroyed.
  //
  const body = message || http.STATUS_CODES[code];
  const responseHeaders = {
    Connection: 'close',
    'Content-Type': 'text/html',
    'Content-Length': Buffer.byteLength(body),
    ...headers
  };

  socket.once('finish', socket.destroy);

  const statusLine = `HTTP/1.1 ${code} ${http.STATUS_CODES[code]}`;
  const headerLines = Object.entries(responseHeaders).map(
    ([name, value]) => `${name}: ${value}`
  );

  // The two trailing empty elements produce the blank line before the body.
  socket.end([statusLine, ...headerLines, '', ''].join('\r\n') + body);
}
/**
 * Report a bad handshake. When the server has at least one
 * `'wsClientError'` listener the decision is delegated to it by emitting the
 * event with an `Error`, the socket and the request; otherwise the handshake
 * is aborted via `abortHandshake()`.
 *
 * @param {WebSocketServer} server The WebSocket server
 * @param {http.IncomingMessage} req The request object
 * @param {Duplex} socket The socket of the upgrade request
 * @param {Number} code The HTTP response status code
 * @param {String} message The HTTP response body
 * @private
 */
function abortHandshakeOrEmitwsClientError(server, req, socket, code, message) {
  if (!server.listenerCount('wsClientError')) {
    abortHandshake(socket, code, message);
    return;
  }

  const err = new Error(message);

  // Drop this helper's own frame from the stack trace.
  Error.captureStackTrace(err, abortHandshakeOrEmitwsClientError);

  server.emit('wsClientError', err, socket, req);
}