订阅增加before/after

This commit is contained in:
whyour
2022-05-20 01:15:45 +08:00
parent bdecdce118
commit 189826c5db
6 changed files with 112 additions and 18 deletions
+6
View File
@@ -38,6 +38,9 @@ export default (app: Router) => {
dependences: Joi.string().optional().allow('').allow(null),
pull_type: Joi.string().optional().allow('').allow(null),
pull_option: Joi.object().optional().allow('').allow(null),
extensions: Joi.string().optional().allow('').allow(null),
sub_before: Joi.string().optional().allow('').allow(null),
sub_after: Joi.string().optional().allow('').allow(null),
schedule_type: Joi.string().required(),
alias: Joi.string().required(),
}),
@@ -167,6 +170,9 @@ export default (app: Router) => {
pull_type: Joi.string().optional().allow('').allow(null),
pull_option: Joi.object().optional().allow('').allow(null),
schedule_type: Joi.string().optional().allow('').allow(null),
extensions: Joi.string().optional().allow('').allow(null),
sub_before: Joi.string().optional().allow('').allow(null),
sub_after: Joi.string().optional().allow('').allow(null),
alias: Joi.string().required(),
id: Joi.number().required(),
}),
+9
View File
@@ -25,6 +25,9 @@ export class Subscription {
log_path?: string;
alias: string;
command?: string;
extensions?: string;
sub_before?: string;
sub_after?: string;
constructor(options: Subscription) {
this.id = options.id;
@@ -48,6 +51,9 @@ export class Subscription {
this.schedule_type = options.schedule_type;
this.alias = options.alias;
this.interval_schedule = options.interval_schedule;
this.extensions = options.extensions;
this.sub_before = options.sub_before;
this.sub_after = options.sub_after;
}
}
@@ -85,6 +91,9 @@ export const SubscriptionModel = sequelize.define<SubscriptionInstance>(
blacklist: DataTypes.STRING,
status: DataTypes.NUMBER,
dependences: DataTypes.STRING,
extensions: DataTypes.STRING,
sub_before: DataTypes.STRING,
sub_after: DataTypes.STRING,
branch: DataTypes.STRING,
pull_type: DataTypes.STRING,
pull_option: DataTypes.JSON,
+21 -13
View File
@@ -21,14 +21,16 @@ export interface TaskCallbacks {
onStart?: (
cp: ChildProcessWithoutNullStreams,
startTime: dayjs.Dayjs,
) => void;
) => Promise<void>;
onEnd?: (
cp: ChildProcessWithoutNullStreams,
endTime: dayjs.Dayjs,
diff: number,
) => void;
onLog?: (message: string) => void;
onError?: (message: string) => void;
) => Promise<void>;
onLog?: (message: string) => Promise<void>;
onError?: (message: string) => Promise<void>;
onBefore?: () => Promise<void>;
onAfter?: () => Promise<void>;
}
@Service()
@@ -44,33 +46,34 @@ export default class ScheduleService {
async runTask(command: string, callbacks: TaskCallbacks = {}) {
return new Promise(async (resolve, reject) => {
try {
await callbacks.onBefore?.();
const startTime = dayjs();
const cp = spawn(command, { shell: '/bin/bash' });
callbacks.onStart?.(cp, startTime);
await callbacks.onStart?.(cp, startTime);
cp.stdout.on('data', (data) => {
callbacks.onLog?.(data.toString());
cp.stdout.on('data', async (data) => {
await callbacks.onLog?.(data.toString());
});
cp.stderr.on('data', (data) => {
cp.stderr.on('data', async (data) => {
this.logger.error(
'执行任务 %s 失败,时间:%s, 错误信息:%j',
command,
new Date().toLocaleString(),
data.toString(),
);
callbacks.onError?.(data.toString());
await callbacks.onError?.(data.toString());
});
cp.on('error', (err) => {
cp.on('error', async (err) => {
this.logger.error(
'创建任务 %s 失败,时间:%s, 错误信息:%j',
command,
new Date().toLocaleString(),
err,
);
callbacks.onError?.(JSON.stringify(err));
await callbacks.onError?.(JSON.stringify(err));
});
cp.on('exit', async (code, signal) => {
@@ -82,7 +85,12 @@ export default class ScheduleService {
cp.on('close', async (code) => {
const endTime = dayjs();
this.logger.info(`${command} pid: ${cp.pid} closed ${code}`);
callbacks.onEnd?.(cp, endTime, endTime.diff(startTime, 'seconds'));
await callbacks.onEnd?.(
cp,
endTime,
endTime.diff(startTime, 'seconds'),
);
await callbacks.onAfter?.();
resolve(null);
});
} catch (error) {
@@ -92,7 +100,7 @@ export default class ScheduleService {
new Date().toLocaleString(),
error,
);
callbacks.onError?.(JSON.stringify(error));
await callbacks.onError?.(JSON.stringify(error));
resolve(null);
}
});
+36 -2
View File
@@ -97,13 +97,13 @@ export default class SubscriptionService {
private formatCommand(doc: Subscription, url?: string) {
let command = 'ql ';
let _url = url || this.formatUrl(doc).url;
const { type, whitelist, blacklist, dependences, branch } = doc;
const { type, whitelist, blacklist, dependences, branch, extensions } = doc;
if (type === 'file') {
command += `raw "${_url}"`;
} else {
command += `repo "${_url}" "${whitelist || ''}" "${blacklist || ''}" "${
dependences || ''
}" "${branch || ''}"`;
}" "${branch || ''}" "${extensions || ''}"`;
}
return command;
}
@@ -219,6 +219,40 @@ export default class SubscriptionService {
const absolutePath = await this.handleLogPath(sub.log_path as string);
fs.appendFileSync(absolutePath, `\n${message}`);
},
onBefore: async () => {
return new Promise((resolve, reject) => {
if (doc.sub_before) {
exec(doc.sub_before, async (err, stdout, stderr) => {
const absolutePath = await this.handleLogPath(
doc.log_path as string,
);
fs.appendFileSync(
absolutePath,
stdout || stderr || `${JSON.stringify(err || {})}`,
);
});
} else {
resolve();
}
});
},
onAfter: async () => {
return new Promise((resolve, reject) => {
if (doc.sub_after) {
exec(doc.sub_after, async (err, stdout, stderr) => {
const absolutePath = await this.handleLogPath(
doc.log_path as string,
);
fs.appendFileSync(
absolutePath,
stdout || stderr || `${JSON.stringify(err || {})}`,
);
});
} else {
resolve();
}
});
},
};
}