Add automatic job scheduling and rescheduling

- Add scheduleEmailProcessingJob helper to check and schedule jobs on init
- Only schedule if no existing uncompleted job exists
- Update job handler to always reschedule itself after completion (5min interval)
- Job reschedules regardless of success/failure for continuous processing
- Initial job starts 1 minute after init, subsequent jobs every 5 minutes

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-09-13 16:51:59 +02:00
parent fa54c5622c
commit 860dd7e921

View File

@@ -4,6 +4,47 @@ import { MailingService } from './services/MailingService.js'
import { createEmailTemplatesCollection } from './collections/EmailTemplates.js'
import Emails from './collections/Emails.js'
// Helper function to schedule the email processing job
async function scheduleEmailProcessingJob(payload: any, queueName: string): Promise<void> {
const jobSlug = 'process-email-queue'
// Check if there's already a scheduled job for this task
const existingJobs = await payload.find({
collection: 'payload-jobs',
where: {
and: [
{
taskSlug: {
equals: jobSlug,
},
},
{
hasCompleted: {
equals: false,
},
},
],
},
limit: 1,
})
// If no existing job, schedule a new one
if (existingJobs.docs.length === 0) {
await payload.create({
collection: 'payload-jobs',
data: {
taskSlug: jobSlug,
input: {},
queue: queueName,
waitUntil: new Date(Date.now() + 60000), // Start in 1 minute
},
})
console.log(`🔄 Scheduled email processing job in queue: ${queueName}`)
} else {
console.log(`✅ Email processing job already scheduled in queue: ${queueName}`)
}
}
export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Config): Config => {
const queueName = pluginConfig.queue || 'default'
@@ -80,8 +121,11 @@ export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Con
{
slug: 'process-email-queue',
handler: async ({ job, req }: { job: any; req: any }) => {
const payload = (req as any).payload
let jobResult = null
try {
const mailingService = new MailingService((req as any).payload, pluginConfig)
const mailingService = new MailingService(payload, pluginConfig)
console.log('🔄 Processing email queue (pending + failed emails)...')
@@ -91,19 +135,43 @@ export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Con
// Then retry failed emails
await mailingService.retryFailedEmails()
return {
jobResult = {
output: {
success: true,
message: 'Email queue processed successfully (pending and failed emails)'
}
}
console.log('✅ Email queue processing completed successfully')
} catch (error) {
console.error('❌ Error processing email queue:', error)
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
// Properly fail the job by throwing the error
throw new Error(`Email queue processing failed: ${errorMessage}`)
jobResult = new Error(`Email queue processing failed: ${errorMessage}`)
}
// Always reschedule the next job (success or failure)
try {
await payload.create({
collection: 'payload-jobs',
data: {
taskSlug: 'process-email-queue',
input: {},
queue: queueName,
waitUntil: new Date(Date.now() + 300000), // Reschedule in 5 minutes
},
})
console.log(`🔄 Rescheduled next email processing job in ${queueName} queue`)
} catch (rescheduleError) {
console.error('❌ Failed to reschedule email processing job:', rescheduleError)
}
// Return the original result or throw the error
if (jobResult instanceof Error) {
throw jobResult
}
return jobResult
},
interfaceName: 'ProcessEmailQueueJob',
},
@@ -130,6 +198,13 @@ export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Con
console.log('PayloadCMS Mailing Plugin initialized successfully')
// Schedule the email processing job if not already scheduled
try {
await scheduleEmailProcessingJob(payload, queueName)
} catch (error) {
console.error('Failed to schedule email processing job:', error)
}
// Call onReady callback if provided
if (pluginConfig.onReady) {
await pluginConfig.onReady(payload)