Merge pull request #59 from xtr-dev/dev

Dev
This commit is contained in:
Bas
2025-09-20 20:29:07 +02:00
committed by GitHub
4 changed files with 9 additions and 30 deletions

View File

@@ -1,6 +1,6 @@
{ {
"name": "@xtr-dev/payload-mailing", "name": "@xtr-dev/payload-mailing",
"version": "0.4.11", "version": "0.4.12",
"description": "Template-based email system with scheduling and job processing for PayloadCMS", "description": "Template-based email system with scheduling and job processing for PayloadCMS",
"type": "module", "type": "module",
"main": "dist/index.js", "main": "dist/index.js",

View File

@@ -139,7 +139,6 @@ export const sendEmail = async <TEmail extends BaseEmailDocument = BaseEmailDocu
// If processImmediately is true, get the job from the relationship and process it now // If processImmediately is true, get the job from the relationship and process it now
if (options.processImmediately) { if (options.processImmediately) {
const logger = createContextLogger(payload, 'IMMEDIATE') const logger = createContextLogger(payload, 'IMMEDIATE')
logger.debug(`Starting immediate processing for email ${email.id}`)
if (!payload.jobs) { if (!payload.jobs) {
throw new Error('PayloadCMS jobs not configured - cannot process email immediately') throw new Error('PayloadCMS jobs not configured - cannot process email immediately')
@@ -153,7 +152,6 @@ export const sendEmail = async <TEmail extends BaseEmailDocument = BaseEmailDocu
const startTime = Date.now() const startTime = Date.now()
let jobId: string | undefined let jobId: string | undefined
logger.debug(`Polling for job creation (max ${maxAttempts} attempts, ${maxTotalTime}ms timeout)`)
for (let attempt = 0; attempt < maxAttempts; attempt++) { for (let attempt = 0; attempt < maxAttempts; attempt++) {
// Check total timeout before continuing // Check total timeout before continuing
@@ -178,21 +176,21 @@ export const sendEmail = async <TEmail extends BaseEmailDocument = BaseEmailDocu
id: email.id, id: email.id,
}) })
logger.debug(`Attempt ${attempt + 1}/${maxAttempts}: Found ${emailWithJobs.jobs?.length || 0} jobs for email ${email.id}`)
if (emailWithJobs.jobs && emailWithJobs.jobs.length > 0) { if (emailWithJobs.jobs && emailWithJobs.jobs.length > 0) {
// Job found! Get the first job ID (should only be one for a new email) // Job found! Get the first job ID (should only be one for a new email)
const firstJob = Array.isArray(emailWithJobs.jobs) ? emailWithJobs.jobs[0] : emailWithJobs.jobs const firstJob = Array.isArray(emailWithJobs.jobs) ? emailWithJobs.jobs[0] : emailWithJobs.jobs
jobId = typeof firstJob === 'string' ? firstJob : String(firstJob.id || firstJob) jobId = typeof firstJob === 'string' ? firstJob : String(firstJob.id || firstJob)
logger.info(`Found job ID: ${jobId}`)
break break
} }
// Log on later attempts to help with debugging (reduced threshold) // Log on later attempts to help with debugging (reduced threshold)
if (attempt >= 1) { if (attempt >= 1) {
if (attempt >= 2) {
logger.debug(`Waiting for job creation for email ${email.id}, attempt ${attempt + 1}/${maxAttempts}`) logger.debug(`Waiting for job creation for email ${email.id}, attempt ${attempt + 1}/${maxAttempts}`)
} }
} }
}
if (!jobId) { if (!jobId) {
// Distinguish between different failure scenarios for better error handling // Distinguish between different failure scenarios for better error handling
@@ -212,10 +210,9 @@ export const sendEmail = async <TEmail extends BaseEmailDocument = BaseEmailDocu
) )
} }
logger.info(`Starting job execution for job ${jobId}`)
try { try {
await processJobById(payload, jobId) await processJobById(payload, jobId)
logger.info(`Successfully processed email ${email.id} immediately`) logger.debug(`Successfully processed email ${email.id} immediately`)
} catch (error) { } catch (error) {
logger.error(`Failed to process email ${email.id} immediately:`, error) logger.error(`Failed to process email ${email.id} immediately:`, error)
throw new Error(`Failed to process email ${email.id} immediately: ${String(error)}`) throw new Error(`Failed to process email ${email.id} immediately: ${String(error)}`)

View File

@@ -37,16 +37,11 @@ export async function processEmailById(payload: Payload, emailId: string): Promi
* @returns Promise that resolves when job is processed * @returns Promise that resolves when job is processed
*/ */
export async function processJobById(payload: Payload, jobId: string): Promise<void> { export async function processJobById(payload: Payload, jobId: string): Promise<void> {
const logger = createContextLogger(payload, 'PROCESSOR')
logger.debug(`Starting processJobById for job ${jobId}`)
if (!payload.jobs) { if (!payload.jobs) {
throw new Error('PayloadCMS jobs not configured - cannot process job immediately') throw new Error('PayloadCMS jobs not configured - cannot process job immediately')
} }
try { try {
logger.debug(`Running job ${jobId} with payload.jobs.run()`)
// Run a specific job by its ID (using where clause to find the job) // Run a specific job by its ID (using where clause to find the job)
const result = await payload.jobs.run({ const result = await payload.jobs.run({
where: { where: {
@@ -55,9 +50,8 @@ export async function processJobById(payload: Payload, jobId: string): Promise<v
} }
} }
}) })
logger.info(`Job ${jobId} execution completed`, { result })
} catch (error) { } catch (error) {
const logger = createContextLogger(payload, 'PROCESSOR')
logger.error(`Job ${jobId} execution failed:`, error) logger.error(`Job ${jobId} execution failed:`, error)
throw new Error(`Failed to process job ${jobId}: ${String(error)}`) throw new Error(`Failed to process job ${jobId}: ${String(error)}`)
} }

View File

@@ -49,15 +49,12 @@ export async function ensureEmailJob(
const queueName = options?.queueName || mailingContext?.config?.queue || 'default' const queueName = options?.queueName || mailingContext?.config?.queue || 'default'
const logger = createContextLogger(payload, 'JOB_SCHEDULER') const logger = createContextLogger(payload, 'JOB_SCHEDULER')
logger.debug(`Ensuring job for email ${normalizedEmailId}`)
logger.debug(`Queue: ${queueName}, scheduledAt: ${options?.scheduledAt || 'immediate'}`)
// First, optimistically try to create the job // First, optimistically try to create the job
// If it fails due to uniqueness constraint, then check for existing jobs // If it fails due to uniqueness constraint, then check for existing jobs
// This approach minimizes the race condition window // This approach minimizes the race condition window
try { try {
logger.debug(`Attempting to create new job for email ${normalizedEmailId}`)
// Attempt to create job - rely on database constraints for duplicate prevention // Attempt to create job - rely on database constraints for duplicate prevention
const job = await payload.jobs.queue({ const job = await payload.jobs.queue({
queue: queueName, queue: queueName,
@@ -69,31 +66,22 @@ export async function ensureEmailJob(
}) })
logger.info(`Auto-scheduled processing job ${job.id} for email ${normalizedEmailId}`) logger.info(`Auto-scheduled processing job ${job.id} for email ${normalizedEmailId}`)
logger.debug(`Job details`, {
jobId: job.id,
emailId: normalizedEmailId,
scheduledAt: options?.scheduledAt || 'immediate',
task: 'process-email',
queue: queueName
})
return { return {
jobIds: [job.id], jobIds: [job.id],
created: true created: true
} }
} catch (createError) { } catch (createError) {
logger.warn(`Job creation failed for email ${normalizedEmailId}: ${String(createError)}`)
// Job creation failed - likely due to duplicate constraint or system issue // Job creation failed - likely due to duplicate constraint or system issue
// Check if duplicate jobs exist (handles race condition where another process created job) // Check if duplicate jobs exist (handles race condition where another process created job)
const existingJobs = await findExistingJobs(payload, normalizedEmailId) const existingJobs = await findExistingJobs(payload, normalizedEmailId)
logger.debug(`Found ${existingJobs.totalDocs} existing jobs after creation failure`)
if (existingJobs.totalDocs > 0) { if (existingJobs.totalDocs > 0) {
// Found existing jobs - return them (race condition handled successfully) // Found existing jobs - return them (race condition handled successfully)
logger.info(`Using existing jobs for email ${normalizedEmailId}: ${existingJobs.docs.map(j => j.id).join(', ')}`) logger.debug(`Using existing jobs for email ${normalizedEmailId}: ${existingJobs.docs.map(j => j.id).join(', ')}`)
return { return {
jobIds: existingJobs.docs.map(job => job.id), jobIds: existingJobs.docs.map(job => job.id),
created: false created: false
@@ -109,7 +97,7 @@ export async function ensureEmailJob(
if (isLikelyUniqueConstraint) { if (isLikelyUniqueConstraint) {
// This should not happen if our check above worked, but provide a clear error // This should not happen if our check above worked, but provide a clear error
logger.error(`Unique constraint violation but no existing jobs found for email ${normalizedEmailId}`) logger.warn(`Unique constraint violation but no existing jobs found for email ${normalizedEmailId}`)
throw new Error( throw new Error(
`Database uniqueness constraint violation for email ${normalizedEmailId}, but no existing jobs found. ` + `Database uniqueness constraint violation for email ${normalizedEmailId}, but no existing jobs found. ` +
`This indicates a potential data consistency issue. Original error: ${errorMessage}` `This indicates a potential data consistency issue. Original error: ${errorMessage}`
@@ -117,7 +105,7 @@ export async function ensureEmailJob(
} }
// Non-constraint related error // Non-constraint related error
logger.error(`Non-constraint job creation error for email ${normalizedEmailId}: ${errorMessage}`) logger.error(`Job creation error for email ${normalizedEmailId}: ${errorMessage}`)
throw new Error(`Failed to create job for email ${normalizedEmailId}: ${errorMessage}`) throw new Error(`Failed to create job for email ${normalizedEmailId}: ${errorMessage}`)
} }
} }