Skip to content

Commit add55b4

Browse files
waleedlatif1claude
andauthored
improvement(executor): correctness-by-construction for workflow logs (#4382)
* improvement(executor): correctness-by-construction for workflow logs Replace the post-hoc reconciliation layer with a deterministic emission protocol: drain pending callback promises at terminal boundaries, mint a per-invocation blockExecutionId, and key console entries by that ID. Eliminates races between block:* and execution:* events without changing per-block latency. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(executor): remove stale fire-and-forget assertion The deleted test asserted wrappedOnBlockStart returns before the user callback resolves. After the Stage 1 drain refactor, wrappedOnBlockStart must await the user callback so the executor's trackCallback set covers SSE writes — otherwise the drain at terminal-event time can't guarantee block:* flushes before execution:*. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(console-store): cover blockExecutionId keying and idempotency Locks in the new console store invariants: - Primary lookup via entryIdByBlockExecutionId fires no legacy warn - Unknown blockExecutionId falls back to legacy keying and warns - No-blockExecutionId updates use legacy path silently - addConsole twice with same blockExecutionId returns the existing entry - Distinct blockExecutionIds (loop iterations) produce distinct entries Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(executor): capture BlockExecutor locally so finally drains its own instance Previously this.blockExecutor was overwritten on every buildExecutionPipeline call. Concurrent or re-entrant execute()/executeFromBlock() calls would have their finally block drain the wrong instance, allowing the first execution's block events to land after its terminal event. Returning { engine, blockExecutor } and capturing both locally makes the drain pinned to the same instance the engine.run() ran against. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(terminal): capture store logger by label, not first warn-bearing mock The previous capture used `.find` over all createLogger results, which returned whichever module created a logger first — not the store's logger — causing the legacy-keying warn assertion to see 0 calls. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
1 parent bdaf112 commit add55b4

14 files changed

Lines changed: 384 additions & 458 deletions

File tree

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -919,7 +919,8 @@ async function handleExecutePost(
919919
blockType: string,
920920
executionOrder: number,
921921
iterationContext?: IterationContext,
922-
childWorkflowContext?: ChildWorkflowContext
922+
childWorkflowContext?: ChildWorkflowContext,
923+
blockExecutionId?: string
923924
) => {
924925
reqLogger.info('onBlockStart called', { blockId, blockName, blockType })
925926
sendEvent({
@@ -945,6 +946,7 @@ async function handleExecutePost(
945946
childWorkflowBlockId: childWorkflowContext.parentBlockId,
946947
childWorkflowName: childWorkflowContext.workflowName,
947948
}),
949+
...(blockExecutionId && { blockExecutionId }),
948950
},
949951
})
950952
}
@@ -955,7 +957,8 @@ async function handleExecutePost(
955957
blockType: string,
956958
callbackData: any,
957959
iterationContext?: IterationContext,
958-
childWorkflowContext?: ChildWorkflowContext
960+
childWorkflowContext?: ChildWorkflowContext,
961+
blockExecutionId?: string
959962
) => {
960963
const hasError = callbackData.output?.error
961964
const childWorkflowData = childWorkflowContext
@@ -969,6 +972,11 @@ async function handleExecutePost(
969972
? { childWorkflowInstanceId: callbackData.childWorkflowInstanceId }
970973
: {}
971974

975+
const resolvedBlockExecutionId = blockExecutionId ?? callbackData.blockExecutionId
976+
const blockExecData = resolvedBlockExecutionId
977+
? { blockExecutionId: resolvedBlockExecutionId }
978+
: {}
979+
972980
if (hasError) {
973981
reqLogger.info('onBlockComplete (error) called', {
974982
blockId,
@@ -1002,6 +1010,7 @@ async function handleExecutePost(
10021010
}),
10031011
...childWorkflowData,
10041012
...instanceData,
1013+
...blockExecData,
10051014
},
10061015
})
10071016
} else {
@@ -1036,6 +1045,7 @@ async function handleExecutePost(
10361045
}),
10371046
...childWorkflowData,
10381047
...instanceData,
1048+
...blockExecData,
10391049
},
10401050
})
10411051
}
@@ -1165,7 +1175,6 @@ async function handleExecutePost(
11651175
data: {
11661176
error: timeoutErrorMessage,
11671177
duration: result.metadata?.duration || 0,
1168-
finalBlockLogs: result.logs,
11691178
},
11701179
})
11711180
finalMetaStatus = 'error'
@@ -1179,7 +1188,6 @@ async function handleExecutePost(
11791188
workflowId,
11801189
data: {
11811190
duration: result.metadata?.duration || 0,
1182-
finalBlockLogs: result.logs,
11831191
},
11841192
})
11851193
finalMetaStatus = 'cancelled'
@@ -1244,7 +1252,6 @@ async function handleExecutePost(
12441252
data: {
12451253
error: executionResult?.error || errorMessage,
12461254
duration: executionResult?.metadata?.duration || 0,
1247-
finalBlockLogs: executionResult?.logs,
12481255
},
12491256
})
12501257
finalMetaStatus = 'error'

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -230,31 +230,25 @@ export function useWorkflowExecution() {
230230
durationMs?: number
231231
blockLogs: BlockLog[]
232232
isPreExecutionError?: boolean
233-
finalBlockLogs?: BlockLog[]
234233
}) => {
235234
if (!params.workflowId) return
236235
sharedHandleExecutionErrorConsole(
237-
{ addConsole, updateConsole, cancelRunningEntries },
236+
{ addConsole, updateConsole },
238237
{ ...params, workflowId: params.workflowId }
239238
)
240239
},
241-
[addConsole, cancelRunningEntries, updateConsole]
240+
[addConsole, updateConsole]
242241
)
243242

