From 65f7483688fd131854079fae48508a5b29b396b6 Mon Sep 17 00:00:00 2001 From: whyour Date: Sun, 25 Aug 2024 16:28:32 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BB=BB=E5=8A=A1=E9=A2=91?= =?UTF-8?q?=E7=B9=81=E8=BF=90=E8=A1=8C=E9=80=9A=E7=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + back/config/index.ts | 2 ++ back/loaders/initData.ts | 7 +++++++ back/services/notify.ts | 27 +++++++++++++++++++++++++-- back/services/subscription.ts | 1 - back/services/system.ts | 1 - back/shared/pLimit.ts | 26 ++++++++++++-------------- back/shared/runCron.ts | 2 +- 8 files changed, 48 insertions(+), 19 deletions(-) diff --git a/.gitignore b/.gitignore index 09d1e8f8..773d481a 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,4 @@ __pycache__ /shell/preload/env.* /shell/preload/notify.* +/shell/preload/*-notify.json diff --git a/back/config/index.ts b/back/config/index.ts index 1f28febf..de4f6ceb 100644 --- a/back/config/index.ts +++ b/back/config/index.ts @@ -50,6 +50,7 @@ const extraFile = path.join(configPath, 'extra.sh'); const confBakDir = path.join(dataPath, 'config/bak/'); const sampleFile = path.join(samplePath, 'config.sample.sh'); const sqliteFile = path.join(samplePath, 'database.sqlite'); +const systemNotifyFile = path.join(preloadPath, 'system-notify.json'); const authError = '错误的用户名密码,请重试'; const loginFaild = '请先登录!'; @@ -132,4 +133,5 @@ export default { sqliteFile, sshdPath, systemLogPath, + systemNotifyFile, }; diff --git a/back/loaders/initData.ts b/back/loaders/initData.ts index 5d19b4e1..f864088b 100644 --- a/back/loaders/initData.ts +++ b/back/loaders/initData.ts @@ -11,17 +11,24 @@ import { CrontabViewModel, CronViewType } from '../data/cronView'; import { initPosition } from '../data/env'; import { AuthDataType, SystemModel } from '../data/system'; import SystemService from '../services/system'; +import UserService from '../services/user'; +import { writeFile } from 'fs/promises'; export default async () => { const cronService = Container.get(CronService); const envService = Container.get(EnvService); const dependenceService = Container.get(DependenceService); const systemService = Container.get(SystemService); + const userService = Container.get(UserService); // 初始化增加系统配置 await SystemModel.upsert({ type: AuthDataType.systemConfig }); await SystemModel.upsert({ type: AuthDataType.notification }); + // 初始化通知配置 + const notifyConfig = await userService.getNotificationMode(); + await writeFile(config.systemNotifyFile, JSON.stringify(notifyConfig)); + const installDependencies = () => { // 初始化时安装所有处于安装中,安装成功,安装失败的依赖 DependenceModel.findAll({ diff --git a/back/services/notify.ts b/back/services/notify.ts index 0f220aad..fcdd3c12 100644 --- a/back/services/notify.ts +++ b/back/services/notify.ts @@ -4,9 +4,11 @@ import { HttpProxyAgent, HttpsProxyAgent } from 'hpagent'; import nodemailer from 'nodemailer'; import { Inject, Service } from 'typedi'; import winston from 'winston'; -import { parseBody, parseHeaders } from '../config/util'; +import { parseBody, parseHeaders, safeJSONParse } from '../config/util'; import { NotificationInfo } from '../data/notify'; import UserService from './user'; +import { readFile } from 'fs/promises'; +import config from '../config'; @Service() export default class NotificationService { @@ -43,7 +45,28 @@ export default class NotificationService { retry: 1, }; - constructor(@Inject('logger') private logger: winston.Logger) {} + constructor() {} + + public async externalNotify( + title: string, + content: string, + ): Promise { + const { type, ...rest } = safeJSONParse( + await readFile(config.systemNotifyFile, 'utf-8'), + ); + if (type) { + this.title = title; + this.content = content; + this.params = rest; + const notificationModeAction = this.modeMap.get(type); + try { + return await notificationModeAction?.call(this); + } catch (error: any) { + throw error; + } + } + return false; + } public async notify( title: string, diff --git a/back/services/subscription.ts b/back/services/subscription.ts index 5286648d..6f917e54 100644 --- a/back/services/subscription.ts +++ b/back/services/subscription.ts @@ -306,7 +306,6 @@ export default class SubscriptionService { for (const doc of docs) { if (doc.pid) { try { - taskLimit.removeQueuedCron(String(doc.id)); await killTask(doc.pid); } catch (error) { this.logger.error(error); diff --git a/back/services/system.ts b/back/services/system.ts index 33f3e64c..442297c6 100644 --- a/back/services/system.ts +++ b/back/services/system.ts @@ -382,7 +382,6 @@ export default class SystemService { return { code: 400, message: '参数错误' }; } - taskLimit.removeQueuedCron(command.replace(/ /g, '-')); if (pid) { await killTask(pid); return { code: 200 }; diff --git a/back/shared/pLimit.ts b/back/shared/pLimit.ts index 6ccebcd5..d6550613 100644 --- a/back/shared/pLimit.ts +++ b/back/shared/pLimit.ts @@ -3,9 +3,7 @@ import os from 'os'; import { AuthDataType, SystemModel } from '../data/system'; import Logger from '../loaders/logger'; import { Dependence } from '../data/dependence'; -import { ICron } from '../protos/cron'; import NotificationService from '../services/notify'; -import { Inject } from 'typedi'; import { ICronFn, IDependencyFn, @@ -17,7 +15,8 @@ import { class TaskLimit { private dependenyLimit = new PQueue({ concurrency: 1 }); private queuedDependencyIds = new Set([]); - private queuedCrons = new Map(); + private queuedCrons = new Map[]>(); + private repeatCronNotifyMap = new Map(); private updateLogLimit = new PQueue({ concurrency: 1 }); private cronLimit = new PQueue({ concurrency: Math.max(os.cpus().length, 4), @@ -34,8 +33,6 @@ class TaskLimit { private systemLimit = new PQueue({ concurrency: Math.max(os.cpus().length, 4), }); - @Inject((type) => NotificationService) - private notificationService!: NotificationService; get cronLimitActiveCount() { return this.cronLimit.pending; @@ -49,6 +46,8 @@ class TaskLimit { return [...this.queuedDependencyIds.values()][0]; } + private notificationService: NotificationService = new NotificationService(); + constructor() { this.setCustomLimit(); this.handleEvents(); @@ -120,20 +119,19 @@ class TaskLimit { fn: ICronFn, options?: Partial, ): Promise { + fn.cron = cron; let runs = this.queuedCrons.get(cron.id); - if (!runs?.length) { - runs = []; - } - runs.push(cron); - if (runs.length >= 5) { - this.notificationService.notify( + const result = runs?.length ? [...runs, fn] : [fn]; + const repeatTimes = this.repeatCronNotifyMap.get(cron.id) || 0; + if (result?.length > 5 && repeatTimes < 3) { + this.repeatCronNotifyMap.set(cron.id, repeatTimes + 1); + this.notificationService.externalNotify( '任务重复运行', - `任务 ${cron.name} ${cron.command} 处于运行中的已达 5 个,请检查系统日志`, + `任务:${cron.name},命令:${cron.command},定时:${cron.schedule},处于运行中的超过 5 个,请检查定时设置`, ); return; } - this.queuedCrons.set(cron.id, runs); - fn.cron = cron; + this.queuedCrons.set(cron.id, result); return this.cronLimit.add(fn, options); } diff --git a/back/shared/runCron.ts b/back/shared/runCron.ts index 8485c312..3d9fe56b 100644 --- a/back/shared/runCron.ts +++ b/back/shared/runCron.ts @@ -6,7 +6,6 @@ import { ICron } from '../protos/cron'; export function runCron(cmd: string, cron: ICron): Promise { return taskLimit.runWithCronLimit(cron, () => { return new Promise(async (resolve: any) => { - taskLimit.removeQueuedCron(cron.id); Logger.info( `[schedule][开始执行任务] 参数 ${JSON.stringify({ ...cron, @@ -31,6 +30,7 @@ export function runCron(cmd: string, cron: ICron): Promise { }); cp.on('exit', async (code) => { + taskLimit.removeQueuedCron(cron.id); resolve({ ...cron, command: cmd, pid: cp.pid, code }); }); });