KAFKA-20246: Add clusterId and nodeId to ApiVersionsRequest (2/N) #22187

Open
AndrewJSchofield wants to merge 1 commit into apache:trunk from AndrewJSchofield:KAFKA-20246-3

Conversation

@AndrewJSchofield (Member) commented Apr 29, 2026

Part of the implementation of
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1242%3A+Detection+and+handling+of+misrouted+connections.

Adds the cluster ID and node ID to the ApiVersionsRequest and checks them on
the broker.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ming-Yen Chung <mingyen066@gmail.com>

@github-actions bot added the core (Kafka Broker), producer, consumer, tools, connect, storage (Pull requests that target the storage module), and clients labels on Apr 29, 2026
@chia7712 (Member) left a comment


@AndrewJSchofield thanks for this cool feature!

int nodeId = Integer.parseInt(node);
// When connecting to coordinators, the client uses large positive node ID
// values which do not match the target broker's node ID. Exclude those.
if (clusterId != null && nodeId > 0 && nodeId < Integer.MAX_VALUE / 2) {

Should it be nodeId >= 0?
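To make the boundary cases concrete, here is a minimal, self-contained restatement of the filter quoted above. It assumes the Kafka client convention that coordinator connections use large synthetic node IDs (derived from Integer.MAX_VALUE minus the broker ID), which is what the upper bound screens out; the class and method names here are illustrative, not the PR's.

```java
// Sketch of the node-ID filter under review. Assumes coordinator connections
// are represented with synthetic IDs near Integer.MAX_VALUE, so any ID in the
// upper half of the int range is treated as "not a real broker ID".
public class NodeIdFilter {
    // Returns true when the node string looks like a real broker ID that is
    // safe to send in ApiVersionsRequest (mirrors the snippet above).
    static boolean shouldSendNodeId(String clusterId, String node) {
        int nodeId = Integer.parseInt(node);
        return clusterId != null && nodeId > 0 && nodeId < Integer.MAX_VALUE / 2;
    }

    public static void main(String[] args) {
        System.out.println(shouldSendNodeId("abc", "1"));   // ordinary broker: true
        System.out.println(shouldSendNodeId("abc", "0"));   // broker 0: excluded by nodeId > 0
        System.out.println(shouldSendNodeId("abc",
            String.valueOf(Integer.MAX_VALUE - 1)));        // synthetic coordinator ID: false
        System.out.println(shouldSendNodeId(null, "1"));    // no cluster ID: false
    }
}
```

Note that broker ID 0 is legal in Kafka, which is the reviewer's point: with `nodeId > 0` the check silently skips broker 0, whereas `nodeId >= 0` would include it.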

public boolean isValid() {
if (version() >= 5) {
// Either cluster ID and node ID are both specified, or neither is.
if ((data.clusterId() == null && data.nodeId() != -1) || (data.clusterId() != null && data.nodeId() == -1)) {

Should we document in ApiVersionsRequest.json that both fields must be defined together?
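The both-or-neither rule in isValid() can also be stated as a single equality test, which may be easier to mirror in the JSON documentation. A sketch, assuming -1 is the sentinel for an unset node ID as in the snippet above (class and method names are illustrative, not the PR's):

```java
// Restatement of the "both or neither" validation quoted above: the request
// is valid only when clusterId and nodeId are either both set or both unset.
public class ApiVersionsValidation {
    static boolean bothOrNeither(String clusterId, int nodeId) {
        // -1 is the "node ID not set" sentinel, matching the snippet.
        return (clusterId == null) == (nodeId == -1);
    }

    public static void main(String[] args) {
        System.out.println(bothOrNeither("abc", 1));  // both set: valid
        System.out.println(bothOrNeither(null, -1));  // neither set: valid
        System.out.println(bothOrNeither("abc", -1)); // only cluster ID: invalid
        System.out.println(bothOrNeither(null, 1));   // only node ID: invalid
    }
}
```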

final String node = req.destination;
if (apiVersionsResponse.data().errorCode() != Errors.NONE.code()) {
if (req.request.version() == 0 || apiVersionsResponse.data().errorCode() != Errors.UNSUPPORTED_VERSION.code()) {
if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP && apiVersionsResponse.data().errorCode() == Errors.REBOOTSTRAP_REQUIRED.code()) {
@mingyen066 (Collaborator) commented May 2, 2026

Should we clean up the connection here like handleRebootstrap does?

private void handleRebootstrap(List<ClientResponse> responses, long now) {
    if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP && metadataUpdater.needsRebootstrap(now, rebootstrapTriggerMs)) {
        this.metadataUpdater.fetchNodes().forEach(node -> {
            String nodeId = node.idString();
            this.selector.close(nodeId);
            if (connectionStates.isConnecting(nodeId) || connectionStates.isConnected(nodeId)) {
                log.info("Disconnecting from node {} due to client rebootstrap.", nodeId);
                processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
            }
        });
        metadataUpdater.rebootstrap(now);
    }
}
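For reference, the cleanup pattern above (close every known node's channel, then record a disconnection only for nodes that were connecting or connected) can be sketched with the NetworkClient collaborators stubbed out as plain collections. All names here are illustrative stand-ins, not Kafka's actual types.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the handleRebootstrap cleanup pattern, with the selector
// and connection-state tracker replaced by simple lists.
public class RebootstrapCleanupSketch {
    final List<String> closedChannels = new ArrayList<>();
    final List<String> recordedDisconnections = new ArrayList<>();
    final List<String> activeNodes; // nodes currently connecting or connected

    RebootstrapCleanupSketch(List<String> activeNodes) {
        this.activeNodes = activeNodes;
    }

    // Close every known node's channel; record a disconnection only for nodes
    // that actually had an in-flight or established connection.
    void cleanup(List<String> knownNodes) {
        for (String nodeId : knownNodes) {
            closedChannels.add(nodeId);             // stands in for selector.close(nodeId)
            if (activeNodes.contains(nodeId)) {
                recordedDisconnections.add(nodeId); // stands in for processDisconnection(...)
            }
        }
    }

    public static void main(String[] args) {
        RebootstrapCleanupSketch sketch = new RebootstrapCleanupSketch(List.of("1"));
        sketch.cleanup(List.of("1", "2"));
        System.out.println(sketch.closedChannels);        // both channels closed
        System.out.println(sketch.recordedDisconnections); // only the active node is recorded
    }
}
```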
