mirror of
https://github.com/xtr-dev/payload-mailing.git
synced 2025-12-10 00:03:23 +00:00
Refactor immediate processing to use configurable job polling
Extract complex polling mechanism from sendEmail.ts into dedicated utility function (jobPolling.ts) and make polling parameters configurable via plugin options. This improves code maintainability and allows users to customize polling behavior through the jobPolling config option. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -3,6 +3,7 @@ import { getMailing, renderTemplate, parseAndValidateEmails, sanitizeFromName }
|
||||
import { BaseEmailDocument } from './types/index.js'
|
||||
import { processJobById } from './utils/emailProcessor.js'
|
||||
import { createContextLogger } from './utils/logger.js'
|
||||
import { pollForJobId } from './utils/jobPolling.js'
|
||||
|
||||
// Options for sending emails
|
||||
export interface SendEmailOptions<T extends BaseEmailDocument = BaseEmailDocument> {
|
||||
@@ -130,57 +131,14 @@ export const sendEmail = async <TEmail extends BaseEmailDocument = BaseEmailDocu
|
||||
throw new Error('PayloadCMS jobs not configured - cannot process email immediately')
|
||||
}
|
||||
|
||||
const maxAttempts = 5
|
||||
const initialDelay = 25
|
||||
const maxTotalTime = 3000
|
||||
const startTime = Date.now()
|
||||
let jobId: string | undefined
|
||||
|
||||
for (let attempt = 0; attempt < maxAttempts; attempt++) {
|
||||
if (Date.now() - startTime > maxTotalTime) {
|
||||
throw new Error(
|
||||
`Job polling timed out after ${maxTotalTime}ms for email ${email.id}. ` +
|
||||
`The auto-scheduling may have failed or is taking longer than expected.`
|
||||
)
|
||||
}
|
||||
|
||||
const delay = Math.min(initialDelay * Math.pow(2, attempt), 400)
|
||||
|
||||
if (attempt > 0) {
|
||||
await new Promise(resolve => setTimeout(resolve, delay))
|
||||
}
|
||||
|
||||
const emailWithJobs = await payload.findByID({
|
||||
collection: collectionSlug,
|
||||
id: email.id,
|
||||
})
|
||||
|
||||
if (emailWithJobs.jobs && emailWithJobs.jobs.length > 0) {
|
||||
const firstJob = Array.isArray(emailWithJobs.jobs) ? emailWithJobs.jobs[0] : emailWithJobs.jobs
|
||||
jobId = typeof firstJob === 'string' ? firstJob : String(firstJob.id || firstJob)
|
||||
break
|
||||
}
|
||||
|
||||
if (attempt >= 2) {
|
||||
logger.debug(`Waiting for job creation for email ${email.id}, attempt ${attempt + 1}/${maxAttempts}`)
|
||||
}
|
||||
}
|
||||
|
||||
if (!jobId) {
|
||||
const timeoutMsg = Date.now() - startTime >= maxTotalTime
|
||||
const errorType = timeoutMsg ? 'POLLING_TIMEOUT' : 'JOB_NOT_FOUND'
|
||||
const baseMessage = timeoutMsg
|
||||
? `Job polling timed out after ${maxTotalTime}ms for email ${email.id}`
|
||||
: `No processing job found for email ${email.id} after ${maxAttempts} attempts (${Date.now() - startTime}ms)`
|
||||
|
||||
throw new Error(
|
||||
`${errorType}: ${baseMessage}. ` +
|
||||
`This indicates the email was created but job auto-scheduling failed. ` +
|
||||
`The email exists in the database but immediate processing cannot proceed. ` +
|
||||
`You may need to: 1) Check job queue configuration, 2) Verify database hooks are working, ` +
|
||||
`3) Process the email later using processEmailById('${email.id}').`
|
||||
)
|
||||
}
|
||||
// Poll for the job ID using configurable polling mechanism
|
||||
const { jobId } = await pollForJobId({
|
||||
payload,
|
||||
collectionSlug,
|
||||
emailId: email.id,
|
||||
config: mailingConfig.jobPolling,
|
||||
logger,
|
||||
})
|
||||
|
||||
try {
|
||||
await processJobById(payload, jobId)
|
||||
|
||||
@@ -68,6 +68,13 @@ export interface BeforeSendMailOptions {
|
||||
|
||||
export type BeforeSendHook = (options: BeforeSendMailOptions, email: BaseEmailDocument) => BeforeSendMailOptions | Promise<BeforeSendMailOptions>
|
||||
|
||||
export interface JobPollingConfig {
|
||||
maxAttempts?: number // Maximum number of polling attempts (default: 5)
|
||||
initialDelay?: number // Initial delay in milliseconds (default: 25)
|
||||
maxTotalTime?: number // Maximum total polling time in milliseconds (default: 3000)
|
||||
maxBackoffDelay?: number // Maximum delay between attempts in milliseconds (default: 400)
|
||||
}
|
||||
|
||||
export interface MailingPluginConfig {
|
||||
collections?: {
|
||||
templates?: string | Partial<CollectionConfig>
|
||||
@@ -83,6 +90,7 @@ export interface MailingPluginConfig {
|
||||
richTextEditor?: RichTextField['editor']
|
||||
beforeSend?: BeforeSendHook
|
||||
initOrder?: 'before' | 'after'
|
||||
jobPolling?: JobPollingConfig
|
||||
}
|
||||
|
||||
export interface QueuedEmail {
|
||||
|
||||
115
src/utils/jobPolling.ts
Normal file
115
src/utils/jobPolling.ts
Normal file
@@ -0,0 +1,115 @@
|
||||
import { Payload } from 'payload'
|
||||
import { JobPollingConfig } from '../types/index.js'
|
||||
|
||||
export interface PollForJobIdOptions {
|
||||
payload: Payload
|
||||
collectionSlug: string
|
||||
emailId: string | number
|
||||
config?: JobPollingConfig
|
||||
logger?: {
|
||||
debug: (message: string, ...args: any[]) => void
|
||||
info: (message: string, ...args: any[]) => void
|
||||
warn: (message: string, ...args: any[]) => void
|
||||
error: (message: string, ...args: any[]) => void
|
||||
}
|
||||
}
|
||||
|
||||
export interface PollForJobIdResult {
|
||||
jobId: string
|
||||
attempts: number
|
||||
elapsedTime: number
|
||||
}
|
||||
|
||||
// Default job polling configuration values
|
||||
const DEFAULT_JOB_POLLING_CONFIG: Required<JobPollingConfig> = {
|
||||
maxAttempts: 5,
|
||||
initialDelay: 25,
|
||||
maxTotalTime: 3000,
|
||||
maxBackoffDelay: 400,
|
||||
}
|
||||
|
||||
/**
|
||||
* Polls for a job ID associated with an email document using exponential backoff.
|
||||
* This utility handles the complexity of waiting for auto-scheduled jobs to be created.
|
||||
*
|
||||
* The polling mechanism uses exponential backoff with configurable parameters:
|
||||
* - Starts with an initial delay and doubles on each retry
|
||||
* - Caps individual delays at maxBackoffDelay
|
||||
* - Enforces a maximum total polling time
|
||||
*
|
||||
* @param options - Polling options including payload, collection, email ID, and config
|
||||
* @returns Promise resolving to job ID and timing information
|
||||
* @throws Error if job is not found within the configured limits
|
||||
*/
|
||||
export const pollForJobId = async (options: PollForJobIdOptions): Promise<PollForJobIdResult> => {
|
||||
const { payload, collectionSlug, emailId, logger } = options
|
||||
|
||||
// Merge user config with defaults
|
||||
const config: Required<JobPollingConfig> = {
|
||||
...DEFAULT_JOB_POLLING_CONFIG,
|
||||
...options.config,
|
||||
}
|
||||
|
||||
const { maxAttempts, initialDelay, maxTotalTime, maxBackoffDelay } = config
|
||||
const startTime = Date.now()
|
||||
let jobId: string | undefined
|
||||
|
||||
for (let attempt = 0; attempt < maxAttempts; attempt++) {
|
||||
const elapsedTime = Date.now() - startTime
|
||||
|
||||
// Check if we've exceeded the maximum total polling time
|
||||
if (elapsedTime > maxTotalTime) {
|
||||
throw new Error(
|
||||
`Job polling timed out after ${maxTotalTime}ms for email ${emailId}. ` +
|
||||
`The auto-scheduling may have failed or is taking longer than expected.`
|
||||
)
|
||||
}
|
||||
|
||||
// Calculate exponential backoff delay, capped at maxBackoffDelay
|
||||
const delay = Math.min(initialDelay * Math.pow(2, attempt), maxBackoffDelay)
|
||||
|
||||
// Wait before checking (skip on first attempt)
|
||||
if (attempt > 0) {
|
||||
await new Promise(resolve => setTimeout(resolve, delay))
|
||||
}
|
||||
|
||||
// Fetch the email document to check for associated jobs
|
||||
const emailWithJobs = await payload.findByID({
|
||||
collection: collectionSlug,
|
||||
id: emailId,
|
||||
})
|
||||
|
||||
// Check if jobs array exists and has entries
|
||||
if (emailWithJobs.jobs && emailWithJobs.jobs.length > 0) {
|
||||
const firstJob = Array.isArray(emailWithJobs.jobs) ? emailWithJobs.jobs[0] : emailWithJobs.jobs
|
||||
jobId = typeof firstJob === 'string' ? firstJob : String(firstJob.id || firstJob)
|
||||
|
||||
return {
|
||||
jobId,
|
||||
attempts: attempt + 1,
|
||||
elapsedTime: Date.now() - startTime,
|
||||
}
|
||||
}
|
||||
|
||||
// Log progress for attempts after the second try
|
||||
if (attempt >= 2 && logger) {
|
||||
logger.debug(`Waiting for job creation for email ${emailId}, attempt ${attempt + 1}/${maxAttempts}`)
|
||||
}
|
||||
}
|
||||
|
||||
// If we reach here, job was not found
|
||||
const elapsedTime = Date.now() - startTime
|
||||
const timeoutMsg = elapsedTime >= maxTotalTime
|
||||
const errorType = timeoutMsg ? 'POLLING_TIMEOUT' : 'JOB_NOT_FOUND'
|
||||
const baseMessage = timeoutMsg
|
||||
? `Job polling timed out after ${maxTotalTime}ms for email ${emailId}`
|
||||
: `No processing job found for email ${emailId} after ${maxAttempts} attempts (${elapsedTime}ms)`
|
||||
|
||||
throw new Error(
|
||||
`${errorType}: ${baseMessage}. ` +
|
||||
`This indicates the email was created but job auto-scheduling failed. ` +
|
||||
`The email exists in the database but immediate processing cannot proceed. ` +
|
||||
`You may need to: 1) Check job queue configuration, 2) Verify database hooks are working, ` +
|
||||
`3) Process the email later using processEmailById('${emailId}').`
|
||||
)
|
||||
}
|
||||
Reference in New Issue
Block a user