Skip to content

Commit 4c66e1a

Browse files
committed
DRY
1 parent 91a420a commit 4c66e1a

5 files changed

Lines changed: 127 additions & 95 deletions

File tree

apps/sim/app/api/chat/[identifier]/route.test.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,6 @@ describe('Chat Identifier API Route', () => {
337337
})
338338

339339
it('should return 503 when workflow is not available', async () => {
340-
// Mock preprocessing to return workflow not deployed error
341340
const { preprocessExecution } = await import('@/lib/execution/preprocessing')
342341
const originalImplementation = vi.mocked(preprocessExecution).getMockImplementation()
343342

apps/sim/app/api/webhooks/test/[id]/route.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
6767
return authError
6868
}
6969

70-
// Wrap preprocessing in try-catch to handle unexpected exceptions that might prevent
71-
// proper execution log completion (e.g., programming errors, out-of-memory, etc.)
7270
let preprocessError: NextResponse | null = null
7371
try {
7472
preprocessError = await checkWebhookPreprocessing(
@@ -81,15 +79,13 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
8179
return preprocessError
8280
}
8381
} catch (error) {
84-
// Handle unexpected preprocessing failures to ensure proper error response
8582
logger.error(`[${requestId}] Unexpected error during webhook preprocessing`, {
8683
error: error instanceof Error ? error.message : String(error),
8784
stack: error instanceof Error ? error.stack : undefined,
8885
webhookId: foundWebhook.id,
8986
workflowId: foundWorkflow.id,
9087
})
9188

92-
// Return provider-specific error response
9389
if (foundWebhook.provider === 'microsoft-teams') {
9490
return NextResponse.json(
9591
{

apps/sim/app/api/webhooks/trigger/[path]/route.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,6 @@ export async function POST(
123123
return authError
124124
}
125125

126-
// Wrap preprocessing in try-catch to handle unexpected exceptions that might prevent
127-
// proper execution log completion (e.g., programming errors, out-of-memory, etc.)
128126
let preprocessError: NextResponse | null = null
129127
try {
130128
preprocessError = await checkWebhookPreprocessing(
@@ -137,15 +135,13 @@ export async function POST(
137135
return preprocessError
138136
}
139137
} catch (error) {
140-
// Handle unexpected preprocessing failures to ensure proper error response
141138
logger.error(`[${requestId}] Unexpected error during webhook preprocessing`, {
142139
error: error instanceof Error ? error.message : String(error),
143140
stack: error instanceof Error ? error.stack : undefined,
144141
webhookId: foundWebhook.id,
145142
workflowId: foundWorkflow.id,
146143
})
147144

148-
// Return provider-specific error response
149145
if (foundWebhook.provider === 'microsoft-teams') {
150146
return NextResponse.json(
151147
{

apps/sim/background/schedule-execution.ts

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -381,11 +381,8 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
381381
if (!preprocessResult.success) {
382382
const statusCode = preprocessResult.error?.statusCode || 500
383383

384-
// Handle specific error types with appropriate retry/disable logic
385384
switch (statusCode) {
386385
case 401: {
387-
// Authentication error - credentials invalid or expired
388-
// Don't retry as authentication issues require manual intervention
389386
logger.warn(
390387
`[${requestId}] Authentication error during preprocessing, disabling schedule`
391388
)
@@ -405,8 +402,6 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
405402
}
406403

407404
case 403: {
408-
// Authorization error - typically workflow not deployed
409-
// Don't retry as authorization issues (like undeployed workflows) require manual intervention
410405
logger.warn(
411406
`[${requestId}] Authorization error during preprocessing, disabling schedule: ${preprocessResult.error?.message}`
412407
)
@@ -426,7 +421,6 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
426421
}
427422

428423
case 404: {
429-
// Workflow not found - permanent error, disable schedule
430424
logger.warn(`[${requestId}] Workflow not found, disabling schedule`)
431425
await applyScheduleUpdate(
432426
payload.scheduleId,
@@ -443,7 +437,6 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
443437
}
444438

445439
case 429: {
446-
// Rate limit exceeded - temporary error, schedule retry after delay
447440
logger.warn(`[${requestId}] Rate limit exceeded, scheduling retry`)
448441
const retryDelay = 5 * 60 * 1000
449442
const nextRetryAt = new Date(now.getTime() + retryDelay)
@@ -462,7 +455,6 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
462455
}
463456

464457
case 402: {
465-
// Usage limit exceeded - schedule next run based on schedule configuration
466458
logger.warn(`[${requestId}] Usage limit exceeded, scheduling next run`)
467459
const nextRunAt = await calculateNextRunFromDeployment(payload, requestId)
468460
if (nextRunAt) {
@@ -481,7 +473,6 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
481473
}
482474

483475
default: {
484-
// Other errors (500, etc.) - track failures and disable after max consecutive failures
485476
logger.error(`[${requestId}] Preprocessing failed: ${preprocessResult.error?.message}`)
486477
const nextRunAt = await determineNextRunAfterError(payload, now, requestId)
487478
const newFailedCount = (payload.failedCount || 0) + 1

apps/sim/lib/execution/preprocessing.ts

Lines changed: 127 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,100 @@ import { RateLimiter } from '@/services/queue/RateLimiter'
1010

1111
const logger = createLogger('ExecutionPreprocessing')
1212

13+
const BILLING_ERROR_MESSAGES = {
14+
BILLING_REQUIRED:
15+
'Unable to resolve billing account. This workflow cannot execute without a valid billing account.',
16+
BILLING_ERROR_GENERIC: 'Error resolving billing account',
17+
} as const
18+
19+
/**
20+
* Attempts to resolve billing actor with fallback for resume contexts.
21+
* Returns the resolved actor user ID or null if resolution fails and should block execution.
22+
*
23+
* For resume contexts, this function allows fallback to the workflow owner if workspace
24+
* billing cannot be resolved, ensuring users can complete their paused workflows even
25+
* if billing configuration changes mid-execution.
26+
*
27+
* @returns Object containing actorUserId (null if should block) and shouldBlock flag
28+
*/
29+
async function resolveBillingActorWithFallback(params: {
30+
requestId: string
31+
workflowId: string
32+
workspaceId: string
33+
executionId: string
34+
triggerType: string
35+
workflowRecord: WorkflowRecord
36+
userId: string
37+
isResumeContext: boolean
38+
baseActorUserId: string | null
39+
failureReason: 'null' | 'error'
40+
error?: unknown
41+
loggingSession?: LoggingSession
42+
}): Promise<
43+
{ actorUserId: string; shouldBlock: false } | { actorUserId: null; shouldBlock: true }
44+
> {
45+
const {
46+
requestId,
47+
workflowId,
48+
workspaceId,
49+
executionId,
50+
triggerType,
51+
workflowRecord,
52+
userId,
53+
isResumeContext,
54+
baseActorUserId,
55+
failureReason,
56+
error,
57+
loggingSession,
58+
} = params
59+
60+
if (baseActorUserId) {
61+
return { actorUserId: baseActorUserId, shouldBlock: false }
62+
}
63+
64+
const workflowOwner = workflowRecord.userId?.trim()
65+
if (isResumeContext && workflowOwner) {
66+
const logMessage =
67+
failureReason === 'null'
68+
? '[BILLING_FALLBACK] Workspace billing account is null. Using workflow owner for billing.'
69+
: '[BILLING_FALLBACK] Exception during workspace billing resolution. Using workflow owner for billing.'
70+
71+
logger.warn(`[${requestId}] ${logMessage}`, {
72+
workflowId,
73+
workspaceId,
74+
fallbackUserId: workflowOwner,
75+
...(error ? { error } : {}),
76+
})
77+
78+
return { actorUserId: workflowOwner, shouldBlock: false }
79+
}
80+
81+
const fallbackUserId = workflowRecord.userId || userId || 'unknown'
82+
const errorMessage =
83+
failureReason === 'null'
84+
? BILLING_ERROR_MESSAGES.BILLING_REQUIRED
85+
: BILLING_ERROR_MESSAGES.BILLING_ERROR_GENERIC
86+
87+
logger.warn(`[${requestId}] ${errorMessage}`, {
88+
workflowId,
89+
workspaceId,
90+
...(error ? { error } : {}),
91+
})
92+
93+
await logPreprocessingError({
94+
workflowId,
95+
executionId,
96+
triggerType,
97+
requestId,
98+
userId: fallbackUserId,
99+
workspaceId,
100+
errorMessage,
101+
loggingSession,
102+
})
103+
104+
return { actorUserId: null, shouldBlock: true }
105+
}
106+
13107
export interface PreprocessExecutionOptions {
14108
// Required fields
15109
workflowId: string
@@ -84,7 +178,6 @@ export async function preprocessExecution(
84178
if (records.length === 0) {
85179
logger.warn(`[${requestId}] Workflow not found: ${workflowId}`)
86180

87-
// Log error to database
88181
await logPreprocessingError({
89182
workflowId,
90183
executionId,
@@ -176,47 +269,21 @@ export async function preprocessExecution(
176269
}
177270

178271
if (!actorUserId) {
179-
// Special handling for resume context: allow fallback to workflow owner
180-
if (isResumeContext && workflowRecord.userId) {
181-
actorUserId = workflowRecord.userId
182-
logger.warn(
183-
`[${requestId}] Billing account resolution failed in resume context. Falling back to workflow owner.`,
184-
{
185-
workflowId,
186-
workspaceId,
187-
fallbackUserId: actorUserId,
188-
}
189-
)
190-
// Log warning but don't block execution
191-
await logPreprocessingError({
192-
workflowId,
193-
executionId,
194-
triggerType,
195-
requestId,
196-
userId: actorUserId,
197-
workspaceId,
198-
errorMessage:
199-
'Warning: Workspace billing account could not be resolved. Using workflow owner for billing. Please verify workspace billing settings.',
200-
loggingSession: providedLoggingSession,
201-
})
202-
} else {
203-
logger.warn(`[${requestId}] Unable to resolve billing account`, {
204-
workflowId,
205-
workspaceId,
206-
})
207-
208-
await logPreprocessingError({
209-
workflowId,
210-
executionId,
211-
triggerType,
212-
requestId,
213-
userId: workflowRecord.userId || userId || 'unknown',
214-
workspaceId,
215-
errorMessage:
216-
'Unable to resolve billing account. This workflow cannot execute without a valid billing account.',
217-
loggingSession: providedLoggingSession,
218-
})
272+
const result = await resolveBillingActorWithFallback({
273+
requestId,
274+
workflowId,
275+
workspaceId,
276+
executionId,
277+
triggerType,
278+
workflowRecord,
279+
userId,
280+
isResumeContext,
281+
baseActorUserId: actorUserId,
282+
failureReason: 'null',
283+
loggingSession: providedLoggingSession,
284+
})
219285

286+
if (result.shouldBlock) {
220287
return {
221288
success: false,
222289
error: {
@@ -226,47 +293,28 @@ export async function preprocessExecution(
226293
},
227294
}
228295
}
296+
297+
actorUserId = result.actorUserId
229298
}
230299
} catch (error) {
231300
logger.error(`[${requestId}] Error resolving billing actor`, { error, workflowId })
232301

233-
// Special handling for resume context: allow fallback on error
234-
if (isResumeContext && workflowRecord.userId) {
235-
actorUserId = workflowRecord.userId
236-
logger.warn(
237-
`[${requestId}] Billing account resolution error in resume context. Falling back to workflow owner.`,
238-
{
239-
workflowId,
240-
workspaceId,
241-
fallbackUserId: actorUserId,
242-
error,
243-
}
244-
)
245-
// Log warning but continue execution
246-
await logPreprocessingError({
247-
workflowId,
248-
executionId,
249-
triggerType,
250-
requestId,
251-
userId: actorUserId,
252-
workspaceId,
253-
errorMessage:
254-
'Warning: Error resolving workspace billing account. Using workflow owner for billing. Please verify workspace billing settings.',
255-
loggingSession: providedLoggingSession,
256-
})
257-
// Continue to next step
258-
} else {
259-
await logPreprocessingError({
260-
workflowId,
261-
executionId,
262-
triggerType,
263-
requestId,
264-
userId: workflowRecord.userId || userId || 'unknown',
265-
workspaceId,
266-
errorMessage: 'Error resolving billing account',
267-
loggingSession: providedLoggingSession,
268-
})
302+
const result = await resolveBillingActorWithFallback({
303+
requestId,
304+
workflowId,
305+
workspaceId,
306+
executionId,
307+
triggerType,
308+
workflowRecord,
309+
userId,
310+
isResumeContext,
311+
baseActorUserId: null,
312+
failureReason: 'error',
313+
error,
314+
loggingSession: providedLoggingSession,
315+
})
269316

317+
if (result.shouldBlock) {
270318
return {
271319
success: false,
272320
error: {
@@ -276,6 +324,8 @@ export async function preprocessExecution(
276324
},
277325
}
278326
}
327+
328+
actorUserId = result.actorUserId
279329
}
280330

281331
// ========== STEP 4: Get User Subscription ==========

0 commit comments

Comments
 (0)