mirror of
https://github.com/whyour/qinglong.git
synced 2025-11-22 16:38:33 +08:00
Fix task duplication: add single/multi-instance support with UI configuration and stop all running instances (#2837)
* Initial plan * Stop running tasks before starting new scheduled instance Co-authored-by: whyour <22700758+whyour@users.noreply.github.com> * Add multi-instance support and fix stop to kill all running instances - Add allow_multiple_instances field to Crontab model (default: 0 for single instance) - Add validation for new field in commonCronSchema - Add getAllPids and killAllTasks utility functions - Update stop method to kill ALL running instances of a task - Update runCron to respect allow_multiple_instances config - Backward compatible: defaults to single instance mode Co-authored-by: whyour <22700758+whyour@users.noreply.github.com> * Add UI support for allow_multiple_instances configuration - Add allow_multiple_instances field to ICrontab interface - Add instance mode selector in task creation/edit modal - Add translations for instance mode in Chinese and English - Default to single instance mode for backward compatibility Co-authored-by: whyour <22700758+whyour@users.noreply.github.com> * Add allow_multiple_instances column migration and optimize db.ts - Add allow_multiple_instances column to Crontabs table migration - Refactor migration code to use data-driven approach - Replace 11 individual try-catch blocks with single loop - Improve code maintainability and readability Co-authored-by: whyour <22700758+whyour@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>
This commit is contained in:
parent
0e5de4a824
commit
03c7031a3c
|
|
@ -417,6 +417,27 @@ export async function getPid(cmd: string) {
|
||||||
return pid ? Number(pid) : undefined;
|
return pid ? Number(pid) : undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function getAllPids(cmd: string): Promise<number[]> {
|
||||||
|
const taskCommand = `ps -eo pid,command | grep "${cmd}" | grep -v grep | awk '{print $1}'`;
|
||||||
|
const pidsStr = await promiseExec(taskCommand);
|
||||||
|
if (!pidsStr) return [];
|
||||||
|
return pidsStr
|
||||||
|
.split('\n')
|
||||||
|
.map((p) => Number(p.trim()))
|
||||||
|
.filter((p) => !isNaN(p) && p > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function killAllTasks(cmd: string): Promise<void> {
|
||||||
|
const pids = await getAllPids(cmd);
|
||||||
|
for (const pid of pids) {
|
||||||
|
try {
|
||||||
|
await killTask(pid);
|
||||||
|
} catch (error) {
|
||||||
|
// Ignore errors if process already terminated
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
interface IVersion {
|
interface IVersion {
|
||||||
version: string;
|
version: string;
|
||||||
changeLogLink: string;
|
changeLogLink: string;
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ export class Crontab {
|
||||||
task_before?: string;
|
task_before?: string;
|
||||||
task_after?: string;
|
task_after?: string;
|
||||||
log_name?: string;
|
log_name?: string;
|
||||||
|
allow_multiple_instances?: 1 | 0;
|
||||||
|
|
||||||
constructor(options: Crontab) {
|
constructor(options: Crontab) {
|
||||||
this.name = options.name;
|
this.name = options.name;
|
||||||
|
|
@ -47,6 +48,7 @@ export class Crontab {
|
||||||
this.task_before = options.task_before;
|
this.task_before = options.task_before;
|
||||||
this.task_after = options.task_after;
|
this.task_after = options.task_after;
|
||||||
this.log_name = options.log_name;
|
this.log_name = options.log_name;
|
||||||
|
this.allow_multiple_instances = options.allow_multiple_instances || 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -87,4 +89,5 @@ export const CrontabModel = sequelize.define<CronInstance>('Crontab', {
|
||||||
task_before: DataTypes.STRING,
|
task_before: DataTypes.STRING,
|
||||||
task_after: DataTypes.STRING,
|
task_after: DataTypes.STRING,
|
||||||
log_name: DataTypes.STRING,
|
log_name: DataTypes.STRING,
|
||||||
|
allow_multiple_instances: DataTypes.NUMBER,
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -19,51 +19,38 @@ export default async () => {
|
||||||
await CrontabViewModel.sync();
|
await CrontabViewModel.sync();
|
||||||
|
|
||||||
// 初始化新增字段
|
// 初始化新增字段
|
||||||
try {
|
const migrations = [
|
||||||
await sequelize.query(
|
{
|
||||||
'alter table CrontabViews add column filterRelation VARCHAR(255)',
|
table: 'CrontabViews',
|
||||||
);
|
column: 'filterRelation',
|
||||||
} catch (error) {}
|
type: 'VARCHAR(255)',
|
||||||
try {
|
},
|
||||||
await sequelize.query(
|
{ table: 'Subscriptions', column: 'proxy', type: 'VARCHAR(255)' },
|
||||||
'alter table Subscriptions add column proxy VARCHAR(255)',
|
{ table: 'CrontabViews', column: 'type', type: 'NUMBER' },
|
||||||
);
|
{ table: 'Subscriptions', column: 'autoAddCron', type: 'NUMBER' },
|
||||||
} catch (error) {}
|
{ table: 'Subscriptions', column: 'autoDelCron', type: 'NUMBER' },
|
||||||
try {
|
{ table: 'Crontabs', column: 'sub_id', type: 'NUMBER' },
|
||||||
await sequelize.query('alter table CrontabViews add column type NUMBER');
|
{ table: 'Crontabs', column: 'extra_schedules', type: 'JSON' },
|
||||||
} catch (error) {}
|
{ table: 'Crontabs', column: 'task_before', type: 'TEXT' },
|
||||||
try {
|
{ table: 'Crontabs', column: 'task_after', type: 'TEXT' },
|
||||||
await sequelize.query(
|
{ table: 'Crontabs', column: 'log_name', type: 'VARCHAR(255)' },
|
||||||
'alter table Subscriptions add column autoAddCron NUMBER',
|
{
|
||||||
);
|
table: 'Crontabs',
|
||||||
} catch (error) {}
|
column: 'allow_multiple_instances',
|
||||||
try {
|
type: 'NUMBER',
|
||||||
await sequelize.query(
|
},
|
||||||
'alter table Subscriptions add column autoDelCron NUMBER',
|
{ table: 'Envs', column: 'isPinned', type: 'NUMBER' },
|
||||||
);
|
];
|
||||||
} catch (error) {}
|
|
||||||
try {
|
for (const migration of migrations) {
|
||||||
await sequelize.query('alter table Crontabs add column sub_id NUMBER');
|
try {
|
||||||
} catch (error) {}
|
await sequelize.query(
|
||||||
try {
|
`alter table ${migration.table} add column ${migration.column} ${migration.type}`,
|
||||||
await sequelize.query(
|
);
|
||||||
'alter table Crontabs add column extra_schedules JSON',
|
} catch (error) {
|
||||||
);
|
// Column already exists or other error, continue
|
||||||
} catch (error) {}
|
}
|
||||||
try {
|
}
|
||||||
await sequelize.query('alter table Crontabs add column task_before TEXT');
|
|
||||||
} catch (error) {}
|
|
||||||
try {
|
|
||||||
await sequelize.query('alter table Crontabs add column task_after TEXT');
|
|
||||||
} catch (error) {}
|
|
||||||
try {
|
|
||||||
await sequelize.query(
|
|
||||||
'alter table Crontabs add column log_name VARCHAR(255)',
|
|
||||||
);
|
|
||||||
} catch (error) { }
|
|
||||||
try {
|
|
||||||
await sequelize.query('alter table Envs add column isPinned NUMBER');
|
|
||||||
} catch (error) {}
|
|
||||||
|
|
||||||
Logger.info('✌️ DB loaded');
|
Logger.info('✌️ DB loaded');
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ import {
|
||||||
getFileContentByName,
|
getFileContentByName,
|
||||||
fileExist,
|
fileExist,
|
||||||
killTask,
|
killTask,
|
||||||
|
killAllTasks,
|
||||||
getUniqPath,
|
getUniqPath,
|
||||||
safeJSONParse,
|
safeJSONParse,
|
||||||
isDemoEnv,
|
isDemoEnv,
|
||||||
|
|
@ -28,7 +29,7 @@ import { logStreamManager } from '../shared/logStreamManager';
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export default class CronService {
|
export default class CronService {
|
||||||
constructor(@Inject('logger') private logger: winston.Logger) { }
|
constructor(@Inject('logger') private logger: winston.Logger) {}
|
||||||
|
|
||||||
private isNodeCron(cron: Crontab) {
|
private isNodeCron(cron: Crontab) {
|
||||||
const { schedule, extra_schedules } = cron;
|
const { schedule, extra_schedules } = cron;
|
||||||
|
|
@ -57,7 +58,9 @@ export default class CronService {
|
||||||
}
|
}
|
||||||
let uniqPath = await getUniqPath(command, `${id}`);
|
let uniqPath = await getUniqPath(command, `${id}`);
|
||||||
if (log_name) {
|
if (log_name) {
|
||||||
const normalizedLogName = log_name.startsWith('/') ? log_name : path.join(config.logPath, log_name);
|
const normalizedLogName = log_name.startsWith('/')
|
||||||
|
? log_name
|
||||||
|
: path.join(config.logPath, log_name);
|
||||||
if (normalizedLogName.startsWith(config.logPath)) {
|
if (normalizedLogName.startsWith(config.logPath)) {
|
||||||
uniqPath = log_name;
|
uniqPath = log_name;
|
||||||
}
|
}
|
||||||
|
|
@ -162,7 +165,7 @@ export default class CronService {
|
||||||
let cron;
|
let cron;
|
||||||
try {
|
try {
|
||||||
cron = await this.getDb({ id });
|
cron = await this.getDb({ id });
|
||||||
} catch (err) { }
|
} catch (err) {}
|
||||||
if (!cron) {
|
if (!cron) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
@ -462,12 +465,17 @@ export default class CronService {
|
||||||
public async stop(ids: number[]) {
|
public async stop(ids: number[]) {
|
||||||
const docs = await CrontabModel.findAll({ where: { id: ids } });
|
const docs = await CrontabModel.findAll({ where: { id: ids } });
|
||||||
for (const doc of docs) {
|
for (const doc of docs) {
|
||||||
if (doc.pid) {
|
// Kill all running instances of this task
|
||||||
try {
|
try {
|
||||||
await killTask(doc.pid);
|
const command = this.makeCommand(doc);
|
||||||
} catch (error) {
|
await killAllTasks(command);
|
||||||
this.logger.error(error);
|
this.logger.info(
|
||||||
}
|
`[panel][停止所有运行中的任务实例] 任务ID: ${doc.id}, 命令: ${command}`,
|
||||||
|
);
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error(
|
||||||
|
`[panel][停止任务失败] 任务ID: ${doc.id}, 错误: ${error}`,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -498,7 +506,10 @@ export default class CronService {
|
||||||
|
|
||||||
let { id, command, log_name } = cron;
|
let { id, command, log_name } = cron;
|
||||||
|
|
||||||
const uniqPath = log_name === '/dev/null' ? (await getUniqPath(command, `${id}`)) : log_name;
|
const uniqPath =
|
||||||
|
log_name === '/dev/null'
|
||||||
|
? await getUniqPath(command, `${id}`)
|
||||||
|
: log_name;
|
||||||
const logTime = dayjs().format('YYYY-MM-DD-HH-mm-ss-SSS');
|
const logTime = dayjs().format('YYYY-MM-DD-HH-mm-ss-SSS');
|
||||||
const logDirPath = path.resolve(config.logPath, `${uniqPath}`);
|
const logDirPath = path.resolve(config.logPath, `${uniqPath}`);
|
||||||
await fs.mkdir(logDirPath, { recursive: true });
|
await fs.mkdir(logDirPath, { recursive: true });
|
||||||
|
|
|
||||||
|
|
@ -2,10 +2,45 @@ import { spawn } from 'cross-spawn';
|
||||||
import taskLimit from './pLimit';
|
import taskLimit from './pLimit';
|
||||||
import Logger from '../loaders/logger';
|
import Logger from '../loaders/logger';
|
||||||
import { ICron } from '../protos/cron';
|
import { ICron } from '../protos/cron';
|
||||||
|
import { CrontabModel, CrontabStatus } from '../data/cron';
|
||||||
|
import { killTask } from '../config/util';
|
||||||
|
|
||||||
export function runCron(cmd: string, cron: ICron): Promise<number | void> {
|
export function runCron(cmd: string, cron: ICron): Promise<number | void> {
|
||||||
return taskLimit.runWithCronLimit(cron, () => {
|
return taskLimit.runWithCronLimit(cron, () => {
|
||||||
return new Promise(async (resolve: any) => {
|
return new Promise(async (resolve: any) => {
|
||||||
|
// Check if the cron is already running and stop it (only if multiple instances are not allowed)
|
||||||
|
try {
|
||||||
|
const existingCron = await CrontabModel.findOne({
|
||||||
|
where: { id: Number(cron.id) },
|
||||||
|
});
|
||||||
|
|
||||||
|
// Default to single instance mode (0) for backward compatibility
|
||||||
|
const allowMultipleInstances =
|
||||||
|
existingCron?.allow_multiple_instances === 1;
|
||||||
|
|
||||||
|
if (
|
||||||
|
!allowMultipleInstances &&
|
||||||
|
existingCron &&
|
||||||
|
existingCron.pid &&
|
||||||
|
(existingCron.status === CrontabStatus.running ||
|
||||||
|
existingCron.status === CrontabStatus.queued)
|
||||||
|
) {
|
||||||
|
Logger.info(
|
||||||
|
`[schedule][停止已运行任务] 任务ID: ${cron.id}, PID: ${existingCron.pid}`,
|
||||||
|
);
|
||||||
|
await killTask(existingCron.pid);
|
||||||
|
// Update the status to idle after killing
|
||||||
|
await CrontabModel.update(
|
||||||
|
{ status: CrontabStatus.idle, pid: undefined },
|
||||||
|
{ where: { id: Number(cron.id) } },
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
Logger.error(
|
||||||
|
`[schedule][检查已运行任务失败] 任务ID: ${cron.id}, 错误: ${error}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
Logger.info(
|
Logger.info(
|
||||||
`[schedule][开始执行任务] 参数 ${JSON.stringify({
|
`[schedule][开始执行任务] 参数 ${JSON.stringify({
|
||||||
...cron,
|
...cron,
|
||||||
|
|
|
||||||
|
|
@ -64,7 +64,11 @@ export const commonCronSchema = {
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!/^(?!.*(?:^|\/)\.{1,2}(?:\/|$))(?:\/)?(?:[\w.-]+\/)*[\w.-]+\/?$/.test(value)) {
|
if (
|
||||||
|
!/^(?!.*(?:^|\/)\.{1,2}(?:\/|$))(?:\/)?(?:[\w.-]+\/)*[\w.-]+\/?$/.test(
|
||||||
|
value,
|
||||||
|
)
|
||||||
|
) {
|
||||||
return helpers.error('string.pattern.base');
|
return helpers.error('string.pattern.base');
|
||||||
}
|
}
|
||||||
if (value.length > 100) {
|
if (value.length > 100) {
|
||||||
|
|
@ -77,4 +81,5 @@ export const commonCronSchema = {
|
||||||
'string.max': '日志名称不能超过100个字符',
|
'string.max': '日志名称不能超过100个字符',
|
||||||
'string.unsafePath': '绝对路径必须在日志目录内或使用 /dev/null',
|
'string.unsafePath': '绝对路径必须在日志目录内或使用 /dev/null',
|
||||||
}),
|
}),
|
||||||
|
allow_multiple_instances: Joi.number().optional().valid(0, 1),
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -533,5 +533,10 @@
|
||||||
"日志名称不能超过100个字符": "Log name cannot exceed 100 characters",
|
"日志名称不能超过100个字符": "Log name cannot exceed 100 characters",
|
||||||
"未启用": "Not enabled",
|
"未启用": "Not enabled",
|
||||||
"默认为 CPU 个数": "Default is the number of CPUs",
|
"默认为 CPU 个数": "Default is the number of CPUs",
|
||||||
"Minimum is 4": "Minimum is 4"
|
"Minimum is 4": "Minimum is 4",
|
||||||
|
"实例模式": "Instance Mode",
|
||||||
|
"单实例模式:定时启动新任务前会自动停止旧任务;多实例模式:允许同时运行多个任务实例": "Single instance mode: automatically stop old task before starting new scheduled task; Multi-instance mode: allow multiple task instances to run simultaneously",
|
||||||
|
"请选择实例模式": "Please select instance mode",
|
||||||
|
"单实例": "Single Instance",
|
||||||
|
"多实例": "Multi-Instance"
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -533,5 +533,10 @@
|
||||||
"日志名称不能超过100个字符": "日志名称不能超过100个字符",
|
"日志名称不能超过100个字符": "日志名称不能超过100个字符",
|
||||||
"未启用": "未启用",
|
"未启用": "未启用",
|
||||||
"默认为 CPU 个数": "默认为 CPU 个数",
|
"默认为 CPU 个数": "默认为 CPU 个数",
|
||||||
"最小是 4": "最小是 4"
|
"最小是 4": "最小是 4",
|
||||||
|
"实例模式": "实例模式",
|
||||||
|
"单实例模式:定时启动新任务前会自动停止旧任务;多实例模式:允许同时运行多个任务实例": "单实例模式:定时启动新任务前会自动停止旧任务;多实例模式:允许同时运行多个任务实例",
|
||||||
|
"请选择实例模式": "请选择实例模式",
|
||||||
|
"单实例": "单实例",
|
||||||
|
"多实例": "多实例"
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -180,6 +180,18 @@ const CronModal = ({
|
||||||
<Form.Item name="labels" label={intl.get('标签')}>
|
<Form.Item name="labels" label={intl.get('标签')}>
|
||||||
<EditableTagGroup />
|
<EditableTagGroup />
|
||||||
</Form.Item>
|
</Form.Item>
|
||||||
|
<Form.Item
|
||||||
|
name="allow_multiple_instances"
|
||||||
|
label={intl.get('实例模式')}
|
||||||
|
tooltip={intl.get(
|
||||||
|
'单实例模式:定时启动新任务前会自动停止旧任务;多实例模式:允许同时运行多个任务实例',
|
||||||
|
)}
|
||||||
|
>
|
||||||
|
<Select placeholder={intl.get('请选择实例模式')}>
|
||||||
|
<Select.Option value={0}>{intl.get('单实例')}</Select.Option>
|
||||||
|
<Select.Option value={1}>{intl.get('多实例')}</Select.Option>
|
||||||
|
</Select>
|
||||||
|
</Form.Item>
|
||||||
<Form.Item
|
<Form.Item
|
||||||
name="log_name"
|
name="log_name"
|
||||||
label={intl.get('日志名称')}
|
label={intl.get('日志名称')}
|
||||||
|
|
@ -194,7 +206,11 @@ const CronModal = ({
|
||||||
if (value.length > 100) {
|
if (value.length > 100) {
|
||||||
return Promise.reject(intl.get('日志名称不能超过100个字符'));
|
return Promise.reject(intl.get('日志名称不能超过100个字符'));
|
||||||
}
|
}
|
||||||
if (!/^(?!.*(?:^|\/)\.{1,2}(?:\/|$))(?:\/)?(?:[\w.-]+\/)*[\w.-]+\/?$/.test(value)) {
|
if (
|
||||||
|
!/^(?!.*(?:^|\/)\.{1,2}(?:\/|$))(?:\/)?(?:[\w.-]+\/)*[\w.-]+\/?$/.test(
|
||||||
|
value,
|
||||||
|
)
|
||||||
|
) {
|
||||||
return Promise.reject(
|
return Promise.reject(
|
||||||
intl.get('日志名称只能包含字母、数字、下划线和连字符'),
|
intl.get('日志名称只能包含字母、数字、下划线和连字符'),
|
||||||
);
|
);
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,7 @@ export interface ICrontab {
|
||||||
nextRunTime: Date;
|
nextRunTime: Date;
|
||||||
sub_id: number;
|
sub_id: number;
|
||||||
extra_schedules?: Array<{ schedule: string }>;
|
extra_schedules?: Array<{ schedule: string }>;
|
||||||
|
allow_multiple_instances?: 1 | 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
export enum ScheduleType {
|
export enum ScheduleType {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user