diff --git a/back/config/index.ts b/back/config/index.ts index 17c55ac4..6d129ea4 100644 --- a/back/config/index.ts +++ b/back/config/index.ts @@ -20,11 +20,16 @@ const dbPath = path.join(rootPath, 'db/'); const manualLogPath = path.join(rootPath, 'manual_log/'); const cronDbFile = path.join(rootPath, 'db/crontab.db'); const cookieDbFile = path.join(rootPath, 'db/cookie.db'); +const configFound = dotenv.config({ path: confFile }); if (envFound.error) { throw new Error("⚠️ Couldn't find .env file ⚠️"); } +if (configFound.error) { + throw new Error("⚠️ Couldn't find config.sh file ⚠️"); +} + export default { port: parseInt(process.env.PORT as string, 10), cronPort: parseInt(process.env.CRON_PORT as string, 10), diff --git a/back/data/cron.ts b/back/data/cron.ts index 39d7f1b0..96aabe9f 100644 --- a/back/data/cron.ts +++ b/back/data/cron.ts @@ -9,6 +9,7 @@ export class Crontab { status?: CrontabStatus; isSystem?: 1 | 0; pid?: number; + isDisabled?: 1 | 0; constructor(options: Crontab) { this.name = options.name; @@ -21,6 +22,7 @@ export class Crontab { this.timestamp = new Date().toString(); this.isSystem = options.isSystem || 0; this.pid = options.pid; + this.isDisabled = options.isDisabled || 0; } } @@ -28,4 +30,5 @@ export enum CrontabStatus { 'running', 'idle', 'disabled', + 'queued', } diff --git a/back/loaders/initData.ts b/back/loaders/initData.ts index 52eabf69..99485edf 100644 --- a/back/loaders/initData.ts +++ b/back/loaders/initData.ts @@ -74,6 +74,25 @@ export default async () => { } }); + // patch 禁用状态字段改变 + cronDb + .find({ + status: CrontabStatus.disabled, + }) + .exec((err, docs) => { + if (docs.length > 0) { + const ids = docs.map((x) => x._id); + cronDb.update( + { _id: { $in: ids } }, + { $set: { status: CrontabStatus.idle, isDisabled: 1 } }, + { multi: true }, + (err) => { + cronService.autosave_crontab(); + }, + ); + } + }); + // 初始化保存一次ck和定时任务数据 await cronService.autosave_crontab(); await cookieService.set_cookies(); diff --git a/back/schedule.ts b/back/schedule.ts index 960b2488..7a8047ae 100644 --- a/back/schedule.ts +++ b/back/schedule.ts @@ -29,7 +29,8 @@ const run = async () => { if ( _schedule && _schedule.length > 5 && - task.status !== CrontabStatus.disabled + task.status !== CrontabStatus.disabled && + !task.isDisabled ) { schedule.scheduleJob(task.schedule, function () { let command = task.command as string; diff --git a/back/services/cron.ts b/back/services/cron.ts index fcd2e582..3b6b0ef2 100644 --- a/back/services/cron.ts +++ b/back/services/cron.ts @@ -7,11 +7,16 @@ import { exec, execSync, spawn } from 'child_process'; import fs from 'fs'; import cron_parser from 'cron-parser'; import { getFileContentByName } from '../config/util'; +import PQueue from 'p-queue'; @Service() export default class CronService { private cronDb = new DataStore({ filename: config.cronDbFile }); + private queue = new PQueue({ + concurrency: parseInt(process.env.MaxConcurrentNum) || 5, + }); + constructor(@Inject('logger') private logger: winston.Logger) { this.cronDb.loadDatabase((err) => { if (err) throw err; @@ -124,97 +129,102 @@ export default class CronService { } public async run(ids: string[]) { - this.cronDb.find({ _id: { $in: ids } }).exec((err, docs: Crontab[]) => { - for (let i = 0; i < docs.length; i++) { - const doc = docs[i]; - this.runSingle(doc); - } - }); + this.cronDb.update( + { _id: { $in: ids } }, + { $set: { status: CrontabStatus.queued } }, + { multi: true }, + ); + for (let i = 0; i < ids.length; i++) { + const id = ids[i]; + this.queue.add(() => this.runSingle(id)); + } } public async stop(ids: string[]) { this.cronDb.find({ _id: { $in: ids } }).exec((err, docs: Crontab[]) => { - for (let i = 0; i < docs.length; i++) { - const doc = docs[i]; - if (doc.pid) { - exec(`kill -9 ${doc.pid}`, (err, stdout, stderr) => { - this.cronDb.update( - { _id: doc._id }, - { $set: { status: CrontabStatus.idle }, $unset: { pid: true } }, - ); - }); - } - } + this.cronDb.update( + { _id: { $in: ids } }, + { $set: { status: CrontabStatus.idle }, $unset: { pid: true } }, + ); + const pids = docs + .map((x) => x.pid) + .filter((x) => !!x) + .join('\n'); + console.log(pids); + exec(`echo - e "${pids}" | xargs kill - 9`); }); } - private async runSingle(cron: Crontab) { - let { _id, command } = cron; + private async runSingle(id: string): Promise { + return new Promise(async (resolve) => { + const cron = await this.get(id); + if (cron.status !== CrontabStatus.queued) { + resolve(0); + return; + } - this.logger.silly('Running job'); - this.logger.silly('ID: ' + _id); - this.logger.silly('Original command: ' + command); + let { _id, command } = cron; - let logFile = `${config.manualLogPath}${_id}.log`; - fs.writeFileSync(logFile, `开始执行...\n\n${new Date().toString()}\n`); + this.logger.silly('Running job'); + this.logger.silly('ID: ' + _id); + this.logger.silly('Original command: ' + command); - let cmdStr = command; - if (!cmdStr.includes('task ') && !cmdStr.includes('ql ')) { - cmdStr = `task ${cmdStr}`; - } - if (cmdStr.endsWith('.js')) { - cmdStr = `${cmdStr} now`; - } - const cmd = spawn(cmdStr, { shell: true }); + let logFile = `${config.manualLogPath}${_id}.log`; + fs.writeFileSync(logFile, `开始执行...\n${new Date().toString()}\n`); - this.cronDb.update( - { _id }, - { $set: { status: CrontabStatus.running, pid: cmd.pid } }, - ); + let cmdStr = command; + if (!cmdStr.includes('task ') && !cmdStr.includes('ql ')) { + cmdStr = `task ${cmdStr}`; + } + if (cmdStr.endsWith('.js')) { + cmdStr = `${cmdStr} now`; + } + const cmd = spawn(cmdStr, { shell: true }); - cmd.stdout.on('data', (data) => { - this.logger.info(`stdout: ${data}`); - fs.appendFileSync(logFile, data); - }); - - cmd.stderr.on('data', (data) => { - this.logger.info(`stderr: ${data}`); - fs.appendFileSync(logFile, data); - }); - - cmd.on('close', (code) => { - this.logger.info(`child process exited with code ${code}`); this.cronDb.update( { _id }, - { $set: { status: CrontabStatus.idle }, $unset: { pid: true } }, + { $set: { status: CrontabStatus.running, pid: cmd.pid } }, ); - }); - cmd.on('error', (err) => { - this.logger.info(err); - fs.appendFileSync(logFile, err.stack); - }); + cmd.stdout.on('data', (data) => { + this.logger.info(`stdout: ${data}`); + fs.appendFileSync(logFile, data); + }); - cmd.on('exit', (code: number, signal: any) => { - this.logger.info(`cmd exit ${code}`); - this.cronDb.update( - { _id }, - { $set: { status: CrontabStatus.idle }, $unset: { pid: true } }, - ); - fs.appendFileSync(logFile, `\n\n执行结束...`); - }); + cmd.stderr.on('data', (data) => { + this.logger.info(`stderr: ${data}`); + fs.appendFileSync(logFile, data); + }); - cmd.on('disconnect', () => { - this.logger.info(`cmd disconnect`); - this.cronDb.update({ _id }, { $set: { status: CrontabStatus.idle } }); - fs.appendFileSync(logFile, `\n\n连接断开...`); + cmd.on('close', (code) => { + this.logger.info(`child process exited with code ${code}`); + this.cronDb.update( + { _id }, + { $set: { status: CrontabStatus.idle }, $unset: { pid: true } }, + ); + }); + + cmd.on('error', (err) => { + this.logger.info(err); + fs.appendFileSync(logFile, err.stack); + }); + + cmd.on('exit', (code: number, signal: any) => { + this.logger.info(`cmd exit ${code}`); + this.cronDb.update( + { _id }, + { $set: { status: CrontabStatus.idle }, $unset: { pid: true } }, + ); + fs.appendFileSync(logFile, `\n执行结束...`); + resolve(code); + }); }); } public async disabled(ids: string[]) { this.cronDb.update( { _id: { $in: ids } }, - { $set: { status: CrontabStatus.disabled } }, + { $set: { isDisabled: 1 } }, { multi: true }, ); await this.set_crontab(true); @@ -223,7 +233,7 @@ export default class CronService { public async enabled(ids: string[]) { this.cronDb.update( { _id: { $in: ids } }, - { $set: { status: CrontabStatus.idle } }, + { $set: { isDisabled: 0 } }, { multi: true }, ); await this.set_crontab(true); @@ -244,7 +254,7 @@ export default class CronService { var crontab_string = ''; tabs.forEach((tab) => { const _schedule = tab.schedule && tab.schedule.split(' '); - if (tab.status === CrontabStatus.disabled || _schedule.length !== 5) { + if (tab.isDisabled === 1 || _schedule.length !== 5) { crontab_string += '# '; crontab_string += tab.schedule; crontab_string += ' '; diff --git a/package.json b/package.json index a3ee805f..f4c7760d 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "nedb": "^1.8.0", "node-fetch": "^2.6.1", "node-schedule": "^2.0.0", + "p-queue": "6.6.2", "reflect-metadata": "^0.1.13", "typedi": "^0.8.0", "winston": "^3.3.3" diff --git a/sample/config.sample.sh b/sample/config.sample.sh index 3e94687e..faa23fb9 100644 --- a/sample/config.sample.sh +++ b/sample/config.sample.sh @@ -13,6 +13,9 @@ AutoAddCron="true" ## 设置定时任务执行的超时时间,默认1h,后缀"s"代表秒(默认值), "m"代表分, "h"代表小时, "d"代表天 CommandTimeoutTime="1h" +## 设置批量执行任务时的并发数,默认同时执行5个任务 +MaxConcurrentNum="5" + ## 在运行 task 命令时,随机延迟启动任务的最大延迟时间 ## 如果任务不是必须准点运行的任务,那么给它增加一个随机延迟,由你定义最大延迟时间,单位为秒,如 RandomDelay="300" ,表示任务将在 1-300 秒内随机延迟一个秒数,然后再运行 ## 在crontab.list中,在每小时第0-2分、第30-31分、第59分这几个时间内启动的任务,均算作必须准点运行的任务,在启动这些任务时,即使你定义了RandomDelay,也将准点运行,不启用随机延迟 diff --git a/src/pages/crontab/index.tsx b/src/pages/crontab/index.tsx index 555b0095..b72a62ea 100644 --- a/src/pages/crontab/index.tsx +++ b/src/pages/crontab/index.tsx @@ -24,6 +24,7 @@ import { StopOutlined, DeleteOutlined, PauseCircleOutlined, + SendOutlined, } from '@ant-design/icons'; import config from '@/utils/config'; import { PageContainer } from '@ant-design/pro-layout'; @@ -38,6 +39,7 @@ enum CrontabStatus { 'running', 'idle', 'disabled', + 'queued', } enum OperationName { @@ -99,17 +101,29 @@ const Crontab = () => { align: 'center' as const, render: (text: string, record: any) => ( <> - {record.status === CrontabStatus.idle && ( - } color="default"> - 空闲中 - + {!record.isDisabled && ( + <> + {record.status === CrontabStatus.idle && ( + } color="default"> + 空闲中 + + )} + {record.status === CrontabStatus.running && ( + } + color="processing" + > + 运行中 + + )} + {record.status === CrontabStatus.queued && ( + } color="default"> + 队列中 + + )} + )} - {record.status === CrontabStatus.running && ( - } color="processing"> - 运行中 - - )} - {record.status === CrontabStatus.disabled && ( + {record.isDisabled === 1 && ( } color="error"> 已禁用 @@ -123,7 +137,7 @@ const Crontab = () => { align: 'center' as const, render: (text: string, record: any, index: number) => ( - {record.status !== CrontabStatus.running && ( + {record.status === CrontabStatus.idle && ( { @@ -134,7 +148,7 @@ const Crontab = () => { )} - {record.status === CrontabStatus.running && ( + {record.status !== CrontabStatus.idle && ( { @@ -303,12 +317,10 @@ const Crontab = () => { const enabledOrDisabledCron = (record: any, index: number) => { Modal.confirm({ - title: `确认${ - record.status === CrontabStatus.disabled ? '启用' : '禁用' - }`, + title: `确认${record.isDisabled === 1 ? '启用' : '禁用'}`, content: ( <> - 确认{record.status === CrontabStatus.disabled ? '启用' : '禁用'} + 确认{record.isDisabled === 1 ? '启用' : '禁用'} 定时任务{' '} {record.name} @@ -320,7 +332,7 @@ const Crontab = () => { request .put( `${config.apiPrefix}crons/${ - record.status === CrontabStatus.disabled ? 'enable' : 'disable' + record.isDisabled === 1 ? 'enable' : 'disable' }`, { data: [record._id], @@ -328,14 +340,11 @@ const Crontab = () => { ) .then((data: any) => { if (data.code === 200) { - const newStatus = - record.status === CrontabStatus.disabled - ? CrontabStatus.idle - : CrontabStatus.disabled; + const newStatus = record.isDisabled === 1 ? 0 : 1; const result = [...value]; result.splice(index, 1, { ...record, - status: newStatus, + isDisabled: newStatus, }); setValue(result); } else { @@ -366,14 +375,14 @@ const Crontab = () => { ) : ( ) } > - {record.status === CrontabStatus.disabled ? '启用' : '禁用'} + {record.isDisabled === 1 ? '启用' : '禁用'} {record.isSystem !== 1 && ( }> diff --git a/src/pages/crontab/logModal.tsx b/src/pages/crontab/logModal.tsx index a53a029c..6d990176 100644 --- a/src/pages/crontab/logModal.tsx +++ b/src/pages/crontab/logModal.tsx @@ -11,6 +11,7 @@ enum CrontabStatus { 'running', 'idle', 'disabled', + 'queued', } const CronLogModal = ({