Skip to content

Commit 52f0a01

Browse files
committed
fix chat execution
1 parent 4ccc0f2 commit 52f0a01

1 file changed

Lines changed: 117 additions & 21 deletions

File tree

apps/sim/app/api/chat/utils.ts

Lines changed: 117 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ import { createLogger } from '@/lib/logs/console/logger'
1010
import { LoggingSession } from '@/lib/logs/execution/logging-session'
1111
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
1212
import { hasAdminPermission } from '@/lib/permissions/utils'
13+
import { processStreamingBlockLogs } from '@/lib/tokenization'
1314
import { decryptSecret, generateRequestId } from '@/lib/utils'
1415
import { TriggerUtils } from '@/lib/workflows/triggers'
1516
import { CHAT_ERROR_MESSAGES } from '@/app/chat/constants'
1617
import { getBlock } from '@/blocks'
1718
import { Executor } from '@/executor'
18-
import type { ExecutionResult, StreamingExecution } from '@/executor/types'
19+
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
1920
import { Serializer } from '@/serializer'
2021
import { mergeSubblockState } from '@/stores/workflows/server-utils'
2122
import type { WorkflowState } from '@/stores/workflows/workflow/types'
@@ -688,31 +689,126 @@ export async function executeWorkflowForChat(
688689

689690
executionResultForLogging = executionResult
690691

691-
logger.info(`[${requestId}] Chat workflow execution completed:`, {
692-
success: executionResult.success,
693-
executionTime: executionResult.metadata?.duration,
694-
})
692+
if (executionResult?.logs) {
693+
const processedOutputs = new Set<string>()
694+
executionResult.logs.forEach((log: BlockLog) => {
695+
if (streamedContent.has(log.blockId)) {
696+
const content = streamedContent.get(log.blockId)
697+
if (log.output && content) {
698+
const separator = processedOutputs.size > 0 ? '\n\n' : ''
699+
log.output.content = separator + content
700+
processedOutputs.add(log.blockId)
701+
}
702+
}
703+
})
695704

696-
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
705+
const nonStreamingLogs = executionResult.logs.filter(
706+
(log: BlockLog) => !streamedContent.has(log.blockId)
707+
)
697708

698-
await loggingSession.safeComplete({
699-
endedAt: new Date().toISOString(),
700-
totalDurationMs: totalDuration || 0,
701-
finalOutput: executionResult.output || {},
702-
traceSpans: (traceSpans || []) as any,
703-
})
709+
const extractBlockIdFromOutputId = (outputId: string): string => {
710+
return outputId.includes('_') ? outputId.split('_')[0] : outputId.split('.')[0]
711+
}
704712

705-
sessionCompleted = true
713+
const extractPathFromOutputId = (outputId: string, blockId: string): string => {
714+
return outputId.substring(blockId.length + 1)
715+
}
706716

707-
controller.enqueue(
708-
encoder.encode(
709-
`data: ${JSON.stringify({
710-
event: 'complete',
711-
success: executionResult.success,
712-
output: executionResult.output,
713-
})}\n\n`
717+
const parseOutputContentSafely = (output: any): any => {
718+
if (!output?.content) {
719+
return output
720+
}
721+
722+
if (typeof output.content === 'string') {
723+
try {
724+
return JSON.parse(output.content)
725+
} catch (e) {
726+
return output
727+
}
728+
}
729+
730+
return output
731+
}
732+
733+
const outputsToRender = selectedOutputIds.filter((outputId) => {
734+
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
735+
return nonStreamingLogs.some((log) => log.blockId === blockIdForOutput)
736+
})
737+
738+
for (const outputId of outputsToRender) {
739+
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
740+
const path = extractPathFromOutputId(outputId, blockIdForOutput)
741+
const log = nonStreamingLogs.find((l) => l.blockId === blockIdForOutput)
742+
743+
if (log) {
744+
let outputValue: any = log.output
745+
746+
if (path) {
747+
outputValue = parseOutputContentSafely(outputValue)
748+
749+
const pathParts = path.split('.')
750+
for (const part of pathParts) {
751+
if (outputValue && typeof outputValue === 'object' && part in outputValue) {
752+
outputValue = outputValue[part]
753+
} else {
754+
outputValue = undefined
755+
break
756+
}
757+
}
758+
}
759+
760+
if (outputValue !== undefined) {
761+
const separator = processedOutputs.size > 0 ? '\n\n' : ''
762+
763+
const formattedOutput =
764+
typeof outputValue === 'string'
765+
? outputValue
766+
: JSON.stringify(outputValue, null, 2)
767+
768+
if (!log.output.content) {
769+
log.output.content = separator + formattedOutput
770+
} else {
771+
log.output.content = separator + formattedOutput
772+
}
773+
processedOutputs.add(log.blockId)
774+
}
775+
}
776+
}
777+
778+
const processedCount = processStreamingBlockLogs(executionResult.logs, streamedContent)
779+
logger.info(`Processed ${processedCount} blocks for streaming tokenization`)
780+
781+
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
782+
const enrichedResult = { ...executionResult, traceSpans, totalDuration }
783+
if (conversationId) {
784+
if (!enrichedResult.metadata) {
785+
enrichedResult.metadata = {
786+
duration: totalDuration,
787+
startTime: new Date().toISOString(),
788+
}
789+
}
790+
;(enrichedResult.metadata as any).conversationId = conversationId
791+
}
792+
}
793+
794+
if (!(result && typeof result === 'object' && 'stream' in result)) {
795+
controller.enqueue(
796+
encoder.encode(`data: ${JSON.stringify({ event: 'final', data: result })}\n\n`)
714797
)
715-
)
798+
}
799+
800+
if (!sessionCompleted) {
801+
const resultForTracing =
802+
executionResult || ({ success: true, output: {}, logs: [] } as ExecutionResult)
803+
const { traceSpans } = buildTraceSpans(resultForTracing)
804+
await loggingSession.safeComplete({
805+
endedAt: new Date().toISOString(),
806+
totalDurationMs: executionResult?.metadata?.duration || 0,
807+
finalOutput: executionResult?.output || {},
808+
traceSpans,
809+
})
810+
sessionCompleted = true
811+
}
716812

717813
controller.close()
718814
} catch (error: any) {

0 commit comments

Comments
 (0)