Skip to content

Commit 778f4a5

Browse files
authored
revert(executor): undo correctness-by-construction for workflow logs (#4382) (#4399)
* Revert "improvement(executor): correctness-by-construction for workflow logs (#4382)" This reverts commit add55b4. * fix(terminal): re-read entries inside reconcileFinalBlockLogs loop For workflows with loop or parallel iterations, finalBlockLogs can contain multiple terminal logs sharing the same blockId. The prior code captured entries once before the loop, so entries.find always matched the same first running entry — later iterations stayed isRunning: true and got swept to "cancelled" instead of showing their actual terminal state. Re-read the snapshot per iteration so each updateConsole's isRunning: false flush narrows the next match. * chore(terminal): remove dead consoleMode 'add' branch and helpers Every callsite passes consoleMode: 'update'. Deletes the field, the unreachable 'add' branches in onBlockCompleted/onBlockError, and the addConsoleEntry/addConsoleErrorEntry helpers that only the dead branch invoked. * fix(executor): reconcile finalBlockLogs on execution:completed block:completed callbacks fire-and-forget; the last block's event can arrive after execution:completed, leaving its console entry stuck isRunning. Mirror the error/cancelled paths: emit finalBlockLogs on execution:completed and reconcile + sweep on the client. * fix(executor): emit finalBlockLogs on queued/HITL error and cancel paths queued-workflow-execution and human-in-the-loop-manager only attached finalBlockLogs to execution:completed. Without it on cancel/error, reconcileFinalBlockLogs is a no-op and the client sweep flips already-completed blocks to canceled. Mirror the route.ts pattern.
1 parent b0881de commit 778f4a5

16 files changed

Lines changed: 424 additions & 385 deletions

File tree

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -919,8 +919,7 @@ async function handleExecutePost(
919919
blockType: string,
920920
executionOrder: number,
921921
iterationContext?: IterationContext,
922-
childWorkflowContext?: ChildWorkflowContext,
923-
blockExecutionId?: string
922+
childWorkflowContext?: ChildWorkflowContext
924923
) => {
925924
reqLogger.info('onBlockStart called', { blockId, blockName, blockType })
926925
sendEvent({
@@ -946,7 +945,6 @@ async function handleExecutePost(
946945
childWorkflowBlockId: childWorkflowContext.parentBlockId,
947946
childWorkflowName: childWorkflowContext.workflowName,
948947
}),
949-
...(blockExecutionId && { blockExecutionId }),
950948
},
951949
})
952950
}
@@ -957,8 +955,7 @@ async function handleExecutePost(
957955
blockType: string,
958956
callbackData: any,
959957
iterationContext?: IterationContext,
960-
childWorkflowContext?: ChildWorkflowContext,
961-
blockExecutionId?: string
958+
childWorkflowContext?: ChildWorkflowContext
962959
) => {
963960
const hasError = callbackData.output?.error
964961
const childWorkflowData = childWorkflowContext
@@ -972,11 +969,6 @@ async function handleExecutePost(
972969
? { childWorkflowInstanceId: callbackData.childWorkflowInstanceId }
973970
: {}
974971

975-
const resolvedBlockExecutionId = blockExecutionId ?? callbackData.blockExecutionId
976-
const blockExecData = resolvedBlockExecutionId
977-
? { blockExecutionId: resolvedBlockExecutionId }
978-
: {}
979-
980972
if (hasError) {
981973
reqLogger.info('onBlockComplete (error) called', {
982974
blockId,
@@ -1010,7 +1002,6 @@ async function handleExecutePost(
10101002
}),
10111003
...childWorkflowData,
10121004
...instanceData,
1013-
...blockExecData,
10141005
},
10151006
})
10161007
} else {
@@ -1045,7 +1036,6 @@ async function handleExecutePost(
10451036
}),
10461037
...childWorkflowData,
10471038
...instanceData,
1048-
...blockExecData,
10491039
},
10501040
})
10511041
}
@@ -1175,6 +1165,7 @@ async function handleExecutePost(
11751165
data: {
11761166
error: timeoutErrorMessage,
11771167
duration: result.metadata?.duration || 0,
1168+
finalBlockLogs: result.logs,
11781169
},
11791170
})
11801171
finalMetaStatus = 'error'
@@ -1188,6 +1179,7 @@ async function handleExecutePost(
11881179
workflowId,
11891180
data: {
11901181
duration: result.metadata?.duration || 0,
1182+
finalBlockLogs: result.logs,
11911183
},
11921184
})
11931185
finalMetaStatus = 'cancelled'
@@ -1228,6 +1220,7 @@ async function handleExecutePost(
12281220
duration: result.metadata?.duration || 0,
12291221
startTime: result.metadata?.startTime || startTime.toISOString(),
12301222
endTime: result.metadata?.endTime || new Date().toISOString(),
1223+
finalBlockLogs: result.logs,
12311224
},
12321225
})
12331226
}
@@ -1252,6 +1245,7 @@ async function handleExecutePost(
12521245
data: {
12531246
error: executionResult?.error || errorMessage,
12541247
duration: executionResult?.metadata?.duration || 0,
1248+
finalBlockLogs: executionResult?.logs,
12551249
},
12561250
})
12571251
finalMetaStatus = 'error'

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import {
2525
addHttpErrorConsoleEntry,
2626
type BlockEventHandlerConfig,
2727
createBlockEventHandlers,
28+
reconcileFinalBlockLogs,
2829
addExecutionErrorConsoleEntry as sharedAddExecutionErrorConsoleEntry,
2930
handleExecutionCancelledConsole as sharedHandleExecutionCancelledConsole,
3031
handleExecutionErrorConsole as sharedHandleExecutionErrorConsole,
@@ -230,25 +231,31 @@ export function useWorkflowExecution() {
230231
durationMs?: number
231232
blockLogs: BlockLog[]
232233
isPreExecutionError?: boolean
234+
finalBlockLogs?: BlockLog[]
233235
}) => {
234236
if (!params.workflowId) return
235237
sharedHandleExecutionErrorConsole(
236-
{ addConsole, updateConsole },
238+
{ addConsole, updateConsole, cancelRunningEntries },
237239
{ ...params, workflowId: params.workflowId }
238240
)
239241
},
240-
[addConsole, updateConsole]
242+
[addConsole, cancelRunningEntries, updateConsole]
241243
)
242244

