qinglong/back/shared/pLimit.ts
2026-06-21 23:53:32 +08:00

247 lines
7.1 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import PQueue, { QueueAddOptions } from 'p-queue-cjs';
import os from 'os';
import { AuthDataType, SystemModel } from '../data/system';
import Logger from '../loaders/logger';
import { Dependence } from '../data/dependence';
import NotificationService from '../services/notify';
import { t, tf } from '../shared/i18n';
import {
ICronFn,
IDependencyFn,
ISchedule,
IScheduleFn,
TCron,
} from './interface';
import config from '../config';
import { credentials } from '@grpc/grpc-js';
import { ApiClient } from '../protos/api';
import { getGrpcCerts } from '../config/grpcCerts';
/**
 * Groups the concurrency-limited queues used for cron runs, manual runs,
 * subscriptions, scripts, system tasks, dependency installs and dependency
 * log updates, and tracks which crons / dependencies are currently queued.
 */
class TaskLimit {
  /**
   * A run is refused when accepting it would bring the number of tracked
   * runs for its cron above this value.
   */
  private static readonly maxTrackedRunsPerCron = 5;

  /**
   * Maximum number of "repeated run" notifications sent per cron id.
   * The counter is never reset inside this class.
   */
  private static readonly maxRepeatNotifications = 3;

  /** Default concurrency for the general-purpose queues: CPU count, but never below 4. */
  private static defaultConcurrency(): number {
    return Math.max(os.cpus().length, 4);
  }

  /**
   * Renders an arbitrary value for a log line without ever throwing.
   *
   * `JSON.stringify` turns a plain `Error` into `{}` (its message and stack
   * are not enumerable) and throws on circular structures or BigInt values,
   * so errors are rendered from their stack/message and any stringify
   * failure falls back to `String(value)`.
   */
  private static formatForLog(value: unknown): string {
    if (value instanceof Error) {
      return value.stack || value.message || String(value);
    }
    try {
      return JSON.stringify(value) ?? String(value);
    } catch {
      return String(value);
    }
  }

  // Dependency work and dependency log updates are strictly serialized.
  private dependenyLimit = new PQueue({ concurrency: 1 });
  private queuedDependencyIds = new Set<number>();
  // cron id -> functions accepted by runWithCronLimit and not yet removed via removeQueuedCron.
  private queuedCrons = new Map<string, ICronFn<any>[]>();
  // cron id -> number of "repeated run" notifications already sent.
  private repeatCronNotifyMap = new Map<string, number>();
  private updateLogLimit = new PQueue({ concurrency: 1 });
  private cronLimit = new PQueue({
    concurrency: TaskLimit.defaultConcurrency(),
  });
  private manualCronoLimit = new PQueue({
    concurrency: TaskLimit.defaultConcurrency(),
  });
  private subscriptionLimit = new PQueue({
    concurrency: TaskLimit.defaultConcurrency(),
  });
  private scriptLimit = new PQueue({
    concurrency: TaskLimit.defaultConcurrency(),
  });
  private systemLimit = new PQueue({
    concurrency: TaskLimit.defaultConcurrency(),
  });
  private _client: ApiClient | null = null;

  /**
   * Lazily created gRPC client for `localhost:<config.grpcPort>`.
   * Uses SSL credentials when `getGrpcCerts()` returns certificates and an
   * insecure channel otherwise; the instance is cached after first use.
   */
  private get client(): ApiClient {
    if (this._client) {
      return this._client;
    }
    const tlsConfig = getGrpcCerts();
    const channelCredentials = tlsConfig
      ? credentials.createSsl(
          Buffer.from(tlsConfig.caCert),
          Buffer.from(tlsConfig.clientKey),
          Buffer.from(tlsConfig.clientCert),
        )
      : credentials.createInsecure();
    this._client = new ApiClient(
      `localhost:${config.grpcPort}`,
      channelCredentials,
      { 'grpc.enable_http_proxy': 0 },
    );
    return this._client;
  }

  /** Number of cron tasks currently running (p-queue's `pending` counter). */
  get cronLimitActiveCount() {
    return this.cronLimit.pending;
  }

  /** Number of cron tasks waiting in the queue (p-queue's `size` counter). */
  get cronLimitPendingCount() {
    return this.cronLimit.size;
  }

