From ec5b88547612a36aade73b56fdd7510152f76af8 Mon Sep 17 00:00:00 2001 From: whyour Date: Fri, 6 Oct 2023 02:34:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=BB=E5=8A=A1=E9=98=9F?= =?UTF-8?q?=E5=88=97=E6=89=A7=E8=A1=8C=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- back/api/log.ts | 2 +- back/api/script.ts | 7 ++-- back/config/util.ts | 10 ++++-- back/protos/cron.proto | 1 + back/protos/cron.ts | 16 ++++++++- back/schedule/addCron.ts | 13 +++---- back/services/cron.ts | 25 ++++++------- back/services/schedule.ts | 41 +++++++++++++++------- back/services/script.ts | 7 ++-- back/services/subscription.ts | 6 +++- back/services/system.ts | 7 ++-- back/shared/pLimit.ts | 66 ++++++++++++++++++++++++----------- back/shared/runCron.ts | 10 +++--- package.json | 2 +- pnpm-lock.yaml | 21 +++++++++-- src/pages/crontab/modal.tsx | 6 +++- 16 files changed, 162 insertions(+), 78 deletions(-) diff --git a/back/api/log.ts b/back/api/log.ts index 797966a9..521c089c 100644 --- a/back/api/log.ts +++ b/back/api/log.ts @@ -65,7 +65,7 @@ export default (app: Router) => { }; const filePath = join(config.logPath, path, filename); if (type === 'directory') { - emptyDir(filePath); + await emptyDir(filePath); } else { fs.unlinkSync(filePath); } diff --git a/back/api/script.ts b/back/api/script.ts index 3b087d2d..8cee7095 100644 --- a/back/api/script.ts +++ b/back/api/script.ts @@ -183,7 +183,7 @@ export default (app: Router) => { }; const filePath = join(config.scriptPath, path, filename); if (type === 'directory') { - emptyDir(filePath); + await emptyDir(filePath); } else { fs.unlinkSync(filePath); } @@ -260,7 +260,6 @@ export default (app: Router) => { }), }), async (req: Request, res: Response, next: NextFunction) => { - const logger: Logger = Container.get('logger'); try { let { filename, path, pid } = req.body; const { name, ext } = parse(filename); @@ -269,7 +268,9 @@ export default (app: Router) => { const scriptService = Container.get(ScriptService); const result = await scriptService.stopScript(filePath, pid); - emptyDir(logPath); + setTimeout(() => { + emptyDir(logPath); + }, 3000); res.send(result); } catch (e) { return next(e); diff --git a/back/config/util.ts b/back/config/util.ts index 58b42ebc..28229af8 100644 --- a/back/config/util.ts +++ b/back/config/util.ts @@ -298,13 +298,17 @@ export function readDir( return result; } -export function emptyDir(path: string) { +export async function emptyDir(path: string) { + const pathExist = await fileExist(path); + if (!pathExist) { + return; + } const files = fs.readdirSync(path); - files.forEach((file) => { + files.forEach(async (file) => { const filePath = `${path}/${file}`; const stats = fs.statSync(filePath); if (stats.isDirectory()) { - emptyDir(filePath); + await emptyDir(filePath); } else { fs.unlinkSync(filePath); } diff --git a/back/protos/cron.proto b/back/protos/cron.proto index b09ba065..e88d6b1d 100644 --- a/back/protos/cron.proto +++ b/back/protos/cron.proto @@ -14,6 +14,7 @@ message ICron { string schedule = 2; string command = 3; repeated ISchedule extra_schedules = 4; + string name = 5; } message AddCronRequest { repeated ICron crons = 1; } diff --git a/back/protos/cron.ts b/back/protos/cron.ts index e60f686b..fe74d148 100644 --- a/back/protos/cron.ts +++ b/back/protos/cron.ts @@ -24,6 +24,7 @@ export interface ICron { schedule: string; command: string; extraSchedules: ISchedule[]; + name: string; } export interface AddCronRequest { @@ -97,7 +98,7 @@ export const ISchedule = { }; function createBaseICron(): ICron { - return { id: "", schedule: "", command: "", extraSchedules: [] }; + return { id: "", schedule: "", command: "", extraSchedules: [], name: "" }; } export const ICron = { @@ -114,6 +115,9 @@ export const ICron = { for (const v of message.extraSchedules) { ISchedule.encode(v!, writer.uint32(34).fork()).ldelim(); } + if (message.name !== "") { + writer.uint32(42).string(message.name); + } return writer; }, @@ -152,6 +156,13 @@ export const ICron = { message.extraSchedules.push(ISchedule.decode(reader, reader.uint32())); continue; + case 5: + if (tag !== 42) { + break; + } + + message.name = reader.string(); + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -169,6 +180,7 @@ export const ICron = { extraSchedules: Array.isArray(object?.extraSchedules) ? object.extraSchedules.map((e: any) => ISchedule.fromJSON(e)) : [], + name: isSet(object.name) ? String(object.name) : "", }; }, @@ -182,6 +194,7 @@ export const ICron = { } else { obj.extraSchedules = []; } + message.name !== undefined && (obj.name = message.name); return obj; }, @@ -195,6 +208,7 @@ export const ICron = { message.schedule = object.schedule ?? ""; message.command = object.command ?? ""; message.extraSchedules = object.extraSchedules?.map((e) => ISchedule.fromPartial(e)) || []; + message.name = object.name ?? ""; return message; }, }; diff --git a/back/schedule/addCron.ts b/back/schedule/addCron.ts index e5289c1d..1889888a 100644 --- a/back/schedule/addCron.ts +++ b/back/schedule/addCron.ts @@ -3,7 +3,6 @@ import { AddCronRequest, AddCronResponse } from '../protos/cron'; import nodeSchedule from 'node-schedule'; import { scheduleStacks } from './data'; import { runCron } from '../shared/runCron'; -import { QL_PREFIX, TASK_PREFIX } from '../config/const'; import Logger from '../loaders/logger'; const addCron = ( @@ -11,14 +10,15 @@ const addCron = ( callback: sendUnaryData, ) => { for (const item of call.request.crons) { - const { id, schedule, command, extraSchedules } = item; + const { id, schedule, command, extraSchedules, name } = item; if (scheduleStacks.has(id)) { scheduleStacks.get(id)?.forEach((x) => x.cancel()); } Logger.info( - '[schedule][创建定时任务], 任务ID: %s, cron: %s, 执行命令: %s', + '[schedule][创建定时任务], 任务ID: %s, 名称: %s, cron: %s, 执行命令: %s', id, + name, schedule, command, ); @@ -26,8 +26,9 @@ const addCron = ( if (extraSchedules?.length) { extraSchedules.forEach(x => { Logger.info( - '[schedule][创建定时任务], 任务ID: %s, cron: %s, 执行命令: %s', + '[schedule][创建定时任务], 任务ID: %s, 名称: %s, cron: %s, 执行命令: %s', id, + name, x.schedule, command, ); @@ -37,13 +38,13 @@ const addCron = ( scheduleStacks.set(id, [ nodeSchedule.scheduleJob(id, schedule, async () => { Logger.info(`[schedule][准备运行任务] 命令: ${command}`); - runCron(command); + runCron(command, { name, schedule, extraSchedules }); }), ...(extraSchedules?.length ? extraSchedules.map((x) => nodeSchedule.scheduleJob(id, x.schedule, async () => { Logger.info(`[schedule][准备运行任务] 命令: ${command}`); - runCron(command); + runCron(command, { name, schedule, extraSchedules }); }), ) : []), diff --git a/back/services/cron.ts b/back/services/cron.ts index 078527fd..6ac69228 100644 --- a/back/services/cron.ts +++ b/back/services/cron.ts @@ -35,7 +35,7 @@ export default class CronService { const doc = await this.insert(tab); if (this.isSixCron(doc) || doc.extra_schedules?.length) { await cronClient.addCron([ - { id: String(doc.id), schedule: doc.schedule!, command: this.makeCommand(doc), extraSchedules: doc.extra_schedules || [] }, + { name: doc.name || '', id: String(doc.id), schedule: doc.schedule!, command: this.makeCommand(doc), extraSchedules: doc.extra_schedules || [] }, ]); } await this.set_crontab(); @@ -60,6 +60,7 @@ export default class CronService { if (this.isSixCron(newDoc) || newDoc.extra_schedules?.length) { await cronClient.addCron([ { + name: doc.name || '', id: String(newDoc.id), schedule: newDoc.schedule!, command: this.makeCommand(newDoc), @@ -391,15 +392,18 @@ export default class CronService { ); } - private async runSingle(cronId: number): Promise { - return taskLimit.runWithCpuLimit(() => { + private async runSingle(cronId: number): Promise { + return taskLimit.runWithCronLimit(() => { return new Promise(async (resolve: any) => { const cron = await this.getDb({ id: cronId }); + const params = { name: cron.name, command: cron.command, schedule: cron.schedule, extraSchedules: cron.extra_schedules }; if (cron.status !== CrontabStatus.queued) { - resolve(); + resolve(params); return; } + this.logger.info(`[panel][开始执行任务] 参数 ${JSON.stringify(params)}`); + let { id, command, log_path } = cron; const uniqPath = await getUniqPath(command, `${id}`); const logTime = dayjs().format('YYYY-MM-DD-HH-mm-ss-SSS'); @@ -410,10 +414,6 @@ export default class CronService { const logPath = `${uniqPath}/${logTime}.log`; const absolutePath = path.resolve(config.logPath, `${logPath}`); - this.logger.silly('Running job'); - this.logger.silly('ID: ' + id); - this.logger.silly('Original command: ' + command); - const cp = spawn(`real_log_path=${logPath} no_delay=true ${this.makeCommand(cron)}`, { shell: '/bin/bash' }); await CrontabModel.update( @@ -427,17 +427,12 @@ export default class CronService { fs.appendFileSync(`${absolutePath}`, `${JSON.stringify(err)}`); }); - cp.on('exit', async (code, signal) => { - this.logger.info( - `[panel][任务退出] 任务 ${command} 进程id: ${cp.pid}, 退出码 ${code}`, - ); - }); cp.on('close', async (code) => { await CrontabModel.update( { status: CrontabStatus.idle, pid: undefined }, { where: { id } }, ); - resolve(); + resolve({ ...params, pid: cp.pid, code }); }); }); }); @@ -455,6 +450,7 @@ export default class CronService { const sixCron = docs .filter((x) => this.isSixCron(x)) .map((doc) => ({ + name: doc.name || '', id: String(doc.id), schedule: doc.schedule!, command: this.makeCommand(doc), @@ -586,6 +582,7 @@ export default class CronService { const sixCron = tabs.data .filter((x) => this.isSixCron(x) && x.isDisabled !== 1) .map((doc) => ({ + name: doc.name || '', id: String(doc.id), schedule: doc.schedule!, command: this.makeCommand(doc), diff --git a/back/services/schedule.ts b/back/services/schedule.ts index 0e6cf4a2..efa1db2a 100644 --- a/back/services/schedule.ts +++ b/back/services/schedule.ts @@ -42,15 +42,22 @@ export default class ScheduleService { private maxBuffer = 200 * 1024 * 1024; - constructor(@Inject('logger') private logger: winston.Logger) {} + constructor(@Inject('logger') private logger: winston.Logger) { } async runTask( command: string, callbacks: TaskCallbacks = {}, + params: { + schedule?: string; + name?: string; + command?: string; + }, completionTime: 'start' | 'end' = 'end', ) { - return taskLimit.runWithCpuLimit(() => { + return taskLimit.runWithCronLimit(() => { return new Promise(async (resolve, reject) => { + this.logger.info(`[panel][开始执行任务] 参数 ${JSON.stringify({ ...params, command })}`); + try { const startTime = dayjs(); await callbacks.onBefore?.(startTime); @@ -82,12 +89,6 @@ export default class ScheduleService { await callbacks.onError?.(JSON.stringify(err)); }); - cp.on('exit', async (code, signal) => { - this.logger.info( - `[panel][任务退出] ${command} 进程id: ${cp.pid}, 退出码 ${code}`, - ); - }); - cp.on('close', async (code) => { const endTime = dayjs(); await callbacks.onEnd?.( @@ -95,7 +96,7 @@ export default class ScheduleService { endTime, endTime.diff(startTime, 'seconds'), ); - resolve(null); + resolve({ ...params, pid: cp.pid, code }); }); } catch (error) { this.logger.error( @@ -126,12 +127,20 @@ export default class ScheduleService { this.scheduleStacks.set( _id, nodeSchedule.scheduleJob(_id, schedule, async () => { - this.runTask(command, callbacks); + this.runTask(command, callbacks, { + name, + schedule, + command, + }); }), ); if (runImmediately) { - this.runTask(command, callbacks); + this.runTask(command, callbacks, { + name, + schedule, + command, + }); } } @@ -160,7 +169,10 @@ export default class ScheduleService { const task = new Task( name, () => { - this.runTask(command, callbacks); + this.runTask(command, callbacks, { + name, + command, + }); }, (err) => { this.logger.error( @@ -180,7 +192,10 @@ export default class ScheduleService { this.intervalSchedule.addIntervalJob(job); if (runImmediately) { - this.runTask(command, callbacks); + this.runTask(command, callbacks, { + name, + command, + }); } } diff --git a/back/services/script.ts b/back/services/script.ts index 95681549..03af4929 100644 --- a/back/services/script.ts +++ b/back/services/script.ts @@ -16,14 +16,14 @@ export default class ScriptService { private sockService: SockService, private cronService: CronService, private scheduleService: ScheduleService, - ) {} + ) { } private taskCallbacks(filePath: string): TaskCallbacks { return { onEnd: async (cp, endTime, diff) => { try { fs.unlinkSync(filePath); - } catch (error) {} + } catch (error) { } }, onError: async (message: string) => { this.sockService.sendMessage({ @@ -46,6 +46,7 @@ export default class ScriptService { const pid = await this.scheduleService.runTask( command, this.taskCallbacks(filePath), + { command }, 'start', ); @@ -59,7 +60,7 @@ export default class ScriptService { } try { await killTask(pid); - } catch (error) {} + } catch (error) { } return { code: 200 }; } diff --git a/back/services/subscription.ts b/back/services/subscription.ts index 960bdfef..14b20ad1 100644 --- a/back/services/subscription.ts +++ b/back/services/subscription.ts @@ -320,7 +320,11 @@ export default class SubscriptionService { const command = formatCommand(subscription); - this.scheduleService.runTask(command, this.taskCallbacks(subscription)); + this.scheduleService.runTask(command, this.taskCallbacks(subscription), { + name: subscription.name, + schedule: subscription.schedule, + command + }); } public async disabled(ids: number[]) { diff --git a/back/services/system.ts b/back/services/system.ts index 88c61760..c2f67ae7 100644 --- a/back/services/system.ts +++ b/back/services/system.ts @@ -39,7 +39,7 @@ export default class SystemService { @Inject('logger') private logger: winston.Logger, private scheduleService: ScheduleService, private sockService: SockService, - ) {} + ) { } public async getSystemConfig() { const doc = await this.getDb({ type: AuthDataType.systemConfig }); @@ -114,7 +114,7 @@ export default class SystemService { }, ); lastVersionContent = await parseContentVersion(result.body); - } catch (error) {} + } catch (error) { } if (!lastVersionContent) { lastVersionContent = currentVersionContent; @@ -232,6 +232,9 @@ export default class SystemService { this.scheduleService.runTask( `real_log_path=${logPath} real_time=true ${command}`, callback, + { + command, + } ); } diff --git a/back/shared/pLimit.ts b/back/shared/pLimit.ts index e3841cb8..2dbbfaf6 100644 --- a/back/shared/pLimit.ts +++ b/back/shared/pLimit.ts @@ -1,29 +1,57 @@ -import pLimit from 'p-limit'; +import PQueue, { QueueAddOptions } from 'p-queue-cjs'; import os from 'os'; import { AuthDataType, AuthModel } from '../data/auth'; import Logger from '../loaders/logger'; -import dayjs from 'dayjs'; class TaskLimit { - private oneLimit = pLimit(1); - private updateLogLimit = pLimit(1); - private cpuLimit = pLimit(Math.max(os.cpus().length, 4)); + private oneLimit = new PQueue({ concurrency: 1 }); + private updateLogLimit = new PQueue({ concurrency: 1 }); + private cronLimit = new PQueue({ concurrency: Math.max(os.cpus().length, 4) }); - get cpuLimitActiveCount() { - return this.cpuLimit.activeCount; + get cronLimitActiveCount() { + return this.cronLimit.pending; } - get cpuLimitPendingCount() { - return this.cpuLimit.pendingCount; + get cronLimitPendingCount() { + return this.cronLimit.size; } constructor() { this.setCustomLimit(); } + private handleEvents() { + this.cronLimit.on('add', () => { + Logger.info( + `[schedule][任务加入队列] 运行中任务数: ${this.cronLimitActiveCount}, 等待中任务数: ${this.cronLimitPendingCount}`, + ); + }) + this.cronLimit.on('active', () => { + Logger.info( + `[schedule][开始处理任务] 运行中任务数: ${this.cronLimitActiveCount + 1}, 等待中任务数: ${this.cronLimitPendingCount}`, + ); + }) + this.cronLimit.on('completed', (param) => { + Logger.info( + `[schedule][任务处理完成] 运行中任务数: ${this.cronLimitActiveCount - 1}, 等待中任务数: ${this.cronLimitPendingCount}, 参数 ${JSON.stringify(param)}`, + ); + }); + this.cronLimit.on('error', error => { + Logger.error( + `[schedule][处理任务错误] 运行中任务数: ${this.cronLimitActiveCount}, 等待中任务数: ${this.cronLimitPendingCount}, 参数 ${JSON.stringify(error)}`, + ); + }); + this.cronLimit.on('idle', () => { + Logger.info( + `[schedule][任务队列] 空闲中...`, + ); + }); + } + public async setCustomLimit(limit?: number) { if (limit) { - this.cpuLimit = pLimit(limit); + this.cronLimit = new PQueue({ concurrency: limit });; + this.handleEvents(); return; } await AuthModel.sync(); @@ -31,23 +59,21 @@ class TaskLimit { where: { type: AuthDataType.systemConfig }, }); if (doc?.info?.cronConcurrency) { - this.cpuLimit = pLimit(doc?.info?.cronConcurrency); + this.cronLimit = new PQueue({ concurrency: doc?.info?.cronConcurrency }); + this.handleEvents(); } } - public runWithCpuLimit(fn: () => Promise): Promise { - Logger.info( - `[schedule][任务加入队列] 运行中任务数: ${this.cpuLimitActiveCount}, 等待中任务数: ${this.cpuLimitPendingCount}`, - ); - return this.cpuLimit(fn); + public async runWithCronLimit(fn: () => Promise, options?: Partial): Promise { + return this.cronLimit.add(fn, options); } - public runOneByOne(fn: () => Promise): Promise { - return this.oneLimit(fn); + public runOneByOne(fn: () => Promise, options?: Partial): Promise { + return this.oneLimit.add(fn, options); } - public updateDepLog(fn: () => Promise): Promise { - return this.updateLogLimit(fn); + public updateDepLog(fn: () => Promise, options?: Partial): Promise { + return this.updateLogLimit.add(fn, options); } } diff --git a/back/shared/runCron.ts b/back/shared/runCron.ts index a1cdaaa2..0160b214 100644 --- a/back/shared/runCron.ts +++ b/back/shared/runCron.ts @@ -2,11 +2,10 @@ import { spawn } from 'cross-spawn'; import taskLimit from './pLimit'; import Logger from '../loaders/logger'; -export function runCron(cmd: string): Promise { - return taskLimit.runWithCpuLimit(() => { +export function runCron(cmd: string, options?: { schedule: string; extraSchedules: Array<{ schedule: string }>; name: string }): Promise { + return taskLimit.runWithCronLimit(() => { return new Promise(async (resolve: any) => { - Logger.info(`[schedule][开始执行任务] 运行命令: ${cmd}`); - + Logger.info(`[schedule][开始执行任务] 参数 ${JSON.stringify({ ...options, command: cmd })}`); const cp = spawn(cmd, { shell: '/bin/bash' }); cp.stderr.on('data', (data) => { @@ -25,8 +24,7 @@ export function runCron(cmd: string): Promise { }); cp.on('close', async (code) => { - Logger.info(`[schedule][任务退出] ${cmd} 进程id: ${cp.pid} 退出, 退出码 ${code}`); - resolve(); + resolve({ ...options, command: cmd, pid: cp.pid, code }); }); }); }); diff --git a/package.json b/package.json index 621e091f..f681f4cb 100644 --- a/package.json +++ b/package.json @@ -83,7 +83,7 @@ "nedb": "^1.8.0", "node-schedule": "^2.1.0", "nodemailer": "^6.7.2", - "p-limit": "3.1.0", + "p-queue-cjs": "7.3.4", "protobufjs": "^7.2.3", "pstree.remy": "^1.1.8", "reflect-metadata": "^0.1.13", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6d670c90..240ce1dd 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -85,9 +85,9 @@ dependencies: nodemailer: specifier: ^6.7.2 version: 6.9.3 - p-limit: - specifier: 3.1.0 - version: 3.1.0 + p-queue-cjs: + specifier: 7.3.4 + version: 7.3.4 protobufjs: specifier: ^7.2.3 version: 7.2.3 @@ -11633,6 +11633,7 @@ packages: engines: {node: '>=10'} dependencies: yocto-queue: 0.1.0 + dev: true /p-locate@4.1.0: resolution: {integrity: sha512-R79ZZ/0wAxKGu3oYMlz8jy/kbhsNrS7SKZ7PxEHBgJ5+F2mtFW2fK2cOtBh1cHYkQsbzFV7I+EoRKe6Yt0oK7A==} @@ -11654,6 +11655,19 @@ packages: dependencies: aggregate-error: 3.1.0 + /p-queue-cjs@7.3.4: + resolution: {integrity: sha512-vP0BvEAgmUEShxWBCETvxiUDnwSiCLfBRqmhdKlNvcXF/7x2yemtYLcxT1pfYELZLlyGL3grvGnq+KF5OwNZfA==} + engines: {node: '>=12'} + dependencies: + eventemitter3: 4.0.7 + p-timeout-cjs: 5.0.5 + dev: false + + /p-timeout-cjs@5.0.5: + resolution: {integrity: sha512-tjXKZjvzLUGerHxY0+hf0XROyF2XtuZpLNtUlkOOW+nVjDIoH0pKi3hh4X4+MXLqYavcIITLjNC0GZHUCRrwpA==} + engines: {node: '>=12'} + dev: false + /p-try@2.2.0: resolution: {integrity: sha512-R4nPAVTAU0B9D35/Gk3uJf/7XYbQcyohSKdvAxIRSNghFl4e71hVoGnBNQz9cWaXxO2I10KTC+3jMdvvoKw6dQ==} engines: {node: '>=6'} @@ -16299,6 +16313,7 @@ packages: /yocto-queue@0.1.0: resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==} engines: {node: '>=10'} + dev: true /yorkie@2.0.0: resolution: {integrity: sha512-jcKpkthap6x63MB4TxwCyuIGkV0oYP/YRyuQU5UO0Yz/E/ZAu+653/uov+phdmO54n6BcvFRyyt0RRrWdN2mpw==} diff --git a/src/pages/crontab/modal.tsx b/src/pages/crontab/modal.tsx index 1a5e259a..7f5924a9 100644 --- a/src/pages/crontab/modal.tsx +++ b/src/pages/crontab/modal.tsx @@ -74,7 +74,11 @@ const CronModal = ({ name="form_in_modal" initialValues={cron} > - +