Fix race conditions and add validation for job scheduling

- Reuse duplicate prevention logic in rescheduling to prevent race conditions
- Add queueName validation in plugin initialization and helper function
- Enhanced scheduleEmailProcessingJob to return boolean and accept delay parameter
- Improve error handling: rescheduling failures warn but don't fail current job
- Prevent duplicate jobs when multiple handlers complete simultaneously

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-09-13 16:56:51 +02:00
parent 860dd7e921
commit 159a99a1ec

View File

@@ -5,7 +5,11 @@ import { createEmailTemplatesCollection } from './collections/EmailTemplates.js'
import Emails from './collections/Emails.js' import Emails from './collections/Emails.js'
// Helper function to schedule the email processing job // Helper function to schedule the email processing job
async function scheduleEmailProcessingJob(payload: any, queueName: string): Promise<void> { async function scheduleEmailProcessingJob(payload: any, queueName: string, delayMs: number = 60000): Promise<boolean> {
if (!queueName || typeof queueName !== 'string') {
throw new Error('Invalid queueName: must be a non-empty string')
}
const jobSlug = 'process-email-queue' const jobSlug = 'process-email-queue'
// Check if there's already a scheduled job for this task // Check if there's already a scheduled job for this task
@@ -36,18 +40,25 @@ async function scheduleEmailProcessingJob(payload: any, queueName: string): Prom
taskSlug: jobSlug, taskSlug: jobSlug,
input: {}, input: {},
queue: queueName, queue: queueName,
waitUntil: new Date(Date.now() + 60000), // Start in 1 minute waitUntil: new Date(Date.now() + delayMs),
}, },
}) })
console.log(`🔄 Scheduled email processing job in queue: ${queueName}`) console.log(`🔄 Scheduled email processing job in queue: ${queueName}`)
return true
} else { } else {
console.log(`✅ Email processing job already scheduled in queue: ${queueName}`) console.log(`✅ Email processing job already scheduled in queue: ${queueName}`)
return false
} }
} }
export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Config): Config => { export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Config): Config => {
const queueName = pluginConfig.queue || 'default' const queueName = pluginConfig.queue || 'default'
// Validate queueName
if (!queueName || typeof queueName !== 'string') {
throw new Error('Invalid queue configuration: queue must be a non-empty string')
}
// Handle templates collection configuration // Handle templates collection configuration
const templatesConfig = pluginConfig.collections?.templates const templatesConfig = pluginConfig.collections?.templates
const templatesSlug = typeof templatesConfig === 'string' ? templatesConfig : 'email-templates' const templatesSlug = typeof templatesConfig === 'string' ? templatesConfig : 'email-templates'
@@ -151,20 +162,18 @@ export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Con
jobResult = new Error(`Email queue processing failed: ${errorMessage}`) jobResult = new Error(`Email queue processing failed: ${errorMessage}`)
} }
// Always reschedule the next job (success or failure) // Always reschedule the next job (success or failure) using duplicate prevention
let rescheduled = false
try { try {
await payload.create({ rescheduled = await scheduleEmailProcessingJob(payload, queueName, 300000) // Reschedule in 5 minutes
collection: 'payload-jobs', if (rescheduled) {
data: { console.log(`🔄 Rescheduled next email processing job in ${queueName} queue`)
taskSlug: 'process-email-queue', }
input: {},
queue: queueName,
waitUntil: new Date(Date.now() + 300000), // Reschedule in 5 minutes
},
})
console.log(`🔄 Rescheduled next email processing job in ${queueName} queue`)
} catch (rescheduleError) { } catch (rescheduleError) {
console.error('❌ Failed to reschedule email processing job:', rescheduleError) console.error('❌ Failed to reschedule email processing job:', rescheduleError)
// If rescheduling fails, we should warn but not fail the current job
// since the email processing itself may have succeeded
console.warn('⚠️ Email processing completed but next job could not be scheduled')
} }
// Return the original result or throw the error // Return the original result or throw the error