From 90ddf0fb5783586f48fff90521cc77452e370b64 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Wed, 12 Nov 2025 00:59:22 +0800 Subject: [PATCH] Fix race condition preventing scheduled tasks from executing in clustered mode (#2819) * Initial plan * Fix race condition causing scheduled tasks not to run Added synchronization to ensure gRPC worker is ready before HTTP worker starts. This prevents the race condition where autosave_crontab() tries to register cron jobs before the gRPC server is ready to accept them. Co-authored-by: whyour <22700758+whyour@users.noreply.github.com> * Add timeout protection for gRPC worker initialization Added 30-second timeout to prevent system hang if gRPC worker fails to start. This provides better error handling and prevents indefinite waiting. Co-authored-by: whyour <22700758+whyour@users.noreply.github.com> * Refactor worker ready logic and improve restart handling - Extracted waitForWorkerReady() method for better code reusability - Improved worker restart logic to wait for gRPC worker readiness - This addresses code review feedback for better maintainability Co-authored-by: whyour <22700758+whyour@users.noreply.github.com> * Improve logging accuracy for worker restarts - Fixed log messages to accurately reflect worker state - Added proper logging after gRPC worker is confirmed ready - Improved HTTP worker restart logging with PID - Addresses code review feedback for better clarity Co-authored-by: whyour <22700758+whyour@users.noreply.github.com> * Revert unnecessary pnpm-lock.yaml changes The pnpm-lock.yaml was unintentionally updated when installing dependencies for testing. No package dependencies were actually changed - only existing code was modified in back/app.ts. Reverting to original state. 
Co-authored-by: whyour <22700758+whyour@users.noreply.github.com> * Re-register cron jobs when gRPC worker restarts When the gRPC worker restarts, the scheduled tasks need to be re-added to the new gRPC server instance. This fix: 1. Tracks the HTTP worker reference in the master process 2. Sends a 'reregister-crons' message to the HTTP worker after gRPC restarts 3. HTTP worker calls autosave_crontab() to re-register all cron jobs with the new gRPC server This ensures scheduled tasks continue to work after a gRPC worker restart. Co-authored-by: whyour <22700758+whyour@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: whyour <22700758+whyour@users.noreply.github.com> --- back/app.ts | 78 ++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 71 insertions(+), 7 deletions(-) diff --git a/back/app.ts b/back/app.ts index 0859140c..e677421d 100644 --- a/back/app.ts +++ b/back/app.ts @@ -24,6 +24,7 @@ class Application { private grpcServerService?: GrpcServerService; private isShuttingDown = false; private workerMetadataMap = new Map(); + private httpWorker?: Worker; constructor() { this.app = express(); @@ -53,8 +54,19 @@ class Application { } private startMasterProcess() { - this.forkWorker('http'); - this.forkWorker('grpc'); + // Fork gRPC worker first and wait for it to be ready + const grpcWorker = this.forkWorker('grpc'); + + // Wait for gRPC worker to signal it's ready before starting HTTP worker + this.waitForWorkerReady(grpcWorker, 30000) + .then(() => { + Logger.info('gRPC worker is ready, starting HTTP worker'); + this.httpWorker = this.forkWorker('http'); + }) + .catch((error) => { + Logger.error('Failed to wait for gRPC worker:', error); + process.exit(1); + }); cluster.on('exit', (worker, code, signal) => { const metadata = this.workerMetadataMap.get(worker.id); @@ -64,10 +76,32 @@ class Application { `${metadata.serviceType} worker ${worker.process.pid} died 
(${signal || code }). Restarting...`, ); - const newWorker = this.forkWorker(metadata.serviceType); - Logger.info( - `Restarted ${metadata.serviceType} worker (New PID: ${newWorker.process.pid})`, - ); + // If gRPC worker died, restart it and wait for it to be ready + if (metadata.serviceType === 'grpc') { + const newGrpcWorker = this.forkWorker('grpc'); + this.waitForWorkerReady(newGrpcWorker, 30000) + .then(() => { + Logger.info('gRPC worker restarted and ready'); + // Re-register cron jobs by notifying the HTTP worker + if (this.httpWorker) { + try { + this.httpWorker.send('reregister-crons'); + Logger.info('Sent reregister-crons message to HTTP worker'); + } catch (error) { + Logger.error('Failed to send reregister-crons message:', error); + } + } + }) + .catch((error) => { + Logger.error('Failed to restart gRPC worker:', error); + process.exit(1); + }); + } else { + // For HTTP worker, just restart it + const newWorker = this.forkWorker(metadata.serviceType); + this.httpWorker = newWorker; + Logger.info(`Restarted ${metadata.serviceType} worker (PID: ${newWorker.process.pid})`); + } } this.workerMetadataMap.delete(worker.id); @@ -77,6 +111,25 @@ class Application { this.setupMasterShutdown(); } + private waitForWorkerReady(worker: Worker, timeoutMs: number): Promise<void> { + return new Promise((resolve, reject) => { + const messageHandler = (msg: any) => { + if (msg === 'ready') { + worker.removeListener('message', messageHandler); + clearTimeout(timeoutId); + resolve(); + } + }; + worker.on('message', messageHandler); + + // Timeout after specified milliseconds + const timeoutId = setTimeout(() => { + worker.removeListener('message', messageHandler); + reject(new Error(`Worker failed to start within ${timeoutMs / 1000} seconds`)); + }, timeoutMs); + }); + } + private forkWorker(serviceType: string): Worker { const worker = cluster.fork({ SERVICE_TYPE: serviceType }); @@ -206,9 +259,20 @@ class Application { } private setupWorkerShutdown(serviceType: string) { - 
process.on('message', (msg) => { + process.on('message', async (msg) => { if (msg === 'shutdown') { this.gracefulShutdown(serviceType); + } else if (msg === 'reregister-crons' && serviceType === 'http') { + // Re-register cron jobs when gRPC worker restarts + try { + Logger.info('Received reregister-crons message, re-registering cron jobs...'); + const CronService = (await import('./services/cron')).default; + const cronService = Container.get(CronService); + await cronService.autosave_crontab(); + Logger.info('Cron jobs re-registered successfully'); + } catch (error) { + Logger.error('Failed to re-register cron jobs:', error); + } } });