Compare commits

..

9 Commits

Author SHA1 Message Date
Bas
060b1914b6 Merge pull request #46 from xtr-dev/dev
Dev
2025-09-14 20:45:26 +02:00
70fb79cca4 Add has-many relationship from emails to processing jobs
 New Feature:
- Add 'jobs' relationship field to emails collection
- Shows all PayloadCMS jobs associated with each email
- Read-only field with smart filtering by emailId
- Visible in admin interface for better email tracking

🔍 Benefits:
- Track job status and history for each email
- Debug processing issues more easily
- Monitor job queue performance per email
- Complete email processing visibility
2025-09-14 20:41:19 +02:00
f5e04d33ba Simplify error handling in processEmailJob
Remove unnecessary instanceof check since String() handles all types consistently.
2025-09-14 20:38:47 +02:00
27d504079a Fix critical error handling and race condition issues
🔴 Critical fixes:
- Fix race condition: processImmediately now properly fails if job creation fails
- Fix silent job failures: job creation failures now throw errors instead of warnings
- Ensure atomic operations: either email + job succeed together, or both fail

⚠️ Improvements:
- Simplify error handling in processEmailJob to be more consistent
- Add proper validation for missing PayloadCMS jobs configuration
- Make error messages more descriptive and actionable
2025-09-14 20:32:23 +02:00
b6ec55bc45 Bump version to 0.4.6 2025-09-14 20:26:59 +02:00
dcce3324ce Remove redundant blank lines in plugin initialization logic 2025-09-14 20:26:53 +02:00
f1f55d4444 Remove onReady callback from plugin
The onReady callback is no longer needed since the plugin no longer
schedules initial processing jobs during initialization.
2025-09-14 20:25:59 +02:00
b8950932f3 Remove unnecessary initial email processing job scheduling
Since sendEmail() now automatically creates individual jobs for each email,
the plugin no longer needs to schedule an initial batch processing job.
2025-09-14 20:24:45 +02:00
caa3686f1a Flatten email processing structure to individual jobs per email
BREAKING CHANGE: Replaced batch email processing with individual jobs per email

Changes:
- Remove sendEmailTask.ts - no longer needed as each email gets its own job
- Add processEmailJob.ts - handles individual email processing
- Update sendEmail() to automatically create individual job per email
- Add processImmediately option to sendEmail() for instant processing
- Add processJobById() utility to run specific jobs immediately
- Update job registration to use new individual job structure
- Update dev API routes to use new processImmediately pattern
- Fix all TypeScript compilation errors

Benefits:
- Better job queue visibility (one job per email)
- More granular control over individual email processing
- Easier job monitoring and failure tracking
- Maintains backward compatibility via processImmediately option
- Simpler job queue management

Migration:
- Replace sendEmailJob usage with sendEmail({ processImmediately: true })
- Individual emails now appear as separate jobs in queue
- Batch processing still available via processEmailsTask if needed

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-14 20:19:07 +02:00
11 changed files with 195 additions and 306 deletions

View File

