From 860dd7e92160d552b026ae92c8f1b28da58c32d9 Mon Sep 17 00:00:00 2001 From: Bas van den Aakster Date: Sat, 13 Sep 2025 16:51:59 +0200 Subject: [PATCH 1/3] Add automatic job scheduling and rescheduling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add scheduleEmailProcessingJob helper to check and schedule jobs on init - Only schedule if no existing uncompleted job exists - Update job handler to always reschedule itself after completion (5min interval) - Job reschedules regardless of success/failure for continuous processing - Initial job starts 1 minute after init, subsequent jobs every 5 minutes 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- src/plugin.ts | 83 ++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 79 insertions(+), 4 deletions(-) diff --git a/src/plugin.ts b/src/plugin.ts index ffdfc94..8ad263e 100644 --- a/src/plugin.ts +++ b/src/plugin.ts @@ -4,6 +4,47 @@ import { MailingService } from './services/MailingService.js' import { createEmailTemplatesCollection } from './collections/EmailTemplates.js' import Emails from './collections/Emails.js' +// Helper function to schedule the email processing job +async function scheduleEmailProcessingJob(payload: any, queueName: string): Promise { + const jobSlug = 'process-email-queue' + + // Check if there's already a scheduled job for this task + const existingJobs = await payload.find({ + collection: 'payload-jobs', + where: { + and: [ + { + taskSlug: { + equals: jobSlug, + }, + }, + { + hasCompleted: { + equals: false, + }, + }, + ], + }, + limit: 1, + }) + + // If no existing job, schedule a new one + if (existingJobs.docs.length === 0) { + await payload.create({ + collection: 'payload-jobs', + data: { + taskSlug: jobSlug, + input: {}, + queue: queueName, + waitUntil: new Date(Date.now() + 60000), // Start in 1 minute + }, + }) + console.log(`🔄 Scheduled email processing job in queue: ${queueName}`) + } else { + console.log(`✅ Email processing job already scheduled in queue: ${queueName}`) + } +} + export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Config): Config => { const queueName = pluginConfig.queue || 'default' @@ -80,8 +121,11 @@ export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Con { slug: 'process-email-queue', handler: async ({ job, req }: { job: any; req: any }) => { + const payload = (req as any).payload + let jobResult = null + try { - const mailingService = new MailingService((req as any).payload, pluginConfig) + const mailingService = new MailingService(payload, pluginConfig) console.log('🔄 Processing email queue (pending + failed emails)...') @@ -91,19 +135,43 @@ export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Con // Then retry failed emails await mailingService.retryFailedEmails() - return { + jobResult = { output: { success: true, message: 'Email queue processed successfully (pending and failed emails)' } } + + console.log('✅ Email queue processing completed successfully') + } catch (error) { console.error('❌ Error processing email queue:', error) const errorMessage = error instanceof Error ? error.message : 'Unknown error' - // Properly fail the job by throwing the error - throw new Error(`Email queue processing failed: ${errorMessage}`) + jobResult = new Error(`Email queue processing failed: ${errorMessage}`) } + + // Always reschedule the next job (success or failure) + try { + await payload.create({ + collection: 'payload-jobs', + data: { + taskSlug: 'process-email-queue', + input: {}, + queue: queueName, + waitUntil: new Date(Date.now() + 300000), // Reschedule in 5 minutes + }, + }) + console.log(`🔄 Rescheduled next email processing job in ${queueName} queue`) + } catch (rescheduleError) { + console.error('❌ Failed to reschedule email processing job:', rescheduleError) + } + + // Return the original result or throw the error + if (jobResult instanceof Error) { + throw jobResult + } + return jobResult }, interfaceName: 'ProcessEmailQueueJob', }, @@ -130,6 +198,13 @@ export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Con console.log('PayloadCMS Mailing Plugin initialized successfully') + // Schedule the email processing job if not already scheduled + try { + await scheduleEmailProcessingJob(payload, queueName) + } catch (error) { + console.error('Failed to schedule email processing job:', error) + } + // Call onReady callback if provided if (pluginConfig.onReady) { await pluginConfig.onReady(payload) From 159a99a1eca057d8b9f9dd4bb7b0d9899eb7ee6c Mon Sep 17 00:00:00 2001 From: Bas van den Aakster Date: Sat, 13 Sep 2025 16:56:51 +0200 Subject: [PATCH 2/3] Fix race conditions and add validation for job scheduling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Reuse duplicate prevention logic in rescheduling to prevent race conditions - Add queueName validation in plugin initialization and helper function - Enhanced scheduleEmailProcessingJob to return boolean and accept delay parameter - Improve error handling: rescheduling failures warn but don't fail current job - Prevent duplicate jobs when multiple handlers complete simultaneously 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- src/plugin.ts | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/src/plugin.ts b/src/plugin.ts index 8ad263e..5feeba8 100644 --- a/src/plugin.ts +++ b/src/plugin.ts @@ -5,7 +5,11 @@ import { createEmailTemplatesCollection } from './collections/EmailTemplates.js' import Emails from './collections/Emails.js' // Helper function to schedule the email processing job -async function scheduleEmailProcessingJob(payload: any, queueName: string): Promise { +async function scheduleEmailProcessingJob(payload: any, queueName: string, delayMs: number = 60000): Promise { + if (!queueName || typeof queueName !== 'string') { + throw new Error('Invalid queueName: must be a non-empty string') + } + const jobSlug = 'process-email-queue' // Check if there's already a scheduled job for this task @@ -36,18 +40,25 @@ async function scheduleEmailProcessingJob(payload: any, queueName: string): Prom taskSlug: jobSlug, input: {}, queue: queueName, - waitUntil: new Date(Date.now() + 60000), // Start in 1 minute + waitUntil: new Date(Date.now() + delayMs), }, }) console.log(`🔄 Scheduled email processing job in queue: ${queueName}`) + return true } else { console.log(`✅ Email processing job already scheduled in queue: ${queueName}`) + return false } } export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Config): Config => { const queueName = pluginConfig.queue || 'default' + // Validate queueName + if (!queueName || typeof queueName !== 'string') { + throw new Error('Invalid queue configuration: queue must be a non-empty string') + } + // Handle templates collection configuration const templatesConfig = pluginConfig.collections?.templates const templatesSlug = typeof templatesConfig === 'string' ? templatesConfig : 'email-templates' @@ -151,20 +162,18 @@ export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Con jobResult = new Error(`Email queue processing failed: ${errorMessage}`) } - // Always reschedule the next job (success or failure) + // Always reschedule the next job (success or failure) using duplicate prevention + let rescheduled = false try { - await payload.create({ - collection: 'payload-jobs', - data: { - taskSlug: 'process-email-queue', - input: {}, - queue: queueName, - waitUntil: new Date(Date.now() + 300000), // Reschedule in 5 minutes - }, - }) - console.log(`🔄 Rescheduled next email processing job in ${queueName} queue`) + rescheduled = await scheduleEmailProcessingJob(payload, queueName, 300000) // Reschedule in 5 minutes + if (rescheduled) { + console.log(`🔄 Rescheduled next email processing job in ${queueName} queue`) + } } catch (rescheduleError) { console.error('❌ Failed to reschedule email processing job:', rescheduleError) + // If rescheduling fails, we should warn but not fail the current job + // since the email processing itself may have succeeded + console.warn('⚠️ Email processing completed but next job could not be scheduled') } // Return the original result or throw the error From 243f7c96cf6c9279e304ba8f636de8a126df8f76 Mon Sep 17 00:00:00 2001 From: Bas van den Aakster Date: Sat, 13 Sep 2025 16:57:50 +0200 Subject: [PATCH 3/3] Bump package version to 0.0.8 in `package.json`. --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 2322fef..a404dc2 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@xtr-dev/payload-mailing", - "version": "0.0.7", + "version": "0.0.8", "description": "Template-based email system with scheduling and job processing for PayloadCMS", "type": "module", "main": "dist/index.js",