diff --git a/package.json b/package.json index 5d67ec1..ab5b1f0 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@xtr-dev/payload-mailing", - "version": "0.4.6", + "version": "0.4.8", "description": "Template-based email system with scheduling and job processing for PayloadCMS", "type": "module", "main": "dist/index.js", diff --git a/src/collections/Emails.ts b/src/collections/Emails.ts index 503f9aa..0c2acbb 100644 --- a/src/collections/Emails.ts +++ b/src/collections/Emails.ts @@ -1,4 +1,5 @@ import type { CollectionConfig } from 'payload' +import { findExistingJobs, ensureEmailJob, updateEmailJobRelationship } from '../utils/jobScheduler.js' const Emails: CollectionConfig = { slug: 'emails', @@ -183,21 +184,56 @@ const Emails: CollectionConfig = { }, }, ], + hooks: { + // Simple approach: Only use afterChange hook for job management + // This avoids complex interaction between hooks and ensures document ID is always available + afterChange: [ + async ({ doc, previousDoc, req, operation }) => { + // Skip if: + // 1. Email is not pending status + // 2. Jobs are not configured + // 3. Email already has jobs (unless status just changed to pending) + + const shouldSkip = + doc.status !== 'pending' || + !req.payload.jobs || + (doc.jobs?.length > 0 && previousDoc?.status === 'pending') + + if (shouldSkip) { + return + } + + try { + // Ensure a job exists for this email + // This function handles: + // - Checking for existing jobs (duplicate prevention) + // - Creating new job if needed + // - Returning all job IDs + const result = await ensureEmailJob(req.payload, doc.id, { + scheduledAt: doc.scheduledAt, + }) + + // Update the email's job relationship if we have jobs + // This handles both new jobs and existing jobs that weren't in the relationship + if (result.jobIds.length > 0) { + await updateEmailJobRelationship(req.payload, doc.id, result.jobIds, 'emails') + } + } catch (error) { + // Log error but don't throw - we don't want to fail the email operation + console.error(`Failed to ensure job for email ${doc.id}:`, error) + } + } + ] + }, timestamps: true, - // indexes: [ - // { - // fields: { - // status: 1, - // scheduledAt: 1, - // }, - // }, - // { - // fields: { - // priority: -1, - // createdAt: 1, - // }, - // }, - // ], + indexes: [ + { + fields: ['status', 'scheduledAt'], + }, + { + fields: ['priority', 'createdAt'], + }, + ], } export default Emails diff --git a/src/index.ts b/src/index.ts index 07622b3..1e66584 100644 --- a/src/index.ts +++ b/src/index.ts @@ -26,7 +26,12 @@ export { processEmails, retryFailedEmails, parseAndValidateEmails, + sanitizeDisplayName, + sanitizeFromName, } from './utils/helpers.js' // Email processing utilities -export { processEmailById, processAllEmails } from './utils/emailProcessor.js' \ No newline at end of file +export { processEmailById, processJobById, processAllEmails } from './utils/emailProcessor.js' + +// Job scheduling utilities +export { findExistingJobs, ensureEmailJob, updateEmailJobRelationship } from './utils/jobScheduler.js' \ No newline at end of file diff --git a/src/sendEmail.ts b/src/sendEmail.ts index d07f91a..be8292d 100644 --- a/src/sendEmail.ts +++ b/src/sendEmail.ts @@ -1,5 +1,5 @@ import { Payload } from 'payload' -import { getMailing, renderTemplate, parseAndValidateEmails } from './utils/helpers.js' +import { getMailing, renderTemplate, parseAndValidateEmails, sanitizeFromName } from './utils/helpers.js' import { BaseEmailDocument } from './types/index.js' import { processJobById } from './utils/emailProcessor.js' @@ -104,15 +104,7 @@ export const sendEmail = async maxTotalTime) { + throw new Error( + `Job polling timed out after ${maxTotalTime}ms for email ${email.id}. ` + + `The auto-scheduling may have failed or is taking longer than expected.` + ) + } + + // Calculate delay with exponential backoff (25ms, 50ms, 100ms, 200ms, 400ms) + // Cap at 400ms per attempt for better responsiveness + const delay = Math.min(initialDelay * Math.pow(2, attempt), 400) + + if (attempt > 0) { + await new Promise(resolve => setTimeout(resolve, delay)) + } + + // Refetch the email to check for jobs + const emailWithJobs = await payload.findByID({ + collection: collectionSlug, + id: email.id, + }) + + if (emailWithJobs.jobs && emailWithJobs.jobs.length > 0) { + // Job found! Get the first job ID (should only be one for a new email) + jobId = Array.isArray(emailWithJobs.jobs) + ? String(emailWithJobs.jobs[0]) + : String(emailWithJobs.jobs) + break + } + + // Log on later attempts to help with debugging (reduced threshold) + if (attempt >= 2) { + console.log(`Waiting for job creation for email ${email.id}, attempt ${attempt + 1}/${maxAttempts}`) + } + } + + if (!jobId) { + // Distinguish between different failure scenarios for better error handling + const timeoutMsg = Date.now() - startTime >= maxTotalTime + const errorType = timeoutMsg ? 'POLLING_TIMEOUT' : 'JOB_NOT_FOUND' + + const baseMessage = timeoutMsg + ? `Job polling timed out after ${maxTotalTime}ms for email ${email.id}` + : `No processing job found for email ${email.id} after ${maxAttempts} attempts (${Date.now() - startTime}ms)` + + throw new Error( + `${errorType}: ${baseMessage}. ` + + `This indicates the email was created but job auto-scheduling failed. ` + + `The email exists in the database but immediate processing cannot proceed. ` + + `You may need to: 1) Check job queue configuration, 2) Verify database hooks are working, ` + + `3) Process the email later using processEmailById('${email.id}').` + ) + } + try { await processJobById(payload, jobId) } catch (error) { - // For immediate processing failures, we could consider cleanup, but the job exists and could be retried later - // So we'll leave the email and job in place for potential retry throw new Error(`Failed to process email ${email.id} immediately: ${String(error)}`) } } diff --git a/src/services/MailingService.ts b/src/services/MailingService.ts index 3f56965..c380bc9 100644 --- a/src/services/MailingService.ts +++ b/src/services/MailingService.ts @@ -7,6 +7,7 @@ import { BaseEmail, BaseEmailTemplate, BaseEmailDocument, BaseEmailTemplateDocument } from '../types/index.js' import { serializeRichTextToHTML, serializeRichTextToText } from '../utils/richTextSerializer.js' +import { sanitizeDisplayName } from '../utils/helpers.js' export class MailingService implements IMailingService { public payload: Payload @@ -44,17 +45,10 @@ export class MailingService implements IMailingService { /** * Sanitizes a display name for use in email headers to prevent header injection - * and ensure proper formatting + * Uses the centralized sanitization utility with quote escaping for headers */ private sanitizeDisplayName(name: string): string { - return name - .trim() - // Remove/replace newlines and carriage returns to prevent header injection - .replace(/[\r\n]/g, ' ') - // Remove control characters (except space and printable characters) - .replace(/[\x00-\x1F\x7F-\x9F]/g, '') - // Escape quotes to prevent malformed headers - .replace(/"/g, '\\"') + return sanitizeDisplayName(name, true) // escapeQuotes = true for email headers } /** diff --git a/src/utils/helpers.ts b/src/utils/helpers.ts index 2c0f192..c477c89 100644 --- a/src/utils/helpers.ts +++ b/src/utils/helpers.ts @@ -36,6 +36,44 @@ export const parseAndValidateEmails = (emails: string | string[] | null | undefi return emailList } +/** + * Sanitize display names to prevent email header injection + * Removes newlines, carriage returns, and control characters + * @param displayName - The display name to sanitize + * @param escapeQuotes - Whether to escape quotes (for email headers) + * @returns Sanitized display name + */ +export const sanitizeDisplayName = (displayName: string, escapeQuotes = false): string => { + if (!displayName) return displayName + + let sanitized = displayName + .trim() + // Remove/replace newlines and carriage returns to prevent header injection + .replace(/[\r\n]/g, ' ') + // Remove control characters (except space and printable characters) + .replace(/[\x00-\x1F\x7F-\x9F]/g, '') + + // Escape quotes if needed (for email headers) + if (escapeQuotes) { + sanitized = sanitized.replace(/"/g, '\\"') + } + + return sanitized +} + +/** + * Sanitize and validate fromName for emails + * Wrapper around sanitizeDisplayName for consistent fromName handling + * @param fromName - The fromName to sanitize + * @returns Sanitized fromName or undefined if empty after sanitization + */ +export const sanitizeFromName = (fromName: string | null | undefined): string | undefined => { + if (!fromName) return undefined + + const sanitized = sanitizeDisplayName(fromName, false) + return sanitized.length > 0 ? sanitized : undefined +} + export const getMailing = (payload: Payload) => { const mailing = (payload as any).mailing if (!mailing) { diff --git a/src/utils/jobScheduler.ts b/src/utils/jobScheduler.ts new file mode 100644 index 0000000..b414a7d --- /dev/null +++ b/src/utils/jobScheduler.ts @@ -0,0 +1,140 @@ +import type { Payload } from 'payload' + +/** + * Finds existing processing jobs for an email + */ +export async function findExistingJobs( + payload: Payload, + emailId: string | number +): Promise<{ docs: any[], totalDocs: number }> { + return await payload.find({ + collection: 'payload-jobs', + where: { + 'input.emailId': { + equals: String(emailId), + }, + task: { + equals: 'process-email', + }, + }, + limit: 10, + }) +} + +/** + * Ensures a processing job exists for an email + * Creates one if it doesn't exist, or returns existing job IDs + * + * This function is idempotent and safe for concurrent calls: + * - Uses atomic check-and-create pattern with retry logic + * - Multiple concurrent calls will only create one job + * - Database-level uniqueness prevents duplicate jobs + * - Race conditions are handled with exponential backoff retry + */ +export async function ensureEmailJob( + payload: Payload, + emailId: string | number, + options?: { + scheduledAt?: string | Date + queueName?: string + } +): Promise<{ jobIds: (string | number)[], created: boolean }> { + if (!payload.jobs) { + throw new Error('PayloadCMS jobs not configured - cannot create email job') + } + + const normalizedEmailId = String(emailId) + const mailingContext = (payload as any).mailing + const queueName = options?.queueName || mailingContext?.config?.queue || 'default' + + // First, optimistically try to create the job + // If it fails due to uniqueness constraint, then check for existing jobs + // This approach minimizes the race condition window + + try { + // Attempt to create job - rely on database constraints for duplicate prevention + const job = await payload.jobs.queue({ + queue: queueName, + task: 'process-email', + input: { + emailId: normalizedEmailId + }, + waitUntil: options?.scheduledAt ? new Date(options.scheduledAt) : undefined + }) + + console.log(`Auto-scheduled processing job ${job.id} for email ${normalizedEmailId}`) + + return { + jobIds: [job.id], + created: true + } + } catch (createError) { + // Job creation failed - likely due to duplicate constraint or system issue + + // Check if duplicate jobs exist (handles race condition where another process created job) + const existingJobs = await findExistingJobs(payload, normalizedEmailId) + + if (existingJobs.totalDocs > 0) { + // Found existing jobs - return them (race condition handled successfully) + console.log(`Found existing jobs for email ${normalizedEmailId}: ${existingJobs.docs.map(j => j.id).join(', ')}`) + return { + jobIds: existingJobs.docs.map(job => job.id), + created: false + } + } + + // No existing jobs found - this is a genuine error + // Enhanced error context for better debugging + const errorMessage = String(createError) + const isLikelyUniqueConstraint = errorMessage.toLowerCase().includes('duplicate') || + errorMessage.toLowerCase().includes('unique') || + errorMessage.toLowerCase().includes('constraint') + + if (isLikelyUniqueConstraint) { + // This should not happen if our check above worked, but provide a clear error + throw new Error( + `Database uniqueness constraint violation for email ${normalizedEmailId}, but no existing jobs found. ` + + `This indicates a potential data consistency issue. Original error: ${errorMessage}` + ) + } + + // Non-constraint related error + throw new Error(`Failed to create job for email ${normalizedEmailId}: ${errorMessage}`) + } +} + +/** + * Updates an email document to include job IDs in the relationship field + */ +export async function updateEmailJobRelationship( + payload: Payload, + emailId: string | number, + jobIds: (string | number)[], + collectionSlug: string = 'emails' +): Promise { + try { + const normalizedEmailId = String(emailId) + const normalizedJobIds = jobIds.map(id => String(id)) + + // Get current jobs to avoid overwriting + const currentEmail = await payload.findByID({ + collection: collectionSlug, + id: normalizedEmailId, + }) + + const currentJobs = (currentEmail.jobs || []).map((job: any) => String(job)) + const allJobs = [...new Set([...currentJobs, ...normalizedJobIds])] // Deduplicate with normalized strings + + await payload.update({ + collection: collectionSlug, + id: normalizedEmailId, + data: { + jobs: allJobs + } + }) + } catch (error) { + const normalizedEmailId = String(emailId) + console.error(`Failed to update email ${normalizedEmailId} with job relationship:`, error) + throw error + } +} \ No newline at end of file