From c9bd053fbd5f1415091b8bc26aec398fa1bdae2b Mon Sep 17 00:00:00 2001 From: whyour Date: Wed, 11 Jun 2025 00:42:29 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=9C=8D=E5=8A=A1=E5=90=AF?= =?UTF-8?q?=E5=8A=A8=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- back/app.ts | 197 +++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 166 insertions(+), 31 deletions(-) diff --git a/back/app.ts b/back/app.ts index ea7a9476..8c76ffd1 100644 --- a/back/app.ts +++ b/back/app.ts @@ -1,4 +1,5 @@ import 'reflect-metadata'; +import cluster, { type Worker } from 'cluster'; import compression from 'compression'; import cors from 'cors'; import express from 'express'; @@ -10,11 +11,19 @@ import { monitoringMiddleware } from './middlewares/monitoring'; import { type GrpcServerService } from './services/grpc'; import { type HttpServerService } from './services/http'; +interface WorkerMetadata { + id: number; + pid: number; + serviceType: string; + startTime: Date; +} + class Application { private app: express.Application; private httpServerService?: HttpServerService; private grpcServerService?: GrpcServerService; private isShuttingDown = false; + private workerMetadataMap = new Map(); constructor() { this.app = express(); @@ -22,24 +31,57 @@ class Application { async start() { try { - await this.initializeDatabase(); - await this.initServer(); - this.setupMiddlewares(); - await this.initializeServices(); - this.setupGracefulShutdown(); - - process.send?.('ready'); + if (cluster.isPrimary) { + await this.initializeDatabase(); + } + if (cluster.isPrimary) { + this.startMasterProcess(); + } else { + await this.startWorkerProcess(); + } } catch (error) { Logger.error('Failed to start application:', error); process.exit(1); } } - async initServer() { - const { HttpServerService } = await import('./services/http'); - const { GrpcServerService } = await import('./services/grpc'); - this.httpServerService = Container.get(HttpServerService); - this.grpcServerService = Container.get(GrpcServerService); + private startMasterProcess() { + this.forkWorker('http'); + this.forkWorker('grpc'); + + cluster.on('exit', (worker, code, signal) => { + const metadata = this.workerMetadataMap.get(worker.id); + if (metadata) { + if (!this.isShuttingDown) { + Logger.error( + `${metadata.serviceType} worker ${worker.process.pid} died (${ + signal || code + }). Restarting...`, + ); + const newWorker = this.forkWorker(metadata.serviceType); + Logger.info( + `Restarted ${metadata.serviceType} worker (New PID: ${newWorker.process.pid})`, + ); + } + + this.workerMetadataMap.delete(worker.id); + } + }); + + this.setupMasterShutdown(); + } + + private forkWorker(serviceType: string): Worker { + const worker = cluster.fork({ SERVICE_TYPE: serviceType }); + + this.workerMetadataMap.set(worker.id, { + id: worker.id, + pid: worker.process.pid!, + serviceType, + startTime: new Date(), + }); + + return worker; } private async initializeDatabase() { @@ -53,33 +95,49 @@ class Application { this.app.use(monitoringMiddleware); } - private async initializeServices() { - await this.grpcServerService?.initialize(); - - await require('./loaders/app').default({ app: this.app }); - - const server = await this.httpServerService?.initialize( - this.app, - config.port, - ); - - await require('./loaders/server').default({ server }); - } - - private setupGracefulShutdown() { + private setupMasterShutdown() { const shutdown = async () => { if (this.isShuttingDown) return; this.isShuttingDown = true; - Logger.info('Shutting down services...'); + const workers = Object.values(cluster.workers || {}); + const workerPromises: Promise[] = []; + + workers.forEach((worker) => { + if (worker) { + const exitPromise = new Promise((resolve) => { + worker.once('exit', () => { + Logger.info(`Worker ${worker.process.pid} exited`); + resolve(); + }); + + try { + worker.send('shutdown'); + } catch (error) { + Logger.warn( + `Failed to send shutdown to worker ${worker.process.pid}:`, + error, + ); + } + }); + + workerPromises.push(exitPromise); + } + }); + try { - await Promise.all([ - this.grpcServerService?.shutdown(), - this.httpServerService?.shutdown(), + await Promise.race([ + Promise.all(workerPromises), + new Promise((resolve) => { + setTimeout(() => { + Logger.warn('Worker shutdown timeout reached'); + resolve(); + }, 10000); + }), ]); process.exit(0); } catch (error) { - Logger.error('Error during shutdown:', error); + Logger.error('Error during worker shutdown:', error); process.exit(1); } }; @@ -87,6 +145,83 @@ class Application { process.on('SIGTERM', shutdown); process.on('SIGINT', shutdown); } + + private async startWorkerProcess() { + const serviceType = process.env.SERVICE_TYPE; + if (!serviceType || !['http', 'grpc'].includes(serviceType)) { + Logger.error('Invalid SERVICE_TYPE:', serviceType); + process.exit(1); + } + + Logger.info(`✌️ ${serviceType} worker started (PID: ${process.pid})`); + + try { + if (serviceType === 'http') { + await this.startHttpService(); + } else { + await this.startGrpcService(); + } + + process.send?.('ready'); + } catch (error) { + Logger.error(`${serviceType} worker failed:`, error); + process.exit(1); + } + } + + private async startHttpService() { + this.setupMiddlewares(); + + const { HttpServerService } = await import('./services/http'); + this.httpServerService = Container.get(HttpServerService); + + await require('./loaders/app').default({ app: this.app }); + + const server = await this.httpServerService.initialize( + this.app, + config.port, + ); + + await require('./loaders/server').default({ server }); + this.setupWorkerShutdown('http'); + } + + private async startGrpcService() { + const { GrpcServerService } = await import('./services/grpc'); + this.grpcServerService = Container.get(GrpcServerService); + + await this.grpcServerService.initialize(); + this.setupWorkerShutdown('grpc'); + } + + private setupWorkerShutdown(serviceType: string) { + process.on('message', (msg) => { + if (msg === 'shutdown') { + this.gracefulShutdown(serviceType); + } + }); + + const shutdown = () => this.gracefulShutdown(serviceType); + process.on('SIGTERM', shutdown); + process.on('SIGINT', shutdown); + } + + private async gracefulShutdown(serviceType: string) { + if (this.isShuttingDown) return; + this.isShuttingDown = true; + + try { + if (serviceType === 'http') { + await this.httpServerService?.shutdown(); + } else { + await this.grpcServerService?.shutdown(); + } + process.exit(0); + } catch (error) { + Logger.error(`[${serviceType}] Error during shutdown:`, error); + process.exit(1); + } + } } const app = new Application();