Skip to content

Commit acedbf5

Browse files
authored
fix: Actor run cancellation handling and add tests for abort scenarios (#662)
* fix: Actor run cancellation handling and add tests for abort scenarios * fix: Enhance actor cancellation handling and add test for aborted signal scenario
1 parent 5ecfcc5 commit acedbf5

2 files changed

Lines changed: 154 additions & 12 deletions

File tree

src/tools/core/actor_execution.ts

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -59,34 +59,63 @@ export async function callActorGetDataset(options: {
5959
const CLIENT_ABORT = Symbol('CLIENT_ABORT'); // Just an internal symbol to identify client abort
6060
const actorClient = apifyClient.actor(actorName);
6161

62+
const abortActorRun = async (runId: string) => {
63+
try {
64+
await apifyClient.run(runId).abort({ gracefully: false });
65+
} catch (e) {
66+
logHttpError(e, 'Error aborting Actor run', { runId });
67+
}
68+
};
69+
70+
// The Actor start request itself is not tied to our AbortSignal, so a client can cancel
71+
// before we even create the run. In that case we should exit without starting follow-up work.
72+
if (abortSignal?.aborted) {
73+
log.info('Actor run aborted by client before start', { actorName, mcpSessionId, input: redactSkyfirePayId(input) });
74+
return null;
75+
}
76+
6277
// Start the actor run
6378
const actorRun: ActorRun = await actorClient.start(input, callOptions);
6479

80+
// Cancellation can also arrive while actorClient.start() is still in flight. Once start()
81+
// returns we finally have a run ID, so we must immediately check again and abort that run
82+
// ourselves; otherwise the abort event is missed and the run continues in the background.
83+
if (abortSignal?.aborted) {
84+
await abortActorRun(actorRun.id);
85+
86+
log.info('Actor run aborted by client', { actorName, mcpSessionId, input: redactSkyfirePayId(input) });
87+
return null;
88+
}
89+
6590
// Start progress tracking if a tracker is provided
6691
if (progressTracker) {
6792
progressTracker.startActorRunUpdates(actorRun.id, apifyClient, actorName);
6893
}
6994

70-
// Create abort promise that handles both API abort and race rejection
71-
const abortPromise = async () => new Promise<typeof CLIENT_ABORT>((resolve) => {
72-
abortSignal?.addEventListener('abort', async () => {
73-
// Abort the actor run via API
74-
try {
75-
await apifyClient.run(actorRun.id).abort({ gracefully: false });
76-
} catch (e) {
77-
logHttpError(e, 'Error aborting Actor run', { runId: actorRun.id });
78-
}
79-
// Reject to stop waiting
95+
// Resolve the race immediately on cancellation and abort the Actor run in the background.
96+
// If we waited for the abort API call to finish first, waitForFinish() could win the race
97+
// and the run might complete before we treat the request as cancelled.
98+
let abortListener: (() => void) | undefined;
99+
const abortPromise = new Promise<typeof CLIENT_ABORT>((resolve) => {
100+
abortListener = () => {
80101
resolve(CLIENT_ABORT);
81-
}, { once: true });
102+
void abortActorRun(actorRun.id);
103+
};
104+
abortSignal?.addEventListener('abort', abortListener, { once: true });
82105
});
83106

84107
// Wait for completion or cancellation
85108
const potentialAbortedRun = await Promise.race([
86109
apifyClient.run(actorRun.id).waitForFinish(),
87-
...(abortSignal ? [abortPromise()] : []),
110+
...(abortSignal ? [abortPromise] : []),
88111
]);
89112

113+
// Clean up the abort listener if waitForFinish() won the race, so a later signal
114+
// doesn't try to abort an already-completed run.
115+
if (abortListener) {
116+
abortSignal?.removeEventListener('abort', abortListener);
117+
}
118+
90119
if (potentialAbortedRun === CLIENT_ABORT) {
91120
log.info('Actor run aborted by client', { actorName, mcpSessionId, input: redactSkyfirePayId(input) });
92121
return null;
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import { describe, expect, it, vi } from 'vitest';
2+
3+
import { callActorGetDataset } from '../../src/tools/core/actor_execution.js';
4+
5+
describe('callActorGetDataset', () => {
6+
it('should return null without starting a run when the signal is already aborted', async () => {
7+
const controller = new AbortController();
8+
controller.abort();
9+
10+
const start = vi.fn();
11+
12+
const apifyClient = {
13+
actor: vi.fn().mockReturnValue({ start }),
14+
};
15+
16+
const result = await callActorGetDataset({
17+
actorName: 'apify/rag-web-browser',
18+
input: { query: 'https://apify.com' },
19+
apifyClient: apifyClient as never,
20+
abortSignal: controller.signal,
21+
});
22+
23+
expect(result).toBeNull();
24+
expect(start).not.toHaveBeenCalled();
25+
});
26+
27+
it('should abort and return null when the signal is aborted during actor start', async () => {
28+
const controller = new AbortController();
29+
const actorRun = {
30+
id: 'run-123',
31+
defaultDatasetId: 'dataset-123',
32+
usageTotalUsd: 0,
33+
usageUsd: {},
34+
};
35+
36+
const abort = vi.fn().mockResolvedValue(undefined);
37+
const waitForFinish = vi.fn().mockResolvedValue(actorRun);
38+
const start = vi.fn().mockImplementation(async () => {
39+
controller.abort();
40+
return actorRun;
41+
});
42+
43+
const apifyClient = {
44+
actor: vi.fn().mockReturnValue({
45+
start,
46+
}),
47+
run: vi.fn().mockReturnValue({
48+
abort,
49+
waitForFinish,
50+
}),
51+
};
52+
53+
const result = await callActorGetDataset({
54+
actorName: 'apify/rag-web-browser',
55+
input: { query: 'https://apify.com' },
56+
apifyClient: apifyClient as never,
57+
abortSignal: controller.signal,
58+
});
59+
60+
expect(result).toBeNull();
61+
expect(abort).toHaveBeenCalledWith({ gracefully: false });
62+
expect(waitForFinish).not.toHaveBeenCalled();
63+
});
64+
65+
it('should return null immediately when cancelled after start even if API abort is slow', async () => {
66+
const controller = new AbortController();
67+
const actorRun = {
68+
id: 'run-456',
69+
defaultDatasetId: 'dataset-456',
70+
usageTotalUsd: 0,
71+
usageUsd: {},
72+
};
73+
74+
let resolveAbort: (() => void) | undefined;
75+
let resolveWaitForFinish: ((value: typeof actorRun) => void) | undefined;
76+
const abort = vi.fn().mockImplementation(async () => await new Promise<void>((resolve) => {
77+
resolveAbort = resolve;
78+
}));
79+
const waitForFinish = vi.fn().mockImplementation(async () => await new Promise<typeof actorRun>((resolve) => {
80+
resolveWaitForFinish = resolve;
81+
}));
82+
const start = vi.fn().mockResolvedValue(actorRun);
83+
const listItems = vi.fn().mockResolvedValue({ items: [], total: 0 });
84+
85+
const apifyClient = {
86+
actor: vi.fn().mockReturnValue({ start }),
87+
run: vi.fn().mockReturnValue({
88+
abort,
89+
waitForFinish,
90+
}),
91+
dataset: vi.fn().mockReturnValue({ listItems }),
92+
};
93+
94+
const resultPromise = callActorGetDataset({
95+
actorName: 'apify/rag-web-browser',
96+
input: { query: 'https://apify.com' },
97+
apifyClient: apifyClient as never,
98+
abortSignal: controller.signal,
99+
});
100+
101+
await vi.waitUntil(() => Boolean(resolveWaitForFinish));
102+
controller.abort();
103+
resolveWaitForFinish!(actorRun);
104+
105+
const result = await resultPromise;
106+
107+
expect(result).toBeNull();
108+
await vi.waitUntil(() => abort.mock.calls.length > 0);
109+
expect(abort).toHaveBeenCalledWith({ gracefully: false });
110+
expect(listItems).not.toHaveBeenCalled();
111+
resolveAbort!();
112+
});
113+
});

0 commit comments

Comments
 (0)