Implement graph execution engine for visual workflows

Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot] 2025-11-09 13:42:12 +00:00
parent 5ed2e5b809
commit 2c5357eedc
2 changed files with 423 additions and 19 deletions

View File

@ -0,0 +1,348 @@
// Graph Execution Engine for Visual Workflow
import winston from 'winston';
interface WorkflowNode {
id: string;
type: string;
position: { x: number; y: number };
data: any;
}
interface WorkflowEdge {
id: string;
source: string;
target: string;
}
interface WorkflowGraph {
nodes: WorkflowNode[];
edges: WorkflowEdge[];
}
interface ExecutionContext {
triggerData: any;
variables: Map<string, any>;
executedNodes: Set<string>;
results: Map<string, any>;
}
export class GraphExecutor {
private logger: winston.Logger;
constructor(logger: winston.Logger) {
this.logger = logger;
}
/**
* Execute a workflow graph
*/
public async executeGraph(
workflowGraph: WorkflowGraph,
triggerData: any,
executor: any, // ScenarioService instance
): Promise<{ success: boolean; results: any; executedNodes: string[] }> {
const context: ExecutionContext = {
triggerData,
variables: new Map(),
executedNodes: new Set(),
results: new Map(),
};
try {
// Find trigger nodes (entry points)
const triggerNodes = workflowGraph.nodes.filter(
(node) => node.type === 'trigger',
);
if (triggerNodes.length === 0) {
throw new Error('No trigger node found in workflow');
}
// Execute from each trigger node
for (const triggerNode of triggerNodes) {
await this.executeNode(
triggerNode,
workflowGraph,
context,
executor,
);
}
return {
success: true,
results: Object.fromEntries(context.results),
executedNodes: Array.from(context.executedNodes),
};
} catch (error: any) {
this.logger.error('Graph execution failed:', error);
return {
success: false,
results: { error: error.message },
executedNodes: Array.from(context.executedNodes),
};
}
}
/**
* Execute a single node and its connected nodes
*/
private async executeNode(
node: WorkflowNode,
graph: WorkflowGraph,
context: ExecutionContext,
executor: any,
): Promise<any> {
// Skip if already executed
if (context.executedNodes.has(node.id)) {
return context.results.get(node.id);
}
this.logger.info(`Executing node: ${node.id} (${node.type})`);
context.executedNodes.add(node.id);
let result: any = null;
try {
// Execute based on node type
switch (node.type) {
case 'trigger':
result = await this.executeTriggerNode(node, context, executor);
break;
case 'condition':
result = await this.executeConditionNode(node, context, executor);
break;
case 'action':
result = await this.executeActionNode(node, context, executor);
break;
case 'control':
result = await this.executeControlNode(node, context, executor);
break;
case 'logic_gate':
result = await this.executeLogicGateNode(node, context, executor);
break;
default:
this.logger.warn(`Unknown node type: ${node.type}`);
result = { skipped: true };
}
context.results.set(node.id, result);
// Find and execute next nodes
const nextEdges = graph.edges.filter((edge) => edge.source === node.id);
for (const edge of nextEdges) {
const nextNode = graph.nodes.find((n) => n.id === edge.target);
if (nextNode) {
await this.executeNode(nextNode, graph, context, executor);
}
}
return result;
} catch (error: any) {
this.logger.error(`Node execution failed: ${node.id}`, error);
context.results.set(node.id, { error: error.message });
throw error;
}
}
private async executeTriggerNode(
node: WorkflowNode,
context: ExecutionContext,
executor: any,
): Promise<any> {
// Trigger nodes just pass through the trigger data
return { triggered: true, data: context.triggerData };
}
private async executeConditionNode(
node: WorkflowNode,
context: ExecutionContext,
executor: any,
): Promise<any> {
const { field, operator, value } = node.data;
// Get field value from trigger data or variables
let fieldValue = this.getFieldValue(field, context);
// Evaluate condition
const matched = this.evaluateCondition(fieldValue, operator, value);
this.logger.info(
`Condition ${node.id}: ${field} ${operator} ${value} = ${matched}`,
);
return { matched, fieldValue, expectedValue: value };
}
private async executeActionNode(
node: WorkflowNode,
context: ExecutionContext,
executor: any,
): Promise<any> {
const { actionType } = node.data;
switch (actionType) {
case 'run_task':
return await executor.executeRunTask(node.data);
case 'set_variable':
return await executor.executeSetVariable(node.data);
case 'execute_command':
return await executor.executeCommand(node.data);
case 'send_notification':
return await executor.executeSendNotification(node.data);
default:
throw new Error(`Unknown action type: ${actionType}`);
}
}
private async executeControlNode(
node: WorkflowNode,
context: ExecutionContext,
executor: any,
): Promise<any> {
const { controlType } = node.data;
switch (controlType) {
case 'delay':
const delayMs = (node.data.delaySeconds || 0) * 1000;
await new Promise((resolve) => setTimeout(resolve, delayMs));
return { delayed: delayMs };
case 'retry':
// Retry logic handled by caller
return { retryConfig: node.data };
case 'circuit_breaker':
// Circuit breaker logic handled by caller
return { circuitBreakerConfig: node.data };
default:
throw new Error(`Unknown control type: ${controlType}`);
}
}
private async executeLogicGateNode(
node: WorkflowNode,
context: ExecutionContext,
executor: any,
): Promise<any> {
const { gateType } = node.data;
// Logic gates are evaluated by checking incoming edges
return { gateType, passed: true };
}
private getFieldValue(field: string, context: ExecutionContext): any {
// Support dot notation for nested fields
const parts = field.split('.');
let value: any = context.triggerData;
for (const part of parts) {
if (value && typeof value === 'object' && part in value) {
value = value[part];
} else {
// Check variables
if (context.variables.has(field)) {
return context.variables.get(field);
}
return undefined;
}
}
return value;
}
private evaluateCondition(
fieldValue: any,
operator: string,
expectedValue: any,
): boolean {
switch (operator) {
case 'equals':
return fieldValue == expectedValue;
case 'not_equals':
return fieldValue != expectedValue;
case 'greater_than':
return Number(fieldValue) > Number(expectedValue);
case 'less_than':
return Number(fieldValue) < Number(expectedValue);
case 'contains':
return String(fieldValue).includes(String(expectedValue));
case 'not_contains':
return !String(fieldValue).includes(String(expectedValue));
default:
this.logger.warn(`Unknown operator: ${operator}`);
return false;
}
}
/**
* Validate a workflow graph
*/
public validateGraph(workflowGraph: WorkflowGraph): {
valid: boolean;
errors: string[];
} {
const errors: string[] = [];
// Check for at least one trigger node
const triggerNodes = workflowGraph.nodes.filter(
(node) => node.type === 'trigger',
);
if (triggerNodes.length === 0) {
errors.push('Workflow must have at least one trigger node');
}
// Check for cycles (simple check)
const visited = new Set<string>();
const recursionStack = new Set<string>();
const hasCycle = (nodeId: string): boolean => {
visited.add(nodeId);
recursionStack.add(nodeId);
const outgoingEdges = workflowGraph.edges.filter(
(edge) => edge.source === nodeId,
);
for (const edge of outgoingEdges) {
if (!visited.has(edge.target)) {
if (hasCycle(edge.target)) {
return true;
}
} else if (recursionStack.has(edge.target)) {
return true;
}
}
recursionStack.delete(nodeId);
return false;
};
for (const node of workflowGraph.nodes) {
if (!visited.has(node.id) && hasCycle(node.id)) {
errors.push('Workflow contains cycles');
break;
}
}
// Check for disconnected nodes (excluding triggers)
const connectedNodes = new Set<string>();
workflowGraph.edges.forEach((edge) => {
connectedNodes.add(edge.source);
connectedNodes.add(edge.target);
});
const disconnectedNodes = workflowGraph.nodes.filter(
(node) =>
node.type !== 'trigger' && !connectedNodes.has(node.id),
);
if (disconnectedNodes.length > 0) {
errors.push(
`Disconnected nodes found: ${disconnectedNodes.map((n) => n.id).join(', ')}`,
);
}
return {
valid: errors.length === 0,
errors,
};
}
}

