批量运行任务添加并行数限制

This commit is contained in:
whyour
2021-05-16 17:24:53 +08:00
parent fefdcb88fd
commit c183201e6f
9 changed files with 146 additions and 94 deletions
+79 -69
View File
@@ -7,11 +7,16 @@ import { exec, execSync, spawn } from 'child_process';
import fs from 'fs';
import cron_parser from 'cron-parser';
import { getFileContentByName } from '../config/util';
import PQueue from 'p-queue';
@Service()
export default class CronService {
private cronDb = new DataStore({ filename: config.cronDbFile });
private queue = new PQueue({
concurrency: parseInt(process.env.MaxConcurrentNum) || 5,
});
constructor(@Inject('logger') private logger: winston.Logger) {
this.cronDb.loadDatabase((err) => {
if (err) throw err;
@@ -124,97 +129,102 @@ export default class CronService {
}
public async run(ids: string[]) {
this.cronDb.find({ _id: { $in: ids } }).exec((err, docs: Crontab[]) => {
for (let i = 0; i < docs.length; i++) {
const doc = docs[i];
this.runSingle(doc);
}
});
this.cronDb.update(
{ _id: { $in: ids } },
{ $set: { status: CrontabStatus.queued } },
{ multi: true },
);
for (let i = 0; i < ids.length; i++) {
const id = ids[i];
this.queue.add(() => this.runSingle(id));
}
}
public async stop(ids: string[]) {
this.cronDb.find({ _id: { $in: ids } }).exec((err, docs: Crontab[]) => {
for (let i = 0; i < docs.length; i++) {
const doc = docs[i];
if (doc.pid) {
exec(`kill -9 ${doc.pid}`, (err, stdout, stderr) => {
this.cronDb.update(
{ _id: doc._id },
{ $set: { status: CrontabStatus.idle }, $unset: { pid: true } },
);
});
}
}
this.cronDb.update(
{ _id: { $in: ids } },
{ $set: { status: CrontabStatus.idle }, $unset: { pid: true } },
);
const pids = docs
.map((x) => x.pid)
.filter((x) => !!x)
.join('\n');
console.log(pids);
exec(`echo - e "${pids}" | xargs kill - 9`);
});
}
private async runSingle(cron: Crontab) {
let { _id, command } = cron;
private async runSingle(id: string): Promise<number> {
return new Promise(async (resolve) => {
const cron = await this.get(id);
if (cron.status !== CrontabStatus.queued) {
resolve(0);
return;
}
this.logger.silly('Running job');
this.logger.silly('ID: ' + _id);
this.logger.silly('Original command: ' + command);
let { _id, command } = cron;
let logFile = `${config.manualLogPath}${_id}.log`;
fs.writeFileSync(logFile, `开始执行...\n\n${new Date().toString()}\n`);
this.logger.silly('Running job');
this.logger.silly('ID: ' + _id);
this.logger.silly('Original command: ' + command);
let cmdStr = command;
if (!cmdStr.includes('task ') && !cmdStr.includes('ql ')) {
cmdStr = `task ${cmdStr}`;
}
if (cmdStr.endsWith('.js')) {
cmdStr = `${cmdStr} now`;
}
const cmd = spawn(cmdStr, { shell: true });
let logFile = `${config.manualLogPath}${_id}.log`;
fs.writeFileSync(logFile, `开始执行...\n${new Date().toString()}\n`);
this.cronDb.update(
{ _id },
{ $set: { status: CrontabStatus.running, pid: cmd.pid } },
);
let cmdStr = command;
if (!cmdStr.includes('task ') && !cmdStr.includes('ql ')) {
cmdStr = `task ${cmdStr}`;
}
if (cmdStr.endsWith('.js')) {
cmdStr = `${cmdStr} now`;
}
const cmd = spawn(cmdStr, { shell: true });
cmd.stdout.on('data', (data) => {
this.logger.info(`stdout: ${data}`);
fs.appendFileSync(logFile, data);
});
cmd.stderr.on('data', (data) => {
this.logger.info(`stderr: ${data}`);
fs.appendFileSync(logFile, data);
});
cmd.on('close', (code) => {
this.logger.info(`child process exited with code ${code}`);
this.cronDb.update(
{ _id },
{ $set: { status: CrontabStatus.idle }, $unset: { pid: true } },
{ $set: { status: CrontabStatus.running, pid: cmd.pid } },
);
});
cmd.on('error', (err) => {
this.logger.info(err);
fs.appendFileSync(logFile, err.stack);
});
cmd.stdout.on('data', (data) => {
this.logger.info(`stdout: ${data}`);
fs.appendFileSync(logFile, data);
});
cmd.on('exit', (code: number, signal: any) => {
this.logger.info(`cmd exit ${code}`);
this.cronDb.update(
{ _id },
{ $set: { status: CrontabStatus.idle }, $unset: { pid: true } },
);
fs.appendFileSync(logFile, `\n\n执行结束...`);
});
cmd.stderr.on('data', (data) => {
this.logger.info(`stderr: ${data}`);
fs.appendFileSync(logFile, data);
});
cmd.on('disconnect', () => {
this.logger.info(`cmd disconnect`);
this.cronDb.update({ _id }, { $set: { status: CrontabStatus.idle } });
fs.appendFileSync(logFile, `\n\n连接断开...`);
cmd.on('close', (code) => {
this.logger.info(`child process exited with code ${code}`);
this.cronDb.update(
{ _id },
{ $set: { status: CrontabStatus.idle }, $unset: { pid: true } },
);
});
cmd.on('error', (err) => {
this.logger.info(err);
fs.appendFileSync(logFile, err.stack);
});
cmd.on('exit', (code: number, signal: any) => {
this.logger.info(`cmd exit ${code}`);
this.cronDb.update(
{ _id },
{ $set: { status: CrontabStatus.idle }, $unset: { pid: true } },
);
fs.appendFileSync(logFile, `\n执行结束...`);
resolve(code);
});
});
}
public async disabled(ids: string[]) {
this.cronDb.update(
{ _id: { $in: ids } },
{ $set: { status: CrontabStatus.disabled } },
{ $set: { isDisabled: 1 } },
{ multi: true },
);
await this.set_crontab(true);
@@ -223,7 +233,7 @@ export default class CronService {
public async enabled(ids: string[]) {
this.cronDb.update(
{ _id: { $in: ids } },
{ $set: { status: CrontabStatus.idle } },
{ $set: { isDisabled: 0 } },
{ multi: true },
);
await this.set_crontab(true);
@@ -244,7 +254,7 @@ export default class CronService {
var crontab_string = '';
tabs.forEach((tab) => {
const _schedule = tab.schedule && tab.schedule.split(' ');
if (tab.status === CrontabStatus.disabled || _schedule.length !== 5) {
if (tab.isDisabled === 1 || _schedule.length !== 5) {
crontab_string += '# ';
crontab_string += tab.schedule;
crontab_string += ' ';