import PQueue, { QueueAddOptions } from 'p-queue-cjs';
import os from 'os';
import { AuthDataType, SystemModel } from '../data/system';
import Logger from '../loaders/logger';
import { Dependence } from '../data/dependence';
import NotificationService from '../services/notify';
import {
  ICronFn,
  IDependencyFn,
  ISchedule,
  IScheduleFn,
  TCron,
} from './interface';
import config from '../config';
import { credentials } from '@grpc/grpc-js';
import { ApiClient } from '../protos/api';

/** Identifier type stored for queued dependencies (whatever `Dependence.id` is, minus null/undefined). */
type DependencyId = NonNullable<Dependence['id']>;

/** Default worker count for the CPU-scaled queues: one per logical core, but never fewer than four. */
const DEFAULT_CONCURRENCY = Math.max(os.cpus().length, 4);

/** A cron whose tracked run list would grow beyond this length is rejected instead of queued. */
const MAX_QUEUED_RUNS_PER_CRON = 5;

/** Upper bound on "repeated run" notifications sent for a single cron id. */
const MAX_REPEAT_NOTIFICATIONS = 3;

/**
 * Renders an arbitrary thrown value for logging.
 *
 * `JSON.stringify` on an `Error` drops `message` and `stack` (they are
 * non-enumerable), which typically yields `{}`; errors are therefore rendered
 * from their stack/message, and only non-error values are JSON-encoded.
 */
function describeError(error: unknown): string {
  if (error instanceof Error) {
    return error.stack ?? `${error.name}: ${error.message}`;
  }
  try {
    return JSON.stringify(error) ?? String(error);
  } catch {
    // e.g. circular structures or BigInt values
    return String(error);
  }
}

/**
 * Owns the promise queues that throttle the different kinds of background
 * work (crons, manual cron runs, subscriptions, scripts, system tasks,
 * dependency installs and dependency log updates).
 *
 * A single shared instance is exported as the module default.
 */
class TaskLimit {
  // Dependency work and dependency log updates are strictly serialized.
  private dependenyLimit = new PQueue({ concurrency: 1 });
  private queuedDependencyIds = new Set<DependencyId>([]);
  /** Run functions currently tracked per cron id (see `runWithCronLimit` / `removeQueuedCron`). */
  private queuedCrons = new Map<string, ICronFn<unknown>[]>();
  /** Number of "repeated run" notifications already sent, per cron id. */
  private repeatCronNotifyMap = new Map<string, number>();
  private updateLogLimit = new PQueue({ concurrency: 1 });
  private cronLimit = new PQueue({ concurrency: DEFAULT_CONCURRENCY });
  private manualCronoLimit = new PQueue({ concurrency: DEFAULT_CONCURRENCY });
  private subscriptionLimit = new PQueue({ concurrency: DEFAULT_CONCURRENCY });
  private scriptLimit = new PQueue({ concurrency: DEFAULT_CONCURRENCY });
  private systemLimit = new PQueue({ concurrency: DEFAULT_CONCURRENCY });
  private client = new ApiClient(
    `0.0.0.0:${config.grpcPort}`,
    credentials.createInsecure(),
    { 'grpc.enable_http_proxy': 0 },
  );

  /** Number of cron tasks currently running (p-queue's `pending`). */
  get cronLimitActiveCount(): number {
    return this.cronLimit.pending;
  }

  /** Number of cron tasks waiting in the queue (p-queue's `size`). */
  get cronLimitPendingCount(): number {
    return this.cronLimit.size;
  }

  /** Earliest-inserted queued dependency id, or `undefined` when none is queued. */
  get firstDependencyId(): DependencyId | undefined {
    // A Set iterates in insertion order, so the first yielded value is the
    // oldest entry; no need to copy the whole set into an array.
    return this.queuedDependencyIds.values().next().value;
  }

  private notificationService: NotificationService = new NotificationService();

