From 8b8eae211b6f37ddf98fa5e18077e3ad111d0c65 Mon Sep 17 00:00:00 2001 From: whyour Date: Fri, 23 Aug 2024 09:37:26 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=BB=BB=E5=8A=A1=E9=87=8D?= =?UTF-8?q?=E5=A4=8D=E8=BF=90=E8=A1=8C=E6=8F=90=E9=86=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- back/schedule/addCron.ts | 16 ++++++------- back/services/schedule.ts | 19 ++++++++++++--- back/services/script.ts | 4 +++- back/services/subscription.ts | 3 +++ back/services/system.ts | 4 ++++ back/shared/pLimit.ts | 44 ++++++++++++++++++++++++++++++++++- back/shared/runCron.ts | 15 ++++++++---- 7 files changed, 88 insertions(+), 17 deletions(-) diff --git a/back/schedule/addCron.ts b/back/schedule/addCron.ts index 1889888a..0d4281ba 100644 --- a/back/schedule/addCron.ts +++ b/back/schedule/addCron.ts @@ -24,7 +24,7 @@ const addCron = ( ); if (extraSchedules?.length) { - extraSchedules.forEach(x => { + extraSchedules.forEach((x) => { Logger.info( '[schedule][创建定时任务], 任务ID: %s, 名称: %s, cron: %s, 执行命令: %s', id, @@ -32,21 +32,21 @@ const addCron = ( x.schedule, command, ); - }) + }); } scheduleStacks.set(id, [ nodeSchedule.scheduleJob(id, schedule, async () => { Logger.info(`[schedule][准备运行任务] 命令: ${command}`); - runCron(command, { name, schedule, extraSchedules }); + runCron(command, item); }), ...(extraSchedules?.length ? extraSchedules.map((x) => - nodeSchedule.scheduleJob(id, x.schedule, async () => { - Logger.info(`[schedule][准备运行任务] 命令: ${command}`); - runCron(command, { name, schedule, extraSchedules }); - }), - ) + nodeSchedule.scheduleJob(id, x.schedule, async () => { + Logger.info(`[schedule][准备运行任务] 命令: ${command}`); + runCron(command, item); + }), + ) : []), ]); } diff --git a/back/services/schedule.ts b/back/services/schedule.ts index 47f1f591..5365d8d6 100644 --- a/back/services/schedule.ts +++ b/back/services/schedule.ts @@ -42,7 +42,7 @@ export default class ScheduleService { private maxBuffer = 200 * 1024 * 1024; - constructor(@Inject('logger') private logger: winston.Logger) { } + constructor(@Inject('logger') private logger: winston.Logger) {} async runTask( command: string, @@ -51,12 +51,19 @@ export default class ScheduleService { schedule?: string; name?: string; command?: string; + id: string; }, completionTime: 'start' | 'end' = 'end', ) { - return taskLimit.runWithCronLimit(() => { + return taskLimit.runWithCronLimit(params, () => { return new Promise(async (resolve, reject) => { - this.logger.info(`[panel][开始执行任务] 参数 ${JSON.stringify({ ...params, command })}`); + taskLimit.removeQueuedCron(params.id); + this.logger.info( + `[panel][开始执行任务] 参数 ${JSON.stringify({ + ...params, + command, + })}`, + ); try { const startTime = dayjs(); @@ -131,6 +138,7 @@ export default class ScheduleService { name, schedule, command, + id: _id, }); }), ); @@ -140,6 +148,7 @@ export default class ScheduleService { name, schedule, command, + id: _id, }); } } @@ -148,6 +157,7 @@ export default class ScheduleService { const _id = this.formatId(id); this.logger.info('[panel][取消定时任务], 任务名: %s', name); if (this.scheduleStacks.has(_id)) { + taskLimit.removeQueuedCron(_id); this.scheduleStacks.get(_id)?.cancel(); this.scheduleStacks.delete(_id); } @@ -172,6 +182,7 @@ export default class ScheduleService { this.runTask(command, callbacks, { name, command, + id: _id, }); }, (err) => { @@ -195,6 +206,7 @@ export default class ScheduleService { this.runTask(command, callbacks, { name, command, + id: _id, }); } } @@ -202,6 +214,7 @@ export default class ScheduleService { async cancelIntervalTask({ id = 0, name }: ScheduleTaskType) { const _id = this.formatId(id); this.logger.info('[取消interval任务], 任务ID: %s, 任务名: %s', _id, name); + taskLimit.removeQueuedCron(_id); this.intervalSchedule.removeById(_id); } diff --git a/back/services/script.ts b/back/services/script.ts index 04fed94b..a8d965ac 100644 --- a/back/services/script.ts +++ b/back/services/script.ts @@ -7,6 +7,7 @@ import ScheduleService, { TaskCallbacks } from './schedule'; import config from '../config'; import { TASK_COMMAND } from '../config/const'; import { getFileContentByName, getPid, killTask, rmPath } from '../config/util'; +import taskLimit from '../shared/pLimit'; @Service() export default class ScriptService { @@ -43,7 +44,7 @@ export default class ScriptService { const pid = await this.scheduleService.runTask( `real_time=true ${command}`, this.taskCallbacks(filePath), - { command }, + { command, id: relativePath.replace(/ /g, '-') }, 'start', ); @@ -53,6 +54,7 @@ export default class ScriptService { public async stopScript(filePath: string, pid: number) { if (!pid) { const relativePath = path.relative(config.scriptPath, filePath); + taskLimit.removeQueuedCron(relativePath.replace(/ /g, '-')); pid = (await getPid(`${TASK_COMMAND} ${relativePath} now`)) as number; } try { diff --git a/back/services/subscription.ts b/back/services/subscription.ts index 44728eb2..b7acbe2a 100644 --- a/back/services/subscription.ts +++ b/back/services/subscription.ts @@ -29,6 +29,7 @@ import { LOG_END_SYMBOL } from '../config/const'; import { formatCommand, formatUrl } from '../config/subscription'; import { CrontabModel } from '../data/cron'; import CrontabService from './cron'; +import taskLimit from '../shared/pLimit'; @Service() export default class SubscriptionService { @@ -301,6 +302,7 @@ export default class SubscriptionService { for (const doc of docs) { if (doc.pid) { try { + taskLimit.removeQueuedCron(String(doc.id)); await killTask(doc.pid); } catch (error) { this.logger.error(error); @@ -326,6 +328,7 @@ export default class SubscriptionService { name: subscription.name, schedule: subscription.schedule, command, + id: String(subscription.id), }); } diff --git a/back/services/system.ts b/back/services/system.ts index eceb1697..a463fb52 100644 --- a/back/services/system.ts +++ b/back/services/system.ts @@ -178,6 +178,7 @@ export default class SystemService { }, { command, + id: 'update-node-mirror', }, ); } @@ -252,6 +253,7 @@ export default class SystemService { }, { command, + id: 'update-linux-mirror', }, ); } @@ -363,6 +365,7 @@ export default class SystemService { } this.scheduleService.runTask(`real_time=true ${command}`, callback, { command, + id: command.replace(/ /g, '-'), }); } @@ -371,6 +374,7 @@ export default class SystemService { return { code: 400, message: '参数错误' }; } + taskLimit.removeQueuedCron(command.replace(/ /g, '-')); if (pid) { await killTask(pid); return { code: 200 }; diff --git a/back/shared/pLimit.ts b/back/shared/pLimit.ts index 19d152d7..9215665d 100644 --- a/back/shared/pLimit.ts +++ b/back/shared/pLimit.ts @@ -3,14 +3,29 @@ import os from 'os'; import { AuthDataType, SystemModel } from '../data/system'; import Logger from '../loaders/logger'; import { Dependence } from '../data/dependence'; +import { ICron } from '../protos/cron'; +import NotificationService from '../services/notify'; +import { Inject } from 'typedi'; +export type Override< + T, + K extends Partial<{ [P in keyof T]: any }> | string, +> = K extends string + ? Omit & { [P in keyof T]: T[P] | unknown } + : Omit & K; +type TCron = Override, { id: string }>; interface IDependencyFn { (): Promise; dependency?: Dependence; } +interface ICronFn { + (): Promise; + cron?: TCron; +} class TaskLimit { private dependenyLimit = new PQueue({ concurrency: 1 }); private queuedDependencyIds = new Set([]); + private queuedCrons = new Map(); private updateLogLimit = new PQueue({ concurrency: 1 }); private cronLimit = new PQueue({ concurrency: Math.max(os.cpus().length, 4), @@ -18,6 +33,8 @@ class TaskLimit { private manualCronoLimit = new PQueue({ concurrency: Math.max(os.cpus().length, 4), }); + @Inject((type) => NotificationService) + private notificationService!: NotificationService; get cronLimitActiveCount() { return this.cronLimit.pending; @@ -71,6 +88,16 @@ class TaskLimit { } } + public removeQueuedCron(id: string) { + if (this.queuedCrons.has(id)) { + const runs = this.queuedCrons.get(id); + if (runs && runs.length > 0) { + runs.pop(); + this.queuedCrons.set(id, runs); + } + } + } + public async setCustomLimit(limit?: number) { if (limit) { this.cronLimit.concurrency = limit; @@ -88,9 +115,24 @@ class TaskLimit { } public async runWithCronLimit( - fn: () => Promise, + cron: TCron, + fn: ICronFn, options?: Partial, ): Promise { + let runs = this.queuedCrons.get(cron.id); + if (!runs?.length) { + runs = []; + } + runs.push(cron); + if (runs.length >= 5) { + this.notificationService.notify( + '任务重复运行', + `任务 ${cron.name} ${cron.command} 处于运行中的已达 5 个,请检查系统日志`, + ); + return; + } + this.queuedCrons.set(cron.id, runs); + fn.cron = cron; return this.cronLimit.add(fn, options); } diff --git a/back/shared/runCron.ts b/back/shared/runCron.ts index cc4c11ea..8485c312 100644 --- a/back/shared/runCron.ts +++ b/back/shared/runCron.ts @@ -1,11 +1,18 @@ import { spawn } from 'cross-spawn'; import taskLimit from './pLimit'; import Logger from '../loaders/logger'; +import { ICron } from '../protos/cron'; -export function runCron(cmd: string, options?: { schedule: string; extraSchedules: Array<{ schedule: string }>; name: string }): Promise { - return taskLimit.runWithCronLimit(() => { +export function runCron(cmd: string, cron: ICron): Promise { + return taskLimit.runWithCronLimit(cron, () => { return new Promise(async (resolve: any) => { - Logger.info(`[schedule][开始执行任务] 参数 ${JSON.stringify({ ...options, command: cmd })}`); + taskLimit.removeQueuedCron(cron.id); + Logger.info( + `[schedule][开始执行任务] 参数 ${JSON.stringify({ + ...cron, + command: cmd, + })}`, + ); const cp = spawn(cmd, { shell: '/bin/bash' }); cp.stderr.on('data', (data) => { @@ -24,7 +31,7 @@ export function runCron(cmd: string, options?: { schedule: string; extraSchedule }); cp.on('exit', async (code) => { - resolve({ ...options, command: cmd, pid: cp.pid, code }); + resolve({ ...cron, command: cmd, pid: cp.pid, code }); }); }); });