mirror of
https://github.com/xtr-dev/payload-automation.git
synced 2025-12-10 17:03:22 +00:00
- Introduce `WorkflowBuilder` for visual workflow configuration - Add child components: `WorkflowToolbar`, `StepConfigurationForm`, and `StepNode` - Implement `WorkflowBuilderField` for integration with PayloadCMS - Provide dynamic step type handling and JSON-based configuration editing - Enhance UI with drag-and-drop functionality and step dependencies management
979 lines
31 KiB
TypeScript
979 lines
31 KiB
TypeScript
import type { Payload, PayloadRequest } from 'payload'
|
|
|
|
// The generated Payload types are not available at build time, so the shape of
// a workflow document is described structurally here and cast where necessary.

/**
 * Structural description of a workflow document as stored by Payload.
 *
 * Every object level carries an index signature so that fields added by the
 * generated collection types remain assignable.
 */
export type PayloadWorkflow = {
  /** Document identifier. */
  id: number
  /** Workflow name. */
  name: string
  description?: string | null
  /** Trigger definitions attached to the workflow. */
  triggers?:
    | {
        type?: string | null
        condition?: string | null
        parameters?: {
          collectionSlug?: string | null
          operation?: string | null
          global?: string | null
          globalOperation?: string | null
          [key: string]: unknown
        } | null
        [key: string]: unknown
      }[]
    | null
  /** Step definitions; `dependencies` holds the names of other steps. */
  steps?:
    | {
        type?: string | null
        name?: string | null
        input?: unknown
        dependencies?: string[] | null
        condition?: string | null
        [key: string]: unknown
      }[]
    | null
  [key: string]: unknown
}
|
|
|
|
import Handlebars from 'handlebars'
|
|
|
|
/**
 * A single element of `PayloadWorkflow['steps']`, narrowed so that `name` is
 * always a string; the execution logic keys step results by that name.
 */
export type WorkflowStep = NonNullable<PayloadWorkflow['steps']>[number] & {
  name: string
}
|
|
|
|
/**
 * A single element of `PayloadWorkflow['triggers']`, narrowed so that `type`
 * is always a string for the execution logic.
 */
export type WorkflowTrigger = NonNullable<PayloadWorkflow['triggers']>[number] & {
  type: string
}
|
|
|
|
/**
 * Mutable state shared by every step of one workflow run. It is also the data
 * object handed to Handlebars templates.
 */
export interface ExecutionContext {
  /** Per-step records keyed by step name. */
  steps: { [stepName: string]: any }
  /** Information about the event that started the run. */
  trigger: { [field: string]: any }
}
|
|
|
|
/**
 * Runs a workflow: orders its steps by dependency, executes each step as a
 * Payload job, and records progress on a `workflow-runs` document.
 */
export class WorkflowExecutor {
  /**
   * Matches `<left> <operator> <right>`. The two-character operators are
   * listed before `>` and `<` so that `>=` / `<=` are not split into a
   * single-character operator followed by a stray `=`.
   */
  private static readonly COMPARISON_PATTERN = /^(.+?)\s*(==|!=|>=|<=|>|<)\s*(.+)$/

  constructor(
    private payload: Payload,
    private logger: Payload['logger']
  ) {}

  /**
   * Convert string values to appropriate types based on common patterns.
   *
   * Non-string values are returned untouched. Strings are converted when the
   * field name suggests a number or boolean, or when the text itself is a
   * plain integer or decimal.
   *
   * @param value - Raw value (typically a resolved template result).
   * @param key - Field name the value belongs to; drives the name heuristics.
   * @returns The converted value, or the original string when nothing applies.
   */
  private convertValueType(value: unknown, key: string): unknown {
    if (typeof value !== 'string') {
      return value
    }

    // Number('') and Number('  ') are 0, so a blank string must never reach the
    // numeric conversions below or it would silently become zero.
    if (value.trim() === '') {
      return value
    }

    // Type conversion patterns based on field names and values
    const numericFields = ['timeout', 'retries', 'delay', 'port', 'limit', 'offset', 'count', 'max', 'min']
    const booleanFields = ['enabled', 'required', 'active', 'success', 'failed', 'complete']
    const lowerKey = key.toLowerCase()

    // Convert numeric fields
    if (numericFields.some(field => lowerKey.includes(field))) {
      const numValue = Number(value)
      if (!isNaN(numValue)) {
        this.logger.debug({
          key,
          originalValue: value,
          convertedValue: numValue
        }, 'Auto-converted field to number')
        return numValue
      }
    }

    // Convert boolean fields
    if (booleanFields.some(field => lowerKey.includes(field))) {
      if (value === 'true') return true
      if (value === 'false') return false
    }

    // Try to parse as number if it looks numeric
    if (/^\d+$/.test(value)) {
      const numValue = parseInt(value, 10)
      // Digit strings beyond the safe integer range (long ids, for example)
      // cannot be represented exactly, so they stay strings.
      if (Number.isSafeInteger(numValue)) {
        this.logger.debug({
          key,
          originalValue: value,
          convertedValue: numValue
        }, 'Auto-converted numeric string to number')
        return numValue
      }
    }

    // Try to parse as float if it looks like a decimal
    if (/^\d+\.\d+$/.test(value)) {
      const floatValue = parseFloat(value)
      this.logger.debug({
        key,
        originalValue: value,
        convertedValue: floatValue
      }, 'Auto-converted decimal string to number')
      return floatValue
    }

    // Return as string if no conversion applies
    return value
  }

