qinglong/back/app.ts
copilot-swe-agent[bot] 95939bbea5 Improve logging accuracy for worker restarts
- Fixed log messages to accurately reflect worker state
- Added proper logging after gRPC worker is confirmed ready
- Improved HTTP worker restart logging with PID
- Addresses code review feedback for better clarity

Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>
2025-11-07 16:34:42 +00:00

285 lines
8.0 KiB
TypeScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import 'reflect-metadata';
import cluster, { type Worker } from 'cluster';
import compression from 'compression';
import cors from 'cors';
import express from 'express';
import helmet from 'helmet';
import { Container } from 'typedi';
import config from './config';
import Logger from './loaders/logger';
import { monitoringMiddleware } from './middlewares/monitoring';
import { type GrpcServerService } from './services/grpc';
import { type HttpServerService } from './services/http';
interface WorkerMetadata {
id: number;
pid: number;
serviceType: string;
startTime: Date;
}
class Application {
private app: express.Application;
private httpServerService?: HttpServerService;
private grpcServerService?: GrpcServerService;
private isShuttingDown = false;
private workerMetadataMap = new Map<number, WorkerMetadata>();
constructor() {
this.app = express();
// 创建一个全局中间件删除查询参数中的t
this.app.use((req: express.Request, res: express.Response, next: express.NextFunction) => {
if (req.query.t) {
delete req.query.t;
}
next();
});
}
async start() {
try {
if (cluster.isPrimary) {
await this.initializeDatabase();
}
if (cluster.isPrimary) {
this.startMasterProcess();
} else {
await this.startWorkerProcess();
}
} catch (error) {
Logger.error('Failed to start application:', error);
process.exit(1);
}
}
private startMasterProcess() {
// Fork gRPC worker first and wait for it to be ready
const grpcWorker = this.forkWorker('grpc');
// Wait for gRPC worker to signal it's ready before starting HTTP worker
this.waitForWorkerReady(grpcWorker, 30000)
.then(() => {
Logger.info('gRPC worker is ready, starting HTTP worker');
this.forkWorker('http');
})
.catch((error) => {
Logger.error('Failed to wait for gRPC worker:', error);
process.exit(1);
});
cluster.on('exit', (worker, code, signal) => {
const metadata = this.workerMetadataMap.get(worker.id);
if (metadata) {
if (!this.isShuttingDown) {
Logger.error(
`${metadata.serviceType} worker ${worker.process.pid} died (${signal || code
}). Restarting...`,
);
// If gRPC worker died, restart it and wait for it to be ready
if (metadata.serviceType === 'grpc') {
const newGrpcWorker = this.forkWorker('grpc');
this.waitForWorkerReady(newGrpcWorker, 30000)
.then(() => {
Logger.info('gRPC worker restarted and ready');
})
.catch((error) => {
Logger.error('Failed to restart gRPC worker:', error);
process.exit(1);
});
} else {
// For HTTP worker, just restart it
const newWorker = this.forkWorker(metadata.serviceType);
Logger.info(`Restarted ${metadata.serviceType} worker (PID: ${newWorker.process.pid})`);
}
}
this.workerMetadataMap.delete(worker.id);
}
});
this.setupMasterShutdown();
}
private waitForWorkerReady(worker: Worker, timeoutMs: number): Promise<void> {
return new Promise<void>((resolve, reject) => {
const messageHandler = (msg: any) => {
if (msg === 'ready') {
worker.removeListener('message', messageHandler);
clearTimeout(timeoutId);
resolve();
}
};
worker.on('message', messageHandler);
// Timeout after specified milliseconds
const timeoutId = setTimeout(() => {
worker.removeListener('message', messageHandler);
reject(new Error(`Worker failed to start within ${timeoutMs / 1000} seconds`));
}, timeoutMs);
});
}
private forkWorker(serviceType: string): Worker {
const worker = cluster.fork({ SERVICE_TYPE: serviceType });
this.workerMetadataMap.set(worker.id, {
id: worker.id,
pid: worker.process.pid!,
serviceType,
startTime: new Date(),
});
return worker;
}
private async initializeDatabase() {
const dbLoader = await import('./loaders/db');
await dbLoader.default();
}
private setupMiddlewares() {
this.app.use(helmet({
contentSecurityPolicy: false,
}));
this.app.use(cors(config.cors));
this.app.use(compression());
this.app.use(monitoringMiddleware);
}
private setupMasterShutdown() {
const shutdown = async () => {
if (this.isShuttingDown) return;
this.isShuttingDown = true;
const workers = Object.values(cluster.workers || {});
const workerPromises: Promise<void>[] = [];
workers.forEach((worker) => {
if (worker) {
const exitPromise = new Promise<void>((resolve) => {
worker.once('exit', () => {
Logger.info(`Worker ${worker.process.pid} exited`);
resolve();
});
try {
worker.send('shutdown');
} catch (error) {
Logger.warn(
`Failed to send shutdown to worker ${worker.process.pid}:`,
error,
);
}
});
workerPromises.push(exitPromise);
}
});
try {
await Promise.race([
Promise.all(workerPromises),
new Promise<void>((resolve) => {
setTimeout(() => {
Logger.warn('Worker shutdown timeout reached');
resolve();
}, 10000);
}),
]);
process.exit(0);
} catch (error) {
Logger.error('Error during worker shutdown:', error);
process.exit(1);
}
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
}
private async startWorkerProcess() {
const serviceType = process.env.SERVICE_TYPE;
if (!serviceType || !['http', 'grpc'].includes(serviceType)) {
Logger.error('Invalid SERVICE_TYPE:', serviceType);
process.exit(1);
}
Logger.info(`✌️ ${serviceType} worker started (PID: ${process.pid})`);
try {
if (serviceType === 'http') {
await this.startHttpService();
} else {
await this.startGrpcService();
}
process.send?.('ready');
} catch (error) {
Logger.error(`${serviceType} worker failed:`, error);
process.exit(1);
}
}
private async startHttpService() {
this.setupMiddlewares();
const { HttpServerService } = await import('./services/http');
this.httpServerService = Container.get(HttpServerService);
const appLoader = await import('./loaders/app');
await appLoader.default({ app: this.app });
const server = await this.httpServerService.initialize(
this.app,
config.port,
);
const serverLoader = await import('./loaders/server');
await (serverLoader.default as any)({ server });
this.setupWorkerShutdown('http');
}
private async startGrpcService() {
const { GrpcServerService } = await import('./services/grpc');
this.grpcServerService = Container.get(GrpcServerService);
await this.grpcServerService.initialize();
this.setupWorkerShutdown('grpc');
}
private setupWorkerShutdown(serviceType: string) {
process.on('message', (msg) => {
if (msg === 'shutdown') {
this.gracefulShutdown(serviceType);
}
});
const shutdown = () => this.gracefulShutdown(serviceType);
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
}
private async gracefulShutdown(serviceType: string) {
if (this.isShuttingDown) return;
this.isShuttingDown = true;
try {
if (serviceType === 'http') {
await this.httpServerService?.shutdown();
} else {
await this.grpcServerService?.shutdown();
}
process.exit(0);
} catch (error) {
Logger.error(`[${serviceType}] Error during shutdown:`, error);
process.exit(1);
}
}
}
const app = new Application();
app.start().catch((error) => {
Logger.error('Application failed to start:', error);
process.exit(1);
});