Skip to content

Commit e613c8e

Browse files
committed
fix(cleanup): batch orphaned snapshot deletes to avoid slow-query spike (#4348)
* fix(cleanup): batch orphaned snapshot deletes to avoid slow-query spike * fix(cleanup): recheck orphan condition in delete to close TOCTOU gap
1 parent 7315304 commit e613c8e

3 files changed

Lines changed: 120 additions & 16 deletions

File tree

apps/sim/lib/logs/execution/snapshot/service.test.ts

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,4 +533,73 @@ describe('SnapshotService', () => {
533533
expect(databaseMock.db.select).not.toHaveBeenCalled()
534534
})
535535
})
536+
537+
describe('cleanupOrphanedSnapshots', () => {
538+
function setupCleanupMocks(selectBatches: Array<Array<{ id: string }>>) {
539+
const limitFn = vi.fn()
540+
for (const batch of selectBatches) limitFn.mockResolvedValueOnce(batch)
541+
limitFn.mockResolvedValue([])
542+
const whereSelect = vi.fn().mockReturnValue({ limit: limitFn })
543+
const fromFn = vi.fn().mockReturnValue({ where: whereSelect })
544+
databaseMock.db.select = vi.fn().mockReturnValue({ from: fromFn })
545+
546+
const returningFn = vi.fn().mockImplementation(() => Promise.resolve([]))
547+
const whereDelete = vi.fn().mockReturnValue({ returning: returningFn })
548+
let batchIdx = 0
549+
const deleteFn = vi.fn().mockImplementation(() => {
550+
const batch = selectBatches[batchIdx] ?? []
551+
batchIdx++
552+
returningFn.mockImplementationOnce(() => Promise.resolve(batch.map((r) => ({ id: r.id }))))
553+
return { where: whereDelete }
554+
})
555+
databaseMock.db.delete = deleteFn
556+
557+
return { deleteFn }
558+
}
559+
560+
it('returns 0 and skips delete when nothing is orphaned', async () => {
561+
const service = new SnapshotService()
562+
const { deleteFn } = setupCleanupMocks([])
563+
564+
const count = await service.cleanupOrphanedSnapshots(7)
565+
566+
expect(count).toBe(0)
567+
expect(deleteFn).not.toHaveBeenCalled()
568+
})
569+
570+
it('stops after the first short batch', async () => {
571+
const service = new SnapshotService()
572+
const partial = Array.from({ length: 3 }, (_, i) => ({ id: `s${i}` }))
573+
const { deleteFn } = setupCleanupMocks([partial])
574+
575+
const count = await service.cleanupOrphanedSnapshots(7)
576+
577+
expect(count).toBe(3)
578+
expect(deleteFn).toHaveBeenCalledTimes(1)
579+
})
580+
581+
it('loops through multiple full batches until exhausted', async () => {
582+
const service = new SnapshotService()
583+
const fullBatch = Array.from({ length: 1000 }, (_, i) => ({ id: `s${i}` }))
584+
const tail = [{ id: 'tail-1' }]
585+
const { deleteFn } = setupCleanupMocks([fullBatch, fullBatch, tail])
586+
587+
const count = await service.cleanupOrphanedSnapshots(7)
588+
589+
expect(count).toBe(2001)
590+
expect(deleteFn).toHaveBeenCalledTimes(3)
591+
})
592+
593+
it('caps at MAX_BATCHES (20 × 1000) even when more rows remain', async () => {
594+
const service = new SnapshotService()
595+
const fullBatch = Array.from({ length: 1000 }, (_, i) => ({ id: `s${i}` }))
596+
const batches = Array.from({ length: 25 }, () => fullBatch)
597+
const { deleteFn } = setupCleanupMocks(batches)
598+
599+
const count = await service.cleanupOrphanedSnapshots(7)
600+
601+
expect(count).toBe(20_000)
602+
expect(deleteFn).toHaveBeenCalledTimes(20)
603+
})
604+
})
536605
})