  /**
   * Classifies error types based on error messages.
   *
   * Matching is case-insensitive and checked in the order timeout, dns,
   * connection, network.
   *
   * @returns One of `'timeout'`, `'dns'`, `'connection'`, `'network'` or `'unknown'`.
   */
  private classifyErrorType(errorMessage: string): string {
    const message = String(errorMessage).toLowerCase()
    const mentions = (...needles: string[]): boolean => needles.some(needle => message.includes(needle))

    if (mentions('timeout', 'timed out', 'etimedout')) {
      return 'timeout'
    }
    if (mentions('enotfound', 'getaddrinfo')) {
      return 'dns'
    }
    if (mentions('econnrefused', 'econnreset')) {
      return 'connection'
    }
    if (mentions('network', 'fetch')) {
      return 'network'
    }
    return 'unknown'
  }

  /**
   * Evaluate a step condition. Thin wrapper around {@link evaluateCondition},
   * which uses Handlebars templates and comparison operators.
   */
  private evaluateStepCondition(condition: string, context: ExecutionContext): boolean {
    return this.evaluateCondition(condition, context)
  }

  /**
   * Build the compact JSON snapshot of step states used in condition logs.
   */
  private summarizeContext(context: ExecutionContext): string {
    const stepOutputs: Record<string, any> = {}
    for (const [name, entry] of Object.entries(context.steps)) {
      stepOutputs[name] = { state: entry.state, hasOutput: !!entry.output }
    }
    return JSON.stringify({
      stepOutputs,
      triggerData: context.trigger?.data ? 'present' : 'absent'
    })
  }

  /**
   * Turn an arbitrary error-like value into text.
   *
   * @returns `undefined` for falsy input, otherwise a string.
   */
  private describeError(err: unknown): string | undefined {
    if (!err) {
      return undefined
    }
    if (typeof err === 'string') {
      return err
    }
    if (typeof err === 'object') {
      const message = (err as { message?: unknown }).message
      if (typeof message === 'string' && message) {
        return message
      }
    }
    try {
      // JSON.stringify returns undefined for functions and symbols
      return JSON.stringify(err) ?? String(err)
    } catch {
      return String(err)
    }
  }