@@ -1,6 +1,6 @@
import { getPayload } from 'payload' import { getPayload } from 'payload'
import config from '@payload-config' import config from '@payload-config'
import { sendEmail, processEmailById } from '@xtr-dev/payload-mailing' import { sendEmail } from '@xtr-dev/payload-mailing'
export async function POST(request: Request) { export async function POST(request: Request) {
try { try {
@@ -55,35 +55,21 @@ export async function POST(request: Request) {
emailOptions.data.scheduledAt = scheduledAt ? new Date(scheduledAt) : new Date(Date.now() + 60000) emailOptions.data.scheduledAt = scheduledAt ? new Date(scheduledAt) : new Date(Date.now() + 60000)
} }
// Set processImmediately for "send now" type
const processImmediately = (type === 'send' && !scheduledAt)
emailOptions.processImmediately = processImmediately
const result = await sendEmail(payload, emailOptions) const result = await sendEmail(payload, emailOptions)
// If it's "send now" (not scheduled), process the email immediately
if (type === 'send' && !scheduledAt) {
try {
await processEmailById(payload, String(result.id))
return Response.json({ return Response.json({
success: true, success: true,
emailId: result.id, emailId: result.id,
message: 'Email sent successfully', message: processImmediately ? 'Email sent successfully' :
status: 'sent' scheduledAt ? 'Email scheduled successfully' :
}) 'Email queued successfully',
} catch (processError) { status: processImmediately ? 'sent' :
// If immediate processing fails, return that it's queued scheduledAt ? 'scheduled' :
console.warn('Failed to process email immediately, left in queue:', processError) 'queued'
return Response.json({
success: true,
emailId: result.id,
message: 'Email queued successfully (immediate processing failed)',
status: 'queued'
})
}
}
return Response.json({
success: true,
emailId: result.id,
message: scheduledAt ? 'Email scheduled successfully' : 'Email queued successfully',
status: scheduledAt ? 'scheduled' : 'queued'
}) })
} catch (error) { } catch (error) {
console.error('Test email error:', error) console.error('Test email error:', error)

View File

@@ -1,6 +1,6 @@
{ {
"name": "@xtr-dev/payload-mailing", "name": "@xtr-dev/payload-mailing",
"version": "0.4.5", "version": "0.4.6",
"description": "Template-based email system with scheduling and job processing for PayloadCMS", "description": "Template-based email system with scheduling and job processing for PayloadCMS",
"type": "module", "type": "module",
"main": "dist/index.js", "main": "dist/index.js",

View File

@@ -4,7 +4,7 @@ const Emails: CollectionConfig = {
slug: 'emails', slug: 'emails',
admin: { admin: {
useAsTitle: 'subject', useAsTitle: 'subject',
defaultColumns: ['subject', 'to', 'status', 'scheduledAt', 'sentAt'], defaultColumns: ['subject', 'to', 'status', 'jobs', 'scheduledAt', 'sentAt'],
group: 'Mailing', group: 'Mailing',
description: 'Email delivery and status tracking', description: 'Email delivery and status tracking',
}, },
@@ -164,6 +164,24 @@ const Emails: CollectionConfig = {
description: 'Email priority (1=highest, 10=lowest)', description: 'Email priority (1=highest, 10=lowest)',
}, },
}, },
{
name: 'jobs',
type: 'relationship',
relationTo: 'payload-jobs',
hasMany: true,
admin: {
description: 'Processing jobs associated with this email',
allowCreate: false,
readOnly: true,
},
filterOptions: ({ id }) => {
return {
'input.emailId': {
equals: id,
},
}
},
},
], ],
timestamps: true, timestamps: true,
// indexes: [ // indexes: [

View File

@@ -11,9 +11,9 @@ export { MailingService } from './services/MailingService.js'
export { default as EmailTemplates, createEmailTemplatesCollection } from './collections/EmailTemplates.js' export { default as EmailTemplates, createEmailTemplatesCollection } from './collections/EmailTemplates.js'
export { default as Emails } from './collections/Emails.js' export { default as Emails } from './collections/Emails.js'
// Jobs (includes the send email task) // Jobs (includes the individual email processing job)
export { mailingJobs, sendEmailJob } from './jobs/index.js' export { mailingJobs } from './jobs/index.js'
export type { SendEmailTaskInput } from './jobs/sendEmailTask.js' export type { ProcessEmailJobInput } from './jobs/processEmailJob.js'
// Main email sending function // Main email sending function
export { sendEmail, type SendEmailOptions } from './sendEmail.js' export { sendEmail, type SendEmailOptions } from './sendEmail.js'

View File

@@ -1,14 +1,16 @@
import { processEmailsJob } from './processEmailsTask.js' import { processEmailsJob } from './processEmailsTask.js'
import { sendEmailJob } from './sendEmailTask.js' import { processEmailJob } from './processEmailJob.js'
/** /**
* All mailing-related jobs that get registered with Payload * All mailing-related jobs that get registered with Payload
*
* Note: The sendEmailJob has been removed as each email now gets its own individual processEmailJob
*/ */
export const mailingJobs = [ export const mailingJobs = [
processEmailsJob, processEmailsJob, // Kept for backward compatibility and batch processing if needed
sendEmailJob, processEmailJob, // New individual email processing job
] ]
// Re-export everything from individual job files // Re-export everything from individual job files
export * from './processEmailsTask.js' export * from './processEmailsTask.js'
export * from './sendEmailTask.js' export * from './processEmailJob.js'

View File

@@ -0,0 +1,72 @@
import type { PayloadRequest } from 'payload'
import { processEmailById } from '../utils/emailProcessor.js'
/**
* Data passed to the individual email processing job
*/
export interface ProcessEmailJobInput {
/**
* The ID of the email to process
*/
emailId: string | number
}
/**
* Job definition for processing a single email
* This replaces the batch processing approach with individual email jobs
*/
export const processEmailJob = {
slug: 'process-email',
label: 'Process Individual Email',
inputSchema: [
{
name: 'emailId',
type: 'text' as const,
required: true,
label: 'Email ID',
admin: {
description: 'The ID of the email to process and send'
}
}
],
outputSchema: [
{
name: 'success',
type: 'checkbox' as const
},
{
name: 'emailId',
type: 'text' as const
},
{
name: 'status',
type: 'text' as const
}
],
handler: async ({ input, req }: { input: ProcessEmailJobInput; req: PayloadRequest }) => {
const payload = (req as any).payload
const { emailId } = input
if (!emailId) {
throw new Error('Email ID is required for processing')
}
try {
// Process the individual email
await processEmailById(payload, String(emailId))
return {
output: {
success: true,
emailId: String(emailId),
status: 'sent',
message: `Email ${emailId} processed successfully`
}
}
} catch (error) {
throw new Error(`Failed to process email ${emailId}: ${String(error)}`)
}
}
}
export default processEmailJob

View File

@@ -1,256 +0,0 @@
import { sendEmail } from '../sendEmail.js'
import { BaseEmailDocument } from '../types/index.js'
import { processEmailById } from '../utils/emailProcessor.js'
export interface SendEmailTaskInput {
// Template mode fields
templateSlug?: string
variables?: Record<string, any>
// Direct email mode fields
subject?: string
html?: string
text?: string
// Common fields
to: string | string[]
cc?: string | string[]
bcc?: string | string[]
from?: string
fromName?: string
replyTo?: string
scheduledAt?: string | Date // ISO date string or Date object
priority?: number
processImmediately?: boolean // If true, process the email immediately instead of waiting for the queue
// Allow any additional fields that users might have in their email collection
[key: string]: any
}
/**
* Transforms task input into sendEmail options by separating template and data fields
*/
function transformTaskInputToSendEmailOptions(taskInput: SendEmailTaskInput) {
const sendEmailOptions: any = {
data: {}
}
// If using template mode, set template options
if (taskInput.templateSlug) {
sendEmailOptions.template = {
slug: taskInput.templateSlug,
variables: taskInput.variables || {}
}
}
// Standard email fields that should be copied to data
const standardFields = ['to', 'cc', 'bcc', 'from', 'fromName', 'replyTo', 'subject', 'html', 'text', 'scheduledAt', 'priority']
// Fields that should not be copied to data
const excludedFields = ['templateSlug', 'variables', 'processImmediately']
// Copy standard fields to data
standardFields.forEach(field => {
if (taskInput[field] !== undefined) {
sendEmailOptions.data[field] = taskInput[field]
}
})
// Copy any additional custom fields that aren't excluded or standard fields
Object.keys(taskInput).forEach(key => {
if (!excludedFields.includes(key) && !standardFields.includes(key)) {
sendEmailOptions.data[key] = taskInput[key]
}
})
return sendEmailOptions
}
/**
* Job definition for sending emails
* Can be used through Payload's job queue system to send emails programmatically
*/
export const sendEmailJob = {
slug: 'send-email',
label: 'Send Email',
inputSchema: [
{
name: 'processImmediately',
type: 'checkbox' as const,
label: 'Process Immediately',
defaultValue: false,
admin: {
description: 'Process and send the email immediately instead of waiting for the queue processor'
}
},
{
name: 'templateSlug',
type: 'text' as const,
label: 'Template Slug',
admin: {
description: 'Use a template (leave empty for direct email)',
condition: (data: any) => !data.subject && !data.html
}
},
{
name: 'variables',
type: 'json' as const,
label: 'Template Variables',
admin: {
description: 'JSON object with variables for template rendering',
condition: (data: any) => Boolean(data.templateSlug)
}
},
{
name: 'subject',
type: 'text' as const,
label: 'Subject',
admin: {
description: 'Email subject (required if not using template)',
condition: (data: any) => !data.templateSlug
}
},
{
name: 'html',
type: 'textarea' as const,
label: 'HTML Content',
admin: {
description: 'HTML email content (required if not using template)',
condition: (data: any) => !data.templateSlug
}
},
{
name: 'text',
type: 'textarea' as const,
label: 'Text Content',
admin: {
description: 'Plain text email content (optional)',
condition: (data: any) => !data.templateSlug
}
},
{
name: 'to',
type: 'text' as const,
required: true,
label: 'To (Email Recipients)',
admin: {
description: 'Comma-separated list of email addresses'
}
},
{
name: 'cc',
type: 'text' as const,
label: 'CC (Carbon Copy)',
admin: {
description: 'Optional comma-separated list of CC email addresses'
}
},
{
name: 'bcc',
type: 'text' as const,
label: 'BCC (Blind Carbon Copy)',
admin: {
description: 'Optional comma-separated list of BCC email addresses'
}
},
{
name: 'from',
type: 'text' as const,
label: 'From Email',
admin: {
description: 'Optional sender email address (uses default if not provided)'
}
},
{
name: 'fromName',
type: 'text' as const,
label: 'From Name',
admin: {
description: 'Optional sender display name (e.g., "John Doe")'
}
},
{
name: 'replyTo',
type: 'text' as const,
label: 'Reply To',
admin: {
description: 'Optional reply-to email address'
}
},
{
name: 'scheduledAt',
type: 'date' as const,
label: 'Schedule For',
admin: {
description: 'Optional date/time to schedule email for future delivery',
condition: (data: any) => !data.processImmediately
}
},
{
name: 'priority',
type: 'number' as const,
label: 'Priority',
min: 1,
max: 10,
defaultValue: 5,
admin: {
description: 'Email priority (1 = highest, 10 = lowest)'
}
}
],
outputSchema: [
{
name: 'id',
type: 'text' as const
}
],
handler: async ({ input, payload }: any) => {
// Cast input to our expected type
const taskInput = input as SendEmailTaskInput
const shouldProcessImmediately = taskInput.processImmediately || false
try {
// Transform task input into sendEmail options using helper function
const sendEmailOptions = transformTaskInputToSendEmailOptions(taskInput)
// Use the sendEmail helper to create the email
const email = await sendEmail<BaseEmailDocument>(payload, sendEmailOptions)
// If processImmediately is true, process the email now
if (shouldProcessImmediately) {
console.log(`⚡ Processing email ${email.id} immediately...`)
await processEmailById(payload, String(email.id))
console.log(`✅ Email ${email.id} processed and sent immediately`)
return {
output: {
success: true,
id: email.id,
status: 'sent',
processedImmediately: true
}
}
}
return {
output: {
success: true,
id: email.id,
status: 'queued',
processedImmediately: false
}
}
} catch (error) {
// Re-throw Error instances to preserve stack trace and error context
if (error instanceof Error) {
throw error
} else {
// Only wrap non-Error values
throw new Error(`Failed to process email: ${String(error)}`)
}
}
}
}
export default sendEmailJob

View File

@@ -3,7 +3,7 @@ import { MailingPluginConfig, MailingContext } from './types/index.js'
import { MailingService } from './services/MailingService.js' import { MailingService } from './services/MailingService.js'
import { createEmailTemplatesCollection } from './collections/EmailTemplates.js' import { createEmailTemplatesCollection } from './collections/EmailTemplates.js'
import Emails from './collections/Emails.js' import Emails from './collections/Emails.js'
import { mailingJobs, scheduleEmailsJob } from './jobs/index.js' import { mailingJobs } from './jobs/index.js'
export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Config): Config => { export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Config): Config => {
@@ -106,18 +106,6 @@ export const mailingPlugin = (pluginConfig: MailingPluginConfig) => (config: Con
}, },
} as MailingContext } as MailingContext
// Schedule the initial email processing job
try {
await scheduleEmailsJob(payload, queueName, 60000) // Schedule in 1 minute
} catch (error) {
console.error('Failed to schedule email processing job:', error)
}
// Call onReady callback if provided
if (pluginConfig.onReady) {
await pluginConfig.onReady(payload)
}
if (pluginConfig.initOrder !== 'after' && config.onInit) { if (pluginConfig.initOrder !== 'after' && config.onInit) {
await config.onInit(payload) await config.onInit(payload)
} }

