Skip to content

Commit 04f1d01

Browse files
fix(mothership): Use heartbeat mechanism for chat locks (#4286)
1 parent ccb5f1e commit 04f1d01

7 files changed

Lines changed: 256 additions & 3 deletions

File tree

apps/sim/lib/copilot/request/lifecycle/start.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
210210

211211
const abortPoller = startAbortPoller(streamId, abortController, {
212212
requestId,
213+
chatId,
213214
})
214215
publisher.startKeepalive()
215216

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
5+
import { redisConfigMock, redisConfigMockFns } from '@sim/testing'
6+
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
7+
8+
const { mockHasAbortMarker, mockClearAbortMarker, mockWriteAbortMarker } = vi.hoisted(() => ({
9+
mockHasAbortMarker: vi.fn().mockResolvedValue(false),
10+
mockClearAbortMarker: vi.fn().mockResolvedValue(undefined),
11+
mockWriteAbortMarker: vi.fn().mockResolvedValue(undefined),
12+
}))
13+
14+
vi.mock('@/lib/core/config/redis', () => redisConfigMock)
15+
vi.mock('@/lib/copilot/request/session/buffer', () => ({
16+
hasAbortMarker: mockHasAbortMarker,
17+
clearAbortMarker: mockClearAbortMarker,
18+
writeAbortMarker: mockWriteAbortMarker,
19+
}))
20+
vi.mock('@/lib/copilot/request/otel', () => ({
21+
withCopilotSpan: (_span: unknown, _attrs: unknown, fn: (span: unknown) => unknown) =>
22+
fn({ setAttribute: vi.fn() }),
23+
}))
24+
25+
import { startAbortPoller } from '@/lib/copilot/request/session/abort'
26+
27+
describe('startAbortPoller heartbeat', () => {
28+
beforeEach(() => {
29+
vi.clearAllMocks()
30+
vi.useFakeTimers()
31+
mockHasAbortMarker.mockResolvedValue(false)
32+
redisConfigMockFns.mockExtendLock.mockResolvedValue(true)
33+
})
34+
35+
afterEach(() => {
36+
vi.useRealTimers()
37+
})
38+
39+
it('extends the chat stream lock approximately every heartbeat interval', async () => {
40+
const controller = new AbortController()
41+
const streamId = 'stream-heartbeat-1'
42+
const chatId = 'chat-heartbeat-1'
43+
44+
const interval = startAbortPoller(streamId, controller, { chatId })
45+
46+
try {
47+
await vi.advanceTimersByTimeAsync(15_000)
48+
expect(redisConfigMockFns.mockExtendLock).not.toHaveBeenCalled()
49+
50+
await vi.advanceTimersByTimeAsync(6_000)
51+
52+
expect(redisConfigMockFns.mockExtendLock).toHaveBeenCalledTimes(1)
53+
expect(redisConfigMockFns.mockExtendLock).toHaveBeenLastCalledWith(
54+
`copilot:chat-stream-lock:${chatId}`,
55+
streamId,
56+
60
57+
)
58+
59+
await vi.advanceTimersByTimeAsync(20_000)
60+
expect(redisConfigMockFns.mockExtendLock).toHaveBeenCalledTimes(2)
61+
62+
await vi.advanceTimersByTimeAsync(20_000)
63+
expect(redisConfigMockFns.mockExtendLock).toHaveBeenCalledTimes(3)
64+
} finally {
65+
clearInterval(interval)
66+
}
67+
})
68+
69+
it('does not extend the lock when no chatId is passed (backward compat)', async () => {
70+
const controller = new AbortController()
71+
const interval = startAbortPoller('stream-no-chat', controller, {})
72+
73+
try {
74+
await vi.advanceTimersByTimeAsync(90_000)
75+
expect(redisConfigMockFns.mockExtendLock).not.toHaveBeenCalled()
76+
} finally {
77+
clearInterval(interval)
78+
}
79+
})
80+
81+
it('retries on the next tick when extendLock throws (no 20s backoff)', async () => {
82+
const controller = new AbortController()
83+
const streamId = 'stream-retry'
84+
const chatId = 'chat-retry'
85+
86+
redisConfigMockFns.mockExtendLock.mockRejectedValueOnce(new Error('redis down'))
87+
88+
const interval = startAbortPoller(streamId, controller, { chatId })
89+
90+
try {
91+
await vi.advanceTimersByTimeAsync(20_000)
92+
expect(redisConfigMockFns.mockExtendLock).toHaveBeenCalledTimes(1)
93+
94+
await vi.advanceTimersByTimeAsync(1_000)
95+
expect(redisConfigMockFns.mockExtendLock).toHaveBeenCalledTimes(2)
96+
} finally {
97+
clearInterval(interval)
98+
}
99+
})
100+
101+
it('stops heartbeating after ownership is lost', async () => {
102+
const controller = new AbortController()
103+
const streamId = 'stream-lost'
104+
const chatId = 'chat-lost'
105+
106+
redisConfigMockFns.mockExtendLock.mockResolvedValueOnce(false)
107+
108+
const interval = startAbortPoller(streamId, controller, { chatId })
109+
110+
try {
111+
await vi.advanceTimersByTimeAsync(21_000)
112+
expect(redisConfigMockFns.mockExtendLock).toHaveBeenCalledTimes(1)
113+
114+
await vi.advanceTimersByTimeAsync(60_000)
115+
expect(redisConfigMockFns.mockExtendLock).toHaveBeenCalledTimes(1)
116+
} finally {
117+
clearInterval(interval)
118+
}
119+
})
120+
})

apps/sim/lib/copilot/request/session/abort.ts

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { AbortBackend } from '@/lib/copilot/generated/trace-attribute-values-v1'
55
import { TraceAttr } from '@/lib/copilot/generated/trace-attributes-v1'
66
import { TraceSpan } from '@/lib/copilot/generated/trace-spans-v1'
77
import { withCopilotSpan } from '@/lib/copilot/request/otel'
8-
import { acquireLock, getRedisClient, releaseLock } from '@/lib/core/config/redis'
8+
import { acquireLock, extendLock, getRedisClient, releaseLock } from '@/lib/core/config/redis'
99
import { AbortReason } from './abort-reason'
1010
import { clearAbortMarker, hasAbortMarker, writeAbortMarker } from './buffer'
1111

@@ -18,7 +18,22 @@ const pendingChatStreams = new Map<
1818
>()
1919

2020
const DEFAULT_ABORT_POLL_MS = 1000
21-
const CHAT_STREAM_LOCK_TTL_SECONDS = 2 * 60 * 60
21+
22+
/**
23+
* TTL for the per-chat stream lock. Kept short so that if the Sim pod
24+
* holding the lock dies (SIGKILL, OOM, a SIGTERM drain that doesn't
25+
* reach the release path), the lock self-heals inside a minute rather
26+
* than stranding the chat for hours. A live stream keeps the lock alive
27+
* via `CHAT_STREAM_LOCK_HEARTBEAT_INTERVAL_MS` heartbeats.
28+
*/
29+
const CHAT_STREAM_LOCK_TTL_SECONDS = 60
30+
31+
/**
32+
* Heartbeat cadence for extending the per-chat stream lock. Set to a
33+
* third of the TTL so one missed beat still leaves room for recovery
34+
* before the lock expires under a still-live stream.
35+
*/
36+
const CHAT_STREAM_LOCK_HEARTBEAT_INTERVAL_MS = 20_000
2237

2338
function registerPendingChatStream(chatId: string, streamId: string): void {
2439
let resolve!: () => void
@@ -262,10 +277,14 @@ const pollingStreams = new Set<string>()
262277
export function startAbortPoller(
263278
streamId: string,
264279
abortController: AbortController,
265-
options?: { pollMs?: number; requestId?: string }
280+
options?: { pollMs?: number; requestId?: string; chatId?: string }
266281
): ReturnType<typeof setInterval> {
267282
const pollMs = options?.pollMs ?? DEFAULT_ABORT_POLL_MS
268283
const requestId = options?.requestId
284+
const chatId = options?.chatId
285+
286+
let lastHeartbeatAt = Date.now()
287+
let heartbeatOwnershipLost = false
269288

270289
return setInterval(() => {
271290
if (pollingStreams.has(streamId)) return
@@ -287,6 +306,33 @@ export function startAbortPoller(
287306
} finally {
288307
pollingStreams.delete(streamId)
289308
}
309+
310+
if (!chatId || heartbeatOwnershipLost) return
311+
if (Date.now() - lastHeartbeatAt < CHAT_STREAM_LOCK_HEARTBEAT_INTERVAL_MS) return
312+
313+
try {
314+
const owned = await extendLock(
315+
getChatStreamLockKey(chatId),
316+
streamId,
317+
CHAT_STREAM_LOCK_TTL_SECONDS
318+
)
319+
lastHeartbeatAt = Date.now()
320+
if (!owned) {
321+
heartbeatOwnershipLost = true
322+
logger.warn('Lost ownership of chat stream lock — stopping heartbeat', {
323+
chatId,
324+
streamId,
325+
...(requestId ? { requestId } : {}),
326+
})
327+
}
328+
} catch (error) {
329+
logger.warn('Failed to extend chat stream lock TTL', {
330+
chatId,
331+
streamId,
332+
...(requestId ? { requestId } : {}),
333+
error: toError(error).message,
334+
})
335+
}
290336
})()
291337
}, pollMs)
292338
}

