修复任务频繁运行通知

This commit is contained in:
whyour 2024-08-25 16:28:32 +08:00
parent 8b042d90f3
commit 65f7483688
8 changed files with 48 additions and 19 deletions

1
.gitignore vendored
View File

@ -26,3 +26,4 @@
__pycache__ __pycache__
/shell/preload/env.* /shell/preload/env.*
/shell/preload/notify.* /shell/preload/notify.*
/shell/preload/*-notify.json

View File

@ -50,6 +50,7 @@ const extraFile = path.join(configPath, 'extra.sh');
const confBakDir = path.join(dataPath, 'config/bak/'); const confBakDir = path.join(dataPath, 'config/bak/');
const sampleFile = path.join(samplePath, 'config.sample.sh'); const sampleFile = path.join(samplePath, 'config.sample.sh');
const sqliteFile = path.join(samplePath, 'database.sqlite'); const sqliteFile = path.join(samplePath, 'database.sqlite');
const systemNotifyFile = path.join(preloadPath, 'system-notify.json');
const authError = '错误的用户名密码,请重试'; const authError = '错误的用户名密码,请重试';
const loginFaild = '请先登录!'; const loginFaild = '请先登录!';
@ -132,4 +133,5 @@ export default {
sqliteFile, sqliteFile,
sshdPath, sshdPath,
systemLogPath, systemLogPath,
systemNotifyFile,
}; };

View File

@ -11,17 +11,24 @@ import { CrontabViewModel, CronViewType } from '../data/cronView';
import { initPosition } from '../data/env'; import { initPosition } from '../data/env';
import { AuthDataType, SystemModel } from '../data/system'; import { AuthDataType, SystemModel } from '../data/system';
import SystemService from '../services/system'; import SystemService from '../services/system';
import UserService from '../services/user';
import { writeFile } from 'fs/promises';
export default async () => { export default async () => {
const cronService = Container.get(CronService); const cronService = Container.get(CronService);
const envService = Container.get(EnvService); const envService = Container.get(EnvService);
const dependenceService = Container.get(DependenceService); const dependenceService = Container.get(DependenceService);
const systemService = Container.get(SystemService); const systemService = Container.get(SystemService);
const userService = Container.get(UserService);
// 初始化增加系统配置 // 初始化增加系统配置
await SystemModel.upsert({ type: AuthDataType.systemConfig }); await SystemModel.upsert({ type: AuthDataType.systemConfig });
await SystemModel.upsert({ type: AuthDataType.notification }); await SystemModel.upsert({ type: AuthDataType.notification });
// 初始化通知配置
const notifyConfig = await userService.getNotificationMode();
await writeFile(config.systemNotifyFile, JSON.stringify(notifyConfig));
const installDependencies = () => { const installDependencies = () => {
// 初始化时安装所有处于安装中,安装成功,安装失败的依赖 // 初始化时安装所有处于安装中,安装成功,安装失败的依赖
DependenceModel.findAll({ DependenceModel.findAll({

View File

@ -4,9 +4,11 @@ import { HttpProxyAgent, HttpsProxyAgent } from 'hpagent';
import nodemailer from 'nodemailer'; import nodemailer from 'nodemailer';
import { Inject, Service } from 'typedi'; import { Inject, Service } from 'typedi';
import winston from 'winston'; import winston from 'winston';
import { parseBody, parseHeaders } from '../config/util'; import { parseBody, parseHeaders, safeJSONParse } from '../config/util';
import { NotificationInfo } from '../data/notify'; import { NotificationInfo } from '../data/notify';
import UserService from './user'; import UserService from './user';
import { readFile } from 'fs/promises';
import config from '../config';
@Service() @Service()
export default class NotificationService { export default class NotificationService {
@ -43,7 +45,28 @@ export default class NotificationService {
retry: 1, retry: 1,
}; };
constructor(@Inject('logger') private logger: winston.Logger) {} constructor() {}
public async externalNotify(
title: string,
content: string,
): Promise<boolean | undefined> {
const { type, ...rest } = safeJSONParse(
await readFile(config.systemNotifyFile, 'utf-8'),
);
if (type) {
this.title = title;
this.content = content;
this.params = rest;
const notificationModeAction = this.modeMap.get(type);
try {
return await notificationModeAction?.call(this);
} catch (error: any) {
throw error;
}
}
return false;
}
public async notify( public async notify(
title: string, title: string,

View File

@ -306,7 +306,6 @@ export default class SubscriptionService {
for (const doc of docs) { for (const doc of docs) {
if (doc.pid) { if (doc.pid) {
try { try {
taskLimit.removeQueuedCron(String(doc.id));
await killTask(doc.pid); await killTask(doc.pid);
} catch (error) { } catch (error) {
this.logger.error(error); this.logger.error(error);

View File

@ -382,7 +382,6 @@ export default class SystemService {
return { code: 400, message: '参数错误' }; return { code: 400, message: '参数错误' };
} }
taskLimit.removeQueuedCron(command.replace(/ /g, '-'));
if (pid) { if (pid) {
await killTask(pid); await killTask(pid);
return { code: 200 }; return { code: 200 };

View File

@ -3,9 +3,7 @@ import os from 'os';
import { AuthDataType, SystemModel } from '../data/system'; import { AuthDataType, SystemModel } from '../data/system';
import Logger from '../loaders/logger'; import Logger from '../loaders/logger';
import { Dependence } from '../data/dependence'; import { Dependence } from '../data/dependence';
import { ICron } from '../protos/cron';
import NotificationService from '../services/notify'; import NotificationService from '../services/notify';
import { Inject } from 'typedi';
import { import {
ICronFn, ICronFn,
IDependencyFn, IDependencyFn,
@ -17,7 +15,8 @@ import {
class TaskLimit { class TaskLimit {
private dependenyLimit = new PQueue({ concurrency: 1 }); private dependenyLimit = new PQueue({ concurrency: 1 });
private queuedDependencyIds = new Set<number>([]); private queuedDependencyIds = new Set<number>([]);
private queuedCrons = new Map<string, TCron[]>(); private queuedCrons = new Map<string, ICronFn<any>[]>();
private repeatCronNotifyMap = new Map<string, number>();
private updateLogLimit = new PQueue({ concurrency: 1 }); private updateLogLimit = new PQueue({ concurrency: 1 });
private cronLimit = new PQueue({ private cronLimit = new PQueue({
concurrency: Math.max(os.cpus().length, 4), concurrency: Math.max(os.cpus().length, 4),
@ -34,8 +33,6 @@ class TaskLimit {
private systemLimit = new PQueue({ private systemLimit = new PQueue({
concurrency: Math.max(os.cpus().length, 4), concurrency: Math.max(os.cpus().length, 4),
}); });
@Inject((type) => NotificationService)
private notificationService!: NotificationService;
get cronLimitActiveCount() { get cronLimitActiveCount() {
return this.cronLimit.pending; return this.cronLimit.pending;
@ -49,6 +46,8 @@ class TaskLimit {
return [...this.queuedDependencyIds.values()][0]; return [...this.queuedDependencyIds.values()][0];
} }
private notificationService: NotificationService = new NotificationService();
constructor() { constructor() {
this.setCustomLimit(); this.setCustomLimit();
this.handleEvents(); this.handleEvents();
@ -120,20 +119,19 @@ class TaskLimit {
fn: ICronFn<T>, fn: ICronFn<T>,
options?: Partial<QueueAddOptions>, options?: Partial<QueueAddOptions>,
): Promise<T | void> { ): Promise<T | void> {
fn.cron = cron;
let runs = this.queuedCrons.get(cron.id); let runs = this.queuedCrons.get(cron.id);
if (!runs?.length) { const result = runs?.length ? [...runs, fn] : [fn];
runs = []; const repeatTimes = this.repeatCronNotifyMap.get(cron.id) || 0;
} if (result?.length > 5 && repeatTimes < 3) {
runs.push(cron); this.repeatCronNotifyMap.set(cron.id, repeatTimes + 1);
if (runs.length >= 5) { this.notificationService.externalNotify(
this.notificationService.notify(
'任务重复运行', '任务重复运行',
`任务 ${cron.name} ${cron.command} 处于运行中的已达 5 个,请检查系统日志`, `任务${cron.name},命令:${cron.command},定时:${cron.schedule},处于运行中的超过 5 个,请检查定时设置`,
); );
return; return;
} }
this.queuedCrons.set(cron.id, runs); this.queuedCrons.set(cron.id, result);
fn.cron = cron;
return this.cronLimit.add(fn, options); return this.cronLimit.add(fn, options);
} }

View File

@ -6,7 +6,6 @@ import { ICron } from '../protos/cron';
export function runCron(cmd: string, cron: ICron): Promise<number | void> { export function runCron(cmd: string, cron: ICron): Promise<number | void> {
return taskLimit.runWithCronLimit(cron, () => { return taskLimit.runWithCronLimit(cron, () => {
return new Promise(async (resolve: any) => { return new Promise(async (resolve: any) => {
taskLimit.removeQueuedCron(cron.id);
Logger.info( Logger.info(
`[schedule][开始执行任务] 参数 ${JSON.stringify({ `[schedule][开始执行任务] 参数 ${JSON.stringify({
...cron, ...cron,
@ -31,6 +30,7 @@ export function runCron(cmd: string, cron: ICron): Promise<number | void> {
}); });
cp.on('exit', async (code) => { cp.on('exit', async (code) => {
taskLimit.removeQueuedCron(cron.id);
resolve({ ...cron, command: cmd, pid: cp.pid, code }); resolve({ ...cron, command: cmd, pid: cp.pid, code });
}); });
}); });