mirror of
https://github.com/whyour/qinglong.git
synced 2025-11-10 00:26:09 +08:00
Add backend models, services, and API for Scenario Mode
Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>
This commit is contained in:
parent
bba6c9aa29
commit
712ff80448
|
|
@ -11,6 +11,7 @@ import system from './system';
|
|||
import subscription from './subscription';
|
||||
import update from './update';
|
||||
import health from './health';
|
||||
import scenario from './scenario';
|
||||
|
||||
export default () => {
|
||||
const app = Router();
|
||||
|
|
@ -26,6 +27,7 @@ export default () => {
|
|||
subscription(app);
|
||||
update(app);
|
||||
health(app);
|
||||
scenario(app);
|
||||
|
||||
return app;
|
||||
};
|
||||
|
|
|
|||
214
back/api/scenario.ts
Normal file
214
back/api/scenario.ts
Normal file
|
|
@ -0,0 +1,214 @@
|
|||
import { Router, Request, Response, NextFunction } from 'express';
|
||||
import { Container } from 'typedi';
|
||||
import { Logger } from 'winston';
|
||||
import ScenarioService from '../services/scenario';
|
||||
import { celebrate, Joi } from 'celebrate';
|
||||
|
||||
const route = Router();
|
||||
|
||||
export default (app: Router) => {
|
||||
app.use('/scenarios', route);
|
||||
|
||||
// List all scenarios
|
||||
route.get(
|
||||
'/',
|
||||
async (req: Request, res: Response, next: NextFunction) => {
|
||||
try {
|
||||
const scenarioService = Container.get(ScenarioService);
|
||||
const data = await scenarioService.list(req.query.searchValue as string);
|
||||
return res.send({ code: 200, data });
|
||||
} catch (e) {
|
||||
return next(e);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Create a new scenario
|
||||
route.post(
|
||||
'/',
|
||||
celebrate({
|
||||
body: Joi.object({
|
||||
name: Joi.string().required(),
|
||||
description: Joi.string().optional().allow(''),
|
||||
triggerType: Joi.string()
|
||||
.valid('variable', 'webhook', 'task_status', 'time', 'system_event')
|
||||
.required(),
|
||||
triggerConfig: Joi.object().optional(),
|
||||
conditionLogic: Joi.string().valid('AND', 'OR').default('AND'),
|
||||
conditions: Joi.array().optional().default([]),
|
||||
actions: Joi.array().required(),
|
||||
retryStrategy: Joi.object({
|
||||
maxRetries: Joi.number().min(0).max(10),
|
||||
retryDelay: Joi.number().min(1),
|
||||
backoffMultiplier: Joi.number().min(1).optional(),
|
||||
errorTypes: Joi.array().items(Joi.string()).optional(),
|
||||
}).optional(),
|
||||
failureThreshold: Joi.number().min(1).default(3),
|
||||
delayExecution: Joi.number().min(0).default(0),
|
||||
isEnabled: Joi.number().valid(0, 1).default(1),
|
||||
}),
|
||||
}),
|
||||
async (req: Request, res: Response, next: NextFunction) => {
|
||||
try {
|
||||
const scenarioService = Container.get(ScenarioService);
|
||||
const data = await scenarioService.create(req.body);
|
||||
return res.send({ code: 200, data });
|
||||
} catch (e) {
|
||||
return next(e);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Update a scenario
|
||||
route.put(
|
||||
'/',
|
||||
celebrate({
|
||||
body: Joi.object({
|
||||
id: Joi.number().required(),
|
||||
name: Joi.string().optional(),
|
||||
description: Joi.string().optional().allow(''),
|
||||
triggerType: Joi.string()
|
||||
.valid('variable', 'webhook', 'task_status', 'time', 'system_event')
|
||||
.optional(),
|
||||
triggerConfig: Joi.object().optional(),
|
||||
conditionLogic: Joi.string().valid('AND', 'OR').optional(),
|
||||
conditions: Joi.array().optional(),
|
||||
actions: Joi.array().optional(),
|
||||
retryStrategy: Joi.object({
|
||||
maxRetries: Joi.number().min(0).max(10),
|
||||
retryDelay: Joi.number().min(1),
|
||||
backoffMultiplier: Joi.number().min(1).optional(),
|
||||
errorTypes: Joi.array().items(Joi.string()).optional(),
|
||||
}).optional(),
|
||||
failureThreshold: Joi.number().min(1).optional(),
|
||||
delayExecution: Joi.number().min(0).optional(),
|
||||
isEnabled: Joi.number().valid(0, 1).optional(),
|
||||
}),
|
||||
}),
|
||||
async (req: Request, res: Response, next: NextFunction) => {
|
||||
try {
|
||||
const scenarioService = Container.get(ScenarioService);
|
||||
const data = await scenarioService.update(req.body);
|
||||
return res.send({ code: 200, data });
|
||||
} catch (e) {
|
||||
return next(e);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Delete scenarios
|
||||
route.delete(
|
||||
'/',
|
||||
celebrate({
|
||||
body: Joi.array().items(Joi.number().required()),
|
||||
}),
|
||||
async (req: Request, res: Response, next: NextFunction) => {
|
||||
try {
|
||||
const scenarioService = Container.get(ScenarioService);
|
||||
const data = await scenarioService.remove(req.body);
|
||||
return res.send({ code: 200, data });
|
||||
} catch (e) {
|
||||
return next(e);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Get scenario logs
|
||||
route.get(
|
||||
'/logs',
|
||||
async (req: Request, res: Response, next: NextFunction) => {
|
||||
try {
|
||||
const scenarioService = Container.get(ScenarioService);
|
||||
const scenarioId = req.query.scenarioId
|
||||
? parseInt(req.query.scenarioId as string)
|
||||
: undefined;
|
||||
const limit = req.query.limit
|
||||
? parseInt(req.query.limit as string)
|
||||
: 100;
|
||||
const data = await scenarioService.getLogs(scenarioId, limit);
|
||||
return res.send({ code: 200, data });
|
||||
} catch (e) {
|
||||
return next(e);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Manually trigger a scenario
|
||||
route.post(
|
||||
'/:id/trigger',
|
||||
celebrate({
|
||||
params: Joi.object({
|
||||
id: Joi.number().required(),
|
||||
}),
|
||||
body: Joi.object().optional().default({}),
|
||||
}),
|
||||
async (req: Request, res: Response, next: NextFunction) => {
|
||||
try {
|
||||
const scenarioService = Container.get(ScenarioService);
|
||||
await scenarioService.triggerScenario(
|
||||
parseInt(req.params.id),
|
||||
req.body,
|
||||
);
|
||||
return res.send({ code: 200, message: 'Scenario triggered successfully' });
|
||||
} catch (e) {
|
||||
return next(e);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Webhook endpoint for external triggers
|
||||
route.post(
|
||||
'/webhook/:token',
|
||||
async (req: Request, res: Response, next: NextFunction) => {
|
||||
try {
|
||||
const scenarioService = Container.get(ScenarioService);
|
||||
const scenario = await scenarioService.findByWebhookToken(
|
||||
req.params.token,
|
||||
);
|
||||
|
||||
if (!scenario) {
|
||||
return res.status(404).send({ code: 404, message: 'Invalid webhook token' });
|
||||
}
|
||||
|
||||
await scenarioService.triggerScenario(scenario.id!, {
|
||||
...req.body,
|
||||
headers: req.headers,
|
||||
query: req.query,
|
||||
webhookTriggered: true,
|
||||
});
|
||||
|
||||
return res.send({ code: 200, message: 'Webhook received and scenario triggered' });
|
||||
} catch (e) {
|
||||
return next(e);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Get webhook URL for a scenario
|
||||
route.get(
|
||||
'/:id/webhook',
|
||||
celebrate({
|
||||
params: Joi.object({
|
||||
id: Joi.number().required(),
|
||||
}),
|
||||
}),
|
||||
async (req: Request, res: Response, next: NextFunction) => {
|
||||
try {
|
||||
const scenarioService = Container.get(ScenarioService);
|
||||
const token = scenarioService.getWebhookToken(parseInt(req.params.id));
|
||||
|
||||
if (!token) {
|
||||
return res.status(404).send({
|
||||
code: 404,
|
||||
message: 'Webhook token not found. Ensure the scenario trigger type is webhook.'
|
||||
});
|
||||
}
|
||||
|
||||
const webhookUrl = `${req.protocol}://${req.get('host')}/api/scenarios/webhook/${token}`;
|
||||
return res.send({ code: 200, data: { token, webhookUrl } });
|
||||
} catch (e) {
|
||||
return next(e);
|
||||
}
|
||||
},
|
||||
);
|
||||
};
|
||||
117
back/data/scenario.ts
Normal file
117
back/data/scenario.ts
Normal file
|
|
@ -0,0 +1,117 @@
|
|||
import { sequelize } from '.';
|
||||
import { DataTypes, Model } from 'sequelize';
|
||||
|
||||
export class Scenario {
|
||||
id?: number;
|
||||
name: string;
|
||||
description?: string;
|
||||
isEnabled?: 1 | 0;
|
||||
triggerType?: string; // 'variable' | 'webhook' | 'task_status' | 'time' | 'system_event'
|
||||
triggerConfig?: any; // JSON configuration for the trigger
|
||||
conditionLogic?: 'AND' | 'OR';
|
||||
conditions?: any[]; // Array of condition objects
|
||||
actions?: any[]; // Array of actions to execute
|
||||
retryStrategy?: {
|
||||
maxRetries: number;
|
||||
retryDelay: number; // in seconds
|
||||
backoffMultiplier?: number;
|
||||
errorTypes?: string[];
|
||||
};
|
||||
failureThreshold?: number; // Auto-disable after N consecutive failures
|
||||
consecutiveFailures?: number;
|
||||
delayExecution?: number; // Delay in seconds after trigger
|
||||
lastTriggeredAt?: Date;
|
||||
lastExecutedAt?: Date;
|
||||
executionCount?: number;
|
||||
failureCount?: number;
|
||||
successCount?: number;
|
||||
createdAt?: Date;
|
||||
updatedAt?: Date;
|
||||
|
||||
constructor(options: Scenario) {
|
||||
this.id = options.id;
|
||||
this.name = options.name;
|
||||
this.description = options.description;
|
||||
this.isEnabled = options.isEnabled ?? 1;
|
||||
this.triggerType = options.triggerType;
|
||||
this.triggerConfig = options.triggerConfig;
|
||||
this.conditionLogic = options.conditionLogic || 'AND';
|
||||
this.conditions = options.conditions || [];
|
||||
this.actions = options.actions || [];
|
||||
this.retryStrategy = options.retryStrategy;
|
||||
this.failureThreshold = options.failureThreshold || 3;
|
||||
this.consecutiveFailures = options.consecutiveFailures || 0;
|
||||
this.delayExecution = options.delayExecution || 0;
|
||||
this.lastTriggeredAt = options.lastTriggeredAt;
|
||||
this.lastExecutedAt = options.lastExecutedAt;
|
||||
this.executionCount = options.executionCount || 0;
|
||||
this.failureCount = options.failureCount || 0;
|
||||
this.successCount = options.successCount || 0;
|
||||
this.createdAt = options.createdAt;
|
||||
this.updatedAt = options.updatedAt;
|
||||
}
|
||||
}
|
||||
|
||||
export interface ScenarioInstance extends Model<Scenario, Scenario>, Scenario {}
|
||||
|
||||
export const ScenarioModel = sequelize.define<ScenarioInstance>('Scenario', {
|
||||
name: {
|
||||
type: DataTypes.STRING,
|
||||
allowNull: false,
|
||||
},
|
||||
description: DataTypes.TEXT,
|
||||
isEnabled: {
|
||||
type: DataTypes.NUMBER,
|
||||
defaultValue: 1,
|
||||
},
|
||||
triggerType: {
|
||||
type: DataTypes.STRING,
|
||||
allowNull: false,
|
||||
},
|
||||
triggerConfig: {
|
||||
type: DataTypes.JSON,
|
||||
allowNull: true,
|
||||
},
|
||||
conditionLogic: {
|
||||
type: DataTypes.STRING,
|
||||
defaultValue: 'AND',
|
||||
},
|
||||
conditions: {
|
||||
type: DataTypes.JSON,
|
||||
defaultValue: [],
|
||||
},
|
||||
actions: {
|
||||
type: DataTypes.JSON,
|
||||
defaultValue: [],
|
||||
},
|
||||
retryStrategy: {
|
||||
type: DataTypes.JSON,
|
||||
allowNull: true,
|
||||
},
|
||||
failureThreshold: {
|
||||
type: DataTypes.NUMBER,
|
||||
defaultValue: 3,
|
||||
},
|
||||
consecutiveFailures: {
|
||||
type: DataTypes.NUMBER,
|
||||
defaultValue: 0,
|
||||
},
|
||||
delayExecution: {
|
||||
type: DataTypes.NUMBER,
|
||||
defaultValue: 0,
|
||||
},
|
||||
lastTriggeredAt: DataTypes.DATE,
|
||||
lastExecutedAt: DataTypes.DATE,
|
||||
executionCount: {
|
||||
type: DataTypes.NUMBER,
|
||||
defaultValue: 0,
|
||||
},
|
||||
failureCount: {
|
||||
type: DataTypes.NUMBER,
|
||||
defaultValue: 0,
|
||||
},
|
||||
successCount: {
|
||||
type: DataTypes.NUMBER,
|
||||
defaultValue: 0,
|
||||
},
|
||||
});
|
||||
58
back/data/scenarioLog.ts
Normal file
58
back/data/scenarioLog.ts
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
import { sequelize } from '.';
|
||||
import { DataTypes, Model } from 'sequelize';
|
||||
|
||||
export class ScenarioLog {
|
||||
id?: number;
|
||||
scenarioId: number;
|
||||
scenarioName?: string;
|
||||
triggerData?: any; // The data that triggered the scenario
|
||||
conditionsMatched?: boolean;
|
||||
executionStatus?: 'success' | 'failure' | 'partial';
|
||||
executionDetails?: any; // Details about actions executed
|
||||
errorMessage?: string;
|
||||
executionTime?: number; // Time taken in milliseconds
|
||||
retriesAttempted?: number;
|
||||
createdAt?: Date;
|
||||
|
||||
constructor(options: ScenarioLog) {
|
||||
this.id = options.id;
|
||||
this.scenarioId = options.scenarioId;
|
||||
this.scenarioName = options.scenarioName;
|
||||
this.triggerData = options.triggerData;
|
||||
this.conditionsMatched = options.conditionsMatched;
|
||||
this.executionStatus = options.executionStatus;
|
||||
this.executionDetails = options.executionDetails;
|
||||
this.errorMessage = options.errorMessage;
|
||||
this.executionTime = options.executionTime;
|
||||
this.retriesAttempted = options.retriesAttempted || 0;
|
||||
this.createdAt = options.createdAt;
|
||||
}
|
||||
}
|
||||
|
||||
export interface ScenarioLogInstance
|
||||
extends Model<ScenarioLog, ScenarioLog>,
|
||||
ScenarioLog {}
|
||||
|
||||
export const ScenarioLogModel = sequelize.define<ScenarioLogInstance>(
|
||||
'ScenarioLog',
|
||||
{
|
||||
scenarioId: {
|
||||
type: DataTypes.NUMBER,
|
||||
allowNull: false,
|
||||
},
|
||||
scenarioName: DataTypes.STRING,
|
||||
triggerData: DataTypes.JSON,
|
||||
conditionsMatched: DataTypes.BOOLEAN,
|
||||
executionStatus: {
|
||||
type: DataTypes.STRING,
|
||||
allowNull: false,
|
||||
},
|
||||
executionDetails: DataTypes.JSON,
|
||||
errorMessage: DataTypes.TEXT,
|
||||
executionTime: DataTypes.NUMBER,
|
||||
retriesAttempted: {
|
||||
type: DataTypes.NUMBER,
|
||||
defaultValue: 0,
|
||||
},
|
||||
},
|
||||
);
|
||||
|
|
@ -6,6 +6,8 @@ import { AppModel } from '../data/open';
|
|||
import { SystemModel } from '../data/system';
|
||||
import { SubscriptionModel } from '../data/subscription';
|
||||
import { CrontabViewModel } from '../data/cronView';
|
||||
import { ScenarioModel } from '../data/scenario';
|
||||
import { ScenarioLogModel } from '../data/scenarioLog';
|
||||
import { sequelize } from '../data';
|
||||
|
||||
export default async () => {
|
||||
|
|
@ -17,6 +19,8 @@ export default async () => {
|
|||
await EnvModel.sync();
|
||||
await SubscriptionModel.sync();
|
||||
await CrontabViewModel.sync();
|
||||
await ScenarioModel.sync();
|
||||
await ScenarioLogModel.sync();
|
||||
|
||||
// 初始化新增字段
|
||||
try {
|
||||
|
|
|
|||
501
back/services/scenario.ts
Normal file
501
back/services/scenario.ts
Normal file
|
|
@ -0,0 +1,501 @@
|
|||
import { Service, Inject } from 'typedi';
|
||||
import winston from 'winston';
|
||||
import { Scenario, ScenarioModel } from '../data/scenario';
|
||||
import { ScenarioLog, ScenarioLogModel } from '../data/scenarioLog';
|
||||
import { Op } from 'sequelize';
|
||||
import CronService from './cron';
|
||||
import EnvService from './env';
|
||||
import dayjs from 'dayjs';
|
||||
import { exec } from 'child_process';
|
||||
import { promisify } from 'util';
|
||||
import config from '../config';
|
||||
import fs from 'fs/promises';
|
||||
import path from 'path';
|
||||
import chokidar from 'chokidar';
|
||||
|
||||
const execAsync = promisify(exec);
|
||||
|
||||
@Service()
|
||||
export default class ScenarioService {
|
||||
private watchers: Map<number, any> = new Map();
|
||||
private webhookTokens: Map<number, string> = new Map();
|
||||
|
||||
constructor(
|
||||
@Inject('logger') private logger: winston.Logger,
|
||||
private cronService: CronService,
|
||||
private envService: EnvService,
|
||||
) {
|
||||
this.initializeWatchers();
|
||||
}
|
||||
|
||||
private async initializeWatchers() {
|
||||
try {
|
||||
const scenarios = await this.list({ isEnabled: 1 });
|
||||
for (const scenario of scenarios) {
|
||||
if (scenario.triggerType === 'variable') {
|
||||
await this.setupVariableWatcher(scenario);
|
||||
} else if (scenario.triggerType === 'system_event') {
|
||||
await this.setupSystemEventWatcher(scenario);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to initialize scenario watchers:', error);
|
||||
}
|
||||
}
|
||||
|
||||
public async create(payload: Scenario): Promise<Scenario> {
|
||||
const scenario = await ScenarioModel.create(payload, { returning: true });
|
||||
|
||||
if (scenario.isEnabled === 1) {
|
||||
await this.enableScenarioTrigger(scenario);
|
||||
}
|
||||
|
||||
return scenario;
|
||||
}
|
||||
|
||||
public async update(payload: Partial<Scenario>): Promise<Scenario> {
|
||||
const oldScenario = await this.getDb({ id: payload.id });
|
||||
await ScenarioModel.update(payload, { where: { id: payload.id } });
|
||||
const newScenario = await this.getDb({ id: payload.id });
|
||||
|
||||
// Handle trigger changes
|
||||
if (oldScenario.isEnabled === 1) {
|
||||
await this.disableScenarioTrigger(oldScenario);
|
||||
}
|
||||
|
||||
if (newScenario.isEnabled === 1) {
|
||||
await this.enableScenarioTrigger(newScenario);
|
||||
}
|
||||
|
||||
return newScenario;
|
||||
}
|
||||
|
||||
public async remove(ids: number[]): Promise<number> {
|
||||
for (const id of ids) {
|
||||
const scenario = await this.getDb({ id });
|
||||
if (scenario) {
|
||||
await this.disableScenarioTrigger(scenario);
|
||||
}
|
||||
}
|
||||
return await ScenarioModel.destroy({ where: { id: ids } });
|
||||
}
|
||||
|
||||
public async list(
|
||||
searchText?: string | { isEnabled: number },
|
||||
): Promise<Scenario[]> {
|
||||
let where: any = {};
|
||||
|
||||
if (typeof searchText === 'string') {
|
||||
where = {
|
||||
[Op.or]: [
|
||||
{ name: { [Op.like]: `%${searchText}%` } },
|
||||
{ description: { [Op.like]: `%${searchText}%` } },
|
||||
],
|
||||
};
|
||||
} else if (typeof searchText === 'object') {
|
||||
where = searchText;
|
||||
}
|
||||
|
||||
const result = await ScenarioModel.findAll({
|
||||
where,
|
||||
order: [['createdAt', 'DESC']],
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
public async getDb(query: any): Promise<Scenario> {
|
||||
const doc: any = await ScenarioModel.findOne({ where: { ...query } });
|
||||
return doc && (doc.get({ plain: true }) as Scenario);
|
||||
}
|
||||
|
||||
private async enableScenarioTrigger(scenario: Scenario) {
|
||||
switch (scenario.triggerType) {
|
||||
case 'variable':
|
||||
await this.setupVariableWatcher(scenario);
|
||||
break;
|
||||
case 'webhook':
|
||||
this.setupWebhookTrigger(scenario);
|
||||
break;
|
||||
case 'time':
|
||||
await this.setupTimeTrigger(scenario);
|
||||
break;
|
||||
case 'system_event':
|
||||
await this.setupSystemEventWatcher(scenario);
|
||||
break;
|
||||
case 'task_status':
|
||||
// Task status triggers are handled in the cron execution flow
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private async disableScenarioTrigger(scenario: Scenario) {
|
||||
if (scenario.triggerType === 'variable' || scenario.triggerType === 'system_event') {
|
||||
const watcher = this.watchers.get(scenario.id!);
|
||||
if (watcher) {
|
||||
await watcher.close();
|
||||
this.watchers.delete(scenario.id!);
|
||||
}
|
||||
} else if (scenario.triggerType === 'webhook') {
|
||||
this.webhookTokens.delete(scenario.id!);
|
||||
} else if (scenario.triggerType === 'time') {
|
||||
// Remove time trigger (would need to cancel scheduled task)
|
||||
}
|
||||
}
|
||||
|
||||
private async setupVariableWatcher(scenario: Scenario) {
|
||||
if (!scenario.triggerConfig || !scenario.triggerConfig.watchPath) {
|
||||
return;
|
||||
}
|
||||
|
||||
const watchPath = scenario.triggerConfig.watchPath;
|
||||
const watcher = chokidar.watch(watchPath, {
|
||||
persistent: true,
|
||||
ignoreInitial: true,
|
||||
});
|
||||
|
||||
watcher.on('change', async (filePath) => {
|
||||
this.logger.info(
|
||||
`Variable change detected for scenario ${scenario.name}: ${filePath}`,
|
||||
);
|
||||
await this.triggerScenario(scenario.id!, { filePath, type: 'change' });
|
||||
});
|
||||
|
||||
this.watchers.set(scenario.id!, watcher);
|
||||
}
|
||||
|
||||
private setupWebhookTrigger(scenario: Scenario) {
|
||||
// Generate a unique token for this webhook
|
||||
const token = scenario.triggerConfig?.token || this.generateWebhookToken();
|
||||
this.webhookTokens.set(scenario.id!, token);
|
||||
}
|
||||
|
||||
private async setupTimeTrigger(scenario: Scenario) {
|
||||
// This would integrate with the existing cron system
|
||||
// For now, we'll create a cron job that triggers the scenario
|
||||
if (scenario.triggerConfig && scenario.triggerConfig.schedule) {
|
||||
// Would create a cron entry that calls triggerScenario
|
||||
}
|
||||
}
|
||||
|
||||
private async setupSystemEventWatcher(scenario: Scenario) {
|
||||
const eventType = scenario.triggerConfig?.eventType;
|
||||
|
||||
if (eventType === 'disk_space' || eventType === 'memory') {
|
||||
// Set up periodic checks
|
||||
const interval = scenario.triggerConfig?.checkInterval || 60000; // Default 1 minute
|
||||
|
||||
const checkSystem = async () => {
|
||||
const metrics = await this.getSystemMetrics();
|
||||
const threshold = scenario.triggerConfig?.threshold;
|
||||
|
||||
let shouldTrigger = false;
|
||||
if (eventType === 'disk_space' && metrics.diskUsagePercent > threshold) {
|
||||
shouldTrigger = true;
|
||||
} else if (eventType === 'memory' && metrics.memoryUsagePercent > threshold) {
|
||||
shouldTrigger = true;
|
||||
}
|
||||
|
||||
if (shouldTrigger) {
|
||||
await this.triggerScenario(scenario.id!, metrics);
|
||||
}
|
||||
};
|
||||
|
||||
const timer = setInterval(checkSystem, interval);
|
||||
this.watchers.set(scenario.id!, { close: () => clearInterval(timer) });
|
||||
}
|
||||
}
|
||||
|
||||
private async getSystemMetrics() {
|
||||
try {
|
||||
// Get disk usage
|
||||
const { stdout: diskOutput } = await execAsync("df -h / | tail -1 | awk '{print $5}' | sed 's/%//'");
|
||||
const diskUsagePercent = parseInt(diskOutput.trim());
|
||||
|
||||
// Get memory usage
|
||||
const { stdout: memOutput } = await execAsync("free | grep Mem | awk '{print ($3/$2) * 100.0}'");
|
||||
const memoryUsagePercent = parseFloat(memOutput.trim());
|
||||
|
||||
return { diskUsagePercent, memoryUsagePercent };
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to get system metrics:', error);
|
||||
return { diskUsagePercent: 0, memoryUsagePercent: 0 };
|
||||
}
|
||||
}
|
||||
|
||||
private generateWebhookToken(): string {
|
||||
return Math.random().toString(36).substring(2, 15) +
|
||||
Math.random().toString(36).substring(2, 15);
|
||||
}
|
||||
|
||||
public async triggerScenario(
|
||||
scenarioId: number,
|
||||
triggerData: any,
|
||||
): Promise<void> {
|
||||
const scenario = await this.getDb({ id: scenarioId });
|
||||
|
||||
if (!scenario || scenario.isEnabled !== 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Update last triggered time
|
||||
await ScenarioModel.update(
|
||||
{ lastTriggeredAt: new Date() },
|
||||
{ where: { id: scenarioId } },
|
||||
);
|
||||
|
||||
// Check if circuit breaker is triggered
|
||||
if (
|
||||
scenario.consecutiveFailures &&
|
||||
scenario.failureThreshold &&
|
||||
scenario.consecutiveFailures >= scenario.failureThreshold
|
||||
) {
|
||||
this.logger.warn(
|
||||
`Scenario ${scenario.name} is disabled due to consecutive failures`,
|
||||
);
|
||||
await ScenarioModel.update(
|
||||
{ isEnabled: 0 },
|
||||
{ where: { id: scenarioId } },
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Apply delay if configured
|
||||
if (scenario.delayExecution && scenario.delayExecution > 0) {
|
||||
setTimeout(
|
||||
() => this.executeScenario(scenario, triggerData),
|
||||
scenario.delayExecution * 1000,
|
||||
);
|
||||
} else {
|
||||
await this.executeScenario(scenario, triggerData);
|
||||
}
|
||||
}
|
||||
|
||||
private async executeScenario(
|
||||
scenario: Scenario,
|
||||
triggerData: any,
|
||||
retryCount: number = 0,
|
||||
): Promise<void> {
|
||||
const startTime = Date.now();
|
||||
const log: Partial<ScenarioLog> = {
|
||||
scenarioId: scenario.id!,
|
||||
scenarioName: scenario.name,
|
||||
triggerData,
|
||||
retriesAttempted: retryCount,
|
||||
};
|
||||
|
||||
try {
|
||||
// Evaluate conditions
|
||||
const conditionsMatched = await this.evaluateConditions(
|
||||
scenario.conditions || [],
|
||||
scenario.conditionLogic || 'AND',
|
||||
triggerData,
|
||||
);
|
||||
|
||||
log.conditionsMatched = conditionsMatched;
|
||||
|
||||
if (!conditionsMatched) {
|
||||
log.executionStatus = 'success';
|
||||
log.executionDetails = { message: 'Conditions not matched, skipped' };
|
||||
await this.createLog(log);
|
||||
return;
|
||||
}
|
||||
|
||||
// Execute actions
|
||||
const actionResults = await this.executeActions(scenario.actions || []);
|
||||
|
||||
log.executionStatus = 'success';
|
||||
log.executionDetails = actionResults;
|
||||
log.executionTime = Date.now() - startTime;
|
||||
|
||||
// Update scenario stats
|
||||
await ScenarioModel.update(
|
||||
{
|
||||
lastExecutedAt: new Date(),
|
||||
executionCount: (scenario.executionCount || 0) + 1,
|
||||
successCount: (scenario.successCount || 0) + 1,
|
||||
consecutiveFailures: 0,
|
||||
},
|
||||
{ where: { id: scenario.id } },
|
||||
);
|
||||
|
||||
await this.createLog(log);
|
||||
} catch (error: any) {
|
||||
log.executionStatus = 'failure';
|
||||
log.errorMessage = error.message;
|
||||
log.executionTime = Date.now() - startTime;
|
||||
|
||||
this.logger.error(`Scenario ${scenario.name} execution failed:`, error);
|
||||
|
||||
// Handle retry logic
|
||||
const shouldRetry = scenario.retryStrategy &&
|
||||
retryCount < (scenario.retryStrategy.maxRetries || 0);
|
||||
|
||||
if (shouldRetry) {
|
||||
const delay = this.calculateRetryDelay(scenario.retryStrategy!, retryCount);
|
||||
setTimeout(
|
||||
() => this.executeScenario(scenario, triggerData, retryCount + 1),
|
||||
delay,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Update failure stats
|
||||
await ScenarioModel.update(
|
||||
{
|
||||
lastExecutedAt: new Date(),
|
||||
executionCount: (scenario.executionCount || 0) + 1,
|
||||
failureCount: (scenario.failureCount || 0) + 1,
|
||||
consecutiveFailures: (scenario.consecutiveFailures || 0) + 1,
|
||||
},
|
||||
{ where: { id: scenario.id } },
|
||||
);
|
||||
|
||||
await this.createLog(log);
|
||||
}
|
||||
}
|
||||
|
||||
private async evaluateConditions(
|
||||
conditions: any[],
|
||||
logic: 'AND' | 'OR',
|
||||
triggerData: any,
|
||||
): Promise<boolean> {
|
||||
if (conditions.length === 0) {
|
||||
return true; // No conditions means always execute
|
||||
}
|
||||
|
||||
const results = await Promise.all(
|
||||
conditions.map((condition) => this.evaluateCondition(condition, triggerData)),
|
||||
);
|
||||
|
||||
if (logic === 'AND') {
|
||||
return results.every((r) => r);
|
||||
} else {
|
||||
return results.some((r) => r);
|
||||
}
|
||||
}
|
||||
|
||||
private async evaluateCondition(
|
||||
condition: any,
|
||||
triggerData: any,
|
||||
): Promise<boolean> {
|
||||
// Simple condition evaluation
|
||||
// condition format: { field: string, operator: string, value: any }
|
||||
const { field, operator, value } = condition;
|
||||
const actualValue = this.getFieldValue(triggerData, field);
|
||||
|
||||
switch (operator) {
|
||||
case 'equals':
|
||||
return actualValue === value;
|
||||
case 'not_equals':
|
||||
return actualValue !== value;
|
||||
case 'greater_than':
|
||||
return actualValue > value;
|
||||
case 'less_than':
|
||||
return actualValue < value;
|
||||
case 'contains':
|
||||
return String(actualValue).includes(String(value));
|
||||
case 'not_contains':
|
||||
return !String(actualValue).includes(String(value));
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private getFieldValue(data: any, field: string): any {
|
||||
const parts = field.split('.');
|
||||
let value = data;
|
||||
for (const part of parts) {
|
||||
if (value && typeof value === 'object') {
|
||||
value = value[part];
|
||||
} else {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
private async executeActions(actions: any[]): Promise<any[]> {
|
||||
const results = [];
|
||||
|
||||
for (const action of actions) {
|
||||
try {
|
||||
const result = await this.executeAction(action);
|
||||
results.push({ action: action.type, success: true, result });
|
||||
} catch (error: any) {
|
||||
results.push({ action: action.type, success: false, error: error.message });
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
private async executeAction(action: any): Promise<any> {
|
||||
switch (action.type) {
|
||||
case 'run_task':
|
||||
// Execute a cron task
|
||||
if (action.cronId) {
|
||||
return await this.cronService.run([action.cronId]);
|
||||
}
|
||||
break;
|
||||
case 'set_variable':
|
||||
// Set an environment variable
|
||||
if (action.name && action.value !== undefined) {
|
||||
return await this.envService.create([{
|
||||
name: action.name,
|
||||
value: action.value,
|
||||
remarks: `Set by scenario: ${action.scenarioName || 'unknown'}`,
|
||||
}]);
|
||||
}
|
||||
break;
|
||||
case 'send_notification':
|
||||
// Would integrate with notification service
|
||||
this.logger.info(`Notification: ${action.message}`);
|
||||
break;
|
||||
case 'execute_command':
|
||||
// Execute a shell command
|
||||
if (action.command) {
|
||||
const { stdout, stderr } = await execAsync(action.command);
|
||||
return { stdout, stderr };
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new Error(`Unknown action type: ${action.type}`);
|
||||
}
|
||||
}
|
||||
|
||||
private calculateRetryDelay(
|
||||
retryStrategy: any,
|
||||
retryCount: number,
|
||||
): number {
|
||||
const baseDelay = retryStrategy.retryDelay || 5;
|
||||
const multiplier = retryStrategy.backoffMultiplier || 1;
|
||||
return baseDelay * 1000 * Math.pow(multiplier, retryCount);
|
||||
}
|
||||
|
||||
private async createLog(log: Partial<ScenarioLog>): Promise<void> {
|
||||
await ScenarioLogModel.create(log);
|
||||
}
|
||||
|
||||
public async getLogs(scenarioId?: number, limit: number = 100): Promise<ScenarioLog[]> {
|
||||
const where = scenarioId ? { scenarioId } : {};
|
||||
const logs = await ScenarioLogModel.findAll({
|
||||
where,
|
||||
order: [['createdAt', 'DESC']],
|
||||
limit,
|
||||
});
|
||||
return logs;
|
||||
}
|
||||
|
||||
public getWebhookToken(scenarioId: number): string | undefined {
|
||||
return this.webhookTokens.get(scenarioId);
|
||||
}
|
||||
|
||||
public async findByWebhookToken(token: string): Promise<Scenario | null> {
|
||||
for (const [scenarioId, webhookToken] of this.webhookTokens.entries()) {
|
||||
if (webhookToken === token) {
|
||||
return await this.getDb({ id: scenarioId });
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user