Skip to content

Commit 4b9eddc

Browse files
lianetm and Copilot authored
KAFKA-20428: Fix unsubscribe failure with assignment updates (#22011)
Fix to ensure that unsubscribe does not apply any pending assignment update that may exist in the background queue (e.g, if a reconciliation completed right before unsubscribing). Fix by filtering out the assignment update events on unsubscribe (same approach already done for filtering out error events that the unsubscribe should not process). This issue and fix only affects unsubscribe (not close), as unsubscribe is the only one, other than poll, that processes background events. Reviewers: Kirk True <kirk@kirktrue.pro>, Andrew Schofield <aschofield@confluent.io> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent a332a84 commit 4b9eddc

4 files changed

Lines changed: 165 additions & 10 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1928,7 +1928,11 @@ public void unsubscribe() {
19281928
try {
19291929
// If users have fatal error, they will get some exceptions in the background queue.
19301930
// When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully.
1931-
processBackgroundEvents(unsubscribeEvent.future(), timer, e -> (e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException));
1931+
// We also skip processing assignment events (PARTITIONS_ASSIGNED, STREAMS_TASKS_ASSIGNED) because
1932+
// they are not relevant anymore (consumer already unsubscribing).
1933+
processBackgroundEvents(unsubscribeEvent.future(), timer,
1934+
e -> (e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException),
1935+
true);
19321936
log.info("Unsubscribed all topics or patterns and assigned partitions");
19331937
} catch (TimeoutException e) {
19341938
log.error("Failed while waiting for the unsubscribe event to complete");
@@ -2310,6 +2314,31 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
23102314
* Visible for testing.
23112315
*/
23122316
boolean processBackgroundEvents() {
2317+
return processBackgroundEvents(false);
2318+
}
2319+
2320+
/**
2321+
* Checks if the given background event is an assignment update event.
2322+
* Those are to update reconciled assignments, so should only be processed from poll() and not from unsubscribe().
2323+
*/
2324+
private static boolean isAssignmentEvent(BackgroundEvent event) {
2325+
return event.type() == BackgroundEvent.Type.PARTITIONS_ASSIGNED ||
2326+
event.type() == BackgroundEvent.Type.STREAMS_TASKS_ASSIGNED;
2327+
}
2328+
2329+
/**
2330+
* Process the events produced by the background thread.
2331+
* It is possible that {@link ErrorEvent an error}
2332+
* could occur when processing the events. In such cases, the processor will take a reference to the first
2333+
* error, continue to process the remaining events, and then throw the first error that occurred.
2334+
* Visible for testing.
2335+
*
2336+
* @param skipAssignmentEvents If true, skip processing events that update a new assignment after a reconciliation
2337+
* (PARTITIONS_ASSIGNED and STREAMS_TASKS_ASSIGNED)
2338+
* These events should only be processed from poll(), not from unsubscribe().
2339+
* @return true if any events were drained from the queue
2340+
*/
2341+
boolean processBackgroundEvents(boolean skipAssignmentEvents) {
23132342
AtomicReference<KafkaException> firstError = new AtomicReference<>();
23142343

23152344
List<BackgroundEvent> events = backgroundEventHandler.drainEvents();
@@ -2321,6 +2350,18 @@ boolean processBackgroundEvents() {
23212350
if (event instanceof CompletableEvent)
23222351
backgroundEventReaper.add((CompletableEvent<?>) event);
23232352

2353+
// Skip assignment events if requested (e.g., during unsubscribe).
2354+
// These events should only be processed from poll().
2355+
// Complete them exceptionally to unblock the reconciliation in the background.
2356+
if (skipAssignmentEvents && isAssignmentEvent(event)) {
2357+
if (event instanceof CompletableEvent) {
2358+
((CompletableEvent<?>) event).future().completeExceptionally(
2359+
new KafkaException("Assignment event skipped because consumer is unsubscribing"));
2360+
}
2361+
log.debug("Skipped processing {} during unsubscribe", event.type());
2362+
continue;
2363+
}
2364+
23242365
backgroundEventProcessor.process(event);
23252366
} catch (Throwable t) {
23262367
KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
@@ -2378,14 +2419,19 @@ boolean processBackgroundEvents() {
23782419
* @param timer Overall timer that bounds how long to wait for the event to complete
23792420
* @param ignoreErrorEventException Predicate to ignore background errors.
23802421
* Any exceptions found while processing background events that match the predicate won't be propagated.
2381-
* @return {@code true} if the event completed within the timeout, {@code false} otherwise
2422+
* @param skipAssignmentEvents If true, skip processing PARTITIONS_ASSIGNED and STREAMS_TASKS_ASSIGNED
2423+
* events and complete them exceptionally. These events should only be
2424+
* processed from poll(), not from unsubscribe() or other operations.
2425+
* @return the completed result of the supplied {@code future}
2426+
* @throws TimeoutException if the operation does not complete before the timer expires
23822427
*/
23832428
// Visible for testing
2384-
<T> T processBackgroundEvents(Future<T> future, Timer timer, Predicate<Exception> ignoreErrorEventException) {
2429+
<T> T processBackgroundEvents(Future<T> future, Timer timer, Predicate<Exception> ignoreErrorEventException,
2430+
boolean skipAssignmentEvents) {
23852431
do {
23862432
boolean hadEvents = false;
23872433
try {
2388-
hadEvents = processBackgroundEvents();
2434+
hadEvents = processBackgroundEvents(skipAssignmentEvents);
23892435
} catch (Exception e) {
23902436
if (!ignoreErrorEventException.test(e))
23912437
throw e;

clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1825,7 +1825,7 @@ public void testPollDoesNotAddNewAsyncPollEventWhenOneIsAlreadyInFlight() {
18251825
}
18261826

18271827
/**
1828-
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
1828+
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate, boolean) processBackgroundEvents}
18291829
* handles the case where the {@link Future} takes a bit of time to complete, but does within the timeout.
18301830
*/
18311831
@Test
@@ -1851,14 +1851,14 @@ public void testProcessBackgroundEventsWithInitialDelay() throws Exception {
18511851
return null;
18521852
}).when(future).get(any(Long.class), any(TimeUnit.class));
18531853

1854-
consumer.processBackgroundEvents(future, timer, e -> false);
1854+
consumer.processBackgroundEvents(future, timer, e -> false, false);
18551855

18561856
// 800 is the 1000 ms timeout (above) minus the 200 ms delay for the two incremental timeouts/retries.
18571857
assertEquals(800, timer.remainingMs());
18581858
}
18591859

18601860
/**
1861-
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
1861+
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate, boolean) processBackgroundEvents}
18621862
* handles the case where the {@link Future} is already complete when invoked, so it doesn't have to wait.
18631863
*/
18641864
@Test
@@ -1869,15 +1869,15 @@ public void testProcessBackgroundEventsWithoutDelay() {
18691869
// Create a future that is already completed.
18701870
CompletableFuture<?> future = CompletableFuture.completedFuture(null);
18711871

1872-
consumer.processBackgroundEvents(future, timer, e -> false);
1872+
consumer.processBackgroundEvents(future, timer, e -> false, false);
18731873

18741874
// Because we didn't need to perform a timed get, we should still have every last millisecond
18751875
// of our initial timeout.
18761876
assertEquals(1000, timer.remainingMs());
18771877
}
18781878

18791879
/**
1880-
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
1880+
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate, boolean) processBackgroundEvents}
18811881
* handles the case where the {@link Future} does not complete within the timeout.
18821882
*/
18831883
@Test
@@ -1892,7 +1892,7 @@ public void testProcessBackgroundEventsTimesOut() throws Exception {
18921892
throw new java.util.concurrent.TimeoutException("Intentional timeout");
18931893
}).when(future).get(any(Long.class), any(TimeUnit.class));
18941894

1895-
assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(future, timer, e -> false));
1895+
assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(future, timer, e -> false, false));
18961896

18971897
// Because we forced our mocked future to continuously time out, we should have no time remaining.
18981898
assertEquals(0, timer.remainingMs());
@@ -1993,6 +1993,40 @@ public void testUnsubscribeDoesNotCommitOffsetsEvenWithAutoCommitEnabled() {
19931993
verify(applicationEventHandler, never()).add(ArgumentMatchers.isA(CommitOnCloseEvent.class));
19941994
}
19951995

1996+
private static Stream<CompletableBackgroundEvent<?>> assignmentEventsSource() {
1997+
return Stream.of(
1998+
new PartitionsAssignedEvent(Set.of(), new TreeSet<>(TOPIC_PARTITION_COMPARATOR)),
1999+
new StreamsTasksAssignedEvent(
2000+
new TreeSet<>(TOPIC_PARTITION_COMPARATOR),
2001+
new TreeSet<>(TOPIC_PARTITION_COMPARATOR),
2002+
new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of(), true))
2003+
);
2004+
}
2005+
2006+
/**
2007+
* Test to ensure that assignment updates are not applied while unsubscribing
2008+
* (it would cause an IllegalArgumentException when calling unsubscribe()).
2009+
* Validates the fix for KAFKA-20428.
2010+
*/
2011+
@ParameterizedTest
2012+
@MethodSource("assignmentEventsSource")
2013+
public void testUnsubscribeWithPendingAssignmentEvent(CompletableBackgroundEvent<?> assignedEvent) {
2014+
consumer = newConsumer(requiredConsumerConfigAndGroupId("consumerGroup"));
2015+
completeTopicSubscriptionChangeEventSuccessfully();
2016+
consumer.subscribe(singletonList("topic"));
2017+
completeUnsubscribeApplicationEventSuccessfully();
2018+
2019+
// Add assignment event to the background queue (simulating an ongoing reconciliation
2020+
// that completed just before unsubscribe was called)
2021+
backgroundEventQueue.add(assignedEvent);
2022+
2023+
// The call to unsubscribe should complete successfully (assignment event not processed and completed exceptionally)
2024+
assertDoesNotThrow(() -> consumer.unsubscribe());
2025+
verify(applicationEventHandler, never().description("Reconciled assignment updates shouldn't be processed while unsubscribing"))
2026+
.addAndGet(any(ApplyAssignmentEvent.class));
2027+
assertTrue(assignedEvent.future().isCompletedExceptionally());
2028+
}
2029+
19962030
@Test
19972031
public void testSeekToBeginning() {
19982032
Collection<TopicPartition> topics = Collections.singleton(new TopicPartition("test", 0));

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1248,6 +1248,45 @@ public void testLeaveGroupWhenMemberIsStale() {
12481248
assertEquals(MemberState.STALE, membershipManager.state());
12491249
}
12501250

1251+
/**
1252+
* Test that when unsubscribe/leaveGroup is called during an ongoing reconciliation and the pending
1253+
* assignment event is completed exceptionally, the member can still rejoin and start
1254+
* a new reconciliation.
1255+
*/
1256+
@Test
1257+
public void testLeaveGroupDuringReconciliationThenRejoin() {
1258+
Uuid topicId = Uuid.randomUuid();
1259+
String topicName = "topic1";
1260+
ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup();
1261+
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());
1262+
1263+
// Start reconciliation - assignment event is pending
1264+
receiveAssignment(topicId, Collections.singletonList(0), membershipManager);
1265+
membershipManager.maybeReconcile(true);
1266+
PartitionsAssignedEvent pendingAssignmentEvent = (PartitionsAssignedEvent) backgroundEventQueue.poll();
1267+
assertNotNull(pendingAssignmentEvent);
1268+
1269+
// Call leaveGroup while reconciliation is in progress
1270+
mockLeaveGroup();
1271+
membershipManager.leaveGroup();
1272+
assertEquals(MemberState.LEAVING, membershipManager.state());
1273+
1274+
// Complete the pending assignment event exceptionally (simulating unsubscribe skipping it)
1275+
pendingAssignmentEvent.future().completeExceptionally(
1276+
new KafkaException("Assignment event skipped because consumer is unsubscribing"));
1277+
1278+
// Complete leave and rejoin
1279+
membershipManager.onHeartbeatRequestGenerated();
1280+
clearInvocations(membershipManager);
1281+
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());
1282+
membershipManager.transitionToJoining();
1283+
1284+
// Receive assignment - verify new reconciliation starts
1285+
receiveAssignment(topicId, Collections.singletonList(0), membershipManager);
1286+
membershipManager.maybeReconcile(true);
1287+
verifyReconciliationTriggered(membershipManager);
1288+
}
1289+
12511290
@Test
12521291
public void testFatalFailureWhenStateIsUnjoined() {
12531292
ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup();

clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1343,6 +1343,42 @@ public void testLeaveGroupOnCloseWhenInGroupWithAssignment() {
13431343
assertFalse(onGroupLeft.isCompletedExceptionally());
13441344
}
13451345

1346+
/**
1347+
* Test that when unsubscribe/leaveGroup is called during an ongoing reconciliation and the pending
1348+
* assignment event is completed exceptionally, the member can still rejoin and start
1349+
* a new reconciliation.
1350+
*/
1351+
@Test
1352+
public void testLeaveGroupDuringReconciliationThenRejoin() {
1353+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
1354+
final Set<StreamsRebalanceData.TaskId> activeTasks =
1355+
Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0));
1356+
when(subscriptionState.assignedPartitions()).thenReturn(Set.of());
1357+
joining();
1358+
1359+
// Start reconciliation - assignment event is pending
1360+
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
1361+
final StreamsTasksAssignedEvent pendingAssignmentEvent =
1362+
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
1363+
1364+
// Call leaveGroup while reconciliation is in progress
1365+
membershipManager.leaveGroup();
1366+
1367+
// Complete the pending assignment event exceptionally (simulating unsubscribe skipping it)
1368+
pendingAssignmentEvent.future().completeExceptionally(
1369+
new KafkaException("Assignment event skipped because consumer is unsubscribing"));
1370+
1371+
// Complete leave and rejoin
1372+
membershipManager.onHeartbeatRequestGenerated();
1373+
Mockito.clearInvocations(backgroundEventHandler);
1374+
tasksAssignedAddCount = 0;
1375+
joining();
1376+
1377+
// Receive assignment - verify new reconciliation starts
1378+
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
1379+
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
1380+
}
1381+
13461382
@Test
13471383
public void testOnHeartbeatRequestSkippedWhenInLeaving() {
13481384
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, "topic");

0 commit comments

Comments (0)