/** * Streams * Pokemon Showdown - http://pokemonshowdown.com/ * * The Node.js standard library's Streams are really hard to use. This * offers a better stream API. * * Documented in STREAMS.md. * * @license MIT */ const BUF_SIZE = 65536 * 4; type BufferEncoding = 'ascii' | 'utf8' | 'utf-8' | 'utf16le' | 'ucs2' | 'ucs-2' | 'base64' | 'latin1' | 'binary' | 'hex'; export class ReadStream { buf: Buffer; bufStart: number; bufEnd: number; bufCapacity: number; readSize: number; atEOF: boolean; errorBuf: Error[] | null; encoding: BufferEncoding; isReadable: boolean; isWritable: boolean; nodeReadableStream: NodeJS.ReadableStream | null; nextPushResolver: (() => void) | null; nextPush: Promise; awaitingPush: boolean; constructor(optionsOrStreamLike: { [k: string]: any } | NodeJS.ReadableStream | string | Buffer = {}) { this.buf = Buffer.allocUnsafe(BUF_SIZE); this.bufStart = 0; this.bufEnd = 0; this.bufCapacity = BUF_SIZE; this.readSize = 0; this.atEOF = false; this.errorBuf = null; this.encoding = 'utf8'; this.isReadable = true; this.isWritable = false; this.nodeReadableStream = null; this.nextPushResolver = null; this.nextPush = new Promise(resolve => { this.nextPushResolver = resolve; }); this.awaitingPush = false; let options: { [k: string]: any }; if (typeof optionsOrStreamLike === 'string') { options = { buffer: optionsOrStreamLike }; } else if (optionsOrStreamLike instanceof Buffer) { options = { buffer: optionsOrStreamLike }; } else if (typeof (optionsOrStreamLike as any)._readableState === 'object') { options = { nodeStream: optionsOrStreamLike as NodeJS.ReadableStream }; } else { options = optionsOrStreamLike; } if (options.nodeStream) { const nodeStream: NodeJS.ReadableStream = options.nodeStream; this.nodeReadableStream = nodeStream; nodeStream.on('data', data => { this.push(data); }); nodeStream.on('end', () => { this.pushEnd(); }); options.read = function (this: ReadStream, unusedBytes: number) { this.nodeReadableStream!.resume(); }; options.pause = function (this: ReadStream, unusedBytes: number) { this.nodeReadableStream!.pause(); }; } if (options.read) this._read = options.read; if (options.pause) this._pause = options.pause; if (options.destroy) this._destroy = options.destroy; if (options.encoding) this.encoding = options.encoding; if (options.buffer !== undefined) { this.push(options.buffer); this.pushEnd(); } } get bufSize() { return this.bufEnd - this.bufStart; } moveBuf() { if (this.bufStart !== this.bufEnd) { this.buf.copy(this.buf, 0, this.bufStart, this.bufEnd); } this.bufEnd -= this.bufStart; this.bufStart = 0; } expandBuf(newCapacity = this.bufCapacity * 2) { const newBuf = Buffer.allocUnsafe(newCapacity); this.buf.copy(newBuf, 0, this.bufStart, this.bufEnd); this.bufEnd -= this.bufStart; this.bufStart = 0; this.bufCapacity = newCapacity; this.buf = newBuf; } ensureCapacity(additionalCapacity: number) { if (this.bufEnd + additionalCapacity <= this.bufCapacity) return; const capacity = this.bufEnd - this.bufStart + additionalCapacity; if (capacity <= this.bufCapacity) { return this.moveBuf(); } let newCapacity = this.bufCapacity * 2; while (newCapacity < capacity) newCapacity *= 2; this.expandBuf(newCapacity); } push(buf: Buffer | string, encoding: BufferEncoding = this.encoding) { let size; if (this.atEOF) return; if (typeof buf === 'string') { size = Buffer.byteLength(buf, encoding); this.ensureCapacity(size); this.buf.write(buf, this.bufEnd); } else { size = buf.length; this.ensureCapacity(size); buf.copy(this.buf, this.bufEnd); } this.bufEnd += size; if (this.bufSize > this.readSize && size * 2 < this.bufSize) this._pause(); this.resolvePush(); } pushEnd() { this.atEOF = true; this.resolvePush(); } pushError(err: Error, recoverable?: boolean) { if (!this.errorBuf) this.errorBuf = []; this.errorBuf.push(err); if (!recoverable) this.atEOF = true; this.resolvePush(); } readError() { if (this.errorBuf) { const err = this.errorBuf.shift()!; if (!this.errorBuf.length) this.errorBuf = null; throw err; } } peekError() { if (this.errorBuf) { throw this.errorBuf[0]; } } resolvePush() { if (!this.nextPushResolver) throw new Error(`Push after end of read stream`); this.nextPushResolver(); if (this.atEOF) { this.nextPushResolver = null; return; } this.nextPush = new Promise(resolve => { this.nextPushResolver = resolve; }); } _read(size = 0): void | Promise { throw new Error(`ReadStream needs to be subclassed and the _read function needs to be implemented.`); } _destroy(): void | Promise {} _pause() {} /** * Reads until the internal buffer is non-empty. Does nothing if the * internal buffer is already non-empty. * * If `byteCount` is a number, instead read until the internal buffer * contains at least `byteCount` bytes. * * If `byteCount` is `true`, reads even if the internal buffer is * non-empty. */ loadIntoBuffer(byteCount: number | null | true = null, readError?: boolean) { this[readError ? 'readError' : 'peekError'](); if (byteCount === 0) return; this.readSize = Math.max( byteCount === true ? this.bufSize + 1 : byteCount === null ? 1 : byteCount, this.readSize ); if (!this.errorBuf && !this.atEOF && this.bufSize < this.readSize) { let bytes: number | null = this.readSize - this.bufSize; if (bytes === Infinity || byteCount === null || byteCount === true) bytes = null; return this.doLoad(bytes, readError); } } async doLoad(chunkSize?: number | null, readError?: boolean) { while (!this.errorBuf && !this.atEOF && this.bufSize < this.readSize) { if (chunkSize) void this._read(chunkSize); else void this._read(); await this.nextPush; this[readError ? 'readError' : 'peekError'](); } } peek(byteCount?: number | null, encoding?: BufferEncoding): string | null | Promise; peek(encoding: BufferEncoding): string | null | Promise; peek(byteCount: number | string | null = null, encoding: BufferEncoding = this.encoding) { if (typeof byteCount === 'string') { encoding = byteCount as BufferEncoding; byteCount = null; } const maybeLoad = this.loadIntoBuffer(byteCount); if (maybeLoad) return maybeLoad.then(() => this.peek(byteCount as number, encoding)); if (!this.bufSize && byteCount !== 0) return null; if (byteCount === null) return this.buf.toString(encoding, this.bufStart, this.bufEnd); if (byteCount > this.bufSize) byteCount = this.bufSize; return this.buf.toString(encoding, this.bufStart, this.bufStart + byteCount); } peekBuffer(byteCount: number | null = null): Buffer | null | Promise { const maybeLoad = this.loadIntoBuffer(byteCount); if (maybeLoad) return maybeLoad.then(() => this.peekBuffer(byteCount)); if (!this.bufSize && byteCount !== 0) return null; if (byteCount === null) return this.buf.slice(this.bufStart, this.bufEnd); if (byteCount > this.bufSize) byteCount = this.bufSize; return this.buf.slice(this.bufStart, this.bufStart + byteCount); } async read(byteCount?: number | null, encoding?: BufferEncoding): Promise; async read(encoding: BufferEncoding): Promise; async read(byteCount: number | string | null = null, encoding: BufferEncoding = this.encoding) { if (typeof byteCount === 'string') { encoding = byteCount as BufferEncoding; byteCount = null; } await this.loadIntoBuffer(byteCount, true); // This MUST NOT be awaited: we MUST synchronously clear byteCount after peeking // if the buffer is written to after peek but before clearing the buffer, the write // will be lost forever const out = this.peek(byteCount, encoding); if (out && typeof out !== 'string') { throw new Error("Race condition; you must not read before a previous read has completed"); } if (byteCount === null || byteCount >= this.bufSize) { this.bufStart = 0; this.bufEnd = 0; this.readSize = 0; } else { this.bufStart += byteCount; this.readSize -= byteCount; } return out; } byChunk(byteCount?: number | null) { // eslint-disable-next-line @typescript-eslint/no-this-alias const byteStream = this; return new ObjectReadStream({ async read(this: ObjectReadStream) { const next = await byteStream.read(byteCount); if (typeof next === 'string') this.push(next); else this.pushEnd(); }, }); } byLine() { // eslint-disable-next-line @typescript-eslint/no-this-alias const byteStream = this; return new ObjectReadStream({ async read(this: ObjectReadStream) { const next = await byteStream.readLine(); if (typeof next === 'string') this.push(next); else this.pushEnd(); }, }); } delimitedBy(delimiter: string) { // eslint-disable-next-line @typescript-eslint/no-this-alias const byteStream = this; return new ObjectReadStream({ async read(this: ObjectReadStream) { const next = await byteStream.readDelimitedBy(delimiter); if (typeof next === 'string') this.push(next); else this.pushEnd(); }, }); } async readBuffer(byteCount: number | null = null) { await this.loadIntoBuffer(byteCount, true); // This MUST NOT be awaited: we must synchronously clear the buffer after peeking // (see `read`) const out = this.peekBuffer(byteCount); if (out && (out as Promise).then) { throw new Error("Race condition; you must not read before a previous read has completed"); } if (byteCount === null || byteCount >= this.bufSize) { this.bufStart = 0; this.bufEnd = 0; } else { this.bufStart += byteCount; } return out; } async indexOf(symbol: string, encoding: BufferEncoding = this.encoding) { let idx = this.buf.indexOf(symbol, this.bufStart, encoding); while (!this.atEOF && (idx >= this.bufEnd || idx < 0)) { await this.loadIntoBuffer(true); idx = this.buf.indexOf(symbol, this.bufStart, encoding); } if (idx >= this.bufEnd) return -1; return idx - this.bufStart; } async readAll(encoding: BufferEncoding = this.encoding) { return (await this.read(Infinity, encoding)) || ''; } peekAll(encoding: BufferEncoding = this.encoding) { return this.peek(Infinity, encoding); } async readDelimitedBy(symbol: string, encoding: BufferEncoding = this.encoding) { if (this.atEOF && !this.bufSize) return null; const idx = await this.indexOf(symbol, encoding); if (idx < 0) { return this.readAll(encoding); } else { const out = await this.read(idx, encoding); this.bufStart += Buffer.byteLength(symbol, 'utf8'); return out; } } async readLine(encoding: BufferEncoding = this.encoding) { if (!encoding) throw new Error(`readLine must have an encoding`); let line = await this.readDelimitedBy('\n', encoding); if (line?.endsWith('\r')) line = line.slice(0, -1); return line; } destroy() { this.atEOF = true; this.bufStart = 0; this.bufEnd = 0; if (this.nextPushResolver) this.resolvePush(); return this._destroy(); } async next(byteCount: number | null = null) { const value = await this.read(byteCount); return { value, done: value === null } as { value: string, done: false } | { value: null, done: true }; } async pipeTo(outStream: WriteStream, options: { noEnd?: boolean } = {}) { let next; while ((next = await this.next(), !next.done)) { await outStream.write(next.value); } if (!options.noEnd) return outStream.writeEnd(); } } interface WriteStreamOptions { nodeStream?: NodeJS.WritableStream; write?: (this: WriteStream, data: string | Buffer) => (Promise | undefined | void); writeEnd?: (this: WriteStream) => Promise; } export class WriteStream { isReadable: boolean; isWritable: true; encoding: BufferEncoding; nodeWritableStream: NodeJS.WritableStream | null; drainListeners: (() => void)[]; constructor(optionsOrStream: WriteStreamOptions | NodeJS.WritableStream = {}) { this.isReadable = false; this.isWritable = true; this.encoding = 'utf8'; this.nodeWritableStream = null; this.drainListeners = []; let options: WriteStreamOptions = optionsOrStream as any; if ((options as any)._writableState) { options = { nodeStream: optionsOrStream as NodeJS.WritableStream }; } if (options.nodeStream) { const nodeStream: NodeJS.WritableStream = options.nodeStream; this.nodeWritableStream = nodeStream; options.write = function (data: string | Buffer) { const result = this.nodeWritableStream!.write(data); if (result !== false) return undefined; if (!this.drainListeners.length) { this.nodeWritableStream!.once('drain', () => { for (const listener of this.drainListeners) listener(); this.drainListeners = []; }); } return new Promise(resolve => { // `as () => void` is necessary because TypeScript thinks that it should be a function // that takes an undefined value as its only parameter: `(value: PromiseLike | undefined) => void` this.drainListeners.push(resolve as () => void); }); }; // Prior to Node v10.12.0, attempting to close STDOUT or STDERR will throw if (nodeStream !== process.stdout && nodeStream !== process.stderr) { options.writeEnd = function () { return new Promise(resolve => { this.nodeWritableStream!.end(() => resolve()); }); }; } } if (options.write) this._write = options.write; if (options.writeEnd) this._writeEnd = options.writeEnd; } write(chunk: Buffer | string): void | Promise { return this._write(chunk); } writeLine(chunk: string): void | Promise { if (chunk === null) { return this.writeEnd(); } return this.write(chunk + '\n'); } _write(chunk: Buffer | string): void | Promise { throw new Error(`WriteStream needs to be subclassed and the _write function needs to be implemented.`); } _writeEnd(): void | Promise {} async writeEnd(chunk?: string): Promise { if (chunk) { await this.write(chunk); } return this._writeEnd(); } } export class ReadWriteStream extends ReadStream implements WriteStream { isReadable: true; isWritable: true; nodeWritableStream: NodeJS.WritableStream | null; drainListeners: (() => void)[]; constructor(options: AnyObject = {}) { super(options); this.isReadable = true; this.isWritable = true; this.nodeWritableStream = null; this.drainListeners = []; if (options.nodeStream) { const nodeStream: NodeJS.WritableStream = options.nodeStream; this.nodeWritableStream = nodeStream; options.write = function (data: string | Buffer) { const result = this.nodeWritableStream.write(data); if (result !== false) return undefined; if (!this.drainListeners.length) { this.nodeWritableStream.once('drain', () => { for (const listener of this.drainListeners) listener(); this.drainListeners = []; }); } return new Promise(resolve => { this.drainListeners.push(resolve); }); }; // Prior to Node v10.12.0, attempting to close STDOUT or STDERR will throw if (nodeStream !== process.stdout && nodeStream !== process.stderr) { options.writeEnd = function () { return new Promise(resolve => { this.nodeWritableStream.end(() => resolve()); }); }; } } if (options.write) this._write = options.write; if (options.writeEnd) this._writeEnd = options.writeEnd; } write(chunk: Buffer | string): Promise | void { return this._write(chunk); } writeLine(chunk: string): Promise | void { return this.write(chunk + '\n'); } _write(chunk: Buffer | string): Promise | void { throw new Error(`WriteStream needs to be subclassed and the _write function needs to be implemented.`); } /** * In a ReadWriteStream, `_read` does not need to be implemented, * because it's valid for the read stream buffer to be filled only by * `_write`. */ _read(size?: number) {} _writeEnd(): void | Promise {} async writeEnd() { return this._writeEnd(); } } type ObjectReadStreamOptions = { buffer?: T[], read?: (this: ObjectReadStream) => void | Promise, pause?: (this: ObjectReadStream) => void | Promise, destroy?: (this: ObjectReadStream) => void | Promise, nodeStream?: undefined, } | { buffer?: undefined, read?: undefined, pause?: undefined, destroy?: undefined, nodeStream: NodeJS.ReadableStream, }; export class ObjectReadStream { buf: T[]; readSize: number; atEOF: boolean; errorBuf: Error[] | null; isReadable: boolean; isWritable: boolean; nodeReadableStream: NodeJS.ReadableStream | null; nextPushResolver: (() => void) | null; nextPush: Promise; awaitingPush: boolean; constructor(optionsOrStreamLike: ObjectReadStreamOptions | NodeJS.ReadableStream | T[] = {}) { this.buf = []; this.readSize = 0; this.atEOF = false; this.errorBuf = null; this.isReadable = true; this.isWritable = false; this.nodeReadableStream = null; this.nextPushResolver = null; this.nextPush = new Promise(resolve => { this.nextPushResolver = resolve; }); this.awaitingPush = false; let options: ObjectReadStreamOptions; if (Array.isArray(optionsOrStreamLike)) { options = { buffer: optionsOrStreamLike }; } else if (typeof (optionsOrStreamLike as any)._readableState === 'object') { options = { nodeStream: optionsOrStreamLike as NodeJS.ReadableStream }; } else { options = optionsOrStreamLike as ObjectReadStreamOptions; } if ((options as any).nodeStream) { const nodeStream: NodeJS.ReadableStream = (options as any).nodeStream; this.nodeReadableStream = nodeStream; nodeStream.on('data', data => { this.push(data); }); nodeStream.on('end', () => { this.pushEnd(); }); options = { read() { this.nodeReadableStream!.resume(); }, pause() { this.nodeReadableStream!.pause(); }, }; } if (options.read) this._read = options.read; if (options.pause) this._pause = options.pause; if (options.destroy) this._destroy = options.destroy; if (options.buffer !== undefined) { this.buf = options.buffer.slice(); this.pushEnd(); } } push(elem: T) { if (this.atEOF) return; this.buf.push(elem); if (this.buf.length > this.readSize && this.buf.length >= 16) void this._pause(); this.resolvePush(); } pushEnd() { this.atEOF = true; this.resolvePush(); } pushError(err: Error, recoverable?: boolean) { if (!this.errorBuf) this.errorBuf = []; this.errorBuf.push(err); if (!recoverable) this.atEOF = true; this.resolvePush(); } readError() { if (this.errorBuf) { const err = this.errorBuf.shift()!; if (!this.errorBuf.length) this.errorBuf = null; throw err; } } peekError() { if (this.errorBuf) { throw this.errorBuf[0]; } } resolvePush() { if (!this.nextPushResolver) throw new Error(`Push after end of read stream`); this.nextPushResolver(); if (this.atEOF) { this.nextPushResolver = null; return; } this.nextPush = new Promise(resolve => { this.nextPushResolver = resolve; }); } _read(size = 0): void | Promise { throw new Error(`ReadStream needs to be subclassed and the _read function needs to be implemented.`); } _destroy(): void | Promise {} _pause(): void | Promise {} async loadIntoBuffer(count: number | true = 1, readError?: boolean) { this[readError ? 'readError' : 'peekError'](); if (count === true) count = this.buf.length + 1; if (this.buf.length >= count) return; this.readSize = Math.max(count, this.readSize); while (!this.errorBuf && !this.atEOF && this.buf.length < this.readSize) { const readResult = this._read(); if (readResult) { await readResult; } else { await this.nextPush; } this[readError ? 'readError' : 'peekError'](); } } async peek() { if (this.buf.length) return this.buf[0]; await this.loadIntoBuffer(); return this.buf[0]; } async read() { if (this.buf.length) return this.buf.shift(); await this.loadIntoBuffer(1, true); if (!this.buf.length) return null; return this.buf.shift()!; } async peekArray(count: number | null = null) { await this.loadIntoBuffer(count === null ? 1 : count); return this.buf.slice(0, count === null ? Infinity : count); } async readArray(count: number | null = null) { await this.loadIntoBuffer(count === null ? 1 : count, true); const out = this.buf.slice(0, count === null ? Infinity : count); this.buf = this.buf.slice(out.length); return out; } async readAll() { await this.loadIntoBuffer(Infinity, true); const out = this.buf; this.buf = []; return out; } async peekAll() { await this.loadIntoBuffer(Infinity); return this.buf.slice(); } destroy() { this.atEOF = true; this.buf = []; this.resolvePush(); return this._destroy(); } [Symbol.asyncIterator]() { return this; } async next() { if (this.buf.length) return { value: this.buf.shift() as T, done: false as const }; await this.loadIntoBuffer(1, true); if (!this.buf.length) return { value: undefined, done: true as const }; return { value: this.buf.shift() as T, done: false as const }; } async pipeTo(outStream: ObjectWriteStream, options: { noEnd?: boolean } = {}) { let next; while ((next = await this.next(), !next.done)) { await outStream.write(next.value); } if (!options.noEnd) return outStream.writeEnd(); } } interface ObjectWriteStreamOptions { _writableState?: any; nodeStream?: NodeJS.WritableStream; write?: (this: ObjectWriteStream, data: T) => Promise | undefined; writeEnd?: (this: ObjectWriteStream) => Promise; } export class ObjectWriteStream { isReadable: boolean; isWritable: true; nodeWritableStream: NodeJS.WritableStream | null; constructor(optionsOrStream: ObjectWriteStreamOptions | NodeJS.WritableStream = {}) { this.isReadable = false; this.isWritable = true; this.nodeWritableStream = null; let options: ObjectWriteStreamOptions = optionsOrStream as any; if (options._writableState) { options = { nodeStream: optionsOrStream as NodeJS.WritableStream }; } if (options.nodeStream) { const nodeStream: NodeJS.WritableStream = options.nodeStream; this.nodeWritableStream = nodeStream; options.write = function (data: T) { const result = this.nodeWritableStream!.write(data as unknown as string); if (result === false) { return new Promise(resolve => { this.nodeWritableStream!.once('drain', () => { resolve(); }); }); } }; // Prior to Node v10.12.0, attempting to close STDOUT or STDERR will throw if (nodeStream !== process.stdout && nodeStream !== process.stderr) { options.writeEnd = function () { return new Promise(resolve => { this.nodeWritableStream!.end(() => resolve()); }); }; } } if (options.write) this._write = options.write; if (options.writeEnd) this._writeEnd = options.writeEnd; } write(elem: T | null): void | Promise { if (elem === null) { return this.writeEnd(); } return this._write(elem); } _write(elem: T): void | Promise { throw new Error(`WriteStream needs to be subclassed and the _write function needs to be implemented.`); } _writeEnd(): void | Promise {} async writeEnd(elem?: T): Promise { if (elem !== undefined) { await this.write(elem); } return this._writeEnd(); } } interface ObjectReadWriteStreamOptions { read?: (this: ObjectReadStream) => void | Promise; pause?: (this: ObjectReadStream) => void | Promise; destroy?: (this: ObjectReadStream) => void | Promise; write?: (this: ObjectWriteStream, elem: T) => Promise | undefined | void; writeEnd?: () => Promise | undefined | void; } export class ObjectReadWriteStream extends ObjectReadStream implements ObjectWriteStream { isReadable: true; isWritable: true; nodeWritableStream: NodeJS.WritableStream | null; constructor(options: ObjectReadWriteStreamOptions = {}) { super(options); this.isReadable = true; this.isWritable = true; this.nodeWritableStream = null; if (options.write) this._write = options.write; if (options.writeEnd) this._writeEnd = options.writeEnd; } write(elem: T): void | Promise { return this._write(elem); } _write(elem: T): void | Promise { throw new Error(`WriteStream needs to be subclassed and the _write function needs to be implemented.`); } /** In a ReadWriteStream, _read does not need to be implemented. */ _read() {} _writeEnd(): void | Promise {} async writeEnd() { return this._writeEnd(); } } export function readAll(nodeStream: NodeJS.ReadableStream, encoding?: any) { return new ReadStream(nodeStream).readAll(encoding); } export function stdin() { return new ReadStream(process.stdin); } export function stdout() { return new WriteStream(process.stdout); } export function stdpipe(stream: WriteStream | ReadStream | ReadWriteStream) { const promises = []; if ((stream as ReadStream | WriteStream & { pipeTo: undefined }).pipeTo) { promises.push((stream as ReadStream).pipeTo(stdout())); } if ((stream as WriteStream | ReadStream & { write: undefined }).write) { promises.push(stdin().pipeTo(stream as WriteStream)); } return Promise.all(promises); }