diff --git a/apps/sim/app/api/copilot/chat/stream/route.test.ts b/apps/sim/app/api/copilot/chat/stream/route.test.ts index 803f91af2c..aa7c85b250 100644 --- a/apps/sim/app/api/copilot/chat/stream/route.test.ts +++ b/apps/sim/app/api/copilot/chat/stream/route.test.ts @@ -38,6 +38,7 @@ vi.mock('@/lib/copilot/request/session', () => ({ }), encodeSSEEnvelope: (event: Record) => new TextEncoder().encode(`data: ${JSON.stringify(event)}\n\n`), + encodeSSEComment: (comment: string) => new TextEncoder().encode(`: ${comment}\n\n`), SSE_RESPONSE_HEADERS: { 'Content-Type': 'text/event-stream', }, @@ -132,6 +133,7 @@ describe('copilot chat stream replay route', () => { ) const chunks = await readAllChunks(response) + expect(chunks[0]).toBe(': accepted\n\n') expect(chunks.join('')).toContain( JSON.stringify({ status: MothershipStreamV1CompletionStatus.cancelled, diff --git a/apps/sim/app/api/copilot/chat/stream/route.ts b/apps/sim/app/api/copilot/chat/stream/route.ts index 45a4c3c987..3d7ab03b43 100644 --- a/apps/sim/app/api/copilot/chat/stream/route.ts +++ b/apps/sim/app/api/copilot/chat/stream/route.ts @@ -19,6 +19,7 @@ import { getCopilotTracer, markSpanForError } from '@/lib/copilot/request/otel' import { checkForReplayGap, createEvent, + encodeSSEComment, encodeSSEEnvelope, readEvents, readFilePreviewSessions, @@ -31,6 +32,7 @@ export const maxDuration = 3600 const logger = createLogger('CopilotChatStreamAPI') const POLL_INTERVAL_MS = 250 +const REPLAY_KEEPALIVE_INTERVAL_MS = 15_000 const MAX_STREAM_MS = 60 * 60 * 1000 function extractCanonicalRequestId(value: unknown): string { @@ -266,6 +268,7 @@ async function handleResumeRequestBody({ let controllerClosed = false let sawTerminalEvent = false let currentRequestId = extractRunRequestId(run) + let lastWriteTime = Date.now() // Stamp the logical request id + chat id on the resume root as soon // as we resolve them from the run row, so TraceQL joins work on // resume legs the same way they do on the original POST. @@ -291,6 +294,19 @@ async function handleResumeRequestBody({ if (controllerClosed) return false try { controller.enqueue(encodeSSEEnvelope(payload)) + lastWriteTime = Date.now() + return true + } catch { + controllerClosed = true + return false + } + } + + const enqueueComment = (comment: string) => { + if (controllerClosed) return false + try { + controller.enqueue(encodeSSEComment(comment)) + lastWriteTime = Date.now() return true } catch { controllerClosed = true @@ -306,7 +322,6 @@ async function handleResumeRequestBody({ const flushEvents = async () => { const events = await readEvents(streamId, cursor) if (events.length > 0) { - totalEventsFlushed += events.length logger.debug('[Resume] Flushing events', { streamId, afterCursor: cursor, @@ -314,14 +329,15 @@ async function handleResumeRequestBody({ }) } for (const envelope of events) { + if (!enqueueEvent(envelope)) { + break + } + totalEventsFlushed += 1 cursor = envelope.stream.cursor ?? String(envelope.seq) currentRequestId = extractEnvelopeRequestId(envelope) || currentRequestId if (envelope.type === MothershipStreamV1EventType.complete) { sawTerminalEvent = true } - if (!enqueueEvent(envelope)) { - break - } } } @@ -341,21 +357,30 @@ async function handleResumeRequestBody({ reason: options?.reason, requestId: currentRequestId, })) { + if (!enqueueEvent(envelope)) { + break + } cursor = envelope.stream.cursor ?? String(envelope.seq) if (envelope.type === MothershipStreamV1EventType.complete) { sawTerminalEvent = true } - if (!enqueueEvent(envelope)) { - break - } } } try { + enqueueComment('accepted') + const gap = await checkForReplayGap(streamId, afterCursor, currentRequestId) if (gap) { for (const envelope of gap.envelopes) { - enqueueEvent(envelope) + if (!enqueueEvent(envelope)) { + break + } + cursor = envelope.stream.cursor ?? String(envelope.seq) + currentRequestId = extractEnvelopeRequestId(envelope) || currentRequestId + if (envelope.type === MothershipStreamV1EventType.complete) { + sawTerminalEvent = true + } } return } @@ -408,6 +433,10 @@ async function handleResumeRequestBody({ break } + if (Date.now() - lastWriteTime >= REPLAY_KEEPALIVE_INTERVAL_MS) { + enqueueComment('keepalive') + } + await sleep(POLL_INTERVAL_MS) } if (!controllerClosed && Date.now() - startTime >= MAX_STREAM_MS) { diff --git a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx index b477c4744e..ccf194d732 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx @@ -1,6 +1,6 @@ 'use client' -import { useCallback, useLayoutEffect, useRef } from 'react' +import { useCallback, useLayoutEffect, useMemo, useRef } from 'react' import { cn } from '@/lib/core/utils/cn' import { MessageActions } from '@/app/workspace/[workspaceId]/components' import { ChatMessageAttachments } from '@/app/workspace/[workspaceId]/home/components/chat-message-attachments' @@ -22,6 +22,7 @@ import type { QueuedMessage, } from '@/app/workspace/[workspaceId]/home/types' import { useAutoScroll } from '@/hooks/use-auto-scroll' +import { useProgressiveList } from '@/hooks/use-progressive-list' import type { ChatContext } from '@/stores/panel' import { MothershipChatSkeleton } from './mothership-chat-skeleton' @@ -104,6 +105,21 @@ export function MothershipChat({ scrollOnMount: true, }) const hasMessages = messages.length > 0 + const stagingKey = chatId ?? 'pending-chat' + const { staged: stagedMessages, isStaging } = useProgressiveList(messages, stagingKey) + const stagedMessageCount = stagedMessages.length + const stagedOffset = messages.length - stagedMessages.length + const precedingUserContentByIndex = useMemo(() => { + const contentByIndex: Array = [] + let lastUserContent: string | undefined + for (const [index, message] of messages.entries()) { + contentByIndex[index] = lastUserContent + if (message.role === 'user') { + lastUserContent = message.content + } + } + return contentByIndex + }, [messages]) const initialScrollDoneRef = useRef(false) const userInputRef = useRef(null) const handleSendQueuedHead = useCallback(() => { @@ -134,6 +150,11 @@ export function MothershipChat({ scrollToBottom() }, [hasMessages, initialScrollBlocked, scrollToBottom]) + useLayoutEffect(() => { + if (!isStaging || initialScrollBlocked || !initialScrollDoneRef.current) return + scrollToBottom() + }, [isStaging, stagedMessageCount, initialScrollBlocked, scrollToBottom]) + return (
@@ -141,7 +162,8 @@ export function MothershipChat({ ) : (
- {messages.map((msg, index) => { + {stagedMessages.map((msg, localIndex) => { + const index = stagedOffset + localIndex if (msg.role === 'user') { const hasAttachments = Boolean(msg.attachments?.length) return ( @@ -177,10 +199,7 @@ export function MothershipChat({ } const isLastMessage = index === messages.length - 1 - const precedingUserMsg = [...messages] - .slice(0, index) - .reverse() - .find((m) => m.role === 'user') + const precedingUserContent = precedingUserContentByIndex[index] return (
@@ -196,7 +215,7 @@ export function MothershipChat({
diff --git a/apps/sim/hooks/use-progressive-list.ts b/apps/sim/hooks/use-progressive-list.ts index 74d7dc87a9..bf60f4d67b 100644 --- a/apps/sim/hooks/use-progressive-list.ts +++ b/apps/sim/hooks/use-progressive-list.ts @@ -1,6 +1,6 @@ 'use client' -import { useEffect, useRef, useState } from 'react' +import { useEffect, useLayoutEffect, useRef, useState } from 'react' interface ProgressiveListOptions { /** Number of items to render in the initial batch (most recent items) */ @@ -14,15 +14,31 @@ const DEFAULTS = { batchSize: 5, } satisfies Required +interface ProgressiveListState { + key: string + count: number + caughtUp: boolean +} + +function createInitialState( + key: string, + itemCount: number, + initialBatch: number +): ProgressiveListState { + const count = Math.min(itemCount, initialBatch) + return { + key, + count, + caughtUp: itemCount > 0 && count >= itemCount, + } +} + /** * Progressively renders a list of items so that first paint is fast. * * On mount (or when `key` changes), only the most recent `initialBatch` * items are rendered. The rest are added in `batchSize` increments via - * `requestAnimationFrame` so the browser never blocks on a large DOM mount. - * - * Once staging completes for a given key it never re-stages -- new items - * appended to the list are rendered immediately. + * `requestAnimationFrame`. * * @param items Full list of items to render. * @param key A session/conversation identifier. When it changes, @@ -35,67 +51,83 @@ export function useProgressiveList( key: string, options?: ProgressiveListOptions ): { staged: T[]; isStaging: boolean } { - const initialBatch = options?.initialBatch ?? DEFAULTS.initialBatch - const batchSize = options?.batchSize ?? DEFAULTS.batchSize + const initialBatch = Math.max(0, options?.initialBatch ?? DEFAULTS.initialBatch) + const batchSize = Math.max(1, options?.batchSize ?? DEFAULTS.batchSize) + const [state, setState] = useState(() => createInitialState(key, items.length, initialBatch)) + const latestItemCountRef = useRef(items.length) + + useLayoutEffect(() => { + latestItemCountRef.current = items.length + }, [items.length]) - const completedKeysRef = useRef(new Set()) - const prevKeyRef = useRef(key) - const stagingCountRef = useRef(initialBatch) - const [count, setCount] = useState(() => { - if (items.length <= initialBatch) return items.length - return initialBatch - }) + const renderState = + state.key === key && (state.count > 0 || items.length === 0 || state.caughtUp) + ? state + : createInitialState(key, items.length, initialBatch) useEffect(() => { - if (completedKeysRef.current.has(key)) { - setCount(items.length) - return - } + setState((prev) => { + if (prev.key !== key) { + return createInitialState(key, items.length, initialBatch) + } - if (items.length <= initialBatch) { - setCount(items.length) - completedKeysRef.current.add(key) - return - } + if (items.length === 0) { + if (prev.count === 0 && !prev.caughtUp) { + return prev + } + return { key, count: 0, caughtUp: false } + } - let current = Math.max(stagingCountRef.current, initialBatch) - setCount(current) + if (prev.caughtUp) { + if (prev.count === items.length) { + return prev + } + return { key, count: items.length, caughtUp: true } + } - let frame: number | undefined + const minimumCount = Math.min(items.length, initialBatch) + if (prev.count >= minimumCount && prev.count <= items.length) { + return prev + } - const step = () => { - const total = items.length - current = Math.min(total, current + batchSize) - stagingCountRef.current = current - setCount(current) - if (current >= total) { - completedKeysRef.current.add(key) - frame = undefined - return + const count = Math.min(items.length, Math.max(prev.count, minimumCount)) + return { + key, + count, + caughtUp: count >= items.length, } - frame = requestAnimationFrame(step) + }) + }, [key, items.length, initialBatch]) + + useEffect(() => { + if (state.key !== key || state.caughtUp || state.count >= items.length) { + return } - frame = requestAnimationFrame(step) + const frame = requestAnimationFrame(() => { + setState((prev) => { + if (prev.key !== key || prev.caughtUp) { + return prev + } - return () => { - if (frame !== undefined) cancelAnimationFrame(frame) - } - }, [key, items.length, initialBatch, batchSize]) + const itemCount = latestItemCountRef.current + const count = Math.min(itemCount, prev.count + batchSize) + return { + key, + count, + caughtUp: count >= itemCount, + } + }) + }) - let effectiveCount = count - if (prevKeyRef.current !== key) { - effectiveCount = items.length <= initialBatch ? items.length : initialBatch - stagingCountRef.current = initialBatch - } - prevKeyRef.current = key - - const isCompleted = completedKeysRef.current.has(key) - const isStaging = !isCompleted && effectiveCount < items.length - const staged = - isCompleted || effectiveCount >= items.length - ? items - : items.slice(Math.max(0, items.length - effectiveCount)) + return () => cancelAnimationFrame(frame) + }, [state.key, state.count, state.caughtUp, key, items.length, batchSize]) + + const effectiveCount = renderState.caughtUp + ? items.length + : Math.min(renderState.count, items.length) + const staged = items.slice(Math.max(0, items.length - effectiveCount)) + const isStaging = effectiveCount < items.length return { staged, isStaging } } diff --git a/apps/sim/lib/copilot/request/go/stream.test.ts b/apps/sim/lib/copilot/request/go/stream.test.ts index f9f80384c8..8234658ddc 100644 --- a/apps/sim/lib/copilot/request/go/stream.test.ts +++ b/apps/sim/lib/copilot/request/go/stream.test.ts @@ -194,6 +194,64 @@ describe('copilot go stream helpers', () => { ) }) + it('does not retry transient backend statuses because stream requests are not idempotent', async () => { + vi.mocked(fetch).mockResolvedValueOnce(new Response('bad gateway', { status: 502 })) + + const context = createStreamingContext() + const execContext: ExecutionContext = { + userId: 'user-1', + workflowId: 'workflow-1', + } + + await expect( + runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, { + timeout: 1000, + }) + ).rejects.toMatchObject({ + name: 'CopilotBackendError', + status: 502, + body: 'bad gateway', + }) + + expect(fetch).toHaveBeenCalledTimes(1) + }) + + it('does not retry non-transient backend statuses before the SSE stream opens', async () => { + vi.mocked(fetch).mockResolvedValueOnce(new Response('limit reached', { status: 402 })) + + const context = createStreamingContext() + const execContext: ExecutionContext = { + userId: 'user-1', + workflowId: 'workflow-1', + } + + await expect( + runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, { + timeout: 1000, + }) + ).rejects.toThrow('Usage limit reached') + + expect(fetch).toHaveBeenCalledTimes(1) + }) + + it('does not retry network errors because Go may already be executing the request', async () => { + vi.mocked(fetch).mockRejectedValueOnce(new TypeError('fetch failed')) + + const context = createStreamingContext() + const execContext: ExecutionContext = { + userId: 'user-1', + workflowId: 'workflow-1', + } + + await expect( + runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, { + timeout: 1000, + }) + ).rejects.toThrow('fetch failed') + + expect(fetch).toHaveBeenCalledTimes(1) + }) + it('fails closed when the shared stream ends before a terminal event', async () => { const textEvent = createEvent({ streamId: 'stream-1', diff --git a/apps/sim/lib/copilot/request/go/stream.ts b/apps/sim/lib/copilot/request/go/stream.ts index a3e42f9437..d179653086 100644 --- a/apps/sim/lib/copilot/request/go/stream.ts +++ b/apps/sim/lib/copilot/request/go/stream.ts @@ -134,17 +134,27 @@ export async function runStreamLoop( requestBodyBytes, }) const fetchStart = performance.now() - const response = await fetchGo(fetchUrl, { - ...fetchOptions, - signal: abortSignal, - otelContext: options.otelContext, - spanName: `sim → go ${pathname}`, - operation: 'stream', - attributes: { - [TraceAttr.CopilotStream]: true, - ...(requestBodyBytes ? { [TraceAttr.HttpRequestContentLength]: requestBodyBytes } : {}), - }, - }) + let response: Response + try { + response = await fetchGo(fetchUrl, { + ...fetchOptions, + signal: abortSignal, + otelContext: options.otelContext, + spanName: `sim → go ${pathname}`, + operation: 'stream', + attributes: { + [TraceAttr.CopilotStream]: true, + ...(requestBodyBytes ? { [TraceAttr.HttpRequestContentLength]: requestBodyBytes } : {}), + }, + }) + } catch (error) { + fetchSpan.attributes = { + ...(fetchSpan.attributes ?? {}), + headersMs: Math.round(performance.now() - fetchStart), + } + context.trace.endSpan(fetchSpan, abortSignal?.aborted ? 'cancelled' : 'error') + throw error + } const headersElapsedMs = Math.round(performance.now() - fetchStart) fetchSpan.attributes = { ...(fetchSpan.attributes ?? {}),