重构任务并发执行逻辑

This commit is contained in:
whyour
2023-05-30 16:32:00 +08:00
parent 86e3d8736b
commit f8dfee8945
11 changed files with 285 additions and 236 deletions
+59 -57
View File
@@ -16,6 +16,7 @@ import { Op, where, col as colFn, FindOptions } from 'sequelize';
import path from 'path';
import { TASK_PREFIX, QL_PREFIX } from '../config/const';
import cronClient from '../schedule/client';
import { runCronWithLimit } from '../shared/pLimit';
@Service()
export default class CronService {
@@ -365,10 +366,9 @@ export default class CronService {
{ status: CrontabStatus.queued },
{ where: { id: ids } },
);
concurrentRun(
ids.map((id) => async () => await this.runSingle(id)),
10,
);
ids.forEach(id => {
this.runSingle(id)
})
}
public async stop(ids: number[]) {
@@ -390,65 +390,67 @@ export default class CronService {
}
private async runSingle(cronId: number): Promise<number> {
return new Promise(async (resolve: any) => {
const cron = await this.getDb({ id: cronId });
if (cron.status !== CrontabStatus.queued) {
resolve();
return;
}
let { id, command, log_path } = cron;
const absolutePath = path.resolve(config.logPath, `${log_path}`);
const logFileExist = log_path && (await fileExist(absolutePath));
this.logger.silly('Running job');
this.logger.silly('ID: ' + id);
this.logger.silly('Original command: ' + command);
let cmdStr = command;
if (!cmdStr.startsWith(TASK_PREFIX) && !cmdStr.startsWith(QL_PREFIX)) {
cmdStr = `${TASK_PREFIX}${cmdStr}`;
}
if (
cmdStr.endsWith('.js') ||
cmdStr.endsWith('.py') ||
cmdStr.endsWith('.pyc') ||
cmdStr.endsWith('.sh') ||
cmdStr.endsWith('.ts')
) {
cmdStr = `${cmdStr} now`;
}
const cp = spawn(`ID=${id} ${cmdStr}`, { shell: '/bin/bash' });
await CrontabModel.update(
{ status: CrontabStatus.running, pid: cp.pid },
{ where: { id } },
);
cp.stderr.on('data', (data) => {
if (logFileExist) {
fs.appendFileSync(`${absolutePath}`, `${data.toString()}`);
return runCronWithLimit(() => {
return new Promise(async (resolve: any) => {
const cron = await this.getDb({ id: cronId });
if (cron.status !== CrontabStatus.queued) {
resolve();
return;
}
});
cp.on('error', (err) => {
if (logFileExist) {
fs.appendFileSync(`${absolutePath}`, `${JSON.stringify(err)}`);
}
});
cp.on('exit', async (code, signal) => {
this.logger.info(
`任务 ${command} 进程id: ${cp.pid} 退出,退出码 ${code}`,
);
});
cp.on('close', async (code) => {
let { id, command, log_path } = cron;
const absolutePath = path.resolve(config.logPath, `${log_path}`);
const logFileExist = log_path && (await fileExist(absolutePath));
this.logger.silly('Running job');
this.logger.silly('ID: ' + id);
this.logger.silly('Original command: ' + command);
let cmdStr = command;
if (!cmdStr.startsWith(TASK_PREFIX) && !cmdStr.startsWith(QL_PREFIX)) {
cmdStr = `${TASK_PREFIX}${cmdStr}`;
}
if (
cmdStr.endsWith('.js') ||
cmdStr.endsWith('.py') ||
cmdStr.endsWith('.pyc') ||
cmdStr.endsWith('.sh') ||
cmdStr.endsWith('.ts')
) {
cmdStr = `${cmdStr} now`;
}
const cp = spawn(`ID=${id} ${cmdStr}`, { shell: '/bin/bash' });
await CrontabModel.update(
{ status: CrontabStatus.idle, pid: undefined },
{ status: CrontabStatus.running, pid: cp.pid },
{ where: { id } },
);
resolve();
cp.stderr.on('data', (data) => {
if (logFileExist) {
fs.appendFileSync(`${absolutePath}`, `${data.toString()}`);
}
});
cp.on('error', (err) => {
if (logFileExist) {
fs.appendFileSync(`${absolutePath}`, `${JSON.stringify(err)}`);
}
});
cp.on('exit', async (code, signal) => {
this.logger.info(
`任务 ${command} 进程id: ${cp.pid} 退出,退出码 ${code}`,
);
});
cp.on('close', async (code) => {
await CrontabModel.update(
{ status: CrontabStatus.idle, pid: undefined },
{ where: { id } },
);
resolve();
});
});
});
})
}
public async disabled(ids: number[]) {