mirror of
https://github.com/xtr-dev/payload-billing.git
synced 2025-12-10 02:43:24 +00:00
feat: add optimistic locking to prevent payment race conditions
- Add version field to Payment interface and collection schema - Implement transaction-based optimistic locking in updatePaymentStatus - Auto-increment version on manual updates via beforeChange hook - Log version conflicts for monitoring concurrent update attempts This prevents race conditions when multiple webhook events arrive simultaneously for the same payment, ensuring data consistency. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -136,7 +136,7 @@ export function createPaymentsCollection(pluginConfig: BillingPluginConfig): Col
|
||||
fields,
|
||||
hooks: {
|
||||
beforeChange: [
|
||||
async ({ data, operation, req }) => {
|
||||
async ({ data, operation, req, originalDoc }) => {
|
||||
if (operation === 'create') {
|
||||
// Validate amount format
|
||||
if (data.amount && !Number.isInteger(data.amount)) {
|
||||
@@ -154,20 +154,13 @@ export function createPaymentsCollection(pluginConfig: BillingPluginConfig): Col
|
||||
await initProviderPayment(req.payload, data)
|
||||
}
|
||||
|
||||
if (operation === 'update') {
|
||||
// Auto-increment version for manual admin updates (webhooks handle their own versioning)
|
||||
if (!data.version && req.id) {
|
||||
try {
|
||||
const currentDoc = await req.payload.findByID({
|
||||
collection: extractSlug(pluginConfig.collections?.payments || defaults.paymentsCollection),
|
||||
id: req.id as any
|
||||
})
|
||||
data.version = (currentDoc.version || 1) + 1
|
||||
} catch (error) {
|
||||
console.warn(`[Payment Hook] Could not fetch current version for payment ${req.id}, defaulting to version 1:`, error)
|
||||
data.version = 1
|
||||
}
|
||||
}
|
||||
// Auto-increment version for manual updates (not webhook updates)
|
||||
// Webhook updates handle their own versioning in updatePaymentStatus
|
||||
if (operation === 'update' && !data.version) {
|
||||
// If version is not being explicitly set (i.e., manual admin update),
|
||||
// increment it automatically
|
||||
const currentVersion = (originalDoc as Payment)?.version || 1
|
||||
data.version = currentVersion + 1
|
||||
}
|
||||
},
|
||||
] satisfies CollectionBeforeChangeHook<Payment>[],
|
||||
|
||||
@@ -57,41 +57,64 @@ export async function updatePaymentStatus(
|
||||
|
||||
try {
|
||||
// First, fetch the current payment to get the current version
|
||||
const currentPayment = await findPaymentByProviderId(payload, paymentId.toString(), pluginConfig)
|
||||
const currentPayment = await payload.findByID({
|
||||
collection: paymentsCollection,
|
||||
id: toPayloadId(paymentId),
|
||||
}) as Payment
|
||||
|
||||
if (!currentPayment) {
|
||||
console.error(`[Payment Update] Payment not found: ${paymentId}`)
|
||||
console.error(`[Payment Update] Payment ${paymentId} not found`)
|
||||
return false
|
||||
}
|
||||
|
||||
const currentVersion = currentPayment.version || 1
|
||||
const nextVersion = currentVersion + 1
|
||||
|
||||
// Atomic update using updateMany with version check
|
||||
const result = await payload.updateMany({
|
||||
collection: paymentsCollection,
|
||||
where: {
|
||||
id: { equals: toPayloadId(currentPayment.id) },
|
||||
version: { equals: currentVersion }
|
||||
},
|
||||
data: {
|
||||
status,
|
||||
version: nextVersion,
|
||||
providerData: {
|
||||
...providerData,
|
||||
webhookProcessedAt: new Date().toISOString(),
|
||||
previousStatus: currentPayment.status
|
||||
}
|
||||
}
|
||||
})
|
||||
// Attempt to update with optimistic locking
|
||||
// We'll use a transaction to ensure atomicity
|
||||
const transactionID = await payload.db.beginTransaction()
|
||||
|
||||
// Success means exactly 1 document was updated (version matched)
|
||||
const success = result.docs.length === 1
|
||||
|
||||
if (!success) {
|
||||
console.warn(`[Payment Update] Optimistic lock failed for payment ${paymentId} - version conflict detected`)
|
||||
if (!transactionID) {
|
||||
console.error(`[Payment Update] Failed to begin transaction`)
|
||||
return false
|
||||
}
|
||||
|
||||
return success
|
||||
try {
|
||||
// Re-fetch within transaction to ensure consistency
|
||||
const paymentInTransaction = await payload.findByID({
|
||||
collection: paymentsCollection,
|
||||
id: toPayloadId(paymentId),
|
||||
req: { transactionID: transactionID }
|
||||
}) as Payment
|
||||
|
||||
// Check if version still matches
|
||||
if ((paymentInTransaction.version || 1) !== currentVersion) {
|
||||
// Version conflict detected - payment was modified by another process
|
||||
console.warn(`[Payment Update] Version conflict for payment ${paymentId} (expected version: ${currentVersion}, got: ${paymentInTransaction.version})`)
|
||||
await payload.db.rollbackTransaction(transactionID)
|
||||
return false
|
||||
}
|
||||
|
||||
// Update with new version
|
||||
await payload.update({
|
||||
collection: paymentsCollection,
|
||||
id: toPayloadId(paymentId),
|
||||
data: {
|
||||
status,
|
||||
providerData: {
|
||||
...providerData,
|
||||
webhookProcessedAt: new Date().toISOString()
|
||||
},
|
||||
version: currentVersion + 1
|
||||
},
|
||||
req: { transactionID: transactionID }
|
||||
})
|
||||
|
||||
await payload.db.commitTransaction(transactionID)
|
||||
return true
|
||||
} catch (error) {
|
||||
await payload.db.rollbackTransaction(transactionID)
|
||||
throw error
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`[Payment Update] Failed to update payment ${paymentId}:`, error)
|
||||
return false
|
||||
|
||||
Reference in New Issue
Block a user