  /**
   * Find the most specific error text available on a finished job.
   *
   * Sources are consulted in order: latest log entry, job-level error, task
   * output error, job result error. If none is present, a generic message is
   * produced.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- job documents are not typed at build time
  private extractJobErrorMessage(job: any, taskStatus: any, taskSlug: string): string {
    const logs = Array.isArray(job?.log) ? job.log : []
    const latestLog = logs.length > 0 ? logs[logs.length - 1] : undefined

    const candidates: unknown[] = [
      latestLog?.error,
      job?.error,
      taskStatus?.output?.error,
      job?.result?.error
    ]
    for (const candidate of candidates) {
      const text = this.describeError(candidate)
      if (text) {
        return text
      }
    }

    // Task handler returned state='failed' without any error text
    if (taskStatus?.state === 'failed') {
      return 'Task handler returned a failed state'
    }

    // Final fallback to generic message with more detail
    const jobDetails = {
      taskSlug,
      hasError: job?.hasError,
      taskStatus: taskStatus?.complete,
      totalTried: job?.totalTried
    }
    return `Task ${taskSlug} failed without detailed error information. Job details: ${JSON.stringify(jobDetails)}`
  }

  /**
   * Execute a single workflow step.
   *
   * The step's condition (if any) is evaluated first; a step whose condition
   * is not met is recorded as succeeded with a `skipped` output. Otherwise the
   * step input is resolved, a job is queued for `step.type`, run by id, and
   * its result is copied into `context.steps[stepName]`.
   *
   * @param stepIndex - Used only to build a fallback name when `step.name` is empty.
   * @param workflowRunId - When given, the run document is updated with the context.
   * @throws Error when the step has no task type or the job did not complete.
   */
  private async executeStep(
    step: WorkflowStep,
    stepIndex: number,
    context: ExecutionContext,
    req: PayloadRequest,
    workflowRunId?: number | string
  ): Promise<void> {
    const stepName = step.name || 'step-' + stepIndex

    this.logger.info({
      hasStep: 'step' in step,
      step: JSON.stringify(step),
      stepName
    }, 'Executing step')

    // Check step condition if present
    if (step.condition) {
      this.logger.debug({
        condition: step.condition,
        stepName,
        availableSteps: Object.keys(context.steps),
        completedSteps: Object.entries(context.steps)
          .filter(([_, s]) => s.state === 'succeeded')
          .map(([name]) => name),
        triggerType: context.trigger?.type
      }, 'Evaluating step condition')

      const conditionMet = this.evaluateStepCondition(step.condition, context)

      if (!conditionMet) {
        this.logger.info({
          condition: step.condition,
          stepName,
          contextSnapshot: this.summarizeContext(context)
        }, 'Step condition not met, skipping')

        // Mark step as completed but skipped
        context.steps[stepName] = {
          error: undefined,
          input: undefined,
          output: { reason: 'Condition not met', skipped: true },
          state: 'succeeded'
        }

        // Update workflow run context if needed
        if (workflowRunId) {
          await this.updateWorkflowRunContext(workflowRunId, context, req)
        }

        return
      }

      this.logger.info({
        condition: step.condition,
        stepName,
        contextSnapshot: this.summarizeContext(context)
      }, 'Step condition met, proceeding with execution')
    }

    // Initialize step context
    context.steps[stepName] = {
      error: undefined,
      input: undefined,
      output: undefined,
      state: 'running',
      _startTime: Date.now() // Track execution start time for independent duration tracking
    }

    // Declared outside the try block so it is accessible in catch
    const taskSlug = step.type as string

    try {
      // Get input configuration from the step
      const inputConfig = (step.input as Record<string, unknown>) || {}

      // Resolve input data using Handlebars templates
      const resolvedInput = this.resolveStepInput(inputConfig, context, taskSlug)
      context.steps[stepName].input = resolvedInput

      if (!taskSlug) {
        throw new Error(`Step ${stepName} is missing a task type`)
      }

      this.logger.info({
        hasInput: !!resolvedInput,
        hasReq: !!req,
        stepName,
        taskSlug
      }, 'Queueing task')

      const job = await this.payload.jobs.queue({
        input: resolvedInput,
        req,
        task: taskSlug
      })

      // Run the specific job immediately and wait for completion
      this.logger.info({ jobId: job.id }, 'Running job immediately using runByID')
      const runResults = await this.payload.jobs.runByID({
        id: job.id,
        req
      })

      this.logger.info({
        jobId: job.id,
        runResult: runResults,
        hasResult: !!runResults
      }, 'Job run completed')

      // Give a small delay to ensure job is fully processed
      await new Promise(resolve => setTimeout(resolve, 100))

      // Get the job result
      const completedJob = await this.payload.findByID({
        id: job.id,
        collection: 'payload-jobs',
        req
      })

      this.logger.info({
        jobId: job.id,
        totalTried: completedJob.totalTried,
        hasError: completedJob.hasError,
        taskStatus: completedJob.taskStatus ? Object.keys(completedJob.taskStatus) : 'null'
      }, 'Retrieved job results')

      const taskStatus = completedJob.taskStatus?.[completedJob.taskSlug]?.[completedJob.totalTried]
      const isComplete = taskStatus?.complete === true
      const hasError = completedJob.hasError || !isComplete

      const result: {
        error: string | undefined
        output: unknown
        state: 'failed' | 'succeeded'
      } = {
        error: hasError ? this.extractJobErrorMessage(completedJob, taskStatus, taskSlug) : undefined,
        output: taskStatus?.output || {},
        state: isComplete ? 'succeeded' : 'failed'
      }

      // Store the output and error
      context.steps[stepName].output = result.output
      context.steps[stepName].state = result.state
      if (result.error) {
        context.steps[stepName].error = result.error
      }

      // Independent execution tracking (not dependent on PayloadCMS task status)
      context.steps[stepName].executionInfo = {
        completed: true, // Step execution completed (regardless of success/failure)
        success: result.state === 'succeeded',
        executedAt: new Date().toISOString(),
        duration: Date.now() - (context.steps[stepName]._startTime || Date.now())
      }

      // For failed steps, extract detailed error information from the job
      // logs so that it persists with the workflow run context.
      if (result.state === 'failed') {
        const errorDetails = this.extractErrorDetailsFromJob(completedJob, context.steps[stepName], stepName)
        if (errorDetails) {
          context.steps[stepName].errorDetails = errorDetails

          this.logger.info({
            stepName,
            errorType: errorDetails.errorType,
            duration: errorDetails.duration,
            attempts: errorDetails.attempts
          }, 'Extracted detailed error information for failed step')
        }
      }

      this.logger.debug({context}, 'Step execution context')

      if (result.state !== 'succeeded') {
        throw new Error(result.error || `Step ${stepName} failed`)
      }

      this.logger.info({
        output: result.output,
        stepName
      }, 'Step completed')

      // Update workflow run with current step results if workflowRunId is provided
      if (workflowRunId) {
        await this.updateWorkflowRunContext(workflowRunId, context, req)
      }
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : 'Unknown error'
      context.steps[stepName].state = 'failed'
      context.steps[stepName].error = errorMessage

      // Independent execution tracking for failed steps
      context.steps[stepName].executionInfo = {
        completed: true, // Execution attempted and completed (even if it failed)
        success: false,
        executedAt: new Date().toISOString(),
        duration: Date.now() - (context.steps[stepName]._startTime || Date.now()),
        failureReason: errorMessage
      }

      this.logger.error({
        error: errorMessage,
        input: context.steps[stepName].input,
        stepName,
        taskSlug
      }, 'Step execution failed')

      // Best effort: persist the failure, but never let a persistence error
      // replace the step error that is rethrown below.
      if (workflowRunId) {
        try {
          await this.updateWorkflowRunContext(workflowRunId, context, req)
        } catch (updateError) {
          this.logger.error({
            error: updateError instanceof Error ? updateError.message : 'Unknown error',
            stepName
          }, 'Failed to update workflow run context after step failure')
        }
      }

      throw error
    }
  }

