diff --git a/dev/app/test-trigger/route.ts b/dev/app/test-trigger/route.ts new file mode 100644 index 0000000..d2f6391 --- /dev/null +++ b/dev/app/test-trigger/route.ts @@ -0,0 +1,122 @@ +import { NextResponse } from 'next/server' +import { getPayload } from 'payload' +import config from '../../payload.config' + +export async function GET() { + console.log('Starting workflow trigger test...') + + // Get payload instance + const payload = await getPayload({ config }) + + try { + // Create a test user + const user = await payload.create({ + collection: 'users', + data: { + email: `test-${Date.now()}@example.com`, + password: 'password123' + } + }) + + console.log('Created test user:', user.id) + + // Create a workflow with collection trigger + const workflow = await payload.create({ + collection: 'workflows', + data: { + name: 'Test Post Creation Workflow', + description: 'Triggers when a post is created', + triggers: [ + { + type: 'collection-trigger', + collectionSlug: 'posts', + operation: 'create' + } + ], + steps: [ + { + name: 'log-post', + taskSlug: 'http-request-step', + input: JSON.stringify({ + url: 'https://httpbin.org/post', + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: { + message: 'Post created', + postId: '$.trigger.doc.id', + postTitle: '$.trigger.doc.title' + } + }) + } + ] + }, + user: user.id + }) + + console.log('Created workflow:', workflow.id, workflow.name) + console.log('Workflow triggers:', JSON.stringify(workflow.triggers, null, 2)) + + // Create a post to trigger the workflow + console.log('Creating post to trigger workflow...') + const post = await payload.create({ + collection: 'posts', + data: { + title: 'Test Post', + content: 'This should trigger the workflow', + _status: 'published' + }, + user: user.id + }) + + console.log('Created post:', post.id) + + // Wait a bit for workflow to execute + await new Promise(resolve => setTimeout(resolve, 3000)) + + // Check for workflow runs + const runs = await payload.find({ + collection: 'workflow-runs', + where: { + workflow: { + equals: workflow.id + } + } + }) + + console.log('Workflow runs found:', runs.totalDocs) + + const result = { + success: runs.totalDocs > 0, + workflowId: workflow.id, + postId: post.id, + runsFound: runs.totalDocs, + runs: runs.docs.map(r => ({ + id: r.id, + status: r.status, + triggeredBy: r.triggeredBy, + startedAt: r.startedAt, + completedAt: r.completedAt, + error: r.error + })) + } + + if (runs.totalDocs > 0) { + console.log('✅ SUCCESS: Workflow was triggered!') + console.log('Run status:', runs.docs[0].status) + console.log('Run context:', JSON.stringify(runs.docs[0].context, null, 2)) + } else { + console.log('❌ FAILURE: Workflow was not triggered') + } + + return NextResponse.json(result) + + } catch (error) { + console.error('Test failed:', error) + return NextResponse.json({ + success: false, + error: error instanceof Error ? error.message : 'Unknown error' + }, { status: 500 }) + } +} \ No newline at end of file diff --git a/dev/payload-types.ts b/dev/payload-types.ts index 5c8bb84..b478c49 100644 --- a/dev/payload-types.ts +++ b/dev/payload-types.ts @@ -217,7 +217,7 @@ export interface Workflow { /** * Collection that triggers the workflow */ - collection?: 'posts' | null; + collectionSlug?: 'posts' | null; /** * Collection operation that triggers the workflow */ @@ -242,6 +242,10 @@ export interface Workflow { * Timezone for cron execution (e.g., "America/New_York", "Europe/London"). Defaults to UTC. */ timezone?: string | null; + /** + * JSONPath expression that must evaluate to true for this trigger to execute the workflow (e.g., "$.doc.status == 'published'") + */ + condition?: string | null; id?: string | null; }[] | null; @@ -262,6 +266,10 @@ export interface Workflow { * Step names that must complete before this step can run */ dependencies?: string[] | null; + /** + * JSONPath expression that must evaluate to true for this step to execute (e.g., "$.trigger.doc.status == 'published'") + */ + condition?: string | null; id?: string | null; }[] | null; @@ -584,13 +592,14 @@ export interface WorkflowsSelect { | T | { type?: T; - collection?: T; + collectionSlug?: T; operation?: T; webhookPath?: T; global?: T; globalOperation?: T; cronExpression?: T; timezone?: T; + condition?: T; id?: T; }; steps?: @@ -600,6 +609,7 @@ export interface WorkflowsSelect { name?: T; input?: T; dependencies?: T; + condition?: T; id?: T; }; updatedAt?: T; @@ -741,7 +751,7 @@ export interface TaskCreateDocument { /** * The collection slug to create a document in */ - collection: string; + collectionSlug: string; /** * The document data to create */ diff --git a/dev/simple-trigger.spec.ts b/dev/simple-trigger.spec.ts new file mode 100644 index 0000000..e082fa6 --- /dev/null +++ b/dev/simple-trigger.spec.ts @@ -0,0 +1,132 @@ +import { describe, it, expect, beforeAll, afterAll } from 'vitest' +import type { Payload } from 'payload' +import { getPayload } from 'payload' +import config from './payload.config' + +describe('Workflow Trigger Test', () => { + let payload: Payload + + beforeAll(async () => { + payload = await getPayload({ config: await config }) + }, 60000) + + afterAll(async () => { + if (!payload) return + + try { + // Clear test data + const workflows = await payload.find({ + collection: 'workflows', + limit: 100 + }) + + for (const workflow of workflows.docs) { + await payload.delete({ + collection: 'workflows', + id: workflow.id + }) + } + + const runs = await payload.find({ + collection: 'workflow-runs', + limit: 100 + }) + + for (const run of runs.docs) { + await payload.delete({ + collection: 'workflow-runs', + id: run.id + }) + } + + const posts = await payload.find({ + collection: 'posts', + limit: 100 + }) + + for (const post of posts.docs) { + await payload.delete({ + collection: 'posts', + id: post.id + }) + } + } catch (error) { + console.warn('Cleanup failed:', error) + } + }, 30000) + + it('should create a workflow run when a post is created', async () => { + // Create a workflow with collection trigger + const workflow = await payload.create({ + collection: 'workflows', + data: { + name: 'Test Post Creation Workflow', + description: 'Triggers when a post is created', + triggers: [ + { + type: 'collection-trigger', + collectionSlug: 'posts', + operation: 'create' + } + ], + steps: [ + { + name: 'log-post', + step: 'http-request-step', + input: { + url: 'https://httpbin.org/post', + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: { + message: 'Post created', + postId: '$.trigger.doc.id', + postTitle: '$.trigger.doc.content' + } + } + } + ] + } + }) + + expect(workflow).toBeDefined() + expect(workflow.id).toBeDefined() + + // Create a post to trigger the workflow + const post = await payload.create({ + collection: 'posts', + data: { + content: 'This should trigger the workflow' + } + }) + + expect(post).toBeDefined() + expect(post.id).toBeDefined() + + // Wait a bit for workflow to execute + await new Promise(resolve => setTimeout(resolve, 3000)) + + // Check for workflow runs + const runs = await payload.find({ + collection: 'workflow-runs', + where: { + workflow: { + equals: workflow.id + } + }, + limit: 10 + }) + + expect(runs.totalDocs).toBeGreaterThan(0) + expect(runs.docs[0].workflow).toBe(typeof workflow.id === 'object' ? workflow.id.toString() : workflow.id) + + console.log('✅ Workflow run created successfully!') + console.log(`Run status: ${runs.docs[0].status}`) + console.log(`Run ID: ${runs.docs[0].id}`) + + if (runs.docs[0].status === 'failed' && runs.docs[0].error) { + console.log(`Error: ${runs.docs[0].error}`) + } + }, 30000) +}) \ No newline at end of file diff --git a/dev/test-trigger.ts b/dev/test-trigger.ts new file mode 100644 index 0000000..2eb0e9e --- /dev/null +++ b/dev/test-trigger.ts @@ -0,0 +1,104 @@ +import type { Payload } from 'payload' +import { getPayload } from 'payload' +import config from './payload.config' + +async function testWorkflowTrigger() { + console.log('Starting workflow trigger test...') + + // Get payload instance + const payload = await getPayload({ config }) + + try { + // Create a test user + const user = await payload.create({ + collection: 'users', + data: { + email: 'test@example.com', + password: 'password123' + } + }) + + console.log('Created test user:', user.id) + + // Create a workflow with collection trigger + const workflow = await payload.create({ + collection: 'workflows', + data: { + name: 'Test Post Creation Workflow', + description: 'Triggers when a post is created', + triggers: [ + { + type: 'collection-trigger', + collectionSlug: 'posts', + operation: 'create' + } + ], + steps: [ + { + name: 'log-post', + step: 'http-request-step', + input: { + url: 'https://httpbin.org/post', + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: { + message: 'Post created', + postId: '$.trigger.doc.id', + postTitle: '$.trigger.doc.title' + } + } + } + ] + }, + user: user.id + }) + + console.log('Created workflow:', workflow.id) + + // Create a post to trigger the workflow + console.log('Creating post to trigger workflow...') + const post = await payload.create({ + collection: 'posts', + data: { + title: 'Test Post', + content: 'This should trigger the workflow', + _status: 'published' + }, + user: user.id + }) + + console.log('Created post:', post.id) + + // Wait a bit for workflow to execute + await new Promise(resolve => setTimeout(resolve, 2000)) + + // Check for workflow runs + const runs = await payload.find({ + collection: 'workflow-runs', + where: { + workflow: { + equals: workflow.id + } + } + }) + + console.log('Workflow runs found:', runs.totalDocs) + + if (runs.totalDocs > 0) { + console.log('✅ SUCCESS: Workflow was triggered!') + console.log('Run status:', runs.docs[0].status) + console.log('Run context:', JSON.stringify(runs.docs[0].context, null, 2)) + } else { + console.log('❌ FAILURE: Workflow was not triggered') + } + + } catch (error) { + console.error('Test failed:', error) + } finally { + await payload.shutdown() + } +} + +testWorkflowTrigger().catch(console.error) \ No newline at end of file diff --git a/package.json b/package.json index ececda7..8fd3c74 100644 --- a/package.json +++ b/package.json @@ -85,6 +85,7 @@ "react-dom": "19.1.0", "rimraf": "3.0.2", "sharp": "0.34.3", + "tsx": "^4.20.5", "typescript": "5.7.3", "vitest": "^3.1.2" }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2c0d391..24b7e34 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -90,6 +90,9 @@ importers: sharp: specifier: 0.34.3 version: 0.34.3 + tsx: + specifier: ^4.20.5 + version: 4.20.5 typescript: specifier: 5.7.3 version: 5.7.3 @@ -9617,7 +9620,6 @@ snapshots: get-tsconfig: 4.10.1 optionalDependencies: fsevents: 2.3.3 - optional: true type-check@0.4.0: dependencies: diff --git a/src/core/trigger-custom-workflow.ts b/src/core/trigger-custom-workflow.ts index 5840a57..f4878da 100644 --- a/src/core/trigger-custom-workflow.ts +++ b/src/core/trigger-custom-workflow.ts @@ -1,7 +1,7 @@ import type { Payload, PayloadRequest } from 'payload' import { initializeLogger } from '../plugin/logger.js' -import { type Workflow, WorkflowExecutor } from './workflow-executor.js' +import { type PayloadWorkflow, WorkflowExecutor } from './workflow-executor.js' export interface CustomTriggerOptions { /** @@ -142,7 +142,7 @@ export async function triggerCustomWorkflow( } // Execute the workflow - await executor.execute(workflow as Workflow, context, workflowReq) + await executor.execute(workflow as PayloadWorkflow, context, workflowReq) // Get the latest run for this workflow to get the run ID const runs = await payload.find({ @@ -255,7 +255,7 @@ export async function triggerWorkflowById( // Create executor and execute const executor = new WorkflowExecutor(payload, logger) - await executor.execute(workflow as Workflow, context, workflowReq) + await executor.execute(workflow as PayloadWorkflow, context, workflowReq) // Get the latest run to get the run ID const runs = await payload.find({ diff --git a/src/core/workflow-executor.ts b/src/core/workflow-executor.ts index c77375a..8e5b6e4 100644 --- a/src/core/workflow-executor.ts +++ b/src/core/workflow-executor.ts @@ -1,31 +1,39 @@ import type { Payload, PayloadRequest } from 'payload' +// We need to reference the generated types dynamically since they're not available at build time +// Using generic types and casting where necessary +export type PayloadWorkflow = { + id: number + name: string + description?: string | null + triggers?: Array<{ + type?: string | null + collectionSlug?: string | null + operation?: string | null + condition?: string | null + [key: string]: unknown + }> | null + steps?: Array<{ + step?: string | null + name?: string | null + input?: unknown + dependencies?: string[] | null + condition?: string | null + [key: string]: unknown + }> | null + [key: string]: unknown +} + import { JSONPath } from 'jsonpath-plus' -export type Workflow = { - _version?: number - id: string - name: string - steps: WorkflowStep[] - triggers: WorkflowTrigger[] +// Helper type to extract workflow step data from the generated types +export type WorkflowStep = NonNullable[0] & { + name: string // Ensure name is always present for our execution logic } -export type WorkflowStep = { - condition?: string - dependencies?: string[] - input?: null | Record - name: string - step: string -} - -export interface WorkflowTrigger { - collection?: string - condition?: string - global?: string - globalOperation?: string - operation?: string - type: string - webhookPath?: string +// Helper type to extract workflow trigger data from the generated types +export type WorkflowTrigger = NonNullable[0] & { + type: string // Ensure type is always present for our execution logic } export interface ExecutionContext { @@ -154,7 +162,7 @@ export class WorkflowExecutor { try { // Resolve input data using JSONPath - const resolvedInput = this.resolveStepInput(step.input || {}, context) + const resolvedInput = this.resolveStepInput(step.input as Record || {}, context) context.steps[stepName].input = resolvedInput if (!taskSlug) { @@ -398,6 +406,47 @@ export class WorkflowExecutor { return resolved } + /** + * Safely serialize an object, handling circular references and non-serializable values + */ + private safeSerialize(obj: unknown): unknown { + const seen = new WeakSet() + + const serialize = (value: unknown): unknown => { + if (value === null || typeof value !== 'object') { + return value + } + + if (seen.has(value as object)) { + return '[Circular Reference]' + } + + seen.add(value as object) + + if (Array.isArray(value)) { + return value.map(serialize) + } + + const result: Record = {} + for (const [key, val] of Object.entries(value as Record)) { + try { + // Skip non-serializable properties that are likely internal database objects + if (key === 'table' || key === 'schema' || key === '_' || key === '__') { + continue + } + result[key] = serialize(val) + } catch { + // Skip properties that can't be accessed or serialized + result[key] = '[Non-serializable]' + } + } + + return result + } + + return serialize(obj) + } + /** * Update workflow run with current context */ @@ -407,14 +456,14 @@ export class WorkflowExecutor { req: PayloadRequest ): Promise { const serializeContext = () => ({ - steps: context.steps, + steps: this.safeSerialize(context.steps), trigger: { type: context.trigger.type, collection: context.trigger.collection, - data: context.trigger.data, - doc: context.trigger.doc, + data: this.safeSerialize(context.trigger.data), + doc: this.safeSerialize(context.trigger.doc), operation: context.trigger.operation, - previousDoc: context.trigger.previousDoc, + previousDoc: this.safeSerialize(context.trigger.previousDoc), triggeredAt: context.trigger.triggeredAt, user: context.trigger.req?.user } @@ -486,21 +535,21 @@ export class WorkflowExecutor { /** * Execute a workflow with the given context */ - async execute(workflow: Workflow, context: ExecutionContext, req: PayloadRequest): Promise { + async execute(workflow: PayloadWorkflow, context: ExecutionContext, req: PayloadRequest): Promise { this.logger.info({ workflowId: workflow.id, workflowName: workflow.name }, 'Starting workflow execution') const serializeContext = () => ({ - steps: context.steps, + steps: this.safeSerialize(context.steps), trigger: { type: context.trigger.type, collection: context.trigger.collection, - data: context.trigger.data, - doc: context.trigger.doc, + data: this.safeSerialize(context.trigger.data), + doc: this.safeSerialize(context.trigger.doc), operation: context.trigger.operation, - previousDoc: context.trigger.previousDoc, + previousDoc: this.safeSerialize(context.trigger.previousDoc), triggeredAt: context.trigger.triggeredAt, user: context.trigger.req?.user } @@ -515,14 +564,14 @@ export class WorkflowExecutor { status: 'running', triggeredBy: context.trigger.req?.user?.email || 'system', workflow: workflow.id, - workflowVersion: workflow._version || 1 + workflowVersion: 1 // Default version since generated type doesn't have _version field }, req }) try { // Resolve execution order based on dependencies - const executionBatches = this.resolveExecutionOrder(workflow.steps) + const executionBatches = this.resolveExecutionOrder(workflow.steps as WorkflowStep[] || []) this.logger.info({ batchSizes: executionBatches.map(batch => batch.length), @@ -720,7 +769,7 @@ export class WorkflowExecutor { }, 'Triggering workflow') // Execute the workflow - await this.execute(workflow as Workflow, context, req) + await this.execute(workflow as PayloadWorkflow, context, req) } } } catch (error) { diff --git a/src/index.ts b/src/index.ts index fe355f9..0b2ab4a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -6,12 +6,15 @@ export type { CustomTriggerOptions, TriggerResult, ExecutionContext, - Workflow, - WorkflowStep, - WorkflowTrigger, WorkflowsPluginConfig } from './types/index.js' +export type { + PayloadWorkflow as Workflow, + WorkflowStep, + WorkflowTrigger +} from './core/workflow-executor.js' + // Server-side functions are NOT re-exported here to avoid bundling issues // Import server-side functions from the /server export instead diff --git a/src/plugin/cron-scheduler.ts b/src/plugin/cron-scheduler.ts index 68c0a00..8b81c3b 100644 --- a/src/plugin/cron-scheduler.ts +++ b/src/plugin/cron-scheduler.ts @@ -2,7 +2,7 @@ import type {Config, Payload, TaskConfig} from 'payload' import cron from 'node-cron' -import {type Workflow, WorkflowExecutor} from '../core/workflow-executor.js' +import {type PayloadWorkflow, WorkflowExecutor} from '../core/workflow-executor.js' import {getConfigLogger} from './logger.js' /** @@ -101,7 +101,7 @@ export function generateCronTasks(config: Config): void { } // Execute the workflow - await executor.execute(workflow as Workflow, context, req) + await executor.execute(workflow as PayloadWorkflow, context, req) // Re-queue the job for the next scheduled execution if cronExpression is provided if (cronExpression) { diff --git a/src/plugin/init-global-hooks.ts b/src/plugin/init-global-hooks.ts index fd301b4..71f8eec 100644 --- a/src/plugin/init-global-hooks.ts +++ b/src/plugin/init-global-hooks.ts @@ -1,7 +1,7 @@ import type { Payload, PayloadRequest } from "payload" import type { Logger } from "pino" -import type { WorkflowExecutor, Workflow } from "../core/workflow-executor.js" +import type { WorkflowExecutor, PayloadWorkflow } from "../core/workflow-executor.js" export function initGlobalHooks(payload: Payload, logger: Payload['logger'], executor: WorkflowExecutor) { // Get all globals from the config @@ -100,7 +100,7 @@ async function executeTriggeredGlobalWorkflows( } // Execute the workflow - await executor.execute(workflow as Workflow, context, req) + await executor.execute(workflow as PayloadWorkflow, context, req) } } catch (error) { logger.error({ diff --git a/src/plugin/init-webhook.ts b/src/plugin/init-webhook.ts index 98d2a0b..2acad30 100644 --- a/src/plugin/init-webhook.ts +++ b/src/plugin/init-webhook.ts @@ -1,6 +1,6 @@ import type {Config, PayloadRequest} from 'payload' -import {type Workflow, WorkflowExecutor} from '../core/workflow-executor.js' +import {type PayloadWorkflow, WorkflowExecutor} from '../core/workflow-executor.js' import {getConfigLogger, initializeLogger} from './logger.js' export function initWebhookEndpoint(config: Config, webhookPrefix = 'webhook'): void { @@ -110,7 +110,7 @@ export function initWebhookEndpoint(config: Config, webhookPrefix = 'webhook'): } // Execute the workflow - await executor.execute(workflow as Workflow, context, req) + await executor.execute(workflow as PayloadWorkflow, context, req) return { status: 'triggered', workflowId: workflow.id } } catch (error) { diff --git a/src/types/index.ts b/src/types/index.ts index 0cccd8e..6a670fb 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -27,30 +27,9 @@ export interface ExecutionContext { req: any // PayloadRequest } -export interface WorkflowStep { - id: string - type: string - input: Record - dependencies?: string[] -} - -export interface WorkflowTrigger { - type: 'collection' | 'global' | 'webhook' | 'cron' | 'manual' - collection?: string - global?: string - event?: 'create' | 'update' | 'delete' | 'read' - path?: string - cron?: string -} - -export interface Workflow { - id: string - name: string - description?: string - active: boolean - triggers: WorkflowTrigger[] - steps: WorkflowStep[] -} +// NOTE: Workflow, WorkflowStep, and WorkflowTrigger types are now imported from the generated PayloadCMS types +// These interfaces have been removed to avoid duplication and inconsistencies +// Import them from 'payload' or the generated payload-types.ts file instead export interface WorkflowsPluginConfig { collections?: string[]