Skip to content

Commit d6afb13

Browse files
committed
first test code
1 parent 7b46af6 commit d6afb13

10 files changed

Lines changed: 367 additions & 56 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.clients.consumer.internals;
1818

19+
import org.apache.kafka.clients.consumer.CloseOptions;
1920
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
2021
import org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent;
2122
import org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackNeededEvent;
@@ -210,9 +211,9 @@ public int hashCode() {
210211

211212
/**
212213
* If the member is currently leaving the group after a call to {@link #leaveGroup()} or
213-
* {@link #leaveGroupOnClose()}, this will have a future that will complete when the ongoing leave operation
214-
* completes (callbacks executed and heartbeat request to leave is sent out). This will be empty if the
215-
* member is not leaving.
214+
* {@link #leaveGroupOnClose(CloseOptions.GroupMembershipOperation)}, this will have a future that will
215+
* complete when the ongoing leave operation completes (callbacks executed and heartbeat request to leave
216+
* is sent out). This will be empty if the member is not leaving.
216217
*/
217218
private Optional<CompletableFuture<Void>> leaveGroupInProgress = Optional.empty();
218219

@@ -875,23 +876,43 @@ private static Map<String, SortedSet<Integer>> toTasksAssignment(final List<Stre
875876
}
876877

877878
/**
878-
* Leaves the group when the member closes.
879+
* Closes the member's participation in the group, honoring the requested {@link CloseOptions.GroupMembershipOperation}:
879880
*
880-
* <p>
881-
* This method does the following:
882-
* <ol>
883-
* <li>Transitions member state to {@link MemberState#PREPARE_LEAVING}.</li>
884-
* <li>Skips the invocation of the revocation callback or lost callback.</li>
885-
* <li>Clears the current and target assignment, unsubscribes from all topics and
886-
* transitions the member state to {@link MemberState#LEAVING}.</li>
887-
* </ol>
888-
* States {@link MemberState#PREPARE_LEAVING} and {@link MemberState#LEAVING} cause the heartbeat request manager
889-
* to send a leave group heartbeat.
890-
* </p>
881+
* <ul>
882+
* <li>{@code REMAIN_IN_GROUP}: clears local task assignments, unsubscribes, and completes
883+
* immediately without sending a leave group heartbeat. The broker will remove the member
884+
* via session timeout.</li>
885+
* <li>{@code DEFAULT} / {@code LEAVE_GROUP}: transitions to
886+
* {@link MemberState#PREPARE_LEAVING} → {@link MemberState#LEAVING} and sends the leave
887+
* group heartbeat.</li>
888+
* </ul>
891889
*
892-
* @return future that will complete when the heartbeat to leave the group has been sent out.
890+
* @param membershipOperation the requested close behavior
891+
* @return future that will complete when the close operation is done
893892
*/
894-
public CompletableFuture<Void> leaveGroupOnClose() {
893+
public CompletableFuture<Void> leaveGroupOnClose(final CloseOptions.GroupMembershipOperation membershipOperation) {
894+
if (CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP == membershipOperation) {
895+
// Skip the leave group heartbeat. Clean up locally and complete immediately.
896+
// The broker will remove the member from the group via session timeout.
897+
log.info("Skipping leave group heartbeat for member {} with epoch {} because REMAIN_IN_GROUP was specified.",
898+
memberId, memberEpoch);
899+
if (isNotInGroup()) {
900+
subscriptionState.unsubscribe();
901+
notifyAssignmentChange(Collections.emptySet());
902+
return CompletableFuture.completedFuture(null);
903+
}
904+
clearTaskAndPartitionAssignment();
905+
subscriptionState.unsubscribe();
906+
// UNSUBSCRIBED is only reachable from PREPARE_LEAVING, LEAVING, or FENCED.
907+
// For any active state (JOINING/STABLE/RECONCILING/ACKNOWLEDGING) we must pass
908+
// through PREPARE_LEAVING first; if already in PREPARE_LEAVING or LEAVING we
909+
// can transition directly.
910+
if (state != MemberState.PREPARE_LEAVING && state != MemberState.LEAVING) {
911+
transitionTo(MemberState.PREPARE_LEAVING);
912+
}
913+
transitionTo(MemberState.UNSUBSCRIBED);
914+
return CompletableFuture.completedFuture(null);
915+
}
895916
return leaveGroup(true);
896917
}
897918

clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,7 @@ private void process(final LeaveGroupOnCloseEvent event) {
483483
future.whenComplete(complete(event.future()));
484484
} else if (requestManagers.streamsMembershipManager.isPresent()) {
485485
log.debug("Signal the StreamsMembershipManager to leave the streams group since the member is closing");
486-
CompletableFuture<Void> future = requestManagers.streamsMembershipManager.get().leaveGroupOnClose();
486+
CompletableFuture<Void> future = requestManagers.streamsMembershipManager.get().leaveGroupOnClose(event.membershipOperation());
487487
future.whenComplete(complete(event.future()));
488488
}
489489
}

clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.clients.consumer.internals;
1818

19+
import org.apache.kafka.clients.consumer.CloseOptions;
1920
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
2021
import org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent;
2122
import org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackNeededEvent;
@@ -1152,7 +1153,7 @@ public void testLeaveGroupWhenNotInGroup() {
11521153

11531154
@Test
11541155
public void testLeaveGroupOnCloseWhenNotInGroup() {
1155-
testLeaveGroupWhenNotInGroup(membershipManager::leaveGroupOnClose);
1156+
testLeaveGroupWhenNotInGroup(() -> membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT));
11561157
}
11571158

11581159
@Test
@@ -1228,7 +1229,7 @@ public void testLeaveGroupWhenNotInGroupAndFenced() {
12281229

12291230
@Test
12301231
public void testLeaveGroupOnCloseWhenNotInGroupAndFenced() {
1231-
testLeaveGroupOnCloseWhenNotInGroupAndFenced(membershipManager::leaveGroupOnClose);
1232+
testLeaveGroupOnCloseWhenNotInGroupAndFenced(() -> membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT));
12321233
}
12331234

12341235
private void testLeaveGroupOnCloseWhenNotInGroupAndFenced(final Supplier<CompletableFuture<Void>> leaveGroup) {
@@ -1271,7 +1272,8 @@ public void testLeaveGroupWhenInGroupWithAssignment() {
12711272
verifyInStatePrepareLeaving(membershipManager);
12721273
final CompletableFuture<Void> onGroupLeftBeforeRevocationCallback = membershipManager.leaveGroup();
12731274
assertEquals(onGroupLeft, onGroupLeftBeforeRevocationCallback);
1274-
final CompletableFuture<Void> onGroupLeftOnCloseBeforeRevocationCallback = membershipManager.leaveGroupOnClose();
1275+
final CompletableFuture<Void> onGroupLeftOnCloseBeforeRevocationCallback =
1276+
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT);
12751277
assertEquals(onGroupLeft, onGroupLeftOnCloseBeforeRevocationCallback);
12761278
onTasksRevokedCallbackExecuted.complete(null);
12771279
verify(memberStateListener).onGroupAssignmentUpdated(Set.of());
@@ -1312,7 +1314,8 @@ public void testLeaveGroupOnCloseWhenInGroupWithAssignment() {
13121314

13131315
acknowledging(onTasksAssignedCallbackExecutedSetup);
13141316

1315-
final CompletableFuture<Void> onGroupLeft = membershipManager.leaveGroupOnClose();
1317+
final CompletableFuture<Void> onGroupLeft =
1318+
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT);
13161319

13171320
assertFalse(onGroupLeft.isDone());
13181321
verifyInStateLeaving(membershipManager);
@@ -1321,7 +1324,8 @@ public void testLeaveGroupOnCloseWhenInGroupWithAssignment() {
13211324
verify(backgroundEventHandler, never()).add(any(StreamsOnTasksRevokedCallbackNeededEvent.class));
13221325
final CompletableFuture<Void> onGroupLeftBeforeHeartbeatRequestGenerated = membershipManager.leaveGroup();
13231326
assertEquals(onGroupLeft, onGroupLeftBeforeHeartbeatRequestGenerated);
1324-
final CompletableFuture<Void> onGroupLeftOnCloseBeforeHeartbeatRequestGenerated = membershipManager.leaveGroupOnClose();
1327+
final CompletableFuture<Void> onGroupLeftOnCloseBeforeHeartbeatRequestGenerated =
1328+
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT);
13251329
assertEquals(onGroupLeft, onGroupLeftOnCloseBeforeHeartbeatRequestGenerated);
13261330
assertFalse(onGroupLeft.isDone());
13271331
membershipManager.onHeartbeatRequestGenerated();
@@ -1340,6 +1344,41 @@ public void testLeaveGroupOnCloseWhenInGroupWithAssignment() {
13401344
assertFalse(onGroupLeft.isCompletedExceptionally());
13411345
}
13421346

1347+
@Test
1348+
public void testLeaveGroupOnCloseWithRemainInGroupSkipsLeaveHeartbeat() {
1349+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
1350+
final Set<StreamsRebalanceData.TaskId> activeTasks =
1351+
Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0));
1352+
joining();
1353+
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
1354+
final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
1355+
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
1356+
activeTasks, Set.of(), Set.of());
1357+
acknowledging(onTasksAssignedCallbackExecuted);
1358+
1359+
final CompletableFuture<Void> onGroupLeft =
1360+
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
1361+
1362+
assertTrue(onGroupLeft.isDone());
1363+
assertFalse(onGroupLeft.isCompletedExceptionally());
1364+
1365+
verifyInStateUnsubscribed(membershipManager);
1366+
verify(subscriptionState).unsubscribe();
1367+
verify(backgroundEventHandler, never()).add(any(StreamsOnTasksRevokedCallbackNeededEvent.class));
1368+
}
1369+
1370+
@Test
1371+
public void testLeaveGroupOnCloseWithRemainInGroupWhenNotInGroup() {
1372+
// When not in the group, REMAIN_IN_GROUP should complete immediately
1373+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
1374+
1375+
final CompletableFuture<Void> onGroupLeft =
1376+
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
1377+
1378+
assertTrue(onGroupLeft.isDone());
1379+
assertFalse(onGroupLeft.isCompletedExceptionally());
1380+
}
1381+
13431382
@Test
13441383
public void testOnHeartbeatRequestSkippedWhenInLeaving() {
13451384
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, "topic");

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.kafka.common.utils.Utils;
3030
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
3131
import org.apache.kafka.streams.CloseOptions;
32+
import org.apache.kafka.streams.GroupProtocol;
3233
import org.apache.kafka.streams.KafkaStreams;
3334
import org.apache.kafka.streams.KeyValue;
3435
import org.apache.kafka.streams.StreamsBuilder;
@@ -57,8 +58,12 @@
5758
import java.util.List;
5859
import java.util.Properties;
5960