apps/sim/lib/logs/execution/snapshot/service.ts

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { workflowExecutionLogs, workflowExecutionSnapshots } from '@sim/db/schem
33
import { createLogger } from '@sim/logger'
44
import { sha256Hex } from '@sim/security/hash'
55
import { generateId } from '@sim/utils/id'
6-
import { and, eq, lt, notExists, sql } from 'drizzle-orm'
6+
import { and, eq, inArray, lt, notExists, sql } from 'drizzle-orm'
77
import type {
88
SnapshotService as ISnapshotService,
99
SnapshotCreationResult,
@@ -92,24 +92,57 @@ export class SnapshotService implements ISnapshotService {
9292
const cutoffDate = new Date()
9393
cutoffDate.setDate(cutoffDate.getDate() - olderThanDays)
9494

95-
const deletedSnapshots = await db
96-
.delete(workflowExecutionSnapshots)
97-
.where(
98-
and(
99-
lt(workflowExecutionSnapshots.createdAt, cutoffDate),
100-
notExists(
101-
db
102-
.select({ id: workflowExecutionLogs.id })
103-
.from(workflowExecutionLogs)
104-
.where(eq(workflowExecutionLogs.stateSnapshotId, workflowExecutionSnapshots.id))
95+
const BATCH_SIZE = 1000
96+
const MAX_BATCHES = 20
97+
98+
let totalDeleted = 0
99+
let stoppedEarly = false
100+
101+
for (let batch = 0; batch < MAX_BATCHES; batch++) {
102+
const candidates = await db
103+
.select({ id: workflowExecutionSnapshots.id })
104+
.from(workflowExecutionSnapshots)
105+
.where(
106+
and(
107+
lt(workflowExecutionSnapshots.createdAt, cutoffDate),
108+
notExists(
109+
db
110+
.select({ one: sql`1` })
111+
.from(workflowExecutionLogs)
112+
.where(eq(workflowExecutionLogs.stateSnapshotId, workflowExecutionSnapshots.id))
113+
)
105114
)
106115
)
107-
)
108-
.returning({ id: workflowExecutionSnapshots.id })
116+
.limit(BATCH_SIZE)
117+
118+
if (candidates.length === 0) break
119+
120+
const ids = candidates.map((c) => c.id)
121+
const deleted = await db
122+
.delete(workflowExecutionSnapshots)
123+
.where(
124+
and(
125+
inArray(workflowExecutionSnapshots.id, ids),
126+
notExists(
127+
db
128+
.select({ one: sql`1` })
129+
.from(workflowExecutionLogs)
130+
.where(eq(workflowExecutionLogs.stateSnapshotId, workflowExecutionSnapshots.id))
131+
)
132+
)
133+
)
134+
.returning({ id: workflowExecutionSnapshots.id })
109135

110-
const deletedCount = deletedSnapshots.length
111-
logger.info(`Cleaned up ${deletedCount} orphaned snapshots older than ${olderThanDays} days`)
112-
return deletedCount
136+
totalDeleted += deleted.length
137+
138+
if (candidates.length < BATCH_SIZE) break
139+
if (batch === MAX_BATCHES - 1) stoppedEarly = true
140+
}
141+
142+
logger.info(
143+
`Cleaned up ${totalDeleted} orphaned snapshots older than ${olderThanDays} days${stoppedEarly ? ' (batch cap reached, remainder deferred to next run)' : ''}`
144+
)
145+
return totalDeleted
113146
}
114147
}
115148

packages/testing/src/mocks/database.mock.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ export function createMockSqlOperators() {
4949
isNotNull: vi.fn((column) => ({ type: 'isNotNull', column })),
5050
inArray: vi.fn((column, values) => ({ type: 'inArray', column, values })),
5151
notInArray: vi.fn((column, values) => ({ type: 'notInArray', column, values })),
52+
exists: vi.fn((subquery) => ({ type: 'exists', subquery })),
53+
notExists: vi.fn((subquery) => ({ type: 'notExists', subquery })),
5254
like: vi.fn((column, pattern) => ({ type: 'like', column, pattern })),
5355
ilike: vi.fn((column, pattern) => ({ type: 'ilike', column, pattern })),
5456
desc: vi.fn((column) => ({ type: 'desc', column })),

0 commit comments

Comments
 (0)