apps/sim/lib/core/config/redis.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ vi.mock('ioredis', () => ({
1515

1616
import {
1717
closeRedisConnection,
18+
extendLock,
1819
getRedisClient,
1920
onRedisReconnect,
2021
resetForTesting,
@@ -120,6 +121,48 @@ describe('redis config', () => {
120121
})
121122
})
122123

124+
describe('extendLock', () => {
125+
const lockKey = 'copilot:chat-stream-lock:chat-1'
126+
const value = 'stream-abc'
127+
const ttlSeconds = 60
128+
129+
it('returns true when the caller still owns the lock and EXPIRE succeeds', async () => {
130+
mockRedisInstance.eval.mockResolvedValueOnce(1)
131+
132+
const extended = await extendLock(lockKey, value, ttlSeconds)
133+
134+
expect(extended).toBe(true)
135+
expect(mockRedisInstance.eval).toHaveBeenCalledWith(
136+
expect.stringContaining('expire'),
137+
1,
138+
lockKey,
139+
value,
140+
ttlSeconds
141+
)
142+
})
143+
144+
it('returns false when the value does not match (lock owned by another)', async () => {
145+
mockRedisInstance.eval.mockResolvedValueOnce(0)
146+
147+
const extended = await extendLock(lockKey, value, ttlSeconds)
148+
149+
expect(extended).toBe(false)
150+
})
151+
152+
it('returns true as a no-op when Redis is unavailable', async () => {
153+
vi.resetModules()
154+
vi.doMock('@/lib/core/config/env', () =>
155+
createEnvMock({ REDIS_URL: undefined as unknown as string })
156+
)
157+
const { extendLock: extendLockNoRedis } = await import('@/lib/core/config/redis')
158+
159+
const extended = await extendLockNoRedis(lockKey, value, ttlSeconds)
160+
161+
expect(extended).toBe(true)
162+
vi.doUnmock('@/lib/core/config/env')
163+
})
164+
})
165+
123166
describe('retryStrategy', () => {
124167
function captureRetryStrategy(): (times: number) => number {
125168
let capturedConfig: Record<string, unknown> = {}

apps/sim/lib/core/config/redis.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,21 @@ else
136136
end
137137
`
138138

139+
/**
140+
* Lua script for safe lock TTL extension.
141+
* Only refreshes the expiry if the value matches (ownership verification),
142+
* so a stale heartbeat from a prior owner cannot extend a lock currently
143+
* held by someone else after a TTL eviction.
144+
* Returns 1 if the TTL was extended, 0 if not (value mismatch or key gone).
145+
*/
146+
const EXTEND_LOCK_SCRIPT = `
147+
if redis.call("get", KEYS[1]) == ARGV[1] then
148+
return redis.call("expire", KEYS[1], ARGV[2])
149+
else
150+
return 0
151+
end
152+
`
153+
139154
/**
140155
* Acquire a distributed lock using Redis SET NX.
141156
* Returns true if lock acquired, false if already held.
@@ -175,6 +190,29 @@ export async function releaseLock(lockKey: string, value: string): Promise<boole
175190
return result === 1
176191
}
177192

193+
/**
194+
* Extend the TTL of a distributed lock if still owned by the caller.
195+
* Returns true if the caller still owns the lock and the TTL was refreshed,
196+
* false if the lock has been taken over by another owner or has expired.
197+
*
198+
* When Redis is not available, returns true (no-op) to match the behavior
199+
* of `acquireLock` / `releaseLock`: single-replica deployments without
200+
* Redis never held a real lock, so heartbeat success is implicit.
201+
*/
202+
export async function extendLock(
203+
lockKey: string,
204+
value: string,
205+
expirySeconds: number
206+
): Promise<boolean> {
207+
const redis = getRedisClient()
208+
if (!redis) {
209+
return true
210+
}
211+
212+
const result = await redis.eval(EXTEND_LOCK_SCRIPT, 1, lockKey, value, expirySeconds)
213+
return result === 1
214+
}
215+
178216
/**
179217
* Close the Redis connection.
180218
* Use for graceful shutdown.

packages/testing/src/mocks/redis-config.mock.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ export const redisConfigMockFns = {
1717
mockOnRedisReconnect: vi.fn(),
1818
mockAcquireLock: vi.fn().mockResolvedValue(true),
1919
mockReleaseLock: vi.fn().mockResolvedValue(true),
20+
mockExtendLock: vi.fn().mockResolvedValue(true),
2021
mockCloseRedisConnection: vi.fn().mockResolvedValue(undefined),
2122
mockResetForTesting: vi.fn(),
2223
}
@@ -34,6 +35,7 @@ export const redisConfigMock = {
3435
onRedisReconnect: redisConfigMockFns.mockOnRedisReconnect,
3536
acquireLock: redisConfigMockFns.mockAcquireLock,
3637
releaseLock: redisConfigMockFns.mockReleaseLock,
38+
extendLock: redisConfigMockFns.mockExtendLock,
3739
closeRedisConnection: redisConfigMockFns.mockCloseRedisConnection,
3840
resetForTesting: redisConfigMockFns.mockResetForTesting,
3941
}

packages/testing/src/mocks/redis.mock.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ export function createMockRedis() {
5656
exec: vi.fn().mockResolvedValue([]),
5757
})),
5858

59+
// Scripting
60+
eval: vi.fn().mockResolvedValue(0),
61+
5962
// Connection
6063
ping: vi.fn().mockResolvedValue('PONG'),
6164
quit: vi.fn().mockResolvedValue('OK'),

0 commit comments

Comments
 (0)