diff --git a/package.json b/package.json index 2322fef..a404dc2 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@xtr-dev/payload-mailing", - "version": "0.0.7", + "version": "0.0.8", "description": "Template-based email system with scheduling and job processing for PayloadCMS", "type": "module", "main": "dist/index.js", diff --git a/src/plugin.ts b/src/plugin.ts index ffdfc94..5feeba8 100644 --- a/src/plugin.ts +++ b/src/plugin.ts @@ -4,9 +4,61 @@ import { MailingService } from './services/MailingService.js' import { createEmailTemplatesCollection } from './collections/EmailTemplates.js' import Emails from './collections/Emails.js' +// Helper function to schedule the email processing job +async function scheduleEmailProcessingJob(payload: any, queueName: string, delayMs: number = 60000): Promise { + if (!queueName || typeof queueName !== 'string') { + throw new Error('Invalid queueName: must be a non-empty string') + } + + const jobSlug = 'process-email-queue' + + // Check if there's already a scheduled job for this task + const existingJobs = await payload.find({ + collection: 'payload-jobs', + where: { + and: [ + { + taskSlug: { + equals: jobSlug, + }, + }, + { + hasCompleted: { + equals: false, + }, + }, + ], + }, + limit: 1, + }) + + // If no existing job, schedule a new one + if (existingJobs.docs.length === 0) { + await payload.create({ + collection: 'payload-jobs', + data: { + taskSlug: jobSlug, + input: {}, + queue: queueName, + waitUntil: new Date(Date.now() + delayMs), + }, + }) + console.log(`🔄 Scheduled email processing job in queue: ${queueName}`) + return true + } else { + console.log(`✅ Email processing job already scheduled in queue: ${queueName}`) + return false + } +} + export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Config): Config => { const queueName = pluginConfig.queue || 'default' + // Validate queueName + if (!queueName || typeof queueName !== 'string') { + throw new Error('Invalid queue configuration: queue must be a non-empty string') + } + // Handle templates collection configuration const templatesConfig = pluginConfig.collections?.templates const templatesSlug = typeof templatesConfig === 'string' ? templatesConfig : 'email-templates' @@ -80,8 +132,11 @@ export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Con { slug: 'process-email-queue', handler: async ({ job, req }: { job: any; req: any }) => { + const payload = (req as any).payload + let jobResult = null + try { - const mailingService = new MailingService((req as any).payload, pluginConfig) + const mailingService = new MailingService(payload, pluginConfig) console.log('🔄 Processing email queue (pending + failed emails)...') @@ -91,19 +146,41 @@ export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Con // Then retry failed emails await mailingService.retryFailedEmails() - return { + jobResult = { output: { success: true, message: 'Email queue processed successfully (pending and failed emails)' } } + + console.log('✅ Email queue processing completed successfully') + } catch (error) { console.error('❌ Error processing email queue:', error) const errorMessage = error instanceof Error ? error.message : 'Unknown error' - // Properly fail the job by throwing the error - throw new Error(`Email queue processing failed: ${errorMessage}`) + jobResult = new Error(`Email queue processing failed: ${errorMessage}`) } + + // Always reschedule the next job (success or failure) using duplicate prevention + let rescheduled = false + try { + rescheduled = await scheduleEmailProcessingJob(payload, queueName, 300000) // Reschedule in 5 minutes + if (rescheduled) { + console.log(`🔄 Rescheduled next email processing job in ${queueName} queue`) + } + } catch (rescheduleError) { + console.error('❌ Failed to reschedule email processing job:', rescheduleError) + // If rescheduling fails, we should warn but not fail the current job + // since the email processing itself may have succeeded + console.warn('⚠️ Email processing completed but next job could not be scheduled') + } + + // Return the original result or throw the error + if (jobResult instanceof Error) { + throw jobResult + } + return jobResult }, interfaceName: 'ProcessEmailQueueJob', }, @@ -130,6 +207,13 @@ export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Con console.log('PayloadCMS Mailing Plugin initialized successfully') + // Schedule the email processing job if not already scheduled + try { + await scheduleEmailProcessingJob(payload, queueName) + } catch (error) { + console.error('Failed to schedule email processing job:', error) + } + // Call onReady callback if provided if (pluginConfig.onReady) { await pluginConfig.onReady(payload)