Skip to content

Commit d0e0ec4

Browse files
KAFKA-20312: Handle null leader during OffsetFetcher regroup safely (#21760)
Description: This PR fixes a potential NullPointerException in OffsetFetcherUtils.regroupPartitionMapByNode when regrouping partitions by leader during offset reset / list-offsets.

Background: Partitions are grouped by leader via metadata.fetch().leaderFor(tp). If metadata changes between the initial leader lookup and the regroup step (e.g. leadership change or stale metadata), leaderFor(tp) can return null. The previous implementation used Collectors.groupingBy(..., leaderFor(...)), which throws an NPE when the classifier returns null.

Fix: In OffsetFetcherUtils.regroupPartitionMapByNode, replaced the stream-based grouping with a loop that skips partitions whose leader is null, adds them to a caller-provided partitionsToRetry set, and does not trigger a metadata refresh (callers are responsible for retry and metadata).

Callers: OffsetFetcher (classic consumer) passes partitionsToRetry into the helper; in resetPositionsAsync, when the set is non-empty, it calls setNextAllowedRetry(partitionsToRetry, now + retryBackoffMs) and metadata.requestUpdate(false). OffsetsRequestManager (new consumer) passes a local retry set into the helper, then adds skipped partitions to state.remainingToSearch (with timestamp) and calls metadata.requestUpdate(false) when the set is non-empty. This keeps existing retry semantics and avoids the NPE.

Tests: OffsetFetcherTest.testResetPositionsMetadataRefreshWhenLeaderBecomesUnknownDuringRegroup simulates leaderFor(tp) returning null during regroup (first metadata.fetch() stubbed to a cluster with no partition, then the real method). It asserts no exception, that the partition stays pending reset, and that after backoff and a second attempt with valid metadata the offset reset succeeds. OffsetsRequestManagerTest.testFetchOffsetsRegroupSkipsNullLeaderPartitionNoNPE simulates the same scenario in the fetch-offsets path: currentLeader has a leader but metadata.fetch() returns a cluster where one partition has no leader.
It asserts no NPE, that one request is sent (for the partition with a leader), and that the skipped partition is retried after the metadata update and completes successfully. Reviewers: TengYao Chi <frankvicky@apache.org> --------- Co-authored-by: TengYao Chi <kitingiao@gmail.com>
1 parent 76dcd5f commit d0e0ec4

5 files changed

Lines changed: 154 additions & 8 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,12 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition
247247
private void resetPositionsAsync(Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
248248
Map<TopicPartition, Long> partitionResetTimestamps = partitionAutoOffsetResetStrategyMap.entrySet().stream()
249249
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().timestamp().get()));
250+
Set<TopicPartition> partitionsToRetry = new HashSet<>();
250251
Map<Node, Map<TopicPartition, ListOffsetsPartition>> timestampsToSearchByNode =
251-
groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());
252+
groupListOffsetRequests(partitionResetTimestamps, partitionsToRetry);
253+
if (!partitionsToRetry.isEmpty()) {
254+
metadata.requestUpdate(false);
255+
}
252256
for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) {
253257
Node node = entry.getKey();
254258
final Map<TopicPartition, ListOffsetsPartition> resetTimestamps = entry.getValue();
@@ -413,7 +417,7 @@ private Map<Node, Map<TopicPartition, ListOffsetsPartition>> groupListOffsetRequ
413417
}
414418
}
415419
}
416-
return offsetFetcherUtils.regroupPartitionMapByNode(partitionDataMap);
420+
return offsetFetcherUtils.regroupPartitionMapByNode(partitionDataMap, partitionsToRetry);
417421
}
418422

