修复系统启动执行订阅任务

This commit is contained in:
whyour 2022-05-28 19:56:12 +08:00
parent a09a21fa5d
commit b10157b09a
3 changed files with 33 additions and 14 deletions

View File

@ -26,6 +26,6 @@ export default async () => {
// 运行所有订阅 // 运行所有订阅
const subs = await subscriptionService.list(); const subs = await subscriptionService.list();
for (const sub of subs) { for (const sub of subs) {
await subscriptionService.handleTask(sub); await subscriptionService.handleTask(sub, true, true, true);
} }
}; };

View File

@ -106,6 +106,7 @@ export default class ScheduleService {
async createCronTask( async createCronTask(
{ id = 0, command, name, schedule = '' }: ScheduleTaskType, { id = 0, command, name, schedule = '' }: ScheduleTaskType,
callbacks?: TaskCallbacks, callbacks?: TaskCallbacks,
runImmediately = false,
) { ) {
const _id = this.formatId(id); const _id = this.formatId(id);
this.logger.info( this.logger.info(
@ -122,6 +123,10 @@ export default class ScheduleService {
await this.runTask(command, callbacks); await this.runTask(command, callbacks);
}), }),
); );
if (runImmediately) {
await this.runTask(command, callbacks);
}
} }
async cancelCronTask({ id = 0, name }: ScheduleTaskType) { async cancelCronTask({ id = 0, name }: ScheduleTaskType) {
@ -160,9 +165,17 @@ export default class ScheduleService {
}, },
); );
const job = new LongIntervalJob({ ...schedule, runImmediately }, task, _id); const job = new LongIntervalJob(
{ ...schedule, runImmediately: false },
task,
_id,
);
this.intervalSchedule.addIntervalJob(job); this.intervalSchedule.addIntervalJob(job);
if (runImmediately) {
await this.runTask(command, callbacks);
}
} }
async cancelIntervalTask({ id = 0, name }: ScheduleTaskType) { async cancelIntervalTask({ id = 0, name }: ScheduleTaskType) {

View File

@ -124,7 +124,12 @@ export default class SubscriptionService {
return { url, host }; return { url, host };
} }
public handleTask(doc: Subscription, needCreate = true, needAddKey = true) { public async handleTask(
doc: Subscription,
needCreate = true,
needAddKey = true,
runImmediately = false,
) {
const { url, host } = this.formatUrl(doc); const { url, host } = this.formatUrl(doc);
if (doc.type === 'private-repo' && doc.pull_type === 'ssh-key') { if (doc.type === 'private-repo' && doc.pull_type === 'ssh-key') {
if (needAddKey) { if (needAddKey) {
@ -143,20 +148,21 @@ export default class SubscriptionService {
if (doc.schedule_type === 'crontab') { if (doc.schedule_type === 'crontab') {
this.scheduleService.cancelCronTask(doc as any); this.scheduleService.cancelCronTask(doc as any);
needCreate && needCreate &&
this.scheduleService.createCronTask( (await this.scheduleService.createCronTask(
doc as any, doc as any,
this.taskCallbacks(doc), this.taskCallbacks(doc),
); runImmediately,
));
} else { } else {
this.scheduleService.cancelIntervalTask(doc as any); this.scheduleService.cancelIntervalTask(doc as any);
const { type, value } = doc.interval_schedule as any; const { type, value } = doc.interval_schedule as any;
needCreate && needCreate &&
this.scheduleService.createIntervalTask( (await this.scheduleService.createIntervalTask(
doc as any, doc as any,
{ [type]: value } as SimpleIntervalSchedule, { [type]: value } as SimpleIntervalSchedule,
true, runImmediately,
this.taskCallbacks(doc), this.taskCallbacks(doc),
); ));
} }
} }
@ -270,7 +276,7 @@ export default class SubscriptionService {
public async create(payload: Subscription): Promise<Subscription> { public async create(payload: Subscription): Promise<Subscription> {
const tab = new Subscription(payload); const tab = new Subscription(payload);
const doc = await this.insert(tab); const doc = await this.insert(tab);
this.handleTask(doc); await this.handleTask(doc);
return doc; return doc;
} }
@ -280,7 +286,7 @@ export default class SubscriptionService {
public async update(payload: Subscription): Promise<Subscription> { public async update(payload: Subscription): Promise<Subscription> {
const newDoc = await this.updateDb(payload); const newDoc = await this.updateDb(payload);
this.handleTask(newDoc); await this.handleTask(newDoc);
return newDoc; return newDoc;
} }
@ -323,7 +329,7 @@ export default class SubscriptionService {
public async remove(ids: number[]) { public async remove(ids: number[]) {
const docs = await SubscriptionModel.findAll({ where: { id: ids } }); const docs = await SubscriptionModel.findAll({ where: { id: ids } });
for (const doc of docs) { for (const doc of docs) {
this.handleTask(doc, false, false); await this.handleTask(doc, false, false);
} }
await SubscriptionModel.destroy({ where: { id: ids } }); await SubscriptionModel.destroy({ where: { id: ids } });
} }
@ -354,7 +360,7 @@ export default class SubscriptionService {
this.logger.silly(error); this.logger.silly(error);
} }
} }
this.handleTask(doc, false); await this.handleTask(doc, false);
const command = this.formatCommand(doc); const command = this.formatCommand(doc);
const err = await this.killTask(command); const err = await this.killTask(command);
const absolutePath = await this.handleLogPath(doc.log_path as string); const absolutePath = await this.handleLogPath(doc.log_path as string);
@ -425,7 +431,7 @@ export default class SubscriptionService {
public async disabled(ids: number[]) { public async disabled(ids: number[]) {
const docs = await SubscriptionModel.findAll({ where: { id: ids } }); const docs = await SubscriptionModel.findAll({ where: { id: ids } });
for (const doc of docs) { for (const doc of docs) {
this.handleTask(doc, false); await this.handleTask(doc, false);
} }
await SubscriptionModel.update({ is_disabled: 1 }, { where: { id: ids } }); await SubscriptionModel.update({ is_disabled: 1 }, { where: { id: ids } });
} }
@ -433,7 +439,7 @@ export default class SubscriptionService {
public async enabled(ids: number[]) { public async enabled(ids: number[]) {
const docs = await SubscriptionModel.findAll({ where: { id: ids } }); const docs = await SubscriptionModel.findAll({ where: { id: ids } });
for (const doc of docs) { for (const doc of docs) {
this.handleTask(doc); await this.handleTask(doc);
} }
await SubscriptionModel.update({ is_disabled: 0 }, { where: { id: ids } }); await SubscriptionModel.update({ is_disabled: 0 }, { where: { id: ids } });
} }