244243
const handleExecutionCancelledConsole = useCallback(
245-
(params: {
246-
workflowId?: string
247-
executionId?: string
248-
durationMs?: number
249-
finalBlockLogs?: BlockLog[]
250-
}) => {
244+
(params: { workflowId?: string; executionId?: string; durationMs?: number }) => {
251245
if (!params.workflowId) return
252246
sharedHandleExecutionCancelledConsole(
253-
{ addConsole, updateConsole, cancelRunningEntries },
247+
{ addConsole, updateConsole },
254248
{ ...params, workflowId: params.workflowId }
255249
)
256250
},
257-
[addConsole, cancelRunningEntries, updateConsole]
251+
[addConsole, updateConsole]
258252
)
259253

260254
const buildBlockEventHandlers = useCallback(
@@ -1036,8 +1030,6 @@ export function useWorkflowExecution() {
10361030
accumulatedBlockLogs,
10371031
accumulatedBlockStates,
10381032
executedBlockIds,
1039-
consoleMode: 'update',
1040-
includeStartConsoleEntry: true,
10411033
onBlockCompleteCallback: onBlockComplete,
10421034
})
10431035

@@ -1240,7 +1232,6 @@ export function useWorkflowExecution() {
12401232
durationMs: data.duration,
12411233
blockLogs: accumulatedBlockLogs,
12421234
isPreExecutionError,
1243-
finalBlockLogs: data.finalBlockLogs,
12441235
})
12451236

12461237
if (activeWorkflowId && !isExecutingFromChat) {
@@ -1267,7 +1258,6 @@ export function useWorkflowExecution() {
12671258
workflowId: activeWorkflowId,
12681259
executionId: executionIdRef.current,
12691260
durationMs: data?.duration,
1270-
finalBlockLogs: data?.finalBlockLogs,
12711261
})
12721262

12731263
if (activeWorkflowId && !isExecutingFromChat) {
@@ -1684,8 +1674,6 @@ export function useWorkflowExecution() {
16841674
accumulatedBlockLogs,
16851675
accumulatedBlockStates,
16861676
executedBlockIds,
1687-
consoleMode: 'update',
1688-
includeStartConsoleEntry: true,
16891677
})
16901678

16911679
await executionStream.executeFromBlock({
@@ -1755,7 +1743,6 @@ export function useWorkflowExecution() {
17551743
error: data.error,
17561744
durationMs: data.duration,
17571745
blockLogs: accumulatedBlockLogs,
1758-
finalBlockLogs: data.finalBlockLogs,
17591746
})
17601747

17611748
setCurrentExecutionId(workflowId, null)
@@ -1768,7 +1755,6 @@ export function useWorkflowExecution() {
17681755
workflowId,
17691756
executionId: executionIdRef.current,
17701757
durationMs: data?.duration,
1771-
finalBlockLogs: data?.finalBlockLogs,
17721758
})
17731759

17741760
setCurrentExecutionId(workflowId, null)
@@ -1915,8 +1901,6 @@ export function useWorkflowExecution() {
19151901
accumulatedBlockLogs,
19161902
accumulatedBlockStates,
19171903
executedBlockIds,
1918-
consoleMode: 'update',
1919-
includeStartConsoleEntry: true,
19201904
})
19211905

19221906
const capturedExecutionId = executionId
@@ -2017,7 +2001,6 @@ export function useWorkflowExecution() {
20172001
executionId: capturedExecutionId,
20182002
error: data.error,
20192003
blockLogs: accumulatedBlockLogs,
2020-
finalBlockLogs: data.finalBlockLogs,
20212004
})
20222005
},
20232006
onExecutionCancelled: (data) => {
@@ -2038,7 +2021,6 @@ export function useWorkflowExecution() {
20382021
workflowId: reconnectWorkflowId,
20392022
executionId: capturedExecutionId,
20402023
durationMs: data?.duration,
2041-
finalBlockLogs: data?.finalBlockLogs,
20422024
})
20432025
},
20442026
},

0 commit comments

Comments
 (0)