From f8dfee8945008b4cf66544a01766b4cdc7625451 Mon Sep 17 00:00:00 2001 From: whyour Date: Tue, 30 May 2023 16:32:00 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E4=BB=BB=E5=8A=A1=E5=B9=B6?= =?UTF-8?q?=E5=8F=91=E6=89=A7=E8=A1=8C=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- back/schedule/addCron.ts | 4 +- back/schedule/health.ts | 1 - back/services/cron.ts | 116 +++++++++++---------- back/services/dependence.ts | 191 +++++++++++++++++----------------- back/services/schedule.ts | 114 ++++++++++---------- back/services/subscription.ts | 31 ++---- back/services/user.ts | 1 - back/shared/pLimit.ts | 10 ++ back/shared/runCron.ts | 37 +++++++ package.json | 1 + pnpm-lock.yaml | 15 +++ 11 files changed, 285 insertions(+), 236 deletions(-) create mode 100644 back/shared/pLimit.ts create mode 100644 back/shared/runCron.ts diff --git a/back/schedule/addCron.ts b/back/schedule/addCron.ts index c9b4f451..f9e8c3ae 100644 --- a/back/schedule/addCron.ts +++ b/back/schedule/addCron.ts @@ -2,7 +2,7 @@ import { ServerUnaryCall, sendUnaryData } from '@grpc/grpc-js'; import { AddCronRequest, AddCronResponse } from '../protos/cron'; import nodeSchedule from 'node-schedule'; import { scheduleStacks } from './data'; -import { exec } from 'child_process'; +import { runCron } from '../shared/runCron'; const addCron = ( call: ServerUnaryCall, @@ -16,7 +16,7 @@ const addCron = ( scheduleStacks.set( id, nodeSchedule.scheduleJob(id, schedule, async () => { - exec(`ID=${id} ${command}`); + runCron(`ID=${id} ${command}`) }), ); } diff --git a/back/schedule/health.ts b/back/schedule/health.ts index 4d1ee698..32be7eef 100644 --- a/back/schedule/health.ts +++ b/back/schedule/health.ts @@ -1,6 +1,5 @@ import { ServerUnaryCall, sendUnaryData } from '@grpc/grpc-js'; import { HealthCheckRequest, HealthCheckResponse } from '../protos/health'; -import { exec } from 'child_process'; import config from '../config'; import { promiseExec } from '../config/util'; diff --git a/back/services/cron.ts b/back/services/cron.ts index e85cfd0a..999fba56 100644 --- a/back/services/cron.ts +++ b/back/services/cron.ts @@ -16,6 +16,7 @@ import { Op, where, col as colFn, FindOptions } from 'sequelize'; import path from 'path'; import { TASK_PREFIX, QL_PREFIX } from '../config/const'; import cronClient from '../schedule/client'; +import { runCronWithLimit } from '../shared/pLimit'; @Service() export default class CronService { @@ -365,10 +366,9 @@ export default class CronService { { status: CrontabStatus.queued }, { where: { id: ids } }, ); - concurrentRun( - ids.map((id) => async () => await this.runSingle(id)), - 10, - ); + ids.forEach(id => { + this.runSingle(id) + }) } public async stop(ids: number[]) { @@ -390,65 +390,67 @@ export default class CronService { } private async runSingle(cronId: number): Promise { - return new Promise(async (resolve: any) => { - const cron = await this.getDb({ id: cronId }); - if (cron.status !== CrontabStatus.queued) { - resolve(); - return; - } - - let { id, command, log_path } = cron; - const absolutePath = path.resolve(config.logPath, `${log_path}`); - const logFileExist = log_path && (await fileExist(absolutePath)); - - this.logger.silly('Running job'); - this.logger.silly('ID: ' + id); - this.logger.silly('Original command: ' + command); - - let cmdStr = command; - if (!cmdStr.startsWith(TASK_PREFIX) && !cmdStr.startsWith(QL_PREFIX)) { - cmdStr = `${TASK_PREFIX}${cmdStr}`; - } - if ( - cmdStr.endsWith('.js') || - cmdStr.endsWith('.py') || - cmdStr.endsWith('.pyc') || - cmdStr.endsWith('.sh') || - cmdStr.endsWith('.ts') - ) { - cmdStr = `${cmdStr} now`; - } - - const cp = spawn(`ID=${id} ${cmdStr}`, { shell: '/bin/bash' }); - - await CrontabModel.update( - { status: CrontabStatus.running, pid: cp.pid }, - { where: { id } }, - ); - cp.stderr.on('data', (data) => { - if (logFileExist) { - fs.appendFileSync(`${absolutePath}`, `${data.toString()}`); + return runCronWithLimit(() => { + return new Promise(async (resolve: any) => { + const cron = await this.getDb({ id: cronId }); + if (cron.status !== CrontabStatus.queued) { + resolve(); + return; } - }); - cp.on('error', (err) => { - if (logFileExist) { - fs.appendFileSync(`${absolutePath}`, `${JSON.stringify(err)}`); - } - }); - cp.on('exit', async (code, signal) => { - this.logger.info( - `任务 ${command} 进程id: ${cp.pid} 退出,退出码 ${code}`, - ); - }); - cp.on('close', async (code) => { + let { id, command, log_path } = cron; + const absolutePath = path.resolve(config.logPath, `${log_path}`); + const logFileExist = log_path && (await fileExist(absolutePath)); + + this.logger.silly('Running job'); + this.logger.silly('ID: ' + id); + this.logger.silly('Original command: ' + command); + + let cmdStr = command; + if (!cmdStr.startsWith(TASK_PREFIX) && !cmdStr.startsWith(QL_PREFIX)) { + cmdStr = `${TASK_PREFIX}${cmdStr}`; + } + if ( + cmdStr.endsWith('.js') || + cmdStr.endsWith('.py') || + cmdStr.endsWith('.pyc') || + cmdStr.endsWith('.sh') || + cmdStr.endsWith('.ts') + ) { + cmdStr = `${cmdStr} now`; + } + + const cp = spawn(`ID=${id} ${cmdStr}`, { shell: '/bin/bash' }); + await CrontabModel.update( - { status: CrontabStatus.idle, pid: undefined }, + { status: CrontabStatus.running, pid: cp.pid }, { where: { id } }, ); - resolve(); + cp.stderr.on('data', (data) => { + if (logFileExist) { + fs.appendFileSync(`${absolutePath}`, `${data.toString()}`); + } + }); + cp.on('error', (err) => { + if (logFileExist) { + fs.appendFileSync(`${absolutePath}`, `${JSON.stringify(err)}`); + } + }); + + cp.on('exit', async (code, signal) => { + this.logger.info( + `任务 ${command} 进程id: ${cp.pid} 退出,退出码 ${code}`, + ); + }); + cp.on('close', async (code) => { + await CrontabModel.update( + { status: CrontabStatus.idle, pid: undefined }, + { where: { id } }, + ); + resolve(); + }); }); - }); + }) } public async disabled(ids: number[]) { diff --git a/back/services/dependence.ts b/back/services/dependence.ts index 52e2c24c..54c178e5 100644 --- a/back/services/dependence.ts +++ b/back/services/dependence.ts @@ -14,13 +14,14 @@ import SockService from './sock'; import { FindOptions, Op } from 'sequelize'; import { concurrentRun } from '../config/util'; import dayjs from 'dayjs'; +import { runCronWithLimit } from 'back/shared/pLimit'; @Service() export default class DependenceService { constructor( @Inject('logger') private logger: winston.Logger, private sockService: SockService, - ) {} + ) { } public async create(payloads: Dependence[]): Promise { const tabs = payloads.map((x) => { @@ -104,20 +105,17 @@ export default class DependenceService { isInstall: boolean = true, force: boolean = false, ) { - concurrentRun( - docs.map((dep) => async () => { - const status = isInstall - ? DependenceStatus.installing - : DependenceStatus.removing; - await DependenceModel.update({ status }, { where: { id: dep.id } }); - return await this.installOrUninstallDependencies( - [dep], - isInstall, - force, - ); - }), - 1, - ); + docs.forEach(async (dep) => { + const status = isInstall + ? DependenceStatus.installing + : DependenceStatus.removing; + await DependenceModel.update({ status }, { where: { id: dep.id } }); + this.installOrUninstallDependencies( + [dep], + isInstall, + force, + ) + }) } public async reInstall(ids: number[]): Promise { @@ -157,72 +155,28 @@ export default class DependenceService { isInstall: boolean = true, force: boolean = false, ) { - return new Promise(async (resolve) => { - if (dependencies.length === 0) { - resolve(null); - return; - } - const socketMessageType = !force - ? 'installDependence' - : 'uninstallDependence'; - const depNames = dependencies.map((x) => x.name).join(' '); - const depRunCommand = ( - isInstall - ? InstallDependenceCommandTypes - : unInstallDependenceCommandTypes - )[dependencies[0].type as any]; - const actionText = isInstall ? '安装' : '删除'; - const depIds = dependencies.map((x) => x.id) as number[]; - const startTime = dayjs(); + return runCronWithLimit(() => { + return new Promise(async (resolve) => { + if (dependencies.length === 0) { + resolve(null); + return; + } + const socketMessageType = !force + ? 'installDependence' + : 'uninstallDependence'; + const depNames = dependencies.map((x) => x.name).join(' '); + const depRunCommand = ( + isInstall + ? InstallDependenceCommandTypes + : unInstallDependenceCommandTypes + )[dependencies[0].type as any]; + const actionText = isInstall ? '安装' : '删除'; + const depIds = dependencies.map((x) => x.id) as number[]; + const startTime = dayjs(); - const message = `开始${actionText}依赖 ${depNames},开始时间 ${startTime.format( - 'YYYY-MM-DD HH:mm:ss', - )}\n\n`; - this.sockService.sendMessage({ - type: socketMessageType, - message, - references: depIds, - }); - await this.updateLog(depIds, message); - - const cp = spawn(`${depRunCommand} ${depNames}`, { shell: '/bin/bash' }); - - cp.stdout.on('data', async (data) => { - this.sockService.sendMessage({ - type: socketMessageType, - message: data.toString(), - references: depIds, - }); - await this.updateLog(depIds, data.toString()); - }); - - cp.stderr.on('data', async (data) => { - this.sockService.sendMessage({ - type: socketMessageType, - message: data.toString(), - references: depIds, - }); - await this.updateLog(depIds, data.toString()); - }); - - cp.on('error', async (err) => { - this.sockService.sendMessage({ - type: socketMessageType, - message: JSON.stringify(err), - references: depIds, - }); - await this.updateLog(depIds, JSON.stringify(err)); - resolve(null); - }); - - cp.on('close', async (code) => { - const endTime = dayjs(); - const isSucceed = code === 0; - const resultText = isSucceed ? '成功' : '失败'; - - const message = `\n依赖${actionText}${resultText},结束时间 ${endTime.format( + const message = `开始${actionText}依赖 ${depNames},开始时间 ${startTime.format( 'YYYY-MM-DD HH:mm:ss', - )},耗时 ${endTime.diff(startTime, 'second')} 秒`; + )}\n\n`; this.sockService.sendMessage({ type: socketMessageType, message, @@ -230,25 +184,70 @@ export default class DependenceService { }); await this.updateLog(depIds, message); - let status = null; - if (isSucceed) { - status = isInstall - ? DependenceStatus.installed - : DependenceStatus.removed; - } else { - status = isInstall - ? DependenceStatus.installFailed - : DependenceStatus.removeFailed; - } - await DependenceModel.update({ status }, { where: { id: depIds } }); + const cp = spawn(`${depRunCommand} ${depNames}`, { shell: '/bin/bash' }); - // 如果删除依赖成功或者强制删除 - if ((isSucceed || force) && !isInstall) { - this.removeDb(depIds); - } + cp.stdout.on('data', async (data) => { + this.sockService.sendMessage({ + type: socketMessageType, + message: data.toString(), + references: depIds, + }); + await this.updateLog(depIds, data.toString()); + }); - resolve(null); + cp.stderr.on('data', async (data) => { + this.sockService.sendMessage({ + type: socketMessageType, + message: data.toString(), + references: depIds, + }); + await this.updateLog(depIds, data.toString()); + }); + + cp.on('error', async (err) => { + this.sockService.sendMessage({ + type: socketMessageType, + message: JSON.stringify(err), + references: depIds, + }); + await this.updateLog(depIds, JSON.stringify(err)); + }); + + cp.on('close', async (code) => { + const endTime = dayjs(); + const isSucceed = code === 0; + const resultText = isSucceed ? '成功' : '失败'; + + const message = `\n依赖${actionText}${resultText},结束时间 ${endTime.format( + 'YYYY-MM-DD HH:mm:ss', + )},耗时 ${endTime.diff(startTime, 'second')} 秒`; + this.sockService.sendMessage({ + type: socketMessageType, + message, + references: depIds, + }); + await this.updateLog(depIds, message); + + let status = null; + if (isSucceed) { + status = isInstall + ? DependenceStatus.installed + : DependenceStatus.removed; + } else { + status = isInstall + ? DependenceStatus.installFailed + : DependenceStatus.removeFailed; + } + await DependenceModel.update({ status }, { where: { id: depIds } }); + + // 如果删除依赖成功或者强制删除 + if ((isSucceed || force) && !isInstall) { + this.removeDb(depIds); + } + + resolve(null); + }); }); - }); + }) } } diff --git a/back/services/schedule.ts b/back/services/schedule.ts index 9cf347b5..9f921470 100644 --- a/back/services/schedule.ts +++ b/back/services/schedule.ts @@ -9,6 +9,7 @@ import { Task, } from 'toad-scheduler'; import dayjs from 'dayjs'; +import { runCronWithLimit } from '../shared/pLimit'; interface ScheduleTaskType { id: number; @@ -40,73 +41,74 @@ export default class ScheduleService { private maxBuffer = 200 * 1024 * 1024; - constructor(@Inject('logger') private logger: winston.Logger) {} + constructor(@Inject('logger') private logger: winston.Logger) { } async runTask( command: string, callbacks: TaskCallbacks = {}, completionTime: 'start' | 'end' = 'end', ) { - return new Promise(async (resolve, reject) => { - try { - const startTime = dayjs(); - await callbacks.onBefore?.(startTime); + return runCronWithLimit(() => { + return new Promise(async (resolve, reject) => { + try { + const startTime = dayjs(); + await callbacks.onBefore?.(startTime); - const cp = spawn(command, { shell: '/bin/bash' }); + const cp = spawn(command, { shell: '/bin/bash' }); - // TODO: - callbacks.onStart?.(cp, startTime); - completionTime === 'start' && resolve(cp.pid); + callbacks.onStart?.(cp, startTime); + completionTime === 'start' && resolve(cp.pid); - cp.stdout.on('data', async (data) => { - await callbacks.onLog?.(data.toString()); - }); + cp.stdout.on('data', async (data) => { + await callbacks.onLog?.(data.toString()); + }); - cp.stderr.on('data', async (data) => { - this.logger.info( - '[执行任务失败] %s,时间:%s, 错误信息:%j', + cp.stderr.on('data', async (data) => { + this.logger.info( + '[执行任务失败] %s,时间:%s, 错误信息:%j', + command, + new Date().toLocaleString(), + data.toString(), + ); + await callbacks.onError?.(data.toString()); + }); + + cp.on('error', async (err) => { + this.logger.error( + '[创建任务失败] %s,时间:%s, 错误信息:%j', + command, + new Date().toLocaleString(), + err, + ); + await callbacks.onError?.(JSON.stringify(err)); + }); + + cp.on('exit', async (code, signal) => { + this.logger.info( + `[任务退出] ${command} 进程id: ${cp.pid},退出码 ${code}`, + ); + }); + + cp.on('close', async (code) => { + const endTime = dayjs(); + await callbacks.onEnd?.( + cp, + endTime, + endTime.diff(startTime, 'seconds'), + ); + resolve(null); + }); + } catch (error) { + await this.logger.error( + '执行任务%s失败,时间:%s, 错误信息:%j', command, new Date().toLocaleString(), - data.toString(), + error, ); - await callbacks.onError?.(data.toString()); - }); - - cp.on('error', async (err) => { - this.logger.error( - '[创建任务失败] %s,时间:%s, 错误信息:%j', - command, - new Date().toLocaleString(), - err, - ); - await callbacks.onError?.(JSON.stringify(err)); - }); - - cp.on('exit', async (code, signal) => { - this.logger.info( - `[任务退出] ${command} 进程id: ${cp.pid},退出码 ${code}`, - ); - }); - - cp.on('close', async (code) => { - const endTime = dayjs(); - await callbacks.onEnd?.( - cp, - endTime, - endTime.diff(startTime, 'seconds'), - ); - resolve(null); - }); - } catch (error) { - await this.logger.error( - '执行任务%s失败,时间:%s, 错误信息:%j', - command, - new Date().toLocaleString(), - error, - ); - await callbacks.onError?.(JSON.stringify(error)); - } - }); + await callbacks.onError?.(JSON.stringify(error)); + } + }); + }) } async createCronTask( @@ -126,12 +128,12 @@ export default class ScheduleService { this.scheduleStacks.set( _id, nodeSchedule.scheduleJob(_id, schedule, async () => { - await this.runTask(command, callbacks); + this.runTask(command, callbacks); }), ); if (runImmediately) { - await this.runTask(command, callbacks); + this.runTask(command, callbacks); } } diff --git a/back/services/subscription.ts b/back/services/subscription.ts index 6cb3dfa7..925a7018 100644 --- a/back/services/subscription.ts +++ b/back/services/subscription.ts @@ -8,9 +8,6 @@ import { } from '../data/subscription'; import { ChildProcessWithoutNullStreams, - exec, - execSync, - spawn, } from 'child_process'; import fs from 'fs'; import { @@ -20,6 +17,7 @@ import { createFile, killTask, handleLogPath, + promiseExec, } from '../config/util'; import { promises, existsSync } from 'fs'; import { FindOptions, Op } from 'sequelize'; @@ -39,7 +37,7 @@ export default class SubscriptionService { private scheduleService: ScheduleService, private sockService: SockService, private sshKeyService: SshKeyService, - ) {} + ) { } public async list(searchText?: string): Promise { let query = {}; @@ -110,18 +108,6 @@ export default class SubscriptionService { this.sshKeyService.setSshConfig(docs); } - private async promiseExec(command: string): Promise { - return new Promise((resolve, reject) => { - exec( - command, - { maxBuffer: 200 * 1024 * 1024, encoding: 'utf8' }, - (err, stdout, stderr) => { - resolve(stdout || stderr || JSON.stringify(err)); - }, - ); - }); - } - private taskCallbacks(doc: Subscription): TaskCallbacks { return { onBefore: async (startTime) => { @@ -144,7 +130,7 @@ export default class SubscriptionService { try { if (doc.sub_before) { fs.appendFileSync(absolutePath, `\n## 执行before命令...\n\n`); - beforeStr = await this.promiseExec(doc.sub_before); + beforeStr = await promiseExec(doc.sub_before); } } catch (error: any) { beforeStr = @@ -171,7 +157,7 @@ export default class SubscriptionService { try { if (sub.sub_after) { fs.appendFileSync(absolutePath, `\n\n## 执行after命令...\n\n`); - afterStr = await this.promiseExec(sub.sub_after); + afterStr = await promiseExec(sub.sub_after); } } catch (error: any) { afterStr = @@ -290,10 +276,9 @@ export default class SubscriptionService { { status: SubscriptionStatus.queued }, { where: { id: ids } }, ); - concurrentRun( - ids.map((id) => async () => await this.runSingle(id)), - 10, - ); + ids.forEach(id => { + this.runSingle(id) + }) } public async stop(ids: number[]) { @@ -330,7 +315,7 @@ export default class SubscriptionService { const command = formatCommand(subscription); - await this.scheduleService.runTask( + this.scheduleService.runTask( command, this.taskCallbacks(subscription), ); diff --git a/back/services/user.ts b/back/services/user.ts index c9fcd77f..844747cb 100644 --- a/back/services/user.ts +++ b/back/services/user.ts @@ -15,7 +15,6 @@ import { NotificationInfo } from '../data/notify'; import NotificationService from './notify'; import { Request } from 'express'; import ScheduleService from './schedule'; -import { spawn } from 'child_process'; import SockService from './sock'; import dayjs from 'dayjs'; diff --git a/back/shared/pLimit.ts b/back/shared/pLimit.ts new file mode 100644 index 00000000..bade51ed --- /dev/null +++ b/back/shared/pLimit.ts @@ -0,0 +1,10 @@ +import pLimit from "p-limit"; +import os from 'os'; + +const cronLimit = pLimit(os.cpus.length); + +export function runCronWithLimit(fn: () => Promise): Promise { + return cronLimit(() => { + return fn(); + }); +} \ No newline at end of file diff --git a/back/shared/runCron.ts b/back/shared/runCron.ts new file mode 100644 index 00000000..0ee3587c --- /dev/null +++ b/back/shared/runCron.ts @@ -0,0 +1,37 @@ +import { spawn } from "child_process"; +import { runCronWithLimit } from "./pLimit"; +import Logger from '../loaders/logger'; + +export function runCron(cmd: string): Promise { + return runCronWithLimit(() => { + return new Promise(async (resolve: any) => { + Logger.silly('运行命令: ' + cmd); + + const cp = spawn(cmd, { shell: '/bin/bash' }); + + cp.stderr.on('data', (data) => { + Logger.info( + '[执行任务失败] %s,时间:%s, 错误信息:%j', + cmd, + new Date().toLocaleString(), + data.toString(), + ); + }); + cp.on('error', (err) => { + Logger.error( + '[创建任务失败] %s,时间:%s, 错误信息:%j', + cmd, + new Date().toLocaleString(), + err, + ); + }); + + cp.on('close', async (code) => { + Logger.info( + `[任务退出] ${cmd} 进程id: ${cp.pid} 退出,退出码 ${code}`, + ); + resolve(); + }); + }); + }) +} \ No newline at end of file diff --git a/package.json b/package.json index 8a75d09c..c5e39239 100644 --- a/package.json +++ b/package.json @@ -81,6 +81,7 @@ "nedb": "^1.8.0", "node-schedule": "^2.1.0", "nodemailer": "^6.7.2", + "p-limit": "^4.0.0", "protobufjs": "^7.2.3", "pstree.remy": "^1.1.8", "reflect-metadata": "^0.1.13", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5cd5d935..f1ccf895 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -79,6 +79,9 @@ dependencies: nodemailer: specifier: ^6.7.2 version: 6.9.1 + p-limit: + specifier: ^4.0.0 + version: 4.0.0 protobufjs: specifier: ^7.2.3 version: 7.2.3 @@ -10844,6 +10847,13 @@ packages: yocto-queue: 0.1.0 dev: true + /p-limit@4.0.0: + resolution: {integrity: sha512-5b0R4txpzjPWVw/cXXUResoD4hb6U/x9BH08L7nw+GN1sezDzPdxeRvpc9c433fZhBan/wusjbCsqwqm4EIBIQ==} + engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + dependencies: + yocto-queue: 1.0.0 + dev: false + /p-locate@4.1.0: resolution: {integrity: sha512-R79ZZ/0wAxKGu3oYMlz8jy/kbhsNrS7SKZ7PxEHBgJ5+F2mtFW2fK2cOtBh1cHYkQsbzFV7I+EoRKe6Yt0oK7A==} engines: {node: '>=8'} @@ -15471,6 +15481,11 @@ packages: engines: {node: '>=10'} dev: true + /yocto-queue@1.0.0: + resolution: {integrity: sha512-9bnSc/HEW2uRy67wc+T8UwauLuPJVn28jb+GtJY16iiKWyvmYJRXVT4UamsAEGQfPohgr2q4Tq0sQbQlxTfi1g==} + engines: {node: '>=12.20'} + dev: false + /yorkie@2.0.0: resolution: {integrity: sha512-jcKpkthap6x63MB4TxwCyuIGkV0oYP/YRyuQU5UO0Yz/E/ZAu+653/uov+phdmO54n6BcvFRyyt0RRrWdN2mpw==} engines: {node: '>=4'}