定时任务支持 @once 和 @boot 任务

This commit is contained in:
whyour 2025-02-20 02:18:59 +08:00
parent 496918131f
commit 8173075b67
11 changed files with 242 additions and 118 deletions

View File

@ -4,7 +4,8 @@ import { Logger } from 'winston';
import CronService from '../services/cron'; import CronService from '../services/cron';
import CronViewService from '../services/cronView'; import CronViewService from '../services/cronView';
import { celebrate, Joi } from 'celebrate'; import { celebrate, Joi } from 'celebrate';
import cron_parser from 'cron-parser'; import { commonCronSchema } from '../validation/schedule';
const route = Router(); const route = Router();
export default (app: Router) => { export default (app: Router) => {
@ -170,27 +171,14 @@ export default (app: Router) => {
route.post( route.post(
'/', '/',
celebrate({ celebrate({
body: Joi.object({ body: Joi.object(commonCronSchema),
command: Joi.string().required(),
schedule: Joi.string().required(),
name: Joi.string().optional(),
labels: Joi.array().optional(),
sub_id: Joi.number().optional().allow(null),
extra_schedules: Joi.array().optional().allow(null),
task_before: Joi.string().optional().allow('').allow(null),
task_after: Joi.string().optional().allow('').allow(null),
}),
}), }),
async (req: Request, res: Response, next: NextFunction) => { async (req: Request, res: Response, next: NextFunction) => {
const logger: Logger = Container.get('logger'); const logger: Logger = Container.get('logger');
try { try {
if (cron_parser.parseExpression(req.body.schedule).hasNext()) { const cronService = Container.get(CronService);
const cronService = Container.get(CronService); const data = await cronService.create(req.body);
const data = await cronService.create(req.body); return res.send({ code: 200, data });
return res.send({ code: 200, data });
} else {
return res.send({ code: 400, message: 'param schedule error' });
}
} catch (e) { } catch (e) {
return next(e); return next(e);
} }
@ -331,30 +319,16 @@ export default (app: Router) => {
'/', '/',
celebrate({ celebrate({
body: Joi.object({ body: Joi.object({
labels: Joi.array().optional().allow(null), ...commonCronSchema,
command: Joi.string().required(),
schedule: Joi.string().required(),
name: Joi.string().optional().allow(null),
sub_id: Joi.number().optional().allow(null),
extra_schedules: Joi.array().optional().allow(null),
task_before: Joi.string().optional().allow('').allow(null),
task_after: Joi.string().optional().allow('').allow(null),
id: Joi.number().required(), id: Joi.number().required(),
}), }),
}), }),
async (req: Request, res: Response, next: NextFunction) => { async (req: Request, res: Response, next: NextFunction) => {
const logger: Logger = Container.get('logger'); const logger: Logger = Container.get('logger');
try { try {
if ( const cronService = Container.get(CronService);
!req.body.schedule || const data = await cronService.update(req.body);
cron_parser.parseExpression(req.body.schedule).hasNext() return res.send({ code: 200, data });
) {
const cronService = Container.get(CronService);
const data = await cronService.update(req.body);
return res.send({ code: 200, data });
} else {
return res.send({ code: 400, message: 'param schedule error' });
}
} catch (e) { } catch (e) {
return next(e); return next(e);
} }
@ -418,7 +392,7 @@ export default (app: Router) => {
const logger: Logger = Container.get('logger'); const logger: Logger = Container.get('logger');
try { try {
const cronService = Container.get(CronService); const cronService = Container.get(CronService);
const data = await cronService.import_crontab(); const data = await cronService.importCrontab();
return res.send({ code: 200, data }); return res.send({ code: 200, data });
} catch (e) { } catch (e) {
return next(e); return next(e);

View File

@ -17,6 +17,7 @@ async function startServer() {
Logger.debug(`✌️ 后端服务启动成功!`); Logger.debug(`✌️ 后端服务启动成功!`);
console.debug(`✌️ 后端服务启动成功!`); console.debug(`✌️ 后端服务启动成功!`);
process.send?.('ready'); process.send?.('ready');
require('./loaders/bootAfter').default();
}) })
.on('error', (err) => { .on('error', (err) => {
Logger.error(err); Logger.error(err);

View File

@ -0,0 +1,8 @@
import Container from 'typedi';
import CronService from '../services/cron';
export default async () => {
const cronService = Container.get(CronService);
await cronService.bootTask();
};

View File

@ -35,11 +35,24 @@ export default class CronService {
return false; return false;
} }
private isOnceSchedule(schedule?: string) {
return schedule?.startsWith('@once');
}
private isBootSchedule(schedule?: string) {
return schedule?.startsWith('@boot');
}
private isSpecialSchedule(schedule?: string) {
return this.isOnceSchedule(schedule) || this.isBootSchedule(schedule);
}
public async create(payload: Crontab): Promise<Crontab> { public async create(payload: Crontab): Promise<Crontab> {
const tab = new Crontab(payload); const tab = new Crontab(payload);
tab.saved = false; tab.saved = false;
const doc = await this.insert(tab); const doc = await this.insert(tab);
if (this.isNodeCron(doc)) {
if (this.isNodeCron(doc) && !this.isSpecialSchedule(doc.schedule)) {
await cronClient.addCron([ await cronClient.addCron([
{ {
name: doc.name || '', name: doc.name || '',
@ -50,7 +63,8 @@ export default class CronService {
}, },
]); ]);
} }
await this.set_crontab();
await this.setCrontab();
return doc; return doc;
} }
@ -63,13 +77,16 @@ export default class CronService {
const tab = new Crontab({ ...doc, ...payload }); const tab = new Crontab({ ...doc, ...payload });
tab.saved = false; tab.saved = false;
const newDoc = await this.updateDb(tab); const newDoc = await this.updateDb(tab);
if (doc.isDisabled === 1) { if (doc.isDisabled === 1) {
return newDoc; return newDoc;
} }
if (this.isNodeCron(doc)) { if (this.isNodeCron(doc)) {
await cronClient.delCron([String(doc.id)]); await cronClient.delCron([String(doc.id)]);
} }
if (this.isNodeCron(newDoc)) {
if (this.isNodeCron(newDoc) && !this.isSpecialSchedule(newDoc.schedule)) {
await cronClient.addCron([ await cronClient.addCron([
{ {
name: doc.name || '', name: doc.name || '',
@ -80,7 +97,8 @@ export default class CronService {
}, },
]); ]);
} }
await this.set_crontab();
await this.setCrontab();
return newDoc; return newDoc;
} }
@ -135,7 +153,7 @@ export default class CronService {
public async remove(ids: number[]) { public async remove(ids: number[]) {
await CrontabModel.destroy({ where: { id: ids } }); await CrontabModel.destroy({ where: { id: ids } });
await cronClient.delCron(ids.map(String)); await cronClient.delCron(ids.map(String));
await this.set_crontab(); await this.setCrontab();
} }
public async pin(ids: number[]) { public async pin(ids: number[]) {
@ -381,7 +399,7 @@ export default class CronService {
try { try {
const result = await CrontabModel.findAll(condition); const result = await CrontabModel.findAll(condition);
const count = await CrontabModel.count({ where: query }); const count = await CrontabModel.count({ where: query });
return { data: result, total: count }; return { data: result.map((x) => x.get({ plain: true })), total: count };
} catch (error) { } catch (error) {
throw error; throw error;
} }
@ -502,7 +520,7 @@ export default class CronService {
public async disabled(ids: number[]) { public async disabled(ids: number[]) {
await CrontabModel.update({ isDisabled: 1 }, { where: { id: ids } }); await CrontabModel.update({ isDisabled: 1 }, { where: { id: ids } });
await cronClient.delCron(ids.map(String)); await cronClient.delCron(ids.map(String));
await this.set_crontab(); await this.setCrontab();
} }
public async enabled(ids: number[]) { public async enabled(ids: number[]) {
@ -518,7 +536,7 @@ export default class CronService {
extra_schedules: doc.extra_schedules || [], extra_schedules: doc.extra_schedules || [],
})); }));
await cronClient.addCron(sixCron); await cronClient.addCron(sixCron);
await this.set_crontab(); await this.setCrontab();
} }
public async log(id: number) { public async log(id: number) {
@ -586,7 +604,7 @@ export default class CronService {
return crontab_job_string; return crontab_job_string;
} }
private async set_crontab(data?: { data: Crontab[]; total: number }) { private async setCrontab(data?: { data: Crontab[]; total: number }) {
const tabs = data ?? (await this.crontabs()); const tabs = data ?? (await this.crontabs());
var crontab_string = ''; var crontab_string = '';
tabs.data.forEach((tab) => { tabs.data.forEach((tab) => {
@ -594,7 +612,8 @@ export default class CronService {
if ( if (
tab.isDisabled === 1 || tab.isDisabled === 1 ||
_schedule!.length !== 5 || _schedule!.length !== 5 ||
tab.extra_schedules?.length tab.extra_schedules?.length ||
this.isSpecialSchedule(tab.schedule)
) { ) {
crontab_string += '# '; crontab_string += '# ';
crontab_string += tab.schedule; crontab_string += tab.schedule;
@ -615,7 +634,7 @@ export default class CronService {
await CrontabModel.update({ saved: true }, { where: {} }); await CrontabModel.update({ saved: true }, { where: {} });
} }
public import_crontab() { public importCrontab() {
exec('crontab -l', (error, stdout, stderr) => { exec('crontab -l', (error, stdout, stderr) => {
const lines = stdout.split('\n'); const lines = stdout.split('\n');
const namePrefix = new Date().getTime(); const namePrefix = new Date().getTime();
@ -651,10 +670,15 @@ export default class CronService {
public async autosave_crontab() { public async autosave_crontab() {
const tabs = await this.crontabs(); const tabs = await this.crontabs();
this.set_crontab(tabs); this.setCrontab(tabs);
const sixCron = tabs.data const regularCrons = tabs.data
.filter((x) => this.isNodeCron(x) && x.isDisabled !== 1) .filter(
(x) =>
this.isNodeCron(x) &&
x.isDisabled !== 1 &&
!this.isSpecialSchedule(x.schedule),
)
.map((doc) => ({ .map((doc) => ({
name: doc.name || '', name: doc.name || '',
id: String(doc.id), id: String(doc.id),
@ -662,6 +686,22 @@ export default class CronService {
command: this.makeCommand(doc), command: this.makeCommand(doc),
extra_schedules: doc.extra_schedules || [], extra_schedules: doc.extra_schedules || [],
})); }));
await cronClient.addCron(sixCron); await cronClient.addCron(regularCrons);
}
public async bootTask() {
const tabs = await this.crontabs();
const bootTasks = tabs.data.filter(
(x) => !x.isDisabled && this.isBootSchedule(x.schedule),
);
if (bootTasks.length > 0) {
await CrontabModel.update(
{ status: CrontabStatus.queued },
{ where: { id: bootTasks.map((t) => t.id!) } },
);
for (const task of bootTasks) {
await this.runSingle(task.id!);
}
}
} }
} }

View File

@ -0,0 +1,36 @@
import { Joi } from 'celebrate';
import cron_parser from 'cron-parser';
const validateSchedule = (value: string, helpers: any) => {
if (value.startsWith('@once') || value.startsWith('@boot')) {
return value;
}
try {
if (cron_parser.parseExpression(value).hasNext()) {
return value;
}
} catch (e) {
return helpers.error('any.invalid');
}
return helpers.error('any.invalid');
};
export const scheduleSchema = Joi.string()
.required()
.custom(validateSchedule)
.messages({
'any.invalid': '无效的定时规则',
'string.empty': '定时规则不能为空',
});
export const commonCronSchema = {
name: Joi.string().optional(),
command: Joi.string().required(),
schedule: scheduleSchema,
labels: Joi.array().optional(),
sub_id: Joi.number().optional().allow(null),
extra_schedules: Joi.array().optional().allow(null),
task_before: Joi.string().optional().allow('').allow(null),
task_after: Joi.string().optional().allow('').allow(null),
};

View File

@ -479,7 +479,7 @@ handle_task_end() {
[[ "$diff_time" == 0 ]] && diff_time=1 [[ "$diff_time" == 0 ]] && diff_time=1
if [[ $ID ]]; then if [[ $ID ]]; then
local error=$(update_cron "\"$ID\"" "1" "" "$log_path" "$begin_timestamp" "$diff_time") local error=$(update_cron "\"$ID\"" "1" "$$" "$log_path" "$begin_timestamp" "$diff_time")
if [[ $error ]]; then if [[ $error ]]; then
error_message=", 任务状态更新失败(${error})" error_message=", 任务状态更新失败(${error})"
fi fi

View File

@ -553,7 +553,7 @@ main() {
local end_time=$(format_time "$time_format" "$etime") local end_time=$(format_time "$time_format" "$etime")
local end_timestamp=$(format_timestamp "$time_format" "$etime") local end_timestamp=$(format_timestamp "$time_format" "$etime")
local diff_time=$(($end_timestamp - $begin_timestamp)) local diff_time=$(($end_timestamp - $begin_timestamp))
[[ $ID ]] && update_cron "\"$ID\"" "1" "" "$log_path" "$begin_timestamp" "$diff_time" [[ $ID ]] && update_cron "\"$ID\"" "1" "$$" "$log_path" "$begin_timestamp" "$diff_time"
if [[ "$p1" != "repo" ]] && [[ "$p1" != "raw" ]]; then if [[ "$p1" != "repo" ]] && [[ "$p1" != "raw" ]]; then
eval echo -e "\\\n\#\# 执行结束... $end_time 耗时 $diff_time 秒     " $cmd eval echo -e "\\\n\#\# 执行结束... $end_time 耗时 $diff_time 秒     " $cmd

View File

@ -496,5 +496,8 @@
"NPM 镜像源": "NPM Mirror Source", "NPM 镜像源": "NPM Mirror Source",
"PyPI 镜像源": "PyPI Mirror Source", "PyPI 镜像源": "PyPI Mirror Source",
"alpine linux 镜像源": "Alpine Linux Mirror Source", "alpine linux 镜像源": "Alpine Linux Mirror Source",
"如果恢复失败,可进入容器执行": "If recovery fails, you can enter the container and execute" "如果恢复失败,可进入容器执行": "If recovery fails, you can enter the container and execute",
"常规定时": "Normal Timing",
"手动运行": "Manual Run",
"开机运行": "Boot Run"
} }

View File

@ -496,5 +496,9 @@
"NPM 镜像源": "NPM 镜像源", "NPM 镜像源": "NPM 镜像源",
"PyPI 镜像源": "PyPI 镜像源", "PyPI 镜像源": "PyPI 镜像源",
"alpine linux 镜像源": "alpine linux 镜像源", "alpine linux 镜像源": "alpine linux 镜像源",
"如果恢复失败,可进入容器执行": "如果恢复失败,可进入容器执行" "如果恢复失败,可进入容器执行": "如果恢复失败,可进入容器执行",
"常规定时": "常规定时",
"手动运行": "手动运行",
"开机运行": "开机运行"
} }

View File

@ -263,7 +263,9 @@ const Crontab = () => {
}, },
}, },
render: (text, record) => { render: (text, record) => {
return dayjs(record.nextRunTime).format('YYYY-MM-DD HH:mm:ss'); return record.nextRunTime
? dayjs(record.nextRunTime).format('YYYY-MM-DD HH:mm:ss')
: '-';
}, },
}, },
{ {
@ -396,9 +398,13 @@ const Crontab = () => {
setValue( setValue(
data.map((x) => { data.map((x) => {
const specialSchedules = ['@once', '@boot'];
const nextRunTime = specialSchedules.includes(x.schedule)
? null
: getCrontabsNextDate(x.schedule, x.extra_schedules);
return { return {
...x, ...x,
nextRunTime: getCrontabsNextDate(x.schedule, x.extra_schedules), nextRunTime,
subscription: subscriptionMap?.[x.sub_id], subscription: subscriptionMap?.[x.sub_id],
}; };
}), }),

View File

@ -1,12 +1,30 @@
import intl from 'react-intl-universal'; import intl from 'react-intl-universal';
import React, { useEffect, useState } from 'react'; import React, { useEffect, useState } from 'react';
import { Modal, message, Input, Form, Button, Space } from 'antd'; import { Modal, message, Input, Form, Button, Space, Select } from 'antd';
import { request } from '@/utils/http'; import { request } from '@/utils/http';
import config from '@/utils/config'; import config from '@/utils/config';
import cronParse from 'cron-parser'; import cronParse from 'cron-parser';
import EditableTagGroup from '@/components/tag'; import EditableTagGroup from '@/components/tag';
import { MinusCircleOutlined, PlusOutlined } from '@ant-design/icons'; import { MinusCircleOutlined, PlusOutlined } from '@ant-design/icons';
enum ScheduleType {
Normal = 'normal',
Once = 'once',
Boot = 'boot',
}
const scheduleTypeMap = {
[ScheduleType.Normal]: '',
[ScheduleType.Once]: '@once',
[ScheduleType.Boot]: '@boot',
};
const getScheduleType = (schedule?: string): ScheduleType => {
if (schedule?.startsWith('@once')) return ScheduleType.Once;
if (schedule?.startsWith('@boot')) return ScheduleType.Boot;
return ScheduleType.Normal;
};
const CronModal = ({ const CronModal = ({
cron, cron,
handleCancel, handleCancel,
@ -18,15 +36,26 @@ const CronModal = ({
}) => { }) => {
const [form] = Form.useForm(); const [form] = Form.useForm();
const [loading, setLoading] = useState(false); const [loading, setLoading] = useState(false);
const [scheduleType, setScheduleType] = useState<ScheduleType>(
cron ? getScheduleType(cron.schedule) : ScheduleType.Normal,
);
const handleOk = async (values: any) => { const handleOk = async (values: any) => {
setLoading(true); setLoading(true);
const method = cron?.id ? 'put' : 'post';
const payload = { ...values };
if (cron?.id) {
payload.id = cron.id;
}
try { try {
const method = cron?.id ? 'put' : 'post';
const payload = {
...values,
schedule:
scheduleType !== ScheduleType.Normal
? scheduleTypeMap[scheduleType]
: values.schedule,
};
if (cron?.id) {
payload.id = cron.id;
}
const { code, data } = await request[method]( const { code, data } = await request[method](
`${config.apiPrefix}crons`, `${config.apiPrefix}crons`,
payload, payload,
@ -38,74 +67,53 @@ const CronModal = ({
); );
handleCancel(data); handleCancel(data);
} }
setLoading(false);
} catch (error: any) { } catch (error: any) {
console.error(error);
} finally {
setLoading(false); setLoading(false);
} }
}; };
useEffect(() => { useEffect(() => {
form.resetFields(); form.resetFields();
setScheduleType(getScheduleType(cron?.schedule));
}, [cron, visible]); }, [cron, visible]);
return ( const handleScheduleTypeChange = (type: ScheduleType) => {
<Modal setScheduleType(type);
title={cron?.id ? intl.get('编辑任务') : intl.get('创建任务')} form.setFieldValue('schedule', '');
open={visible} };
forceRender
centered const renderScheduleOptions = () => (
maskClosable={false} <Select defaultValue={scheduleType} value={scheduleType} onChange={handleScheduleTypeChange}>
onOk={() => { <Select.Option value={ScheduleType.Normal}>
form {intl.get('常规定时')}
.validateFields() </Select.Option>
.then((values) => { <Select.Option value={ScheduleType.Once}>
handleOk(values); {intl.get('手动运行')}
}) </Select.Option>
.catch((info) => { <Select.Option value={ScheduleType.Boot}>
console.log('Validate Failed:', info); {intl.get('开机运行')}
}); </Select.Option>
}} </Select>
onCancel={() => handleCancel()} );
confirmLoading={loading}
> const renderScheduleFields = () => {
<Form if (scheduleType !== ScheduleType.Normal) return null;
form={form}
layout="vertical" return (
name="form_in_modal" <>
initialValues={cron}
>
<Form.Item
name="name"
label={intl.get('名称')}
rules={[{ required: true, whitespace: true }]}
>
<Input placeholder={intl.get('请输入任务名称')} />
</Form.Item>
<Form.Item
name="command"
label={intl.get('命令/脚本')}
rules={[{ required: true, whitespace: true }]}
>
<Input.TextArea
rows={4}
autoSize={{ minRows: 1, maxRows: 5 }}
placeholder={intl.get(
'支持输入脚本路径/任意系统可执行命令/task 脚本路径',
)}
/>
</Form.Item>
<Form.Item <Form.Item
name="schedule" name="schedule"
label={intl.get('定时规则')} label={intl.get('定时规则')}
rules={[ rules={[
{ required: true }, { required: true },
{ {
validator: (rule, value) => { validator: (_, value) => {
if (!value || cronParse.parseExpression(value).hasNext()) { if (!value || cronParse.parseExpression(value).hasNext()) {
return Promise.resolve(); return Promise.resolve();
} else {
return Promise.reject(intl.get('Cron表达式格式有误'));
} }
return Promise.reject(intl.get('Cron表达式格式有误'));
}, },
}, },
]} ]}
@ -136,14 +144,58 @@ const CronModal = ({
))} ))}
<Form.Item> <Form.Item>
<a onClick={() => add({ schedule: '' })}> <a onClick={() => add({ schedule: '' })}>
<PlusOutlined /> <PlusOutlined /> {intl.get('新增定时规则')}
{intl.get('新增定时规则')}
</a> </a>
</Form.Item> </Form.Item>
<Form.ErrorList errors={errors} /> <Form.ErrorList errors={errors} />
</> </>
)} )}
</Form.List> </Form.List>
</>
);
};
return (
<Modal
title={cron?.id ? intl.get('编辑任务') : intl.get('创建任务')}
open={visible}
forceRender
centered
maskClosable={false}
onOk={() => form.validateFields().then(handleOk)}
onCancel={() => handleCancel()}
confirmLoading={loading}
>
<Form
form={form}
layout="vertical"
name="form_in_modal"
initialValues={cron}
>
<Form.Item
name="name"
label={intl.get('名称')}
rules={[{ required: true, whitespace: true }]}
>
<Input placeholder={intl.get('请输入任务名称')} />
</Form.Item>
<Form.Item
name="command"
label={intl.get('命令/脚本')}
rules={[{ required: true, whitespace: true }]}
>
<Input.TextArea
rows={4}
autoSize={{ minRows: 1, maxRows: 5 }}
placeholder={intl.get(
'支持输入脚本路径/任意系统可执行命令/task 脚本路径',
)}
/>
</Form.Item>
<Form.Item label={intl.get('定时类型')} required>
{renderScheduleOptions()}
</Form.Item>
{renderScheduleFields()}
<Form.Item name="labels" label={intl.get('标签')}> <Form.Item name="labels" label={intl.get('标签')}>
<EditableTagGroup /> <EditableTagGroup />
</Form.Item> </Form.Item>
@ -155,7 +207,7 @@ const CronModal = ({
)} )}
rules={[ rules={[
{ {
validator(rule, value) { validator(_, value) {
if ( if (
value && value &&
(value.includes(' task ') || value.startsWith('task ')) (value.includes(' task ') || value.startsWith('task '))
@ -183,7 +235,7 @@ const CronModal = ({
)} )}
rules={[ rules={[
{ {
validator(rule, value) { validator(_, value) {
if ( if (
value && value &&
(value.includes(' task ') || value.startsWith('task ')) (value.includes(' task ') || value.startsWith('task '))