Skip to content

Commit 73d6465

Browse files
MQ37claudejirispilka
authored
feat: propagate task statusMessage to tasks/get and tasks/list (#556)
* feat: Pass ActorRun.statusMessage into task statusMessage When an Actor run's statusMessage changes during long-running async task execution, update the task store's statusMessage via updateTaskStatus. This allows clients polling tasks/get to see real-time Actor progress. Closes #555 https://claude.ai/code/session_011o5nwJS86T6dkG889LaefQ * chore: Update package-lock.json https://claude.ai/code/session_011o5nwJS86T6dkG889LaefQ * feat: Poll Actor run status and update task statusMessage even without progressToken Refactor ProgressTracker to support working without a progressToken. When in task mode, the Actor run status is polled every 5s and the task store statusMessage is updated even if the client didn't provide a progressToken for progress notifications. Closes #555 https://claude.ai/code/session_011o5nwJS86T6dkG889LaefQ * feat: Send notifications/tasks/status when Actor run statusMessage changes After updating the task store, also send the MCP-spec notifications/tasks/status notification so clients get real-time task status updates without polling tasks/get. https://claude.ai/code/session_011o5nwJS86T6dkG889LaefQ * fix: propagate statusMessage to internal tools in task mode and add integration test The onStatusMessage callback was only wired up for 'actor' type tools but not for 'internal' tools like call-actor. This meant tasks/get, tasks/list, and notifications/tasks/status never included statusMessage for internal tool tasks. - Extract onStatusMessage callback above both tool type branches so it is shared - Fix streamable HTTP test URL from /mcp to / after route change - Add integration test verifying statusMessage in tasks/get, tasks/list, and notifications/tasks/status across all transports * fix: simplify statusMessage propagation — store only, no notifications - Remove notifications/tasks/status sending from onStatusMessage callback; clients retrieve statusMessage via tasks/get and tasks/list polling - Remove try/catch wrappers in updateProgress — errors propagate to the interval-level catch in startActorRunUpdates - Remove onStatusMessage unit tests (covered by integration test) - Tighten integration test: check stream (tasks/get) and tasks/list separately - Add TODO for future notifications/tasks/status push support * fix: restore try/catch in updateProgress, add actor tool task statusMessage test * docs: add flaky test warning comments linking to #558 * fix: Review feedback for PR #556 (#568) fix: wrap internal tool branch in try/finally, dedupe tests, add onStatusMessage unit tests - Wrap internal tool `tool.call()` in try/finally to ensure `progressTracker.stop()` is always called, matching the actor tool branch pattern. Prevents leaking the polling interval if the tool throws. - Extract `assertStatusMessagePropagated()` helper to deduplicate the two nearly identical statusMessage integration tests. - Add unit tests for the `onStatusMessage` callback: called with message, not called when message is undefined, and errors are handled gracefully. Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: Jiří Spilka <jiri.spilka@apify.com>
1 parent 64b4dff commit 73d6465

5 files changed

Lines changed: 202 additions & 83 deletions

File tree

package-lock.json

Lines changed: 0 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/mcp/server.ts

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,43 +1013,52 @@ Please verify the tool name and ensure the tool is properly registered.`;
10131013
}
10141014
}
10151015

1016+
// Callback to propagate Actor run statusMessage into the task store.
1017+
// Clients retrieve it via tasks/get and tasks/list polling.
1018+
// TODO: Also send notifications/tasks/status so clients get real-time push updates
1019+
const onStatusMessage = async (message: string) => {
1020+
await this.taskStore.updateTaskStatus(taskId, 'working', message, mcpSessionId);
1021+
};
1022+
10161023
// Handle internal tool execution in task mode
10171024
if (toolStatus === TOOL_STATUS.SUCCEEDED && tool.type === 'internal') {
1018-
const progressTracker = createProgressTracker(progressToken, extra.sendNotification, taskId);
1019-
1020-
log.info('Calling internal tool for task', { taskId, name: tool.name, mcpSessionId, input: redactSkyfirePayId(args) });
1021-
const res = await tool.call({
1022-
args,
1023-
extra,
1024-
apifyMcpServer: this,
1025-
mcpServer: this.server,
1026-
apifyToken,
1027-
userRentedActorIds,
1028-
progressTracker,
1029-
mcpSessionId,
1030-
}) as object;
1025+
const progressTracker = createProgressTracker(progressToken, extra.sendNotification, taskId, onStatusMessage);
10311026

1032-
if (progressTracker) {
1033-
progressTracker.stop();
1034-
}
1027+
try {
1028+
log.info('Calling internal tool for task', { taskId, name: tool.name, mcpSessionId, input: redactSkyfirePayId(args) });
1029+
const res = await tool.call({
1030+
args,
1031+
extra,
1032+
apifyMcpServer: this,
1033+
mcpServer: this.server,
1034+
apifyToken,
1035+
userRentedActorIds,
1036+
progressTracker,
1037+
mcpSessionId,
1038+
}) as object;
10351039

1036-
// If tool returned internalToolStatus, use it; otherwise infer from isError flag
1037-
const { internalToolStatus, ...rest } = res as { internalToolStatus?: ToolStatus; isError?: boolean };
1038-
if (internalToolStatus !== undefined) {
1039-
toolStatus = internalToolStatus;
1040-
} else if ('isError' in rest && rest.isError) {
1041-
toolStatus = TOOL_STATUS.FAILED;
1042-
} else {
1043-
toolStatus = TOOL_STATUS.SUCCEEDED;
1044-
}
1040+
// If the tool returned internalToolStatus, use it; otherwise infer from isError flag
1041+
const { internalToolStatus, ...rest } = res as { internalToolStatus?: ToolStatus; isError?: boolean };
1042+
if (internalToolStatus !== undefined) {
1043+
toolStatus = internalToolStatus;
1044+
} else if ('isError' in rest && rest.isError) {
1045+
toolStatus = TOOL_STATUS.FAILED;
1046+
} else {
1047+
toolStatus = TOOL_STATUS.SUCCEEDED;
1048+
}
10451049

1046-
// Never expose internalToolStatus to MCP clients
1047-
result = rest;
1050+
// Never expose internalToolStatus to MCP clients
1051+
result = rest;
1052+
} finally {
1053+
if (progressTracker) {
1054+
progressTracker.stop();
1055+
}
1056+
}
10481057
}
10491058

10501059
// Handle actor tool execution in task mode
10511060
if (toolStatus === TOOL_STATUS.SUCCEEDED && tool.type === 'actor') {
1052-
const progressTracker = createProgressTracker(progressToken, extra.sendNotification, taskId);
1061+
const progressTracker = createProgressTracker(progressToken, extra.sendNotification, taskId, onStatusMessage);
10531062
const { 'skyfire-pay-id': _skyfirePayId, ...actorArgs } = args as Record<string, unknown>;
10541063
const apifyClient = createApifyClientWithSkyfireSupport(this, args, apifyToken);
10551064

src/utils/progress.ts

Lines changed: 49 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,46 +4,61 @@ import type { ApifyClient } from '../apify_client.js';
44
import { PROGRESS_NOTIFICATION_INTERVAL_MS } from '../const.js';
55

66
export class ProgressTracker {
7-
private progressToken: string | number;
8-
private sendNotification: (notification: ProgressNotification) => Promise<void>;
7+
private progressToken?: string | number;
8+
private sendNotification?: (notification: ProgressNotification) => Promise<void>;
99
private currentProgress = 0;
1010
private intervalId?: NodeJS.Timeout;
1111
private taskId?: string;
12-
13-
constructor(
14-
progressToken: string | number,
15-
sendNotification: (notification: ProgressNotification) => Promise<void>,
16-
taskId?: string,
17-
) {
18-
this.progressToken = progressToken;
19-
this.sendNotification = sendNotification;
20-
this.taskId = taskId;
12+
private onStatusMessage?: (message: string) => Promise<void>;
13+
14+
constructor(options: {
15+
progressToken?: string | number;
16+
sendNotification?: (notification: ProgressNotification) => Promise<void>;
17+
taskId?: string;
18+
onStatusMessage?: (message: string) => Promise<void>;
19+
}) {
20+
this.progressToken = options.progressToken;
21+
this.sendNotification = options.sendNotification;
22+
this.taskId = options.taskId;
23+
this.onStatusMessage = options.onStatusMessage;
2124
}
2225

2326
async updateProgress(message?: string): Promise<void> {
2427
this.currentProgress += 1;
2528

26-
try {
27-
const notification: ProgressNotification = {
28-
method: 'notifications/progress' as const,
29-
params: {
30-
progressToken: this.progressToken,
31-
progress: this.currentProgress,
32-
...(message && { message }),
33-
},
34-
// Per MCP spec: progress notifications during task execution should include related-task metadata
35-
...(this.taskId && {
36-
_meta: {
37-
'io.modelcontextprotocol/related-task': {
38-
taskId: this.taskId,
39-
},
29+
// Send progress notification only if progressToken and sendNotification are available
30+
if (this.progressToken && this.sendNotification) {
31+
try {
32+
const notification: ProgressNotification = {
33+
method: 'notifications/progress' as const,
34+
params: {
35+
progressToken: this.progressToken,
36+
progress: this.currentProgress,
37+
...(message && { message }),
4038
},
41-
}),
42-
};
39+
// Per MCP spec: progress notifications during task execution should include related-task metadata
40+
...(this.taskId && {
41+
_meta: {
42+
'io.modelcontextprotocol/related-task': {
43+
taskId: this.taskId,
44+
},
45+
},
46+
}),
47+
};
4348

44-
await this.sendNotification(notification);
45-
} catch {
46-
// Silent fail - don't break execution
49+
await this.sendNotification(notification);
50+
} catch {
51+
// Silent fail - don't break execution
52+
}
53+
}
54+
55+
// Update task statusMessage if callback is provided
56+
if (this.onStatusMessage && message) {
57+
try {
58+
await this.onStatusMessage(message);
59+
} catch {
60+
// Silent fail - don't break execution
61+
}
4762
}
4863
}
4964

@@ -93,10 +108,12 @@ export function createProgressTracker(
93108
progressToken: string | number | undefined,
94109
sendNotification: ((notification: ProgressNotification) => Promise<void>) | undefined,
95110
taskId?: string,
111+
onStatusMessage?: (message: string) => Promise<void>,
96112
): ProgressTracker | null {
97-
if (!progressToken || !sendNotification) {
113+
// Create tracker if we have either progress notification support or a status message callback
114+
if ((!progressToken || !sendNotification) && !onStatusMessage) {
98115
return null;
99116
}
100117

101-
return new ProgressTracker(progressToken, sendNotification, taskId);
118+
return new ProgressTracker({ progressToken, sendNotification, taskId, onStatusMessage });
102119
}

0 commit comments

Comments
 (0)