Skip to content

Commit 85a8560

Browse files
committed
update all deployment versions dependent exections to use api key owner instead of workflow owner
1 parent 402d8c4 commit 85a8560

File tree

5 files changed

+121
-52
lines changed

5 files changed

+121
-52
lines changed

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { and, eq, lte, not, sql } from 'drizzle-orm'
44
import { NextResponse } from 'next/server'
55
import { v4 as uuidv4 } from 'uuid'
66
import { z } from 'zod'
7+
import { getApiKeyOwnerUserId } from '@/lib/api-key/service'
78
import { checkServerSideUsageLimits } from '@/lib/billing'
89
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
910
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
@@ -106,12 +107,22 @@ export async function GET() {
106107
continue
107108
}
108109

110+
const actorUserId = await getApiKeyOwnerUserId(workflowRecord.pinnedApiKeyId)
111+
112+
if (!actorUserId) {
113+
logger.warn(
114+
`[${requestId}] Skipping schedule ${schedule.id}: pinned API key required to attribute usage.`
115+
)
116+
runningExecutions.delete(schedule.workflowId)
117+
continue
118+
}
119+
109120
// Check rate limits for scheduled execution (checks both personal and org subscriptions)
110-
const userSubscription = await getHighestPrioritySubscription(workflowRecord.userId)
121+
const userSubscription = await getHighestPrioritySubscription(actorUserId)
111122

