Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/sim/app/api/copilot/chat/stream/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ vi.mock('@/lib/copilot/request/session', () => ({
}),
encodeSSEEnvelope: (event: Record<string, unknown>) =>
new TextEncoder().encode(`data: ${JSON.stringify(event)}\n\n`),
encodeSSEComment: (comment: string) => new TextEncoder().encode(`: ${comment}\n\n`),
SSE_RESPONSE_HEADERS: {
'Content-Type': 'text/event-stream',
},
Expand Down Expand Up @@ -132,6 +133,7 @@ describe('copilot chat stream replay route', () => {
)

const chunks = await readAllChunks(response)
expect(chunks[0]).toBe(': accepted\n\n')
expect(chunks.join('')).toContain(
JSON.stringify({
status: MothershipStreamV1CompletionStatus.cancelled,
Expand Down
31 changes: 30 additions & 1 deletion apps/sim/app/api/copilot/chat/stream/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { getCopilotTracer, markSpanForError } from '@/lib/copilot/request/otel'
import {
checkForReplayGap,
createEvent,
encodeSSEComment,
encodeSSEEnvelope,
readEvents,
readFilePreviewSessions,
Expand All @@ -31,6 +32,7 @@ export const maxDuration = 3600

const logger = createLogger('CopilotChatStreamAPI')
const POLL_INTERVAL_MS = 250
const REPLAY_KEEPALIVE_INTERVAL_MS = 15_000
const MAX_STREAM_MS = 60 * 60 * 1000

function extractCanonicalRequestId(value: unknown): string {
Expand Down Expand Up @@ -266,6 +268,7 @@ async function handleResumeRequestBody({
let controllerClosed = false
let sawTerminalEvent = false
let currentRequestId = extractRunRequestId(run)
let lastWriteTime = Date.now()
// Stamp the logical request id + chat id on the resume root as soon
// as we resolve them from the run row, so TraceQL joins work on
// resume legs the same way they do on the original POST.
Expand All @@ -291,6 +294,19 @@ async function handleResumeRequestBody({
if (controllerClosed) return false
try {
controller.enqueue(encodeSSEEnvelope(payload))
lastWriteTime = Date.now()
return true
} catch {
controllerClosed = true
return false
}
}

const enqueueComment = (comment: string) => {
if (controllerClosed) return false
try {
controller.enqueue(encodeSSEComment(comment))
lastWriteTime = Date.now()
return true
} catch {
controllerClosed = true
Expand Down Expand Up @@ -352,10 +368,19 @@ async function handleResumeRequestBody({
}

try {
enqueueComment('accepted')

const gap = await checkForReplayGap(streamId, afterCursor, currentRequestId)
if (gap) {
for (const envelope of gap.envelopes) {
enqueueEvent(envelope)
cursor = envelope.stream.cursor ?? String(envelope.seq)
currentRequestId = extractEnvelopeRequestId(envelope) || currentRequestId
if (envelope.type === MothershipStreamV1EventType.complete) {
sawTerminalEvent = true
}
if (!enqueueEvent(envelope)) {
break
}
}
return
Comment thread
icecrasher321 marked this conversation as resolved.
}
Expand Down Expand Up @@ -408,6 +433,10 @@ async function handleResumeRequestBody({
break
}

if (Date.now() - lastWriteTime >= REPLAY_KEEPALIVE_INTERVAL_MS) {
enqueueComment('keepalive')
}

await sleep(POLL_INTERVAL_MS)
}
if (!controllerClosed && Date.now() - startTime >= MAX_STREAM_MS) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use client'

import { useCallback, useLayoutEffect, useRef } from 'react'
import { useCallback, useLayoutEffect, useMemo, useRef } from 'react'
import { cn } from '@/lib/core/utils/cn'
import { MessageActions } from '@/app/workspace/[workspaceId]/components'
import { ChatMessageAttachments } from '@/app/workspace/[workspaceId]/home/components/chat-message-attachments'
Expand All @@ -22,6 +22,7 @@ import type {
QueuedMessage,
} from '@/app/workspace/[workspaceId]/home/types'
import { useAutoScroll } from '@/hooks/use-auto-scroll'
import { useProgressiveList } from '@/hooks/use-progressive-list'
import type { ChatContext } from '@/stores/panel'
import { MothershipChatSkeleton } from './mothership-chat-skeleton'

Expand Down Expand Up @@ -104,6 +105,21 @@ export function MothershipChat({
scrollOnMount: true,
})
const hasMessages = messages.length > 0
const stagingKey = chatId ?? 'pending-chat'
const { staged: stagedMessages, isStaging } = useProgressiveList(messages, stagingKey)
const stagedMessageCount = stagedMessages.length
const stagedOffset = messages.length - stagedMessages.length
const precedingUserContentByIndex = useMemo(() => {
const contentByIndex: Array<string | undefined> = []
let lastUserContent: string | undefined
for (const [index, message] of messages.entries()) {
contentByIndex[index] = lastUserContent
if (message.role === 'user') {
lastUserContent = message.content
}
}
return contentByIndex
}, [messages])
const initialScrollDoneRef = useRef(false)
const userInputRef = useRef<UserInputHandle>(null)
const handleSendQueuedHead = useCallback(() => {
Expand Down Expand Up @@ -134,14 +150,20 @@ export function MothershipChat({
scrollToBottom()
}, [hasMessages, initialScrollBlocked, scrollToBottom])

useLayoutEffect(() => {
if (!isStaging || initialScrollBlocked || !initialScrollDoneRef.current) return
scrollToBottom()
}, [isStaging, stagedMessageCount, initialScrollBlocked, scrollToBottom])

return (
<div className={cn('flex h-full min-h-0 flex-col', className)}>
<div ref={scrollContainerRef} className={styles.scrollContainer}>
{isLoading && !hasMessages ? (
<MothershipChatSkeleton layout={layout} />
) : (
<div className={styles.content}>
{messages.map((msg, index) => {
{stagedMessages.map((msg, localIndex) => {
const index = stagedOffset + localIndex
if (msg.role === 'user') {
const hasAttachments = Boolean(msg.attachments?.length)
return (
Expand Down Expand Up @@ -177,10 +199,7 @@ export function MothershipChat({
}

const isLastMessage = index === messages.length - 1
const precedingUserMsg = [...messages]
.slice(0, index)
.reverse()
.find((m) => m.role === 'user')
const precedingUserContent = precedingUserContentByIndex[index]

return (
<div key={msg.id} className={styles.assistantRow}>
Expand All @@ -196,7 +215,7 @@ export function MothershipChat({
<MessageActions
content={msg.content}
chatId={chatId}
userQuery={precedingUserMsg?.content}
userQuery={precedingUserContent}
requestId={msg.requestId}
/>
</div>
Expand Down
134 changes: 80 additions & 54 deletions apps/sim/hooks/use-progressive-list.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use client'

import { useEffect, useRef, useState } from 'react'
import { useEffect, useState } from 'react'

interface ProgressiveListOptions {
/** Number of items to render in the initial batch (most recent items) */
Expand All @@ -14,15 +14,31 @@ const DEFAULTS = {
batchSize: 5,
} satisfies Required<ProgressiveListOptions>

/**
 * Internal staging state tracked for a single list `key`
 * (one conversation/session at a time).
 */
interface ProgressiveListState {
  // The session/conversation key this state was created for; compared
  // against the incoming `key` prop to detect a switch.
  key: string
  // How many of the most recent items are currently rendered.
  count: number
  // True once `count` has reached the full item length for this key;
  // presumably used to stop further rAF staging work — confirm against
  // the hook body.
  caughtUp: boolean
}

/**
 * Builds the staging state for a freshly seen `key`.
 *
 * Only the most recent `initialBatch` items are rendered at first, so
 * `count` is capped at `initialBatch`. When that cap already covers the
 * whole (non-empty) list, the state starts out caught up; an empty list
 * is deliberately reported as not caught up.
 *
 * @param key          Session/conversation identifier the state belongs to.
 * @param itemCount    Total number of items available for this key.
 * @param initialBatch Maximum number of items to render in the first paint.
 */
function createInitialState(
  key: string,
  itemCount: number,
  initialBatch: number
): ProgressiveListState {
  const visibleCount = Math.min(itemCount, initialBatch)
  const caughtUp = itemCount > 0 && visibleCount >= itemCount
  return { key, count: visibleCount, caughtUp }
}

/**
* Progressively renders a list of items so that first paint is fast.
*
* On mount (or when `key` changes), only the most recent `initialBatch`
* items are rendered. The rest are added in `batchSize` increments via
* `requestAnimationFrame` so the browser never blocks on a large DOM mount.
*
* Once staging completes for a given key it never re-stages -- new items
* appended to the list are rendered immediately.
* `requestAnimationFrame`.
*
* @param items Full list of items to render.
* @param key A session/conversation identifier. When it changes,
Expand All @@ -35,67 +51,77 @@ export function useProgressiveList<T>(
key: string,
options?: ProgressiveListOptions
): { staged: T[]; isStaging: boolean } {
const initialBatch = options?.initialBatch ?? DEFAULTS.initialBatch
const batchSize = options?.batchSize ?? DEFAULTS.batchSize
const initialBatch = Math.max(0, options?.initialBatch ?? DEFAULTS.initialBatch)
const batchSize = Math.max(1, options?.batchSize ?? DEFAULTS.batchSize)
const [state, setState] = useState(() => createInitialState(key, items.length, initialBatch))

const completedKeysRef = useRef(new Set<string>())
const prevKeyRef = useRef(key)
const stagingCountRef = useRef(initialBatch)
const [count, setCount] = useState(() => {
if (items.length <= initialBatch) return items.length
return initialBatch
})
const renderState =
state.key === key && (state.count > 0 || items.length === 0 || state.caughtUp)
? state
: createInitialState(key, items.length, initialBatch)

useEffect(() => {
if (completedKeysRef.current.has(key)) {
setCount(items.length)
return
}
setState((prev) => {
if (prev.key !== key) {
return createInitialState(key, items.length, initialBatch)
}

if (items.length <= initialBatch) {
setCount(items.length)
completedKeysRef.current.add(key)
return
}
if (items.length === 0) {
if (prev.count === 0 && !prev.caughtUp) {
return prev
}
return { key, count: 0, caughtUp: false }
}

let current = Math.max(stagingCountRef.current, initialBatch)
setCount(current)
if (prev.caughtUp) {
if (prev.count === items.length) {
return prev
}
return { key, count: items.length, caughtUp: true }
}

let frame: number | undefined
const minimumCount = Math.min(items.length, initialBatch)
if (prev.count >= minimumCount && prev.count <= items.length) {
return prev
}

const step = () => {
const total = items.length
current = Math.min(total, current + batchSize)
stagingCountRef.current = current
setCount(current)
if (current >= total) {
completedKeysRef.current.add(key)
frame = undefined
return
const count = Math.min(items.length, Math.max(prev.count, minimumCount))
return {
key,
count,
caughtUp: count >= items.length,
}
frame = requestAnimationFrame(step)
})
}, [key, items.length, initialBatch])

useEffect(() => {
if (state.key !== key || state.caughtUp || state.count >= items.length) {
return
}

frame = requestAnimationFrame(step)
const frame = requestAnimationFrame(() => {
setState((prev) => {
if (prev.key !== key || prev.caughtUp) {
return prev
}

return () => {
if (frame !== undefined) cancelAnimationFrame(frame)
}
}, [key, items.length, initialBatch, batchSize])
const count = Math.min(items.length, prev.count + batchSize)
return {
key,
count,
caughtUp: count >= items.length,
}
})
})

let effectiveCount = count
if (prevKeyRef.current !== key) {
effectiveCount = items.length <= initialBatch ? items.length : initialBatch
stagingCountRef.current = initialBatch
}
prevKeyRef.current = key

const isCompleted = completedKeysRef.current.has(key)
const isStaging = !isCompleted && effectiveCount < items.length
const staged =
isCompleted || effectiveCount >= items.length
? items
: items.slice(Math.max(0, items.length - effectiveCount))
return () => cancelAnimationFrame(frame)
}, [state.key, state.count, state.caughtUp, key, items.length, batchSize])
Comment thread
icecrasher321 marked this conversation as resolved.

const effectiveCount = renderState.caughtUp
? items.length
: Math.min(renderState.count, items.length)
const staged = items.slice(Math.max(0, items.length - effectiveCount))
const isStaging = effectiveCount < items.length

return { staged, isStaging }
}
Loading
Loading