mirror of
https://github.com/whyour/qinglong.git
synced 2025-11-08 15:06:08 +08:00
Refactor worker ready logic and improve restart handling
- Extracted waitForWorkerReady() method for better code reusability - Improved worker restart logic to wait for gRPC worker readiness - This addresses code review feedback for better maintainability Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>
This commit is contained in:
parent
827453986b
commit
2baf352350
68
back/app.ts
68
back/app.ts
|
|
@ -57,31 +57,15 @@ class Application {
|
|||
const grpcWorker = this.forkWorker('grpc');
|
||||
|
||||
// Wait for gRPC worker to signal it's ready before starting HTTP worker
|
||||
const grpcReadyPromise = new Promise<void>((resolve, reject) => {
|
||||
const messageHandler = (msg: any) => {
|
||||
if (msg === 'ready') {
|
||||
grpcWorker.removeListener('message', messageHandler);
|
||||
clearTimeout(timeoutId);
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
grpcWorker.on('message', messageHandler);
|
||||
|
||||
// Timeout after 30 seconds
|
||||
const timeoutId = setTimeout(() => {
|
||||
grpcWorker.removeListener('message', messageHandler);
|
||||
reject(new Error('gRPC worker failed to start within 30 seconds'));
|
||||
}, 30000);
|
||||
});
|
||||
|
||||
// Start HTTP worker after gRPC is ready
|
||||
grpcReadyPromise.then(() => {
|
||||
Logger.info('gRPC worker is ready, starting HTTP worker');
|
||||
this.forkWorker('http');
|
||||
}).catch((error) => {
|
||||
Logger.error('Failed to wait for gRPC worker:', error);
|
||||
process.exit(1);
|
||||
});
|
||||
this.waitForWorkerReady(grpcWorker, 30000)
|
||||
.then(() => {
|
||||
Logger.info('gRPC worker is ready, starting HTTP worker');
|
||||
this.forkWorker('http');
|
||||
})
|
||||
.catch((error) => {
|
||||
Logger.error('Failed to wait for gRPC worker:', error);
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
cluster.on('exit', (worker, code, signal) => {
|
||||
const metadata = this.workerMetadataMap.get(worker.id);
|
||||
|
|
@ -91,9 +75,20 @@ class Application {
|
|||
`${metadata.serviceType} worker ${worker.process.pid} died (${signal || code
|
||||
}). Restarting...`,
|
||||
);
|
||||
const newWorker = this.forkWorker(metadata.serviceType);
|
||||
// If gRPC worker died, restart it and wait for it to be ready
|
||||
// before potentially needing to restart HTTP worker
|
||||
if (metadata.serviceType === 'grpc') {
|
||||
const newGrpcWorker = this.forkWorker('grpc');
|
||||
this.waitForWorkerReady(newGrpcWorker, 30000).catch((error) => {
|
||||
Logger.error('Failed to restart gRPC worker:', error);
|
||||
process.exit(1);
|
||||
});
|
||||
} else {
|
||||
// For HTTP worker, just restart it
|
||||
this.forkWorker(metadata.serviceType);
|
||||
}
|
||||
Logger.info(
|
||||
`Restarted ${metadata.serviceType} worker (New PID: ${newWorker.process.pid})`,
|
||||
`Restarted ${metadata.serviceType} worker`,
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -104,6 +99,25 @@ class Application {
|
|||
this.setupMasterShutdown();
|
||||
}
|
||||
|
||||
private waitForWorkerReady(worker: Worker, timeoutMs: number): Promise<void> {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const messageHandler = (msg: any) => {
|
||||
if (msg === 'ready') {
|
||||
worker.removeListener('message', messageHandler);
|
||||
clearTimeout(timeoutId);
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
worker.on('message', messageHandler);
|
||||
|
||||
// Timeout after specified milliseconds
|
||||
const timeoutId = setTimeout(() => {
|
||||
worker.removeListener('message', messageHandler);
|
||||
reject(new Error(`Worker failed to start within ${timeoutMs / 1000} seconds`));
|
||||
}, timeoutMs);
|
||||
});
|
||||
}
|
||||
|
||||
private forkWorker(serviceType: string): Worker {
|
||||
const worker = cluster.fork({ SERVICE_TYPE: serviceType });
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user