112123
const rateLimiter = new RateLimiter()
113124
const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription(
114-
workflowRecord.userId,
125+
actorUserId,
115126
userSubscription,
116127
'schedule',
117128
false // schedules are always sync
@@ -149,7 +160,7 @@ export async function GET() {
149160
continue
150161
}
151162

152-
const usageCheck = await checkServerSideUsageLimits(workflowRecord.userId)
163+
const usageCheck = await checkServerSideUsageLimits(actorUserId)
153164
if (usageCheck.isExceeded) {
154165
logger.warn(
155166
`[${requestId}] User ${workflowRecord.userId} has exceeded usage limits. Skipping scheduled execution.`,
@@ -159,26 +170,19 @@ export async function GET() {
159170
workflowId: schedule.workflowId,
160171
}
161172
)
162-
163-
// Error logging handled by logging session
164-
165-
const retryDelay = 24 * 60 * 60 * 1000 // 24 hour delay for exceeded limits
166-
const nextRetryAt = new Date(now.getTime() + retryDelay)
167-
168173
try {
174+
const deployedData = await loadDeployedWorkflowState(schedule.workflowId)
175+
const nextRunAt = calculateNextRunTime(schedule, deployedData.blocks as any)
169176
await db
170177
.update(workflowSchedule)
171-
.set({
172-
updatedAt: now,
173-
nextRunAt: nextRetryAt,
174-
})
178+
.set({ updatedAt: now, nextRunAt })
175179
.where(eq(workflowSchedule.id, schedule.id))
176-
177-
logger.debug(`[${requestId}] Updated next retry time due to usage limits`)
178-
} catch (updateError) {
179-
logger.error(`[${requestId}] Error updating schedule for usage limits:`, updateError)
180+
} catch (calcErr) {
181+
logger.warn(
182+
`[${requestId}] Unable to calculate nextRunAt while skipping schedule ${schedule.id}`,
183+
calcErr
184+
)
180185
}
181-
182186
runningExecutions.delete(schedule.workflowId)
183187
continue
184188
}
@@ -224,7 +228,7 @@ export async function GET() {
224228

225229
// Retrieve environment variables with workspace precedence
226230
const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv(
227-
workflowRecord.userId,
231+
actorUserId,
228232
workflowRecord.workspaceId || undefined
229233
)
230234
const variables = EnvVarsSchema.parse({
@@ -376,7 +380,7 @@ export async function GET() {
376380

377381
// Start logging with environment variables
378382
await loggingSession.safeStart({
379-
userId: workflowRecord.userId,
383+
userId: actorUserId,
380384
workspaceId: workflowRecord.workspaceId || '',
381385
variables: variables || {},
382386
})
@@ -420,7 +424,7 @@ export async function GET() {
420424
totalScheduledExecutions: sql`total_scheduled_executions + 1`,
421425
lastActive: now,
422426
})
423-
.where(eq(userStats.userId, workflowRecord.userId))
427+
.where(eq(userStats.userId, actorUserId))
424428

425429
logger.debug(`[${requestId}] Updated user stats for scheduled execution`)
426430
} catch (statsError) {

apps/sim/app/api/workflows/[id]/deploy/route.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,13 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
293293
}
294294
}
295295

296+
// Attribution: this route is UI-only; require session user as actor
297+
const actorUserId: string | null = session?.user?.id ?? null
298+
if (!actorUserId) {
299+
logger.warn(`[${requestId}] Unable to resolve actor user for workflow deployment: ${id}`)
300+
return createErrorResponse('Unable to determine deploying user', 400)
301+
}
302+
296303
await db.transaction(async (tx) => {
297304
const [{ maxVersion }] = await tx
298305
.select({ maxVersion: sql`COALESCE(MAX("version"), 0)` })
@@ -318,7 +325,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
318325
state: currentState,
319326
isActive: true,
320327
createdAt: deployedAt,
321-
createdBy: userId,
328+
createdBy: actorUserId,
322329
})
323330

324331
const updateData: Record<string, unknown> = {

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { eq, sql } from 'drizzle-orm'
55
import { type NextRequest, NextResponse } from 'next/server'
66
import { v4 as uuidv4 } from 'uuid'
77
import { z } from 'zod'
8+
import { authenticateApiKeyFromHeader, updateApiKeyLastUsed } from '@/lib/api-key/service'
89
import { getSession } from '@/lib/auth'
910
import { checkServerSideUsageLimits } from '@/lib/billing'
1011
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
@@ -66,8 +67,8 @@ class UsageLimitError extends Error {
6667
async function executeWorkflow(
6768
workflow: any,
6869
requestId: string,
69-
input?: any,
70-
executingUserId?: string
70+
input: any | undefined,
71+
actorUserId: string
7172
): Promise<any> {
7273
const workflowId = workflow.id
7374
const executionId = uuidv4()
@@ -86,8 +87,8 @@ async function executeWorkflow(
8687

8788
// Rate limiting is now handled before entering the sync queue
8889

89-
// Check if the user has exceeded their usage limits
90-
const usageCheck = await checkServerSideUsageLimits(workflow.userId)
90+
// Check if the actor has exceeded their usage limits
91+
const usageCheck = await checkServerSideUsageLimits(actorUserId)
9192
if (usageCheck.isExceeded) {
9293
logger.warn(`[${requestId}] User ${workflow.userId} has exceeded usage limits`, {
9394
currentUsage: usageCheck.currentUsage,
@@ -133,13 +134,13 @@ async function executeWorkflow(
133134

134135
// Load personal (for the executing user) and workspace env (workspace overrides personal)
135136
const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv(
136-
executingUserId || workflow.userId,
137+
actorUserId,
137138
workflow.workspaceId || undefined
138139
)
139140
const variables = EnvVarsSchema.parse({ ...personalEncrypted, ...workspaceEncrypted })
140141

141142
await loggingSession.safeStart({
142-
userId: executingUserId || workflow.userId,
143+
userId: actorUserId,
143144
workspaceId: workflow.workspaceId,
144145
variables,
145146
})
@@ -341,7 +342,7 @@ async function executeWorkflow(
341342
totalApiCalls: sql`total_api_calls + 1`,
342343
lastActive: sql`now()`,
343344
})
344-
.where(eq(userStats.userId, workflow.userId))
345+
.where(eq(userStats.userId, actorUserId))
345346
}
346347

347348
await loggingSession.safeComplete({
@@ -405,19 +406,30 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
405406

406407
// Synchronous execution
407408
try {
408-
// Check rate limits BEFORE entering queue for GET requests
409-
if (triggerType === 'api') {
410-
// Get user subscription (checks both personal and org subscriptions)
411-
const userSubscription = await getHighestPrioritySubscription(validation.workflow.userId)
409+
// Resolve actor user id
410+
let actorUserId: string | null = null
411+
if (triggerType === 'manual') {
412+
actorUserId = session!.user!.id
413+
} else {
414+
const apiKeyHeader = request.headers.get('X-API-Key')
415+
const auth = apiKeyHeader ? await authenticateApiKeyFromHeader(apiKeyHeader) : null
416+
if (!auth?.success || !auth.userId) {
417+
return createErrorResponse('Unauthorized', 401)
418+
}
419+
actorUserId = auth.userId
420+
if (auth.keyId) {
421+
void updateApiKeyLastUsed(auth.keyId).catch(() => {})
422+
}
412423

424+
// Check rate limits BEFORE entering execution for API requests
425+
const userSubscription = await getHighestPrioritySubscription(actorUserId)
413426
const rateLimiter = new RateLimiter()
414427
const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription(
415-
validation.workflow.userId,
428+
actorUserId,
416429
userSubscription,
417-
triggerType,
418-
false // isAsync = false for sync calls
430+
'api',
431+
false
419432
)
420-
421433
if (!rateLimitCheck.allowed) {
422434
throw new RateLimitError(
423435
`Rate limit exceeded. You have ${rateLimitCheck.remaining} requests remaining. Resets at ${rateLimitCheck.resetAt.toISOString()}`
@@ -429,8 +441,7 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
429441
validation.workflow,
430442
requestId,
431443
undefined,
432-
// Executing user (manual run): if session present, use that user for fallback
433-
(await getSession())?.user?.id || undefined
444+
actorUserId as string
434445
)
435446

436447
// Check if the workflow execution contains a response block output
@@ -517,14 +528,19 @@ export async function POST(
517528
let triggerType: TriggerType = 'manual'
518529

519530
const session = await getSession()
520-
if (session?.user?.id) {
531+
const apiKeyHeader = request.headers.get('X-API-Key')
532+
if (session?.user?.id && !apiKeyHeader) {
521533
authenticatedUserId = session.user.id
522-
triggerType = 'manual' // UI session (not rate limited)
523-
} else {
524-
const apiKeyHeader = request.headers.get('X-API-Key')
525-
if (apiKeyHeader) {
526-
authenticatedUserId = validation.workflow.userId
527-
triggerType = 'api'
534+
triggerType = 'manual'
535+
} else if (apiKeyHeader) {
536+
const auth = await authenticateApiKeyFromHeader(apiKeyHeader)
537+
if (!auth.success || !auth.userId) {
538+
return createErrorResponse('Unauthorized', 401)
539+
}
540+
authenticatedUserId = auth.userId
541+
triggerType = 'api'
542+
if (auth.keyId) {
543+
void updateApiKeyLastUsed(auth.keyId).catch(() => {})
528544
}
529545
}
530546

apps/sim/lib/api-key/service.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,27 @@ export async function updateApiKeyLastUsed(keyId: string): Promise<void> {
125125
}
126126
}
127127

128+
/**
129+
* Given a pinned API key ID, resolve the owning userId (actor).
130+
* Returns null if not found.
131+
*/
132+
export async function getApiKeyOwnerUserId(
133+
pinnedApiKeyId: string | null | undefined
134+
): Promise<string | null> {
135+
if (!pinnedApiKeyId) return null
136+
try {
137+
const rows = await db
138+
.select({ userId: apiKeyTable.userId })
139+
.from(apiKeyTable)
140+
.where(eq(apiKeyTable.id, pinnedApiKeyId))
141+
.limit(1)
142+
return rows[0]?.userId ?? null
143+
} catch (error) {
144+
logger.error('Error resolving API key owner', { error, pinnedApiKeyId })
145+
return null
146+
}
147+
}
148+
128149
/**
129150
* Get the API encryption key from the environment
130151
* @returns The API encryption key

apps/sim/lib/webhooks/processor.ts

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { db, webhook, workflow } from '@sim/db'
22
import { tasks } from '@trigger.dev/sdk'
33
import { and, eq } from 'drizzle-orm'
44
import { type NextRequest, NextResponse } from 'next/server'
5+
import { getApiKeyOwnerUserId } from '@/lib/api-key/service'
56
import { checkServerSideUsageLimits } from '@/lib/billing'
67
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
78
import { env, isTruthy } from '@/lib/env'
@@ -268,18 +269,25 @@ export async function checkRateLimits(
268269
requestId: string
269270
): Promise<NextResponse | null> {
270271
try {
271-
const userSubscription = await getHighestPrioritySubscription(foundWorkflow.userId)
272+
const actorUserId = await getApiKeyOwnerUserId(foundWorkflow.pinnedApiKeyId)
273+
274+
if (!actorUserId) {
275+
logger.warn(`[${requestId}] Webhook requires pinned API key to attribute usage`)
276+
return NextResponse.json({ message: 'Pinned API key required' }, { status: 200 })
277+
}
278+
279+
const userSubscription = await getHighestPrioritySubscription(actorUserId)
272280

273281
const rateLimiter = new RateLimiter()
274282
const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription(
275-
foundWorkflow.userId,
283+
actorUserId,
276284
userSubscription,
277285
'webhook',
278286
true
279287
)
280288

281289
if (!rateLimitCheck.allowed) {
282-
logger.warn(`[${requestId}] Rate limit exceeded for webhook user ${foundWorkflow.userId}`, {
290+
logger.warn(`[${requestId}] Rate limit exceeded for webhook user ${actorUserId}`, {
283291
provider: foundWebhook.provider,
284292
remaining: rateLimitCheck.remaining,
285293
resetAt: rateLimitCheck.resetAt,
@@ -319,10 +327,17 @@ export async function checkUsageLimits(
319327
}
320328

321329
try {
322-
const usageCheck = await checkServerSideUsageLimits(foundWorkflow.userId)
330+
const actorUserId = await getApiKeyOwnerUserId(foundWorkflow.pinnedApiKeyId)
331+
332+
if (!actorUserId) {
333+
logger.warn(`[${requestId}] Webhook requires pinned API key to attribute usage`)
334+
return NextResponse.json({ message: 'Pinned API key required' }, { status: 200 })
335+
}
336+
337+
const usageCheck = await checkServerSideUsageLimits(actorUserId)
323338
if (usageCheck.isExceeded) {
324339
logger.warn(
325-
`[${requestId}] User ${foundWorkflow.userId} has exceeded usage limits. Skipping webhook execution.`,
340+
`[${requestId}] User ${actorUserId} has exceeded usage limits. Skipping webhook execution.`,
326341
{
327342
currentUsage: usageCheck.currentUsage,
328343
limit: usageCheck.limit,
@@ -361,10 +376,16 @@ export async function queueWebhookExecution(
361376
options: WebhookProcessorOptions
362377
): Promise<NextResponse> {
363378
try {
379+
const actorUserId = await getApiKeyOwnerUserId(foundWorkflow.pinnedApiKeyId)
380+
if (!actorUserId) {
381+
logger.warn(`[${options.requestId}] Webhook requires pinned API key to attribute usage`)
382+
return NextResponse.json({ message: 'Pinned API key required' }, { status: 200 })
383+
}
384+
364385
const payload = {
365386
webhookId: foundWebhook.id,
366387
workflowId: foundWorkflow.id,
367-
userId: foundWorkflow.userId,
388+
userId: actorUserId,
368389
provider: foundWebhook.provider,
369390
body,
370391
headers: Object.fromEntries(request.headers.entries()),

0 commit comments

Comments
 (0)