@@ -335,29 +335,39 @@ export async function syncSubscriptionUsageLimits(subscription: SubscriptionData
335335 // on the next subscription event for any previously-joined
336336 // members whose bytes weren't transferred. Safe to re-run: once
337337 // transferred the user row is 0, so subsequent passes no-op.
338+ //
339+ // Race note: concurrent `incrementStorageUsage` /
340+ // `decrementStorageUsage` on the same rows could otherwise slip
341+ // between our snapshot SELECT and the zeroing UPDATE, wiping
342+ // bytes or corrupting the aggregate. We open a transaction and
343+ // take row-level write locks via `SELECT ... FOR UPDATE` so
344+ // concurrent writes on these user rows block until we commit.
338345 if ( isPaid ( subscription . plan ) ) {
339346 try {
340347 const memberIds = members . map ( ( m ) => m . userId )
341- const personalStorageRows = await db
342- . select ( {
343- userId : userStats . userId ,
344- bytes : userStats . storageUsedBytes ,
345- } )
346- . from ( userStats )
347- . where ( inArray ( userStats . userId , memberIds ) )
348+ await db . transaction ( async ( tx ) => {
349+ const personalStorageRows = await tx
350+ . select ( {
351+ userId : userStats . userId ,
352+ bytes : userStats . storageUsedBytes ,
353+ } )
354+ . from ( userStats )
355+ . where ( inArray ( userStats . userId , memberIds ) )
356+ . for ( 'update' )
348357
349- const toTransfer = personalStorageRows . filter ( ( r ) => ( r . bytes ?? 0 ) > 0 )
350- const totalBytes = toTransfer . reduce ( ( acc , r ) => acc + ( r . bytes ?? 0 ) , 0 )
358+ const toTransfer = personalStorageRows . filter ( ( r ) => ( r . bytes ?? 0 ) > 0 )
359+ const totalBytes = toTransfer . reduce ( ( acc , r ) => acc + ( r . bytes ?? 0 ) , 0 )
351360
352- if ( totalBytes > 0 ) {
353- await db
361+ if ( totalBytes === 0 ) return
362+
363+ await tx
354364 . update ( organization )
355365 . set ( {
356366 storageUsedBytes : sql `${ organization . storageUsedBytes } + ${ totalBytes } ` ,
357367 } )
358368 . where ( eq ( organization . id , organizationId ) )
359369
360- await db
370+ await tx
361371 . update ( userStats )
362372 . set ( { storageUsedBytes : 0 } )
363373 . where (
@@ -373,7 +383,7 @@ export async function syncSubscriptionUsageLimits(subscription: SubscriptionData
373383 memberCount : toTransfer . length ,
374384 totalBytes,
375385 } )
376- }
386+ } )
377387 } catch ( storageError ) {
378388 logger . error ( 'Failed to transfer personal storage to org pool' , {
379389 organizationId,
0 commit comments