Extract job scheduling logic into dedicated utility functions

♻️ Refactoring:
- Created new jobScheduler.ts utility module
- Extracted findExistingJobs() for duplicate detection
- Extracted ensureEmailJob() for job creation with duplicate prevention
- Extracted updateEmailJobRelationship() for relationship management

📦 Functions:
- findExistingJobs(): Queries for existing processing jobs by email ID
- ensureEmailJob(): Creates job only if none exists, returns job IDs
- updateEmailJobRelationship(): Updates email with job relationship

🎯 Benefits:
- Reusable functions for job management
- Single source of truth for job scheduling logic
- Cleaner, more testable code
- Exported utilities for external use
- Better separation of concerns

🔧 Updated:
- Emails collection hooks now use extracted functions
- Exports added to main index for public API
- Cleaner hook implementation with less duplication
This commit is contained in:
2025-09-14 21:14:02 +02:00
parent 6f3d0f56c5
commit 640ea0818d
4 changed files with 116 additions and 56 deletions

View File

@@ -1,4 +1,5 @@
import type { CollectionConfig } from 'payload' import type { CollectionConfig } from 'payload'
import { findExistingJobs, ensureEmailJob, updateEmailJobRelationship } from '../utils/jobScheduler.js'
const Emails: CollectionConfig = { const Emails: CollectionConfig = {
slug: 'emails', slug: 'emails',
@@ -201,19 +202,7 @@ const Emails: CollectionConfig = {
// For updates where we need to check existing jobs, we need the document ID // For updates where we need to check existing jobs, we need the document ID
if (originalDoc?.id) { if (originalDoc?.id) {
try { try {
// Check if a processing job already exists for this email const existingJobs = await findExistingJobs(req.payload, originalDoc.id)
const existingJobs = await req.payload.find({
collection: 'payload-jobs',
where: {
'input.emailId': {
equals: String(originalDoc.id),
},
task: {
equals: 'process-email',
},
},
limit: 10,
})
if (existingJobs.totalDocs > 0) { if (existingJobs.totalDocs > 0) {
// Add existing jobs to the relationship // Add existing jobs to the relationship
@@ -245,48 +234,17 @@ const Emails: CollectionConfig = {
} }
try { try {
// Check if a processing job already exists for this email // Ensure a job exists for this email (will check for existing ones first)
const existingJobs = await req.payload.find({ const result = await ensureEmailJob(req.payload, doc.id, {
collection: 'payload-jobs', scheduledAt: doc.scheduledAt,
where: {
'input.emailId': {
equals: String(doc.id),
},
task: {
equals: 'process-email',
},
},
limit: 1,
}) })
// If no job exists, create one and add it to the relationship // If a new job was created or we found existing jobs, update the relationship
if (existingJobs.totalDocs === 0) { if (result.jobIds.length > 0) {
const mailingContext = (req.payload as any).mailing await updateEmailJobRelationship(req.payload, doc.id, result.jobIds, 'emails')
const queueName = mailingContext?.config?.queue || 'default'
const job = await req.payload.jobs.queue({
queue: queueName,
task: 'process-email',
input: {
emailId: String(doc.id)
},
// If scheduled, set the waitUntil date
waitUntil: doc.scheduledAt ? new Date(doc.scheduledAt) : undefined
})
// Update the email document to include the job in the relationship
await req.payload.update({
collection: 'emails',
id: doc.id,
data: {
jobs: [...(doc.jobs || []), job.id]
}
})
console.log(`Auto-scheduled processing job ${job.id} for email ${doc.id}`)
} }
} catch (error) { } catch (error) {
console.error(`Failed to auto-schedule job for email ${doc.id}:`, error) console.error(`Failed to ensure job for email ${doc.id}:`, error)
// Don't throw - we don't want to fail the email creation/update // Don't throw - we don't want to fail the email creation/update
} }
} }

View File

@@ -30,3 +30,6 @@ export {
// Email processing utilities // Email processing utilities
export { processEmailById, processAllEmails } from './utils/emailProcessor.js' export { processEmailById, processAllEmails } from './utils/emailProcessor.js'
// Job scheduling utilities
export { findExistingJobs, ensureEmailJob, updateEmailJobRelationship } from './utils/jobScheduler.js'

View File

@@ -149,10 +149,6 @@ export const sendEmail = async <TEmail extends BaseEmailDocument = BaseEmailDocu
throw new Error('PayloadCMS jobs not configured - cannot process email immediately') throw new Error('PayloadCMS jobs not configured - cannot process email immediately')
} }
// Wait a bit for hooks to complete and populate the job relationship
// This is necessary because hooks might run asynchronously
await new Promise(resolve => setTimeout(resolve, 100))
// Refetch the email to get the populated jobs relationship // Refetch the email to get the populated jobs relationship
const emailWithJobs = await payload.findByID({ const emailWithJobs = await payload.findByID({
collection: collectionSlug, collection: collectionSlug,

103
src/utils/jobScheduler.ts Normal file
View File

@@ -0,0 +1,103 @@
import type { Payload } from 'payload'
/**
* Finds existing processing jobs for an email
*/
export async function findExistingJobs(
payload: Payload,
emailId: string | number
): Promise<{ docs: any[], totalDocs: number }> {
return await payload.find({
collection: 'payload-jobs',
where: {
'input.emailId': {
equals: String(emailId),
},
task: {
equals: 'process-email',
},
},
limit: 10,
})
}
/**
* Ensures a processing job exists for an email
* Creates one if it doesn't exist, or returns existing job IDs
*/
export async function ensureEmailJob(
payload: Payload,
emailId: string | number,
options?: {
scheduledAt?: string | Date
queueName?: string
}
): Promise<{ jobIds: (string | number)[], created: boolean }> {
if (!payload.jobs) {
throw new Error('PayloadCMS jobs not configured - cannot create email job')
}
// Check for existing jobs first
const existingJobs = await findExistingJobs(payload, emailId)
if (existingJobs.totalDocs > 0) {
// Return existing job IDs
return {
jobIds: existingJobs.docs.map(job => job.id),
created: false
}
}
// No existing job, create a new one
const mailingContext = (payload as any).mailing
const queueName = options?.queueName || mailingContext?.config?.queue || 'default'
const job = await payload.jobs.queue({
queue: queueName,
task: 'process-email',
input: {
emailId: String(emailId)
},
// If scheduled, set the waitUntil date
waitUntil: options?.scheduledAt ? new Date(options.scheduledAt) : undefined
})
console.log(`Auto-scheduled processing job ${job.id} for email ${emailId}`)
return {
jobIds: [job.id],
created: true
}
}
/**
* Updates an email document to include job IDs in the relationship field
*/
export async function updateEmailJobRelationship(
payload: Payload,
emailId: string | number,
jobIds: (string | number)[],
collectionSlug: string = 'emails'
): Promise<void> {
try {
// Get current jobs to avoid overwriting
const currentEmail = await payload.findByID({
collection: collectionSlug,
id: emailId,
})
const currentJobs = currentEmail.jobs || []
const allJobs = [...new Set([...currentJobs, ...jobIds])] // Deduplicate
await payload.update({
collection: collectionSlug,
id: emailId,
data: {
jobs: allJobs
}
})
} catch (error) {
console.error(`Failed to update email ${emailId} with job relationship:`, error)
throw error
}
}