243245
const handleExecutionCancelledConsole = useCallback(
244-
(params: { workflowId?: string; executionId?: string; durationMs?: number }) => {
246+
(params: {
247+
workflowId?: string
248+
executionId?: string
249+
durationMs?: number
250+
finalBlockLogs?: BlockLog[]
251+
}) => {
245252
if (!params.workflowId) return
246253
sharedHandleExecutionCancelledConsole(
247-
{ addConsole, updateConsole },
254+
{ addConsole, updateConsole, cancelRunningEntries },
248255
{ ...params, workflowId: params.workflowId }
249256
)
250257
},
251-
[addConsole, updateConsole]
258+
[addConsole, cancelRunningEntries, updateConsole]
252259
)
253260

254261
const buildBlockEventHandlers = useCallback(
@@ -1030,6 +1037,7 @@ export function useWorkflowExecution() {
10301037
accumulatedBlockLogs,
10311038
accumulatedBlockStates,
10321039
executedBlockIds,
1040+
includeStartConsoleEntry: true,
10331041
onBlockCompleteCallback: onBlockComplete,
10341042
})
10351043

@@ -1123,6 +1131,13 @@ export function useWorkflowExecution() {
11231131

11241132
if (activeWorkflowId) {
11251133
setCurrentExecutionId(activeWorkflowId, null)
1134+
reconcileFinalBlockLogs(
1135+
updateConsole,
1136+
activeWorkflowId,
1137+
executionIdRef.current,
1138+
data.finalBlockLogs
1139+
)
1140+
cancelRunningEntries(activeWorkflowId)
11261141
}
11271142

11281143
executionResult = {
@@ -1232,6 +1247,7 @@ export function useWorkflowExecution() {
12321247
durationMs: data.duration,
12331248
blockLogs: accumulatedBlockLogs,
12341249
isPreExecutionError,
1250+
finalBlockLogs: data.finalBlockLogs,
12351251
})
12361252

12371253
if (activeWorkflowId && !isExecutingFromChat) {
@@ -1258,6 +1274,7 @@ export function useWorkflowExecution() {
12581274
workflowId: activeWorkflowId,
12591275
executionId: executionIdRef.current,
12601276
durationMs: data?.duration,
1277+
finalBlockLogs: data?.finalBlockLogs,
12611278
})
12621279

12631280
if (activeWorkflowId && !isExecutingFromChat) {
@@ -1674,6 +1691,7 @@ export function useWorkflowExecution() {
16741691
accumulatedBlockLogs,
16751692
accumulatedBlockStates,
16761693
executedBlockIds,
1694+
includeStartConsoleEntry: true,
16771695
})
16781696

16791697
await executionStream.executeFromBlock({
@@ -1692,6 +1710,14 @@ export function useWorkflowExecution() {
16921710
onBlockChildWorkflowStarted: blockHandlers.onBlockChildWorkflowStarted,
16931711

16941712
onExecutionCompleted: (data) => {
1713+
reconcileFinalBlockLogs(
1714+
updateConsole,
1715+
workflowId,
1716+
executionIdRef.current,
1717+
data.finalBlockLogs
1718+
)
1719+
cancelRunningEntries(workflowId)
1720+
16951721
if (data.success) {
16961722
executedBlockIds.add(blockId)
16971723

@@ -1743,6 +1769,7 @@ export function useWorkflowExecution() {
17431769
error: data.error,
17441770
durationMs: data.duration,
17451771
blockLogs: accumulatedBlockLogs,
1772+
finalBlockLogs: data.finalBlockLogs,
17461773
})
17471774

17481775
setCurrentExecutionId(workflowId, null)
@@ -1755,6 +1782,7 @@ export function useWorkflowExecution() {
17551782
workflowId,
17561783
executionId: executionIdRef.current,
17571784
durationMs: data?.duration,
1785+
finalBlockLogs: data?.finalBlockLogs,
17581786
})
17591787

17601788
setCurrentExecutionId(workflowId, null)
@@ -1901,6 +1929,7 @@ export function useWorkflowExecution() {
19011929
accumulatedBlockLogs,
19021930
accumulatedBlockStates,
19031931
executedBlockIds,
1932+
includeStartConsoleEntry: true,
19041933
})
19051934

19061935
const capturedExecutionId = executionId
@@ -1967,7 +1996,7 @@ export function useWorkflowExecution() {
19671996
onBlockCompleted: wrapHandler(handlers.onBlockCompleted),
19681997
onBlockError: wrapHandler(handlers.onBlockError),
19691998
onBlockChildWorkflowStarted: wrapHandler(handlers.onBlockChildWorkflowStarted),
1970-
onExecutionCompleted: () => {
1999+
onExecutionCompleted: (data) => {
19712000
reconnectionComplete = true
19722001
activeReconnections.delete(reconnectWorkflowId)
19732002
if (!activated) {
@@ -1981,6 +2010,13 @@ export function useWorkflowExecution() {
19812010
setCurrentExecutionId(reconnectWorkflowId, null)
19822011
setIsExecuting(reconnectWorkflowId, false)
19832012
setActiveBlocks(reconnectWorkflowId, new Set())
2013+
reconcileFinalBlockLogs(
2014+
updateConsole,
2015+
reconnectWorkflowId,
2016+
capturedExecutionId,
2017+
data?.finalBlockLogs
2018+
)
2019+
cancelRunningEntries(reconnectWorkflowId)
19842020
},
19852021
onExecutionError: (data) => {
19862022
reconnectionComplete = true
@@ -2001,6 +2037,7 @@ export function useWorkflowExecution() {
20012037
executionId: capturedExecutionId,
20022038
error: data.error,
20032039
blockLogs: accumulatedBlockLogs,
2040+
finalBlockLogs: data.finalBlockLogs,
20042041
})
20052042
},
20062043
onExecutionCancelled: (data) => {
@@ -2021,6 +2058,7 @@ export function useWorkflowExecution() {
20212058
workflowId: reconnectWorkflowId,
20222059
executionId: capturedExecutionId,
20232060
durationMs: data?.duration,
2061+
finalBlockLogs: data?.finalBlockLogs,
20242062
})
20252063
},
20262064
},

0 commit comments

Comments
 (0)