Skip to content

Commit e561c3b

Browse files
committed
outbox service
1 parent 62727f0 commit e561c3b

28 files changed

Lines changed: 16957 additions & 703 deletions

File tree

apps/sim/app/api/billing/route.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,20 @@ export async function GET(request: NextRequest) {
4545
let billingData
4646

4747
if (context === 'user') {
48-
// Get user billing and billing blocked status in parallel
48+
if (contextId) {
49+
const membership = await db
50+
.select({ role: member.role })
51+
.from(member)
52+
.where(and(eq(member.organizationId, contextId), eq(member.userId, session.user.id)))
53+
.limit(1)
54+
if (membership.length === 0) {
55+
return NextResponse.json(
56+
{ error: 'Access denied - not a member of this organization' },
57+
{ status: 403 }
58+
)
59+
}
60+
}
61+
4962
const [billingResult, billingStatus] = await Promise.all([
5063
getSimplifiedBillingSummary(session.user.id, contextId || undefined),
5164
getEffectiveBillingStatus(session.user.id),

apps/sim/app/api/organizations/[id]/invitations/[invitationId]/route.ts

Lines changed: 17 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ import { getSession } from '@/lib/auth'
2323
import { hasAccessControlAccess } from '@/lib/billing'
2424
import { syncUsageLimitsFromSubscription } from '@/lib/billing/core/usage'
2525
import { isPaid, sqlIsPro } from '@/lib/billing/plan-helpers'
26-
import { requireStripeClient } from '@/lib/billing/stripe-client'
2726
import { ENTITLED_SUBSCRIPTION_STATUSES } from '@/lib/billing/subscriptions/utils'
27+
import { OUTBOX_EVENT_TYPES } from '@/lib/billing/webhooks/outbox-handlers'
28+
import { enqueueOutboxEvent } from '@/lib/core/outbox/service'
2829
import { getBaseUrl } from '@/lib/core/utils/urls'
2930
import { generateId } from '@/lib/core/utils/uuid'
3031
import { syncWorkspaceEnvCredentials } from '@/lib/credentials/environment'
@@ -328,8 +329,6 @@ export async function PUT(
328329
}
329330
}
330331

331-
let personalProToCancel: any = null
332-
333332
await db.transaction(async (tx) => {
334333
await tx.update(invitation).set({ status }).where(eq(invitation.id, invitationId))
335334

@@ -342,8 +341,7 @@ export async function PUT(
342341
createdAt: new Date(),
343342
})
344343

345-
// Snapshot Pro usage and cancel Pro subscription when joining a paid team
346-
try {
344+
{
347345
const orgSubs = await tx
348346
.select()
349347
.from(subscriptionTable)
@@ -393,8 +391,9 @@ export async function PUT(
393391
.update(userStats)
394392
.set({
395393
proPeriodCostSnapshot: currentProUsage,
396-
currentPeriodCost: '0', // Reset so new usage is attributed to team
397-
currentPeriodCopilotCost: '0', // Reset copilot cost for new period
394+
proPeriodCostSnapshotAt: new Date(),
395+
currentPeriodCost: '0',
396+
currentPeriodCopilotCost: '0',
398397
})
399398
.where(eq(userStats.userId, userId))
400399

@@ -405,20 +404,20 @@ export async function PUT(
405404
})
406405
}
407406

408-
// Mark for cancellation after transaction
409-
if (personalPro.cancelAtPeriodEnd !== true) {
410-
personalProToCancel = personalPro
407+
if (personalPro.cancelAtPeriodEnd !== true && personalPro.stripeSubscriptionId) {
408+
await tx
409+
.update(subscriptionTable)
410+
.set({ cancelAtPeriodEnd: true })
411+
.where(eq(subscriptionTable.id, personalPro.id))
412+
413+
await enqueueOutboxEvent(tx, OUTBOX_EVENT_TYPES.STRIPE_SYNC_CANCEL_AT_PERIOD_END, {
414+
stripeSubscriptionId: personalPro.stripeSubscriptionId,
415+
subscriptionId: personalPro.id,
416+
reason: 'member-joined-paid-org',
417+
})
411418
}
412419
}
413420

414-
// Transfer the joining user's pre-join storage bytes into
415-
// the org pool — after this point storage reads/writes route
416-
// through `organization.storageUsedBytes`, so bytes left on
417-
// `user_stats` would be orphaned (and a later decrement from
418-
// deleting a pre-join file would wrongly reduce the org
419-
// pool). `.for('update')` row-locks `user_stats` so a
420-
// concurrent increment/decrement can't land between the
421-
// SELECT and the zero UPDATE and get silently dropped.
422421
const storageRows = await tx
423422
.select({ storageUsedBytes: userStats.storageUsedBytes })
424423
.from(userStats)
@@ -447,13 +446,6 @@ export async function PUT(
447446
})
448447
}
449448
}
450-
} catch (error) {
451-
logger.error('Failed to handle Pro user joining team', {
452-
userId: session.user.id,
453-
organizationId,
454-
error,
455-
})
456-
// Don't fail the whole invitation acceptance due to this
457449
}
458450

459451
// Auto-assign to permission group if one has autoAddNewMembers enabled
@@ -593,44 +585,6 @@ export async function PUT(
593585
}
594586
}
595587