61+
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.isEmptyConsumerGroup;
62+
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.isEmptyStreamGroup;
6063
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
64+
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyStreamGroup;
6165
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
66+
import static org.junit.jupiter.api.Assertions.assertFalse;
6267

6368
@Tag("integration")
6469
@Timeout(600)
@@ -163,6 +168,84 @@ public void testCloseOptions() throws Exception {
163168
waitForEmptyConsumerGroup(adminClient, streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0);
164169
}
165170

171+
@Test
172+
public void testCloseOptionsRemainInGroupClassicProtocol() throws Exception {
173+
// Classic + REMAIN_IN_GROUP: member must stay in group (no leave heartbeat).
174+
// The group should still have a member immediately after close because
175+
// the session timeout is set to Integer.MAX_VALUE.
176+
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
177+
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
178+
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
179+
180+
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
181+
.withTimeout(Duration.ofSeconds(30)));
182+
183+
assertFalse(isEmptyConsumerGroup(adminClient, streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)),
184+
"Group should still have a member after REMAIN_IN_GROUP close (session timeout is MAX)");
185+
}
186+
187+
@Test
188+
public void testCloseOptionsDefaultClassicProtocol() throws Exception {
189+
// Classic + DEFAULT: must behave like REMAIN_IN_GROUP (member stays in group).
190+
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
191+
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
192+
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
193+
194+
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.DEFAULT)
195+
.withTimeout(Duration.ofSeconds(30)));
196+
197+
assertFalse(isEmptyConsumerGroup(adminClient, streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)),
198+
"Group should still have a member after DEFAULT close under Classic protocol");
199+
}
200+
201+
@Test
202+
public void testCloseOptionsLeaveGroupStreamsProtocol() throws Exception {
203+
// Streams + LEAVE_GROUP: member must leave the group immediately.
204+
streamsConfig.remove(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
205+
streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
206+
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
207+
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
208+
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
209+
210+
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP)
211+
.withTimeout(Duration.ofSeconds(30)));
212+
213+
waitForEmptyStreamGroup(adminClient, streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0);
214+
}
215+
216+
@Test
217+
public void testCloseOptionsDefaultStreamsProtocol() throws Exception {
218+
// Streams + DEFAULT: dynamic member must leave the group (consistent with Streams protocol design).
219+
streamsConfig.remove(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
220+
streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
221+
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
222+
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
223+
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
224+
225+
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.DEFAULT)
226+
.withTimeout(Duration.ofSeconds(30)));
227+
228+
waitForEmptyStreamGroup(adminClient, streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0);
229+
}
230+
231+
@Test
232+
public void testCloseOptionsRemainInGroupStreamsProtocol() throws Exception {
233+
// Streams + REMAIN_IN_GROUP: member must stay in group (no leave heartbeat sent).
234+
// The group should still have a member immediately after close because
235+
// the session timeout is set to Integer.MAX_VALUE.
236+
streamsConfig.remove(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
237+
streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
238+
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
239+
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
240+
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
241+
242+
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
243+
.withTimeout(Duration.ofSeconds(30)));
244+
245+
assertFalse(isEmptyStreamGroup(adminClient, streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)),
246+
"Group should still have a member after REMAIN_IN_GROUP close under Streams protocol");
247+
}
248+
166249
protected Topology setupTopologyWithoutIntermediateUserTopic() {
167250
final StreamsBuilder builder = new StreamsBuilder();
168251

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.clients.admin.Admin;
2020
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
21+
import org.apache.kafka.clients.admin.StreamsGroupDescription;
2122
import org.apache.kafka.clients.consumer.Consumer;
2223
import org.apache.kafka.clients.consumer.ConsumerConfig;
2324
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -984,6 +985,22 @@ public boolean conditionMet() {
984985
}
985986
}
986987

