Skip to content

Commit 8c704d0

Browse files
committed
improvement(mothership): stream retry state machine, progressive re-rendering
1 parent b86ebb3 commit 8c704d0

6 files changed

Lines changed: 217 additions & 73 deletions

File tree

apps/sim/app/api/copilot/chat/stream/route.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ vi.mock('@/lib/copilot/request/session', () => ({
3838
}),
3939
encodeSSEEnvelope: (event: Record<string, unknown>) =>
4040
new TextEncoder().encode(`data: ${JSON.stringify(event)}\n\n`),
41+
encodeSSEComment: (comment: string) => new TextEncoder().encode(`: ${comment}\n\n`),
4142
SSE_RESPONSE_HEADERS: {
4243
'Content-Type': 'text/event-stream',
4344
},
@@ -132,6 +133,7 @@ describe('copilot chat stream replay route', () => {
132133
)
133134

134135
const chunks = await readAllChunks(response)
136+
expect(chunks[0]).toBe(': accepted\n\n')
135137
expect(chunks.join('')).toContain(
136138
JSON.stringify({
137139
status: MothershipStreamV1CompletionStatus.cancelled,

apps/sim/app/api/copilot/chat/stream/route.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { getCopilotTracer, markSpanForError } from '@/lib/copilot/request/otel'
1919
import {
2020
checkForReplayGap,
2121
createEvent,
22+
encodeSSEComment,
2223
encodeSSEEnvelope,
2324
readEvents,
2425
readFilePreviewSessions,
@@ -31,6 +32,7 @@ export const maxDuration = 3600
3132

3233
const logger = createLogger('CopilotChatStreamAPI')
3334
const POLL_INTERVAL_MS = 250
35+
const REPLAY_KEEPALIVE_INTERVAL_MS = 15_000
3436
const MAX_STREAM_MS = 60 * 60 * 1000
3537

3638
function extractCanonicalRequestId(value: unknown): string {
@@ -266,6 +268,7 @@ async function handleResumeRequestBody({
266268
let controllerClosed = false
267269
let sawTerminalEvent = false
268270
let currentRequestId = extractRunRequestId(run)
271+
let lastWriteTime = Date.now()
269272
// Stamp the logical request id + chat id on the resume root as soon
270273
// as we resolve them from the run row, so TraceQL joins work on
271274
// resume legs the same way they do on the original POST.
@@ -291,6 +294,19 @@ async function handleResumeRequestBody({
291294
if (controllerClosed) return false
292295
try {
293296
controller.enqueue(encodeSSEEnvelope(payload))
297+
lastWriteTime = Date.now()
298+
return true
299+
} catch {
300+
controllerClosed = true
301+
return false
302+
}
303+
}
304+
305+
const enqueueComment = (comment: string) => {
306+
if (controllerClosed) return false
307+
try {
308+
controller.enqueue(encodeSSEComment(comment))
309+
lastWriteTime = Date.now()
294310
return true
295311
} catch {
296312
controllerClosed = true
@@ -352,10 +368,19 @@ async function handleResumeRequestBody({
352368
}
353369

354370
try {
371+
enqueueComment('accepted')
372+
355373
const gap = await checkForReplayGap(streamId, afterCursor, currentRequestId)
356374
if (gap) {
357375
for (const envelope of gap.envelopes) {
358-
enqueueEvent(envelope)
376+
cursor = envelope.stream.cursor ?? String(envelope.seq)
377+
currentRequestId = extractEnvelopeRequestId(envelope) || currentRequestId
378+
if (envelope.type === MothershipStreamV1EventType.complete) {
379+
sawTerminalEvent = true
380+
}
381+
if (!enqueueEvent(envelope)) {
382+
break
383+
}
359384
}
360385
return
361386
}
@@ -408,6 +433,10 @@ async function handleResumeRequestBody({
408433
break
409434
}
410435

436+
if (Date.now() - lastWriteTime >= REPLAY_KEEPALIVE_INTERVAL_MS) {
437+
enqueueComment('keepalive')
438+
}
439+
411440
await sleep(POLL_INTERVAL_MS)
412441
}
413442
if (!controllerClosed && Date.now() - startTime >= MAX_STREAM_MS) {

apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use client'
22

3-
import { useCallback, useLayoutEffect, useRef } from 'react'
3+
import { useCallback, useLayoutEffect, useMemo, useRef } from 'react'
44
import { cn } from '@/lib/core/utils/cn'
55
import { MessageActions } from '@/app/workspace/[workspaceId]/components'
66
import { ChatMessageAttachments } from '@/app/workspace/[workspaceId]/home/components/chat-message-attachments'
@@ -22,6 +22,7 @@ import type {
2222
QueuedMessage,
2323
} from '@/app/workspace/[workspaceId]/home/types'
2424
import { useAutoScroll } from '@/hooks/use-auto-scroll'
25+
import { useProgressiveList } from '@/hooks/use-progressive-list'
2526
import type { ChatContext } from '@/stores/panel'
2627
import { MothershipChatSkeleton } from './mothership-chat-skeleton'
2728

@@ -104,6 +105,21 @@ export function MothershipChat({
104105
scrollOnMount: true,
105106
})
106107
const hasMessages = messages.length > 0
108+
const stagingKey = chatId ?? 'pending-chat'
109+
const { staged: stagedMessages, isStaging } = useProgressiveList(messages, stagingKey)
110+
const stagedMessageCount = stagedMessages.length
111+
const stagedOffset = messages.length - stagedMessages.length
112+
const precedingUserContentByIndex = useMemo(() => {
113+
const contentByIndex: Array<string | undefined> = []
114+
let lastUserContent: string | undefined
115+
for (const [index, message] of messages.entries()) {
116+
contentByIndex[index] = lastUserContent
117+
if (message.role === 'user') {
118+
lastUserContent = message.content
119+
}
120+
}
121+
return contentByIndex
122+
}, [messages])
107123
const initialScrollDoneRef = useRef(false)
108124
const userInputRef = useRef<UserInputHandle>(null)
109125
const handleSendQueuedHead = useCallback(() => {
@@ -134,14 +150,20 @@ export function MothershipChat({
134150
scrollToBottom()
135151
}, [hasMessages, initialScrollBlocked, scrollToBottom])
136152

153+
useLayoutEffect(() => {
154+
if (!isStaging || initialScrollBlocked || !initialScrollDoneRef.current) return
155+
scrollToBottom()
156+
}, [isStaging, stagedMessageCount, initialScrollBlocked, scrollToBottom])
157+
137158
return (
138159
<div className={cn('flex h-full min-h-0 flex-col', className)}>
139160
<div ref={scrollContainerRef} className={styles.scrollContainer}>
140161
{isLoading && !hasMessages ? (
141162
<MothershipChatSkeleton layout={layout} />
142163
) : (
143164
<div className={styles.content}>
144-
{messages.map((msg, index) => {
165+
{stagedMessages.map((msg, localIndex) => {
166+
const index = stagedOffset + localIndex
145167
if (msg.role === 'user') {
146168
const hasAttachments = Boolean(msg.attachments?.length)
147169
return (
@@ -177,10 +199,7 @@ export function MothershipChat({
177199
}
178200

179201
const isLastMessage = index === messages.length - 1
180-
const precedingUserMsg = [...messages]
181-
.slice(0, index)
182-
.reverse()
183-
.find((m) => m.role === 'user')
202+
const precedingUserContent = precedingUserContentByIndex[index]
184203

185204
return (
186205
<div key={msg.id} className={styles.assistantRow}>
@@ -196,7 +215,7 @@ export function MothershipChat({
196215
<MessageActions
197216
content={msg.content}
198217
chatId={chatId}
199-
userQuery={precedingUserMsg?.content}
218+
userQuery={precedingUserContent}
200219
requestId={msg.requestId}
201220
/>
202221
</div>
Lines changed: 80 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use client'
22

3-
import { useEffect, useRef, useState } from 'react'
3+
import { useEffect, useState } from 'react'
44

55
interface ProgressiveListOptions {
66
/** Number of items to render in the initial batch (most recent items) */
@@ -14,15 +14,31 @@ const DEFAULTS = {
1414
batchSize: 5,
1515
} satisfies Required<ProgressiveListOptions>
1616

17+
interface ProgressiveListState {
18+
key: string
19+
count: number
20+
caughtUp: boolean
21+
}
22+
23+
function createInitialState(
24+
key: string,
25+
itemCount: number,
26+
initialBatch: number
27+
): ProgressiveListState {
28+
const count = Math.min(itemCount, initialBatch)
29+
return {
30+
key,
31+
count,
32+
caughtUp: itemCount > 0 && count >= itemCount,
33+
}
34+
}
35+
1736
/**
1837
* Progressively renders a list of items so that first paint is fast.
1938
*
2039
* On mount (or when `key` changes), only the most recent `initialBatch`
2140
* items are rendered. The rest are added in `batchSize` increments via
22-
* `requestAnimationFrame` so the browser never blocks on a large DOM mount.
23-
*
24-
* Once staging completes for a given key it never re-stages -- new items
25-
* appended to the list are rendered immediately.
41+
* `requestAnimationFrame`.
2642
*
2743
* @param items Full list of items to render.
2844
* @param key A session/conversation identifier. When it changes,
@@ -35,67 +51,77 @@ export function useProgressiveList<T>(
3551
key: string,
3652
options?: ProgressiveListOptions
3753
): { staged: T[]; isStaging: boolean } {
38-
const initialBatch = options?.initialBatch ?? DEFAULTS.initialBatch
39-
const batchSize = options?.batchSize ?? DEFAULTS.batchSize
54+
const initialBatch = Math.max(0, options?.initialBatch ?? DEFAULTS.initialBatch)
55+
const batchSize = Math.max(1, options?.batchSize ?? DEFAULTS.batchSize)
56+
const [state, setState] = useState(() => createInitialState(key, items.length, initialBatch))
4057

41-
const completedKeysRef = useRef(new Set<string>())
42-
const prevKeyRef = useRef(key)
43-
const stagingCountRef = useRef(initialBatch)
44-
const [count, setCount] = useState(() => {
45-
if (items.length <= initialBatch) return items.length
46-
return initialBatch
47-
})
58+
const renderState =
59+
state.key === key && (state.count > 0 || items.length === 0 || state.caughtUp)
60+
? state
61+
: createInitialState(key, items.length, initialBatch)
4862

4963
useEffect(() => {
50-
if (completedKeysRef.current.has(key)) {
51-
setCount(items.length)
52-
return
53-
}
64+
setState((prev) => {
65+
if (prev.key !== key) {
66+
return createInitialState(key, items.length, initialBatch)
67+
}
5468

55-
if (items.length <= initialBatch) {
56-
setCount(items.length)
57-
completedKeysRef.current.add(key)
58-
return
59-
}
69+
if (items.length === 0) {
70+
if (prev.count === 0 && !prev.caughtUp) {
71+
return prev
72+
}
73+
return { key, count: 0, caughtUp: false }
74+
}
6075

61-
let current = Math.max(stagingCountRef.current, initialBatch)
62-
setCount(current)
76+
if (prev.caughtUp) {
77+
if (prev.count === items.length) {
78+
return prev
79+
}
80+
return { key, count: items.length, caughtUp: true }
81+
}
6382

64-
let frame: number | undefined
83+
const minimumCount = Math.min(items.length, initialBatch)
84+
if (prev.count >= minimumCount && prev.count <= items.length) {
85+
return prev
86+
}
6587

66-
const step = () => {
67-
const total = items.length
68-
current = Math.min(total, current + batchSize)
69-
stagingCountRef.current = current
70-
setCount(current)
71-
if (current >= total) {
72-
completedKeysRef.current.add(key)
73-
frame = undefined
74-
return
88+
const count = Math.min(items.length, Math.max(prev.count, minimumCount))
89+
return {
90+
key,
91+
count,
92+
caughtUp: count >= items.length,
7593
}
76-
frame = requestAnimationFrame(step)
94+
})
95+
}, [key, items.length, initialBatch])
96+
97+
useEffect(() => {
98+
if (state.key !== key || state.caughtUp || state.count >= items.length) {
99+
return
77100
}
78101

79-
frame = requestAnimationFrame(step)
102+
const frame = requestAnimationFrame(() => {
103+
setState((prev) => {
104+
if (prev.key !== key || prev.caughtUp) {
105+
return prev
106+
}
80107

81-
return () => {
82-
if (frame !== undefined) cancelAnimationFrame(frame)
83-
}
84-
}, [key, items.length, initialBatch, batchSize])
108+
const count = Math.min(items.length, prev.count + batchSize)
109+
return {
110+
key,
111+
count,
112+
caughtUp: count >= items.length,
113+
}
114+
})
115+
})
85116

86-
let effectiveCount = count
87-
if (prevKeyRef.current !== key) {
88-
effectiveCount = items.length <= initialBatch ? items.length : initialBatch
89-
stagingCountRef.current = initialBatch
90-
}
91-
prevKeyRef.current = key
92-
93-
const isCompleted = completedKeysRef.current.has(key)
94-
const isStaging = !isCompleted && effectiveCount < items.length
95-
const staged =
96-
isCompleted || effectiveCount >= items.length
97-
? items
98-
: items.slice(Math.max(0, items.length - effectiveCount))
117+
return () => cancelAnimationFrame(frame)
118+
}, [state.key, state.count, state.caughtUp, key, items.length, batchSize])
119+
120+
const effectiveCount = renderState.caughtUp
121+
? items.length
122+
: Math.min(renderState.count, items.length)
123+
const staged = items.slice(Math.max(0, items.length - effectiveCount))
124+
const isStaging = effectiveCount < items.length
99125

100126
return { staged, isStaging }
101127
}

0 commit comments

Comments (0)