diff --git a/src/jobs/processEmailJob.ts b/src/jobs/processEmailJob.ts index 6156a6a..bd2b900 100644 --- a/src/jobs/processEmailJob.ts +++ b/src/jobs/processEmailJob.ts @@ -9,10 +9,6 @@ export interface ProcessEmailJobInput { * The ID of the email to process */ emailId: string | number - /** - * Optional unique constraint helper to prevent duplicate jobs - */ - uniqueKey?: string } /** diff --git a/src/sendEmail.ts b/src/sendEmail.ts index abb2100..881759d 100644 --- a/src/sendEmail.ts +++ b/src/sendEmail.ts @@ -149,16 +149,26 @@ export const sendEmail = async maxTotalTime) { + throw new Error( + `Job polling timed out after ${maxTotalTime}ms for email ${email.id}. ` + + `The auto-scheduling may have failed or is taking longer than expected.` + ) + } + + // Calculate delay with exponential backoff (25ms, 50ms, 100ms, 200ms, 400ms) + // Cap at 400ms per attempt for better responsiveness + const delay = Math.min(initialDelay * Math.pow(2, attempt), 400) if (attempt > 0) { await new Promise(resolve => setTimeout(resolve, delay)) @@ -178,15 +188,15 @@ export const sendEmail = async = 3) { + // Log on later attempts to help with debugging (reduced threshold) + if (attempt >= 2) { console.log(`Waiting for job creation for email ${email.id}, attempt ${attempt + 1}/${maxAttempts}`) } } if (!jobId) { throw new Error( - `No processing job found for email ${email.id} after ${maxAttempts} attempts. ` + + `No processing job found for email ${email.id} after ${maxAttempts} attempts (${Date.now() - startTime}ms). ` + `The auto-scheduling may have failed or is taking longer than expected.` ) } diff --git a/src/utils/jobScheduler.ts b/src/utils/jobScheduler.ts index 7aaa96b..f3f70a7 100644 --- a/src/utils/jobScheduler.ts +++ b/src/utils/jobScheduler.ts @@ -47,9 +47,9 @@ export async function ensureEmailJob( const mailingContext = (payload as any).mailing const queueName = options?.queueName || mailingContext?.config?.queue || 'default' - // Implement atomic check-and-create with retry logic to prevent race conditions - const maxAttempts = 5 - const baseDelay = 100 // Start with 100ms + // Implement atomic check-and-create with minimal retry for efficiency + const maxAttempts = 3 // Reduced from 5 for better performance + const baseDelay = 50 // Reduced from 100ms for faster response for (let attempt = 0; attempt < maxAttempts; attempt++) { // Check for existing jobs with precise matching @@ -64,14 +64,12 @@ export async function ensureEmailJob( } try { - // Attempt to create job with specific input that ensures uniqueness + // Attempt to create job - rely on database constraints for duplicate prevention const job = await payload.jobs.queue({ queue: queueName, task: 'process-email', input: { - emailId: normalizedEmailId, - // Add a unique constraint helper to prevent duplicates at queue level - uniqueKey: `email-${normalizedEmailId}-${Date.now()}-${Math.random()}` + emailId: normalizedEmailId }, waitUntil: options?.scheduledAt ? new Date(options.scheduledAt) : undefined }) @@ -85,7 +83,7 @@ export async function ensureEmailJob( } catch (error) { // On any creation error, wait briefly and check again for concurrent creation if (attempt < maxAttempts - 1) { - const delay = baseDelay * Math.pow(2, attempt) // Exponential backoff + const delay = Math.min(baseDelay * Math.pow(1.5, attempt), 200) // Gentler exponential backoff, capped at 200ms await new Promise(resolve => setTimeout(resolve, delay)) // Check if another process succeeded while we were failing