419423
/**

clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -165,11 +165,25 @@ OffsetFetcherUtils.ListOffsetResult handleListOffsetResponse(ListOffsetsResponse
165165
return new OffsetFetcherUtils.ListOffsetResult(fetchedOffsets, partitionsToRetry);
166166
}
167167

168-
<T> Map<Node, Map<TopicPartition, T>> regroupPartitionMapByNode(Map<TopicPartition, T> partitionMap) {
169-
return partitionMap.entrySet()
170-
.stream()
171-
.collect(Collectors.groupingBy(entry -> metadata.fetch().leaderFor(entry.getKey()),
172-
Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
168+
<T> Map<Node, Map<TopicPartition, T>> regroupPartitionMapByNode(
169+
Map<TopicPartition, T> partitionMap,
170+
Set<TopicPartition> partitionsToRetry) {
171+
Map<Node, Map<TopicPartition, T>> partitionsByNode = new HashMap<>();
172+
173+
final var cluster = metadata.fetch();
174+
175+
partitionMap.forEach((tp, value) -> {
176+
Node leader = cluster.leaderFor(tp);
177+
if (leader == null) {
178+
log.debug("Leader for partition {} is unknown while regrouping partition map by node", tp);
179+
partitionsToRetry.add(tp);
180+
return;
181+
}
182+
partitionsByNode.computeIfAbsent(leader, __ -> new HashMap<>())
183+
.put(tp, value);
184+
});
185+
186+
return partitionsByNode;
173187
}
174188

175189
Map<TopicPartition, SubscriptionState.FetchPosition> refreshAndGetPartitionsToValidate() {

clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -911,7 +911,16 @@ private Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartitio
911911
.setCurrentLeaderEpoch(currentLeaderEpoch));
912912
}
913913
}
914-
return offsetFetcherUtils.regroupPartitionMapByNode(partitionDataMap);
914+
Set<TopicPartition> partitionsSkippedInRegroup = new HashSet<>();
915+
Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> result =
916+
offsetFetcherUtils.regroupPartitionMapByNode(partitionDataMap, partitionsSkippedInRegroup);
917+
if (!partitionsSkippedInRegroup.isEmpty()) {
918+
metadata.requestUpdate(false);
919+
listOffsetsRequestState.ifPresent(state ->
920+
partitionsSkippedInRegroup.forEach(tp ->
921+
state.remainingToSearch.put(tp, timestampsToSearch.get(tp))));
922+
}
923+
return result;
915924
}
916925

917926
// Visible for testing

clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.junit.jupiter.api.AfterEach;
6363
import org.junit.jupiter.api.BeforeEach;
6464
import org.junit.jupiter.api.Test;
65+
import org.mockito.invocation.InvocationOnMock;
6566

6667
import java.time.Duration;
6768
import java.time.Instant;
@@ -94,6 +95,7 @@
9495
import static org.junit.jupiter.api.Assertions.assertTrue;
9596
import static org.junit.jupiter.api.Assertions.fail;
9697
import static org.mockito.Mockito.mock;
98+
import static org.mockito.Mockito.spy;
9799
import static org.mockito.Mockito.when;
98100

99101
public class OffsetFetcherTest {
@@ -350,6 +352,63 @@ public void testresetPositionsMetadataRefresh() {
350352
assertEquals(5, subscriptions.position(tp0).offset);
351353
}
352354

355+
/**
356+
* Test for KAFKA-20312: when regroupPartitionMapByNode sees a partition with null leader
357+
* (e.g. leader was known when building partitionDataMap but metadata changed before regroup),
358+
* we should not throw NPE; partition is skipped and retried after backoff.
359+
*/
360+
@Test
361+
public void testResetPositionsMetadataRefreshWhenLeaderBecomesUnknownDuringRegroup() {
362+
buildFetcher();
363+
assignFromUser(singleton(tp0));
364+
subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST);
365+
366+
// Cluster with no partition info so leaderFor(tp0) returns null during regroup
367+
Cluster clusterWithNoLeader = new Cluster(
368+
"mockClusterId",
369+
Collections.singletonList(new Node(0, "localhost", 9092)),
370+
Collections.emptyList(),
371+
Collections.emptySet(),
372+
Collections.emptySet());
373+
374+
// First fetch() (during regroup) returns cluster with no leader; subsequent calls use real metadata
375+
ConsumerMetadata metadataSpy = spy(metadata);
376+
when(metadataSpy.fetch()).thenReturn(clusterWithNoLeader).thenAnswer(InvocationOnMock::callRealMethod);
377+
378+
LogContext logContext = new LogContext();
379+
offsetFetcher = new OffsetFetcher(logContext,
380+
consumerClient,
381+
metadataSpy,
382+
subscriptions,
383+
time,
384+
retryBackoffMs,
385+
requestTimeoutMs,
386+
IsolationLevel.READ_UNCOMMITTED,
387+
apiVersions);
388+
389+
offsetFetcher.resetPositionsIfNeeded();
390+
consumerClient.pollNoWakeup();
391+
392+
// Should not crash; partition still needs reset (skipped in regroup)
393+
assertTrue(subscriptions.isOffsetResetNeeded(tp0));
394+
assertFalse(subscriptions.hasValidPosition(tp0));
395+
396+
// Metadata refresh, then retry after backoff with valid metadata and successful response
397+
client.prepareMetadataUpdate(initialUpdateResponse);
398+
consumerClient.pollNoWakeup();
399+
400+
time.sleep(retryBackoffMs);
401+
client.prepareResponse(listOffsetRequestMatcher(ListOffsetsRequest.LATEST_TIMESTAMP, validLeaderEpoch),
402+
listOffsetResponse(Errors.NONE, 1L, 5L));
403+
404+
offsetFetcher.resetPositionsIfNeeded();
405+
consumerClient.pollNoWakeup();
406+
407+
assertFalse(subscriptions.isOffsetResetNeeded(tp0));
408+
assertTrue(subscriptions.isFetchable(tp0));
409+
assertEquals(5L, subscriptions.position(tp0).offset);
410+
}
411+
353412
@Test
354413
public void testListOffsetNoUpdateMissingEpoch() {
355414
buildFetcher();

clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,66 @@ public void testListOffsetsWaitingForMetadataUpdate_RetrySucceeds() throws Execu
258258
verifySuccessfulPollAndResponseReceived(fetchOffsetsFuture, expectedOffsets);
259259
}
260260

