From 159a99a1eca057d8b9f9dd4bb7b0d9899eb7ee6c Mon Sep 17 00:00:00 2001 From: Bas van den Aakster Date: Sat, 13 Sep 2025 16:56:51 +0200 Subject: [PATCH] Fix race conditions and add validation for job scheduling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Reuse duplicate prevention logic in rescheduling to prevent race conditions - Add queueName validation in plugin initialization and helper function - Enhanced scheduleEmailProcessingJob to return boolean and accept delay parameter - Improve error handling: rescheduling failures warn but don't fail current job - Prevent duplicate jobs when multiple handlers complete simultaneously 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- src/plugin.ts | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/src/plugin.ts b/src/plugin.ts index 8ad263e..5feeba8 100644 --- a/src/plugin.ts +++ b/src/plugin.ts @@ -5,7 +5,11 @@ import { createEmailTemplatesCollection } from './collections/EmailTemplates.js' import Emails from './collections/Emails.js' // Helper function to schedule the email processing job -async function scheduleEmailProcessingJob(payload: any, queueName: string): Promise { +async function scheduleEmailProcessingJob(payload: any, queueName: string, delayMs: number = 60000): Promise { + if (!queueName || typeof queueName !== 'string') { + throw new Error('Invalid queueName: must be a non-empty string') + } + const jobSlug = 'process-email-queue' // Check if there's already a scheduled job for this task @@ -36,18 +40,25 @@ async function scheduleEmailProcessingJob(payload: any, queueName: string): Prom taskSlug: jobSlug, input: {}, queue: queueName, - waitUntil: new Date(Date.now() + 60000), // Start in 1 minute + waitUntil: new Date(Date.now() + delayMs), }, }) console.log(`🔄 Scheduled email processing job in queue: ${queueName}`) + return true } else { console.log(`✅ Email processing job already scheduled in queue: ${queueName}`) + return false } } export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Config): Config => { const queueName = pluginConfig.queue || 'default' + // Validate queueName + if (!queueName || typeof queueName !== 'string') { + throw new Error('Invalid queue configuration: queue must be a non-empty string') + } + // Handle templates collection configuration const templatesConfig = pluginConfig.collections?.templates const templatesSlug = typeof templatesConfig === 'string' ? templatesConfig : 'email-templates' @@ -151,20 +162,18 @@ export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Con jobResult = new Error(`Email queue processing failed: ${errorMessage}`) } - // Always reschedule the next job (success or failure) + // Always reschedule the next job (success or failure) using duplicate prevention + let rescheduled = false try { - await payload.create({ - collection: 'payload-jobs', - data: { - taskSlug: 'process-email-queue', - input: {}, - queue: queueName, - waitUntil: new Date(Date.now() + 300000), // Reschedule in 5 minutes - }, - }) - console.log(`🔄 Rescheduled next email processing job in ${queueName} queue`) + rescheduled = await scheduleEmailProcessingJob(payload, queueName, 300000) // Reschedule in 5 minutes + if (rescheduled) { + console.log(`🔄 Rescheduled next email processing job in ${queueName} queue`) + } } catch (rescheduleError) { console.error('❌ Failed to reschedule email processing job:', rescheduleError) + // If rescheduling fails, we should warn but not fail the current job + // since the email processing itself may have succeeded + console.warn('⚠️ Email processing completed but next job could not be scheduled') } // Return the original result or throw the error