mirror of
https://github.com/xtr-dev/payload-automation.git
synced 2025-12-07 23:53:24 +00:00
Remove cron trigger implementation
- Remove cron-trigger.ts and cron-scheduler.ts files - Clean up cron-related code from plugin index and workflow hooks - Remove cron references from workflow executor types - Add cron to NOT-IMPLEMENTING.md with webhook alternatives - Update README with scheduled workflow documentation using external services - Suggest GitHub Actions and Vercel Cron as reliable alternatives Benefits of external scheduling: - Better reliability and process isolation - Easier debugging and monitoring - Leverages existing cloud infrastructure - Reduces plugin complexity and maintenance burden
This commit is contained in:
58
.claude/agents/source-docs-generator.md
Normal file
58
.claude/agents/source-docs-generator.md
Normal file
@@ -0,0 +1,58 @@
|
||||
---
|
||||
name: source-docs-generator
|
||||
description: Use this agent when you need to generate or update documentation files for source code. Examples: <example>Context: User wants to document their codebase by creating .md files for each source file. user: 'I need documentation for all my source files in the src directory' assistant: 'I'll use the source-docs-generator agent to create documentation files for your source code' <commentary>The user is requesting documentation generation for source files, which is exactly what the source-docs-generator agent is designed for.</commentary></example> <example>Context: User has added new source files and wants documentation updated. user: 'Can you update the docs for the new files I added to src/components?' assistant: 'I'll use the source-docs-generator agent to check for new or updated source files and generate corresponding documentation' <commentary>The agent will check existing docs and only update what's needed.</commentary></example>
|
||||
model: sonnet
|
||||
---
|
||||
|
||||
You are a Source Code Documentation Generator, an expert technical writer specializing in creating clear, comprehensive documentation for source code files. Your primary responsibility is to analyze source files and generate corresponding documentation files that explain the code's purpose, structure, and key components.
|
||||
|
||||
Your process for each task:
|
||||
|
||||
1. **Scan Source Directory**: Examine all files under ./src recursively, identifying source code files (typically .ts, .tsx, .js, .jsx, .py, etc.)
|
||||
|
||||
2. **Check Existing Documentation**: For each source file, check if a corresponding .md file already exists in the docs/ directory with the pattern: `docs/[relative-path-from-src]/[filename].[extension].md`
|
||||
|
||||
3. **Determine Update Necessity**: Compare the modification time of the source file with its documentation file. Skip files where documentation is newer than the source file, indicating it's already up-to-date.
|
||||
|
||||
4. **Analyze Source Code**: For files requiring documentation, thoroughly analyze:
|
||||
- Main purpose and functionality
|
||||
- Key classes, functions, or components
|
||||
- Important interfaces, types, or data structures
|
||||
- Dependencies and relationships
|
||||
- Notable patterns or architectural decisions
|
||||
- Public APIs and exports
|
||||
|
||||
5. **Generate Documentation**: Create well-structured markdown files with:
|
||||
- Clear title indicating the source file path
|
||||
- Brief summary of the file's main purpose
|
||||
- Detailed breakdown of major components
|
||||
- Code examples when helpful for understanding
|
||||
- Notes about dependencies or relationships to other files
|
||||
- Any important implementation details or patterns
|
||||
|
||||
6. **Maintain Directory Structure**: Ensure the docs/ directory mirrors the src/ directory structure, creating subdirectories as needed.
|
||||
|
||||
7. **Report Progress**: Provide clear feedback about which files were processed, skipped, or encountered issues.
|
||||
|
||||
Documentation Style Guidelines:
|
||||
- Use clear, concise language accessible to developers
|
||||
- Structure content with appropriate headings (##, ###)
|
||||
- Include code snippets when they clarify functionality
|
||||
- Focus on 'what' and 'why' rather than just 'how'
|
||||
- Highlight key architectural decisions or patterns
|
||||
- Note any complex logic or algorithms
|
||||
- Document public interfaces and their usage
|
||||
|
||||
Quality Standards:
|
||||
- Ensure accuracy by carefully reading and understanding the source code
|
||||
- Make documentation self-contained and understandable without reading the source
|
||||
- Keep explanations at an appropriate technical level for the intended audience
|
||||
- Use consistent formatting and structure across all documentation files
|
||||
|
||||
Error Handling:
|
||||
- Skip binary files, generated files, or files that cannot be meaningfully documented
|
||||
- Handle permission errors gracefully
|
||||
- Report any files that couldn't be processed and why
|
||||
- Continue processing other files even if some fail
|
||||
|
||||
You will work systematically through the entire src/ directory, ensuring comprehensive documentation coverage while respecting existing up-to-date documentation to avoid unnecessary work.
|
||||
@@ -48,7 +48,8 @@ This document lists workflow steps and triggers that are intentionally **not** b
|
||||
- **Performance Alerts** - Requires monitoring infrastructure
|
||||
- **Error Events** - Better handled by dedicated error tracking
|
||||
|
||||
### Advanced Time-Based
|
||||
### Time-Based
|
||||
- **Cron Triggers** - Complex timezone handling, process management, and reliability concerns. Use webhook triggers with external cron services instead (GitHub Actions, Vercel Cron, AWS EventBridge, etc.)
|
||||
- **Recurring Patterns** (e.g., "every 2nd Tuesday") - Complex parsing and timezone handling
|
||||
- **Date Range Triggers** - Can be achieved with conditional logic in workflows
|
||||
|
||||
@@ -65,4 +66,36 @@ This document lists workflow steps and triggers that are intentionally **not** b
|
||||
- Implement custom steps using the plugin's TaskConfig interface
|
||||
- Use HTTP Request step for most external integrations
|
||||
- Create custom triggers through Payload hooks
|
||||
- Build specialized workflow packages on top of this plugin
|
||||
- Build specialized workflow packages on top of this plugin
|
||||
|
||||
### Cron Alternative: Webhook + External Service
|
||||
|
||||
Instead of built-in cron triggers, use webhook triggers with external cron services:
|
||||
|
||||
**GitHub Actions** (Free):
|
||||
```yaml
|
||||
# .github/workflows/daily-report.yml
|
||||
on:
|
||||
schedule:
|
||||
- cron: '0 9 * * *' # Daily at 9 AM UTC
|
||||
jobs:
|
||||
trigger-workflow:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- run: curl -X POST https://your-app.com/api/workflows-webhook/daily-report
|
||||
```
|
||||
|
||||
**Vercel Cron** (Serverless):
|
||||
```js
|
||||
// api/cron/daily.js
|
||||
export default async function handler(req, res) {
|
||||
await fetch('https://your-app.com/api/workflows-webhook/daily-report', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ source: 'vercel-cron' })
|
||||
});
|
||||
res.status(200).json({ success: true });
|
||||
}
|
||||
```
|
||||
|
||||
**Benefits**: Better reliability, proper process isolation, easier debugging, and leverages existing infrastructure.
|
||||
33
README.md
33
README.md
@@ -9,6 +9,7 @@ A comprehensive workflow automation plugin for PayloadCMS 3.x that enables visua
|
||||
- 🔄 **Visual Workflow Builder** - Create complex workflows with drag-and-drop interface
|
||||
- ⚡ **Parallel Execution** - Smart dependency resolution for optimal performance
|
||||
- 🎯 **Multiple Triggers** - Collection hooks, webhooks, manual execution
|
||||
- ⏰ **Scheduled Workflows** - Use webhook triggers with external cron services
|
||||
- 📊 **Execution Tracking** - Complete history and monitoring of workflow runs
|
||||
- 🔧 **Extensible Steps** - HTTP requests, document CRUD, email notifications
|
||||
- 🔍 **JSONPath Integration** - Dynamic data interpolation and transformation
|
||||
@@ -183,6 +184,38 @@ For debugging, use `debug` or `info`:
|
||||
PAYLOAD_AUTOMATION_LOG_LEVEL=debug npm run dev
|
||||
```
|
||||
|
||||
## Scheduled Workflows
|
||||
|
||||
For scheduled workflows, use **webhook triggers** with external cron services instead of built-in cron triggers:
|
||||
|
||||
### GitHub Actions (Free)
|
||||
```yaml
|
||||
# .github/workflows/daily-report.yml
|
||||
on:
|
||||
schedule:
|
||||
- cron: '0 9 * * *' # Daily at 9 AM UTC
|
||||
jobs:
|
||||
trigger-workflow:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- run: curl -X POST https://your-app.com/api/workflows-webhook/daily-report
|
||||
```
|
||||
|
||||
### Vercel Cron (Serverless)
|
||||
```js
|
||||
// api/cron/daily.js
|
||||
export default async function handler(req, res) {
|
||||
await fetch('https://your-app.com/api/workflows-webhook/daily-report', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ source: 'vercel-cron' })
|
||||
});
|
||||
res.status(200).json({ success: true });
|
||||
}
|
||||
```
|
||||
|
||||
**Benefits**: Better reliability, proper process isolation, easier debugging, and leverages existing infrastructure.
|
||||
|
||||
## Documentation
|
||||
|
||||
Full documentation coming soon. For now, explore the development environment in the repository for examples and patterns.
|
||||
|
||||
@@ -2,16 +2,8 @@ import type {CollectionConfig, Field} from 'payload'
|
||||
|
||||
import type {WorkflowsPluginConfig} from "../plugin/config-types.js"
|
||||
|
||||
import {
|
||||
collectionTrigger,
|
||||
cronTrigger,
|
||||
globalTrigger,
|
||||
webhookTrigger
|
||||
} from '../triggers/index.js'
|
||||
import {trigger} from '../triggers/helpers.js'
|
||||
|
||||
export const createWorkflowCollection: <T extends string>(options: WorkflowsPluginConfig<T>) => CollectionConfig = (options) => {
|
||||
const {steps, collectionTriggers} = options
|
||||
const {steps} = options
|
||||
const triggers = (options.triggers || []).map(t => t(options))
|
||||
return {
|
||||
slug: 'workflows',
|
||||
@@ -72,11 +64,6 @@ export const createWorkflowCollection: <T extends string>(options: WorkflowsPlug
|
||||
},
|
||||
defaultValue: {}
|
||||
},
|
||||
// Virtual fields for built-in triggers
|
||||
...trigger({slug: 'collection', fields: collectionTrigger(options).fields}).fields,
|
||||
...trigger({slug: 'webhook', fields: webhookTrigger().fields}).fields,
|
||||
...trigger({slug: 'global', fields: globalTrigger().fields}).fields,
|
||||
...trigger({slug: 'cron', fields: cronTrigger().fields}).fields,
|
||||
{
|
||||
name: 'condition',
|
||||
type: 'text',
|
||||
@@ -86,8 +73,6 @@ export const createWorkflowCollection: <T extends string>(options: WorkflowsPlug
|
||||
required: false
|
||||
},
|
||||
// Virtual fields for custom triggers
|
||||
// Note: Custom trigger fields from trigger-helpers already have unique names
|
||||
// We just need to pass them through without modification
|
||||
...(triggers || []).flatMap(t => (t.fields || []))
|
||||
]
|
||||
},
|
||||
|
||||
@@ -13,8 +13,6 @@ export type PayloadWorkflow = {
|
||||
collectionSlug?: string | null
|
||||
operation?: string | null
|
||||
webhookPath?: string | null
|
||||
cronExpression?: string | null
|
||||
timezone?: string | null
|
||||
global?: string | null
|
||||
globalOperation?: string | null
|
||||
[key: string]: unknown
|
||||
|
||||
@@ -1,642 +0,0 @@
|
||||
import type {Config, Payload, TaskConfig} from 'payload'
|
||||
|
||||
import cron from 'node-cron'
|
||||
|
||||
import {type PayloadWorkflow, WorkflowExecutor} from '../core/workflow-executor.js'
|
||||
import {getConfigLogger} from './logger.js'
|
||||
|
||||
/**
|
||||
* Generate dynamic cron tasks for all workflows with cron triggers
|
||||
* This is called at config time to register all scheduled tasks
|
||||
*/
|
||||
export function generateCronTasks(config: Config): void {
|
||||
const logger = getConfigLogger()
|
||||
|
||||
// Note: We can't query the database at config time, so we'll need a different approach
|
||||
// We'll create a single task that handles all cron-triggered workflows
|
||||
const cronTask: TaskConfig = {
|
||||
slug: 'workflow-cron-executor',
|
||||
handler: async ({ input, req }) => {
|
||||
const { cronExpression, timezone, workflowId } = input as {
|
||||
cronExpression?: string
|
||||
timezone?: string
|
||||
workflowId: string
|
||||
}
|
||||
|
||||
const logger = req.payload.logger.child({ plugin: '@xtr-dev/payload-automation' })
|
||||
|
||||
try {
|
||||
// Get the workflow
|
||||
const workflow = await req.payload.findByID({
|
||||
id: workflowId,
|
||||
collection: 'workflows',
|
||||
depth: 2,
|
||||
req
|
||||
})
|
||||
|
||||
if (!workflow) {
|
||||
throw new Error(`Workflow ${workflowId} not found`)
|
||||
}
|
||||
|
||||
// Create execution context for cron trigger
|
||||
const context = {
|
||||
steps: {},
|
||||
trigger: {
|
||||
type: 'cron',
|
||||
req,
|
||||
triggeredAt: new Date().toISOString()
|
||||
}
|
||||
}
|
||||
|
||||
// Create executor
|
||||
const executor = new WorkflowExecutor(req.payload, logger)
|
||||
|
||||
// Find the matching cron trigger and check its condition if present
|
||||
const triggers = workflow.triggers as Array<{
|
||||
condition?: string
|
||||
parameters?: {
|
||||
cronExpression?: string
|
||||
timezone?: string
|
||||
[key: string]: any
|
||||
}
|
||||
type: string
|
||||
}>
|
||||
|
||||
const matchingTrigger = triggers?.find(trigger =>
|
||||
trigger.type === 'cron-trigger' &&
|
||||
trigger.parameters?.cronExpression === cronExpression
|
||||
)
|
||||
|
||||
// Check trigger condition if present
|
||||
if (matchingTrigger?.condition) {
|
||||
const conditionMet = executor.evaluateCondition(matchingTrigger.condition, context)
|
||||
|
||||
if (!conditionMet) {
|
||||
logger.info({
|
||||
condition: matchingTrigger.condition,
|
||||
cronExpression,
|
||||
workflowId,
|
||||
workflowName: workflow.name
|
||||
}, 'Cron trigger condition not met, skipping workflow execution')
|
||||
|
||||
// Re-queue for next execution but don't run workflow
|
||||
if (cronExpression) {
|
||||
void requeueCronJob(workflowId, cronExpression, timezone, req.payload, logger)
|
||||
}
|
||||
|
||||
return {
|
||||
output: {
|
||||
executedAt: new Date().toISOString(),
|
||||
reason: 'Condition not met',
|
||||
status: 'skipped',
|
||||
workflowId
|
||||
},
|
||||
state: 'succeeded'
|
||||
}
|
||||
}
|
||||
|
||||
logger.info({
|
||||
condition: matchingTrigger.condition,
|
||||
cronExpression,
|
||||
workflowId,
|
||||
workflowName: workflow.name
|
||||
}, 'Cron trigger condition met')
|
||||
}
|
||||
|
||||
// Execute the workflow
|
||||
await executor.execute(workflow as PayloadWorkflow, context, req)
|
||||
|
||||
// Re-queue the job for the next scheduled execution if cronExpression is provided
|
||||
if (cronExpression) {
|
||||
void requeueCronJob(workflowId, cronExpression, timezone, req.payload, logger)
|
||||
}
|
||||
|
||||
return {
|
||||
output: {
|
||||
executedAt: new Date().toISOString(),
|
||||
status: 'completed',
|
||||
workflowId
|
||||
},
|
||||
state: 'succeeded'
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error({
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
workflowId
|
||||
}, 'Cron job execution failed')
|
||||
|
||||
// Re-queue even on failure to ensure continuity (unless it's a validation error)
|
||||
if (cronExpression && !(error instanceof Error && error.message.includes('Invalid cron'))) {
|
||||
void requeueCronJob(workflowId, cronExpression, timezone, req.payload, logger)
|
||||
.catch((requeueError) => {
|
||||
logger.error({
|
||||
error: requeueError instanceof Error ? requeueError.message : 'Unknown error',
|
||||
workflowId
|
||||
}, 'Failed to re-queue cron job after execution failure')
|
||||
})
|
||||
}
|
||||
|
||||
return {
|
||||
output: {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
workflowId
|
||||
},
|
||||
state: 'failed'
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add the cron task to config if not already present
|
||||
if (!config.jobs) {
|
||||
config.jobs = { tasks: [] }
|
||||
}
|
||||
|
||||
if (!config.jobs.tasks) {
|
||||
config.jobs.tasks = []
|
||||
}
|
||||
|
||||
if (!config.jobs.tasks.find(task => task.slug === cronTask.slug)) {
|
||||
logger.debug(`Registering cron executor task: ${cronTask.slug}`)
|
||||
config.jobs.tasks.push(cronTask)
|
||||
} else {
|
||||
logger.debug(`Cron executor task ${cronTask.slug} already registered, skipping`)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register cron jobs for workflows with cron triggers
|
||||
* This is called at runtime after PayloadCMS is initialized
|
||||
*/
|
||||
export async function registerCronJobs(payload: Payload, logger: Payload['logger']): Promise<void> {
|
||||
try {
|
||||
// Find all workflows with cron triggers
|
||||
const workflows = await payload.find({
|
||||
collection: 'workflows',
|
||||
depth: 0,
|
||||
limit: 1000,
|
||||
where: {
|
||||
'triggers.type': {
|
||||
equals: 'cron-trigger'
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
logger.info(`Found ${workflows.docs.length} workflows with cron triggers`)
|
||||
|
||||
for (const workflow of workflows.docs) {
|
||||
const triggers = workflow.triggers as Array<{
|
||||
parameters?: {
|
||||
cronExpression?: string
|
||||
timezone?: string
|
||||
[key: string]: any
|
||||
}
|
||||
type: string
|
||||
}>
|
||||
|
||||
// Find all cron triggers for this workflow
|
||||
const cronTriggers = triggers?.filter(t => t.type === 'cron-trigger') || []
|
||||
|
||||
for (const trigger of cronTriggers) {
|
||||
if (trigger.parameters?.cronExpression) {
|
||||
try {
|
||||
// Validate cron expression before queueing
|
||||
if (!validateCronExpression(trigger.parameters.cronExpression)) {
|
||||
logger.error({
|
||||
cronExpression: trigger.parameters.cronExpression,
|
||||
workflowId: workflow.id,
|
||||
workflowName: workflow.name
|
||||
}, 'Invalid cron expression format')
|
||||
continue
|
||||
}
|
||||
|
||||
// Validate timezone if provided
|
||||
if (trigger.parameters?.timezone) {
|
||||
try {
|
||||
// Test if timezone is valid by trying to create a date with it
|
||||
new Intl.DateTimeFormat('en', { timeZone: trigger.parameters.timezone })
|
||||
} catch {
|
||||
logger.error({
|
||||
timezone: trigger.parameters.timezone,
|
||||
workflowId: workflow.id,
|
||||
workflowName: workflow.name
|
||||
}, 'Invalid timezone specified')
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate next execution time
|
||||
const nextExecution = getNextCronTime(trigger.parameters.cronExpression, trigger.parameters?.timezone)
|
||||
|
||||
// Queue the job
|
||||
await payload.jobs.queue({
|
||||
input: { cronExpression: trigger.parameters.cronExpression, timezone: trigger.parameters?.timezone, workflowId: workflow.id },
|
||||
task: 'workflow-cron-executor',
|
||||
waitUntil: nextExecution
|
||||
})
|
||||
|
||||
logger.info({
|
||||
cronExpression: trigger.parameters.cronExpression,
|
||||
nextExecution: nextExecution.toISOString(),
|
||||
timezone: trigger.parameters?.timezone || 'UTC',
|
||||
workflowId: workflow.id,
|
||||
workflowName: workflow.name
|
||||
}, 'Queued initial cron job for workflow')
|
||||
} catch (error) {
|
||||
logger.error({
|
||||
cronExpression: trigger.parameters.cronExpression,
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
timezone: trigger.parameters?.timezone,
|
||||
workflowId: workflow.id,
|
||||
workflowName: workflow.name
|
||||
}, 'Failed to queue cron job')
|
||||
}
|
||||
} else {
|
||||
logger.warn({
|
||||
workflowId: workflow.id,
|
||||
workflowName: workflow.name
|
||||
}, 'Cron trigger found but no cron expression specified')
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error({
|
||||
error: error instanceof Error ? error.message : 'Unknown error'
|
||||
}, 'Failed to register cron jobs')
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate a cron expression
|
||||
*/
|
||||
export function validateCronExpression(cronExpression: string): boolean {
|
||||
return cron.validate(cronExpression)
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate the next time a cron expression should run
|
||||
*/
|
||||
function getNextCronTime(cronExpression: string, timezone?: string): Date {
|
||||
if (!validateCronExpression(cronExpression)) {
|
||||
throw new Error(`Invalid cron expression: ${cronExpression}`)
|
||||
}
|
||||
|
||||
const now = new Date()
|
||||
const options: { timezone?: string } = timezone ? { timezone } : {}
|
||||
|
||||
// Create a task to find the next execution time
|
||||
const task = cron.schedule(cronExpression, () => {}, {
|
||||
...options
|
||||
})
|
||||
|
||||
// Parse cron expression parts
|
||||
const cronParts = cronExpression.trim().split(/\s+/)
|
||||
if (cronParts.length !== 5) {
|
||||
void task.destroy()
|
||||
throw new Error(`Invalid cron format: ${cronExpression}. Expected 5 parts.`)
|
||||
}
|
||||
|
||||
const [minutePart, hourPart, dayPart, monthPart, weekdayPart] = cronParts
|
||||
|
||||
// Calculate next execution with proper lookahead for any schedule frequency
|
||||
// Start from next minute and look ahead systematically
|
||||
let testTime = new Date(now.getTime() + 60 * 1000) // Start 1 minute from now
|
||||
testTime.setSeconds(0, 0) // Reset seconds and milliseconds
|
||||
|
||||
// Maximum iterations to prevent infinite loops (covers ~2 years)
|
||||
const maxIterations = 2 * 365 * 24 * 60 // 2 years worth of minutes
|
||||
let iterations = 0
|
||||
|
||||
while (iterations < maxIterations) {
|
||||
const minute = testTime.getMinutes()
|
||||
const hour = testTime.getHours()
|
||||
const dayOfMonth = testTime.getDate()
|
||||
const month = testTime.getMonth() + 1
|
||||
const dayOfWeek = testTime.getDay()
|
||||
|
||||
if (matchesCronPart(minute, minutePart) &&
|
||||
matchesCronPart(hour, hourPart) &&
|
||||
matchesCronPart(dayOfMonth, dayPart) &&
|
||||
matchesCronPart(month, monthPart) &&
|
||||
matchesCronPart(dayOfWeek, weekdayPart)) {
|
||||
void task.destroy()
|
||||
return testTime
|
||||
}
|
||||
|
||||
// Increment time intelligently based on cron pattern
|
||||
testTime = incrementTimeForCronPattern(testTime, cronParts)
|
||||
iterations++
|
||||
}
|
||||
|
||||
void task.destroy()
|
||||
throw new Error(`Could not calculate next execution time for cron expression: ${cronExpression} within reasonable timeframe`)
|
||||
}
|
||||
|
||||
/**
|
||||
* Intelligently increment time based on cron pattern to avoid unnecessary iterations
|
||||
*/
|
||||
function incrementTimeForCronPattern(currentTime: Date, cronParts: string[]): Date {
|
||||
const [minutePart, hourPart, _dayPart, _monthPart, _weekdayPart] = cronParts
|
||||
const nextTime = new Date(currentTime)
|
||||
|
||||
// If minute is specific (not wildcard), we can jump to next hour
|
||||
if (minutePart !== '*' && !minutePart.includes('/')) {
|
||||
const targetMinute = getNextValidCronValue(currentTime.getMinutes(), minutePart)
|
||||
if (targetMinute <= currentTime.getMinutes()) {
|
||||
// Move to next hour
|
||||
nextTime.setHours(nextTime.getHours() + 1, targetMinute, 0, 0)
|
||||
} else {
|
||||
nextTime.setMinutes(targetMinute, 0, 0)
|
||||
}
|
||||
return nextTime
|
||||
}
|
||||
|
||||
// If hour is specific and we're past it, jump to next day
|
||||
if (hourPart !== '*' && !hourPart.includes('/')) {
|
||||
const targetHour = getNextValidCronValue(currentTime.getHours(), hourPart)
|
||||
if (targetHour <= currentTime.getHours()) {
|
||||
// Move to next day
|
||||
nextTime.setDate(nextTime.getDate() + 1)
|
||||
nextTime.setHours(targetHour, 0, 0, 0)
|
||||
} else {
|
||||
nextTime.setHours(targetHour, 0, 0, 0)
|
||||
}
|
||||
return nextTime
|
||||
}
|
||||
|
||||
// Default: increment by 1 minute
|
||||
nextTime.setTime(nextTime.getTime() + 60 * 1000)
|
||||
return nextTime
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next valid value for a cron part
|
||||
*/
|
||||
function getNextValidCronValue(currentValue: number, cronPart: string): number {
|
||||
if (cronPart === '*') {return currentValue + 1}
|
||||
|
||||
// Handle specific values and ranges
|
||||
const values = parseCronPart(cronPart)
|
||||
return values.find(v => v > currentValue) || values[0]
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a cron part into an array of valid values
|
||||
*/
|
||||
function parseCronPart(cronPart: string): number[] {
|
||||
if (cronPart === '*') {return []}
|
||||
|
||||
const values: number[] = []
|
||||
|
||||
// Handle comma-separated values
|
||||
if (cronPart.includes(',')) {
|
||||
cronPart.split(',').forEach(part => {
|
||||
values.push(...parseCronPart(part.trim()))
|
||||
})
|
||||
return values.sort((a, b) => a - b)
|
||||
}
|
||||
|
||||
// Handle ranges
|
||||
if (cronPart.includes('-')) {
|
||||
const [start, end] = cronPart.split('-').map(n => parseInt(n, 10))
|
||||
for (let i = start; i <= end; i++) {
|
||||
values.push(i)
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
// Handle step values
|
||||
if (cronPart.includes('/')) {
|
||||
const [range, step] = cronPart.split('/')
|
||||
const stepNum = parseInt(step, 10)
|
||||
|
||||
if (range === '*') {
|
||||
// For wildcards with steps, return empty - handled elsewhere
|
||||
return []
|
||||
}
|
||||
|
||||
const baseValues = parseCronPart(range)
|
||||
return baseValues.filter((_, index) => index % stepNum === 0)
|
||||
}
|
||||
|
||||
// Single value
|
||||
values.push(parseInt(cronPart, 10))
|
||||
return values
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a value matches a cron expression part
|
||||
*/
|
||||
function matchesCronPart(value: number, cronPart: string): boolean {
|
||||
if (cronPart === '*') {return true}
|
||||
|
||||
// Handle step values (e.g., */5)
|
||||
if (cronPart.includes('/')) {
|
||||
const [range, step] = cronPart.split('/')
|
||||
const stepNum = parseInt(step, 10)
|
||||
|
||||
if (range === '*') {
|
||||
return value % stepNum === 0
|
||||
}
|
||||
}
|
||||
|
||||
// Handle ranges (e.g., 1-5)
|
||||
if (cronPart.includes('-')) {
|
||||
const [start, end] = cronPart.split('-').map(n => parseInt(n, 10))
|
||||
return value >= start && value <= end
|
||||
}
|
||||
|
||||
// Handle comma-separated values (e.g., 1,3,5)
|
||||
if (cronPart.includes(',')) {
|
||||
const values = cronPart.split(',').map(n => parseInt(n, 10))
|
||||
return values.includes(value)
|
||||
}
|
||||
|
||||
// Handle single value
|
||||
const cronValue = parseInt(cronPart, 10)
|
||||
return value === cronValue
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle re-queueing of cron jobs after they execute
|
||||
* This ensures the job runs again at the next scheduled time
|
||||
*/
|
||||
export async function requeueCronJob(
|
||||
workflowId: string,
|
||||
cronExpression: string,
|
||||
timezone: string | undefined,
|
||||
payload: Payload,
|
||||
logger: Payload['logger']
|
||||
): Promise<void> {
|
||||
try {
|
||||
// Queue the job to run at the next scheduled time
|
||||
await payload.jobs.queue({
|
||||
input: { cronExpression, timezone, workflowId },
|
||||
task: 'workflow-cron-executor',
|
||||
waitUntil: getNextCronTime(cronExpression, timezone)
|
||||
})
|
||||
|
||||
logger.debug({
|
||||
nextRun: getNextCronTime(cronExpression, timezone),
|
||||
timezone: timezone || 'UTC',
|
||||
workflowId
|
||||
}, 'Re-queued cron job')
|
||||
} catch (error) {
|
||||
logger.error({
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
workflowId
|
||||
}, 'Failed to re-queue cron job')
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register or update cron jobs for a specific workflow
|
||||
*/
|
||||
export async function updateWorkflowCronJobs(
|
||||
workflowId: string,
|
||||
payload: Payload,
|
||||
logger: Payload['logger']
|
||||
): Promise<void> {
|
||||
try {
|
||||
// First, cancel any existing cron jobs for this workflow
|
||||
cancelWorkflowCronJobs(workflowId, payload, logger)
|
||||
|
||||
// Get the workflow
|
||||
const workflow = await payload.findByID({
|
||||
id: workflowId,
|
||||
collection: 'workflows',
|
||||
depth: 0
|
||||
})
|
||||
|
||||
if (!workflow) {
|
||||
logger.warn({ workflowId }, 'Workflow not found for cron job update')
|
||||
return
|
||||
}
|
||||
|
||||
const triggers = workflow.triggers as Array<{
|
||||
parameters?: {
|
||||
cronExpression?: string
|
||||
timezone?: string
|
||||
[key: string]: any
|
||||
}
|
||||
type: string
|
||||
}>
|
||||
|
||||
// Find all cron triggers for this workflow
|
||||
const cronTriggers = triggers?.filter(t => t.type === 'cron-trigger') || []
|
||||
|
||||
if (cronTriggers.length === 0) {
|
||||
logger.debug({ workflowId }, 'No cron triggers found for workflow')
|
||||
return
|
||||
}
|
||||
|
||||
let scheduledJobs = 0
|
||||
|
||||
for (const trigger of cronTriggers) {
|
||||
if (trigger.parameters?.cronExpression) {
|
||||
try {
|
||||
// Validate cron expression before queueing
|
||||
if (!validateCronExpression(trigger.parameters.cronExpression)) {
|
||||
logger.error({
|
||||
cronExpression: trigger.parameters.cronExpression,
|
||||
workflowId,
|
||||
workflowName: workflow.name
|
||||
}, 'Invalid cron expression format')
|
||||
continue
|
||||
}
|
||||
|
||||
// Validate timezone if provided
|
||||
if (trigger.parameters?.timezone) {
|
||||
try {
|
||||
new Intl.DateTimeFormat('en', { timeZone: trigger.parameters.timezone })
|
||||
} catch {
|
||||
logger.error({
|
||||
timezone: trigger.parameters.timezone,
|
||||
workflowId,
|
||||
workflowName: workflow.name
|
||||
}, 'Invalid timezone specified')
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate next execution time
|
||||
const nextExecution = getNextCronTime(trigger.parameters.cronExpression, trigger.parameters?.timezone)
|
||||
|
||||
// Queue the job
|
||||
await payload.jobs.queue({
|
||||
input: { cronExpression: trigger.parameters.cronExpression, timezone: trigger.parameters?.timezone, workflowId },
|
||||
task: 'workflow-cron-executor',
|
||||
waitUntil: nextExecution
|
||||
})
|
||||
|
||||
scheduledJobs++
|
||||
|
||||
logger.info({
|
||||
cronExpression: trigger.parameters.cronExpression,
|
||||
nextExecution: nextExecution.toISOString(),
|
||||
timezone: trigger.parameters?.timezone || 'UTC',
|
||||
workflowId,
|
||||
workflowName: workflow.name
|
||||
}, 'Scheduled cron job for workflow')
|
||||
} catch (error) {
|
||||
logger.error({
|
||||
cronExpression: trigger.parameters?.cronExpression,
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
timezone: trigger.parameters?.timezone,
|
||||
workflowId,
|
||||
workflowName: workflow.name
|
||||
}, 'Failed to schedule cron job')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (scheduledJobs > 0) {
|
||||
logger.info({ scheduledJobs, workflowId }, 'Updated cron jobs for workflow')
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error({
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
workflowId
|
||||
}, 'Failed to update workflow cron jobs')
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel all cron jobs for a specific workflow
|
||||
*/
|
||||
export function cancelWorkflowCronJobs(
|
||||
workflowId: string,
|
||||
payload: Payload,
|
||||
logger: Payload['logger']
|
||||
): void {
|
||||
try {
|
||||
// Note: PayloadCMS job system doesn't have a built-in way to cancel specific jobs by input
|
||||
// This is a limitation we need to work around
|
||||
// For now, we log that we would cancel jobs for this workflow
|
||||
logger.debug({ workflowId }, 'Would cancel existing cron jobs for workflow (PayloadCMS limitation: cannot selectively cancel jobs)')
|
||||
} catch (error) {
|
||||
logger.error({
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
workflowId
|
||||
}, 'Failed to cancel workflow cron jobs')
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove cron jobs for a deleted workflow
|
||||
*/
|
||||
export function removeWorkflowCronJobs(
|
||||
workflowId: string,
|
||||
payload: Payload,
|
||||
logger: Payload['logger']
|
||||
): void {
|
||||
try {
|
||||
cancelWorkflowCronJobs(workflowId, payload, logger)
|
||||
logger.info({ workflowId }, 'Removed cron jobs for deleted workflow')
|
||||
} catch (error) {
|
||||
logger.error({
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
workflowId
|
||||
}, 'Failed to remove workflow cron jobs')
|
||||
}
|
||||
}
|
||||
@@ -5,8 +5,6 @@ import type {CollectionTriggerConfigCrud, WorkflowsPluginConfig} from "./config-
|
||||
import {createWorkflowCollection} from '../collections/Workflow.js'
|
||||
import {WorkflowRunsCollection} from '../collections/WorkflowRuns.js'
|
||||
import {WorkflowExecutor} from '../core/workflow-executor.js'
|
||||
import {generateCronTasks, registerCronJobs} from './cron-scheduler.js'
|
||||
import {initCollectionHooks} from "./init-collection-hooks.js"
|
||||
import {initGlobalHooks} from "./init-global-hooks.js"
|
||||
import {initStepTasks} from "./init-step-tasks.js"
|
||||
import {initWebhookEndpoint} from "./init-webhook.js"
|
||||
@@ -185,7 +183,7 @@ export const workflowsPlugin =
|
||||
|
||||
if (!registry.isInitialized) {
|
||||
logger.warn('Workflow executor not yet initialized, attempting lazy initialization')
|
||||
|
||||
|
||||
try {
|
||||
// Try to create executor if we have a payload instance
|
||||
if (args.req?.payload) {
|
||||
@@ -271,10 +269,7 @@ export const workflowsPlugin =
|
||||
config.jobs = {tasks: []}
|
||||
}
|
||||
|
||||
const configLogger = getConfigLogger()
|
||||
|
||||
// Generate cron tasks for workflows with cron triggers
|
||||
generateCronTasks(config)
|
||||
|
||||
for (const step of pluginOptions.steps) {
|
||||
if (!config.jobs?.tasks?.find(task => task.slug === step.slug)) {
|
||||
@@ -320,9 +315,6 @@ export const workflowsPlugin =
|
||||
logger.info('Initializing step tasks...')
|
||||
initStepTasks(pluginOptions, payload, logger)
|
||||
|
||||
// Register cron jobs for workflows with cron triggers
|
||||
logger.info('Registering cron jobs...')
|
||||
await registerCronJobs(payload, logger)
|
||||
|
||||
logger.info('Plugin initialized successfully - all hooks registered')
|
||||
}
|
||||
|
||||
@@ -1,56 +1,12 @@
|
||||
import type {Payload} from 'payload'
|
||||
|
||||
import {updateWorkflowCronJobs, removeWorkflowCronJobs} from './cron-scheduler.js'
|
||||
import type { Payload } from 'payload'
|
||||
|
||||
/**
|
||||
* Initialize hooks for the workflows collection itself
|
||||
* to manage cron jobs when workflows are created/updated
|
||||
* Initialize hooks for the workflows collection
|
||||
* Currently minimal - can be extended for future workflow management features
|
||||
*/
|
||||
export function initWorkflowHooks(payload: Payload, logger: Payload['logger']): void {
|
||||
// Add afterChange hook to workflows collection to update cron jobs
|
||||
const workflowsCollection = payload.collections.workflows
|
||||
// Future workflow hooks can be added here
|
||||
// For example: workflow validation, cleanup, statistics, etc.
|
||||
|
||||
if (!workflowsCollection) {
|
||||
logger.warn('Workflows collection not found, cannot initialize workflow hooks')
|
||||
return
|
||||
}
|
||||
|
||||
// Add afterChange hook to register/update cron jobs
|
||||
if (!workflowsCollection.config.hooks?.afterChange) {
|
||||
if (!workflowsCollection.config.hooks) {
|
||||
// @ts-expect-error - hooks object will be populated by Payload
|
||||
workflowsCollection.config.hooks = {}
|
||||
}
|
||||
workflowsCollection.config.hooks.afterChange = []
|
||||
}
|
||||
|
||||
workflowsCollection.config.hooks.afterChange.push(async ({ doc, operation }) => {
|
||||
if (operation === 'create' || operation === 'update') {
|
||||
logger.debug({
|
||||
operation,
|
||||
workflowId: doc.id,
|
||||
workflowName: doc.name
|
||||
}, 'Workflow changed, updating cron jobs selectively')
|
||||
|
||||
// Update cron jobs for this specific workflow only
|
||||
await updateWorkflowCronJobs(doc.id, payload, logger)
|
||||
}
|
||||
})
|
||||
|
||||
// Add afterDelete hook to clean up cron jobs
|
||||
if (!workflowsCollection.config.hooks?.afterDelete) {
|
||||
workflowsCollection.config.hooks.afterDelete = []
|
||||
}
|
||||
|
||||
workflowsCollection.config.hooks.afterDelete.push(async ({ doc }) => {
|
||||
logger.debug({
|
||||
workflowId: doc.id,
|
||||
workflowName: doc.name
|
||||
}, 'Workflow deleted, removing cron jobs')
|
||||
|
||||
// Remove cron jobs for the deleted workflow
|
||||
removeWorkflowCronJobs(doc.id, payload, logger)
|
||||
})
|
||||
|
||||
logger.info('Workflow hooks initialized for cron job management')
|
||||
logger.debug('Workflow hooks initialized')
|
||||
}
|
||||
@@ -1,57 +0,0 @@
|
||||
import type {TriggerConfig} from '../plugin/config-types.js'
|
||||
|
||||
export const cronTrigger: TriggerConfig = () => ({
|
||||
slug: 'cron',
|
||||
fields: [
|
||||
{
|
||||
name: 'cronExpression',
|
||||
type: 'text',
|
||||
admin: {
|
||||
description: 'Cron expression for scheduled execution (e.g., "0 0 * * *" for daily at midnight)',
|
||||
placeholder: '0 0 * * *'
|
||||
},
|
||||
validate: (value: any, {siblingData}: any) => {
|
||||
const cronValue = value || siblingData?.parameters?.cronExpression
|
||||
if (siblingData?.type === 'cron' && !cronValue) {
|
||||
return 'Cron expression is required for cron triggers'
|
||||
}
|
||||
|
||||
// Validate cron expression format if provided
|
||||
if (siblingData?.type === 'cron' && cronValue) {
|
||||
// Basic format validation - should be 5 parts separated by spaces
|
||||
const cronParts = cronValue.trim().split(/\s+/)
|
||||
if (cronParts.length !== 5) {
|
||||
return 'Invalid cron expression format. Expected 5 parts: "minute hour day month weekday" (e.g., "0 9 * * 1")'
|
||||
}
|
||||
|
||||
// Additional validation could use node-cron but we avoid dynamic imports here
|
||||
// The main validation happens at runtime in the cron scheduler
|
||||
}
|
||||
|
||||
return true
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'timezone',
|
||||
type: 'text',
|
||||
admin: {
|
||||
description: 'Timezone for cron execution (e.g., "America/New_York", "Europe/London"). Defaults to UTC.',
|
||||
placeholder: 'UTC'
|
||||
},
|
||||
defaultValue: 'UTC',
|
||||
validate: (value: any, {siblingData}: any) => {
|
||||
const tzValue = value || siblingData?.parameters?.timezone
|
||||
if (siblingData?.type === 'cron' && tzValue) {
|
||||
try {
|
||||
// Test if timezone is valid by trying to create a date with it
|
||||
new Intl.DateTimeFormat('en', {timeZone: tzValue})
|
||||
return true
|
||||
} catch {
|
||||
return `Invalid timezone: ${tzValue}. Please use a valid IANA timezone identifier (e.g., "America/New_York", "Europe/London")`
|
||||
}
|
||||
}
|
||||
return true
|
||||
},
|
||||
}
|
||||
]
|
||||
})
|
||||
@@ -1,4 +1,3 @@
|
||||
export { collectionTrigger } from './collection-trigger.js'
|
||||
export { cronTrigger } from './cron-trigger.js'
|
||||
export { globalTrigger } from './global-trigger.js'
|
||||
export { webhookTrigger } from './webhook-trigger.js'
|
||||
|
||||
Reference in New Issue
Block a user