View File

@ -12,6 +12,7 @@ import config from '../config';
import fs from 'fs/promises'; import fs from 'fs/promises';
import path from 'path'; import path from 'path';
import chokidar from 'chokidar'; import chokidar from 'chokidar';
import { GraphExecutor } from './graphExecutor';
const execAsync = promisify(exec); const execAsync = promisify(exec);
@ -19,12 +20,14 @@ const execAsync = promisify(exec);
export default class ScenarioService { export default class ScenarioService {
private watchers: Map<number, any> = new Map(); private watchers: Map<number, any> = new Map();
private webhookTokens: Map<number, string> = new Map(); private webhookTokens: Map<number, string> = new Map();
private graphExecutor: GraphExecutor;
constructor( constructor(
@Inject('logger') private logger: winston.Logger, @Inject('logger') private logger: winston.Logger,
private cronService: CronService, private cronService: CronService,
private envService: EnvService, private envService: EnvService,
) { ) {
this.graphExecutor = new GraphExecutor(logger);
this.initializeWatchers(); this.initializeWatchers();
} }
@ -284,29 +287,49 @@ export default class ScenarioService {
}; };
try { try {
// Evaluate conditions // Check if this is a graph-based workflow
const conditionsMatched = await this.evaluateConditions( if (scenario.workflowGraph && scenario.workflowGraph.nodes && scenario.workflowGraph.nodes.length > 0) {
scenario.conditions || [], // Execute using graph executor
scenario.conditionLogic || 'AND', const result = await this.graphExecutor.executeGraph(
triggerData, scenario.workflowGraph,
); triggerData,
this,
log.conditionsMatched = conditionsMatched; );
if (!conditionsMatched) { log.conditionsMatched = true; // Graph execution handles conditions internally
log.executionStatus = result.success ? 'success' : 'failure';
log.executionDetails = result.results;
log.executionTime = Date.now() - startTime;
if (!result.success) {
throw new Error(JSON.stringify(result.results));
}
} else {
// Legacy execution path for form-based scenarios
// Evaluate conditions
const conditionsMatched = await this.evaluateConditions(
scenario.conditions || [],
scenario.conditionLogic || 'AND',
triggerData,
);
log.conditionsMatched = conditionsMatched;
if (!conditionsMatched) {
log.executionStatus = 'success';
log.executionDetails = { message: 'Conditions not matched, skipped' };
await this.createLog(log);
return;
}
// Execute actions
const actionResults = await this.executeActions(scenario.actions || []);
log.executionStatus = 'success'; log.executionStatus = 'success';
log.executionDetails = { message: 'Conditions not matched, skipped' }; log.executionDetails = actionResults;
await this.createLog(log); log.executionTime = Date.now() - startTime;
return;
} }
// Execute actions
const actionResults = await this.executeActions(scenario.actions || []);
log.executionStatus = 'success';
log.executionDetails = actionResults;
log.executionTime = Date.now() - startTime;
// Update scenario stats // Update scenario stats
await ScenarioModel.update( await ScenarioModel.update(
{ {
@ -498,4 +521,37 @@ export default class ScenarioService {
} }
return null; return null;
} }
// Public methods for graph executor
public async executeRunTask(data: any): Promise<any> {
if (data.cronId) {
return await this.cronService.run([data.cronId]);
}
throw new Error('cronId is required for run_task action');
}
public async executeSetVariable(data: any): Promise<any> {
if (data.name && data.value !== undefined) {
return await this.envService.create([{
name: data.name,
value: data.value,
remarks: `Set by scenario workflow`,
}]);
}
throw new Error('name and value are required for set_variable action');
}
public async executeCommand(data: any): Promise<any> {
if (data.command) {
const { stdout, stderr } = await execAsync(data.command);
return { stdout, stderr };
}
throw new Error('command is required for execute_command action');
}
public async executeSendNotification(data: any): Promise<any> {
// Log notification for now - can be extended to integrate with notification service
this.logger.info(`Notification: ${data.message || 'No message'}`);
return { sent: true, message: data.message };
}
} }