Skip to content

Commit 637b2dd

Browse files
committed
feat(nested-workflow-logs): nested workflow logs display
1 parent 86ed32e commit 637b2dd

14 files changed

Lines changed: 767 additions & 362 deletions

File tree

apps/sim/app/api/chat/utils.ts

Lines changed: 50 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { db } from '@sim/db'
2-
import { chat, userStats, workflow } from '@sim/db/schema'
3-
import { eq, sql } from 'drizzle-orm'
2+
import { chat, workflow } from '@sim/db/schema'
3+
import { eq } from 'drizzle-orm'
44
import { type NextRequest, NextResponse } from 'next/server'
55
import { v4 as uuidv4 } from 'uuid'
66
import { checkServerSideUsageLimits } from '@/lib/billing'
@@ -10,13 +10,12 @@ import { createLogger } from '@/lib/logs/console/logger'
1010
import { LoggingSession } from '@/lib/logs/execution/logging-session'
1111
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
1212
import { hasAdminPermission } from '@/lib/permissions/utils'
13-
import { processStreamingBlockLogs } from '@/lib/tokenization'
1413
import { decryptSecret, generateRequestId } from '@/lib/utils'
1514
import { TriggerUtils } from '@/lib/workflows/triggers'
1615
import { CHAT_ERROR_MESSAGES } from '@/app/chat/constants'
1716
import { getBlock } from '@/blocks'
1817
import { Executor } from '@/executor'
19-
import type { BlockLog, ExecutionResult } from '@/executor/types'
18+
import type { ExecutionResult, StreamingExecution } from '@/executor/types'
2019
import { Serializer } from '@/serializer'
2120
import { mergeSubblockState } from '@/stores/workflows/server-utils'
2221
import type { WorkflowState } from '@/stores/workflows/workflow/types'
@@ -548,6 +547,7 @@ export async function executeWorkflowForChat(
548547
const stream = new ReadableStream({
549548
async start(controller) {
550549
const encoder = new TextEncoder()
550+
let executionResultForLogging: ExecutionResult | null = null
551551

552552
try {
553553
const streamedContent = new Map<string, string>()
@@ -603,6 +603,7 @@ export async function executeWorkflowForChat(
603603
endedAt: new Date().toISOString(),
604604
totalDurationMs: 0,
605605
error: { message: errorMessage },
606+
traceSpans: [],
606607
})
607608
sessionCompleted = true
608609
}
@@ -644,16 +645,24 @@ export async function executeWorkflowForChat(
644645
// Set up logging on the executor
645646
loggingSession.setupExecutor(executor)
646647

647-
let result
648+
let result: ExecutionResult | StreamingExecution | undefined
648649
try {
649650
result = await executor.execute(workflowId, startBlockId)
650651
} catch (error: any) {
651652
logger.error(`[${requestId}] Chat workflow execution failed:`, error)
652653
if (!sessionCompleted) {
654+
const executionResult = error?.executionResult || {
655+
success: false,
656+
output: {},
657+
logs: [],
658+
}
659+
const { traceSpans } = buildTraceSpans(executionResult)
660+
653661
await loggingSession.safeCompleteWithError({
654662
endedAt: new Date().toISOString(),
655663
totalDurationMs: 0,
656664
error: { message: error.message || 'Chat workflow execution failed' },
665+
traceSpans,
657666
})
658667
sessionCompleted = true
659668
}
@@ -677,186 +686,65 @@ export async function executeWorkflowForChat(
677686
? (result.execution as ExecutionResult)
678687
: (result as ExecutionResult)
679688

680-
if (executionResult?.logs) {
681-
// Update streamed content and apply tokenization - process regardless of overall success
682-
// This ensures partial successes (some agents succeed, some fail) still return results
683-
684-
// Add newlines between different agent outputs for better readability
685-
const processedOutputs = new Set<string>()
686-
executionResult.logs.forEach((log: BlockLog) => {
687-
if (streamedContent.has(log.blockId)) {
688-
const content = streamedContent.get(log.blockId)
689-
if (log.output && content) {
690-
// Add newline separation between different outputs (but not before the first one)
691-
const separator = processedOutputs.size > 0 ? '\n\n' : ''
692-
log.output.content = separator + content
693-
processedOutputs.add(log.blockId)
694-
}
695-
}
696-
})
697-
698-
// Also process non-streamed outputs from selected blocks (like function blocks)
699-
// This uses the same logic as the chat panel to ensure identical behavior
700-
const nonStreamingLogs = executionResult.logs.filter(
701-
(log: BlockLog) => !streamedContent.has(log.blockId)
702-
)
703-
704-
// Extract the exact same functions used by the chat panel
705-
const extractBlockIdFromOutputId = (outputId: string): string => {
706-
return outputId.includes('_') ? outputId.split('_')[0] : outputId.split('.')[0]
707-
}
708-
709-
const extractPathFromOutputId = (outputId: string, blockId: string): string => {
710-
return outputId.substring(blockId.length + 1)
711-
}
689+
executionResultForLogging = executionResult
712690

713-
const parseOutputContentSafely = (output: any): any => {
714-
if (!output?.content) {
715-
return output
716-
}
691+
logger.info(`[${requestId}] Chat workflow execution completed:`, {
692+
success: executionResult.success,
693+
executionTime: executionResult.metadata?.duration,
694+
})
717695

718-
if (typeof output.content === 'string') {
719-
try {
720-
return JSON.parse(output.content)
721-
} catch (e) {
722-
// Fallback to original structure if parsing fails
723-
return output
724-
}
725-
}
696+
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
726697

727-
return output
728-
}
698+
await loggingSession.safeComplete({
699+
endedAt: new Date().toISOString(),
700+
totalDurationMs: totalDuration || 0,
701+
finalOutput: executionResult.output || {},
702+
traceSpans: (traceSpans || []) as any,
703+
})
729704

730-
// Filter outputs that have matching logs (exactly like chat panel)
731-
const outputsToRender = selectedOutputIds.filter((outputId) => {
732-
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
733-
return nonStreamingLogs.some((log) => log.blockId === blockIdForOutput)
734-
})
705+
sessionCompleted = true
735706

736-
// Process each selected output (exactly like chat panel)
737-
for (const outputId of outputsToRender) {
738-
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
739-
const path = extractPathFromOutputId(outputId, blockIdForOutput)
740-
const log = nonStreamingLogs.find((l) => l.blockId === blockIdForOutput)
741-
742-
if (log) {
743-
let outputValue: any = log.output
744-
745-
if (path) {
746-
// Parse JSON content safely (exactly like chat panel)
747-
outputValue = parseOutputContentSafely(outputValue)
748-
749-
const pathParts = path.split('.')
750-
for (const part of pathParts) {
751-
if (outputValue && typeof outputValue === 'object' && part in outputValue) {
752-
outputValue = outputValue[part]
753-
} else {
754-
outputValue = undefined
755-
break
756-
}
757-
}
758-
}
707+
controller.enqueue(
708+
encoder.encode(
709+
`data: ${JSON.stringify({
710+
event: 'complete',
711+
success: executionResult.success,
712+
output: executionResult.output,
713+
})}\n\n`
714+
)
715+
)
759716

760-
if (outputValue !== undefined) {
761-
// Add newline separation between different outputs
762-
const separator = processedOutputs.size > 0 ? '\n\n' : ''
763-
764-
// Format the output exactly like the chat panel
765-
const formattedOutput =
766-
typeof outputValue === 'string'
767-
? outputValue
768-
: JSON.stringify(outputValue, null, 2)
769-
770-
// Update the log content
771-
if (!log.output.content) {
772-
log.output.content = separator + formattedOutput
773-
} else {
774-
log.output.content = separator + formattedOutput
775-
}
776-
processedOutputs.add(log.blockId)
777-
}
778-
}
779-
}
717+
controller.close()
718+
} catch (error: any) {
719+
logger.error(`[${requestId}] Chat execution streaming error:`, error)
780720

781-
// Process all logs for streaming tokenization
782-
const processedCount = processStreamingBlockLogs(executionResult.logs, streamedContent)
783-
logger.info(`Processed ${processedCount} blocks for streaming tokenization`)
784-
785-
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
786-
const enrichedResult = { ...executionResult, traceSpans, totalDuration }
787-
if (conversationId) {
788-
if (!enrichedResult.metadata) {
789-
enrichedResult.metadata = {
790-
duration: totalDuration,
791-
startTime: new Date().toISOString(),
792-
}
793-
}
794-
;(enrichedResult.metadata as any).conversationId = conversationId
795-
}
796-
// Use the executionId created at the beginning of this function
797-
logger.debug(`Using execution ID for deployed chat: ${executionId}`)
798-
799-
if (executionResult.success) {
800-
try {
801-
await db
802-
.update(userStats)
803-
.set({
804-
totalChatExecutions: sql`total_chat_executions + 1`,
805-
lastActive: new Date(),
806-
})
807-
.where(eq(userStats.userId, deployment.userId))
808-
logger.debug(`Updated user stats for deployed chat: ${deployment.userId}`)
809-
} catch (error) {
810-
logger.error(`Failed to update user stats for deployed chat:`, error)
721+
if (!sessionCompleted && loggingSession) {
722+
const executionResult = executionResultForLogging ||
723+
(error?.executionResult as ExecutionResult | undefined) || {
724+
success: false,
725+
output: {},
726+
logs: [],
811727
}
812-
}
813-
}
728+
const { traceSpans } = buildTraceSpans(executionResult)
814729

815-
if (!(result && typeof result === 'object' && 'stream' in result)) {
816-
controller.enqueue(
817-
encoder.encode(`data: ${JSON.stringify({ event: 'final', data: result })}\n\n`)
818-
)
819-
}
820-
821-
if (!sessionCompleted) {
822-
const resultForTracing =
823-
executionResult || ({ success: true, output: {}, logs: [] } as ExecutionResult)
824-
const { traceSpans } = buildTraceSpans(resultForTracing)
825-
await loggingSession.safeComplete({
730+
await loggingSession.safeCompleteWithError({
826731
endedAt: new Date().toISOString(),
827-
totalDurationMs: executionResult?.metadata?.duration || 0,
828-
finalOutput: executionResult?.output || {},
732+
totalDurationMs: 0,
733+
error: { message: error.message || 'Stream processing error' },
829734
traceSpans,
830735
})
831736
sessionCompleted = true
832737
}
833738

834-
controller.close()
835-
} catch (error: any) {
836-
// Handle any errors that occur in the stream
837-
logger.error(`[${requestId}] Stream error:`, error)
838-
839-
// Send error event to client
840-
const encoder = new TextEncoder()
841739
controller.enqueue(
842740
encoder.encode(
843741
`data: ${JSON.stringify({
844742
event: 'error',
845-
error: error.message || 'An unexpected error occurred',
743+
error: error.message || 'Stream processing error',
846744
})}\n\n`
847745
)
848746
)
849747

850-
// Try to complete the logging session with error if not already completed
851-
if (!sessionCompleted && loggingSession) {
852-
await loggingSession.safeCompleteWithError({
853-
endedAt: new Date().toISOString(),
854-
totalDurationMs: 0,
855-
error: { message: error.message || 'Stream processing error' },
856-
})
857-
sessionCompleted = true
858-
}
859-
860748
controller.close()
861749
}
862750
},

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,7 @@ export async function GET() {
446446
message: `Schedule execution failed before workflow started: ${earlyError.message}`,
447447
stackTrace: earlyError.stack,
448448
},
449+
traceSpans: [],
449450
})
450451
} catch (loggingError) {
451452
logger.error(
@@ -565,6 +566,7 @@ export async function GET() {
565566
message: `Schedule execution failed: ${error.message}`,
566567
stackTrace: error.stack,
567568
},
569+
traceSpans: [],
568570
})
569571
} catch (loggingError) {
570572
logger.error(

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {
2323
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
2424
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
2525
import { Executor } from '@/executor'
26+
import type { ExecutionResult } from '@/executor/types'
2627
import { Serializer } from '@/serializer'
2728
import { RateLimitError, RateLimiter, type TriggerType } from '@/services/queue'
2829
import { mergeSubblockState } from '@/stores/workflows/server-utils'
@@ -354,13 +355,21 @@ async function executeWorkflow(
354355
} catch (error: any) {
355356
logger.error(`[${requestId}] Workflow execution failed: ${workflowId}`, error)
356357

358+
const executionResultForError = (error?.executionResult as ExecutionResult | undefined) || {
359+
success: false,
360+
output: {},
361+
logs: [],
362+
}
363+
const { traceSpans } = buildTraceSpans(executionResultForError)
364+
357365
await loggingSession.safeCompleteWithError({
358366
endedAt: new Date().toISOString(),
359367
totalDurationMs: 0,
360368
error: {
361369
message: error.message || 'Workflow execution failed',
362370
stackTrace: error.stack,
363371
},
372+
traceSpans,
364373
})
365374

366375
throw error

apps/sim/app/api/workflows/[id]/log/route.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,17 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
4444
variables: {},
4545
})
4646

47+
const { traceSpans } = buildTraceSpans(result)
48+
4749
if (result.success === false) {
4850
const message = result.error || 'Workflow execution failed'
4951
await loggingSession.safeCompleteWithError({
5052
endedAt: new Date().toISOString(),
5153
totalDurationMs: result.metadata?.duration || 0,
5254
error: { message },
55+
traceSpans,
5356
})
5457
} else {
55-
const { traceSpans } = buildTraceSpans(result)
5658
await loggingSession.safeComplete({
5759
endedAt: new Date().toISOString(),
5860
totalDurationMs: result.metadata?.duration || 0,

0 commit comments

Comments
 (0)