  /**
   * Extracts detailed error information from job logs and input.
   *
   * @param job - Finished job document.
   * @param stepContext - The `context.steps[stepName]` record for the step.
   * @returns A details object, or `null` if extraction itself failed.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- job and step records are untyped at build time
  private extractErrorDetailsFromJob(job: any, stepContext: any, stepName: string) {
    try {
      // Get error information from multiple sources
      const input = stepContext.input || {}
      const logs = job.log || []
      const latestLog = logs[logs.length - 1]

      // Prefer the job/log error, then the message already computed for the step
      const errorMessage = job.error?.message || latestLog?.error?.message || stepContext.error || 'Unknown error'

      let errorType = this.classifyErrorType(errorMessage)

      // Special handling for timeouts: an unclassified failure whose duration
      // reached 90% of the configured timeout is treated as a timeout.
      if (errorType === 'unknown' && input.timeout && stepContext.executionInfo?.duration) {
        const timeoutMs = parseInt(String(input.timeout), 10) || 30000
        const actualDuration = stepContext.executionInfo.duration

        if (actualDuration >= (timeoutMs * 0.9)) {
          errorType = 'timeout'
          this.logger.debug({
            timeoutMs,
            actualDuration,
            stepName
          }, 'Classified error as timeout based on duration analysis')
        }
      }

      // Duration from execution info if available
      const duration = stepContext.executionInfo?.duration || 0

      // Attempt count reported by the job
      const attempts = job.totalTried || 1

      return {
        stepId: `${stepName}-${Date.now()}`,
        errorType,
        duration,
        attempts,
        finalError: errorMessage,
        context: {
          url: input.url,
          method: input.method,
          timeout: input.timeout,
          statusCode: latestLog?.output?.status,
          // NOTE(review): headers are stored verbatim with the run context and
          // may include credentials; confirm whether they should be redacted.
          headers: input.headers
        },
        timestamp: new Date().toISOString()
      }
    } catch (error) {
      this.logger.warn({
        error: error instanceof Error ? error.message : 'Unknown error',
        stepName
      }, 'Failed to extract error details from job')
      return null
    }
  }

  /**
   * Resolve step execution order based on dependencies.
   *
   * Steps without a name are named `step-<index>`. The result is a list of
   * batches: every step in a batch has all of its dependencies in earlier
   * batches.
   *
   * @throws Error on duplicate step names, on a dependency naming a step that
   *   does not exist, or on a dependency cycle.
   */
  private resolveExecutionOrder(steps: WorkflowStep[]): WorkflowStep[][] {
    const stepMap = new Map<string, WorkflowStep>()
    // Names of dependencies that have not been scheduled yet, per step
    const pending = new Map<string, Set<string>>()

    // Build the step map and dependency graph
    steps.forEach((step, index) => {
      const stepName = step.name || `step-${index}`
      if (stepMap.has(stepName)) {
        throw new Error(`Duplicate step name "${stepName}" in workflow steps`)
      }

      // A dependency listed twice must count only once
      const dependencies = [...new Set(step.dependencies || [])]

      stepMap.set(stepName, { ...step, name: stepName, dependencies })
      pending.set(stepName, new Set(dependencies))
    })

    // An unknown dependency can never be satisfied; report it precisely
    // instead of letting it surface as a bogus cycle.
    for (const [stepName, dependencies] of pending) {
      for (const dependency of dependencies) {
        if (!stepMap.has(dependency)) {
          throw new Error(`Step "${stepName}" depends on unknown step "${dependency}"`)
        }
      }
    }

    // Topological sort to determine execution batches
    const executionBatches: WorkflowStep[][] = []
    const processed = new Set<string>()

    while (processed.size < stepMap.size) {
      const currentBatch: WorkflowStep[] = []

      // Find all steps with no remaining dependencies
      for (const [stepName, dependencies] of pending) {
        if (dependencies.size === 0 && !processed.has(stepName)) {
          const step = stepMap.get(stepName)
          if (step) {
            currentBatch.push(step)
          }
        }
      }

      if (currentBatch.length === 0) {
        throw new Error('Circular dependency detected in workflow steps')
      }

      executionBatches.push(currentBatch)

      // Release the steps that were waiting on this batch
      for (const step of currentBatch) {
        processed.add(step.name)
        for (const dependencies of pending.values()) {
          dependencies.delete(step.name)
        }
      }
    }

    return executionBatches
  }