261+
/**
262+
* Test for KAFKA-20312: when regroupPartitionMapByNode sees a partition with null leader
263+
* (e.g. metadata race), it should skip that partition and add it to remainingToSearch instead
264+
* of throwing NullPointerException.
265+
*/
266+
@Test
267+
public void testFetchOffsetsRegroupSkipsNullLeaderPartitionNoNPE() throws ExecutionException,
268+
InterruptedException {
269+
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
270+
timestampsToSearch.put(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
271+
timestampsToSearch.put(TEST_PARTITION_2, ListOffsetsRequest.EARLIEST_TIMESTAMP);
272+
273+
// currentLeader returns a leader for both partitions (so both enter partitionDataMap)
274+
when(metadata.currentLeader(TEST_PARTITION_1)).thenReturn(testLeaderEpoch(LEADER_1, Optional.empty()));
275+
when(metadata.currentLeader(TEST_PARTITION_2)).thenReturn(testLeaderEpoch(LEADER_2, Optional.empty()));
276+
when(subscriptionState.isAssigned(any(TopicPartition.class))).thenReturn(true);
277+
278+
// metadata.fetch() returns a cluster where PARTITION_2 has null leader (e.g. race: leader lost)
279+
List<PartitionInfo> partitions = new ArrayList<>();
280+
partitions.add(new PartitionInfo(TEST_TOPIC, 1, LEADER_1, null, null));
281+
partitions.add(new PartitionInfo(TEST_TOPIC, 2, null, null, null));
282+
Cluster clusterWithNullLeader = new Cluster("clusterId", Collections.singletonList(LEADER_1),
283+
partitions, Collections.emptySet(), Collections.emptySet());
284+
when(metadata.fetch()).thenReturn(clusterWithNullLeader);
285+
286+
CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture =
287+
assertDoesNotThrow(
288+
() -> requestManager.fetchOffsets(timestampsToSearch, false),
289+
"Should not throw NPE; only PARTITION_1 has a leader in regroup, so one request for LEADER_1");
290+
assertEquals(1, requestManager.requestsToSend());
291+
// requestsToRetry is populated when the in-flight request completes and remainingToSearch is non-empty, not yet
292+
assertEquals(0, requestManager.requestsToRetry());
293+
294+
// Complete request for PARTITION_1
295+
NetworkClientDelegate.PollResult res = requestManager.poll(time.milliseconds());
296+
assertEquals(1, res.unsentRequests.size());
297+
NetworkClientDelegate.UnsentRequest unsentRequest = res.unsentRequests.get(0);
298+
ClientResponse clientResponse = buildClientResponse(unsentRequest,
299+
Collections.singletonMap(TEST_PARTITION_1, new OffsetAndTimestampInternal(5L, -1, Optional.empty())));
300+
clientResponse.onComplete();
301+
assertFalse(fetchOffsetsFuture.isDone());
302+
303+
// Metadata update: now both partitions have leaders; retry should send request for PARTITION_2
304+
mockSuccessfulRequest(Map.of(TEST_PARTITION_1, LEADER_1, TEST_PARTITION_2, LEADER_2));
305+
requestManager.onUpdate(new ClusterResource(""));
306+
assertEquals(1, requestManager.requestsToSend());
307+
308+
// Complete the retry request (only PARTITION_2 in this batch)
309+
NetworkClientDelegate.PollResult retryPoll = requestManager.poll(time.milliseconds());
310+
assertEquals(1, retryPoll.unsentRequests.size());
311+
ClientResponse retryResponse = buildClientResponse(retryPoll.unsentRequests.get(0),
312+
Collections.singletonMap(TEST_PARTITION_2, new OffsetAndTimestampInternal(10L, -1, Optional.empty())));
313+
retryResponse.onComplete();
314+
315+
Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = new HashMap<>();
316+
expectedOffsets.put(TEST_PARTITION_1, new OffsetAndTimestampInternal(5L, -1, Optional.empty()));
317+
expectedOffsets.put(TEST_PARTITION_2, new OffsetAndTimestampInternal(10L, -1, Optional.empty()));
318+
verifyRequestSuccessfullyCompleted(fetchOffsetsFuture, expectedOffsets);
319+
}
320+
261321
@ParameterizedTest
262322
@MethodSource("retriableErrors")
263323
public void testRequestFailsWithRetriableError_RetrySucceeds(Errors error) throws ExecutionException, InterruptedException {

0 commit comments

Comments (0)