mirror of
https://github.com/whyour/qinglong.git
synced 2026-07-01 04:40:38 +08:00
增加运行实例
This commit is contained in:
@@ -5,6 +5,10 @@ import CronService from '../services/cron';
|
||||
import CronViewService from '../services/cronView';
|
||||
import { celebrate, Joi } from 'celebrate';
|
||||
import { commonCronSchema } from '../validation/schedule';
|
||||
import {
|
||||
RunningInstanceModel,
|
||||
InstanceStatus,
|
||||
} from '../data/runningInstance';
|
||||
|
||||
const route = Router();
|
||||
|
||||
@@ -446,6 +450,49 @@ export default (app: Router) => {
|
||||
},
|
||||
);
|
||||
|
||||
route.get(
|
||||
'/:id/instances',
|
||||
celebrate({
|
||||
params: Joi.object({
|
||||
id: Joi.number().required(),
|
||||
}),
|
||||
}),
|
||||
async (req: Request<{ id: number }>, res: Response, next: NextFunction) => {
|
||||
try {
|
||||
const instances = await RunningInstanceModel.findAll({
|
||||
where: {
|
||||
cron_id: req.params.id,
|
||||
status: InstanceStatus.running,
|
||||
},
|
||||
order: [['started_at', 'DESC']],
|
||||
raw: true,
|
||||
});
|
||||
return res.send({ code: 200, data: instances });
|
||||
} catch (e) {
|
||||
return next(e);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
route.post(
|
||||
'/:id/instances/:instanceId/stop',
|
||||
celebrate({
|
||||
params: Joi.object({
|
||||
id: Joi.number().required(),
|
||||
instanceId: Joi.number().required(),
|
||||
}),
|
||||
}),
|
||||
async (req: Request<{ id: number; instanceId: number }>, res: Response, next: NextFunction) => {
|
||||
try {
|
||||
const cronService = Container.get(CronService);
|
||||
const data = await cronService.stopInstance(req.params.instanceId);
|
||||
return res.send(data);
|
||||
} catch (e) {
|
||||
return next(e);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
route.get(
|
||||
'/:id/logs',
|
||||
celebrate({
|
||||
|
||||
+31
-11
@@ -3,6 +3,10 @@ import { Container } from 'typedi';
|
||||
import { fn, col, where, Op } from 'sequelize';
|
||||
import { CrontabModel } from '../data/cron';
|
||||
import { CrontabStatModel } from '../data/cronStats';
|
||||
import {
|
||||
RunningInstanceModel,
|
||||
InstanceStatus,
|
||||
} from '../data/runningInstance';
|
||||
import dayjs from 'dayjs';
|
||||
import os from 'os';
|
||||
|
||||
@@ -239,9 +243,9 @@ export default (app: Router) => {
|
||||
'/runtime',
|
||||
async (req: Request, res: Response, next: NextFunction) => {
|
||||
try {
|
||||
const runningCrons = await CrontabModel.findAll({
|
||||
const runningInstances = await RunningInstanceModel.findAll({
|
||||
where: {
|
||||
status: 0, // running
|
||||
status: InstanceStatus.running,
|
||||
},
|
||||
raw: true,
|
||||
});
|
||||
@@ -253,15 +257,31 @@ export default (app: Router) => {
|
||||
raw: true,
|
||||
});
|
||||
|
||||
const running = runningCrons.map((c: any) => ({
|
||||
id: c.id,
|
||||
name: c.name || c.command || `任务#${c.id}`,
|
||||
pid: c.pid,
|
||||
elapsed: c.last_execution_time
|
||||
? Math.floor((Date.now() / 1000) - c.last_execution_time)
|
||||
: 0,
|
||||
logPath: c.log_path,
|
||||
}));
|
||||
// Fetch cron names for running instances
|
||||
const cronIds = [
|
||||
...new Set(runningInstances.map((i: any) => i.cron_id)),
|
||||
];
|
||||
const crons =
|
||||
cronIds.length > 0
|
||||
? await CrontabModel.findAll({
|
||||
where: { id: cronIds },
|
||||
raw: true,
|
||||
})
|
||||
: [];
|
||||
const cronMap = new Map(crons.map((c: any) => [c.id, c]));
|
||||
|
||||
const now = dayjs().unix();
|
||||
const running = runningInstances.map((inst: any) => {
|
||||
const cron = cronMap.get(inst.cron_id);
|
||||
return {
|
||||
instanceId: inst.id,
|
||||
id: inst.cron_id,
|
||||
name: cron?.name || cron?.command || `任务#${inst.cron_id}`,
|
||||
pid: inst.pid,
|
||||
elapsed: inst.started_at ? now - inst.started_at : 0,
|
||||
logPath: inst.log_path,
|
||||
};
|
||||
});
|
||||
|
||||
const dayAgo = dayjs().subtract(24, 'hour').unix();
|
||||
const idleTasks = await CrontabModel.findAll({
|
||||
|
||||
@@ -0,0 +1,81 @@
|
||||
import { sequelize } from '.';
|
||||
import { DataTypes, Model } from 'sequelize';
|
||||
|
||||
export enum InstanceStatus {
|
||||
'running' = 0,
|
||||
'finished' = 1,
|
||||
'stopped' = 2,
|
||||
'error' = 3,
|
||||
}
|
||||
|
||||
export interface RunningInstanceAttributes {
|
||||
id?: number;
|
||||
cron_id: number;
|
||||
pid?: number;
|
||||
log_path?: string;
|
||||
started_at: number;
|
||||
finished_at?: number;
|
||||
status: InstanceStatus;
|
||||
exit_code?: number;
|
||||
}
|
||||
|
||||
export class RunningInstance {
|
||||
id?: number;
|
||||
cron_id!: number;
|
||||
pid?: number;
|
||||
log_path?: string;
|
||||
started_at!: number;
|
||||
finished_at?: number;
|
||||
status!: InstanceStatus;
|
||||
exit_code?: number;
|
||||
|
||||
constructor(options: RunningInstanceAttributes) {
|
||||
this.id = options.id;
|
||||
this.cron_id = options.cron_id;
|
||||
this.pid = options.pid;
|
||||
this.log_path = options.log_path;
|
||||
this.started_at = options.started_at;
|
||||
this.finished_at = options.finished_at;
|
||||
this.status = options.status;
|
||||
this.exit_code = options.exit_code;
|
||||
}
|
||||
}
|
||||
|
||||
export interface RunningInstanceModel
|
||||
extends Model<RunningInstanceAttributes, RunningInstanceAttributes>,
|
||||
RunningInstanceAttributes {}
|
||||
|
||||
export const RunningInstanceModel = sequelize.define<RunningInstanceModel>(
|
||||
'RunningInstance',
|
||||
{
|
||||
cron_id: {
|
||||
type: DataTypes.NUMBER,
|
||||
allowNull: false,
|
||||
},
|
||||
pid: {
|
||||
type: DataTypes.NUMBER,
|
||||
allowNull: true,
|
||||
},
|
||||
log_path: {
|
||||
type: DataTypes.STRING,
|
||||
allowNull: true,
|
||||
},
|
||||
started_at: {
|
||||
type: DataTypes.NUMBER,
|
||||
allowNull: false,
|
||||
},
|
||||
finished_at: {
|
||||
type: DataTypes.NUMBER,
|
||||
allowNull: true,
|
||||
},
|
||||
status: {
|
||||
type: DataTypes.NUMBER,
|
||||
allowNull: false,
|
||||
defaultValue: InstanceStatus.running,
|
||||
},
|
||||
exit_code: {
|
||||
type: DataTypes.NUMBER,
|
||||
allowNull: true,
|
||||
},
|
||||
},
|
||||
);
|
||||
@@ -7,6 +7,7 @@ import { SystemModel } from '../data/system';
|
||||
import { SubscriptionModel } from '../data/subscription';
|
||||
import { CrontabViewModel } from '../data/cronView';
|
||||
import { CrontabStatModel } from '../data/cronStats';
|
||||
import { RunningInstanceModel } from '../data/runningInstance';
|
||||
import { sequelize } from '../data';
|
||||
|
||||
export default async () => {
|
||||
@@ -19,6 +20,7 @@ export default async () => {
|
||||
await SubscriptionModel.sync();
|
||||
await CrontabViewModel.sync();
|
||||
await CrontabStatModel.sync();
|
||||
await RunningInstanceModel.sync();
|
||||
|
||||
// 初始化新增字段
|
||||
const migrations = [
|
||||
|
||||
@@ -18,6 +18,7 @@ import OpenService from '../services/open';
|
||||
import { shareStore } from '../shared/store';
|
||||
import Logger from './logger';
|
||||
import { AppModel } from '../data/open';
|
||||
import { InstanceStatus, RunningInstanceModel } from '../data/runningInstance';
|
||||
|
||||
export default async () => {
|
||||
const cronService = Container.get(CronService);
|
||||
@@ -139,6 +140,12 @@ export default async () => {
|
||||
// 初始化更新所有任务状态为空闲
|
||||
await CrontabModel.update({ status: CrontabStatus.idle }, { where: {} });
|
||||
|
||||
// 清空所有运行中的实例记录(服务重启后进程已不存在)
|
||||
await RunningInstanceModel.update(
|
||||
{ status: InstanceStatus.stopped },
|
||||
{ where: { status: InstanceStatus.running } },
|
||||
);
|
||||
|
||||
// 初始化时执行一次所有的 ql repo 任务
|
||||
CrontabModel.findAll({
|
||||
where: {
|
||||
|
||||
+102
-3
@@ -2,6 +2,10 @@ import { Service, Inject } from 'typedi';
|
||||
import winston from 'winston';
|
||||
import config from '../config';
|
||||
import { Crontab, CrontabModel, CrontabStatus } from '../data/cron';
|
||||
import {
|
||||
RunningInstanceModel,
|
||||
InstanceStatus,
|
||||
} from '../data/runningInstance';
|
||||
import { exec, execSync } from 'child_process';
|
||||
import fs from 'fs/promises';
|
||||
import CronExpressionParser from 'cron-parser';
|
||||
@@ -189,6 +193,35 @@ export default class CronService {
|
||||
if (status === CrontabStatus.idle && log_path !== cron.log_path) {
|
||||
options = omit(options, ['status', 'log_path', 'pid']);
|
||||
}
|
||||
|
||||
// Manage RunningInstance records for status transitions from shell scripts
|
||||
if (status === CrontabStatus.running) {
|
||||
// Create a new running instance record
|
||||
await RunningInstanceModel.create({
|
||||
cron_id: id,
|
||||
pid: pid || undefined,
|
||||
log_path: log_path || undefined,
|
||||
started_at: last_execution_time || dayjs().unix(),
|
||||
status: InstanceStatus.running,
|
||||
});
|
||||
} else if (status === CrontabStatus.idle) {
|
||||
// Mark the matching running instance as finished
|
||||
const finishedAt = dayjs().unix();
|
||||
await RunningInstanceModel.update(
|
||||
{
|
||||
finished_at: finishedAt,
|
||||
status: InstanceStatus.finished,
|
||||
},
|
||||
{
|
||||
where: {
|
||||
cron_id: id,
|
||||
pid: pid || undefined,
|
||||
status: InstanceStatus.running,
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
await CrontabModel.update(
|
||||
{ ...pickBy(options, (v) => v === 0 || !!v) },
|
||||
{ where: { id } },
|
||||
@@ -499,12 +532,53 @@ export default class CronService {
|
||||
}
|
||||
}
|
||||
|
||||
// Mark all running instances as stopped
|
||||
const finishedAt = dayjs().unix();
|
||||
await RunningInstanceModel.update(
|
||||
{ status: InstanceStatus.stopped, finished_at: finishedAt },
|
||||
{ where: { cron_id: ids, status: InstanceStatus.running } },
|
||||
);
|
||||
|
||||
await CrontabModel.update(
|
||||
{ status: CrontabStatus.idle, pid: undefined },
|
||||
{ where: { id: ids } },
|
||||
);
|
||||
}
|
||||
|
||||
public async stopInstance(instanceId: number) {
|
||||
const instance = await RunningInstanceModel.findOne({
|
||||
where: { id: instanceId, status: InstanceStatus.running },
|
||||
});
|
||||
if (!instance) {
|
||||
return { code: 400, message: '实例不存在或已停止' };
|
||||
}
|
||||
if (instance.pid) {
|
||||
try {
|
||||
await killTask(instance.pid);
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`[panel][停止实例失败] 实例ID: ${instanceId}, PID: ${instance.pid}, 错误: ${error}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
await RunningInstanceModel.update(
|
||||
{ status: InstanceStatus.stopped, finished_at: dayjs().unix() },
|
||||
{ where: { id: instanceId } },
|
||||
);
|
||||
|
||||
// Check if there are still other running instances for this cron
|
||||
const otherRunning = await RunningInstanceModel.count({
|
||||
where: { cron_id: instance.cron_id, status: InstanceStatus.running },
|
||||
});
|
||||
if (otherRunning === 0) {
|
||||
await CrontabModel.update(
|
||||
{ status: CrontabStatus.idle, pid: undefined },
|
||||
{ where: { id: instance.cron_id } },
|
||||
);
|
||||
}
|
||||
return { code: 200, message: '实例已停止' };
|
||||
}
|
||||
|
||||
private async runSingle(cronId: number): Promise<number | void> {
|
||||
return taskLimit.manualRunWithCronLimit(() => {
|
||||
return new Promise(async (resolve: any) => {
|
||||
@@ -543,6 +617,15 @@ export default class CronService {
|
||||
{ shell: '/bin/bash' },
|
||||
);
|
||||
|
||||
const startedAt = dayjs().unix();
|
||||
const instance = await RunningInstanceModel.create({
|
||||
cron_id: id!,
|
||||
pid: cp.pid,
|
||||
log_path: logPath,
|
||||
started_at: startedAt,
|
||||
status: InstanceStatus.running,
|
||||
});
|
||||
|
||||
await CrontabModel.update(
|
||||
{ status: CrontabStatus.running, pid: cp.pid, log_path: logPath },
|
||||
{ where: { id } },
|
||||
@@ -574,10 +657,26 @@ export default class CronService {
|
||||
code,
|
||||
);
|
||||
await logStreamManager.closeStream(absolutePath);
|
||||
await CrontabModel.update(
|
||||
{ status: CrontabStatus.idle, pid: undefined },
|
||||
{ where: { id } },
|
||||
const finishedAt = dayjs().unix();
|
||||
await RunningInstanceModel.update(
|
||||
{
|
||||
finished_at: finishedAt,
|
||||
status: code === 0 ? InstanceStatus.finished : InstanceStatus.error,
|
||||
exit_code: code ?? undefined,
|
||||
},
|
||||
{ where: { id: instance.id } },
|
||||
);
|
||||
|
||||
// Only set cron to idle if no other running instances exist
|
||||
const otherRunning = await RunningInstanceModel.count({
|
||||
where: { cron_id: id!, status: InstanceStatus.running },
|
||||
});
|
||||
if (otherRunning === 0) {
|
||||
await CrontabModel.update(
|
||||
{ status: CrontabStatus.idle, pid: undefined },
|
||||
{ where: { id } },
|
||||
);
|
||||
}
|
||||
resolve({ ...params, pid: cp.pid, code });
|
||||
});
|
||||
});
|
||||
|
||||
@@ -4,6 +4,11 @@ import Logger from '../loaders/logger';
|
||||
import { ICron } from '../protos/cron';
|
||||
import { CrontabModel, CrontabStatus } from '../data/cron';
|
||||
import { killTask } from '../config/util';
|
||||
import {
|
||||
RunningInstanceModel,
|
||||
InstanceStatus,
|
||||
} from '../data/runningInstance';
|
||||
import dayjs from 'dayjs';
|
||||
|
||||
export function runCron(cmd: string, cron: ICron): Promise<number | void> {
|
||||
return taskLimit.runWithCronLimit(cron, () => {
|
||||
@@ -29,6 +34,12 @@ export function runCron(cmd: string, cron: ICron): Promise<number | void> {
|
||||
`[schedule][停止已运行任务] 任务ID: ${cron.id}, PID: ${existingCron.pid}`,
|
||||
);
|
||||
await killTask(existingCron.pid);
|
||||
// Mark old running instances as stopped
|
||||
const stoppedAt = dayjs().unix();
|
||||
await RunningInstanceModel.update(
|
||||
{ status: InstanceStatus.stopped, finished_at: stoppedAt },
|
||||
{ where: { cron_id: Number(cron.id), status: InstanceStatus.running } },
|
||||
);
|
||||
// Update the status to idle after killing
|
||||
await CrontabModel.update(
|
||||
{ status: CrontabStatus.idle, pid: undefined },
|
||||
|
||||
Reference in New Issue
Block a user