diff --git a/back/services/graphExecutor.ts b/back/services/graphExecutor.ts new file mode 100644 index 00000000..9efb452f --- /dev/null +++ b/back/services/graphExecutor.ts @@ -0,0 +1,348 @@ +// Graph Execution Engine for Visual Workflow +import winston from 'winston'; + +interface WorkflowNode { + id: string; + type: string; + position: { x: number; y: number }; + data: any; +} + +interface WorkflowEdge { + id: string; + source: string; + target: string; +} + +interface WorkflowGraph { + nodes: WorkflowNode[]; + edges: WorkflowEdge[]; +} + +interface ExecutionContext { + triggerData: any; + variables: Map; + executedNodes: Set; + results: Map; +} + +export class GraphExecutor { + private logger: winston.Logger; + + constructor(logger: winston.Logger) { + this.logger = logger; + } + + /** + * Execute a workflow graph + */ + public async executeGraph( + workflowGraph: WorkflowGraph, + triggerData: any, + executor: any, // ScenarioService instance + ): Promise<{ success: boolean; results: any; executedNodes: string[] }> { + const context: ExecutionContext = { + triggerData, + variables: new Map(), + executedNodes: new Set(), + results: new Map(), + }; + + try { + // Find trigger nodes (entry points) + const triggerNodes = workflowGraph.nodes.filter( + (node) => node.type === 'trigger', + ); + + if (triggerNodes.length === 0) { + throw new Error('No trigger node found in workflow'); + } + + // Execute from each trigger node + for (const triggerNode of triggerNodes) { + await this.executeNode( + triggerNode, + workflowGraph, + context, + executor, + ); + } + + return { + success: true, + results: Object.fromEntries(context.results), + executedNodes: Array.from(context.executedNodes), + }; + } catch (error: any) { + this.logger.error('Graph execution failed:', error); + return { + success: false, + results: { error: error.message }, + executedNodes: Array.from(context.executedNodes), + }; + } + } + + /** + * Execute a single node and its connected nodes + */ + private async executeNode( + node: WorkflowNode, + graph: WorkflowGraph, + context: ExecutionContext, + executor: any, + ): Promise { + // Skip if already executed + if (context.executedNodes.has(node.id)) { + return context.results.get(node.id); + } + + this.logger.info(`Executing node: ${node.id} (${node.type})`); + context.executedNodes.add(node.id); + + let result: any = null; + + try { + // Execute based on node type + switch (node.type) { + case 'trigger': + result = await this.executeTriggerNode(node, context, executor); + break; + case 'condition': + result = await this.executeConditionNode(node, context, executor); + break; + case 'action': + result = await this.executeActionNode(node, context, executor); + break; + case 'control': + result = await this.executeControlNode(node, context, executor); + break; + case 'logic_gate': + result = await this.executeLogicGateNode(node, context, executor); + break; + default: + this.logger.warn(`Unknown node type: ${node.type}`); + result = { skipped: true }; + } + + context.results.set(node.id, result); + + // Find and execute next nodes + const nextEdges = graph.edges.filter((edge) => edge.source === node.id); + + for (const edge of nextEdges) { + const nextNode = graph.nodes.find((n) => n.id === edge.target); + if (nextNode) { + await this.executeNode(nextNode, graph, context, executor); + } + } + + return result; + } catch (error: any) { + this.logger.error(`Node execution failed: ${node.id}`, error); + context.results.set(node.id, { error: error.message }); + throw error; + } + } + + private async executeTriggerNode( + node: WorkflowNode, + context: ExecutionContext, + executor: any, + ): Promise { + // Trigger nodes just pass through the trigger data + return { triggered: true, data: context.triggerData }; + } + + private async executeConditionNode( + node: WorkflowNode, + context: ExecutionContext, + executor: any, + ): Promise { + const { field, operator, value } = node.data; + + // Get field value from trigger data or variables + let fieldValue = this.getFieldValue(field, context); + + // Evaluate condition + const matched = this.evaluateCondition(fieldValue, operator, value); + + this.logger.info( + `Condition ${node.id}: ${field} ${operator} ${value} = ${matched}`, + ); + + return { matched, fieldValue, expectedValue: value }; + } + + private async executeActionNode( + node: WorkflowNode, + context: ExecutionContext, + executor: any, + ): Promise { + const { actionType } = node.data; + + switch (actionType) { + case 'run_task': + return await executor.executeRunTask(node.data); + case 'set_variable': + return await executor.executeSetVariable(node.data); + case 'execute_command': + return await executor.executeCommand(node.data); + case 'send_notification': + return await executor.executeSendNotification(node.data); + default: + throw new Error(`Unknown action type: ${actionType}`); + } + } + + private async executeControlNode( + node: WorkflowNode, + context: ExecutionContext, + executor: any, + ): Promise { + const { controlType } = node.data; + + switch (controlType) { + case 'delay': + const delayMs = (node.data.delaySeconds || 0) * 1000; + await new Promise((resolve) => setTimeout(resolve, delayMs)); + return { delayed: delayMs }; + case 'retry': + // Retry logic handled by caller + return { retryConfig: node.data }; + case 'circuit_breaker': + // Circuit breaker logic handled by caller + return { circuitBreakerConfig: node.data }; + default: + throw new Error(`Unknown control type: ${controlType}`); + } + } + + private async executeLogicGateNode( + node: WorkflowNode, + context: ExecutionContext, + executor: any, + ): Promise { + const { gateType } = node.data; + // Logic gates are evaluated by checking incoming edges + return { gateType, passed: true }; + } + + private getFieldValue(field: string, context: ExecutionContext): any { + // Support dot notation for nested fields + const parts = field.split('.'); + let value: any = context.triggerData; + + for (const part of parts) { + if (value && typeof value === 'object' && part in value) { + value = value[part]; + } else { + // Check variables + if (context.variables.has(field)) { + return context.variables.get(field); + } + return undefined; + } + } + + return value; + } + + private evaluateCondition( + fieldValue: any, + operator: string, + expectedValue: any, + ): boolean { + switch (operator) { + case 'equals': + return fieldValue == expectedValue; + case 'not_equals': + return fieldValue != expectedValue; + case 'greater_than': + return Number(fieldValue) > Number(expectedValue); + case 'less_than': + return Number(fieldValue) < Number(expectedValue); + case 'contains': + return String(fieldValue).includes(String(expectedValue)); + case 'not_contains': + return !String(fieldValue).includes(String(expectedValue)); + default: + this.logger.warn(`Unknown operator: ${operator}`); + return false; + } + } + + /** + * Validate a workflow graph + */ + public validateGraph(workflowGraph: WorkflowGraph): { + valid: boolean; + errors: string[]; + } { + const errors: string[] = []; + + // Check for at least one trigger node + const triggerNodes = workflowGraph.nodes.filter( + (node) => node.type === 'trigger', + ); + if (triggerNodes.length === 0) { + errors.push('Workflow must have at least one trigger node'); + } + + // Check for cycles (simple check) + const visited = new Set(); + const recursionStack = new Set(); + + const hasCycle = (nodeId: string): boolean => { + visited.add(nodeId); + recursionStack.add(nodeId); + + const outgoingEdges = workflowGraph.edges.filter( + (edge) => edge.source === nodeId, + ); + + for (const edge of outgoingEdges) { + if (!visited.has(edge.target)) { + if (hasCycle(edge.target)) { + return true; + } + } else if (recursionStack.has(edge.target)) { + return true; + } + } + + recursionStack.delete(nodeId); + return false; + }; + + for (const node of workflowGraph.nodes) { + if (!visited.has(node.id) && hasCycle(node.id)) { + errors.push('Workflow contains cycles'); + break; + } + } + + // Check for disconnected nodes (excluding triggers) + const connectedNodes = new Set(); + workflowGraph.edges.forEach((edge) => { + connectedNodes.add(edge.source); + connectedNodes.add(edge.target); + }); + + const disconnectedNodes = workflowGraph.nodes.filter( + (node) => + node.type !== 'trigger' && !connectedNodes.has(node.id), + ); + + if (disconnectedNodes.length > 0) { + errors.push( + `Disconnected nodes found: ${disconnectedNodes.map((n) => n.id).join(', ')}`, + ); + } + + return { + valid: errors.length === 0, + errors, + }; + } +} diff --git a/back/services/scenario.ts b/back/services/scenario.ts index b31f6936..5e0f8b43 100644 --- a/back/services/scenario.ts +++ b/back/services/scenario.ts @@ -12,6 +12,7 @@ import config from '../config'; import fs from 'fs/promises'; import path from 'path'; import chokidar from 'chokidar'; +import { GraphExecutor } from './graphExecutor'; const execAsync = promisify(exec); @@ -19,12 +20,14 @@ const execAsync = promisify(exec); export default class ScenarioService { private watchers: Map = new Map(); private webhookTokens: Map = new Map(); + private graphExecutor: GraphExecutor; constructor( @Inject('logger') private logger: winston.Logger, private cronService: CronService, private envService: EnvService, ) { + this.graphExecutor = new GraphExecutor(logger); this.initializeWatchers(); } @@ -284,29 +287,49 @@ export default class ScenarioService { }; try { - // Evaluate conditions - const conditionsMatched = await this.evaluateConditions( - scenario.conditions || [], - scenario.conditionLogic || 'AND', - triggerData, - ); - - log.conditionsMatched = conditionsMatched; + // Check if this is a graph-based workflow + if (scenario.workflowGraph && scenario.workflowGraph.nodes && scenario.workflowGraph.nodes.length > 0) { + // Execute using graph executor + const result = await this.graphExecutor.executeGraph( + scenario.workflowGraph, + triggerData, + this, + ); - if (!conditionsMatched) { + log.conditionsMatched = true; // Graph execution handles conditions internally + log.executionStatus = result.success ? 'success' : 'failure'; + log.executionDetails = result.results; + log.executionTime = Date.now() - startTime; + + if (!result.success) { + throw new Error(JSON.stringify(result.results)); + } + } else { + // Legacy execution path for form-based scenarios + // Evaluate conditions + const conditionsMatched = await this.evaluateConditions( + scenario.conditions || [], + scenario.conditionLogic || 'AND', + triggerData, + ); + + log.conditionsMatched = conditionsMatched; + + if (!conditionsMatched) { + log.executionStatus = 'success'; + log.executionDetails = { message: 'Conditions not matched, skipped' }; + await this.createLog(log); + return; + } + + // Execute actions + const actionResults = await this.executeActions(scenario.actions || []); + log.executionStatus = 'success'; - log.executionDetails = { message: 'Conditions not matched, skipped' }; - await this.createLog(log); - return; + log.executionDetails = actionResults; + log.executionTime = Date.now() - startTime; } - // Execute actions - const actionResults = await this.executeActions(scenario.actions || []); - - log.executionStatus = 'success'; - log.executionDetails = actionResults; - log.executionTime = Date.now() - startTime; - // Update scenario stats await ScenarioModel.update( { @@ -498,4 +521,37 @@ export default class ScenarioService { } return null; } + + // Public methods for graph executor + public async executeRunTask(data: any): Promise { + if (data.cronId) { + return await this.cronService.run([data.cronId]); + } + throw new Error('cronId is required for run_task action'); + } + + public async executeSetVariable(data: any): Promise { + if (data.name && data.value !== undefined) { + return await this.envService.create([{ + name: data.name, + value: data.value, + remarks: `Set by scenario workflow`, + }]); + } + throw new Error('name and value are required for set_variable action'); + } + + public async executeCommand(data: any): Promise { + if (data.command) { + const { stdout, stderr } = await execAsync(data.command); + return { stdout, stderr }; + } + throw new Error('command is required for execute_command action'); + } + + public async executeSendNotification(data: any): Promise { + // Log notification for now - can be extended to integrate with notification service + this.logger.info(`Notification: ${data.message || 'No message'}`); + return { sent: true, message: data.message }; + } }