mirror of
https://github.com/xtr-dev/payload-automation.git
synced 2025-12-10 17:03:22 +00:00
Rename 'collection' field to 'collectionSlug' to avoid PayloadCMS reserved field conflicts
- Updated Workflow collection trigger field from 'collection' to 'collectionSlug' - Updated all document operation steps (create, read, update, delete) to use 'collectionSlug' - Updated corresponding handlers to destructure 'collectionSlug' instead of 'collection' - Removed debug console.log statements from logger configLogger methods - Fixed collection hook debug logs to use 'slug' instead of reserved 'collection' field 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import type {Config, Payload, TaskConfig} from 'payload'
|
||||
|
||||
import * as cron from 'node-cron'
|
||||
|
||||
import {type Workflow, WorkflowExecutor} from '../core/workflow-executor.js'
|
||||
@@ -10,20 +11,20 @@ import {getConfigLogger} from './logger.js'
|
||||
*/
|
||||
export function generateCronTasks(config: Config): void {
|
||||
const logger = getConfigLogger()
|
||||
|
||||
|
||||
// Note: We can't query the database at config time, so we'll need a different approach
|
||||
// We'll create a single task that handles all cron-triggered workflows
|
||||
const cronTask: TaskConfig = {
|
||||
slug: 'workflow-cron-executor',
|
||||
handler: async ({ input, req }) => {
|
||||
const { cronExpression, timezone, workflowId } = input as {
|
||||
const { cronExpression, timezone, workflowId } = input as {
|
||||
cronExpression?: string
|
||||
timezone?: string
|
||||
workflowId: string
|
||||
}
|
||||
|
||||
|
||||
const logger = req.payload.logger.child({ plugin: '@xtr-dev/payload-automation' })
|
||||
|
||||
|
||||
try {
|
||||
// Get the workflow
|
||||
const workflow = await req.payload.findByID({
|
||||
@@ -32,11 +33,11 @@ export function generateCronTasks(config: Config): void {
|
||||
depth: 2,
|
||||
req
|
||||
})
|
||||
|
||||
|
||||
if (!workflow) {
|
||||
throw new Error(`Workflow ${workflowId} not found`)
|
||||
}
|
||||
|
||||
|
||||
// Create execution context for cron trigger
|
||||
const context = {
|
||||
steps: {},
|
||||
@@ -46,10 +47,10 @@ export function generateCronTasks(config: Config): void {
|
||||
triggeredAt: new Date().toISOString()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Create executor
|
||||
const executor = new WorkflowExecutor(req.payload, logger)
|
||||
|
||||
|
||||
// Find the matching cron trigger and check its condition if present
|
||||
const triggers = workflow.triggers as Array<{
|
||||
condition?: string
|
||||
@@ -66,7 +67,7 @@ export function generateCronTasks(config: Config): void {
|
||||
// Check trigger condition if present
|
||||
if (matchingTrigger?.condition) {
|
||||
const conditionMet = executor.evaluateCondition(matchingTrigger.condition, context)
|
||||
|
||||
|
||||
if (!conditionMet) {
|
||||
logger.info({
|
||||
condition: matchingTrigger.condition,
|
||||
@@ -74,23 +75,23 @@ export function generateCronTasks(config: Config): void {
|
||||
workflowId,
|
||||
workflowName: workflow.name
|
||||
}, 'Cron trigger condition not met, skipping workflow execution')
|
||||
|
||||
|
||||
// Re-queue for next execution but don't run workflow
|
||||
if (cronExpression) {
|
||||
void requeueCronJob(workflowId, cronExpression, timezone, req.payload, logger)
|
||||
}
|
||||
|
||||
|
||||
return {
|
||||
output: {
|
||||
executedAt: new Date().toISOString(),
|
||||
status: 'skipped',
|
||||
reason: 'Condition not met',
|
||||
status: 'skipped',
|
||||
workflowId
|
||||
},
|
||||
state: 'succeeded'
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
logger.info({
|
||||
condition: matchingTrigger.condition,
|
||||
cronExpression,
|
||||
@@ -98,15 +99,15 @@ export function generateCronTasks(config: Config): void {
|
||||
workflowName: workflow.name
|
||||
}, 'Cron trigger condition met')
|
||||
}
|
||||
|
||||
|
||||
// Execute the workflow
|
||||
await executor.execute(workflow as Workflow, context, req)
|
||||
|
||||
|
||||
// Re-queue the job for the next scheduled execution if cronExpression is provided
|
||||
if (cronExpression) {
|
||||
void requeueCronJob(workflowId, cronExpression, timezone, req.payload, logger)
|
||||
}
|
||||
|
||||
|
||||
return {
|
||||
output: {
|
||||
executedAt: new Date().toISOString(),
|
||||
@@ -120,7 +121,7 @@ export function generateCronTasks(config: Config): void {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
workflowId
|
||||
}, 'Cron job execution failed')
|
||||
|
||||
|
||||
// Re-queue even on failure to ensure continuity (unless it's a validation error)
|
||||
if (cronExpression && !(error instanceof Error && error.message.includes('Invalid cron'))) {
|
||||
void requeueCronJob(workflowId, cronExpression, timezone, req.payload, logger)
|
||||
@@ -131,7 +132,7 @@ export function generateCronTasks(config: Config): void {
|
||||
}, 'Failed to re-queue cron job after execution failure')
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
return {
|
||||
output: {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
@@ -142,16 +143,16 @@ export function generateCronTasks(config: Config): void {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Add the cron task to config if not already present
|
||||
if (!config.jobs) {
|
||||
config.jobs = { tasks: [] }
|
||||
}
|
||||
|
||||
|
||||
if (!config.jobs.tasks) {
|
||||
config.jobs.tasks = []
|
||||
}
|
||||
|
||||
|
||||
if (!config.jobs.tasks.find(task => task.slug === cronTask.slug)) {
|
||||
logger.debug(`Registering cron executor task: ${cronTask.slug}`)
|
||||
config.jobs.tasks.push(cronTask)
|
||||
@@ -177,19 +178,19 @@ export async function registerCronJobs(payload: Payload, logger: Payload['logger
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
logger.info(`Found ${workflows.docs.length} workflows with cron triggers`)
|
||||
|
||||
|
||||
for (const workflow of workflows.docs) {
|
||||
const triggers = workflow.triggers as Array<{
|
||||
cronExpression?: string
|
||||
timezone?: string
|
||||
type: string
|
||||
}>
|
||||
|
||||
|
||||
// Find all cron triggers for this workflow
|
||||
const cronTriggers = triggers?.filter(t => t.type === 'cron-trigger') || []
|
||||
|
||||
|
||||
for (const trigger of cronTriggers) {
|
||||
if (trigger.cronExpression) {
|
||||
try {
|
||||
@@ -202,7 +203,7 @@ export async function registerCronJobs(payload: Payload, logger: Payload['logger
|
||||
}, 'Invalid cron expression format')
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
// Validate timezone if provided
|
||||
if (trigger.timezone) {
|
||||
try {
|
||||
@@ -217,17 +218,17 @@ export async function registerCronJobs(payload: Payload, logger: Payload['logger
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Calculate next execution time
|
||||
const nextExecution = getNextCronTime(trigger.cronExpression, trigger.timezone)
|
||||
|
||||
|
||||
// Queue the job
|
||||
await payload.jobs.queue({
|
||||
input: { cronExpression: trigger.cronExpression, timezone: trigger.timezone, workflowId: workflow.id },
|
||||
task: 'workflow-cron-executor',
|
||||
waitUntil: nextExecution
|
||||
})
|
||||
|
||||
|
||||
logger.info({
|
||||
cronExpression: trigger.cronExpression,
|
||||
nextExecution: nextExecution.toISOString(),
|
||||
@@ -276,37 +277,37 @@ function getNextCronTime(cronExpression: string, timezone?: string): Date {
|
||||
|
||||
const now = new Date()
|
||||
const options: { timezone?: string } = timezone ? { timezone } : {}
|
||||
|
||||
|
||||
// Create a task to find the next execution time
|
||||
const task = cron.schedule(cronExpression, () => {}, {
|
||||
...options
|
||||
})
|
||||
|
||||
|
||||
// Parse cron expression parts
|
||||
const cronParts = cronExpression.trim().split(/\s+/)
|
||||
if (cronParts.length !== 5) {
|
||||
void task.destroy()
|
||||
throw new Error(`Invalid cron format: ${cronExpression}. Expected 5 parts.`)
|
||||
}
|
||||
|
||||
|
||||
const [minutePart, hourPart, dayPart, monthPart, weekdayPart] = cronParts
|
||||
|
||||
|
||||
// Calculate next execution with proper lookahead for any schedule frequency
|
||||
// Start from next minute and look ahead systematically
|
||||
let testTime = new Date(now.getTime() + 60 * 1000) // Start 1 minute from now
|
||||
testTime.setSeconds(0, 0) // Reset seconds and milliseconds
|
||||
|
||||
|
||||
// Maximum iterations to prevent infinite loops (covers ~2 years)
|
||||
const maxIterations = 2 * 365 * 24 * 60 // 2 years worth of minutes
|
||||
let iterations = 0
|
||||
|
||||
|
||||
while (iterations < maxIterations) {
|
||||
const minute = testTime.getMinutes()
|
||||
const hour = testTime.getHours()
|
||||
const dayOfMonth = testTime.getDate()
|
||||
const month = testTime.getMonth() + 1
|
||||
const dayOfWeek = testTime.getDay()
|
||||
|
||||
|
||||
if (matchesCronPart(minute, minutePart) &&
|
||||
matchesCronPart(hour, hourPart) &&
|
||||
matchesCronPart(dayOfMonth, dayPart) &&
|
||||
@@ -315,12 +316,12 @@ function getNextCronTime(cronExpression: string, timezone?: string): Date {
|
||||
void task.destroy()
|
||||
return testTime
|
||||
}
|
||||
|
||||
|
||||
// Increment time intelligently based on cron pattern
|
||||
testTime = incrementTimeForCronPattern(testTime, cronParts)
|
||||
iterations++
|
||||
}
|
||||
|
||||
|
||||
void task.destroy()
|
||||
throw new Error(`Could not calculate next execution time for cron expression: ${cronExpression} within reasonable timeframe`)
|
||||
}
|
||||
@@ -331,7 +332,7 @@ function getNextCronTime(cronExpression: string, timezone?: string): Date {
|
||||
function incrementTimeForCronPattern(currentTime: Date, cronParts: string[]): Date {
|
||||
const [minutePart, hourPart, _dayPart, _monthPart, _weekdayPart] = cronParts
|
||||
const nextTime = new Date(currentTime)
|
||||
|
||||
|
||||
// If minute is specific (not wildcard), we can jump to next hour
|
||||
if (minutePart !== '*' && !minutePart.includes('/')) {
|
||||
const targetMinute = getNextValidCronValue(currentTime.getMinutes(), minutePart)
|
||||
@@ -343,7 +344,7 @@ function incrementTimeForCronPattern(currentTime: Date, cronParts: string[]): Da
|
||||
}
|
||||
return nextTime
|
||||
}
|
||||
|
||||
|
||||
// If hour is specific and we're past it, jump to next day
|
||||
if (hourPart !== '*' && !hourPart.includes('/')) {
|
||||
const targetHour = getNextValidCronValue(currentTime.getHours(), hourPart)
|
||||
@@ -356,7 +357,7 @@ function incrementTimeForCronPattern(currentTime: Date, cronParts: string[]): Da
|
||||
}
|
||||
return nextTime
|
||||
}
|
||||
|
||||
|
||||
// Default: increment by 1 minute
|
||||
nextTime.setTime(nextTime.getTime() + 60 * 1000)
|
||||
return nextTime
|
||||
@@ -367,7 +368,7 @@ function incrementTimeForCronPattern(currentTime: Date, cronParts: string[]): Da
|
||||
*/
|
||||
function getNextValidCronValue(currentValue: number, cronPart: string): number {
|
||||
if (cronPart === '*') {return currentValue + 1}
|
||||
|
||||
|
||||
// Handle specific values and ranges
|
||||
const values = parseCronPart(cronPart)
|
||||
return values.find(v => v > currentValue) || values[0]
|
||||
@@ -378,9 +379,9 @@ function getNextValidCronValue(currentValue: number, cronPart: string): number {
|
||||
*/
|
||||
function parseCronPart(cronPart: string): number[] {
|
||||
if (cronPart === '*') {return []}
|
||||
|
||||
|
||||
const values: number[] = []
|
||||
|
||||
|
||||
// Handle comma-separated values
|
||||
if (cronPart.includes(',')) {
|
||||
cronPart.split(',').forEach(part => {
|
||||
@@ -388,7 +389,7 @@ function parseCronPart(cronPart: string): number[] {
|
||||
})
|
||||
return values.sort((a, b) => a - b)
|
||||
}
|
||||
|
||||
|
||||
// Handle ranges
|
||||
if (cronPart.includes('-')) {
|
||||
const [start, end] = cronPart.split('-').map(n => parseInt(n, 10))
|
||||
@@ -397,21 +398,21 @@ function parseCronPart(cronPart: string): number[] {
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
|
||||
// Handle step values
|
||||
if (cronPart.includes('/')) {
|
||||
const [range, step] = cronPart.split('/')
|
||||
const stepNum = parseInt(step, 10)
|
||||
|
||||
|
||||
if (range === '*') {
|
||||
// For wildcards with steps, return empty - handled elsewhere
|
||||
return []
|
||||
}
|
||||
|
||||
|
||||
const baseValues = parseCronPart(range)
|
||||
return baseValues.filter((_, index) => index % stepNum === 0)
|
||||
}
|
||||
|
||||
|
||||
// Single value
|
||||
values.push(parseInt(cronPart, 10))
|
||||
return values
|
||||
@@ -422,29 +423,29 @@ function parseCronPart(cronPart: string): number[] {
|
||||
*/
|
||||
function matchesCronPart(value: number, cronPart: string): boolean {
|
||||
if (cronPart === '*') {return true}
|
||||
|
||||
|
||||
// Handle step values (e.g., */5)
|
||||
if (cronPart.includes('/')) {
|
||||
const [range, step] = cronPart.split('/')
|
||||
const stepNum = parseInt(step, 10)
|
||||
|
||||
|
||||
if (range === '*') {
|
||||
return value % stepNum === 0
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Handle ranges (e.g., 1-5)
|
||||
if (cronPart.includes('-')) {
|
||||
const [start, end] = cronPart.split('-').map(n => parseInt(n, 10))
|
||||
return value >= start && value <= end
|
||||
}
|
||||
|
||||
|
||||
// Handle comma-separated values (e.g., 1,3,5)
|
||||
if (cronPart.includes(',')) {
|
||||
const values = cronPart.split(',').map(n => parseInt(n, 10))
|
||||
return values.includes(value)
|
||||
}
|
||||
|
||||
|
||||
// Handle single value
|
||||
const cronValue = parseInt(cronPart, 10)
|
||||
return value === cronValue
|
||||
@@ -468,7 +469,7 @@ export async function requeueCronJob(
|
||||
task: 'workflow-cron-executor',
|
||||
waitUntil: getNextCronTime(cronExpression, timezone)
|
||||
})
|
||||
|
||||
|
||||
logger.debug({
|
||||
nextRun: getNextCronTime(cronExpression, timezone),
|
||||
timezone: timezone || 'UTC',
|
||||
@@ -487,41 +488,41 @@ export async function requeueCronJob(
|
||||
*/
|
||||
export async function updateWorkflowCronJobs(
|
||||
workflowId: string,
|
||||
payload: Payload,
|
||||
payload: Payload,
|
||||
logger: Payload['logger']
|
||||
): Promise<void> {
|
||||
try {
|
||||
// First, cancel any existing cron jobs for this workflow
|
||||
cancelWorkflowCronJobs(workflowId, payload, logger)
|
||||
|
||||
|
||||
// Get the workflow
|
||||
const workflow = await payload.findByID({
|
||||
id: workflowId,
|
||||
collection: 'workflows',
|
||||
depth: 0
|
||||
})
|
||||
|
||||
|
||||
if (!workflow) {
|
||||
logger.warn({ workflowId }, 'Workflow not found for cron job update')
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
const triggers = workflow.triggers as Array<{
|
||||
cronExpression?: string
|
||||
timezone?: string
|
||||
type: string
|
||||
}>
|
||||
|
||||
|
||||
// Find all cron triggers for this workflow
|
||||
const cronTriggers = triggers?.filter(t => t.type === 'cron-trigger') || []
|
||||
|
||||
|
||||
if (cronTriggers.length === 0) {
|
||||
logger.debug({ workflowId }, 'No cron triggers found for workflow')
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
let scheduledJobs = 0
|
||||
|
||||
|
||||
for (const trigger of cronTriggers) {
|
||||
if (trigger.cronExpression) {
|
||||
try {
|
||||
@@ -534,7 +535,7 @@ export async function updateWorkflowCronJobs(
|
||||
}, 'Invalid cron expression format')
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
// Validate timezone if provided
|
||||
if (trigger.timezone) {
|
||||
try {
|
||||
@@ -548,19 +549,19 @@ export async function updateWorkflowCronJobs(
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Calculate next execution time
|
||||
const nextExecution = getNextCronTime(trigger.cronExpression, trigger.timezone)
|
||||
|
||||
|
||||
// Queue the job
|
||||
await payload.jobs.queue({
|
||||
input: { cronExpression: trigger.cronExpression, timezone: trigger.timezone, workflowId },
|
||||
task: 'workflow-cron-executor',
|
||||
waitUntil: nextExecution
|
||||
})
|
||||
|
||||
|
||||
scheduledJobs++
|
||||
|
||||
|
||||
logger.info({
|
||||
cronExpression: trigger.cronExpression,
|
||||
nextExecution: nextExecution.toISOString(),
|
||||
@@ -579,7 +580,7 @@ export async function updateWorkflowCronJobs(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (scheduledJobs > 0) {
|
||||
logger.info({ scheduledJobs, workflowId }, 'Updated cron jobs for workflow')
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user