988+
private static class StreamGroupInactiveCondition implements TestCondition {
989+
private final Admin adminClient;
990+
private final String applicationId;
991+
992+
private StreamGroupInactiveCondition(final Admin adminClient,
993+
final String applicationId) {
994+
this.adminClient = adminClient;
995+
this.applicationId = applicationId;
996+
}
997+
998+
@Override
999+
public boolean conditionMet() {
1000+
return isEmptyStreamGroup(adminClient, applicationId);
1001+
}
1002+
}
1003+
9871004
public static void waitForEmptyConsumerGroup(final Admin adminClient,
9881005
final String applicationId,
9891006
final long timeoutMs) throws Exception {
@@ -994,6 +1011,16 @@ public static void waitForEmptyConsumerGroup(final Admin adminClient,
9941011
);
9951012
}
9961013

1014+
public static void waitForEmptyStreamGroup(final Admin adminClient,
1015+
final String applicationId,
1016+
final long timeoutMs) throws Exception {
1017+
TestUtils.waitForCondition(
1018+
new IntegrationTestUtils.StreamGroupInactiveCondition(adminClient, applicationId),
1019+
timeoutMs,
1020+
"Test stream group " + applicationId + " still active even after waiting " + timeoutMs + " ms."
1021+
);
1022+
}
1023+
9971024
public static boolean isEmptyConsumerGroup(final Admin adminClient,
9981025
final String applicationId) {
9991026
try {
@@ -1010,6 +1037,22 @@ public static boolean isEmptyConsumerGroup(final Admin adminClient,
10101037
}
10111038
}
10121039

1040+
public static boolean isEmptyStreamGroup(final Admin adminClient,
1041+
final String applicationId) {
1042+
try {
1043+
final StreamsGroupDescription groupDescription =
1044+
adminClient.describeStreamsGroups(singletonList(applicationId))
1045+
.describedGroups()
1046+
.get(applicationId)
1047+
.get();
1048+
return groupDescription.members().isEmpty();
1049+
} catch (final ExecutionException e) {
1050+
return e.getCause() instanceof GroupIdNotFoundException;
1051+
} catch (final InterruptedException e) {
1052+
return false;
1053+
}
1054+
}
1055+
10131056
@SuppressWarnings("deprecation")
10141057
private static StateListener getStateListener(final KafkaStreams streams) {
10151058
try {

0 commit comments

Comments
 (0)