diff --git a/back/app.ts b/back/app.ts index 0859140c..e677421d 100644 --- a/back/app.ts +++ b/back/app.ts @@ -24,6 +24,7 @@ class Application { private grpcServerService?: GrpcServerService; private isShuttingDown = false; private workerMetadataMap = new Map(); + private httpWorker?: Worker; constructor() { this.app = express(); @@ -53,8 +54,19 @@ class Application { } private startMasterProcess() { - this.forkWorker('http'); - this.forkWorker('grpc'); + // Fork gRPC worker first and wait for it to be ready + const grpcWorker = this.forkWorker('grpc'); + + // Wait for gRPC worker to signal it's ready before starting HTTP worker + this.waitForWorkerReady(grpcWorker, 30000) + .then(() => { + Logger.info('gRPC worker is ready, starting HTTP worker'); + this.httpWorker = this.forkWorker('http'); + }) + .catch((error) => { + Logger.error('Failed to wait for gRPC worker:', error); + process.exit(1); + }); cluster.on('exit', (worker, code, signal) => { const metadata = this.workerMetadataMap.get(worker.id); @@ -64,10 +76,32 @@ class Application { `${metadata.serviceType} worker ${worker.process.pid} died (${signal || code }). Restarting...`, ); - const newWorker = this.forkWorker(metadata.serviceType); - Logger.info( - `Restarted ${metadata.serviceType} worker (New PID: ${newWorker.process.pid})`, - ); + // If gRPC worker died, restart it and wait for it to be ready + if (metadata.serviceType === 'grpc') { + const newGrpcWorker = this.forkWorker('grpc'); + this.waitForWorkerReady(newGrpcWorker, 30000) + .then(() => { + Logger.info('gRPC worker restarted and ready'); + // Re-register cron jobs by notifying the HTTP worker + if (this.httpWorker) { + try { + this.httpWorker.send('reregister-crons'); + Logger.info('Sent reregister-crons message to HTTP worker'); + } catch (error) { + Logger.error('Failed to send reregister-crons message:', error); + } + } + }) + .catch((error) => { + Logger.error('Failed to restart gRPC worker:', error); + process.exit(1); + }); + } else { + // For HTTP worker, just restart it + const newWorker = this.forkWorker(metadata.serviceType); + this.httpWorker = newWorker; + Logger.info(`Restarted ${metadata.serviceType} worker (PID: ${newWorker.process.pid})`); + } } this.workerMetadataMap.delete(worker.id); @@ -77,6 +111,25 @@ class Application { this.setupMasterShutdown(); } + private waitForWorkerReady(worker: Worker, timeoutMs: number): Promise { + return new Promise((resolve, reject) => { + const messageHandler = (msg: any) => { + if (msg === 'ready') { + worker.removeListener('message', messageHandler); + clearTimeout(timeoutId); + resolve(); + } + }; + worker.on('message', messageHandler); + + // Timeout after specified milliseconds + const timeoutId = setTimeout(() => { + worker.removeListener('message', messageHandler); + reject(new Error(`Worker failed to start within ${timeoutMs / 1000} seconds`)); + }, timeoutMs); + }); + } + private forkWorker(serviceType: string): Worker { const worker = cluster.fork({ SERVICE_TYPE: serviceType }); @@ -206,9 +259,20 @@ class Application { } private setupWorkerShutdown(serviceType: string) { - process.on('message', (msg) => { + process.on('message', async (msg) => { if (msg === 'shutdown') { this.gracefulShutdown(serviceType); + } else if (msg === 'reregister-crons' && serviceType === 'http') { + // Re-register cron jobs when gRPC worker restarts + try { + Logger.info('Received reregister-crons message, re-registering cron jobs...'); + const CronService = (await import('./services/cron')).default; + const cronService = Container.get(CronService); + await cronService.autosave_crontab(); + Logger.info('Cron jobs re-registered successfully'); + } catch (error) { + Logger.error('Failed to re-register cron jobs:', error); + } } });