diff --git a/back/config/util.ts b/back/config/util.ts index dc6fd26c..7d61f74f 100644 --- a/back/config/util.ts +++ b/back/config/util.ts @@ -417,6 +417,27 @@ export async function getPid(cmd: string) { return pid ? Number(pid) : undefined; } +export async function getAllPids(cmd: string): Promise { + const taskCommand = `ps -eo pid,command | grep "${cmd}" | grep -v grep | awk '{print $1}'`; + const pidsStr = await promiseExec(taskCommand); + if (!pidsStr) return []; + return pidsStr + .split('\n') + .map((p) => Number(p.trim())) + .filter((p) => !isNaN(p) && p > 0); +} + +export async function killAllTasks(cmd: string): Promise { + const pids = await getAllPids(cmd); + for (const pid of pids) { + try { + await killTask(pid); + } catch (error) { + // Ignore errors if process already terminated + } + } +} + interface IVersion { version: string; changeLogLink: string; diff --git a/back/data/cron.ts b/back/data/cron.ts index 422b58e4..f2093798 100644 --- a/back/data/cron.ts +++ b/back/data/cron.ts @@ -22,6 +22,7 @@ export class Crontab { task_before?: string; task_after?: string; log_name?: string; + allow_multiple_instances?: 1 | 0; constructor(options: Crontab) { this.name = options.name; @@ -47,6 +48,7 @@ export class Crontab { this.task_before = options.task_before; this.task_after = options.task_after; this.log_name = options.log_name; + this.allow_multiple_instances = options.allow_multiple_instances || 0; } } @@ -87,4 +89,5 @@ export const CrontabModel = sequelize.define('Crontab', { task_before: DataTypes.STRING, task_after: DataTypes.STRING, log_name: DataTypes.STRING, + allow_multiple_instances: DataTypes.NUMBER, }); diff --git a/back/loaders/db.ts b/back/loaders/db.ts index 42c2af72..49fefb6b 100644 --- a/back/loaders/db.ts +++ b/back/loaders/db.ts @@ -19,51 +19,38 @@ export default async () => { await CrontabViewModel.sync(); // 初始化新增字段 - try { - await sequelize.query( - 'alter table CrontabViews add column filterRelation VARCHAR(255)', - ); - } catch (error) {} - try { - await sequelize.query( - 'alter table Subscriptions add column proxy VARCHAR(255)', - ); - } catch (error) {} - try { - await sequelize.query('alter table CrontabViews add column type NUMBER'); - } catch (error) {} - try { - await sequelize.query( - 'alter table Subscriptions add column autoAddCron NUMBER', - ); - } catch (error) {} - try { - await sequelize.query( - 'alter table Subscriptions add column autoDelCron NUMBER', - ); - } catch (error) {} - try { - await sequelize.query('alter table Crontabs add column sub_id NUMBER'); - } catch (error) {} - try { - await sequelize.query( - 'alter table Crontabs add column extra_schedules JSON', - ); - } catch (error) {} - try { - await sequelize.query('alter table Crontabs add column task_before TEXT'); - } catch (error) {} - try { - await sequelize.query('alter table Crontabs add column task_after TEXT'); - } catch (error) {} - try { - await sequelize.query( - 'alter table Crontabs add column log_name VARCHAR(255)', - ); - } catch (error) { } - try { - await sequelize.query('alter table Envs add column isPinned NUMBER'); - } catch (error) {} + const migrations = [ + { + table: 'CrontabViews', + column: 'filterRelation', + type: 'VARCHAR(255)', + }, + { table: 'Subscriptions', column: 'proxy', type: 'VARCHAR(255)' }, + { table: 'CrontabViews', column: 'type', type: 'NUMBER' }, + { table: 'Subscriptions', column: 'autoAddCron', type: 'NUMBER' }, + { table: 'Subscriptions', column: 'autoDelCron', type: 'NUMBER' }, + { table: 'Crontabs', column: 'sub_id', type: 'NUMBER' }, + { table: 'Crontabs', column: 'extra_schedules', type: 'JSON' }, + { table: 'Crontabs', column: 'task_before', type: 'TEXT' }, + { table: 'Crontabs', column: 'task_after', type: 'TEXT' }, + { table: 'Crontabs', column: 'log_name', type: 'VARCHAR(255)' }, + { + table: 'Crontabs', + column: 'allow_multiple_instances', + type: 'NUMBER', + }, + { table: 'Envs', column: 'isPinned', type: 'NUMBER' }, + ]; + + for (const migration of migrations) { + try { + await sequelize.query( + `alter table ${migration.table} add column ${migration.column} ${migration.type}`, + ); + } catch (error) { + // Column already exists or other error, continue + } + } Logger.info('✌️ DB loaded'); } catch (error) { diff --git a/back/services/cron.ts b/back/services/cron.ts index cd2975bd..ad08998c 100644 --- a/back/services/cron.ts +++ b/back/services/cron.ts @@ -9,6 +9,7 @@ import { getFileContentByName, fileExist, killTask, + killAllTasks, getUniqPath, safeJSONParse, isDemoEnv, @@ -28,7 +29,7 @@ import { logStreamManager } from '../shared/logStreamManager'; @Service() export default class CronService { - constructor(@Inject('logger') private logger: winston.Logger) { } + constructor(@Inject('logger') private logger: winston.Logger) {} private isNodeCron(cron: Crontab) { const { schedule, extra_schedules } = cron; @@ -57,7 +58,9 @@ export default class CronService { } let uniqPath = await getUniqPath(command, `${id}`); if (log_name) { - const normalizedLogName = log_name.startsWith('/') ? log_name : path.join(config.logPath, log_name); + const normalizedLogName = log_name.startsWith('/') + ? log_name + : path.join(config.logPath, log_name); if (normalizedLogName.startsWith(config.logPath)) { uniqPath = log_name; } @@ -162,7 +165,7 @@ export default class CronService { let cron; try { cron = await this.getDb({ id }); - } catch (err) { } + } catch (err) {} if (!cron) { continue; } @@ -462,12 +465,17 @@ export default class CronService { public async stop(ids: number[]) { const docs = await CrontabModel.findAll({ where: { id: ids } }); for (const doc of docs) { - if (doc.pid) { - try { - await killTask(doc.pid); - } catch (error) { - this.logger.error(error); - } + // Kill all running instances of this task + try { + const command = this.makeCommand(doc); + await killAllTasks(command); + this.logger.info( + `[panel][停止所有运行中的任务实例] 任务ID: ${doc.id}, 命令: ${command}`, + ); + } catch (error) { + this.logger.error( + `[panel][停止任务失败] 任务ID: ${doc.id}, 错误: ${error}`, + ); } } @@ -498,7 +506,10 @@ export default class CronService { let { id, command, log_name } = cron; - const uniqPath = log_name === '/dev/null' ? (await getUniqPath(command, `${id}`)) : log_name; + const uniqPath = + log_name === '/dev/null' + ? await getUniqPath(command, `${id}`) + : log_name; const logTime = dayjs().format('YYYY-MM-DD-HH-mm-ss-SSS'); const logDirPath = path.resolve(config.logPath, `${uniqPath}`); await fs.mkdir(logDirPath, { recursive: true }); diff --git a/back/shared/runCron.ts b/back/shared/runCron.ts index 29ed97e2..31f3ada4 100644 --- a/back/shared/runCron.ts +++ b/back/shared/runCron.ts @@ -2,10 +2,45 @@ import { spawn } from 'cross-spawn'; import taskLimit from './pLimit'; import Logger from '../loaders/logger'; import { ICron } from '../protos/cron'; +import { CrontabModel, CrontabStatus } from '../data/cron'; +import { killTask } from '../config/util'; export function runCron(cmd: string, cron: ICron): Promise { return taskLimit.runWithCronLimit(cron, () => { return new Promise(async (resolve: any) => { + // Check if the cron is already running and stop it (only if multiple instances are not allowed) + try { + const existingCron = await CrontabModel.findOne({ + where: { id: Number(cron.id) }, + }); + + // Default to single instance mode (0) for backward compatibility + const allowMultipleInstances = + existingCron?.allow_multiple_instances === 1; + + if ( + !allowMultipleInstances && + existingCron && + existingCron.pid && + (existingCron.status === CrontabStatus.running || + existingCron.status === CrontabStatus.queued) + ) { + Logger.info( + `[schedule][停止已运行任务] 任务ID: ${cron.id}, PID: ${existingCron.pid}`, + ); + await killTask(existingCron.pid); + // Update the status to idle after killing + await CrontabModel.update( + { status: CrontabStatus.idle, pid: undefined }, + { where: { id: Number(cron.id) } }, + ); + } + } catch (error) { + Logger.error( + `[schedule][检查已运行任务失败] 任务ID: ${cron.id}, 错误: ${error}`, + ); + } + Logger.info( `[schedule][开始执行任务] 参数 ${JSON.stringify({ ...cron, diff --git a/back/validation/schedule.ts b/back/validation/schedule.ts index 60d82bcd..475859a1 100644 --- a/back/validation/schedule.ts +++ b/back/validation/schedule.ts @@ -64,7 +64,11 @@ export const commonCronSchema = { return value; } - if (!/^(?!.*(?:^|\/)\.{1,2}(?:\/|$))(?:\/)?(?:[\w.-]+\/)*[\w.-]+\/?$/.test(value)) { + if ( + !/^(?!.*(?:^|\/)\.{1,2}(?:\/|$))(?:\/)?(?:[\w.-]+\/)*[\w.-]+\/?$/.test( + value, + ) + ) { return helpers.error('string.pattern.base'); } if (value.length > 100) { @@ -77,4 +81,5 @@ export const commonCronSchema = { 'string.max': '日志名称不能超过100个字符', 'string.unsafePath': '绝对路径必须在日志目录内或使用 /dev/null', }), + allow_multiple_instances: Joi.number().optional().valid(0, 1), }; diff --git a/src/locales/en-US.json b/src/locales/en-US.json index 73088cc1..4b2fbdc1 100644 --- a/src/locales/en-US.json +++ b/src/locales/en-US.json @@ -533,5 +533,10 @@ "日志名称不能超过100个字符": "Log name cannot exceed 100 characters", "未启用": "Not enabled", "默认为 CPU 个数": "Default is the number of CPUs", - "Minimum is 4": "Minimum is 4" + "Minimum is 4": "Minimum is 4", + "实例模式": "Instance Mode", + "单实例模式:定时启动新任务前会自动停止旧任务;多实例模式:允许同时运行多个任务实例": "Single instance mode: automatically stop old task before starting new scheduled task; Multi-instance mode: allow multiple task instances to run simultaneously", + "请选择实例模式": "Please select instance mode", + "单实例": "Single Instance", + "多实例": "Multi-Instance" } diff --git a/src/locales/zh-CN.json b/src/locales/zh-CN.json index 454edbfb..fdb1ca10 100644 --- a/src/locales/zh-CN.json +++ b/src/locales/zh-CN.json @@ -533,5 +533,10 @@ "日志名称不能超过100个字符": "日志名称不能超过100个字符", "未启用": "未启用", "默认为 CPU 个数": "默认为 CPU 个数", - "最小是 4": "最小是 4" + "最小是 4": "最小是 4", + "实例模式": "实例模式", + "单实例模式:定时启动新任务前会自动停止旧任务;多实例模式:允许同时运行多个任务实例": "单实例模式:定时启动新任务前会自动停止旧任务;多实例模式:允许同时运行多个任务实例", + "请选择实例模式": "请选择实例模式", + "单实例": "单实例", + "多实例": "多实例" } diff --git a/src/pages/crontab/modal.tsx b/src/pages/crontab/modal.tsx index a911b219..ac8e8182 100644 --- a/src/pages/crontab/modal.tsx +++ b/src/pages/crontab/modal.tsx @@ -180,6 +180,18 @@ const CronModal = ({ + + + 100) { return Promise.reject(intl.get('日志名称不能超过100个字符')); } - if (!/^(?!.*(?:^|\/)\.{1,2}(?:\/|$))(?:\/)?(?:[\w.-]+\/)*[\w.-]+\/?$/.test(value)) { + if ( + !/^(?!.*(?:^|\/)\.{1,2}(?:\/|$))(?:\/)?(?:[\w.-]+\/)*[\w.-]+\/?$/.test( + value, + ) + ) { return Promise.reject( intl.get('日志名称只能包含字母、数字、下划线和连字符'), ); diff --git a/src/pages/crontab/type.ts b/src/pages/crontab/type.ts index 890eaef8..a71eade3 100644 --- a/src/pages/crontab/type.ts +++ b/src/pages/crontab/type.ts @@ -37,6 +37,7 @@ export interface ICrontab { nextRunTime: Date; sub_id: number; extra_schedules?: Array<{ schedule: string }>; + allow_multiple_instances?: 1 | 0; } export enum ScheduleType {