  /** Oldest queued dependency id in insertion order, or `undefined` when none is queued. */
  get firstDependencyId() {
    // Read the first element directly instead of copying the whole set.
    return this.queuedDependencyIds.values().next().value;
  }

  private notificationService: NotificationService = new NotificationService();

  constructor() {
    // setCustomLimit() reads from the database; a rejection here would
    // otherwise surface as an unhandled promise rejection at module load.
    // On failure the queues simply keep their default concurrency.
    void this.setCustomLimit().catch((error: unknown) => {
      Logger.error(
        `[schedule][并发数初始化失败] ${TaskLimit.formatForLog(error)}`,
      );
    });
    this.handleEvents();
  }

  /** Registers logging listeners for the cron queue's lifecycle events. */
  private handleEvents() {
    const queue = this.cronLimit;
    queue.on('add', () => {
      Logger.info(
        `[schedule][任务加入队列] 运行中任务数: ${this.cronLimitActiveCount}, 等待中任务数: ${this.cronLimitPendingCount}`,
      );
    });
    queue.on('active', () => {
      // NOTE(review): the +1 presumably accounts for the task being started
      // not yet being counted in `pending` when this event fires — confirm.
      Logger.info(
        `[schedule][开始处理任务] 运行中任务数: ${
          this.cronLimitActiveCount + 1
        }, 等待中任务数: ${this.cronLimitPendingCount}`,
      );
    });
    queue.on('completed', (param) => {
      // formatForLog never throws, so an unserializable task result cannot
      // raise out of this listener.
      Logger.info(
        `[schedule][任务处理成功] 参数 ${TaskLimit.formatForLog(param)}`,
      );
    });
    queue.on('error', (error) => {
      Logger.error(
        `[schedule][任务处理错误] 参数 ${TaskLimit.formatForLog(error)}`,
      );
    });
    queue.on('next', () => {
      Logger.info(
        `[schedule][任务处理结束] 运行中任务数: ${this.cronLimitActiveCount}, 等待中任务数: ${this.cronLimitPendingCount}`,
      );
    });
    queue.on('idle', () => {
      Logger.info(`[schedule][任务队列] 空闲中...`);
    });
  }

  /** Stops tracking the given dependency's id as queued. */
  public removeQueuedDependency(dependency: Dependence) {
    this.queuedDependencyIds.delete(dependency.id!);
  }

  /**
   * Drops one tracked run for the given cron id.
   * The map entry is removed once no runs remain so that ids of crons that
   * are no longer running do not accumulate.
   */
  public removeQueuedCron(id: string) {
    const runs = this.queuedCrons.get(id);
    if (!runs) {
      return;
    }
    runs.pop();
    if (runs.length === 0) {
      this.queuedCrons.delete(id);
    }
  }

  /**
   * Sets the concurrency of the scheduled and manual cron queues.
   *
   * @param limit Explicit concurrency. When omitted (or falsy) the value is
   *   read from the stored system config; if that has no `cronConcurrency`
   *   the current concurrency is left unchanged.
   */
  public async setCustomLimit(limit?: number) {
    if (limit) {
      this.applyCronConcurrency(limit);
      return;
    }
    await SystemModel.sync();
    const doc = await SystemModel.findOne({
      where: { type: AuthDataType.systemConfig },
    });
    const stored = doc?.info?.cronConcurrency;
    if (stored) {
      this.applyCronConcurrency(stored);
    }
  }

  /** Applies one concurrency value to both cron queues. */
  private applyCronConcurrency(concurrency: number) {
    this.cronLimit.concurrency = concurrency;
    this.manualCronoLimit.concurrency = concurrency;
  }

