Remove initCollectionHooks and associated migration guides

- Delete `initCollectionHooks` implementation and its usage references
- Remove `MIGRATION-v0.0.24.md` and `NOT-IMPLEMENTING.md` as they are now obsolete
- Update related workflow executor logic and TypeScript definitions, ensuring compatibility
- Simplify error handling, input parsing, and logging within workflow execution
- Clean up and refactor redundant code to improve maintainability
This commit is contained in:
2025-09-10 17:36:56 +02:00
parent 435f9b0c69
commit 0f741acf73
19 changed files with 399 additions and 1077 deletions

View File

@@ -5,26 +5,26 @@ import type { Payload, PayloadRequest } from 'payload'
export type PayloadWorkflow = {
id: number
name: string
description?: string | null
description?: null | string
triggers?: Array<{
type?: string | null
condition?: string | null
type?: null | string
condition?: null | string
parameters?: {
collectionSlug?: string | null
operation?: string | null
webhookPath?: string | null
global?: string | null
globalOperation?: string | null
collectionSlug?: null | string
operation?: null | string
webhookPath?: null | string
global?: null | string
globalOperation?: null | string
[key: string]: unknown
} | null
[key: string]: unknown
}> | null
steps?: Array<{
step?: string | null
name?: string | null
step?: null | string
name?: null | string
input?: unknown
dependencies?: string[] | null
condition?: string | null
dependencies?: null | string[]
condition?: null | string
[key: string]: unknown
}> | null
[key: string]: unknown
@@ -33,14 +33,14 @@ export type PayloadWorkflow = {
import { JSONPath } from 'jsonpath-plus'
// Helper type to extract workflow step data from the generated types
export type WorkflowStep = NonNullable<PayloadWorkflow['steps']>[0] & {
export type WorkflowStep = {
name: string // Ensure name is always present for our execution logic
}
} & NonNullable<PayloadWorkflow['steps']>[0]
// Helper type to extract workflow trigger data from the generated types
export type WorkflowTrigger = NonNullable<PayloadWorkflow['triggers']>[0] & {
// Helper type to extract workflow trigger data from the generated types
export type WorkflowTrigger = {
type: string // Ensure type is always present for our execution logic
}
} & NonNullable<PayloadWorkflow['triggers']>[0]
export interface ExecutionContext {
steps: Record<string, {
@@ -89,6 +89,7 @@ export interface ExecutionContext {
email?: string
id?: string
}
[key: string]: any
}
}
@@ -98,6 +99,25 @@ export class WorkflowExecutor {
private logger: Payload['logger']
) {}
/**
* Classifies error types based on error messages
*/
private classifyErrorType(errorMessage: string): string {
if (errorMessage.includes('timeout') || errorMessage.includes('ETIMEDOUT')) {
return 'timeout'
}
if (errorMessage.includes('ENOTFOUND') || errorMessage.includes('getaddrinfo')) {
return 'dns'
}
if (errorMessage.includes('ECONNREFUSED') || errorMessage.includes('ECONNRESET')) {
return 'connection'
}
if (errorMessage.includes('network') || errorMessage.includes('fetch')) {
return 'network'
}
return 'unknown'
}
/**
* Evaluate a step condition using JSONPath
*/
@@ -189,20 +209,32 @@ export class WorkflowExecutor {
}
// Move taskSlug declaration outside try block so it's accessible in catch
const taskSlug = step.step // Use the 'step' field for task type
const taskSlug = step.type as string
try {
// Extract input data from step - PayloadCMS flattens inputSchema fields to step level
const inputFields: Record<string, unknown> = {}
// Get all fields except the core step fields
const coreFields = ['step', 'name', 'dependencies', 'condition']
const coreFields = ['step', 'name', 'dependencies', 'condition', 'type', 'id', 'parameters']
for (const [key, value] of Object.entries(step)) {
if (!coreFields.includes(key)) {
inputFields[key] = value
// Handle flattened parameters (remove 'parameter' prefix)
if (key.startsWith('parameter')) {
const cleanKey = key.replace('parameter', '')
const properKey = cleanKey.charAt(0).toLowerCase() + cleanKey.slice(1)
inputFields[properKey] = value
} else {
inputFields[key] = value
}
}
}
// Also extract from nested parameters object if it exists
if (step.parameters && typeof step.parameters === 'object') {
Object.assign(inputFields, step.parameters)
}
// Resolve input data using JSONPath
const resolvedInput = this.resolveStepInput(inputFields, context)
context.steps[stepName].input = resolvedInput
@@ -230,8 +262,8 @@ export class WorkflowExecutor {
id: job.id,
req
})
this.logger.info({
this.logger.info({
jobId: job.id,
runResult: runResults,
hasResult: !!runResults
@@ -276,7 +308,7 @@ export class WorkflowExecutor {
if (!errorMessage && taskStatus?.output?.error) {
errorMessage = taskStatus.output.error
}
// Check if task handler returned with state='failed'
if (!errorMessage && taskStatus?.state === 'failed') {
errorMessage = 'Task handler returned a failed state'
@@ -337,7 +369,7 @@ export class WorkflowExecutor {
const errorDetails = this.extractErrorDetailsFromJob(completedJob, context.steps[stepName], stepName)
if (errorDetails) {
context.steps[stepName].errorDetails = errorDetails
this.logger.info({
stepName,
errorType: errorDetails.errorType,
@@ -400,6 +432,95 @@ export class WorkflowExecutor {
}
}
/**
* Extracts detailed error information from job logs and input
*/
private extractErrorDetailsFromJob(job: any, stepContext: any, stepName: string) {
try {
// Get error information from multiple sources
const input = stepContext.input || {}
const logs = job.log || []
const latestLog = logs[logs.length - 1]
// Extract error message from job error or log
const errorMessage = job.error?.message || latestLog?.error?.message || 'Unknown error'
// For timeout scenarios, check if it's a timeout based on duration and timeout setting
let errorType = this.classifyErrorType(errorMessage)
// Special handling for HTTP timeouts - if task failed and duration exceeds timeout, it's likely a timeout
if (errorType === 'unknown' && input.timeout && stepContext.executionInfo?.duration) {
const timeoutMs = parseInt(input.timeout) || 30000
const actualDuration = stepContext.executionInfo.duration
// If execution duration is close to or exceeds timeout, classify as timeout
if (actualDuration >= (timeoutMs * 0.9)) { // 90% of timeout threshold
errorType = 'timeout'
this.logger.debug({
timeoutMs,
actualDuration,
stepName
}, 'Classified error as timeout based on duration analysis')
}
}
// Calculate duration from execution info if available
const duration = stepContext.executionInfo?.duration || 0
// Extract attempt count from logs
const attempts = job.totalTried || 1
return {
stepId: `${stepName}-${Date.now()}`,
errorType,
duration,
attempts,
finalError: errorMessage,
context: {
url: input.url,
method: input.method,
timeout: input.timeout,
statusCode: latestLog?.output?.status,
headers: input.headers
},
timestamp: new Date().toISOString()
}
} catch (error) {
this.logger.warn({
error: error instanceof Error ? error.message : 'Unknown error',
stepName
}, 'Failed to extract error details from job')
return null
}
}
/**
* Parse a condition value (string literal, number, boolean, or JSONPath)
*/
private parseConditionValue(expr: string, context: ExecutionContext): any {
// Handle string literals
if ((expr.startsWith('"') && expr.endsWith('"')) || (expr.startsWith("'") && expr.endsWith("'"))) {
return expr.slice(1, -1) // Remove quotes
}
// Handle boolean literals
if (expr === 'true') {return true}
if (expr === 'false') {return false}
// Handle number literals
if (/^-?\d+(?:\.\d+)?$/.test(expr)) {
return Number(expr)
}
// Handle JSONPath expressions
if (expr.startsWith('$')) {
return this.resolveJSONPathValue(expr, context)
}
// Return as string if nothing else matches
return expr
}
/**
* Resolve step execution order based on dependencies
*/
@@ -457,6 +578,22 @@ export class WorkflowExecutor {
return executionBatches
}
/**
* Resolve a JSONPath value from the context
*/
private resolveJSONPathValue(expr: string, context: ExecutionContext): any {
if (expr.startsWith('$')) {
const result = JSONPath({
json: context,
path: expr,
wrap: false
})
// Return first result if array, otherwise the result itself
return Array.isArray(result) && result.length > 0 ? result[0] : result
}
return expr
}
/**
* Resolve step input using JSONPath expressions
*/
@@ -486,14 +623,14 @@ export class WorkflowExecutor {
path: value,
wrap: false
})
this.logger.debug({
key,
jsonPath: value,
result: JSON.stringify(result).substring(0, 200),
resultType: Array.isArray(result) ? 'array' : typeof result
}, 'JSONPath resolved successfully')
resolved[key] = result
} catch (error) {
this.logger.warn({
@@ -510,7 +647,7 @@ export class WorkflowExecutor {
key,
nestedKeys: Object.keys(value as Record<string, unknown>)
}, 'Recursively resolving nested object')
resolved[key] = this.resolveStepInput(value as Record<string, unknown>, context)
} else {
// Keep literal values as-is
@@ -531,22 +668,22 @@ export class WorkflowExecutor {
*/
private safeSerialize(obj: unknown): unknown {
const seen = new WeakSet()
const serialize = (value: unknown): unknown => {
if (value === null || typeof value !== 'object') {
return value
}
if (seen.has(value as object)) {
if (seen.has(value)) {
return '[Circular Reference]'
}
seen.add(value as object)
seen.add(value)
if (Array.isArray(value)) {
return value.map(serialize)
}
const result: Record<string, unknown> = {}
for (const [key, val] of Object.entries(value as Record<string, unknown>)) {
try {
@@ -560,94 +697,13 @@ export class WorkflowExecutor {
result[key] = '[Non-serializable]'
}
}
return result
}
return serialize(obj)
}
/**
* Extracts detailed error information from job logs and input
*/
private extractErrorDetailsFromJob(job: any, stepContext: any, stepName: string) {
try {
// Get error information from multiple sources
const input = stepContext.input || {}
const logs = job.log || []
const latestLog = logs[logs.length - 1]
// Extract error message from job error or log
const errorMessage = job.error?.message || latestLog?.error?.message || 'Unknown error'
// For timeout scenarios, check if it's a timeout based on duration and timeout setting
let errorType = this.classifyErrorType(errorMessage)
// Special handling for HTTP timeouts - if task failed and duration exceeds timeout, it's likely a timeout
if (errorType === 'unknown' && input.timeout && stepContext.executionInfo?.duration) {
const timeoutMs = parseInt(input.timeout) || 30000
const actualDuration = stepContext.executionInfo.duration
// If execution duration is close to or exceeds timeout, classify as timeout
if (actualDuration >= (timeoutMs * 0.9)) { // 90% of timeout threshold
errorType = 'timeout'
this.logger.debug({
timeoutMs,
actualDuration,
stepName
}, 'Classified error as timeout based on duration analysis')
}
}
// Calculate duration from execution info if available
const duration = stepContext.executionInfo?.duration || 0
// Extract attempt count from logs
const attempts = job.totalTried || 1
return {
stepId: `${stepName}-${Date.now()}`,
errorType,
duration,
attempts,
finalError: errorMessage,
context: {
url: input.url,
method: input.method,
timeout: input.timeout,
statusCode: latestLog?.output?.status,
headers: input.headers
},
timestamp: new Date().toISOString()
}
} catch (error) {
this.logger.warn({
error: error instanceof Error ? error.message : 'Unknown error',
stepName
}, 'Failed to extract error details from job')
return null
}
}
/**
* Classifies error types based on error messages
*/
private classifyErrorType(errorMessage: string): string {
if (errorMessage.includes('timeout') || errorMessage.includes('ETIMEDOUT')) {
return 'timeout'
}
if (errorMessage.includes('ENOTFOUND') || errorMessage.includes('getaddrinfo')) {
return 'dns'
}
if (errorMessage.includes('ECONNREFUSED') || errorMessage.includes('ECONNRESET')) {
return 'connection'
}
if (errorMessage.includes('network') || errorMessage.includes('fetch')) {
return 'network'
}
return 'unknown'
}
/**
* Update workflow run with current context
*/
@@ -695,16 +751,16 @@ export class WorkflowExecutor {
try {
// Check if this is a comparison expression
const comparisonMatch = condition.match(/^(.+?)\s*(==|!=|>|<|>=|<=)\s*(.+)$/)
if (comparisonMatch) {
const [, leftExpr, operator, rightExpr] = comparisonMatch
// Evaluate left side (should be JSONPath)
const leftValue = this.resolveJSONPathValue(leftExpr.trim(), context)
// Parse right side (could be string, number, boolean, or JSONPath)
const rightValue = this.parseConditionValue(rightExpr.trim(), context)
this.logger.debug({
condition,
leftExpr: leftExpr.trim(),
@@ -715,32 +771,32 @@ export class WorkflowExecutor {
leftType: typeof leftValue,
rightType: typeof rightValue
}, 'Evaluating comparison condition')
// Perform comparison
let result: boolean
switch (operator) {
case '==':
result = leftValue === rightValue
break
case '!=':
result = leftValue !== rightValue
break
case '>':
result = Number(leftValue) > Number(rightValue)
break
case '<':
result = Number(leftValue) < Number(rightValue)
break
case '>=':
result = Number(leftValue) >= Number(rightValue)
break
case '<=':
result = Number(leftValue) <= Number(rightValue)
break
case '==':
result = leftValue === rightValue
break
case '>':
result = Number(leftValue) > Number(rightValue)
break
case '>=':
result = Number(leftValue) >= Number(rightValue)
break
default:
throw new Error(`Unknown comparison operator: ${operator}`)
}
this.logger.debug({
condition,
result,
@@ -748,7 +804,7 @@ export class WorkflowExecutor {
rightValue,
operator
}, 'Comparison condition evaluation completed')
return result
} else {
// Treat as simple JSONPath boolean evaluation
@@ -792,49 +848,6 @@ export class WorkflowExecutor {
return false
}
}
/**
* Resolve a JSONPath value from the context
*/
private resolveJSONPathValue(expr: string, context: ExecutionContext): any {
if (expr.startsWith('$')) {
const result = JSONPath({
json: context,
path: expr,
wrap: false
})
// Return first result if array, otherwise the result itself
return Array.isArray(result) && result.length > 0 ? result[0] : result
}
return expr
}
/**
* Parse a condition value (string literal, number, boolean, or JSONPath)
*/
private parseConditionValue(expr: string, context: ExecutionContext): any {
// Handle string literals
if ((expr.startsWith('"') && expr.endsWith('"')) || (expr.startsWith("'") && expr.endsWith("'"))) {
return expr.slice(1, -1) // Remove quotes
}
// Handle boolean literals
if (expr === 'true') return true
if (expr === 'false') return false
// Handle number literals
if (/^-?\d+(\.\d+)?$/.test(expr)) {
return Number(expr)
}
// Handle JSONPath expressions
if (expr.startsWith('$')) {
return this.resolveJSONPathValue(expr, context)
}
// Return as string if nothing else matches
return expr
}
/**
* Execute a workflow with the given context
@@ -977,148 +990,4 @@ export class WorkflowExecutor {
throw error
}
}
/**
* Find and execute workflows triggered by a collection operation
*/
async executeTriggeredWorkflows(
collection: string,
operation: 'create' | 'delete' | 'read' | 'update',
doc: unknown,
previousDoc: unknown,
req: PayloadRequest
): Promise<void> {
this.logger.info({
collection,
operation,
docId: (doc as any)?.id
}, 'executeTriggeredWorkflows called')
try {
// Find workflows with matching triggers
const workflows = await this.payload.find({
collection: 'workflows',
depth: 2, // Include steps and triggers
limit: 100,
req
})
this.logger.info({
workflowCount: workflows.docs.length
}, 'Found workflows to check')
for (const workflow of workflows.docs) {
// Check if this workflow has a matching trigger
const triggers = workflow.triggers as Array<{
condition?: string
type: string
parameters?: {
collection?: string
collectionSlug?: string
operation?: string
[key: string]: any
}
}>
this.logger.debug({
workflowId: workflow.id,
workflowName: workflow.name,
triggerCount: triggers?.length || 0
}, 'Checking workflow triggers')
const matchingTriggers = triggers?.filter(trigger =>
trigger.type === 'collection-trigger' &&
(trigger.parameters?.collection === collection || trigger.parameters?.collectionSlug === collection) &&
trigger.parameters?.operation === operation
) || []
this.logger.info({
workflowId: workflow.id,
workflowName: workflow.name,
matchingTriggerCount: matchingTriggers.length,
targetCollection: collection,
targetOperation: operation
}, 'Matching triggers found')
for (const trigger of matchingTriggers) {
this.logger.info({
workflowId: workflow.id,
workflowName: workflow.name,
triggerDetails: {
type: trigger.type,
collection: trigger.parameters?.collection,
collectionSlug: trigger.parameters?.collectionSlug,
operation: trigger.parameters?.operation,
hasCondition: !!trigger.condition
}
}, 'Processing matching trigger - about to execute workflow')
// Create execution context for condition evaluation
const context: ExecutionContext = {
steps: {},
trigger: {
type: 'collection',
collection,
doc,
operation,
previousDoc,
req
}
}
// Check trigger condition if present
if (trigger.condition) {
this.logger.debug({
collection,
operation,
condition: trigger.condition,
workflowId: workflow.id,
workflowName: workflow.name
}, 'Evaluating trigger condition')
const conditionMet = this.evaluateCondition(trigger.condition, context)
if (!conditionMet) {
this.logger.info({
collection,
condition: trigger.condition,
operation,
workflowId: workflow.id,
workflowName: workflow.name,
docSnapshot: JSON.stringify(doc).substring(0, 200)
}, 'Trigger condition not met, skipping workflow')
continue
}
this.logger.info({
collection,
condition: trigger.condition,
operation,
workflowId: workflow.id,
workflowName: workflow.name,
docSnapshot: JSON.stringify(doc).substring(0, 200)
}, 'Trigger condition met')
}
this.logger.info({
collection,
operation,
workflowId: workflow.id,
workflowName: workflow.name
}, 'Triggering workflow')
// Execute the workflow
await this.execute(workflow as PayloadWorkflow, context, req)
}
}
} catch (error) {
this.logger.error({ error: error instanceof Error ? error.message : 'Unknown error' }, 'Workflow execution failed')
this.logger.error({
collection,
error: error instanceof Error ? error.message : 'Unknown error',
operation
}, 'Failed to execute triggered workflows')
}
}
}