diff --git a/src/sendEmail.ts b/src/sendEmail.ts index 1309780..3b6d624 100644 --- a/src/sendEmail.ts +++ b/src/sendEmail.ts @@ -3,6 +3,7 @@ import { getMailing, renderTemplate, parseAndValidateEmails, sanitizeFromName } import { BaseEmailDocument } from './types/index.js' import { processJobById } from './utils/emailProcessor.js' import { createContextLogger } from './utils/logger.js' +import { pollForJobId } from './utils/jobPolling.js' // Options for sending emails export interface SendEmailOptions { @@ -130,57 +131,14 @@ export const sendEmail = async maxTotalTime) { - throw new Error( - `Job polling timed out after ${maxTotalTime}ms for email ${email.id}. ` + - `The auto-scheduling may have failed or is taking longer than expected.` - ) - } - - const delay = Math.min(initialDelay * Math.pow(2, attempt), 400) - - if (attempt > 0) { - await new Promise(resolve => setTimeout(resolve, delay)) - } - - const emailWithJobs = await payload.findByID({ - collection: collectionSlug, - id: email.id, - }) - - if (emailWithJobs.jobs && emailWithJobs.jobs.length > 0) { - const firstJob = Array.isArray(emailWithJobs.jobs) ? emailWithJobs.jobs[0] : emailWithJobs.jobs - jobId = typeof firstJob === 'string' ? firstJob : String(firstJob.id || firstJob) - break - } - - if (attempt >= 2) { - logger.debug(`Waiting for job creation for email ${email.id}, attempt ${attempt + 1}/${maxAttempts}`) - } - } - - if (!jobId) { - const timeoutMsg = Date.now() - startTime >= maxTotalTime - const errorType = timeoutMsg ? 'POLLING_TIMEOUT' : 'JOB_NOT_FOUND' - const baseMessage = timeoutMsg - ? `Job polling timed out after ${maxTotalTime}ms for email ${email.id}` - : `No processing job found for email ${email.id} after ${maxAttempts} attempts (${Date.now() - startTime}ms)` - - throw new Error( - `${errorType}: ${baseMessage}. ` + - `This indicates the email was created but job auto-scheduling failed. ` + - `The email exists in the database but immediate processing cannot proceed. ` + - `You may need to: 1) Check job queue configuration, 2) Verify database hooks are working, ` + - `3) Process the email later using processEmailById('${email.id}').` - ) - } + // Poll for the job ID using configurable polling mechanism + const { jobId } = await pollForJobId({ + payload, + collectionSlug, + emailId: email.id, + config: mailingConfig.jobPolling, + logger, + }) try { await processJobById(payload, jobId) diff --git a/src/types/index.ts b/src/types/index.ts index 7e01045..4826286 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -68,6 +68,13 @@ export interface BeforeSendMailOptions { export type BeforeSendHook = (options: BeforeSendMailOptions, email: BaseEmailDocument) => BeforeSendMailOptions | Promise +export interface JobPollingConfig { + maxAttempts?: number // Maximum number of polling attempts (default: 5) + initialDelay?: number // Initial delay in milliseconds (default: 25) + maxTotalTime?: number // Maximum total polling time in milliseconds (default: 3000) + maxBackoffDelay?: number // Maximum delay between attempts in milliseconds (default: 400) +} + export interface MailingPluginConfig { collections?: { templates?: string | Partial @@ -83,6 +90,7 @@ export interface MailingPluginConfig { richTextEditor?: RichTextField['editor'] beforeSend?: BeforeSendHook initOrder?: 'before' | 'after' + jobPolling?: JobPollingConfig } export interface QueuedEmail { diff --git a/src/utils/jobPolling.ts b/src/utils/jobPolling.ts new file mode 100644 index 0000000..14be8dd --- /dev/null +++ b/src/utils/jobPolling.ts @@ -0,0 +1,115 @@ +import { Payload } from 'payload' +import { JobPollingConfig } from '../types/index.js' + +export interface PollForJobIdOptions { + payload: Payload + collectionSlug: string + emailId: string | number + config?: JobPollingConfig + logger?: { + debug: (message: string, ...args: any[]) => void + info: (message: string, ...args: any[]) => void + warn: (message: string, ...args: any[]) => void + error: (message: string, ...args: any[]) => void + } +} + +export interface PollForJobIdResult { + jobId: string + attempts: number + elapsedTime: number +} + +// Default job polling configuration values +const DEFAULT_JOB_POLLING_CONFIG: Required = { + maxAttempts: 5, + initialDelay: 25, + maxTotalTime: 3000, + maxBackoffDelay: 400, +} + +/** + * Polls for a job ID associated with an email document using exponential backoff. + * This utility handles the complexity of waiting for auto-scheduled jobs to be created. + * + * The polling mechanism uses exponential backoff with configurable parameters: + * - Starts with an initial delay and doubles on each retry + * - Caps individual delays at maxBackoffDelay + * - Enforces a maximum total polling time + * + * @param options - Polling options including payload, collection, email ID, and config + * @returns Promise resolving to job ID and timing information + * @throws Error if job is not found within the configured limits + */ +export const pollForJobId = async (options: PollForJobIdOptions): Promise => { + const { payload, collectionSlug, emailId, logger } = options + + // Merge user config with defaults + const config: Required = { + ...DEFAULT_JOB_POLLING_CONFIG, + ...options.config, + } + + const { maxAttempts, initialDelay, maxTotalTime, maxBackoffDelay } = config + const startTime = Date.now() + let jobId: string | undefined + + for (let attempt = 0; attempt < maxAttempts; attempt++) { + const elapsedTime = Date.now() - startTime + + // Check if we've exceeded the maximum total polling time + if (elapsedTime > maxTotalTime) { + throw new Error( + `Job polling timed out after ${maxTotalTime}ms for email ${emailId}. ` + + `The auto-scheduling may have failed or is taking longer than expected.` + ) + } + + // Calculate exponential backoff delay, capped at maxBackoffDelay + const delay = Math.min(initialDelay * Math.pow(2, attempt), maxBackoffDelay) + + // Wait before checking (skip on first attempt) + if (attempt > 0) { + await new Promise(resolve => setTimeout(resolve, delay)) + } + + // Fetch the email document to check for associated jobs + const emailWithJobs = await payload.findByID({ + collection: collectionSlug, + id: emailId, + }) + + // Check if jobs array exists and has entries + if (emailWithJobs.jobs && emailWithJobs.jobs.length > 0) { + const firstJob = Array.isArray(emailWithJobs.jobs) ? emailWithJobs.jobs[0] : emailWithJobs.jobs + jobId = typeof firstJob === 'string' ? firstJob : String(firstJob.id || firstJob) + + return { + jobId, + attempts: attempt + 1, + elapsedTime: Date.now() - startTime, + } + } + + // Log progress for attempts after the second try + if (attempt >= 2 && logger) { + logger.debug(`Waiting for job creation for email ${emailId}, attempt ${attempt + 1}/${maxAttempts}`) + } + } + + // If we reach here, job was not found + const elapsedTime = Date.now() - startTime + const timeoutMsg = elapsedTime >= maxTotalTime + const errorType = timeoutMsg ? 'POLLING_TIMEOUT' : 'JOB_NOT_FOUND' + const baseMessage = timeoutMsg + ? `Job polling timed out after ${maxTotalTime}ms for email ${emailId}` + : `No processing job found for email ${emailId} after ${maxAttempts} attempts (${elapsedTime}ms)` + + throw new Error( + `${errorType}: ${baseMessage}. ` + + `This indicates the email was created but job auto-scheduling failed. ` + + `The email exists in the database but immediate processing cannot proceed. ` + + `You may need to: 1) Check job queue configuration, 2) Verify database hooks are working, ` + + `3) Process the email later using processEmailById('${emailId}').` + ) +}