View File

@@ -1,6 +1,7 @@
import { Payload } from 'payload' import { Payload } from 'payload'
import { getMailing, renderTemplate, parseAndValidateEmails } from './utils/helpers.js' import { getMailing, renderTemplate, parseAndValidateEmails } from './utils/helpers.js'
import { BaseEmailDocument } from './types/index.js' import { BaseEmailDocument } from './types/index.js'
import { processJobById } from './utils/emailProcessor.js'
// Options for sending emails // Options for sending emails
export interface SendEmailOptions<T extends BaseEmailDocument = BaseEmailDocument> { export interface SendEmailOptions<T extends BaseEmailDocument = BaseEmailDocument> {
@@ -13,6 +14,8 @@ export interface SendEmailOptions<T extends BaseEmailDocument = BaseEmailDocumen
data?: Partial<T> data?: Partial<T>
// Common options // Common options
collectionSlug?: string // defaults to 'emails' collectionSlug?: string // defaults to 'emails'
processImmediately?: boolean // if true, creates job and processes it immediately
queue?: string // queue name for the job, defaults to mailing config queue
} }
/** /**
@@ -39,8 +42,8 @@ export const sendEmail = async <TEmail extends BaseEmailDocument = BaseEmailDocu
payload: Payload, payload: Payload,
options: SendEmailOptions<TEmail> options: SendEmailOptions<TEmail>
): Promise<TEmail> => { ): Promise<TEmail> => {
const mailing = getMailing(payload) const mailingConfig = getMailing(payload)
const collectionSlug = options.collectionSlug || mailing.collections.emails || 'emails' const collectionSlug = options.collectionSlug || mailingConfig.collections.emails || 'emails'
let emailData: Partial<TEmail> = { ...options.data } as Partial<TEmail> let emailData: Partial<TEmail> = { ...options.data } as Partial<TEmail>
@@ -139,6 +142,58 @@ export const sendEmail = async <TEmail extends BaseEmailDocument = BaseEmailDocu
throw new Error('Failed to create email: invalid response from database') throw new Error('Failed to create email: invalid response from database')
} }
// Create an individual job for this email
const queueName = options.queue || mailingConfig.queue || 'default'
if (!payload.jobs) {
if (options.processImmediately) {
throw new Error('PayloadCMS jobs not configured - cannot process email immediately')
} else {
console.warn('PayloadCMS jobs not configured - emails will not be processed automatically')
return email as TEmail
}
}
let jobId: string
try {
const job = await payload.jobs.queue({
queue: queueName,
task: 'process-email',
input: {
emailId: String(email.id)
},
// If scheduled, set the waitUntil date
waitUntil: emailData.scheduledAt ? new Date(emailData.scheduledAt) : undefined
})
jobId = String(job.id)
} catch (error) {
// Clean up the orphaned email since job creation failed
try {
await payload.delete({
collection: collectionSlug,
id: email.id
})
} catch (deleteError) {
console.error(`Failed to clean up orphaned email ${email.id} after job creation failure:`, deleteError)
}
// Throw the original job creation error
const errorMsg = `Failed to create processing job for email ${email.id}: ${String(error)}`
throw new Error(errorMsg)
}
// If processImmediately is true, process the job now
if (options.processImmediately) {
try {
await processJobById(payload, jobId)
} catch (error) {
// For immediate processing failures, we could consider cleanup, but the job exists and could be retried later
// So we'll leave the email and job in place for potential retry
throw new Error(`Failed to process email ${email.id} immediately: ${String(error)}`)
}
}
return email as TEmail return email as TEmail
} }

View File

@@ -76,7 +76,6 @@ export interface MailingPluginConfig {
templateEngine?: TemplateEngine templateEngine?: TemplateEngine
richTextEditor?: RichTextField['editor'] richTextEditor?: RichTextField['editor']
beforeSend?: BeforeSendHook beforeSend?: BeforeSendHook
onReady?: (payload: any) => Promise<void>
initOrder?: 'before' | 'after' initOrder?: 'before' | 'after'
} }

View File

@@ -29,6 +29,31 @@ export async function processEmailById(payload: Payload, emailId: string): Promi
await mailingContext.service.processEmailItem(emailId) await mailingContext.service.processEmailItem(emailId)
} }
/**
* Processes a job immediately by finding and executing it
* @param payload Payload instance
* @param jobId The ID of the job to run immediately
* @returns Promise that resolves when job is processed
*/
export async function processJobById(payload: Payload, jobId: string): Promise<void> {
if (!payload.jobs) {
throw new Error('PayloadCMS jobs not configured - cannot process job immediately')
}
try {
// Run a specific job by its ID (using where clause to find the job)
await payload.jobs.run({
where: {
id: {
equals: jobId
}
}
})
} catch (error) {
throw new Error(`Failed to process job ${jobId}: ${String(error)}`)
}
}
/** /**
* Processes all pending and failed emails using the mailing service * Processes all pending and failed emails using the mailing service
* @param payload Payload instance * @param payload Payload instance