  constructor() {
    // setCustomLimit() is async and cannot be awaited in a constructor; catch
    // its rejection so a failing lookup is logged instead of surfacing as an
    // unhandled promise rejection. The default concurrency stays in effect.
    void this.setCustomLimit().catch((error: unknown) => {
      Logger.error(
        `[schedule][读取任务并发数失败] ${describeError(error)}`,
      );
    });
    this.handleEvents();
  }

  /** Attaches logging listeners for the cron queue's lifecycle events. */
  private handleEvents(): void {
    this.cronLimit.on('add', () => {
      Logger.info(
        `[schedule][任务加入队列] 运行中任务数: ${this.cronLimitActiveCount}, 等待中任务数: ${this.cronLimitPendingCount}`,
      );
    });
    this.cronLimit.on('active', () => {
      // NOTE(review): the +1 presumably compensates for `active` firing before
      // the queue has incremented `pending` — confirm against p-queue-cjs.
      Logger.info(
        `[schedule][开始处理任务] 运行中任务数: ${
          this.cronLimitActiveCount + 1
        }, 等待中任务数: ${this.cronLimitPendingCount}`,
      );
    });
    this.cronLimit.on('completed', (param) => {
      Logger.info(`[schedule][任务处理成功] 参数 ${JSON.stringify(param)}`);
    });
    this.cronLimit.on('error', (error) => {
      Logger.error(`[schedule][任务处理错误] 参数 ${describeError(error)}`);
    });
    this.cronLimit.on('next', () => {
      Logger.info(
        `[schedule][任务处理结束] 运行中任务数: ${this.cronLimitActiveCount}, 等待中任务数: ${this.cronLimitPendingCount}`,
      );
    });
    this.cronLimit.on('idle', () => {
      Logger.info(`[schedule][任务队列] 空闲中...`);
    });
  }

  /** Stops tracking `dependency` as queued; a no-op when it was not tracked. */
  public removeQueuedDependency(dependency: Dependence): void {
    // Set#delete already ignores missing values.
    this.queuedDependencyIds.delete(dependency.id!);
  }

  /**
   * Drops one tracked run for the cron with the given id.
   *
   * @param id Cron id used as the key in `runWithCronLimit`.
   */
  public removeQueuedCron(id: string): void {
    const runs = this.queuedCrons.get(id);
    if (!runs) {
      return;
    }
    runs.pop();
    if (runs.length === 0) {
      // Do not keep empty lists around for crons that have nothing tracked.
      this.queuedCrons.delete(id);
    }
  }

  /**
   * Sets the concurrency of the cron queue and the manual-cron queue.
   *
   * @param limit Concurrency to apply. When omitted (or falsy) the value is
   *   read from the `cronConcurrency` field of the stored system config
   *   record; if that is absent too, the current concurrency is left as is.
   */
  public async setCustomLimit(limit?: number): Promise<void> {
    if (limit) {
      this.applyCronConcurrency(limit);
      return;
    }
    await SystemModel.sync();
    const doc = await SystemModel.findOne({
      where: { type: AuthDataType.systemConfig },
    });
    const stored = doc?.info?.cronConcurrency;
    if (stored) {
      this.applyCronConcurrency(stored);
    }
  }

  /** Applies one concurrency value to both cron-related queues. */
  private applyCronConcurrency(concurrency: number): void {
    this.cronLimit.concurrency = concurrency;
    this.manualCronoLimit.concurrency = concurrency;
  }

