Pokemon_server / lib /streams.ts
Jofthomas's picture
Jofthomas HF staff
Upload 4781 files
5c2ed06 verified
raw
history blame
25.6 kB
/**
* Streams
* Pokemon Showdown - http://pokemonshowdown.com/
*
* The Node.js standard library's Streams are really hard to use. This
* offers a better stream API.
*
* Documented in STREAMS.md.
*
* @license MIT
*/
const BUF_SIZE = 65536 * 4;
type BufferEncoding =
'ascii' | 'utf8' | 'utf-8' | 'utf16le' | 'ucs2' | 'ucs-2' | 'base64' | 'latin1' | 'binary' | 'hex';
export class ReadStream {
buf: Buffer;
bufStart: number;
bufEnd: number;
bufCapacity: number;
readSize: number;
atEOF: boolean;
errorBuf: Error[] | null;
encoding: BufferEncoding;
isReadable: boolean;
isWritable: boolean;
nodeReadableStream: NodeJS.ReadableStream | null;
nextPushResolver: (() => void) | null;
nextPush: Promise<void>;
awaitingPush: boolean;
constructor(optionsOrStreamLike: { [k: string]: any } | NodeJS.ReadableStream | string | Buffer = {}) {
this.buf = Buffer.allocUnsafe(BUF_SIZE);
this.bufStart = 0;
this.bufEnd = 0;
this.bufCapacity = BUF_SIZE;
this.readSize = 0;
this.atEOF = false;
this.errorBuf = null;
this.encoding = 'utf8';
this.isReadable = true;
this.isWritable = false;
this.nodeReadableStream = null;
this.nextPushResolver = null;
this.nextPush = new Promise(resolve => {
this.nextPushResolver = resolve;
});
this.awaitingPush = false;
let options: { [k: string]: any };
if (typeof optionsOrStreamLike === 'string') {
options = { buffer: optionsOrStreamLike };
} else if (optionsOrStreamLike instanceof Buffer) {
options = { buffer: optionsOrStreamLike };
} else if (typeof (optionsOrStreamLike as any)._readableState === 'object') {
options = { nodeStream: optionsOrStreamLike as NodeJS.ReadableStream };
} else {
options = optionsOrStreamLike;
}
if (options.nodeStream) {
const nodeStream: NodeJS.ReadableStream = options.nodeStream;
this.nodeReadableStream = nodeStream;
nodeStream.on('data', data => {
this.push(data);
});
nodeStream.on('end', () => {
this.pushEnd();
});
options.read = function (this: ReadStream, unusedBytes: number) {
this.nodeReadableStream!.resume();
};
options.pause = function (this: ReadStream, unusedBytes: number) {
this.nodeReadableStream!.pause();
};
}
if (options.read) this._read = options.read;
if (options.pause) this._pause = options.pause;
if (options.destroy) this._destroy = options.destroy;
if (options.encoding) this.encoding = options.encoding;
if (options.buffer !== undefined) {
this.push(options.buffer);
this.pushEnd();
}
}
get bufSize() {
return this.bufEnd - this.bufStart;
}
moveBuf() {
if (this.bufStart !== this.bufEnd) {
this.buf.copy(this.buf, 0, this.bufStart, this.bufEnd);
}
this.bufEnd -= this.bufStart;
this.bufStart = 0;
}
expandBuf(newCapacity = this.bufCapacity * 2) {
const newBuf = Buffer.allocUnsafe(newCapacity);
this.buf.copy(newBuf, 0, this.bufStart, this.bufEnd);
this.bufEnd -= this.bufStart;
this.bufStart = 0;
this.bufCapacity = newCapacity;
this.buf = newBuf;
}
ensureCapacity(additionalCapacity: number) {
if (this.bufEnd + additionalCapacity <= this.bufCapacity) return;
const capacity = this.bufEnd - this.bufStart + additionalCapacity;
if (capacity <= this.bufCapacity) {
return this.moveBuf();
}
let newCapacity = this.bufCapacity * 2;
while (newCapacity < capacity) newCapacity *= 2;
this.expandBuf(newCapacity);
}
push(buf: Buffer | string, encoding: BufferEncoding = this.encoding) {
let size;
if (this.atEOF) return;
if (typeof buf === 'string') {
size = Buffer.byteLength(buf, encoding);
this.ensureCapacity(size);
this.buf.write(buf, this.bufEnd);
} else {
size = buf.length;
this.ensureCapacity(size);
buf.copy(this.buf, this.bufEnd);
}
this.bufEnd += size;
if (this.bufSize > this.readSize && size * 2 < this.bufSize) this._pause();
this.resolvePush();
}
pushEnd() {
this.atEOF = true;
this.resolvePush();
}
pushError(err: Error, recoverable?: boolean) {
if (!this.errorBuf) this.errorBuf = [];
this.errorBuf.push(err);
if (!recoverable) this.atEOF = true;
this.resolvePush();
}
readError() {
if (this.errorBuf) {
const err = this.errorBuf.shift()!;
if (!this.errorBuf.length) this.errorBuf = null;
throw err;
}
}
peekError() {
if (this.errorBuf) {
throw this.errorBuf[0];
}
}
resolvePush() {
if (!this.nextPushResolver) throw new Error(`Push after end of read stream`);
this.nextPushResolver();
if (this.atEOF) {
this.nextPushResolver = null;
return;
}
this.nextPush = new Promise(resolve => {
this.nextPushResolver = resolve;
});
}
_read(size = 0): void | Promise<void> {
throw new Error(`ReadStream needs to be subclassed and the _read function needs to be implemented.`);
}
_destroy(): void | Promise<void> {}
_pause() {}
/**
* Reads until the internal buffer is non-empty. Does nothing if the
* internal buffer is already non-empty.
*
* If `byteCount` is a number, instead read until the internal buffer
* contains at least `byteCount` bytes.
*
* If `byteCount` is `true`, reads even if the internal buffer is
* non-empty.
*/
loadIntoBuffer(byteCount: number | null | true = null, readError?: boolean) {
this[readError ? 'readError' : 'peekError']();
if (byteCount === 0) return;
this.readSize = Math.max(
byteCount === true ? this.bufSize + 1 : byteCount === null ? 1 : byteCount,
this.readSize
);
if (!this.errorBuf && !this.atEOF && this.bufSize < this.readSize) {
let bytes: number | null = this.readSize - this.bufSize;
if (bytes === Infinity || byteCount === null || byteCount === true) bytes = null;
return this.doLoad(bytes, readError);
}
}
async doLoad(chunkSize?: number | null, readError?: boolean) {
while (!this.errorBuf && !this.atEOF && this.bufSize < this.readSize) {
if (chunkSize) void this._read(chunkSize);
else void this._read();
await this.nextPush;
this[readError ? 'readError' : 'peekError']();
}
}
peek(byteCount?: number | null, encoding?: BufferEncoding): string | null | Promise<string | null>;
peek(encoding: BufferEncoding): string | null | Promise<string | null>;
peek(byteCount: number | string | null = null, encoding: BufferEncoding = this.encoding) {
if (typeof byteCount === 'string') {
encoding = byteCount as BufferEncoding;
byteCount = null;
}
const maybeLoad = this.loadIntoBuffer(byteCount);
if (maybeLoad) return maybeLoad.then(() => this.peek(byteCount as number, encoding));
if (!this.bufSize && byteCount !== 0) return null;
if (byteCount === null) return this.buf.toString(encoding, this.bufStart, this.bufEnd);
if (byteCount > this.bufSize) byteCount = this.bufSize;
return this.buf.toString(encoding, this.bufStart, this.bufStart + byteCount);
}
peekBuffer(byteCount: number | null = null): Buffer | null | Promise<Buffer | null> {
const maybeLoad = this.loadIntoBuffer(byteCount);
if (maybeLoad) return maybeLoad.then(() => this.peekBuffer(byteCount));
if (!this.bufSize && byteCount !== 0) return null;
if (byteCount === null) return this.buf.slice(this.bufStart, this.bufEnd);
if (byteCount > this.bufSize) byteCount = this.bufSize;
return this.buf.slice(this.bufStart, this.bufStart + byteCount);
}
async read(byteCount?: number | null, encoding?: BufferEncoding): Promise<string | null>;
async read(encoding: BufferEncoding): Promise<string | null>;
async read(byteCount: number | string | null = null, encoding: BufferEncoding = this.encoding) {
if (typeof byteCount === 'string') {
encoding = byteCount as BufferEncoding;
byteCount = null;
}
await this.loadIntoBuffer(byteCount, true);
// This MUST NOT be awaited: we MUST synchronously clear byteCount after peeking
// if the buffer is written to after peek but before clearing the buffer, the write
// will be lost forever
const out = this.peek(byteCount, encoding);
if (out && typeof out !== 'string') {
throw new Error("Race condition; you must not read before a previous read has completed");
}
if (byteCount === null || byteCount >= this.bufSize) {
this.bufStart = 0;
this.bufEnd = 0;
this.readSize = 0;
} else {
this.bufStart += byteCount;
this.readSize -= byteCount;
}
return out;
}
byChunk(byteCount?: number | null) {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const byteStream = this;
return new ObjectReadStream<string>({
async read(this: ObjectReadStream<string>) {
const next = await byteStream.read(byteCount);
if (typeof next === 'string') this.push(next);
else this.pushEnd();
},
});
}
byLine() {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const byteStream = this;
return new ObjectReadStream<string>({
async read(this: ObjectReadStream<string>) {
const next = await byteStream.readLine();
if (typeof next === 'string') this.push(next);
else this.pushEnd();
},
});
}
delimitedBy(delimiter: string) {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const byteStream = this;
return new ObjectReadStream<string>({
async read(this: ObjectReadStream<string>) {
const next = await byteStream.readDelimitedBy(delimiter);
if (typeof next === 'string') this.push(next);
else this.pushEnd();
},
});
}
async readBuffer(byteCount: number | null = null) {
await this.loadIntoBuffer(byteCount, true);
// This MUST NOT be awaited: we must synchronously clear the buffer after peeking
// (see `read`)
const out = this.peekBuffer(byteCount);
if (out && (out as Promise<unknown>).then) {
throw new Error("Race condition; you must not read before a previous read has completed");
}
if (byteCount === null || byteCount >= this.bufSize) {
this.bufStart = 0;
this.bufEnd = 0;
} else {
this.bufStart += byteCount;
}
return out;
}
async indexOf(symbol: string, encoding: BufferEncoding = this.encoding) {
let idx = this.buf.indexOf(symbol, this.bufStart, encoding);
while (!this.atEOF && (idx >= this.bufEnd || idx < 0)) {
await this.loadIntoBuffer(true);
idx = this.buf.indexOf(symbol, this.bufStart, encoding);
}
if (idx >= this.bufEnd) return -1;
return idx - this.bufStart;
}
async readAll(encoding: BufferEncoding = this.encoding) {
return (await this.read(Infinity, encoding)) || '';
}
peekAll(encoding: BufferEncoding = this.encoding) {
return this.peek(Infinity, encoding);
}
async readDelimitedBy(symbol: string, encoding: BufferEncoding = this.encoding) {
if (this.atEOF && !this.bufSize) return null;
const idx = await this.indexOf(symbol, encoding);
if (idx < 0) {
return this.readAll(encoding);
} else {
const out = await this.read(idx, encoding);
this.bufStart += Buffer.byteLength(symbol, 'utf8');
return out;
}
}
async readLine(encoding: BufferEncoding = this.encoding) {
if (!encoding) throw new Error(`readLine must have an encoding`);
let line = await this.readDelimitedBy('\n', encoding);
if (line?.endsWith('\r')) line = line.slice(0, -1);
return line;
}
destroy() {
this.atEOF = true;
this.bufStart = 0;
this.bufEnd = 0;
if (this.nextPushResolver) this.resolvePush();
return this._destroy();
}
async next(byteCount: number | null = null) {
const value = await this.read(byteCount);
return { value, done: value === null } as { value: string, done: false } | { value: null, done: true };
}
async pipeTo(outStream: WriteStream, options: { noEnd?: boolean } = {}) {
let next;
while ((next = await this.next(), !next.done)) {
await outStream.write(next.value);
}
if (!options.noEnd) return outStream.writeEnd();
}
}
interface WriteStreamOptions {
nodeStream?: NodeJS.WritableStream;
write?: (this: WriteStream, data: string | Buffer) => (Promise<undefined> | undefined | void);
writeEnd?: (this: WriteStream) => Promise<any>;
}
export class WriteStream {
isReadable: boolean;
isWritable: true;
encoding: BufferEncoding;
nodeWritableStream: NodeJS.WritableStream | null;
drainListeners: (() => void)[];
constructor(optionsOrStream: WriteStreamOptions | NodeJS.WritableStream = {}) {
this.isReadable = false;
this.isWritable = true;
this.encoding = 'utf8';
this.nodeWritableStream = null;
this.drainListeners = [];
let options: WriteStreamOptions = optionsOrStream as any;
if ((options as any)._writableState) {
options = { nodeStream: optionsOrStream as NodeJS.WritableStream };
}
if (options.nodeStream) {
const nodeStream: NodeJS.WritableStream = options.nodeStream;
this.nodeWritableStream = nodeStream;
options.write = function (data: string | Buffer) {
const result = this.nodeWritableStream!.write(data);
if (result !== false) return undefined;
if (!this.drainListeners.length) {
this.nodeWritableStream!.once('drain', () => {
for (const listener of this.drainListeners) listener();
this.drainListeners = [];
});
}
return new Promise(resolve => {
// `as () => void` is necessary because TypeScript thinks that it should be a function
// that takes an undefined value as its only parameter: `(value: PromiseLike<undefined> | undefined) => void`
this.drainListeners.push(resolve as () => void);
});
};
// Prior to Node v10.12.0, attempting to close STDOUT or STDERR will throw
if (nodeStream !== process.stdout && nodeStream !== process.stderr) {
options.writeEnd = function () {
return new Promise<void>(resolve => {
this.nodeWritableStream!.end(() => resolve());
});
};
}
}
if (options.write) this._write = options.write;
if (options.writeEnd) this._writeEnd = options.writeEnd;
}
write(chunk: Buffer | string): void | Promise<void> {
return this._write(chunk);
}
writeLine(chunk: string): void | Promise<void> {
if (chunk === null) {
return this.writeEnd();
}
return this.write(chunk + '\n');
}
_write(chunk: Buffer | string): void | Promise<void> {
throw new Error(`WriteStream needs to be subclassed and the _write function needs to be implemented.`);
}
_writeEnd(): void | Promise<void> {}
async writeEnd(chunk?: string): Promise<void> {
if (chunk) {
await this.write(chunk);
}
return this._writeEnd();
}
}
export class ReadWriteStream extends ReadStream implements WriteStream {
isReadable: true;
isWritable: true;
nodeWritableStream: NodeJS.WritableStream | null;
drainListeners: (() => void)[];
constructor(options: AnyObject = {}) {
super(options);
this.isReadable = true;
this.isWritable = true;
this.nodeWritableStream = null;
this.drainListeners = [];
if (options.nodeStream) {
const nodeStream: NodeJS.WritableStream = options.nodeStream;
this.nodeWritableStream = nodeStream;
options.write = function (data: string | Buffer) {
const result = this.nodeWritableStream.write(data);
if (result !== false) return undefined;
if (!this.drainListeners.length) {
this.nodeWritableStream.once('drain', () => {
for (const listener of this.drainListeners) listener();
this.drainListeners = [];
});
}
return new Promise(resolve => {
this.drainListeners.push(resolve);
});
};
// Prior to Node v10.12.0, attempting to close STDOUT or STDERR will throw
if (nodeStream !== process.stdout && nodeStream !== process.stderr) {
options.writeEnd = function () {
return new Promise<void>(resolve => {
this.nodeWritableStream.end(() => resolve());
});
};
}
}
if (options.write) this._write = options.write;
if (options.writeEnd) this._writeEnd = options.writeEnd;
}
write(chunk: Buffer | string): Promise<void> | void {
return this._write(chunk);
}
writeLine(chunk: string): Promise<void> | void {
return this.write(chunk + '\n');
}
_write(chunk: Buffer | string): Promise<void> | void {
throw new Error(`WriteStream needs to be subclassed and the _write function needs to be implemented.`);
}
/**
* In a ReadWriteStream, `_read` does not need to be implemented,
* because it's valid for the read stream buffer to be filled only by
* `_write`.
*/
_read(size?: number) {}
_writeEnd(): void | Promise<void> {}
async writeEnd() {
return this._writeEnd();
}
}
type ObjectReadStreamOptions<T> = {
buffer?: T[],
read?: (this: ObjectReadStream<T>) => void | Promise<void>,
pause?: (this: ObjectReadStream<T>) => void | Promise<void>,
destroy?: (this: ObjectReadStream<T>) => void | Promise<void>,
nodeStream?: undefined,
} | {
buffer?: undefined,
read?: undefined,
pause?: undefined,
destroy?: undefined,
nodeStream: NodeJS.ReadableStream,
};
export class ObjectReadStream<T> {
buf: T[];
readSize: number;
atEOF: boolean;
errorBuf: Error[] | null;
isReadable: boolean;
isWritable: boolean;
nodeReadableStream: NodeJS.ReadableStream | null;
nextPushResolver: (() => void) | null;
nextPush: Promise<void>;
awaitingPush: boolean;
constructor(optionsOrStreamLike: ObjectReadStreamOptions<T> | NodeJS.ReadableStream | T[] = {}) {
this.buf = [];
this.readSize = 0;
this.atEOF = false;
this.errorBuf = null;
this.isReadable = true;
this.isWritable = false;
this.nodeReadableStream = null;
this.nextPushResolver = null;
this.nextPush = new Promise(resolve => {
this.nextPushResolver = resolve;
});
this.awaitingPush = false;
let options: ObjectReadStreamOptions<T>;
if (Array.isArray(optionsOrStreamLike)) {
options = { buffer: optionsOrStreamLike };
} else if (typeof (optionsOrStreamLike as any)._readableState === 'object') {
options = { nodeStream: optionsOrStreamLike as NodeJS.ReadableStream };
} else {
options = optionsOrStreamLike as ObjectReadStreamOptions<T>;
}
if ((options as any).nodeStream) {
const nodeStream: NodeJS.ReadableStream = (options as any).nodeStream;
this.nodeReadableStream = nodeStream;
nodeStream.on('data', data => {
this.push(data);
});
nodeStream.on('end', () => {
this.pushEnd();
});
options = {
read() {
this.nodeReadableStream!.resume();
},
pause() {
this.nodeReadableStream!.pause();
},
};
}
if (options.read) this._read = options.read;
if (options.pause) this._pause = options.pause;
if (options.destroy) this._destroy = options.destroy;
if (options.buffer !== undefined) {
this.buf = options.buffer.slice();
this.pushEnd();
}
}
push(elem: T) {
if (this.atEOF) return;
this.buf.push(elem);
if (this.buf.length > this.readSize && this.buf.length >= 16) void this._pause();
this.resolvePush();
}
pushEnd() {
this.atEOF = true;
this.resolvePush();
}
pushError(err: Error, recoverable?: boolean) {
if (!this.errorBuf) this.errorBuf = [];
this.errorBuf.push(err);
if (!recoverable) this.atEOF = true;
this.resolvePush();
}
readError() {
if (this.errorBuf) {
const err = this.errorBuf.shift()!;
if (!this.errorBuf.length) this.errorBuf = null;
throw err;
}
}
peekError() {
if (this.errorBuf) {
throw this.errorBuf[0];
}
}
resolvePush() {
if (!this.nextPushResolver) throw new Error(`Push after end of read stream`);
this.nextPushResolver();
if (this.atEOF) {
this.nextPushResolver = null;
return;
}
this.nextPush = new Promise(resolve => {
this.nextPushResolver = resolve;
});
}
_read(size = 0): void | Promise<void> {
throw new Error(`ReadStream needs to be subclassed and the _read function needs to be implemented.`);
}
_destroy(): void | Promise<void> {}
_pause(): void | Promise<void> {}
async loadIntoBuffer(count: number | true = 1, readError?: boolean) {
this[readError ? 'readError' : 'peekError']();
if (count === true) count = this.buf.length + 1;
if (this.buf.length >= count) return;
this.readSize = Math.max(count, this.readSize);
while (!this.errorBuf && !this.atEOF && this.buf.length < this.readSize) {
const readResult = this._read();
if (readResult) {
await readResult;
} else {
await this.nextPush;
}
this[readError ? 'readError' : 'peekError']();
}
}
async peek() {
if (this.buf.length) return this.buf[0];
await this.loadIntoBuffer();
return this.buf[0];
}
async read() {
if (this.buf.length) return this.buf.shift();
await this.loadIntoBuffer(1, true);
if (!this.buf.length) return null;
return this.buf.shift()!;
}
async peekArray(count: number | null = null) {
await this.loadIntoBuffer(count === null ? 1 : count);
return this.buf.slice(0, count === null ? Infinity : count);
}
async readArray(count: number | null = null) {
await this.loadIntoBuffer(count === null ? 1 : count, true);
const out = this.buf.slice(0, count === null ? Infinity : count);
this.buf = this.buf.slice(out.length);
return out;
}
async readAll() {
await this.loadIntoBuffer(Infinity, true);
const out = this.buf;
this.buf = [];
return out;
}
async peekAll() {
await this.loadIntoBuffer(Infinity);
return this.buf.slice();
}
destroy() {
this.atEOF = true;
this.buf = [];
this.resolvePush();
return this._destroy();
}
[Symbol.asyncIterator]() { return this; }
async next() {
if (this.buf.length) return { value: this.buf.shift() as T, done: false as const };
await this.loadIntoBuffer(1, true);
if (!this.buf.length) return { value: undefined, done: true as const };
return { value: this.buf.shift() as T, done: false as const };
}
async pipeTo(outStream: ObjectWriteStream<T>, options: { noEnd?: boolean } = {}) {
let next;
while ((next = await this.next(), !next.done)) {
await outStream.write(next.value);
}
if (!options.noEnd) return outStream.writeEnd();
}
}
interface ObjectWriteStreamOptions<T> {
_writableState?: any;
nodeStream?: NodeJS.WritableStream;
write?: (this: ObjectWriteStream<T>, data: T) => Promise<any> | undefined;
writeEnd?: (this: ObjectWriteStream<T>) => Promise<any>;
}
export class ObjectWriteStream<T> {
isReadable: boolean;
isWritable: true;
nodeWritableStream: NodeJS.WritableStream | null;
constructor(optionsOrStream: ObjectWriteStreamOptions<T> | NodeJS.WritableStream = {}) {
this.isReadable = false;
this.isWritable = true;
this.nodeWritableStream = null;
let options: ObjectWriteStreamOptions<T> = optionsOrStream as any;
if (options._writableState) {
options = { nodeStream: optionsOrStream as NodeJS.WritableStream };
}
if (options.nodeStream) {
const nodeStream: NodeJS.WritableStream = options.nodeStream;
this.nodeWritableStream = nodeStream;
options.write = function (data: T) {
const result = this.nodeWritableStream!.write(data as unknown as string);
if (result === false) {
return new Promise<void>(resolve => {
this.nodeWritableStream!.once('drain', () => {
resolve();
});
});
}
};
// Prior to Node v10.12.0, attempting to close STDOUT or STDERR will throw
if (nodeStream !== process.stdout && nodeStream !== process.stderr) {
options.writeEnd = function () {
return new Promise<void>(resolve => {
this.nodeWritableStream!.end(() => resolve());
});
};
}
}
if (options.write) this._write = options.write;
if (options.writeEnd) this._writeEnd = options.writeEnd;
}
write(elem: T | null): void | Promise<void> {
if (elem === null) {
return this.writeEnd();
}
return this._write(elem);
}
_write(elem: T): void | Promise<void> {
throw new Error(`WriteStream needs to be subclassed and the _write function needs to be implemented.`);
}
_writeEnd(): void | Promise<void> {}
async writeEnd(elem?: T): Promise<void> {
if (elem !== undefined) {
await this.write(elem);
}
return this._writeEnd();
}
}
interface ObjectReadWriteStreamOptions<T> {
read?: (this: ObjectReadStream<T>) => void | Promise<void>;
pause?: (this: ObjectReadStream<T>) => void | Promise<void>;
destroy?: (this: ObjectReadStream<T>) => void | Promise<void>;
write?: (this: ObjectWriteStream<T>, elem: T) => Promise<any> | undefined | void;
writeEnd?: () => Promise<any> | undefined | void;
}
export class ObjectReadWriteStream<T> extends ObjectReadStream<T> implements ObjectWriteStream<T> {
isReadable: true;
isWritable: true;
nodeWritableStream: NodeJS.WritableStream | null;
constructor(options: ObjectReadWriteStreamOptions<T> = {}) {
super(options);
this.isReadable = true;
this.isWritable = true;
this.nodeWritableStream = null;
if (options.write) this._write = options.write;
if (options.writeEnd) this._writeEnd = options.writeEnd;
}
write(elem: T): void | Promise<void> {
return this._write(elem);
}
_write(elem: T): void | Promise<void> {
throw new Error(`WriteStream needs to be subclassed and the _write function needs to be implemented.`);
}
/** In a ReadWriteStream, _read does not need to be implemented. */
_read() {}
_writeEnd(): void | Promise<void> {}
async writeEnd() {
return this._writeEnd();
}
}
export function readAll(nodeStream: NodeJS.ReadableStream, encoding?: any) {
return new ReadStream(nodeStream).readAll(encoding);
}
export function stdin() {
return new ReadStream(process.stdin);
}
export function stdout() {
return new WriteStream(process.stdout);
}
export function stdpipe(stream: WriteStream | ReadStream | ReadWriteStream) {
const promises = [];
if ((stream as ReadStream | WriteStream & { pipeTo: undefined }).pipeTo) {
promises.push((stream as ReadStream).pipeTo(stdout()));
}
if ((stream as WriteStream | ReadStream & { write: undefined }).write) {
promises.push(stdin().pipeTo(stream as WriteStream));
}
return Promise.all(promises);
}