  /**
   * Resolve step input using Handlebars templates with automatic type conversion.
   *
   * Strings containing `{{ }}` are rendered against `context`; other strings
   * go through {@link convertValueType}. Nested objects and arrays are
   * resolved recursively and keep their shape.
   */
  private resolveStepInput(config: Record<string, unknown>, context: ExecutionContext, stepType?: string): Record<string, unknown> {
    const resolved: Record<string, unknown> = {}

    this.logger.debug({
      configKeys: Object.keys(config),
      contextSteps: Object.keys(context.steps),
      triggerType: context.trigger?.type,
      stepType
    }, 'Starting step input resolution with Handlebars')

    for (const [key, value] of Object.entries(config)) {
      resolved[key] = this.resolveInputValue(value, key, context, stepType)
    }

    this.logger.debug({
      resolvedKeys: Object.keys(resolved),
      originalKeys: Object.keys(config)
    }, 'Step input resolution completed')

    return resolved
  }

  /**
   * Resolve one input value of any type; see {@link resolveStepInput}.
   */
  private resolveInputValue(value: unknown, key: string, context: ExecutionContext, stepType?: string): unknown {
    if (typeof value === 'string') {
      return this.resolveInputString(value, key, context)
    }

    if (Array.isArray(value)) {
      // Arrays stay arrays. Elements use their index as key, so only the
      // value-based conversions apply to them.
      return value.map((item, index) => this.resolveInputValue(item, String(index), context, stepType))
    }

    if (typeof value === 'object' && value !== null) {
      // Recursively resolve nested objects
      this.logger.debug({
        key,
        nestedKeys: Object.keys(value as Record<string, unknown>)
      }, 'Recursively resolving nested object')

      return this.resolveStepInput(value as Record<string, unknown>, context, stepType)
    }

    // Keep literal values as-is
    return value
  }

