Skip to content

Commit 3514153

Browse files
icecrasher321Sg312
authored andcommitted
fix(concurrency): cleanup worker code
1 parent 5ca66c3 commit 3514153

68 files changed

Lines changed: 159 additions & 5857 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,6 @@ docker compose -f docker-compose.prod.yml up -d
7474

7575
Open [http://localhost:3000](http://localhost:3000)
7676

77-
#### Background worker note
78-
79-
The Docker Compose stack starts a dedicated worker container by default. If `REDIS_URL` is not configured, the worker will start, log that it is idle, and do no queue processing. This is expected. Queue-backed API, webhook, and schedule execution requires Redis; installs without Redis continue to use the inline execution path.
80-
8177
Sim also supports local models via [Ollama](https://ollama.ai) and [vLLM](https://docs.vllm.ai/) — see the [Docker self-hosting docs](https://docs.sim.ai/self-hosting/docker) for setup details.
8278

8379
### Self-hosted: Manual Setup
@@ -123,12 +119,10 @@ cd packages/db && bun run db:migrate
123119
5. Start development servers:
124120

125121
```bash
126-
bun run dev:full # Starts Next.js app, realtime socket server, and the BullMQ worker
122+
bun run dev:full # Starts Next.js app and realtime socket server
127123
```
128124

129-
If `REDIS_URL` is not configured, the worker will remain idle and execution continues inline.
130-
131-
Or run separately: `bun run dev` (Next.js), `cd apps/sim && bun run dev:sockets` (realtime), and `cd apps/sim && bun run worker` (BullMQ worker).
125+
Or run separately: `bun run dev` (Next.js) and `cd apps/sim && bun run dev:sockets` (realtime).
132126

133127
## Copilot API Keys
134128

apps/docs/content/docs/en/execution/costs.mdx

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -195,17 +195,6 @@ By default, your usage is capped at the credits included in your plan. To allow
195195

196196
Max (individual) shares the same rate limits as team plans. Team plans (Pro or Max for Teams) use the Max-tier rate limits.
197197

198-
### Concurrent Execution Limits
199-
200-
| Plan | Concurrent Executions |
201-
|------|----------------------|
202-
| **Free** | 5 |
203-
| **Pro** | 50 |
204-
| **Max / Team** | 200 |
205-
| **Enterprise** | 200 (customizable) |
206-
207-
Concurrent execution limits control how many workflow executions can run simultaneously within a workspace. When the limit is reached, new executions are queued and admitted as running executions complete. Manual runs from the editor are not subject to these limits.
208-
209198
### File Storage
210199

211200
| Plan | Storage |

apps/sim/app/api/jobs/[jobId]/route.test.ts

Lines changed: 55 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,13 @@
44
import type { NextRequest } from 'next/server'
55
import { beforeEach, describe, expect, it, vi } from 'vitest'
66

7-
const {
8-
mockCheckHybridAuth,
9-
mockGetDispatchJobRecord,
10-
mockGetJobQueue,
11-
mockVerifyWorkflowAccess,
12-
mockGetWorkflowById,
13-
} = vi.hoisted(() => ({
14-
mockCheckHybridAuth: vi.fn(),
15-
mockGetDispatchJobRecord: vi.fn(),
16-
mockGetJobQueue: vi.fn(),
17-
mockVerifyWorkflowAccess: vi.fn(),
18-
mockGetWorkflowById: vi.fn(),
19-
}))
7+
const { mockCheckHybridAuth, mockGetJobQueue, mockVerifyWorkflowAccess, mockGetWorkflowById } =
8+
vi.hoisted(() => ({
9+
mockCheckHybridAuth: vi.fn(),
10+
mockGetJobQueue: vi.fn(),
11+
mockVerifyWorkflowAccess: vi.fn(),
12+
mockGetWorkflowById: vi.fn(),
13+
}))
2014

2115
vi.mock('@sim/logger', () => ({
2216
createLogger: () => ({
@@ -32,19 +26,9 @@ vi.mock('@/lib/auth/hybrid', () => ({
3226
}))
3327

3428
vi.mock('@/lib/core/async-jobs', () => ({
35-
JOB_STATUS: {
36-
PENDING: 'pending',
37-
PROCESSING: 'processing',
38-
COMPLETED: 'completed',
39-
FAILED: 'failed',
40-
},
4129
getJobQueue: mockGetJobQueue,
4230
}))
4331

44-
vi.mock('@/lib/core/workspace-dispatch/store', () => ({
45-
getDispatchJobRecord: mockGetDispatchJobRecord,
46-
}))
47-
4832
vi.mock('@/lib/core/utils/request', () => ({
4933
generateRequestId: vi.fn().mockReturnValue('request-1'),
5034
}))
@@ -89,72 +73,78 @@ describe('GET /api/jobs/[jobId]', () => {
8973
})
9074
})
9175

92-
it('returns dispatcher-aware waiting status with metadata', async () => {
93-
mockGetDispatchJobRecord.mockResolvedValue({
94-
id: 'dispatch-1',
95-
workspaceId: 'workspace-1',
96-
lane: 'runtime',
97-
queueName: 'workflow-execution',
98-
bullmqJobName: 'workflow-execution',
99-
bullmqPayload: {},
100-
metadata: {
101-
workflowId: 'workflow-1',
102-
},
103-
priority: 10,
104-
status: 'waiting',
105-
createdAt: 1000,
106-
admittedAt: 2000,
76+
it('returns pending status for a queued job', async () => {
77+
mockGetJobQueue.mockResolvedValue({
78+
getJob: vi.fn().mockResolvedValue({
79+
id: 'job-1',
80+
type: 'workflow-execution',
81+
payload: {},
82+
status: 'pending',
83+
createdAt: new Date('2025-01-01T00:00:00Z'),
84+
attempts: 0,
85+
maxAttempts: 1,
86+
metadata: {
87+
workflowId: 'workflow-1',
88+
},
89+
}),
10790
})
10891

10992
const response = await GET(createMockRequest(), {
110-
params: Promise.resolve({ jobId: 'dispatch-1' }),
93+
params: Promise.resolve({ jobId: 'job-1' }),
11194
})
11295
const body = await response.json()
11396

11497
expect(response.status).toBe(200)
115-
expect(body.status).toBe('waiting')
116-
expect(body.metadata.queueName).toBe('workflow-execution')
117-
expect(body.metadata.lane).toBe('runtime')
118-
expect(body.metadata.workspaceId).toBe('workspace-1')
98+
expect(body.status).toBe('pending')
11999
})
120100

121-
it('returns completed output from dispatch state', async () => {
122-
mockGetDispatchJobRecord.mockResolvedValue({
123-
id: 'dispatch-2',
124-
workspaceId: 'workspace-1',
125-
lane: 'interactive',
126-
queueName: 'workflow-execution',
127-
bullmqJobName: 'direct-workflow-execution',
128-
bullmqPayload: {},
129-
metadata: {
130-
workflowId: 'workflow-1',
131-
},
132-
priority: 1,
133-
status: 'completed',
134-
createdAt: 1000,
135-
startedAt: 2000,
136-
completedAt: 7000,
137-
output: { success: true },
101+
it('returns completed output from job', async () => {
102+
mockGetJobQueue.mockResolvedValue({
103+
getJob: vi.fn().mockResolvedValue({
104+
id: 'job-2',
105+
type: 'workflow-execution',
106+
payload: {},
107+
status: 'completed',
108+
createdAt: new Date('2025-01-01T00:00:00Z'),
109+
startedAt: new Date('2025-01-01T00:00:01Z'),
110+
completedAt: new Date('2025-01-01T00:00:06Z'),
111+
attempts: 1,
112+
maxAttempts: 1,
113+
output: { success: true },
114+
metadata: {
115+
workflowId: 'workflow-1',
116+
},
117+
}),
138118
})
139119

140120
const response = await GET(createMockRequest(), {
141-
params: Promise.resolve({ jobId: 'dispatch-2' }),
121+
params: Promise.resolve({ jobId: 'job-2' }),
142122
})
143123
const body = await response.json()
144124

145125
expect(response.status).toBe(200)
146126
expect(body.status).toBe('completed')
147127
expect(body.output).toEqual({ success: true })
148-
expect(body.metadata.duration).toBe(5000)
149128
})
150129

151-
it('returns 404 when neither dispatch nor BullMQ job exists', async () => {
152-
mockGetDispatchJobRecord.mockResolvedValue(null)
153-
130+
it('returns 404 when job does not exist', async () => {
154131
const response = await GET(createMockRequest(), {
155132
params: Promise.resolve({ jobId: 'missing-job' }),
156133
})
157134

158135
expect(response.status).toBe(404)
159136
})
137+
138+
it('returns 401 for unauthenticated requests', async () => {
139+
mockCheckHybridAuth.mockResolvedValue({
140+
success: false,
141+
error: 'Not authenticated',
142+
})
143+
144+
const response = await GET(createMockRequest(), {
145+
params: Promise.resolve({ jobId: 'job-1' }),
146+
})
147+
148+
expect(response.status).toBe(401)
149+
})
160150
})

apps/sim/app/api/jobs/[jobId]/route.ts

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,27 @@ import { createLogger } from '@sim/logger'
22
import { type NextRequest, NextResponse } from 'next/server'
33
import { checkHybridAuth } from '@/lib/auth/hybrid'
44
import { getJobQueue } from '@/lib/core/async-jobs'
5+
import type { Job } from '@/lib/core/async-jobs/types'
56
import { generateRequestId } from '@/lib/core/utils/request'
6-
import { presentDispatchOrJobStatus } from '@/lib/core/workspace-dispatch/status'
7-
import { getDispatchJobRecord } from '@/lib/core/workspace-dispatch/store'
87
import { createErrorResponse } from '@/app/api/workflows/utils'
98

109
const logger = createLogger('TaskStatusAPI')
1110

11+
function presentJobStatus(job: Job) {
12+
return {
13+
status: job.status,
14+
metadata: {
15+
createdAt: job.createdAt.toISOString(),
16+
startedAt: job.startedAt?.toISOString(),
17+
completedAt: job.completedAt?.toISOString(),
18+
attempts: job.attempts,
19+
maxAttempts: job.maxAttempts,
20+
},
21+
output: job.output,
22+
error: job.error,
23+
}
24+
}
25+
1226
export async function GET(
1327
request: NextRequest,
1428
{ params }: { params: Promise<{ jobId: string }> }
@@ -25,15 +39,14 @@ export async function GET(
2539

2640
const authenticatedUserId = authResult.userId
2741

28-
const dispatchJob = await getDispatchJobRecord(taskId)
2942
const jobQueue = await getJobQueue()
30-
const job = dispatchJob ? null : await jobQueue.getJob(taskId)
43+
const job = await jobQueue.getJob(taskId)
3144

32-
if (!job && !dispatchJob) {
45+
if (!job) {
3346
return createErrorResponse('Task not found', 404)
3447
}
3548

36-
const metadataToCheck = dispatchJob?.metadata ?? job?.metadata
49+
const metadataToCheck = job.metadata
3750

3851
if (metadataToCheck?.workflowId) {
3952
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
@@ -61,7 +74,7 @@ export async function GET(
6174
return createErrorResponse('Access denied', 403)
6275
}
6376

64-
const presented = presentDispatchOrJobStatus(dispatchJob, job)
77+
const presented = presentJobStatus(job)
6578
const response: any = {
6679
success: true,
6780
taskId,
@@ -71,9 +84,6 @@ export async function GET(
7184

7285
if (presented.output !== undefined) response.output = presented.output
7386
if (presented.error !== undefined) response.error = presented.error
74-
if (presented.estimatedDuration !== undefined) {
75-
response.estimatedDuration = presented.estimatedDuration
76-
}
7787

7888
return NextResponse.json(response)
7989
} catch (error: any) {

apps/sim/app/api/schedules/execute/route.test.ts

Lines changed: 16 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ const {
1414
mockDbReturning,
1515
mockDbUpdate,
1616
mockEnqueue,
17-
mockEnqueueWorkspaceDispatch,
1817
mockStartJob,
1918
mockCompleteJob,
2019
mockMarkJobFailed,
@@ -24,7 +23,6 @@ const {
2423
const mockDbSet = vi.fn().mockReturnValue({ where: mockDbWhere })
2524
const mockDbUpdate = vi.fn().mockReturnValue({ set: mockDbSet })
2625
const mockEnqueue = vi.fn().mockResolvedValue('job-id-1')
27-
const mockEnqueueWorkspaceDispatch = vi.fn().mockResolvedValue('job-id-1')
2826
const mockStartJob = vi.fn().mockResolvedValue(undefined)
2927
const mockCompleteJob = vi.fn().mockResolvedValue(undefined)
3028
const mockMarkJobFailed = vi.fn().mockResolvedValue(undefined)
@@ -42,7 +40,6 @@ const {
4240
mockDbReturning,
4341
mockDbUpdate,
4442
mockEnqueue,
45-
mockEnqueueWorkspaceDispatch,
4643
mockStartJob,
4744
mockCompleteJob,
4845
mockMarkJobFailed,
@@ -75,15 +72,6 @@ vi.mock('@/lib/core/async-jobs', () => ({
7572
shouldExecuteInline: vi.fn().mockReturnValue(false),
7673
}))
7774

78-
vi.mock('@/lib/core/bullmq', () => ({
79-
isBullMQEnabled: vi.fn().mockReturnValue(true),
80-
createBullMQJobData: vi.fn((payload: unknown) => ({ payload })),
81-
}))
82-
83-
vi.mock('@/lib/core/workspace-dispatch', () => ({
84-
enqueueWorkspaceDispatch: mockEnqueueWorkspaceDispatch,
85-
}))
86-
8775
vi.mock('@/lib/workflows/utils', () => ({
8876
getWorkflowById: vi.fn().mockResolvedValue({
8977
id: 'workflow-1',
@@ -250,29 +238,19 @@ describe('Scheduled Workflow Execution API Route', () => {
250238
expect(data).toHaveProperty('executedCount', 2)
251239
})
252240

253-
it('should queue mothership jobs to BullMQ when available', async () => {
241+
it('should execute mothership jobs inline', async () => {
254242
mockDbReturning.mockReturnValueOnce([]).mockReturnValueOnce(SINGLE_JOB)
255243

256244
const response = await GET(createMockRequest())
257245

258246
expect(response.status).toBe(200)
259-
expect(mockEnqueueWorkspaceDispatch).toHaveBeenCalledWith(
247+
expect(mockExecuteJobInline).toHaveBeenCalledWith(
260248
expect.objectContaining({
261-
workspaceId: 'workspace-1',
262-
lane: 'runtime',
263-
queueName: 'mothership-job-execution',
264-
bullmqJobName: 'mothership-job-execution',
265-
bullmqPayload: {
266-
payload: {
267-
scheduleId: 'job-1',
268-
cronExpression: '0 * * * *',
269-
failedCount: 0,
270-
now: expect.any(String),
271-
},
272-
},
249+
scheduleId: 'job-1',
250+
cronExpression: '0 * * * *',
251+
failedCount: 0,
273252
})
274253
)
275-
expect(mockExecuteJobInline).not.toHaveBeenCalled()
276254
})
277255

278256
it('should enqueue preassigned correlation metadata for schedules', async () => {
@@ -281,25 +259,23 @@ describe('Scheduled Workflow Execution API Route', () => {
281259
const response = await GET(createMockRequest())
282260

283261
expect(response.status).toBe(200)
284-
expect(mockEnqueueWorkspaceDispatch).toHaveBeenCalledWith(
262+
expect(mockEnqueue).toHaveBeenCalledWith(
263+
'schedule-execution',
264+
expect.objectContaining({
265+
scheduleId: 'schedule-1',
266+
workflowId: 'workflow-1',
267+
executionId: 'schedule-execution-1',
268+
}),
285269
expect.objectContaining({
286-
id: 'schedule-execution-1',
287-
workspaceId: 'workspace-1',
288-
lane: 'runtime',
289-
queueName: 'schedule-execution',
290-
bullmqJobName: 'schedule-execution',
291-
metadata: {
270+
metadata: expect.objectContaining({
292271
workflowId: 'workflow-1',
293-
correlation: {
272+
correlation: expect.objectContaining({
294273
executionId: 'schedule-execution-1',
295274
requestId: 'test-request-id',
296275
source: 'schedule',
297276
workflowId: 'workflow-1',
298-
scheduleId: 'schedule-1',
299-
triggerType: 'schedule',
300-
scheduledFor: '2025-01-01T00:00:00.000Z',
301-
},
302-
},
277+
}),
278+
}),
303279
})
304280
)
305281
})

0 commit comments

Comments
 (0)