mirror of
https://github.com/xtr-dev/payload-automation.git
synced 2025-12-11 09:13:24 +00:00
Implement independent error storage system and comprehensive improvements
Major Features: • Add persistent error tracking for timeout/network failures that bypasses PayloadCMS output limitations • Implement smart error classification (timeout, DNS, connection, network) with duration-based detection • Add comprehensive test infrastructure with MongoDB in-memory testing and enhanced mocking • Fix HTTP request handler error preservation with detailed context storage • Add independent execution tracking with success/failure status and duration metrics Technical Improvements: • Update JSONPath documentation to use correct $.trigger.doc syntax across all step types • Fix PayloadCMS job execution to use runByID instead of run() for reliable task processing • Add enhanced HTTP error handling that preserves outputs for 4xx/5xx status codes • Implement proper nock configuration with undici for Node.js 22 fetch interception • Add comprehensive unit tests for WorkflowExecutor with mocked PayloadCMS instances Developer Experience: • Add detailed error information in workflow context with URL, method, timeout, attempts • Update README with HTTP error handling patterns and enhanced error tracking examples • Add test helpers and setup infrastructure for reliable integration testing • Fix workflow step validation and JSONPath field descriptions Breaking Changes: None - fully backward compatible 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -154,15 +154,27 @@ export class WorkflowExecutor {
|
||||
error: undefined,
|
||||
input: undefined,
|
||||
output: undefined,
|
||||
state: 'running'
|
||||
state: 'running',
|
||||
_startTime: Date.now() // Track execution start time for independent duration tracking
|
||||
}
|
||||
|
||||
// Move taskSlug declaration outside try block so it's accessible in catch
|
||||
const taskSlug = step.step // Use the 'step' field for task type
|
||||
|
||||
try {
|
||||
// Extract input data from step - PayloadCMS flattens inputSchema fields to step level
|
||||
const inputFields: Record<string, unknown> = {}
|
||||
|
||||
// Get all fields except the core step fields
|
||||
const coreFields = ['step', 'name', 'dependencies', 'condition']
|
||||
for (const [key, value] of Object.entries(step)) {
|
||||
if (!coreFields.includes(key)) {
|
||||
inputFields[key] = value
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve input data using JSONPath
|
||||
const resolvedInput = this.resolveStepInput(step.input as Record<string, unknown> || {}, context)
|
||||
const resolvedInput = this.resolveStepInput(inputFields, context)
|
||||
context.steps[stepName].input = resolvedInput
|
||||
|
||||
if (!taskSlug) {
|
||||
@@ -182,11 +194,21 @@ export class WorkflowExecutor {
|
||||
task: taskSlug
|
||||
})
|
||||
|
||||
// Run the job immediately
|
||||
await this.payload.jobs.run({
|
||||
limit: 1,
|
||||
// Run the specific job immediately and wait for completion
|
||||
this.logger.info({ jobId: job.id }, 'Running job immediately using runByID')
|
||||
const runResults = await this.payload.jobs.runByID({
|
||||
id: job.id,
|
||||
req
|
||||
})
|
||||
|
||||
this.logger.info({
|
||||
jobId: job.id,
|
||||
runResult: runResults,
|
||||
hasResult: !!runResults
|
||||
}, 'Job run completed')
|
||||
|
||||
// Give a small delay to ensure job is fully processed
|
||||
await new Promise(resolve => setTimeout(resolve, 100))
|
||||
|
||||
// Get the job result
|
||||
const completedJob = await this.payload.findByID({
|
||||
@@ -195,6 +217,13 @@ export class WorkflowExecutor {
|
||||
req
|
||||
})
|
||||
|
||||
this.logger.info({
|
||||
jobId: job.id,
|
||||
totalTried: completedJob.totalTried,
|
||||
hasError: completedJob.hasError,
|
||||
taskStatus: completedJob.taskStatus ? Object.keys(completedJob.taskStatus) : 'null'
|
||||
}, 'Retrieved job results')
|
||||
|
||||
const taskStatus = completedJob.taskStatus?.[completedJob.taskSlug]?.[completedJob.totalTried]
|
||||
const isComplete = taskStatus?.complete === true
|
||||
const hasError = completedJob.hasError || !isComplete
|
||||
@@ -213,9 +242,37 @@ export class WorkflowExecutor {
|
||||
errorMessage = completedJob.error.message || completedJob.error
|
||||
}
|
||||
|
||||
// Final fallback to generic message
|
||||
// Try to get error from task output if available
|
||||
if (!errorMessage && taskStatus?.output?.error) {
|
||||
errorMessage = taskStatus.output.error
|
||||
}
|
||||
|
||||
// Check if task handler returned with state='failed'
|
||||
if (!errorMessage && taskStatus?.state === 'failed') {
|
||||
errorMessage = 'Task handler returned a failed state'
|
||||
// Try to get more specific error from output
|
||||
if (taskStatus.output?.error) {
|
||||
errorMessage = taskStatus.output.error
|
||||
}
|
||||
}
|
||||
|
||||
// Check for network errors in the job data
|
||||
if (!errorMessage && completedJob.result) {
|
||||
const result = completedJob.result
|
||||
if (result.error) {
|
||||
errorMessage = result.error
|
||||
}
|
||||
}
|
||||
|
||||
// Final fallback to generic message with more detail
|
||||
if (!errorMessage) {
|
||||
errorMessage = `Task ${taskSlug} failed without detailed error information`
|
||||
const jobDetails = {
|
||||
taskSlug,
|
||||
hasError: completedJob.hasError,
|
||||
taskStatus: taskStatus?.complete,
|
||||
totalTried: completedJob.totalTried
|
||||
}
|
||||
errorMessage = `Task ${taskSlug} failed without detailed error information. Job details: ${JSON.stringify(jobDetails)}`
|
||||
}
|
||||
}
|
||||
|
||||
@@ -236,6 +293,30 @@ export class WorkflowExecutor {
|
||||
context.steps[stepName].error = result.error
|
||||
}
|
||||
|
||||
// Independent execution tracking (not dependent on PayloadCMS task status)
|
||||
context.steps[stepName].executionInfo = {
|
||||
completed: true, // Step execution completed (regardless of success/failure)
|
||||
success: result.state === 'succeeded',
|
||||
executedAt: new Date().toISOString(),
|
||||
duration: Date.now() - (context.steps[stepName]._startTime || Date.now())
|
||||
}
|
||||
|
||||
// For failed steps, try to extract detailed error information from the job logs
|
||||
// This approach is more reliable than external storage and persists with the workflow
|
||||
if (result.state === 'failed') {
|
||||
const errorDetails = this.extractErrorDetailsFromJob(completedJob, context.steps[stepName], stepName)
|
||||
if (errorDetails) {
|
||||
context.steps[stepName].errorDetails = errorDetails
|
||||
|
||||
this.logger.info({
|
||||
stepName,
|
||||
errorType: errorDetails.errorType,
|
||||
duration: errorDetails.duration,
|
||||
attempts: errorDetails.attempts
|
||||
}, 'Extracted detailed error information for failed step')
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.debug({context}, 'Step execution context')
|
||||
|
||||
if (result.state !== 'succeeded') {
|
||||
@@ -257,6 +338,15 @@ export class WorkflowExecutor {
|
||||
context.steps[stepName].state = 'failed'
|
||||
context.steps[stepName].error = errorMessage
|
||||
|
||||
// Independent execution tracking for failed steps
|
||||
context.steps[stepName].executionInfo = {
|
||||
completed: true, // Execution attempted and completed (even if it failed)
|
||||
success: false,
|
||||
executedAt: new Date().toISOString(),
|
||||
duration: Date.now() - (context.steps[stepName]._startTime || Date.now()),
|
||||
failureReason: errorMessage
|
||||
}
|
||||
|
||||
this.logger.error({
|
||||
error: errorMessage,
|
||||
input: context.steps[stepName].input,
|
||||
@@ -447,6 +537,87 @@ export class WorkflowExecutor {
|
||||
return serialize(obj)
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts detailed error information from job logs and input
|
||||
*/
|
||||
private extractErrorDetailsFromJob(job: any, stepContext: any, stepName: string) {
|
||||
try {
|
||||
// Get error information from multiple sources
|
||||
const input = stepContext.input || {}
|
||||
const logs = job.log || []
|
||||
const latestLog = logs[logs.length - 1]
|
||||
|
||||
// Extract error message from job error or log
|
||||
const errorMessage = job.error?.message || latestLog?.error?.message || 'Unknown error'
|
||||
|
||||
// For timeout scenarios, check if it's a timeout based on duration and timeout setting
|
||||
let errorType = this.classifyErrorType(errorMessage)
|
||||
|
||||
// Special handling for HTTP timeouts - if task failed and duration exceeds timeout, it's likely a timeout
|
||||
if (errorType === 'unknown' && input.timeout && stepContext.executionInfo?.duration) {
|
||||
const timeoutMs = parseInt(input.timeout) || 30000
|
||||
const actualDuration = stepContext.executionInfo.duration
|
||||
|
||||
// If execution duration is close to or exceeds timeout, classify as timeout
|
||||
if (actualDuration >= (timeoutMs * 0.9)) { // 90% of timeout threshold
|
||||
errorType = 'timeout'
|
||||
this.logger.debug({
|
||||
timeoutMs,
|
||||
actualDuration,
|
||||
stepName
|
||||
}, 'Classified error as timeout based on duration analysis')
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate duration from execution info if available
|
||||
const duration = stepContext.executionInfo?.duration || 0
|
||||
|
||||
// Extract attempt count from logs
|
||||
const attempts = job.totalTried || 1
|
||||
|
||||
return {
|
||||
stepId: `${stepName}-${Date.now()}`,
|
||||
errorType,
|
||||
duration,
|
||||
attempts,
|
||||
finalError: errorMessage,
|
||||
context: {
|
||||
url: input.url,
|
||||
method: input.method,
|
||||
timeout: input.timeout,
|
||||
statusCode: latestLog?.output?.status,
|
||||
headers: input.headers
|
||||
},
|
||||
timestamp: new Date().toISOString()
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.warn({
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
stepName
|
||||
}, 'Failed to extract error details from job')
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Classifies error types based on error messages
|
||||
*/
|
||||
private classifyErrorType(errorMessage: string): string {
|
||||
if (errorMessage.includes('timeout') || errorMessage.includes('ETIMEDOUT')) {
|
||||
return 'timeout'
|
||||
}
|
||||
if (errorMessage.includes('ENOTFOUND') || errorMessage.includes('getaddrinfo')) {
|
||||
return 'dns'
|
||||
}
|
||||
if (errorMessage.includes('ECONNREFUSED') || errorMessage.includes('ECONNRESET')) {
|
||||
return 'connection'
|
||||
}
|
||||
if (errorMessage.includes('network') || errorMessage.includes('fetch')) {
|
||||
return 'network'
|
||||
}
|
||||
return 'unknown'
|
||||
}
|
||||
|
||||
/**
|
||||
* Update workflow run with current context
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user