1717package org .apache .kafka .clients ;
1818
1919import org .apache .kafka .common .Cluster ;
20+ import org .apache .kafka .common .ClusterResource ;
2021import org .apache .kafka .common .KafkaException ;
2122import org .apache .kafka .common .Node ;
2223import org .apache .kafka .common .TopicPartition ;
@@ -120,6 +121,9 @@ private enum State {
120121
121122 private final MetadataRecoveryStrategy metadataRecoveryStrategy ;
122123
124+ /* Whether to send the cluster ID and node ID on ApiVersions RPC for checking by the broker */
125+ private final boolean metadataClusterCheckEnable ;
126+
123127 private final Time time ;
124128
125129 /**
@@ -154,7 +158,8 @@ public NetworkClient(Selectable selector,
154158 boolean discoverBrokerVersions ,
155159 ApiVersions apiVersions ,
156160 LogContext logContext ,
157- MetadataRecoveryStrategy metadataRecoveryStrategy ) {
161+ MetadataRecoveryStrategy metadataRecoveryStrategy ,
162+ boolean metadataClusterCheckEnable ) {
158163 this (selector ,
159164 metadata ,
160165 clientId ,
@@ -171,7 +176,8 @@ public NetworkClient(Selectable selector,
171176 apiVersions ,
172177 logContext ,
173178 Long .MAX_VALUE ,
174- metadataRecoveryStrategy );
179+ metadataRecoveryStrategy ,
180+ metadataClusterCheckEnable );
175181 }
176182
177183 public NetworkClient (Selectable selector ,
@@ -190,7 +196,8 @@ public NetworkClient(Selectable selector,
190196 ApiVersions apiVersions ,
191197 LogContext logContext ,
192198 long rebootstrapTriggerMs ,
193- MetadataRecoveryStrategy metadataRecoveryStrategy ) {
199+ MetadataRecoveryStrategy metadataRecoveryStrategy ,
200+ boolean metadataClusterCheckEnable ) {
194201 this (null ,
195202 metadata ,
196203 selector ,
@@ -211,7 +218,8 @@ public NetworkClient(Selectable selector,
211218 new DefaultHostResolver (),
212219 null ,
213220 rebootstrapTriggerMs ,
214- metadataRecoveryStrategy );
221+ metadataRecoveryStrategy ,
222+ metadataClusterCheckEnable );
215223 }
216224
217225 public NetworkClient (Selectable selector ,
@@ -230,7 +238,8 @@ public NetworkClient(Selectable selector,
230238 ApiVersions apiVersions ,
231239 Sensor throttleTimeSensor ,
232240 LogContext logContext ,
233- MetadataRecoveryStrategy metadataRecoveryStrategy ) {
241+ MetadataRecoveryStrategy metadataRecoveryStrategy ,
242+ boolean metadataClusterCheckEnable ) {
234243 this (null ,
235244 metadata ,
236245 selector ,
@@ -251,7 +260,8 @@ public NetworkClient(Selectable selector,
251260 new DefaultHostResolver (),
252261 null ,
253262 Long .MAX_VALUE ,
254- metadataRecoveryStrategy );
263+ metadataRecoveryStrategy ,
264+ metadataClusterCheckEnable );
255265 }
256266
257267 public NetworkClient (Selectable selector ,
@@ -269,7 +279,8 @@ public NetworkClient(Selectable selector,
269279 boolean discoverBrokerVersions ,
270280 ApiVersions apiVersions ,
271281 LogContext logContext ,
272- MetadataRecoveryStrategy metadataRecoveryStrategy ) {
282+ MetadataRecoveryStrategy metadataRecoveryStrategy ,
283+ boolean metadataClusterCheckEnable ) {
273284 this (metadataUpdater ,
274285 null ,
275286 selector ,
@@ -290,7 +301,8 @@ public NetworkClient(Selectable selector,
290301 new DefaultHostResolver (),
291302 null ,
292303 Long .MAX_VALUE ,
293- metadataRecoveryStrategy );
304+ metadataRecoveryStrategy ,
305+ metadataClusterCheckEnable );
294306 }
295307
296308 public NetworkClient (MetadataUpdater metadataUpdater ,
@@ -313,7 +325,8 @@ public NetworkClient(MetadataUpdater metadataUpdater,
313325 HostResolver hostResolver ,
314326 ClientTelemetrySender clientTelemetrySender ,
315327 long rebootstrapTriggerMs ,
316- MetadataRecoveryStrategy metadataRecoveryStrategy ) {
328+ MetadataRecoveryStrategy metadataRecoveryStrategy ,
329+ boolean metadataClusterCheckEnable ) {
317330 /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
318331 * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
319332 * super constructor is invoked.
@@ -346,6 +359,7 @@ public NetworkClient(MetadataUpdater metadataUpdater,
346359 this .telemetrySender = (clientTelemetrySender != null ) ? new TelemetrySender (clientTelemetrySender ) : null ;
347360 this .rebootstrapTriggerMs = rebootstrapTriggerMs ;
348361 this .metadataRecoveryStrategy = metadataRecoveryStrategy ;
362+ this .metadataClusterCheckEnable = metadataClusterCheckEnable ;
349363 }
350364
351365 /**
@@ -1024,7 +1038,10 @@ private void handleApiVersionsResponse(List<ClientResponse> responses,
10241038 InFlightRequest req , long now , ApiVersionsResponse apiVersionsResponse ) {
10251039 final String node = req .destination ;
10261040 if (apiVersionsResponse .data ().errorCode () != Errors .NONE .code ()) {
1027- if (req .request .version () == 0 || apiVersionsResponse .data ().errorCode () != Errors .UNSUPPORTED_VERSION .code ()) {
1041+ if (metadataRecoveryStrategy == MetadataRecoveryStrategy .REBOOTSTRAP && apiVersionsResponse .data ().errorCode () == Errors .REBOOTSTRAP_REQUIRED .code ()) {
1042+ log .info ("Rebootstrap requested by server due to cluster metadata mismatch." );
1043+ metadataUpdater .rebootstrap (now );
1044+ } else if (req .request .version () == 0 || apiVersionsResponse .data ().errorCode () != Errors .UNSUPPORTED_VERSION .code ()) {
10281045 log .warn ("Received error {} from node {} when making an ApiVersionsRequest with correlation id {}. Disconnecting." ,
10291046 Errors .forCode (apiVersionsResponse .data ().errorCode ()), node , req .header .correlationId ());
10301047 this .selector .close (node );
@@ -1107,6 +1124,19 @@ private void handleInitiateApiVersionRequests(long now) {
11071124 // not before ready.
11081125 this .connectionStates .checkingApiVersions (node );
11091126 ApiVersionsRequest .Builder apiVersionRequestBuilder = entry .getValue ();
1127+ // If we know the cluster ID and node ID we are connecting to, we can include
1128+ // those details in the ApiVersions request for checking in the broker,
1129+ // provided that the metadata recovery strategy is not NONE. (KIP-1242)
1130+ if (metadataRecoveryStrategy != MetadataRecoveryStrategy .NONE && metadataClusterCheckEnable ) {
1131+ String clusterId = this .metadataUpdater .clusterId ();
1132+ int nodeId = Integer .parseInt (node );
1133+ // When connecting to coordinators, the client uses large positive node ID
1134+ // values which do not match the target broker's node ID. Exclude those.
1135+ if (clusterId != null && nodeId > 0 && nodeId < Integer .MAX_VALUE / 2 ) {
1136+ apiVersionRequestBuilder .setClusterId (clusterId );
1137+ apiVersionRequestBuilder .setNodeId (nodeId );
1138+ }
1139+ }
11101140 ClientRequest clientRequest = newClientRequest (node , apiVersionRequestBuilder , now , true );
11111141 doSend (clientRequest , true , now );
11121142 iter .remove ();
@@ -1193,6 +1223,15 @@ class DefaultMetadataUpdater implements MetadataUpdater {
11931223 this .inProgress = null ;
11941224 }
11951225
1226+ @ Override
1227+ public String clusterId () {
1228+ ClusterResource clusterResource = metadata .fetch ().clusterResource ();
1229+ if (clusterResource != null ) {
1230+ return clusterResource .clusterId ();
1231+ }
1232+ return null ;
1233+ }
1234+
11961235 @ Override
11971236 public List <Node > fetchNodes () {
11981237 return metadata .fetch ().nodes ();
0 commit comments