修复系统启动执行订阅任务

This commit is contained in:
whyour 2022-05-28 19:56:12 +08:00
parent 8d2c62d6d1
commit f338537ea0
3 changed files with 33 additions and 14 deletions

View File

@ -26,6 +26,6 @@ export default async () => {
// 运行所有订阅
const subs = await subscriptionService.list();
for (const sub of subs) {
await subscriptionService.handleTask(sub);
await subscriptionService.handleTask(sub, true, true, true);
}
};

View File

@ -106,6 +106,7 @@ export default class ScheduleService {
async createCronTask(
{ id = 0, command, name, schedule = '' }: ScheduleTaskType,
callbacks?: TaskCallbacks,
runImmediately = false,
) {
const _id = this.formatId(id);
this.logger.info(
@ -122,6 +123,10 @@ export default class ScheduleService {
await this.runTask(command, callbacks);
}),
);
if (runImmediately) {
await this.runTask(command, callbacks);
}
}
async cancelCronTask({ id = 0, name }: ScheduleTaskType) {
@ -160,9 +165,17 @@ export default class ScheduleService {
},
);
const job = new LongIntervalJob({ ...schedule, runImmediately }, task, _id);
const job = new LongIntervalJob(
{ ...schedule, runImmediately: false },
task,
_id,
);
this.intervalSchedule.addIntervalJob(job);
if (runImmediately) {
await this.runTask(command, callbacks);
}
}
async cancelIntervalTask({ id = 0, name }: ScheduleTaskType) {

View File

@ -124,7 +124,12 @@ export default class SubscriptionService {
return { url, host };
}
public handleTask(doc: Subscription, needCreate = true, needAddKey = true) {
public async handleTask(
doc: Subscription,
needCreate = true,
needAddKey = true,
runImmediately = false,
) {
const { url, host } = this.formatUrl(doc);
if (doc.type === 'private-repo' && doc.pull_type === 'ssh-key') {
if (needAddKey) {
@ -143,20 +148,21 @@ export default class SubscriptionService {
if (doc.schedule_type === 'crontab') {
this.scheduleService.cancelCronTask(doc as any);
needCreate &&
this.scheduleService.createCronTask(
(await this.scheduleService.createCronTask(
doc as any,
this.taskCallbacks(doc),
);
runImmediately,
));
} else {
this.scheduleService.cancelIntervalTask(doc as any);
const { type, value } = doc.interval_schedule as any;
needCreate &&
this.scheduleService.createIntervalTask(
(await this.scheduleService.createIntervalTask(
doc as any,
{ [type]: value } as SimpleIntervalSchedule,
true,
runImmediately,
this.taskCallbacks(doc),
);
));
}
}
@ -270,7 +276,7 @@ export default class SubscriptionService {
public async create(payload: Subscription): Promise<Subscription> {
const tab = new Subscription(payload);
const doc = await this.insert(tab);
this.handleTask(doc);
await this.handleTask(doc);
return doc;
}
@ -280,7 +286,7 @@ export default class SubscriptionService {
public async update(payload: Subscription): Promise<Subscription> {
const newDoc = await this.updateDb(payload);
this.handleTask(newDoc);
await this.handleTask(newDoc);
return newDoc;
}
@ -323,7 +329,7 @@ export default class SubscriptionService {
public async remove(ids: number[]) {
const docs = await SubscriptionModel.findAll({ where: { id: ids } });
for (const doc of docs) {
this.handleTask(doc, false, false);
await this.handleTask(doc, false, false);
}
await SubscriptionModel.destroy({ where: { id: ids } });
}
@ -354,7 +360,7 @@ export default class SubscriptionService {
this.logger.silly(error);
}
}
this.handleTask(doc, false);
await this.handleTask(doc, false);
const command = this.formatCommand(doc);
const err = await this.killTask(command);
const absolutePath = await this.handleLogPath(doc.log_path as string);
@ -425,7 +431,7 @@ export default class SubscriptionService {
public async disabled(ids: number[]) {
const docs = await SubscriptionModel.findAll({ where: { id: ids } });
for (const doc of docs) {
this.handleTask(doc, false);
await this.handleTask(doc, false);
}
await SubscriptionModel.update({ is_disabled: 1 }, { where: { id: ids } });
}
@ -433,7 +439,7 @@ export default class SubscriptionService {
public async enabled(ids: number[]) {
const docs = await SubscriptionModel.findAll({ where: { id: ids } });
for (const doc of docs) {
this.handleTask(doc);
await this.handleTask(doc);
}
await SubscriptionModel.update({ is_disabled: 0 }, { where: { id: ids } });
}