  /**
   * Queues a scheduled cron run.
   *
   * The run is refused (resolving to `undefined`) when the cron already has
   * the maximum number of tracked runs; in that case a warning is logged and,
   * for the first few occurrences, a system notification is sent.
   *
   * @param cron The cron definition; attached to `fn` as `fn.cron`.
   * @param fn The task to execute.
   * @param options Options forwarded to the queue's `add`.
   * @returns Whatever the queue's `add` resolves to, or `undefined` when refused.
   */
  public async runWithCronLimit<T>(
    cron: TCron,
    fn: ICronFn<T>,
    options?: Partial<QueueAddOptions>,
  ): Promise<T | void> {
    fn.cron = cron;
    const trackedRuns = [...(this.queuedCrons.get(cron.id) ?? []), fn];
    if (trackedRuns.length > TaskLimit.maxTrackedRunsPerCron) {
      this.notifyRepeatedCron(cron);
      Logger.warn(`[schedule][任务重复运行] 参数 ${JSON.stringify(cron)}`);
      return;
    }
    this.queuedCrons.set(cron.id, trackedRuns);
    return this.cronLimit.add(fn, options);
  }

  /**
   * Sends the "repeated run" system notification for a cron, at most
   * `maxRepeatNotifications` times per cron id.
   */
  private notifyRepeatedCron(cron: TCron) {
    const alreadySent = this.repeatCronNotifyMap.get(cron.id) || 0;
    if (alreadySent >= TaskLimit.maxRepeatNotifications) {
      return;
    }
    this.repeatCronNotifyMap.set(cron.id, alreadySent + 1);
    this.client.systemNotify(
      {
        title: t('任务重复运行'),
        content: tf(
          '任务:%s命令%s定时%s处于运行中的超过 %d 个,请检查定时设置',
          cron.name || '',
          cron.command || '',
          cron.schedule || '',
          TaskLimit.maxTrackedRunsPerCron,
        ),
      },
      (err) => {
        if (err) {
          Logger.error(
            `[schedule][任务重复运行] 通知失败 ${TaskLimit.formatForLog(err)}`,
          );
        }
      },
    );
  }

  /** Queues a manually triggered run on the manual cron queue. */
  public async manualRunWithCronLimit<T>(
    fn: () => Promise<T>,
    options?: Partial<QueueAddOptions>,
  ): Promise<T | void> {
    return this.manualCronoLimit.add(fn, options);
  }

  /** Tags `fn` with its schedule and queues it on the subscription queue. */
  public async runWithSubscriptionLimit<T>(
    schedule: TCron,
    fn: IScheduleFn<T>,
    options?: Partial<QueueAddOptions>,
  ): Promise<T | void> {
    fn.schedule = schedule;
    return this.subscriptionLimit.add(fn, options);
  }

  /** Tags `fn` with its schedule and queues it on the system queue. */
  public async runWithSystemLimit<T>(
    schedule: TCron,
    fn: IScheduleFn<T>,
    options?: Partial<QueueAddOptions>,
  ): Promise<T | void> {
    fn.schedule = schedule;
    return this.systemLimit.add(fn, options);
  }

  /** Tags `fn` with its schedule and queues it on the script queue. */
  public async runWithScriptLimit<T>(
    schedule: ISchedule,
    fn: IScheduleFn<T>,
    options?: Partial<QueueAddOptions>,
  ): Promise<T | void> {
    fn.schedule = schedule;
    return this.scriptLimit.add(fn, options);
  }

  /**
   * Resolves once the dependency queue has nothing waiting and nothing
   * running; resolves immediately when that is already the case.
   */
  public async waitDependencyQueueDone(): Promise<void> {
    const queue = this.dependenyLimit;
    if (queue.size === 0 && queue.pending === 0) {
      return;
    }
    return new Promise<void>((resolve) => {
      // `once` detaches the listener after the first 'idle' event.
      queue.once('idle', () => resolve());
    });
  }

  /**
   * Marks the dependency as queued, tags `fn` with it and queues `fn` on the
   * serialized dependency queue.
   */
  public runDependeny<T>(
    dependency: Dependence,
    fn: IDependencyFn<T>,
    options?: Partial<QueueAddOptions>,
  ): Promise<T | void> {
    this.queuedDependencyIds.add(dependency.id!);
    fn.dependency = dependency;
    return this.dependenyLimit.add(fn, options);
  }

  /** Queues a dependency log update on its own serialized queue. */
  public updateDepLog<T>(
    fn: () => Promise<T>,
    options?: Partial<QueueAddOptions>,
  ): Promise<T | void> {
    return this.updateLogLimit.add(fn, options);
  }
}
// Module-level instance exported as the default; constructing it here runs
// the TaskLimit constructor (setCustomLimit + handleEvents) at import time.
export default new TaskLimit();