重构任务执行逻辑

This commit is contained in:
whyour
2022-05-15 20:40:29 +08:00
parent bb47d67d0b
commit 9dcc547ac7
7 changed files with 204 additions and 190 deletions
+8
View File
@@ -232,6 +232,14 @@ export async function fileExist(file: any) {
});
}
export async function createFile(file: string, data: string = '') {
return new Promise((resolve) => {
fs.mkdirSync(path.dirname(file), { recursive: true });
fs.writeFileSync(file, data);
resolve(true);
});
}
export async function concurrentRun(
fnList: Array<() => Promise<any>> = [],
max = 5,
+75 -68
View File
@@ -1,13 +1,14 @@
import { Service, Inject } from 'typedi';
import winston from 'winston';
import nodeSchedule from 'node-schedule';
import { exec } from 'child_process';
import { ChildProcessWithoutNullStreams, exec, spawn } from 'child_process';
import {
ToadScheduler,
LongIntervalJob,
AsyncTask,
SimpleIntervalSchedule,
} from 'toad-scheduler';
import dayjs from 'dayjs';
interface ScheduleTaskType {
id: number;
@@ -16,6 +17,19 @@ interface ScheduleTaskType {
schedule?: string;
}
export interface TaskCallbacks {
onStart?: (
cp: ChildProcessWithoutNullStreams,
startTime: dayjs.Dayjs,
) => void;
onEnd?: (
cp: ChildProcessWithoutNullStreams,
endTime: dayjs.Dayjs,
diff: number,
) => void;
onError?: (message: string) => void;
}
@Service()
export default class ScheduleService {
private scheduleStacks = new Map<string, nodeSchedule.Job>();
@@ -26,12 +40,63 @@ export default class ScheduleService {
constructor(@Inject('logger') private logger: winston.Logger) {}
async createCronTask({
id = 0,
command,
name,
schedule = '',
}: ScheduleTaskType) {
async runTask(command: string, callbacks: TaskCallbacks = {}) {
return new Promise(async (resolve, reject) => {
try {
const startTime = dayjs();
const cp = spawn(command, { shell: '/bin/bash' });
callbacks.onStart?.(cp, startTime);
cp.stderr.on('data', (data) => {
this.logger.error(
'执行任务%s失败,时间:%s, 错误信息:%j',
command,
new Date().toLocaleString(),
data.toString(),
);
callbacks.onError?.(data.toString());
});
cp.on('error', (err) => {
this.logger.error(
'执行任务%s失败,时间:%s, 错误信息:%j',
command,
new Date().toLocaleString(),
err,
);
callbacks.onError?.(JSON.stringify(err));
});
cp.on('exit', async (code, signal) => {
this.logger.info(
`${command} pid: ${cp.pid} exit ${code} signal ${signal}`,
);
});
cp.on('close', async (code) => {
const endTime = dayjs();
this.logger.info(`${command} pid: ${cp.pid} closed ${code}`);
callbacks.onEnd?.(cp, endTime, endTime.diff(startTime, 'seconds'));
resolve(null);
});
} catch (error) {
await this.logger.error(
'执行任务%s失败,时间:%s, 错误信息:%j',
command,
new Date().toLocaleString(),
error,
);
callbacks.onError?.(JSON.stringify(error));
resolve(null);
}
});
}
async createCronTask(
{ id = 0, command, name, schedule = '' }: ScheduleTaskType,
callbacks?: TaskCallbacks,
) {
const _id = this.formatId(id);
this.logger.info(
'[创建cron任务],任务ID: %scron: %s,任务名: %s,执行命令: %s',
@@ -44,39 +109,7 @@ export default class ScheduleService {
this.scheduleStacks.set(
_id,
nodeSchedule.scheduleJob(_id, schedule, async () => {
try {
exec(
command,
{ maxBuffer: this.maxBuffer },
async (error, stdout, stderr) => {
if (error) {
await this.logger.error(
'执行任务%s失败,时间:%s, 错误信息:%j',
command,
new Date().toLocaleString(),
error,
);
}
if (stderr) {
await this.logger.error(
'执行任务%s失败,时间:%s, 错误信息:%j',
command,
new Date().toLocaleString(),
stderr,
);
}
},
);
} catch (error) {
await this.logger.error(
'执行任务%s失败,时间:%s, 错误信息:%j',
command,
new Date().toLocaleString(),
error,
);
} finally {
}
await this.runTask(command, callbacks);
}),
);
}
@@ -91,6 +124,7 @@ export default class ScheduleService {
{ id = 0, command, name = '' }: ScheduleTaskType,
schedule: SimpleIntervalSchedule,
runImmediately = true,
callbacks?: TaskCallbacks,
) {
const _id = this.formatId(id);
this.logger.info(
@@ -103,34 +137,7 @@ export default class ScheduleService {
name,
async () => {
return new Promise(async (resolve, reject) => {
try {
exec(
command,
{ maxBuffer: this.maxBuffer },
async (error, stdout, stderr) => {
if (error) {
await this.logger.error(
'执行任务%s失败,时间:%s, 错误信息:%j',
command,
new Date().toLocaleString(),
error,
);
}
if (stderr) {
await this.logger.error(
'执行任务%s失败,时间:%s, 错误信息:%j',
command,
new Date().toLocaleString(),
stderr,
);
}
resolve();
},
);
} catch (error) {
reject(error);
}
await this.runTask(command, callbacks);
});
},
(err) => {
+92 -69
View File
@@ -6,15 +6,25 @@ import {
SubscriptionModel,
SubscriptionStatus,
} from '../data/subscription';
import { exec, execSync, spawn } from 'child_process';
import {
ChildProcessWithoutNullStreams,
exec,
execSync,
spawn,
} from 'child_process';
import fs from 'fs';
import cron_parser from 'cron-parser';
import { getFileContentByName, concurrentRun, fileExist } from '../config/util';
import {
getFileContentByName,
concurrentRun,
fileExist,
createFile,
} from '../config/util';
import { promises, existsSync } from 'fs';
import { promisify } from 'util';
import { Op } from 'sequelize';
import path from 'path';
import ScheduleService from './schedule';
import ScheduleService, { TaskCallbacks } from './schedule';
import { SimpleIntervalSchedule } from 'toad-scheduler';
@Service()
@@ -97,7 +107,11 @@ export default class SubscriptionService {
doc.command = this.formatCommand(doc);
if (doc.schedule_type === 'crontab') {
this.scheduleService.cancelCronTask(doc as any);
needCreate && this.scheduleService.createCronTask(doc as any);
needCreate &&
this.scheduleService.createCronTask(
doc as any,
this.taskCallbacks(doc),
);
} else {
this.scheduleService.cancelIntervalTask(doc as any);
const { type, value } = doc.interval_schedule as any;
@@ -105,10 +119,64 @@ export default class SubscriptionService {
this.scheduleService.createIntervalTask(
doc as any,
{ [type]: value } as SimpleIntervalSchedule,
true,
this.taskCallbacks(doc),
);
}
}
private async handleLogPath(
logPath: string,
data: string = '',
): Promise<string> {
const absolutePath = path.resolve(config.logPath, logPath);
const logFileExist = await fileExist(absolutePath);
if (!logFileExist) {
await createFile(absolutePath, data);
}
return absolutePath;
}
private taskCallbacks(doc: Subscription): TaskCallbacks {
return {
onStart: async (cp: ChildProcessWithoutNullStreams, startTime) => {
const logTime = startTime.format('YYYY-MM-DD-HH-mm-ss');
const logPath = `${doc.alias}/${logTime}.log`;
await this.handleLogPath(
logPath as string,
`## 开始执行... ${startTime.format('YYYY-MM-DD HH:mm:ss')}\n`,
);
await SubscriptionModel.update(
{
status: SubscriptionStatus.running,
pid: cp.pid,
log_path: logPath,
},
{ where: { id: doc.id } },
);
},
onEnd: async (cp, endTime, diff) => {
const sub = await this.getDb({ id: doc.id });
await SubscriptionModel.update(
{ status: SubscriptionStatus.idle, pid: undefined },
{ where: { id: doc.id } },
);
const absolutePath = await this.handleLogPath(sub.log_path as string);
fs.appendFileSync(
absolutePath,
`\n## 执行结束... ${endTime.format(
'YYYY-MM-DD HH:mm:ss',
)} 耗时 ${diff}`,
);
},
onError: async (message: string) => {
const sub = await this.getDb({ id: doc.id });
const absolutePath = await this.handleLogPath(sub.log_path as string);
fs.appendFileSync(absolutePath, `\n${message}`);
},
};
}
public async create(payload: Subscription): Promise<Subscription> {
const tab = new Subscription(payload);
const doc = await this.insert(tab);
@@ -195,17 +263,14 @@ export default class SubscriptionService {
this.handleTask(doc, false);
const command = this.formatCommand(doc);
const err = await this.killTask(command);
const absolutePath = path.resolve(config.logPath, `${doc.log_path}`);
const logFileExist = doc.log_path && (await fileExist(absolutePath));
if (logFileExist) {
const str = err ? `\n${err}` : '';
fs.appendFileSync(
`${absolutePath}`,
`${str}\n## 执行结束... ${new Date()
.toLocaleString('zh', { hour12: false })
.replace(' 24:', ' 00:')} `,
);
}
const absolutePath = await this.handleLogPath(doc.log_path as string);
const str = err ? `\n${err}` : '';
fs.appendFileSync(
`${absolutePath}`,
`${str}\n## 执行结束... ${new Date()
.toLocaleString('zh', { hour12: false })
.replace(' 24:', ' 00:')} `,
);
}
await SubscriptionModel.update(
@@ -249,57 +314,18 @@ export default class SubscriptionService {
}
}
private async runSingle(cronId: number): Promise<number> {
return new Promise(async (resolve: any) => {
const cron = await this.getDb({ id: cronId });
if (cron.status !== SubscriptionStatus.queued) {
resolve();
return;
}
private async runSingle(subscriptionId: number) {
const subscription = await this.getDb({ id: subscriptionId });
if (subscription.status !== SubscriptionStatus.queued) {
return;
}
let { id, log_path, name } = cron;
const command = this.formatCommand(cron);
const absolutePath = path.resolve(config.logPath, `${log_path}`);
const logFileExist = log_path && (await fileExist(absolutePath));
const command = this.formatCommand(subscription);
this.logger.silly('Running job' + name);
this.logger.silly('ID: ' + id);
this.logger.silly('Original command: ' + command);
const cp = spawn(command, { shell: '/bin/bash' });
await SubscriptionModel.update(
{ status: SubscriptionStatus.running, pid: cp.pid },
{ where: { id } },
);
cp.stderr.on('data', (data) => {
if (logFileExist) {
fs.appendFileSync(`${absolutePath}`, `${data}`);
}
});
cp.on('error', (err) => {
if (logFileExist) {
fs.appendFileSync(`${absolutePath}`, `${JSON.stringify(err)}`);
}
});
cp.on('exit', async (code, signal) => {
this.logger.info(`${''} pid: ${cp.pid} exit ${code} signal ${signal}`);
await SubscriptionModel.update(
{ status: SubscriptionStatus.idle, pid: undefined },
{ where: { id } },
);
resolve();
});
cp.on('close', async (code) => {
this.logger.info(`${''} pid: ${cp.pid} closed ${code}`);
await SubscriptionModel.update(
{ status: SubscriptionStatus.idle, pid: undefined },
{ where: { id } },
);
resolve();
});
});
await this.scheduleService.runTask(
command,
this.taskCallbacks(subscription),
);
}
public async disabled(ids: number[]) {
@@ -324,11 +350,8 @@ export default class SubscriptionService {
return '';
}
const absolutePath = path.resolve(config.logPath, `${doc.log_path}`);
const logFileExist = doc.log_path && (await fileExist(absolutePath));
if (logFileExist) {
return getFileContentByName(`${absolutePath}`);
}
const absolutePath = await this.handleLogPath(doc.log_path as string);
return getFileContentByName(absolutePath);
}
public async logs(id: number) {