  /**
   * Render a string input (when it contains a template) and convert its type.
   * The original string is kept if the template fails to compile or render.
   */
  private resolveInputString(value: string, key: string, context: ExecutionContext): unknown {
    // Regular string, apply type conversion
    if (!(value.includes('{{') && value.includes('}}'))) {
      return this.convertValueType(value, key)
    }

    this.logger.debug({
      key,
      template: value,
      availableSteps: Object.keys(context.steps),
      hasTriggerData: !!context.trigger?.data,
      hasTriggerDoc: !!context.trigger?.doc
    }, 'Processing Handlebars template')

    try {
      // NOTE(review): Handlebars HTML-escapes `{{ }}` output by default, so
      // values containing `&`, `<` or quotes are escaped; confirm this is
      // intended for non-HTML inputs such as URLs.
      const template = Handlebars.compile(value)
      const result = template(context)

      this.logger.debug({
        key,
        template: value,
        result: JSON.stringify(result).substring(0, 200),
        resultType: typeof result
      }, 'Handlebars template resolved successfully')

      return this.convertValueType(result, key)
    } catch (error) {
      // The context may hold circular objects (e.g. the request), so the
      // snapshot is limited to key names; stringifying the whole context
      // here could throw from inside the error handler.
      this.logger.warn({
        error: error instanceof Error ? error.message : 'Unknown error',
        key,
        template: value,
        contextSnapshot: JSON.stringify({
          steps: Object.keys(context.steps ?? {}),
          trigger: Object.keys(context.trigger ?? {})
        }).substring(0, 500)
      }, 'Failed to resolve Handlebars template')
      return value // Keep original value if resolution fails
    }
  }

  /**
   * Safely serialize an object, handling circular references and non-serializable values.
   *
   * Only true cycles (an object nested inside itself) are replaced with
   * `'[Circular Reference]'`; an object referenced from several places is
   * serialized at each place. Dates become ISO strings (invalid dates become
   * `null`), matching what JSON serialization would produce.
   */
  private safeSerialize(obj: unknown): unknown {
    // Objects on the path from the root to the value being serialized
    const ancestors = new WeakSet<object>()

    const serialize = (value: unknown): unknown => {
      if (value === null || typeof value !== 'object') {
        return value
      }

      if (value instanceof Date) {
        return isNaN(value.getTime()) ? null : value.toISOString()
      }

      if (ancestors.has(value)) {
        return '[Circular Reference]'
      }

      ancestors.add(value)
      try {
        if (Array.isArray(value)) {
          return value.map(item => serialize(item))
        }

        const result: Record<string, unknown> = {}
        for (const [key, val] of Object.entries(value as Record<string, unknown>)) {
          // Skip non-serializable properties that are likely internal database objects
          if (key === 'table' || key === 'schema' || key === '_' || key === '__') {
            continue
          }
          try {
            result[key] = serialize(val)
          } catch {
            // Properties that can't be serialized are replaced by a marker
            result[key] = '[Non-serializable]'
          }
        }
        return result
      } finally {
        // Leaving this subtree: the object may legitimately appear again elsewhere
        ancestors.delete(value)
      }
    }

    return serialize(obj)
  }

  /**
   * Build the plain-data form of the execution context that is stored on the
   * workflow run document.
   */
  private serializeContext(context: ExecutionContext) {
    return {
      steps: this.safeSerialize(context.steps),
      trigger: {
        type: context.trigger.type,
        collection: context.trigger.collection,
        data: this.safeSerialize(context.trigger.data),
        doc: this.safeSerialize(context.trigger.doc),
        operation: context.trigger.operation,
        previousDoc: this.safeSerialize(context.trigger.previousDoc),
        triggeredAt: context.trigger.triggeredAt,
        user: context.trigger.req?.user
      }
    }
  }

  /**
   * Update workflow run with current context.
   */
  private async updateWorkflowRunContext(
    workflowRunId: number | string,
    context: ExecutionContext,
    req: PayloadRequest
  ): Promise<void> {
    await this.payload.update({
      id: workflowRunId,
      collection: 'workflow-runs',
      data: {
        context: this.serializeContext(context)
      },
      req
    })
  }

