mirror of
https://github.com/xtr-dev/payload-automation.git
synced 2025-12-10 08:53:23 +00:00
Refactor: Remove executorRegistry and simplify to on-demand creation
- Remove executorRegistry singleton pattern - Create WorkflowExecutor on-demand in each hook execution - Replace all 'any' types with proper TypeScript types - Use CollectionAfterChangeHook and PayloadRequest types - Simplify code by removing unnecessary state management Benefits: - Simpler, more maintainable code - No shared state to manage - Each hook execution is independent - Proper TypeScript typing throughout
This commit is contained in:
@@ -1,6 +1,11 @@
|
|||||||
import type {Config} from 'payload'
|
import type {
|
||||||
|
CollectionAfterChangeHook,
|
||||||
|
Config,
|
||||||
|
PayloadRequest,
|
||||||
|
TypeWithID
|
||||||
|
} from 'payload'
|
||||||
|
|
||||||
import type {CollectionTriggerConfigCrud, WorkflowsPluginConfig} from "./config-types.js"
|
import type {WorkflowsPluginConfig} from "./config-types.js"
|
||||||
|
|
||||||
import {createWorkflowCollection} from '../collections/Workflow.js'
|
import {createWorkflowCollection} from '../collections/Workflow.js'
|
||||||
import {WorkflowRunsCollection} from '../collections/WorkflowRuns.js'
|
import {WorkflowRunsCollection} from '../collections/WorkflowRuns.js'
|
||||||
@@ -13,60 +18,46 @@ import {getConfigLogger, initializeLogger} from './logger.js'
|
|||||||
|
|
||||||
export {getLogger} from './logger.js'
|
export {getLogger} from './logger.js'
|
||||||
|
|
||||||
// Improved executor registry with proper error handling and logging
|
/**
|
||||||
interface ExecutorRegistry {
|
* Helper function to create failed workflow runs for tracking errors
|
||||||
executor: null | WorkflowExecutor
|
*/
|
||||||
isInitialized: boolean
|
const createFailedWorkflowRun = async (
|
||||||
logger: any | null
|
collectionSlug: string,
|
||||||
}
|
operation: string,
|
||||||
|
doc: TypeWithID,
|
||||||
const executorRegistry: ExecutorRegistry = {
|
previousDoc: TypeWithID,
|
||||||
executor: null,
|
req: PayloadRequest,
|
||||||
isInitialized: false,
|
errorMessage: string
|
||||||
logger: null
|
): Promise<void> => {
|
||||||
}
|
|
||||||
|
|
||||||
const setWorkflowExecutor = (executor: WorkflowExecutor, logger: any) => {
|
|
||||||
executorRegistry.executor = executor
|
|
||||||
executorRegistry.logger = logger
|
|
||||||
executorRegistry.isInitialized = true
|
|
||||||
|
|
||||||
logger.info('Workflow executor initialized and registered successfully')
|
|
||||||
}
|
|
||||||
|
|
||||||
const getExecutorRegistry = (): ExecutorRegistry => {
|
|
||||||
return executorRegistry
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper function to create failed workflow runs for tracking errors
|
|
||||||
const createFailedWorkflowRun = async (args: any, errorMessage: string, logger: any) => {
|
|
||||||
try {
|
try {
|
||||||
// Only create failed workflow runs if we have enough context
|
const logger = req?.payload?.logger || console
|
||||||
if (!args?.req?.payload || !args?.collection?.slug) {
|
|
||||||
|
// Only create failed workflow runs if we have a payload instance
|
||||||
|
if (!req?.payload || !collectionSlug) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Find workflows that should have been triggered
|
// Find workflows that should have been triggered
|
||||||
const workflows = await args.req.payload.find({
|
const workflows = await req.payload.find({
|
||||||
collection: 'workflows',
|
collection: 'workflows',
|
||||||
limit: 10,
|
limit: 10,
|
||||||
req: args.req,
|
req,
|
||||||
where: {
|
where: {
|
||||||
'triggers.collectionSlug': {
|
'triggers.parameters.collectionSlug': {
|
||||||
equals: args.collection.slug
|
equals: collectionSlug
|
||||||
},
|
},
|
||||||
'triggers.operation': {
|
'triggers.parameters.operation': {
|
||||||
equals: args.operation
|
equals: operation
|
||||||
},
|
},
|
||||||
'triggers.type': {
|
'triggers.type': {
|
||||||
equals: 'collection-trigger'
|
equals: 'collection'
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
// Create failed workflow runs for each matching workflow
|
// Create failed workflow runs for each matching workflow
|
||||||
for (const workflow of workflows.docs) {
|
for (const workflow of workflows.docs) {
|
||||||
await args.req.payload.create({
|
await req.payload.create({
|
||||||
collection: 'workflow-runs',
|
collection: 'workflow-runs',
|
||||||
data: {
|
data: {
|
||||||
completedAt: new Date().toISOString(),
|
completedAt: new Date().toISOString(),
|
||||||
@@ -74,10 +65,10 @@ const createFailedWorkflowRun = async (args: any, errorMessage: string, logger:
|
|||||||
steps: {},
|
steps: {},
|
||||||
trigger: {
|
trigger: {
|
||||||
type: 'collection',
|
type: 'collection',
|
||||||
collection: args.collection.slug,
|
collection: collectionSlug,
|
||||||
doc: args.doc,
|
doc,
|
||||||
operation: args.operation,
|
operation,
|
||||||
previousDoc: args.previousDoc,
|
previousDoc,
|
||||||
triggeredAt: new Date().toISOString()
|
triggeredAt: new Date().toISOString()
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@@ -92,11 +83,11 @@ const createFailedWorkflowRun = async (args: any, errorMessage: string, logger:
|
|||||||
startedAt: new Date().toISOString(),
|
startedAt: new Date().toISOString(),
|
||||||
status: 'failed',
|
status: 'failed',
|
||||||
steps: [],
|
steps: [],
|
||||||
triggeredBy: args?.req?.user?.email || 'system',
|
triggeredBy: req?.user?.email || 'system',
|
||||||
workflow: workflow.id,
|
workflow: workflow.id,
|
||||||
workflowVersion: 1
|
workflowVersion: 1
|
||||||
},
|
},
|
||||||
req: args.req
|
req
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -109,6 +100,7 @@ const createFailedWorkflowRun = async (args: any, errorMessage: string, logger:
|
|||||||
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
// Don't let workflow run creation failures break the original operation
|
// Don't let workflow run creation failures break the original operation
|
||||||
|
const logger = req?.payload?.logger || console
|
||||||
logger.warn({
|
logger.warn({
|
||||||
error: error instanceof Error ? error.message : 'Unknown error'
|
error: error instanceof Error ? error.message : 'Unknown error'
|
||||||
}, 'Failed to create failed workflow run record')
|
}, 'Failed to create failed workflow run record')
|
||||||
@@ -127,8 +119,70 @@ const applyCollectionsConfig = <T extends string>(pluginOptions: WorkflowsPlugin
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Removed config-phase hook registration - user collections don't exist during config phase
|
/**
|
||||||
|
* Create a collection hook that executes workflows
|
||||||
|
*/
|
||||||
|
const createAutomationHook = <T extends TypeWithID>(): CollectionAfterChangeHook<T> => {
|
||||||
|
return async function payloadAutomationHook(args) {
|
||||||
|
const logger = args.req?.payload?.logger || console
|
||||||
|
|
||||||
|
try {
|
||||||
|
logger.info({
|
||||||
|
collection: args.collection?.slug,
|
||||||
|
docId: args.doc?.id,
|
||||||
|
hookType: 'automation',
|
||||||
|
operation: args.operation
|
||||||
|
}, 'Collection automation hook triggered')
|
||||||
|
|
||||||
|
// Create executor on-demand
|
||||||
|
const executor = new WorkflowExecutor(args.req.payload, logger)
|
||||||
|
|
||||||
|
logger.debug('Executing triggered workflows...')
|
||||||
|
await executor.executeTriggeredWorkflows(
|
||||||
|
args.collection.slug,
|
||||||
|
args.operation,
|
||||||
|
args.doc,
|
||||||
|
args.previousDoc,
|
||||||
|
args.req
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info({
|
||||||
|
collection: args.collection?.slug,
|
||||||
|
docId: args.doc?.id,
|
||||||
|
operation: args.operation
|
||||||
|
}, 'Workflow execution completed successfully')
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||||
|
|
||||||
|
logger.error({
|
||||||
|
collection: args.collection?.slug,
|
||||||
|
docId: args.doc?.id,
|
||||||
|
error: errorMessage,
|
||||||
|
errorStack: error instanceof Error ? error.stack : undefined,
|
||||||
|
operation: args.operation
|
||||||
|
}, 'Hook execution failed')
|
||||||
|
|
||||||
|
// Create a failed workflow run to track this error
|
||||||
|
try {
|
||||||
|
await createFailedWorkflowRun(
|
||||||
|
args.collection.slug,
|
||||||
|
args.operation,
|
||||||
|
args.doc,
|
||||||
|
args.previousDoc,
|
||||||
|
args.req,
|
||||||
|
errorMessage
|
||||||
|
)
|
||||||
|
} catch (createError) {
|
||||||
|
logger.error({
|
||||||
|
error: createError instanceof Error ? createError.message : 'Unknown error'
|
||||||
|
}, 'Failed to create workflow run for hook error')
|
||||||
|
}
|
||||||
|
|
||||||
|
// Don't throw to prevent breaking the original operation
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export const workflowsPlugin =
|
export const workflowsPlugin =
|
||||||
<TSlug extends string>(pluginOptions: WorkflowsPluginConfig<TSlug>) =>
|
<TSlug extends string>(pluginOptions: WorkflowsPluginConfig<TSlug>) =>
|
||||||
@@ -165,102 +219,14 @@ export const workflowsPlugin =
|
|||||||
collection.hooks.afterChange = []
|
collection.hooks.afterChange = []
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a reliable hook function with proper dependency injection
|
|
||||||
const automationHook = Object.assign(
|
|
||||||
async function payloadAutomationHook(args: any) {
|
|
||||||
const registry = getExecutorRegistry()
|
|
||||||
|
|
||||||
// Use proper logger if available, fallback to args.req.payload.logger
|
|
||||||
const logger = registry.logger || args?.req?.payload?.logger || console
|
|
||||||
|
|
||||||
try {
|
|
||||||
logger.info({
|
|
||||||
collection: args?.collection?.slug,
|
|
||||||
docId: args?.doc?.id,
|
|
||||||
hookType: 'automation',
|
|
||||||
operation: args?.operation
|
|
||||||
}, 'Collection automation hook triggered')
|
|
||||||
|
|
||||||
if (!registry.isInitialized) {
|
|
||||||
logger.warn('Workflow executor not yet initialized, attempting lazy initialization')
|
|
||||||
|
|
||||||
try {
|
|
||||||
// Try to create executor if we have a payload instance
|
|
||||||
if (args.req?.payload) {
|
|
||||||
logger.info('Creating workflow executor via lazy initialization')
|
|
||||||
const { WorkflowExecutor } = await import('../core/workflow-executor.js')
|
|
||||||
const executor = new WorkflowExecutor(args.req.payload, logger)
|
|
||||||
setWorkflowExecutor(executor, logger)
|
|
||||||
logger.info('Lazy initialization successful')
|
|
||||||
} else {
|
|
||||||
logger.error('Cannot lazy initialize - no payload instance available')
|
|
||||||
await createFailedWorkflowRun(args, 'Workflow executor not initialized and lazy initialization failed - no payload instance', logger)
|
|
||||||
return undefined
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
logger.error('Lazy initialization failed:', error)
|
|
||||||
const errorMessage = error instanceof Error ? error.message : String(error)
|
|
||||||
await createFailedWorkflowRun(args, `Workflow executor lazy initialization failed: ${errorMessage}`, logger)
|
|
||||||
return undefined
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Re-check registry after potential lazy initialization
|
|
||||||
const updatedRegistry = getExecutorRegistry()
|
|
||||||
if (!updatedRegistry.executor) {
|
|
||||||
logger.error('Workflow executor is null despite being marked as initialized')
|
|
||||||
// Create a failed workflow run to track this issue
|
|
||||||
await createFailedWorkflowRun(args, 'Executor not available after initialization', logger)
|
|
||||||
return undefined
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.debug('Executing triggered workflows...')
|
|
||||||
await updatedRegistry.executor.executeTriggeredWorkflows(
|
|
||||||
args.collection.slug,
|
|
||||||
args.operation,
|
|
||||||
args.doc,
|
|
||||||
args.previousDoc,
|
|
||||||
args.req
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info({
|
|
||||||
collection: args?.collection?.slug,
|
|
||||||
docId: args?.doc?.id,
|
|
||||||
operation: args?.operation
|
|
||||||
}, 'Workflow execution completed successfully')
|
|
||||||
|
|
||||||
} catch (error) {
|
|
||||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
|
||||||
|
|
||||||
logger.error({
|
|
||||||
collection: args?.collection?.slug,
|
|
||||||
docId: args?.doc?.id,
|
|
||||||
error: errorMessage,
|
|
||||||
errorStack: error instanceof Error ? error.stack : undefined,
|
|
||||||
operation: args?.operation
|
|
||||||
}, 'Hook execution failed')
|
|
||||||
|
|
||||||
// Create a failed workflow run to track this error
|
|
||||||
try {
|
|
||||||
await createFailedWorkflowRun(args, errorMessage, logger)
|
|
||||||
} catch (createError) {
|
|
||||||
logger.error({
|
|
||||||
error: createError instanceof Error ? createError.message : 'Unknown error'
|
|
||||||
}, 'Failed to create workflow run for hook error')
|
|
||||||
}
|
|
||||||
|
|
||||||
// Don't throw to prevent breaking the original operation
|
|
||||||
}
|
|
||||||
|
|
||||||
return undefined
|
|
||||||
},
|
|
||||||
{
|
|
||||||
__isAutomationHook: true,
|
|
||||||
__version: '0.0.22'
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
// Add the hook to the collection config
|
// Add the hook to the collection config
|
||||||
|
const automationHook = createAutomationHook()
|
||||||
|
// Mark it for debugging
|
||||||
|
Object.defineProperty(automationHook, '__isAutomationHook', {
|
||||||
|
value: true,
|
||||||
|
enumerable: false
|
||||||
|
})
|
||||||
|
|
||||||
collection.hooks.afterChange.push(automationHook)
|
collection.hooks.afterChange.push(automationHook)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -269,8 +235,6 @@ export const workflowsPlugin =
|
|||||||
config.jobs = {tasks: []}
|
config.jobs = {tasks: []}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
for (const step of pluginOptions.steps) {
|
for (const step of pluginOptions.steps) {
|
||||||
if (!config.jobs?.tasks?.find(task => task.slug === step.slug)) {
|
if (!config.jobs?.tasks?.find(task => task.slug === step.slug)) {
|
||||||
config.jobs?.tasks?.push(step)
|
config.jobs?.tasks?.push(step)
|
||||||
@@ -280,10 +244,9 @@ export const workflowsPlugin =
|
|||||||
// Initialize webhook endpoint
|
// Initialize webhook endpoint
|
||||||
initWebhookEndpoint(config, pluginOptions.webhookPrefix || 'webhook')
|
initWebhookEndpoint(config, pluginOptions.webhookPrefix || 'webhook')
|
||||||
|
|
||||||
// Set up onInit to register collection hooks and initialize features
|
// Set up onInit to initialize features
|
||||||
const incomingOnInit = config.onInit
|
const incomingOnInit = config.onInit
|
||||||
config.onInit = async (payload) => {
|
config.onInit = async (payload) => {
|
||||||
|
|
||||||
// Execute any existing onInit functions first
|
// Execute any existing onInit functions first
|
||||||
if (incomingOnInit) {
|
if (incomingOnInit) {
|
||||||
await incomingOnInit(payload)
|
await incomingOnInit(payload)
|
||||||
@@ -296,17 +259,9 @@ export const workflowsPlugin =
|
|||||||
// Log collection trigger configuration
|
// Log collection trigger configuration
|
||||||
logger.info(`Plugin configuration: ${Object.keys(pluginOptions.collectionTriggers || {}).length} collection triggers, ${pluginOptions.steps?.length || 0} steps`)
|
logger.info(`Plugin configuration: ${Object.keys(pluginOptions.collectionTriggers || {}).length} collection triggers, ${pluginOptions.steps?.length || 0} steps`)
|
||||||
|
|
||||||
// Create workflow executor instance
|
|
||||||
logger.debug('Creating workflow executor instance')
|
|
||||||
const executor = new WorkflowExecutor(payload, logger)
|
|
||||||
|
|
||||||
// Register executor with proper dependency injection
|
|
||||||
setWorkflowExecutor(executor, logger)
|
|
||||||
|
|
||||||
// Hooks are now registered during config phase - just log status
|
|
||||||
logger.info('Hooks were registered during config phase - executor now available')
|
|
||||||
|
|
||||||
logger.info('Initializing global hooks...')
|
logger.info('Initializing global hooks...')
|
||||||
|
// Create executor for global hooks
|
||||||
|
const executor = new WorkflowExecutor(payload, logger)
|
||||||
initGlobalHooks(payload, logger, executor)
|
initGlobalHooks(payload, logger, executor)
|
||||||
|
|
||||||
logger.info('Initializing workflow hooks...')
|
logger.info('Initializing workflow hooks...')
|
||||||
@@ -315,7 +270,6 @@ export const workflowsPlugin =
|
|||||||
logger.info('Initializing step tasks...')
|
logger.info('Initializing step tasks...')
|
||||||
initStepTasks(pluginOptions, payload, logger)
|
initStepTasks(pluginOptions, payload, logger)
|
||||||
|
|
||||||
|
|
||||||
logger.info('Plugin initialized successfully - all hooks registered')
|
logger.info('Plugin initialized successfully - all hooks registered')
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user