596-
// Handle Pro subscription cancellation after transaction commits
597-
if (personalProToCancel) {
598-
try {
599-
const stripe = requireStripeClient()
600-
if (personalProToCancel.stripeSubscriptionId) {
601-
try {
602-
await stripe.subscriptions.update(personalProToCancel.stripeSubscriptionId, {
603-
cancel_at_period_end: true,
604-
})
605-
} catch (stripeError) {
606-
logger.error('Failed to set cancel_at_period_end on Stripe for personal Pro', {
607-
userId: session.user.id,
608-
subscriptionId: personalProToCancel.id,
609-
stripeSubscriptionId: personalProToCancel.stripeSubscriptionId,
610-
error: stripeError,
611-
})
612-
}
613-
}
614-
615-
await db
616-
.update(subscriptionTable)
617-
.set({ cancelAtPeriodEnd: true })
618-
.where(eq(subscriptionTable.id, personalProToCancel.id))
619-
620-
logger.info('Auto-cancelled personal Pro at period end after joining paid team', {
621-
userId: session.user.id,
622-
personalSubscriptionId: personalProToCancel.id,
623-
organizationId,
624-
})
625-
} catch (dbError) {
626-
logger.error('Failed to update DB cancelAtPeriodEnd for personal Pro', {
627-
userId: session.user.id,
628-
subscriptionId: personalProToCancel.id,
629-
error: dbError,
630-
})
631-
}
632-
}
633-
634588
if (status === 'accepted') {
635589
try {
636590
await syncUsageLimitsFromSubscription(session.user.id)

apps/sim/app/api/organizations/[id]/members/route.ts

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
11
import { db } from '@sim/db'
2-
import { invitation, member, organization, user, userStats } from '@sim/db/schema'
2+
import {
3+
invitation,
4+
member,
5+
organization,
6+
subscription as subscriptionTable,
7+
user,
8+
userStats,
9+
} from '@sim/db/schema'
310
import { createLogger } from '@sim/logger'
4-
import { and, eq } from 'drizzle-orm'
11+
import { and, eq, inArray } from 'drizzle-orm'
512
import { type NextRequest, NextResponse } from 'next/server'
613
import { getEmailSubject, renderInvitationEmail } from '@/components/emails'
714
import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
815
import { getSession } from '@/lib/auth'
9-
import { getUserUsageData } from '@/lib/billing/core/usage'
16+
import { ENTITLED_SUBSCRIPTION_STATUSES } from '@/lib/billing/subscriptions/utils'
1017
import { validateSeatAvailability } from '@/lib/billing/validation/seat-management'
1118
import { getBaseUrl } from '@/lib/core/utils/urls'
1219
import { generateId } from '@/lib/core/utils/uuid'
@@ -83,16 +90,32 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
8390
.leftJoin(userStats, eq(user.id, userStats.userId))
8491
.where(eq(member.organizationId, organizationId))
8592

86-
const membersWithUsage = await Promise.all(
87-
base.map(async (row) => {
88-
const usage = await getUserUsageData(row.userId)
89-
return {
90-
...row,
91-
billingPeriodStart: usage.billingPeriodStart,
92-
billingPeriodEnd: usage.billingPeriodEnd,
93-
}
93+
// The billing period is the same for every member — it comes from
94+
// whichever subscription covers them. Fetch once and attach to
95+
// every row instead of calling `getUserUsageData` per-member,
96+
// which would run an O(N) pooled query for each of N rows.
97+
const [orgSub] = await db
98+
.select({
99+
periodStart: subscriptionTable.periodStart,
100+
periodEnd: subscriptionTable.periodEnd,
94101
})
95-
)
102+
.from(subscriptionTable)
103+
.where(
104+
and(
105+
eq(subscriptionTable.referenceId, organizationId),
106+
inArray(subscriptionTable.status, ENTITLED_SUBSCRIPTION_STATUSES)
107+
)
108+
)
109+
.limit(1)
110+
111+
const billingPeriodStart = orgSub?.periodStart ?? null
112+
const billingPeriodEnd = orgSub?.periodEnd ?? null
113+
114+
const membersWithUsage = base.map((row) => ({
115+
...row,
116+
billingPeriodStart,
117+
billingPeriodEnd,
118+
}))
96119

97120
return NextResponse.json({
98121
success: true,

apps/sim/app/api/v1/admin/organizations/[id]/members/route.ts

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import { member, organization, user, userStats } from '@sim/db/schema'
3333
import { createLogger } from '@sim/logger'
3434
import { count, eq } from 'drizzle-orm'
3535
import { addUserToOrganization } from '@/lib/billing/organizations/membership'
36-
import { requireStripeClient } from '@/lib/billing/stripe-client'
3736
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
3837
import { withAdminAuthParams } from '@/app/api/v1/admin/middleware'
3938
import {
@@ -229,28 +228,6 @@ export const POST = withAdminAuthParams<RouteParams>(async (request, context) =>
229228
return badRequestResponse(result.error || 'Failed to add member')
230229
}
231230

232-
if (isBillingEnabled && result.billingActions.proSubscriptionToCancel?.stripeSubscriptionId) {
233-
try {
234-
const stripe = requireStripeClient()
235-
await stripe.subscriptions.update(
236-
result.billingActions.proSubscriptionToCancel.stripeSubscriptionId,
237-
{ cancel_at_period_end: true }
238-
)
239-
logger.info('Admin API: Synced Pro cancellation with Stripe', {
240-
userId: body.userId,
241-
subscriptionId: result.billingActions.proSubscriptionToCancel.subscriptionId,
242-
stripeSubscriptionId: result.billingActions.proSubscriptionToCancel.stripeSubscriptionId,
243-
})
244-
} catch (stripeError) {
245-
logger.error('Admin API: Failed to sync Pro cancellation with Stripe', {
246-
userId: body.userId,
247-
subscriptionId: result.billingActions.proSubscriptionToCancel.subscriptionId,
248-
stripeSubscriptionId: result.billingActions.proSubscriptionToCancel.stripeSubscriptionId,
249-
error: stripeError,
250-
})
251-
}
252-
}
253-
254231
const data: AdminMember = {
255232
id: result.memberId!,
256233
userId: body.userId,
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import { db } from '@sim/db'
2+
import { outboxEvent } from '@sim/db/schema'
3+
import { createLogger } from '@sim/logger'
4+
import { and, eq } from 'drizzle-orm'
5+
import { NextResponse } from 'next/server'
6+
import { withAdminAuthParams } from '@/app/api/v1/admin/middleware'
7+
8+
const logger = createLogger('AdminOutboxRequeueAPI')
9+
10+
export const dynamic = 'force-dynamic'
11+
12+
/**
13+
* POST /api/v1/admin/outbox/[id]/requeue
14+
*
15+
* Move a dead-lettered outbox event back to `pending` so the worker
16+
* will retry it. Resets `attempts`, `lastError`, and `availableAt` so
17+
* the next poll picks it up. Only dead-lettered events can be
18+
* requeued — completed/pending/processing rows are rejected to avoid
19+
* operator errors.
20+
*/
21+
export const POST = withAdminAuthParams<{ id: string }>(async (_request, { params }) => {
22+
const { id } = await params
23+
24+
try {
25+
const result = await db
26+
.update(outboxEvent)
27+
.set({
28+
status: 'pending',
29+
attempts: 0,
30+
lastError: null,
31+
availableAt: new Date(),
32+
lockedAt: null,
33+
processedAt: null,
34+
})
35+
.where(and(eq(outboxEvent.id, id), eq(outboxEvent.status, 'dead_letter')))
36+
.returning({ id: outboxEvent.id, eventType: outboxEvent.eventType })
37+
38+
if (result.length === 0) {
39+
return NextResponse.json(
40+
{
41+
success: false,
42+
error:
43+
'Event not found or not in dead_letter status. Only dead-lettered events can be requeued.',
44+
},
45+
{ status: 404 }
46+
)
47+
}
48+
49+
logger.info('Requeued dead-lettered outbox event', {
50+
eventId: result[0].id,
51+
eventType: result[0].eventType,
52+
})
53+
54+
return NextResponse.json({
55+
success: true,
56+
requeued: result[0],
57+
})
58+
} catch (error) {
59+
logger.error('Failed to requeue outbox event', {
60+
eventId: id,
61+
error: error instanceof Error ? error.message : error,
62+
})
63+
return NextResponse.json(
64+
{
65+
success: false,
66+
error: error instanceof Error ? error.message : 'Unknown error',
67+
},
68+
{ status: 500 }
69+
)
70+
}
71+
})

0 commit comments

Comments
 (0)