  /**
   * Evaluate a condition using Handlebars templates and comparison operators.
   *
   * A condition of the form `<left> <op> <right>` with `op` in
   * `== != > < >= <=` is evaluated as a comparison: `==` / `!=` compare
   * strictly (a rendered template is always a string), the ordering operators
   * compare numerically. Any other condition is resolved and coerced to a
   * boolean.
   *
   * @returns The result, or `false` if evaluation throws.
   */
  public evaluateCondition(condition: string, context: ExecutionContext): boolean {
    this.logger.debug({
      condition,
      contextKeys: Object.keys(context),
      triggerType: context.trigger?.type,
      triggerData: context.trigger?.data,
      triggerDoc: context.trigger?.doc ? 'present' : 'absent'
    }, 'Starting condition evaluation')

    try {
      // Check if this is a comparison expression
      const comparisonMatch = condition.match(WorkflowExecutor.COMPARISON_PATTERN)

      if (comparisonMatch) {
        const [, leftExpr, operator, rightExpr] = comparisonMatch

        // Either side may be a Handlebars template or a literal
        const leftValue = this.resolveConditionValue(leftExpr.trim(), context)
        const rightValue = this.resolveConditionValue(rightExpr.trim(), context)

        this.logger.debug({
          condition,
          leftExpr: leftExpr.trim(),
          leftValue,
          operator,
          rightExpr: rightExpr.trim(),
          rightValue,
          leftType: typeof leftValue,
          rightType: typeof rightValue
        }, 'Evaluating comparison condition')

        // Perform comparison
        let result: boolean
        switch (operator) {
          case '!=':
            result = leftValue !== rightValue
            break
          case '<':
            result = Number(leftValue) < Number(rightValue)
            break
          case '<=':
            result = Number(leftValue) <= Number(rightValue)
            break
          case '==':
            result = leftValue === rightValue
            break
          case '>':
            result = Number(leftValue) > Number(rightValue)
            break
          case '>=':
            result = Number(leftValue) >= Number(rightValue)
            break
          default:
            throw new Error(`Unknown comparison operator: ${operator}`)
        }

        this.logger.debug({
          condition,
          result,
          leftValue,
          rightValue,
          operator
        }, 'Comparison condition evaluation completed')

        return result
      }

      // Treat as template or literal boolean evaluation
      const result = this.resolveConditionValue(condition, context)

      this.logger.debug({
        condition,
        result,
        resultType: Array.isArray(result) ? 'array' : typeof result,
        resultLength: Array.isArray(result) ? result.length : undefined
      }, 'Boolean evaluation result')

      // An array is truthy only when its first element is
      const finalResult = Array.isArray(result)
        ? result.length > 0 && Boolean(result[0])
        : Boolean(result)

      this.logger.debug({
        condition,
        finalResult,
        originalResult: result
      }, 'Boolean condition evaluation completed')

      return finalResult
    } catch (error) {
      this.logger.warn({
        condition,
        error: error instanceof Error ? error.message : 'Unknown error',
        errorStack: error instanceof Error ? error.stack : undefined
      }, 'Failed to evaluate condition')

      // If condition evaluation fails, assume false
      return false
    }
  }

