qinglong/back/app.ts
copilot-swe-agent[bot] 15f4bcf363 Fix race condition causing scheduled tasks not to run
Added synchronization to ensure gRPC worker is ready before HTTP worker starts. This prevents the race condition where autosave_crontab() tries to register cron jobs before the gRPC server is ready to accept them.

Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>
2025-11-07 16:28:33 +00:00

263 lines
7.1 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import 'reflect-metadata';
import cluster, { type Worker } from 'cluster';
import compression from 'compression';
import cors from 'cors';
import express from 'express';
import helmet from 'helmet';
import { Container } from 'typedi';
import config from './config';
import Logger from './loaders/logger';
import { monitoringMiddleware } from './middlewares/monitoring';
import { type GrpcServerService } from './services/grpc';
import { type HttpServerService } from './services/http';
interface WorkerMetadata {
id: number;
pid: number;
serviceType: string;
startTime: Date;
}
class Application {
private app: express.Application;
private httpServerService?: HttpServerService;
private grpcServerService?: GrpcServerService;
private isShuttingDown = false;
private workerMetadataMap = new Map<number, WorkerMetadata>();
constructor() {
this.app = express();
// 创建一个全局中间件删除查询参数中的t
this.app.use((req: express.Request, res: express.Response, next: express.NextFunction) => {
if (req.query.t) {
delete req.query.t;
}
next();
});
}
async start() {
try {
if (cluster.isPrimary) {
await this.initializeDatabase();
}
if (cluster.isPrimary) {
this.startMasterProcess();
} else {
await this.startWorkerProcess();
}
} catch (error) {
Logger.error('Failed to start application:', error);
process.exit(1);
}
}
private startMasterProcess() {
// Fork gRPC worker first and wait for it to be ready
const grpcWorker = this.forkWorker('grpc');
// Wait for gRPC worker to signal it's ready before starting HTTP worker
const grpcReadyPromise = new Promise<void>((resolve) => {
const messageHandler = (msg: any) => {
if (msg === 'ready') {
grpcWorker.removeListener('message', messageHandler);
resolve();
}
};
grpcWorker.on('message', messageHandler);
});
// Start HTTP worker after gRPC is ready
grpcReadyPromise.then(() => {
Logger.info('gRPC worker is ready, starting HTTP worker');
this.forkWorker('http');
}).catch((error) => {
Logger.error('Failed to wait for gRPC worker:', error);
process.exit(1);
});
cluster.on('exit', (worker, code, signal) => {
const metadata = this.workerMetadataMap.get(worker.id);
if (metadata) {
if (!this.isShuttingDown) {
Logger.error(
`${metadata.serviceType} worker ${worker.process.pid} died (${signal || code
}). Restarting...`,
);
const newWorker = this.forkWorker(metadata.serviceType);
Logger.info(
`Restarted ${metadata.serviceType} worker (New PID: ${newWorker.process.pid})`,
);
}
this.workerMetadataMap.delete(worker.id);
}
});
this.setupMasterShutdown();
}
private forkWorker(serviceType: string): Worker {
const worker = cluster.fork({ SERVICE_TYPE: serviceType });
this.workerMetadataMap.set(worker.id, {
id: worker.id,
pid: worker.process.pid!,
serviceType,
startTime: new Date(),
});
return worker;
}
private async initializeDatabase() {
const dbLoader = await import('./loaders/db');
await dbLoader.default();
}
private setupMiddlewares() {
this.app.use(helmet({
contentSecurityPolicy: false,
}));
this.app.use(cors(config.cors));
this.app.use(compression());
this.app.use(monitoringMiddleware);
}
private setupMasterShutdown() {
const shutdown = async () => {
if (this.isShuttingDown) return;
this.isShuttingDown = true;
const workers = Object.values(cluster.workers || {});
const workerPromises: Promise<void>[] = [];
workers.forEach((worker) => {
if (worker) {
const exitPromise = new Promise<void>((resolve) => {
worker.once('exit', () => {
Logger.info(`Worker ${worker.process.pid} exited`);
resolve();
});
try {
worker.send('shutdown');
} catch (error) {
Logger.warn(
`Failed to send shutdown to worker ${worker.process.pid}:`,
error,
);
}
});
workerPromises.push(exitPromise);
}
});
try {
await Promise.race([
Promise.all(workerPromises),
new Promise<void>((resolve) => {
setTimeout(() => {
Logger.warn('Worker shutdown timeout reached');
resolve();
}, 10000);
}),
]);
process.exit(0);
} catch (error) {
Logger.error('Error during worker shutdown:', error);
process.exit(1);
}
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
}
private async startWorkerProcess() {
const serviceType = process.env.SERVICE_TYPE;
if (!serviceType || !['http', 'grpc'].includes(serviceType)) {
Logger.error('Invalid SERVICE_TYPE:', serviceType);
process.exit(1);
}
Logger.info(`✌️ ${serviceType} worker started (PID: ${process.pid})`);
try {
if (serviceType === 'http') {
await this.startHttpService();
} else {
await this.startGrpcService();
}
process.send?.('ready');
} catch (error) {
Logger.error(`${serviceType} worker failed:`, error);
process.exit(1);
}
}
private async startHttpService() {
this.setupMiddlewares();
const { HttpServerService } = await import('./services/http');
this.httpServerService = Container.get(HttpServerService);
const appLoader = await import('./loaders/app');
await appLoader.default({ app: this.app });
const server = await this.httpServerService.initialize(
this.app,
config.port,
);
const serverLoader = await import('./loaders/server');
await (serverLoader.default as any)({ server });
this.setupWorkerShutdown('http');
}
private async startGrpcService() {
const { GrpcServerService } = await import('./services/grpc');
this.grpcServerService = Container.get(GrpcServerService);
await this.grpcServerService.initialize();
this.setupWorkerShutdown('grpc');
}
private setupWorkerShutdown(serviceType: string) {
process.on('message', (msg) => {
if (msg === 'shutdown') {
this.gracefulShutdown(serviceType);
}
});
const shutdown = () => this.gracefulShutdown(serviceType);
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
}
private async gracefulShutdown(serviceType: string) {
if (this.isShuttingDown) return;
this.isShuttingDown = true;
try {
if (serviceType === 'http') {
await this.httpServerService?.shutdown();
} else {
await this.grpcServerService?.shutdown();
}
process.exit(0);
} catch (error) {
Logger.error(`[${serviceType}] Error during shutdown:`, error);
process.exit(1);
}
}
}
const app = new Application();
app.start().catch((error) => {
Logger.error('Application failed to start:', error);
process.exit(1);
});