  /**
   * Queues a cron run on the cron queue, refusing it when too many runs of the
   * same cron are already tracked.
   *
   * When the tracked run list for `cron.id` would exceed
   * `MAX_QUEUED_RUNS_PER_CRON`, the run is not queued: a warning is logged and,
   * for the first `MAX_REPEAT_NOTIFICATIONS` occurrences per cron id, a system
   * notification is sent.
   *
   * @param cron Cron the run belongs to; it is attached to `fn.cron`.
   * @param fn Task to execute.
   * @param options Options forwarded to the queue's `add`.
   * @returns The queue's promise for the task, or `undefined` when the run was
   *   refused.
   */
  public async runWithCronLimit<T>(
    cron: TCron,
    fn: ICronFn<T>,
    options?: Partial<QueueAddOptions>,
  ): Promise<T | void> {
    fn.cron = cron;
    const runs = this.queuedCrons.get(cron.id);
    const result: ICronFn<unknown>[] = runs?.length ? [...runs, fn] : [fn];

    if (result.length > MAX_QUEUED_RUNS_PER_CRON) {
      const repeatTimes = this.repeatCronNotifyMap.get(cron.id) ?? 0;
      if (repeatTimes < MAX_REPEAT_NOTIFICATIONS) {
        this.repeatCronNotifyMap.set(cron.id, repeatTimes + 1);
        this.client.systemNotify(
          {
            title: '任务重复运行',
            content: `任务:${cron.name},命令:${cron.command},定时:${cron.schedule},处于运行中的超过 ${MAX_QUEUED_RUNS_PER_CRON} 个,请检查定时设置`,
          },
          (err) => {
            if (err) {
              Logger.error(
                `[schedule][任务重复运行] 通知失败 ${describeError(err)}`,
              );
            }
          },
        );
      }
      Logger.warn(`[schedule][任务重复运行] 参数 ${JSON.stringify(cron)}`);
      return;
    }

    // Entries are only removed through removeQueuedCron().
    this.queuedCrons.set(cron.id, result);
    return this.cronLimit.add(fn, options);
  }

  /** Queues a manually triggered cron run on the dedicated manual-cron queue. */
  public async manualRunWithCronLimit<T>(
    fn: () => Promise<T>,
    options?: Partial<QueueAddOptions>,
  ): Promise<T | void> {
    return this.manualCronoLimit.add(fn, options);
  }

  /** Attaches `schedule` to `fn` and queues it on the subscription queue. */
  public async runWithSubscriptionLimit<T>(
    schedule: TCron,
    fn: IScheduleFn<T>,
    options?: Partial<QueueAddOptions>,
  ): Promise<T | void> {
    fn.schedule = schedule;
    return this.subscriptionLimit.add(fn, options);
  }

  /** Attaches `schedule` to `fn` and queues it on the system queue. */
  public async runWithSystemLimit<T>(
    schedule: TCron,
    fn: IScheduleFn<T>,
    options?: Partial<QueueAddOptions>,
  ): Promise<T | void> {
    fn.schedule = schedule;
    return this.systemLimit.add(fn, options);
  }

  /** Attaches `schedule` to `fn` and queues it on the script queue. */
  public async runWithScriptLimit<T>(
    schedule: ISchedule,
    fn: IScheduleFn<T>,
    options?: Partial<QueueAddOptions>,
  ): Promise<T | void> {
    fn.schedule = schedule;
    return this.scriptLimit.add(fn, options);
  }

  /**
   * Resolves once the dependency queue has nothing waiting and nothing
   * running; resolves immediately when that is already the case.
   */
  public async waitDependencyQueueDone(): Promise<void> {
    if (this.dependenyLimit.size === 0 && this.dependenyLimit.pending === 0) {
      return;
    }
    await new Promise<void>((resolve) => {
      // `once` unregisters the listener after the first `idle` event.
      this.dependenyLimit.once('idle', () => resolve());
    });
  }

  /**
   * Tracks `dependency` as queued, attaches it to `fn`, and queues `fn` on the
   * serialized dependency queue.
   */
  public runDependeny<T>(
    dependency: Dependence,
    fn: IDependencyFn<T>,
    options?: Partial<QueueAddOptions>,
  ): Promise<T | void> {
    this.queuedDependencyIds.add(dependency.id!);
    fn.dependency = dependency;
    return this.dependenyLimit.add(fn, options);
  }

  /** Queues a dependency log update on its own serialized queue. */
  public updateDepLog<T>(
    fn: () => Promise<T>,
    options?: Partial<QueueAddOptions>,
  ): Promise<T | void> {
    return this.updateLogLimit.add(fn, options);
  }
}

export default new TaskLimit();