Spaces:
Running
Running
/** | |
* Process Manager | |
* Pokemon Showdown - http://pokemonshowdown.com/ | |
* | |
 * This file abstracts out multiprocess logic involved in several tasks.
* | |
* Child processes can be queried. | |
* | |
* @license MIT | |
*/ | |
import * as child_process from 'child_process'; | |
import * as cluster from 'cluster'; | |
import * as path from 'path'; | |
import * as Streams from './streams'; | |
import { FS } from './fs'; | |
type ChildProcess = child_process.ChildProcess; | |
type Worker = cluster.Worker; | |
export const processManagers: ProcessManager[] = []; | |
export function exec( | |
args: string, execOptions?: child_process.ExecOptions | |
): Promise<{ stderr: string, stdout: string }>; | |
export function exec( | |
args: [string, ...string[]], execOptions?: child_process.ExecFileOptions | |
): Promise<{ stderr: string, stdout: string }>; | |
export function exec(args: string | string[], execOptions?: AnyObject) { | |
if (Array.isArray(args)) { | |
const cmd = args.shift(); | |
if (!cmd) throw new Error(`You must pass a command to ProcessManager.exec.`); | |
return new Promise<{ stderr: string, stdout: string }>((resolve, reject) => { | |
child_process.execFile(cmd, args, execOptions, (err, stdout, stderr) => { | |
if (err) reject(err); | |
if (typeof stdout !== 'string') stdout = stdout.toString(); | |
if (typeof stderr !== 'string') stderr = stderr.toString(); | |
resolve({ stdout, stderr }); | |
}); | |
}); | |
} else { | |
return new Promise<string>((resolve, reject) => { | |
child_process.exec(args, execOptions, (error, stdout, stderr) => { | |
if (error) reject(error); | |
if (typeof stdout !== 'string') stdout = stdout.toString(); | |
resolve(stdout); | |
}); | |
}); | |
} | |
} | |
class SubprocessStream extends Streams.ObjectReadWriteStream<string> { | |
process: StreamProcessWrapper; | |
taskId: number; | |
constructor(process: StreamProcessWrapper, taskId: number) { | |
super(); | |
this.process = process; | |
this.taskId = taskId; | |
this.process.process.send(`${taskId}\nNEW`); | |
} | |
_write(message: string) { | |
if (!this.process.process.connected) { | |
this.pushError(new Error(`Process disconnected (possibly crashed?)`)); | |
return; | |
} | |
this.process.process.send(`${this.taskId}\nWRITE\n${message}`); | |
// responses are handled in ProcessWrapper | |
} | |
_writeEnd() { | |
this.process.process.send(`${this.taskId}\nWRITEEND`); | |
} | |
_destroy() { | |
if (!this.process.process.connected) return; | |
this.process.process.send(`${this.taskId}\nDESTROY`); | |
this.process.deleteStream(this.taskId); | |
this.process = null!; | |
} | |
} | |
class RawSubprocessStream extends Streams.ObjectReadWriteStream<string> { | |
process: RawProcessWrapper; | |
constructor(process: RawProcessWrapper) { | |
super(); | |
this.process = process; | |
} | |
_write(message: string) { | |
if (!this.process.getProcess().connected) { | |
// no error because the crash handler should already have shown an error, and | |
// sometimes harmless messages are sent during cleanup | |
return; | |
} | |
this.process.process.send(message); | |
// responses are handled in ProcessWrapper | |
} | |
} | |
export interface ProcessWrapper { | |
getLoad: () => number; | |
process: ChildProcess | Worker; | |
release: () => Promise<void>; | |
getProcess: () => ChildProcess; | |
} | |
/** Wraps the process object in the PARENT process. */ | |
export class QueryProcessWrapper<T, U> implements ProcessWrapper { | |
process: ChildProcess; | |
taskId: number; | |
pendingTasks: Map<number, (resp: U) => void>; | |
messageCallback: ((message: string) => any) | null; | |
pendingRelease: Promise<void> | null; | |
resolveRelease: (() => void) | null; | |
debug?: string; | |
file: string; | |
constructor(file: string, messageCallback?: (message: string) => any) { | |
this.process = child_process.fork(file, [], { cwd: FS.ROOT_PATH }); | |
this.taskId = 0; | |
this.file = file; | |
this.pendingTasks = new Map(); | |
this.pendingRelease = null; | |
this.resolveRelease = null; | |
this.messageCallback = messageCallback || null; | |
this.process.on('message', (message: string) => { | |
if (message.startsWith('THROW\n')) { | |
const error = new Error(); | |
error.stack = message.slice(6); | |
throw error; | |
} | |
if (message.startsWith('DEBUG\n')) { | |
this.debug = message.slice(6); | |
return; | |
} | |
if (this.messageCallback && message.startsWith(`CALLBACK\n`)) { | |
this.messageCallback(message.slice(9)); | |
return; | |
} | |
const nlLoc = message.indexOf('\n'); | |
if (nlLoc <= 0) throw new Error(`Invalid response ${message}`); | |
const taskId = parseInt(message.slice(0, nlLoc)); | |
const resolve = this.pendingTasks.get(taskId); | |
if (!resolve) throw new Error(`Invalid taskId ${message.slice(0, nlLoc)}`); | |
this.pendingTasks.delete(taskId); | |
const resp = this.safeJSON(message.slice(nlLoc + 1)); | |
resolve(resp); | |
if (this.resolveRelease && !this.getLoad()) this.destroy(); | |
}); | |
} | |
safeJSON(obj: string): any { | |
// special cases? undefined should strictly be fine | |
// so let's just return it since we can't parse it | |
if (obj === "undefined") { | |
return undefined; | |
} | |
try { | |
return JSON.parse(obj); | |
} catch (e: any) { | |
// this is in the parent, so it should usually exist, but it's possible | |
// it's also futureproofing in case other external modfules require this | |
// we also specifically do not throw here because this json might be sensitive, | |
// so we only want it to go to emails | |
(global as any).Monitor?.crashlog?.(e, `a ${path.basename(this.file)} process`, { result: obj }); | |
return undefined; | |
} | |
} | |
getProcess() { | |
return this.process; | |
} | |
getLoad() { | |
return this.pendingTasks.size; | |
} | |
query(input: T): Promise<U> { | |
this.taskId++; | |
const taskId = this.taskId; | |
this.process.send(`${taskId}\n${JSON.stringify(input)}`); | |
return new Promise(resolve => { | |
this.pendingTasks.set(taskId, resolve); | |
}); | |
} | |
release(): Promise<void> { | |
if (this.pendingRelease) return this.pendingRelease; | |
if (!this.getLoad()) { | |
this.destroy(); | |
} else { | |
this.pendingRelease = new Promise(resolve => { | |
this.resolveRelease = resolve; | |
}); | |
} | |
return this.pendingRelease!; | |
} | |
destroy() { | |
if (this.pendingRelease && !this.resolveRelease) { | |
// already destroyed | |
return; | |
} | |
this.process.disconnect(); | |
for (const resolver of this.pendingTasks.values()) { | |
// maybe we should track reject functions too... | |
resolver('' as any); | |
} | |
this.pendingTasks.clear(); | |
if (this.resolveRelease) { | |
this.resolveRelease(); | |
this.resolveRelease = null; | |
} else if (!this.pendingRelease) { | |
this.pendingRelease = Promise.resolve(); | |
} | |
} | |
} | |
/** Wraps the process object in the PARENT process. */ | |
export class StreamProcessWrapper implements ProcessWrapper { | |
process: ChildProcess; | |
taskId = 0; | |
activeStreams = new Map<number, SubprocessStream>(); | |
pendingRelease: Promise<void> | null = null; | |
resolveRelease: (() => void) | null = null; | |
debug?: string; | |
setDebug(message: string) { | |
this.debug = (this.debug || '').slice(-32768) + '\n=====\n' + message; | |
} | |
messageCallback?: (message: string) => any; | |
constructor(file: string, messageCallback?: (message: string) => any) { | |
this.process = child_process.fork(file, [], { cwd: FS.ROOT_PATH }); | |
this.messageCallback = messageCallback; | |
this.process.on('message', (message: string) => { | |
if (message.startsWith('THROW\n')) { | |
const error = new Error(); | |
error.stack = message.slice(6); | |
throw error; | |
} | |
if (this.messageCallback && message.startsWith(`CALLBACK\n`)) { | |
this.messageCallback(message.slice(9)); | |
return; | |
} | |
if (message.startsWith('DEBUG\n')) { | |
this.setDebug(message.slice(6)); | |
return; | |
} | |
let nlLoc = message.indexOf('\n'); | |
if (nlLoc <= 0) throw new Error(`Invalid response ${message}`); | |
const taskId = parseInt(message.slice(0, nlLoc)); | |
const stream = this.activeStreams.get(taskId); | |
if (!stream) return; // stream already destroyed | |
message = message.slice(nlLoc + 1); | |
nlLoc = message.indexOf('\n'); | |
if (nlLoc < 0) nlLoc = message.length; | |
const messageType = message.slice(0, nlLoc); | |
message = message.slice(nlLoc + 1); | |
if (messageType === 'END') { | |
stream.pushEnd(); | |
this.deleteStream(taskId); | |
} else if (messageType === 'PUSH') { | |
stream.push(message); | |
} else if (messageType === 'THROW') { | |
const error = new Error(); | |
error.stack = message; | |
stream.pushError(error, true); | |
} else { | |
throw new Error(`Unrecognized messageType ${messageType}`); | |
} | |
}); | |
} | |
getLoad() { | |
return this.activeStreams.size; | |
} | |
getProcess() { | |
return this.process; | |
} | |
deleteStream(taskId: number) { | |
this.activeStreams.delete(taskId); | |
// try to release | |
if (this.resolveRelease && !this.getLoad()) void this.destroy(); | |
} | |
createStream(): SubprocessStream { | |
this.taskId++; | |
const taskId = this.taskId; | |
const stream = new SubprocessStream(this, taskId); | |
this.activeStreams.set(taskId, stream); | |
return stream; | |
} | |
release(): Promise<void> { | |
if (this.pendingRelease) return this.pendingRelease; | |
if (!this.getLoad()) { | |
void this.destroy(); | |
} else { | |
this.pendingRelease = new Promise(resolve => { | |
this.resolveRelease = resolve; | |
}); | |
} | |
return this.pendingRelease!; | |
} | |
destroy() { | |
if (this.pendingRelease && !this.resolveRelease) { | |
// already destroyed | |
return; | |
} | |
this.process.disconnect(); | |
const destroyed = []; | |
for (const stream of this.activeStreams.values()) { | |
destroyed.push(stream.destroy()); | |
} | |
this.activeStreams.clear(); | |
if (this.resolveRelease) { | |
this.resolveRelease(); | |
this.resolveRelease = null; | |
} else if (!this.pendingRelease) { | |
this.pendingRelease = Promise.resolve(); | |
} | |
return Promise.all(destroyed); | |
} | |
} | |
/** | |
* A container for a RawProcessManager stream. This is usually the | |
* RawProcessWrapper, but it can also be a fake RawProcessWrapper if the PM is | |
* told to spawn 0 worker processes. | |
*/ | |
export class StreamWorker { | |
load = 0; | |
workerid = 0; | |
stream: Streams.ObjectReadWriteStream<string>; | |
constructor(stream: Streams.ObjectReadWriteStream<string>) { | |
this.stream = stream; | |
} | |
} | |
/** Wraps the process object in the PARENT process. */ | |
export class RawProcessWrapper implements ProcessWrapper, StreamWorker { | |
process: ChildProcess & { process: undefined } | Worker; | |
taskId = 0; | |
stream: RawSubprocessStream; | |
pendingRelease: Promise<void> | null = null; | |
resolveRelease: (() => void) | null = null; | |
debug?: string; | |
workerid = 0; | |
/** Not managed by RawProcessWrapper itself */ | |
load = 0; | |
setDebug(message: string) { | |
this.debug = (this.debug || '').slice(-32768) + '\n=====\n' + message; | |
} | |
constructor(file: string, isCluster?: boolean, env?: AnyObject) { | |
if (isCluster) { | |
this.process = cluster.fork(env); | |
this.workerid = this.process.id; | |
} else { | |
this.process = child_process.fork(file, [], { cwd: FS.ROOT_PATH, env }) as any; | |
} | |
this.process.on('message', (message: string) => { | |
this.stream.push(message); | |
}); | |
this.stream = new RawSubprocessStream(this); | |
} | |
getLoad() { | |
return this.load; | |
} | |
getProcess() { | |
return this.process.process ? this.process.process : this.process; | |
} | |
release(): Promise<void> { | |
if (this.pendingRelease) return this.pendingRelease; | |
if (!this.getLoad()) { | |
void this.destroy(); | |
} else { | |
this.pendingRelease = new Promise(resolve => { | |
this.resolveRelease = resolve; | |
}); | |
} | |
return this.pendingRelease!; | |
} | |
destroy() { | |
if (this.pendingRelease && !this.resolveRelease) { | |
// already destroyed | |
return; | |
} | |
void this.stream.destroy(); | |
this.process.disconnect(); | |
} | |
} | |
/** | |
* A ProcessManager wraps a query function: A function that takes a | |
* string and returns a string or Promise<string>. | |
*/ | |
export abstract class ProcessManager<T extends ProcessWrapper = ProcessWrapper> { | |
static disabled = false; | |
processes: T[] = []; | |
releasingProcesses: T[] = []; | |
crashedProcesses: T[] = []; | |
readonly filename: string; | |
readonly basename: string; | |
readonly isParentProcess: boolean; | |
crashTime = 0; | |
crashRespawnCount = 0; | |
constructor(module: NodeJS.Module) { | |
this.filename = module.filename; | |
this.basename = path.basename(module.filename); | |
this.isParentProcess = (process.mainModule !== module || !process.send); | |
this.listen(); | |
} | |
acquire() { | |
if (!this.processes.length) { | |
return null; | |
} | |
let lowestLoad = this.processes[0]; | |
for (const process of this.processes) { | |
if (process.getLoad() < lowestLoad.getLoad()) { | |
lowestLoad = process; | |
} | |
} | |
return lowestLoad; | |
} | |
releaseCrashed(process: T) { | |
const index = this.processes.indexOf(process); | |
// The process was shut down sanely, not crashed | |
if (index < 0) return; | |
this.processes.splice(index, 1); | |
this.destroyProcess(process); | |
void process.release().then(() => { | |
const releasingIndex = this.releasingProcesses.indexOf(process); | |
if (releasingIndex >= 0) { | |
this.releasingProcesses.splice(releasingIndex, 1); | |
} | |
}); | |
const now = Date.now(); | |
if (this.crashTime && now - this.crashTime > 30 * 60 * 1000) { | |
this.crashTime = 0; | |
this.crashRespawnCount = 0; | |
} | |
if (!this.crashTime) this.crashTime = now; | |
this.crashRespawnCount += 1; | |
// Notify any global crash logger | |
void Promise.reject( | |
new Error(`Process ${this.basename} ${process.getProcess().pid} crashed and had to be restarted`) | |
); | |
this.releasingProcesses.push(process); | |
this.crashedProcesses.push(process); | |
// only respawn processes if there have been fewer than 5 crashes in 30 minutes | |
if (this.crashRespawnCount <= 5) { | |
this.spawn(this.processes.length + 1); | |
} | |
} | |
unspawn() { | |
return Promise.all([...this.processes].map( | |
process => this.unspawnOne(process) | |
)); | |
} | |
async unspawnOne(process: T | null) { | |
if (!process) return; | |
this.destroyProcess(process); | |
const processIndex = this.processes.indexOf(process); | |
if (processIndex < 0) throw new Error('Process inactive'); | |
this.processes.splice(this.processes.indexOf(process), 1); | |
this.releasingProcesses.push(process); | |
await process.release(); | |
const index = this.releasingProcesses.indexOf(process); | |
if (index < 0) return; // can happen if process crashed while releasing | |
this.releasingProcesses.splice(index, 1); | |
} | |
spawn(count = 1, force?: boolean) { | |
if (!this.isParentProcess) return; | |
if (ProcessManager.disabled && !force) return; | |
const spawnCount = count - this.processes.length; | |
for (let i = 0; i < spawnCount; i++) { | |
this.spawnOne(force); | |
} | |
} | |
spawnOne(force?: boolean) { | |
if (!this.isParentProcess) throw new Error('Must use in parent process'); | |
if (ProcessManager.disabled && !force) return null; | |
const process = this.createProcess(); | |
process.process.on('disconnect', () => this.releaseCrashed(process)); | |
this.processes.push(process); | |
return process; | |
} | |
respawn(count: number | null = null) { | |
if (count === null) count = this.processes.length; | |
const unspawned = this.unspawn(); | |
this.spawn(count); | |
return unspawned; | |
} | |
abstract listen(): void; | |
abstract createProcess(...args: any): T; | |
destroyProcess(process: T) {} | |
destroy() { | |
const index = processManagers.indexOf(this); | |
if (index >= 0) processManagers.splice(index, 1); | |
return this.unspawn(); | |
} | |
} | |
export class QueryProcessManager<T = string, U = string> extends ProcessManager<QueryProcessWrapper<T, U>> { | |
_query: (input: T) => U | Promise<U>; | |
messageCallback?: (message: string) => any; | |
timeout: number; | |
/** | |
* @param timeout The number of milliseconds to wait before terminating a query. Defaults to 900000 ms (15 minutes). | |
*/ | |
constructor( | |
module: NodeJS.Module, query: (input: T) => U | Promise<U>, | |
timeout = 15 * 60 * 1000, debugCallback?: (message: string) => any | |
) { | |
super(module); | |
this._query = query; | |
this.timeout = timeout; | |
this.messageCallback = debugCallback; | |
processManagers.push(this); | |
} | |
async query(input: T, process = this.acquire()) { | |
if (!process) return this._query(input); | |
const timeout = setTimeout(() => { | |
const debugInfo = process.debug || "No debug information found."; | |
process.destroy(); | |
this.spawnOne(); | |
throw new Error( | |
`A query originating in ${this.basename} took too long to complete; the process has been respawned.\n${debugInfo}` | |
); | |
}, this.timeout); | |
const result = await process.query(input); | |
clearTimeout(timeout); | |
return result; | |
} | |
queryTemporaryProcess(input: T, force?: boolean) { | |
const process = this.spawnOne(force); | |
const result = this.query(input, process); | |
void this.unspawnOne(process); | |
return result; | |
} | |
createProcess() { | |
return new QueryProcessWrapper<T, U>(this.filename, this.messageCallback); | |
} | |
listen() { | |
if (this.isParentProcess) return; | |
// child process | |
process.on('message', (message: string) => { | |
const nlLoc = message.indexOf('\n'); | |
if (nlLoc <= 0) throw new Error(`Invalid response ${message}`); | |
const taskId = message.slice(0, nlLoc); | |
message = message.slice(nlLoc + 1); | |
if (taskId.startsWith('EVAL')) { | |
// eslint-disable-next-line no-eval | |
process.send!(`${taskId}\n` + eval(message)); | |
return; | |
} | |
void Promise.resolve(this._query(JSON.parse(message))).then( | |
response => process.send!(`${taskId}\n${JSON.stringify(response)}`) | |
); | |
}); | |
process.on('disconnect', () => { | |
process.exit(); | |
}); | |
} | |
} | |
export class StreamProcessManager extends ProcessManager<StreamProcessWrapper> { | |
/* taskid: stream used only in child process */ | |
activeStreams: Map<string, Streams.ObjectReadWriteStream<string>>; | |
_createStream: () => Streams.ObjectReadWriteStream<string>; | |
messageCallback?: (message: string) => any; | |
constructor( | |
module: NodeJS.Module, | |
createStream: () => Streams.ObjectReadWriteStream<string>, | |
messageCallback?: (message: string) => any | |
) { | |
super(module); | |
this.activeStreams = new Map(); | |
this._createStream = createStream; | |
this.messageCallback = messageCallback; | |
processManagers.push(this); | |
} | |
createStream() { | |
const process = this.acquire(); | |
if (!process) return this._createStream(); | |
return process.createStream(); | |
} | |
createProcess() { | |
return new StreamProcessWrapper(this.filename, this.messageCallback); | |
} | |
async pipeStream(taskId: string, stream: Streams.ObjectReadStream<string>) { | |
let done = false; | |
while (!done) { | |
try { | |
let value; | |
({ value, done } = await stream.next()); | |
process.send!(`${taskId}\nPUSH\n${value}`); | |
} catch (err: any) { | |
process.send!(`${taskId}\nTHROW\n${err.stack}`); | |
} | |
} | |
if (!this.activeStreams.has(taskId)) { | |
// stream.destroy() was called, don't send an END message | |
return; | |
} | |
process.send!(`${taskId}\nEND`); | |
this.activeStreams.delete(taskId); | |
} | |
listen() { | |
if (this.isParentProcess) return; | |
// child process | |
process.on('message', (message: string) => { | |
let nlLoc = message.indexOf('\n'); | |
if (nlLoc <= 0) throw new Error(`Invalid request ${message}`); | |
const taskId = message.slice(0, nlLoc); | |
const stream = this.activeStreams.get(taskId); | |
message = message.slice(nlLoc + 1); | |
nlLoc = message.indexOf('\n'); | |
if (nlLoc < 0) nlLoc = message.length; | |
const messageType = message.slice(0, nlLoc); | |
message = message.slice(nlLoc + 1); | |
if (taskId.startsWith('EVAL')) { | |
// eslint-disable-next-line no-eval | |
process.send!(`${taskId}\n` + eval(message)); | |
return; | |
} | |
if (messageType === 'NEW') { | |
if (stream) throw new Error(`NEW: taskId ${taskId} already exists`); | |
const newStream = this._createStream(); | |
this.activeStreams.set(taskId, newStream); | |
void this.pipeStream(taskId, newStream); | |
} else if (messageType === 'DESTROY') { | |
if (!stream) throw new Error(`DESTROY: Invalid taskId ${taskId}`); | |
void stream.destroy(); | |
this.activeStreams.delete(taskId); | |
} else if (messageType === 'WRITE') { | |
if (!stream) throw new Error(`WRITE: Invalid taskId ${taskId}`); | |
void stream.write(message); | |
} else if (messageType === 'WRITEEND') { | |
if (!stream) throw new Error(`WRITEEND: Invalid taskId ${taskId}`); | |
void stream.writeEnd(); | |
} else { | |
throw new Error(`Unrecognized messageType ${messageType}`); | |
} | |
}); | |
process.on('disconnect', () => { | |
process.exit(); | |
}); | |
} | |
} | |
export class RawProcessManager extends ProcessManager<RawProcessWrapper> { | |
/** full list of processes - parent process only */ | |
workers: StreamWorker[] = []; | |
/** if spawning 0 worker processes, the worker is instead stored here in the parent process */ | |
masterWorker: StreamWorker | null = null; | |
/** stream used only in the child process */ | |
activeStream: Streams.ObjectReadWriteStream<string> | null = null; | |
isCluster: boolean; | |
spawnSubscription: ((worker: StreamWorker) => void) | null = null; | |
unspawnSubscription: ((worker: StreamWorker) => void) | null = null; | |
_setupChild: () => Streams.ObjectReadWriteStream<string>; | |
/** worker ID of cluster worker - cluster child process only (0 otherwise) */ | |
readonly workerid = cluster.worker?.id || 0; | |
env: AnyObject | undefined; | |
constructor(options: { | |
module: NodeJS.Module, | |
setupChild: () => Streams.ObjectReadWriteStream<string>, | |
isCluster?: boolean, | |
env?: AnyObject, | |
}) { | |
super(options.module); | |
this.isCluster = !!options.isCluster; | |
this._setupChild = options.setupChild; | |
this.env = options.env; | |
if (this.isCluster && this.isParentProcess) { | |
cluster.setupMaster({ | |
exec: this.filename, | |
cwd: FS.ROOT_PATH, | |
}); | |
} | |
processManagers.push(this); | |
} | |
subscribeSpawn(callback: (worker: StreamWorker) => void) { | |
this.spawnSubscription = callback; | |
} | |
subscribeUnspawn(callback: (worker: StreamWorker) => void) { | |
this.unspawnSubscription = callback; | |
} | |
spawn(count?: number) { | |
super.spawn(count); | |
if (!this.workers.length) { | |
this.masterWorker = new StreamWorker(this._setupChild()); | |
this.workers.push(this.masterWorker); | |
this.spawnSubscription?.(this.masterWorker); | |
} | |
} | |
createProcess() { | |
const process = new RawProcessWrapper(this.filename, this.isCluster, this.env); | |
this.workers.push(process); | |
this.spawnSubscription?.(process); | |
return process; | |
} | |
destroyProcess(process: RawProcessWrapper) { | |
const index = this.workers.indexOf(process); | |
if (index >= 0) this.workers.splice(index, 1); | |
this.unspawnSubscription?.(process); | |
} | |
async pipeStream(stream: Streams.ObjectReadStream<string>) { | |
let done = false; | |
while (!done) { | |
try { | |
let value; | |
({ value, done } = await stream.next()); | |
process.send!(value); | |
} catch (err: any) { | |
process.send!(`THROW\n${err.stack}`); | |
} | |
} | |
} | |
listen() { | |
if (this.isParentProcess) return; | |
setImmediate(() => { | |
this.activeStream = this._setupChild(); | |
void this.pipeStream(this.activeStream); | |
}); | |
// child process | |
process.on('message', (message: string) => { | |
void this.activeStream!.write(message); | |
}); | |
process.on('disconnect', () => { | |
process.exit(); | |
}); | |
} | |
} | |