Consolidate and simplify email job system

- Replace inline plugin task with jobs directory system
- Move sendTemplateEmailTask to jobs/sendEmailTask.ts and integrate with createMailingJobs()
- Simplify processEmailsJob to always process both pending and failed emails in one task
- Remove separate 'retry-failed' task type - retry logic now runs automatically
- Update MailingService to support lazy initialization for job context
- Update exports to include consolidated job system

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-09-13 19:10:32 +02:00
parent 2deefc8eaa
commit 43557c9a03
6 changed files with 135 additions and 151 deletions

View File

@@ -11,11 +11,9 @@ export { MailingService } from './services/MailingService.js'
export { default as EmailTemplates, createEmailTemplatesCollection } from './collections/EmailTemplates.js'
export { default as Emails } from './collections/Emails.js'
// Tasks
export { sendTemplateEmailTask, default as sendTemplateEmailTaskDefault } from './tasks/sendTemplateEmailTask.js'
export type { SendTemplateEmailInput } from './tasks/sendTemplateEmailTask.js'
// Jobs are integrated into the plugin configuration
// Jobs (includes the send email task)
export { createMailingJobs, sendEmailJob } from './jobs/index.js'
export type { SendEmailTaskInput } from './jobs/sendEmailTask.js'
// Utility functions for developers
export {

View File

@@ -1,4 +1,5 @@
import { processEmailsJob, ProcessEmailsJobData } from './processEmailsJob.js'
import { sendEmailJob } from './sendEmailTask.js'
import { MailingService } from '../services/MailingService.js'
export const createMailingJobs = (mailingService: MailingService): any[] => {
@@ -13,7 +14,9 @@ export const createMailingJobs = (mailingService: MailingService): any[] => {
},
interfaceName: 'ProcessEmailsJob',
},
sendEmailJob,
]
}
export * from './processEmailsJob.js'
export * from './sendEmailTask.js'

View File

