qinglong/back/app.ts
Copilot 90ddf0fb57
Fix race condition preventing scheduled tasks from executing in clustered mode (#2819)
* Initial plan

* Fix race condition causing scheduled tasks not to run

Added synchronization to ensure gRPC worker is ready before HTTP worker starts. This prevents the race condition where autosave_crontab() tries to register cron jobs before the gRPC server is ready to accept them.

Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>

* Add timeout protection for gRPC worker initialization

Added 30-second timeout to prevent system hang if gRPC worker fails to start. This provides better error handling and prevents indefinite waiting.

Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>

* Refactor worker ready logic and improve restart handling

- Extracted waitForWorkerReady() method for better code reusability
- Improved worker restart logic to wait for gRPC worker readiness
- This addresses code review feedback for better maintainability

Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>

* Improve logging accuracy for worker restarts

- Fixed log messages to accurately reflect worker state
- Added proper logging after gRPC worker is confirmed ready
- Improved HTTP worker restart logging with PID
- Addresses code review feedback for better clarity

Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>

* Revert unnecessary pnpm-lock.yaml changes

The pnpm-lock.yaml was unintentionally updated when installing dependencies for testing. No package dependencies were actually changed - only existing code was modified in back/app.ts. Reverting to original state.

Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>

* Re-register cron jobs when gRPC worker restarts

When the gRPC worker restarts, the scheduled tasks need to be re-added to the new gRPC server instance. This fix:

1. Tracks the HTTP worker reference in the master process
2. Sends a 'reregister-crons' message to the HTTP worker after gRPC restarts
3. HTTP worker calls autosave_crontab() to re-register all cron jobs with the new gRPC server

This ensures scheduled tasks continue to work after a gRPC worker restart.

Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>
2025-11-12 00:59:22 +08:00

307 lines
9.1 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import 'reflect-metadata';
import cluster, { type Worker } from 'cluster';
import compression from 'compression';
import cors from 'cors';
import express from 'express';
import helmet from 'helmet';
import { Container } from 'typedi';
import config from './config';
import Logger from './loaders/logger';
import { monitoringMiddleware } from './middlewares/monitoring';
import { type GrpcServerService } from './services/grpc';
import { type HttpServerService } from './services/http';
interface WorkerMetadata {
id: number;
pid: number;
serviceType: string;
startTime: Date;
}
class Application {
private app: express.Application;
private httpServerService?: HttpServerService;
private grpcServerService?: GrpcServerService;
private isShuttingDown = false;
private workerMetadataMap = new Map<number, WorkerMetadata>();
private httpWorker?: Worker;
constructor() {
this.app = express();
// 创建一个全局中间件删除查询参数中的t
this.app.use((req: express.Request, res: express.Response, next: express.NextFunction) => {
if (req.query.t) {
delete req.query.t;
}
next();
});
}
async start() {
try {
if (cluster.isPrimary) {
await this.initializeDatabase();
}
if (cluster.isPrimary) {
this.startMasterProcess();
} else {
await this.startWorkerProcess();
}
} catch (error) {
Logger.error('Failed to start application:', error);
process.exit(1);
}
}
private startMasterProcess() {
// Fork gRPC worker first and wait for it to be ready
const grpcWorker = this.forkWorker('grpc');
// Wait for gRPC worker to signal it's ready before starting HTTP worker
this.waitForWorkerReady(grpcWorker, 30000)
.then(() => {
Logger.info('gRPC worker is ready, starting HTTP worker');
this.httpWorker = this.forkWorker('http');
})
.catch((error) => {
Logger.error('Failed to wait for gRPC worker:', error);
process.exit(1);
});
cluster.on('exit', (worker, code, signal) => {
const metadata = this.workerMetadataMap.get(worker.id);
if (metadata) {
if (!this.isShuttingDown) {
Logger.error(
`${metadata.serviceType} worker ${worker.process.pid} died (${signal || code
}). Restarting...`,
);
// If gRPC worker died, restart it and wait for it to be ready
if (metadata.serviceType === 'grpc') {
const newGrpcWorker = this.forkWorker('grpc');
this.waitForWorkerReady(newGrpcWorker, 30000)
.then(() => {
Logger.info('gRPC worker restarted and ready');
// Re-register cron jobs by notifying the HTTP worker
if (this.httpWorker) {
try {
this.httpWorker.send('reregister-crons');
Logger.info('Sent reregister-crons message to HTTP worker');
} catch (error) {
Logger.error('Failed to send reregister-crons message:', error);
}
}
})
.catch((error) => {
Logger.error('Failed to restart gRPC worker:', error);
process.exit(1);
});
} else {
// For HTTP worker, just restart it
const newWorker = this.forkWorker(metadata.serviceType);
this.httpWorker = newWorker;
Logger.info(`Restarted ${metadata.serviceType} worker (PID: ${newWorker.process.pid})`);
}
}
this.workerMetadataMap.delete(worker.id);
}
});
this.setupMasterShutdown();
}
private waitForWorkerReady(worker: Worker, timeoutMs: number): Promise<void> {
return new Promise<void>((resolve, reject) => {
const messageHandler = (msg: any) => {
if (msg === 'ready') {
worker.removeListener('message', messageHandler);
clearTimeout(timeoutId);
resolve();
}
};
worker.on('message', messageHandler);
// Timeout after specified milliseconds
const timeoutId = setTimeout(() => {
worker.removeListener('message', messageHandler);
reject(new Error(`Worker failed to start within ${timeoutMs / 1000} seconds`));
}, timeoutMs);
});
}
private forkWorker(serviceType: string): Worker {
const worker = cluster.fork({ SERVICE_TYPE: serviceType });
this.workerMetadataMap.set(worker.id, {
id: worker.id,
pid: worker.process.pid!,
serviceType,
startTime: new Date(),
});
return worker;
}
private async initializeDatabase() {
const dbLoader = await import('./loaders/db');
await dbLoader.default();
}
private setupMiddlewares() {
this.app.use(helmet({
contentSecurityPolicy: false,
}));
this.app.use(cors(config.cors));
this.app.use(compression());
this.app.use(monitoringMiddleware);
}
private setupMasterShutdown() {
const shutdown = async () => {
if (this.isShuttingDown) return;
this.isShuttingDown = true;
const workers = Object.values(cluster.workers || {});
const workerPromises: Promise<void>[] = [];
workers.forEach((worker) => {
if (worker) {
const exitPromise = new Promise<void>((resolve) => {
worker.once('exit', () => {
Logger.info(`Worker ${worker.process.pid} exited`);
resolve();
});
try {
worker.send('shutdown');
} catch (error) {
Logger.warn(
`Failed to send shutdown to worker ${worker.process.pid}:`,
error,
);
}
});
workerPromises.push(exitPromise);
}
});
try {
await Promise.race([
Promise.all(workerPromises),
new Promise<void>((resolve) => {
setTimeout(() => {
Logger.warn('Worker shutdown timeout reached');
resolve();
}, 10000);
}),
]);
process.exit(0);
} catch (error) {
Logger.error('Error during worker shutdown:', error);
process.exit(1);
}
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
}
private async startWorkerProcess() {
const serviceType = process.env.SERVICE_TYPE;
if (!serviceType || !['http', 'grpc'].includes(serviceType)) {
Logger.error('Invalid SERVICE_TYPE:', serviceType);
process.exit(1);
}
Logger.info(`✌️ ${serviceType} worker started (PID: ${process.pid})`);
try {
if (serviceType === 'http') {
await this.startHttpService();
} else {
await this.startGrpcService();
}
process.send?.('ready');
} catch (error) {
Logger.error(`${serviceType} worker failed:`, error);
process.exit(1);
}
}
private async startHttpService() {
this.setupMiddlewares();
const { HttpServerService } = await import('./services/http');
this.httpServerService = Container.get(HttpServerService);
const appLoader = await import('./loaders/app');
await appLoader.default({ app: this.app });
const server = await this.httpServerService.initialize(
this.app,
config.port,
);
const serverLoader = await import('./loaders/server');
await (serverLoader.default as any)({ server });
this.setupWorkerShutdown('http');
}
private async startGrpcService() {
const { GrpcServerService } = await import('./services/grpc');
this.grpcServerService = Container.get(GrpcServerService);
await this.grpcServerService.initialize();
this.setupWorkerShutdown('grpc');
}
private setupWorkerShutdown(serviceType: string) {
process.on('message', async (msg) => {
if (msg === 'shutdown') {
this.gracefulShutdown(serviceType);
} else if (msg === 'reregister-crons' && serviceType === 'http') {
// Re-register cron jobs when gRPC worker restarts
try {
Logger.info('Received reregister-crons message, re-registering cron jobs...');
const CronService = (await import('./services/cron')).default;
const cronService = Container.get(CronService);
await cronService.autosave_crontab();
Logger.info('Cron jobs re-registered successfully');
} catch (error) {
Logger.error('Failed to re-register cron jobs:', error);
}
}
});
const shutdown = () => this.gracefulShutdown(serviceType);
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
}
private async gracefulShutdown(serviceType: string) {
if (this.isShuttingDown) return;
this.isShuttingDown = true;
try {
if (serviceType === 'http') {
await this.httpServerService?.shutdown();
} else {
await this.grpcServerService?.shutdown();
}
process.exit(0);
} catch (error) {
Logger.error(`[${serviceType}] Error during shutdown:`, error);
process.exit(1);
}
}
}
const app = new Application();
app.start().catch((error) => {
Logger.error('Application failed to start:', error);
process.exit(1);
});