diff --git a/src/collections/Emails.ts b/src/collections/Emails.ts index 3b32d5e..1934cb0 100644 --- a/src/collections/Emails.ts +++ b/src/collections/Emails.ts @@ -1,4 +1,5 @@ import type { CollectionConfig } from 'payload' +import { findExistingJobs, ensureEmailJob, updateEmailJobRelationship } from '../utils/jobScheduler.js' const Emails: CollectionConfig = { slug: 'emails', @@ -201,19 +202,7 @@ const Emails: CollectionConfig = { // For updates where we need to check existing jobs, we need the document ID if (originalDoc?.id) { try { - // Check if a processing job already exists for this email - const existingJobs = await req.payload.find({ - collection: 'payload-jobs', - where: { - 'input.emailId': { - equals: String(originalDoc.id), - }, - task: { - equals: 'process-email', - }, - }, - limit: 10, - }) + const existingJobs = await findExistingJobs(req.payload, originalDoc.id) if (existingJobs.totalDocs > 0) { // Add existing jobs to the relationship @@ -245,48 +234,17 @@ const Emails: CollectionConfig = { } try { - // Check if a processing job already exists for this email - const existingJobs = await req.payload.find({ - collection: 'payload-jobs', - where: { - 'input.emailId': { - equals: String(doc.id), - }, - task: { - equals: 'process-email', - }, - }, - limit: 1, + // Ensure a job exists for this email (will check for existing ones first) + const result = await ensureEmailJob(req.payload, doc.id, { + scheduledAt: doc.scheduledAt, }) - // If no job exists, create one and add it to the relationship - if (existingJobs.totalDocs === 0) { - const mailingContext = (req.payload as any).mailing - const queueName = mailingContext?.config?.queue || 'default' - - const job = await req.payload.jobs.queue({ - queue: queueName, - task: 'process-email', - input: { - emailId: String(doc.id) - }, - // If scheduled, set the waitUntil date - waitUntil: doc.scheduledAt ? new Date(doc.scheduledAt) : undefined - }) - - // Update the email document to include the job in the relationship - await req.payload.update({ - collection: 'emails', - id: doc.id, - data: { - jobs: [...(doc.jobs || []), job.id] - } - }) - - console.log(`Auto-scheduled processing job ${job.id} for email ${doc.id}`) + // If a new job was created or we found existing jobs, update the relationship + if (result.jobIds.length > 0) { + await updateEmailJobRelationship(req.payload, doc.id, result.jobIds, 'emails') } } catch (error) { - console.error(`Failed to auto-schedule job for email ${doc.id}:`, error) + console.error(`Failed to ensure job for email ${doc.id}:`, error) // Don't throw - we don't want to fail the email creation/update } } diff --git a/src/index.ts b/src/index.ts index 07622b3..4de77dc 100644 --- a/src/index.ts +++ b/src/index.ts @@ -29,4 +29,7 @@ export { } from './utils/helpers.js' // Email processing utilities -export { processEmailById, processAllEmails } from './utils/emailProcessor.js' \ No newline at end of file +export { processEmailById, processAllEmails } from './utils/emailProcessor.js' + +// Job scheduling utilities +export { findExistingJobs, ensureEmailJob, updateEmailJobRelationship } from './utils/jobScheduler.js' \ No newline at end of file diff --git a/src/sendEmail.ts b/src/sendEmail.ts index d28f164..5efb316 100644 --- a/src/sendEmail.ts +++ b/src/sendEmail.ts @@ -149,10 +149,6 @@ export const sendEmail = async setTimeout(resolve, 100)) - // Refetch the email to get the populated jobs relationship const emailWithJobs = await payload.findByID({ collection: collectionSlug, diff --git a/src/utils/jobScheduler.ts b/src/utils/jobScheduler.ts new file mode 100644 index 0000000..3eb4a85 --- /dev/null +++ b/src/utils/jobScheduler.ts @@ -0,0 +1,103 @@ +import type { Payload } from 'payload' + +/** + * Finds existing processing jobs for an email + */ +export async function findExistingJobs( + payload: Payload, + emailId: string | number +): Promise<{ docs: any[], totalDocs: number }> { + return await payload.find({ + collection: 'payload-jobs', + where: { + 'input.emailId': { + equals: String(emailId), + }, + task: { + equals: 'process-email', + }, + }, + limit: 10, + }) +} + +/** + * Ensures a processing job exists for an email + * Creates one if it doesn't exist, or returns existing job IDs + */ +export async function ensureEmailJob( + payload: Payload, + emailId: string | number, + options?: { + scheduledAt?: string | Date + queueName?: string + } +): Promise<{ jobIds: (string | number)[], created: boolean }> { + if (!payload.jobs) { + throw new Error('PayloadCMS jobs not configured - cannot create email job') + } + + // Check for existing jobs first + const existingJobs = await findExistingJobs(payload, emailId) + + if (existingJobs.totalDocs > 0) { + // Return existing job IDs + return { + jobIds: existingJobs.docs.map(job => job.id), + created: false + } + } + + // No existing job, create a new one + const mailingContext = (payload as any).mailing + const queueName = options?.queueName || mailingContext?.config?.queue || 'default' + + const job = await payload.jobs.queue({ + queue: queueName, + task: 'process-email', + input: { + emailId: String(emailId) + }, + // If scheduled, set the waitUntil date + waitUntil: options?.scheduledAt ? new Date(options.scheduledAt) : undefined + }) + + console.log(`Auto-scheduled processing job ${job.id} for email ${emailId}`) + + return { + jobIds: [job.id], + created: true + } +} + +/** + * Updates an email document to include job IDs in the relationship field + */ +export async function updateEmailJobRelationship( + payload: Payload, + emailId: string | number, + jobIds: (string | number)[], + collectionSlug: string = 'emails' +): Promise { + try { + // Get current jobs to avoid overwriting + const currentEmail = await payload.findByID({ + collection: collectionSlug, + id: emailId, + }) + + const currentJobs = currentEmail.jobs || [] + const allJobs = [...new Set([...currentJobs, ...jobIds])] // Deduplicate + + await payload.update({ + collection: collectionSlug, + id: emailId, + data: { + jobs: allJobs + } + }) + } catch (error) { + console.error(`Failed to update email ${emailId} with job relationship:`, error) + throw error + } +} \ No newline at end of file