修改订阅before/after执行时机

This commit is contained in:
whyour 2022-05-22 22:14:32 +08:00
parent 9dab6660d0
commit 6f13cc6e6f
2 changed files with 34 additions and 41 deletions

View File

@ -29,8 +29,6 @@ export interface TaskCallbacks {
) => Promise<void>; ) => Promise<void>;
onLog?: (message: string) => Promise<void>; onLog?: (message: string) => Promise<void>;
onError?: (message: string) => Promise<void>; onError?: (message: string) => Promise<void>;
onBefore?: () => Promise<void>;
onAfter?: () => Promise<void>;
} }
@Service() @Service()
@ -46,7 +44,6 @@ export default class ScheduleService {
async runTask(command: string, callbacks: TaskCallbacks = {}) { async runTask(command: string, callbacks: TaskCallbacks = {}) {
return new Promise(async (resolve, reject) => { return new Promise(async (resolve, reject) => {
try { try {
await callbacks.onBefore?.();
const startTime = dayjs(); const startTime = dayjs();
const cp = spawn(command, { shell: '/bin/bash' }); const cp = spawn(command, { shell: '/bin/bash' });
@ -91,7 +88,6 @@ export default class ScheduleService {
endTime, endTime,
endTime.diff(startTime, 'seconds'), endTime.diff(startTime, 'seconds'),
); );
await callbacks.onAfter?.();
resolve(null); resolve(null);
}); });
} catch (error) { } catch (error) {

View File

@ -175,17 +175,32 @@ export default class SubscriptionService {
private taskCallbacks(doc: Subscription): TaskCallbacks { private taskCallbacks(doc: Subscription): TaskCallbacks {
return { return {
onStart: async (cp: ChildProcessWithoutNullStreams, startTime) => { onStart: async (cp: ChildProcessWithoutNullStreams, startTime) => {
// 执行sub_before
let beforeStr = '';
try {
if (doc.sub_before) {
beforeStr = execSync(doc.sub_before).toString();
}
} catch (error) {
beforeStr = JSON.stringify(error);
}
if (beforeStr) {
beforeStr += '\n';
}
const logTime = startTime.format('YYYY-MM-DD-HH-mm-ss'); const logTime = startTime.format('YYYY-MM-DD-HH-mm-ss');
const logPath = `${doc.alias}/${logTime}.log`; const logPath = `${doc.alias}/${logTime}.log`;
await this.handleLogPath( await this.handleLogPath(
logPath as string, logPath as string,
`## 开始执行... ${startTime.format('YYYY-MM-DD HH:mm:ss')}\n`, `${beforeStr}## 开始执行... ${startTime.format(
'YYYY-MM-DD HH:mm:ss',
)}\n`,
); );
await SubscriptionModel.update( await SubscriptionModel.update(
{ {
status: SubscriptionStatus.running, status: SubscriptionStatus.running,
pid: cp.pid,
log_path: logPath, log_path: logPath,
pid: cp.pid,
}, },
{ where: { id: doc.id } }, { where: { id: doc.id } },
); );
@ -194,7 +209,7 @@ export default class SubscriptionService {
const sub = await this.getDb({ id: doc.id }); const sub = await this.getDb({ id: doc.id });
await SubscriptionModel.update( await SubscriptionModel.update(
{ status: SubscriptionStatus.idle, pid: undefined }, { status: SubscriptionStatus.idle, pid: undefined },
{ where: { id: doc.id } }, { where: { id: sub.id } },
); );
const absolutePath = await this.handleLogPath(sub.log_path as string); const absolutePath = await this.handleLogPath(sub.log_path as string);
fs.appendFileSync( fs.appendFileSync(
@ -203,6 +218,22 @@ export default class SubscriptionService {
'YYYY-MM-DD HH:mm:ss', 'YYYY-MM-DD HH:mm:ss',
)} ${diff} `, )} ${diff} `,
); );
// 执行 sub_after
let afterStr = '';
try {
if (sub.sub_after) {
afterStr = execSync(sub.sub_after).toString();
}
} catch (error) {
afterStr = JSON.stringify(error);
}
if (afterStr) {
afterStr = `\n\n${afterStr}`;
const absolutePath = await this.handleLogPath(sub.log_path as string);
fs.appendFileSync(absolutePath, afterStr);
}
this.sockService.sendMessage({ this.sockService.sendMessage({
type: 'runSubscriptionEnd', type: 'runSubscriptionEnd',
message: '订阅执行完成', message: '订阅执行完成',
@ -219,40 +250,6 @@ export default class SubscriptionService {
const absolutePath = await this.handleLogPath(sub.log_path as string); const absolutePath = await this.handleLogPath(sub.log_path as string);
fs.appendFileSync(absolutePath, `\n${message}`); fs.appendFileSync(absolutePath, `\n${message}`);
}, },
onBefore: async () => {
return new Promise((resolve, reject) => {
if (doc.sub_before) {
exec(doc.sub_before, async (err, stdout, stderr) => {
const absolutePath = await this.handleLogPath(
doc.log_path as string,
);
fs.appendFileSync(
absolutePath,
stdout || stderr || `${JSON.stringify(err || {})}`,
);
});
} else {
resolve();
}
});
},
onAfter: async () => {
return new Promise((resolve, reject) => {
if (doc.sub_after) {
exec(doc.sub_after, async (err, stdout, stderr) => {
const absolutePath = await this.handleLogPath(
doc.log_path as string,
);
fs.appendFileSync(
absolutePath,
stdout || stderr || `${JSON.stringify(err || {})}`,
);
});
} else {
resolve();
}
});
},
}; };
} }