  /**
   * Resolve a condition operand: a quoted string literal, `true` / `false`,
   * a number literal, or a Handlebars template. Anything else is returned as
   * the raw string.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- operands may resolve to string, number or boolean
  private resolveConditionValue(expr: string, context: ExecutionContext): any {
    // Handle string literals (at least an opening and a closing quote)
    const quoted =
      expr.length >= 2 &&
      ((expr.startsWith('"') && expr.endsWith('"')) || (expr.startsWith("'") && expr.endsWith("'")))
    if (quoted) {
      return expr.slice(1, -1) // Remove quotes
    }

    // Handle boolean literals
    if (expr === 'true') {return true}
    if (expr === 'false') {return false}

    // Handle number literals
    if (/^-?\d+(?:\.\d+)?$/.test(expr)) {
      return Number(expr)
    }

    // Handle Handlebars templates
    if (expr.includes('{{') && expr.includes('}}')) {
      try {
        const template = Handlebars.compile(expr)
        return template(context)
      } catch (error) {
        this.logger.warn({
          error: error instanceof Error ? error.message : 'Unknown error',
          expr
        }, 'Failed to resolve Handlebars condition')
        return false
      }
    }

    // Return as string if nothing else matches
    return expr
  }

  /**
   * Execute a workflow with the given context.
   *
   * Creates a `workflow-runs` record, runs the dependency batches in sequence
   * (steps inside a batch run in parallel), then marks the run `completed`
   * or `failed`.
   *
   * @throws The error from creating the run record, or the first step failure.
   */
  async execute(workflow: PayloadWorkflow, context: ExecutionContext, req: PayloadRequest): Promise<void> {
    this.logger.info({
      workflowId: workflow.id,
      workflowName: workflow.name
    }, 'Starting workflow execution')

    this.logger.info({
      workflowId: workflow.id,
      workflowName: workflow.name,
      contextSummary: {
        triggerType: context.trigger.type,
        triggerCollection: context.trigger.collection,
        triggerOperation: context.trigger.operation,
        hasDoc: !!context.trigger.doc,
        userEmail: context.trigger.req?.user?.email
      }
    }, 'About to create workflow run record')

    // Create a workflow run record
    let workflowRun
    try {
      workflowRun = await this.payload.create({
        collection: 'workflow-runs',
        data: {
          context: this.serializeContext(context),
          startedAt: new Date().toISOString(),
          status: 'running',
          triggeredBy: context.trigger.req?.user?.email || 'system',
          workflow: workflow.id,
          workflowVersion: 1 // Default version since generated type doesn't have _version field
        },
        req
      })

      this.logger.info({
        workflowRunId: workflowRun.id,
        workflowId: workflow.id,
        workflowName: workflow.name
      }, 'Workflow run record created successfully')
    } catch (error) {
      this.logger.error({
        error: error instanceof Error ? error.message : 'Unknown error',
        errorStack: error instanceof Error ? error.stack : undefined,
        workflowId: workflow.id,
        workflowName: workflow.name
      }, 'Failed to create workflow run record')
      throw error
    }

    const runId = workflowRun.id

    try {
      // Resolve execution order based on dependencies
      const executionBatches = this.resolveExecutionOrder(workflow.steps as WorkflowStep[] || [])

      this.logger.info({
        batchSizes: executionBatches.map(batch => batch.length),
        totalBatches: executionBatches.length
      }, 'Resolved step execution order')

      // Execute each batch in sequence, but steps within each batch in parallel
      for (let batchIndex = 0; batchIndex < executionBatches.length; batchIndex++) {
        const batch = executionBatches[batchIndex]

        this.logger.info({
          batchIndex,
          stepCount: batch.length,
          stepNames: batch.map(s => s.name)
        }, 'Executing batch')

        // Wait for every step of the batch to settle before reacting to a
        // failure; otherwise sibling steps would still be running and writing
        // their context to the run after it has been marked as failed.
        const outcomes = await Promise.allSettled(
          batch.map((step, stepIndex) => this.executeStep(step, stepIndex, context, req, runId))
        )

        const failure = outcomes.find(
          (outcome): outcome is PromiseRejectedResult => outcome.status === 'rejected'
        )
        if (failure) {
          throw failure.reason
        }

        this.logger.info({
          batchIndex,
          stepCount: batch.length
        }, 'Batch completed')
      }

      // Update workflow run as completed
      await this.payload.update({
        id: runId,
        collection: 'workflow-runs',
        data: {
          completedAt: new Date().toISOString(),
          context: this.serializeContext(context),
          status: 'completed'
        },
        req
      })

      this.logger.info({
        runId,
        workflowId: workflow.id,
        workflowName: workflow.name
      }, 'Workflow execution completed')
    } catch (error) {
      const message = error instanceof Error ? error.message : 'Unknown error'

      // Update workflow run as failed. A failure of this update is logged but
      // must not replace the original error, which is rethrown below.
      try {
        await this.payload.update({
          id: runId,
          collection: 'workflow-runs',
          data: {
            completedAt: new Date().toISOString(),
            context: this.serializeContext(context),
            error: message,
            status: 'failed'
          },
          req
        })
      } catch (updateError) {
        this.logger.error({
          error: updateError instanceof Error ? updateError.message : 'Unknown error',
          runId,
          workflowId: workflow.id
        }, 'Failed to mark workflow run as failed')
      }

      this.logger.error({
        error: message,
        runId,
        workflowId: workflow.id,
        workflowName: workflow.name
      }, 'Workflow execution failed')

      throw error
    }
  }
}
|