@@ -2,7 +2,7 @@ import type { PayloadRequest } from 'payload'
import { MailingService } from '../services/MailingService.js'
export interface ProcessEmailsJobData {
type: 'process-emails' | 'retry-failed'
// No type needed - always processes both pending and failed emails
}
export const processEmailsJob = async (
@@ -10,18 +10,19 @@ export const processEmailsJob = async (
context: { req: PayloadRequest; mailingService: MailingService }
) => {
const { mailingService } = context
const { type } = job.data
try {
if (type === 'process-emails') {
await mailingService.processEmails()
console.log('Email processing completed successfully')
} else if (type === 'retry-failed') {
await mailingService.retryFailedEmails()
console.log('Failed email retry completed successfully')
}
console.log('🔄 Processing email queue (pending + failed emails)...')
// Process pending emails first
await mailingService.processEmails()
// Then retry failed emails
await mailingService.retryFailedEmails()
console.log('✅ Email queue processing completed successfully (pending and failed emails)')
} catch (error) {
console.error(`${type} job failed:`, error)
console.error('❌ Email queue processing failed:', error)
throw error
}
}
@@ -29,7 +30,6 @@ export const processEmailsJob = async (
export const scheduleEmailsJob = async (
payload: any,
queueName: string,
jobType: 'process-emails' | 'retry-failed',
delay?: number
) => {
if (!payload.jobs) {
@@ -41,10 +41,10 @@ export const scheduleEmailsJob = async (
await payload.jobs.queue({
queue: queueName,
task: 'processEmails',
input: { type: jobType },
input: {},
waitUntil: delay ? new Date(Date.now() + delay) : undefined,
})
} catch (error) {
console.error(`Failed to schedule ${jobType} job:`, error)
console.error('Failed to schedule email processing job:', error)
}
}

View File

@@ -1,28 +1,73 @@
import { renderTemplate } from '../utils/helpers.js'
export interface SendTemplateEmailInput {
templateSlug: string
export interface SendEmailTaskInput {
// Template mode fields
templateSlug?: string
variables?: Record<string, any>
// Direct email mode fields
subject?: string
html?: string
text?: string
// Common fields
to: string | string[]
cc?: string | string[]
bcc?: string | string[]
variables?: Record<string, any>
scheduledAt?: string // ISO date string
priority?: number
// Allow any additional fields that users might have in their email collection
[key: string]: any
}
export const sendTemplateEmailTask = {
slug: 'send-template-email',
label: 'Send Template Email',
export const sendEmailJob = {
slug: 'send-email',
label: 'Send Email',
inputSchema: [
{
name: 'templateSlug',
type: 'text' as const,
required: true,
label: 'Template Slug',
admin: {
description: 'The slug of the email template to render'
description: 'Use a template (leave empty for direct email)',
condition: (data: any) => !data.subject && !data.html
}
},
{
name: 'variables',
type: 'json' as const,
label: 'Template Variables',
admin: {
description: 'JSON object with variables for template rendering',
condition: (data: any) => Boolean(data.templateSlug)
}
},
{
name: 'subject',
type: 'text' as const,
label: 'Subject',
admin: {
description: 'Email subject (required if not using template)',
condition: (data: any) => !data.templateSlug
}
},
{
name: 'html',
type: 'textarea' as const,
label: 'HTML Content',
admin: {
description: 'HTML email content (required if not using template)',
condition: (data: any) => !data.templateSlug
}
},
{
name: 'text',
type: 'textarea' as const,
label: 'Text Content',
admin: {
description: 'Plain text email content (optional)',
condition: (data: any) => !data.templateSlug
}
},
{
@@ -50,14 +95,6 @@ export const sendTemplateEmailTask = {
description: 'Optional comma-separated list of BCC email addresses'
}
},
{
name: 'variables',
type: 'json' as const,
label: 'Template Variables',
admin: {
description: 'JSON object with variables for template rendering'
}
},
{
name: 'scheduledAt',
type: 'date' as const,
@@ -72,6 +109,7 @@ export const sendTemplateEmailTask = {
label: 'Priority',
min: 1,
max: 10,
defaultValue: 5,
admin: {
description: 'Email priority (1 = highest, 10 = lowest)'
}
@@ -79,15 +117,33 @@ export const sendTemplateEmailTask = {
],
handler: async ({ input, payload }: any) => {
// Cast input to our expected type
const taskInput = input as SendTemplateEmailInput
const taskInput = input as SendEmailTaskInput
try {
// Render the template
const { html, text, subject } = await renderTemplate(
payload,
taskInput.templateSlug,
taskInput.variables || {}
)
let html: string
let text: string | undefined
let subject: string
// Check if using template or direct email
if (taskInput.templateSlug) {
// Template mode: render the template
const rendered = await renderTemplate(
payload,
taskInput.templateSlug,
taskInput.variables || {}
)
html = rendered.html
text = rendered.text
subject = rendered.subject
} else {
// Direct email mode: use provided content
if (!taskInput.subject || !taskInput.html) {
throw new Error('Subject and HTML content are required when not using a template')
}
subject = taskInput.subject
html = taskInput.html
text = taskInput.text
}
// Parse email addresses
const parseEmails = (emails: string | string[] | undefined): string[] | undefined => {
@@ -130,7 +186,9 @@ export const sendTemplateEmailTask = {
success: true,
emailId: email.id,
message: `Email queued successfully with ID: ${email.id}`,
templateSlug: taskInput.templateSlug,
mode: taskInput.templateSlug ? 'template' : 'direct',
templateSlug: taskInput.templateSlug || null,
subject: subject,
recipients: emailData.to?.length || 0,
scheduledAt: emailData.scheduledAt || null
}
@@ -148,4 +206,4 @@ export const sendTemplateEmailTask = {
}
}
export default sendTemplateEmailTask
export default sendEmailJob

View File

@@ -3,53 +3,8 @@ import { MailingPluginConfig, MailingContext } from './types/index.js'
import { MailingService } from './services/MailingService.js'
import { createEmailTemplatesCollection } from './collections/EmailTemplates.js'
import Emails from './collections/Emails.js'
import { createMailingJobs, scheduleEmailsJob } from './jobs/index.js'
// Helper function to schedule the email processing job
async function scheduleEmailProcessingJob(payload: any, queueName: string, delayMs: number = 60000): Promise<boolean> {
if (!queueName || typeof queueName !== 'string') {
throw new Error('Invalid queueName: must be a non-empty string')
}
const jobSlug = 'process-email-queue'
// Check if there's already a scheduled job for this task
const existingJobs = await payload.find({
collection: 'payload-jobs',
where: {
and: [
{
taskSlug: {
equals: jobSlug,
},
},
{
hasCompleted: {
equals: false,
},
},
],
},
limit: 1,
})
// If no existing job, schedule a new one
if (existingJobs.docs.length === 0) {
await payload.create({
collection: 'payload-jobs',
data: {
taskSlug: jobSlug,
input: {},
queue: queueName,
waitUntil: new Date(Date.now() + delayMs),
},
})
console.log(`🔄 Scheduled email processing job in queue: ${queueName}`)
return true
} else {
console.log(`✅ Email processing job already scheduled in queue: ${queueName}`)
return false
}
}
export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Config): Config => {
const queueName = pluginConfig.queue || 'default'
@@ -59,6 +14,9 @@ export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Con
throw new Error('Invalid queue configuration: queue must be a non-empty string')
}
// Initialize mailing service for jobs
const mailingService = new MailingService(null as any, pluginConfig) // payload will be set during onInit
// Handle templates collection configuration
const templatesConfig = pluginConfig.collections?.templates
const templatesSlug = typeof templatesConfig === 'string' ? templatesConfig : 'email-templates'
@@ -129,61 +87,7 @@ export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Con
...(config.jobs || {}),
tasks: [
...(config.jobs?.tasks || []),
{
slug: 'process-email-queue',
handler: async ({ job, req }: { job: any; req: any }) => {
const payload = (req as any).payload
let jobResult = null
try {
const mailingService = new MailingService(payload, pluginConfig)
console.log('🔄 Processing email queue (pending + failed emails)...')
// Process pending emails first
await mailingService.processEmails()
// Then retry failed emails
await mailingService.retryFailedEmails()
jobResult = {
output: {
success: true,
message: 'Email queue processed successfully (pending and failed emails)'
}
}
console.log('✅ Email queue processing completed successfully')
} catch (error) {
console.error('❌ Error processing email queue:', error)
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
jobResult = new Error(`Email queue processing failed: ${errorMessage}`)
}
// Always reschedule the next job (success or failure) using duplicate prevention
let rescheduled = false
try {
rescheduled = await scheduleEmailProcessingJob(payload, queueName, 300000) // Reschedule in 5 minutes
if (rescheduled) {
console.log(`🔄 Rescheduled next email processing job in ${queueName} queue`)
}
} catch (rescheduleError) {
console.error('❌ Failed to reschedule email processing job:', rescheduleError)
// If rescheduling fails, we should warn but not fail the current job
// since the email processing itself may have succeeded
console.warn('⚠️ Email processing completed but next job could not be scheduled')
}
// Return the original result or throw the error
if (jobResult instanceof Error) {
throw jobResult
}
return jobResult
},
interfaceName: 'ProcessEmailQueueJob',
},
...createMailingJobs(mailingService),
],
},
onInit: async (payload: any) => {
@@ -191,8 +95,8 @@ export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Con
await config.onInit(payload)
}
// Initialize mailing service
const mailingService = new MailingService(payload, pluginConfig)
// Update mailing service with payload instance
mailingService.payload = payload
// Add mailing context to payload for developer access
;(payload as any).mailing = {
@@ -207,9 +111,10 @@ export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Con
console.log('PayloadCMS Mailing Plugin initialized successfully')
// Schedule the email processing job if not already scheduled
// Schedule the initial email processing job
try {
await scheduleEmailProcessingJob(payload, queueName)
await scheduleEmailsJob(payload, queueName, 60000) // Schedule in 1 minute
console.log(`🔄 Scheduled initial email processing job in queue: ${queueName}`)
} catch (error) {
console.error('Failed to schedule email processing job:', error)
}

View File

@@ -13,12 +13,13 @@ import {
import { serializeRichTextToHTML, serializeRichTextToText } from '../utils/richTextSerializer.js'
export class MailingService implements IMailingService {
private payload: Payload
public payload: Payload
private config: MailingPluginConfig
private transporter!: Transporter | any
private templatesCollection: string
private emailsCollection: string
private liquid: Liquid | null | false = null
private transporterInitialized = false
constructor(payload: Payload, config: MailingPluginConfig) {
this.payload = payload
@@ -30,10 +31,15 @@ export class MailingService implements IMailingService {
const emailsConfig = config.collections?.emails
this.emailsCollection = typeof emailsConfig === 'string' ? emailsConfig : 'emails'
this.initializeTransporter()
// Only initialize transporter if payload is properly set
if (payload && payload.db) {
this.initializeTransporter()
}
}
private initializeTransporter(): void {
if (this.transporterInitialized) return
if (this.config.transport) {
if ('sendMail' in this.config.transport) {
this.transporter = this.config.transport
@@ -46,6 +52,17 @@ export class MailingService implements IMailingService {
} else {
throw new Error('Email transport configuration is required either in plugin config or Payload config')
}
this.transporterInitialized = true
}
private ensureInitialized(): void {
if (!this.payload || !this.payload.db) {
throw new Error('MailingService payload not properly initialized')
}
if (!this.transporterInitialized) {
this.initializeTransporter()
}
}
private getDefaultFrom(): string {
@@ -108,6 +125,7 @@ export class MailingService implements IMailingService {
}
async renderTemplate(templateSlug: string, variables: TemplateVariables): Promise<{ html: string; text: string; subject: string }> {
this.ensureInitialized()
const template = await this.getTemplateBySlug(templateSlug)
if (!template) {
@@ -125,6 +143,7 @@ export class MailingService implements IMailingService {
}
async processEmails(): Promise<void> {
this.ensureInitialized()
const currentTime = new Date().toISOString()
const { docs: pendingEmails } = await this.payload.find({
@@ -162,6 +181,7 @@ export class MailingService implements IMailingService {
}
async retryFailedEmails(): Promise<void> {
this.ensureInitialized()
const maxAttempts = this.config.retryAttempts || 3
const retryDelay = this.config.retryDelay || 300000 // 5 minutes
const retryTime = new Date(Date.now() - retryDelay).toISOString()