Skip to content

Commit 0d9fe51

Browse files
dajac and claude authored
KAFKA-20434: Consumer group does not recompute assignment when static members rejoin with different assignor (#22036)
When all static members of a consumer group using the consumer protocol leave and rejoin with a different server-side assignor, the group does not recompute the target assignment. The `bumpGroupEpoch` flag only considers changes to subscribed topic names and regex but not the preferred server assignor. This patch adds a `hasPreferredServerAssignorChanged` check that compares the effective preferred assignor (falling back to the default when no member has an explicit preference) before and after the member update. The group epoch is bumped when the effective preferred assignor changes, which only happens once a majority of members has switched. Reviewers: Sean Quah <squah@confluent.io>, Lianet Magrans <lmagrans@confluent.io> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent e30f771 commit 0d9fe51

3 files changed

Lines changed: 278 additions & 3 deletions

File tree

core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, Consumer
2828
import org.apache.kafka.common.test.ClusterInstance
2929
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
3030
import org.apache.kafka.server.common.Feature
31-
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull}
31+
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue}
3232

3333
import scala.collection.Map
3434
import scala.jdk.CollectionConverters._
@@ -1273,4 +1273,78 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC
12731273
.setAssignment(expectedAssignment4)
12741274
assertEquals(expectedResponse4, response4.data)
12751275
}
1276+
1277+
// End-to-end check for KAFKA-20434: three static members join with the "uniform" server
// assignor, all leave, and then rejoin under new member ids with the "range" assignor.
// The test passes when the member epoch returned to the last rejoining member is greater
// than the epoch observed before the leave/rejoin cycle.
@ClusterTest(
1278+
serverProperties = Array(
1279+
// NOTE(review): both assignment intervals are set to "0", presumably so that target
// assignments are computed without delay during the test — confirm against GroupCoordinatorConfig.
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0"),
1280+
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
1281+
)
1282+
)
1283+
def testStaticMembersRejoinWithDifferentServerAssignor(): Unit = {
1284+
// Creates the __consumer_offsets topic because it won't be created automatically
1285+
// in this test, which does not use the FindCoordinator API.
1286+
createOffsetsTopic()
1287+
1288+
val groupId = "grp"
1289+
val instanceIds = List("instance-1", "instance-2", "instance-3")
1290+
1291+
// A helper that joins a static member with the given server assignor and waits until
1292+
// the heartbeat returns a successful response. Returns the member epoch from the response.
1293+
def joinStaticMember(memberId: String, instanceId: String, serverAssignor: String): Int = {
1294+
// The join heartbeat is sent with member epoch 0 and an empty list of owned partitions.
val joinRequest = new ConsumerGroupHeartbeatRequest.Builder(
1295+
new ConsumerGroupHeartbeatRequestData()
1296+
.setGroupId(groupId)
1297+
.setMemberId(memberId)
1298+
.setInstanceId(instanceId)
1299+
.setMemberEpoch(0)
1300+
.setRebalanceTimeoutMs(5 * 60 * 1000)
1301+
.setServerAssignor(serverAssignor)
1302+
.setSubscribedTopicNames(List("foo").asJava)
1303+
.setTopicPartitions(List.empty.asJava)
1304+
).build()
1305+
1306+
// The same request is retried until it succeeds; `response` keeps the last attempt so it
// can be reported in the failure message and read after the wait.
var response: ConsumerGroupHeartbeatResponse = null
1307+
TestUtils.waitUntilTrue(() => {
1308+
response = connectAndReceive[ConsumerGroupHeartbeatResponse](joinRequest)
1309+
response.data.errorCode == Errors.NONE.code
1310+
}, msg = s"Static member $instanceId could not join the group. Last response $response.")
1311+
response.data.memberEpoch
1312+
}
1313+
1314+
// Three static members join the group with the "uniform" assignor.
1315+
val initialMemberIds = instanceIds.map(_ => Uuid.randomUuid.toString)
1316+
val initialEpochs = initialMemberIds.zip(instanceIds).map { case (memberId, instanceId) =>
1317+
joinStaticMember(memberId, instanceId, "uniform")
1318+
}
1319+
// The epoch returned to the last member to join is used as the baseline for the final assertion.
val epochBeforeRejoin = initialEpochs.last
1320+
1321+
// All three members leave the group.
1322+
// Each leave heartbeat carries member epoch -2, and the response is expected to echo -2.
initialMemberIds.zip(instanceIds).foreach { case (memberId, instanceId) =>
1323+
val leaveRequest = new ConsumerGroupHeartbeatRequest.Builder(
1324+
new ConsumerGroupHeartbeatRequestData()
1325+
.setGroupId(groupId)
1326+
.setMemberId(memberId)
1327+
.setInstanceId(instanceId)
1328+
.setMemberEpoch(-2)
1329+
).build()
1330+
val leaveResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](leaveRequest)
1331+
assertEquals(Errors.NONE.code, leaveResponse.data.errorCode)
1332+
assertEquals(-2, leaveResponse.data.memberEpoch)
1333+
}
1334+
1335+
// All three members rejoin with the "range" assignor and new member ids. The group
1336+
// epoch must be bumped once a majority of members has switched assignor so that the
1337+
// target assignment is recomputed with the new assignor.
1338+
val rejoinEpochs = instanceIds.map { instanceId =>
1339+
joinStaticMember(Uuid.randomUuid.toString, instanceId, "range")
1340+
}
1341+
1342+
// The last rejoin epoch must be greater than the epoch before the leave/rejoin cycle,
1343+
// confirming that the group epoch was bumped and the assignment was recomputed.
1344+
val lastRejoinEpoch = rejoinEpochs.last
1345+
assertTrue(lastRejoinEpoch > epochBeforeRejoin,
1346+
s"Expected the last rejoin epoch ($lastRejoinEpoch) to be greater than " +
1347+
s"the epoch before the rejoin ($epochBeforeRejoin). " +
1348+
s"Rejoin epochs: ${rejoinEpochs.mkString(", ")}.")
1349+
}
12761350
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2377,6 +2377,11 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
23772377
updatedMember,
23782378
records
23792379
);
2380+
boolean preferredServerAssignorChanged = hasPreferredServerAssignorChanged(
2381+
group,
2382+
member,
2383+
updatedMember
2384+
);
23802385

23812386
// The subscription has changed when either the subscribed topic names or subscribed topic
23822387
// regex has changed.
@@ -2389,9 +2394,12 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
23892394
// the group epoch when the member has changed its subscribed topic names or the member
23902395
// has changed its subscribed topic regex to a regex that is already resolved. We avoid
23912396
// bumping the group epoch when the new subscribed topic regex has not been resolved
2392-
// yet, since we will have to update the target assignment again later.
2397+
// yet, since we will have to update the target assignment again later. We also bump the
2398+
// group epoch when the effective preferred server assignor changes, since the target
2399+
// assignment must be recomputed with the new assignor.
23932400
subscribedTopicNamesChanged ||
2394-
updateRegularExpressionStatus == UpdateRegularExpressionStatus.REGEX_UPDATED_AND_RESOLVED;
2401+
updateRegularExpressionStatus == UpdateRegularExpressionStatus.REGEX_UPDATED_AND_RESOLVED ||
2402+
preferredServerAssignorChanged;
23952403

23962404
if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
23972405
// The subscription metadata is updated in two cases:
@@ -3232,6 +3240,27 @@ private boolean hasMemberSubscriptionChanged(
32323240
return false;
32333241
}
32343242

3243+
/**
3244+
* Returns true if the effective preferred server assignor of the group changes as a
3245+
* result of updating the given member. The effective preferred assignor falls back to
3246+
* the default assignor when no member has an explicit preference.
3247+
*
3248+
* @param group The consumer group.
3249+
* @param member The old member.
3250+
* @param updatedMember The updated member.
3251+
* @return Whether the effective preferred server assignor has changed.
3252+
*/
3253+
private boolean hasPreferredServerAssignorChanged(
3254+
ConsumerGroup group,
3255+
ConsumerGroupMember member,
3256+
ConsumerGroupMember updatedMember
3257+
) {
3258+
String defaultAssignorName = defaultConsumerGroupAssignor.name();
3259+
// Effective assignor before the update: the group's current preference, or the default
// assignor name when the group reports no preference.
String currentPreferredAssignor = group.preferredServerAssignor().orElse(defaultAssignorName);
3260+
// Effective assignor after the update, as computed by the group for the old/updated member
// pair, with the same fallback to the default assignor name.
String newPreferredAssignor = group.computePreferredServerAssignor(member, updatedMember).orElse(defaultAssignorName);
3261+
// Both sides are assignor names after the fallback, so a plain string comparison decides
// whether the effective assignor changed.
return !currentPreferredAssignor.equals(newPreferredAssignor);
3262+
}
3263+
32353264
/**
* Returns true when the given value is neither null nor the empty string.
*
* @param value The string to check; may be null.
* @return Whether the value is non-null and has at least one character.
*/
private static boolean isNotEmpty(String value) {
32363265
return value != null && !value.isEmpty();
32373266
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2536,6 +2536,178 @@ member2RejoinId, new MemberAssignmentImpl(mkAssignment(
25362536
context.assertNoRebalanceTimeout(groupId, memberId2);
25372537
}
25382538

2539+
// Verifies that the group epoch is bumped, and a new target assignment is computed with the
// new assignor, once enough static members have rejoined with a different server assignor to
// change the group's preferred assignor. The group starts at epoch 10 with three static
// members on "uniform"; all three leave, then members 2 and 3 rejoin with "range".
@Test
2540+
public void testStaticMembersRejoinWithNewServerAssignor() {
2541+
String groupId = "fooup";
2542+
// Member ids are generated up front so that they can be referenced in the expected
// assignments and responses below. Members 2 and 3 get separate ids for their rejoin.
2543+
String memberId1 = Uuid.randomUuid().toString();
2544+
String memberId2 = Uuid.randomUuid().toString();
2545+
String memberId3 = Uuid.randomUuid().toString();
2546+
String member2RejoinId = Uuid.randomUuid().toString();
2547+
String member3RejoinId = Uuid.randomUuid().toString();
2548+
2549+
Uuid fooTopicId = Uuid.randomUuid();
2550+
String fooTopicName = "foo";
2551+
2552+
MockPartitionAssignor uniformAssignor = new MockPartitionAssignor("uniform");
2553+
MockPartitionAssignor rangeAssignor = new MockPartitionAssignor("range");
2554+
2555+
ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
2556+
.setState(MemberState.STABLE)
2557+
.setInstanceId("instance-id-1")
2558+
.setMemberEpoch(10)
2559+
.setPreviousMemberEpoch(9)
2560+
.setRebalanceTimeoutMs(5000)
2561+
.setClientId(DEFAULT_CLIENT_ID)
2562+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
2563+
.setSubscribedTopicNames(List.of("foo"))
2564+
.setServerAssignorName("uniform")
2565+
.setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
2566+
mkTopicAssignment(fooTopicId, 0, 1)), 10))
2567+
.build();
2568+
ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2)
2569+
.setState(MemberState.STABLE)
2570+
.setInstanceId("instance-id-2")
2571+
.setMemberEpoch(10)
2572+
.setPreviousMemberEpoch(9)
2573+
.setRebalanceTimeoutMs(5000)
2574+
.setClientId(DEFAULT_CLIENT_ID)
2575+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
2576+
.setSubscribedTopicNames(List.of("foo"))
2577+
.setServerAssignorName("uniform")
2578+
.setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
2579+
mkTopicAssignment(fooTopicId, 2, 3)), 10))
2580+
.build();
2581+
ConsumerGroupMember member3 = new ConsumerGroupMember.Builder(memberId3)
2582+
.setState(MemberState.STABLE)
2583+
.setInstanceId("instance-id-3")
2584+
.setMemberEpoch(10)
2585+
.setPreviousMemberEpoch(9)
2586+
.setRebalanceTimeoutMs(5000)
2587+
.setClientId(DEFAULT_CLIENT_ID)
2588+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
2589+
.setSubscribedTopicNames(List.of("foo"))
2590+
.setServerAssignorName("uniform")
2591+
.setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
2592+
mkTopicAssignment(fooTopicId, 4, 5)), 10))
2593+
.build();
2594+
2595+
// Topic "foo" has 6 partitions, two per member in the initial assignment.
MetadataImage metadataImage = new MetadataImageBuilder()
2596+
.addTopic(fooTopicId, fooTopicName, 6)
2597+
.addRacks()
2598+
.build();
2599+
2600+
// Consumer group with three static members using the uniform assignor.
2601+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
2602+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(uniformAssignor, rangeAssignor))
2603+
.withMetadataImage(new KRaftCoordinatorMetadataImage(metadataImage))
2604+
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
2605+
.withMember(member1)
2606+
.withMember(member2)
2607+
.withMember(member3)
2608+
.withAssignment(memberId1, mkAssignment(
2609+
mkTopicAssignment(fooTopicId, 0, 1)))
2610+
.withAssignment(memberId2, mkAssignment(
2611+
mkTopicAssignment(fooTopicId, 2, 3)))
2612+
.withAssignment(memberId3, mkAssignment(
2613+
mkTopicAssignment(fooTopicId, 4, 5)))
2614+
.withAssignmentEpoch(10)
2615+
.withMetadataHash(computeGroupHash(Map.of(
2616+
fooTopicName, computeTopicHash(fooTopicName, new KRaftCoordinatorMetadataImage(metadataImage))
2617+
))))
2618+
.build();
2619+
2620+
// All three members leave the consumer group.
2621+
// Each leave heartbeat carries member epoch -2 and the response is expected to echo it.
for (var entry : List.of(
2622+
Map.entry(memberId1, "instance-id-1"),
2623+
Map.entry(memberId2, "instance-id-2"),
2624+
Map.entry(memberId3, "instance-id-3")
2625+
)) {
2626+
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
2627+
new ConsumerGroupHeartbeatRequestData()
2628+
.setGroupId(groupId)
2629+
.setMemberId(entry.getKey())
2630+
.setInstanceId(entry.getValue())
2631+
.setMemberEpoch(-2));
2632+
2633+
assertResponseEquals(
2634+
new ConsumerGroupHeartbeatResponseData()
2635+
.setMemberId(entry.getKey())
2636+
.setMemberEpoch(-2),
2637+
result.response()
2638+
);
2639+
}
2640+
2641+
// Member 2 rejoins with "range" assignor. The preferred assignor is still "uniform"
2642+
// (counts: uniform=2, range=1) so the group epoch is not bumped. Member 2 gets back
2643+
// its existing assignment at epoch 10.
2644+
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> rejoinResult2 = context.consumerGroupHeartbeat(
2645+
new ConsumerGroupHeartbeatRequestData()
2646+
.setMemberId(member2RejoinId)
2647+
.setGroupId(groupId)
2648+
.setInstanceId("instance-id-2")
2649+
.setMemberEpoch(0)
2650+
.setRebalanceTimeoutMs(5000)
2651+
.setServerAssignor("range")
2652+
.setSubscribedTopicNames(List.of("foo"))
2653+
.setTopicPartitions(List.of()));
2654+
2655+
assertResponseEquals(
2656+
new ConsumerGroupHeartbeatResponseData()
2657+
.setMemberId(member2RejoinId)
2658+
.setMemberEpoch(10)
2659+
.setHeartbeatIntervalMs(5000)
2660+
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
2661+
.setTopicPartitions(List.of(
2662+
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
2663+
.setTopicId(fooTopicId)
2664+
.setPartitions(List.of(2, 3))
2665+
))),
2666+
rejoinResult2.response()
2667+
);
2668+
2669+
// Member 3 rejoins with "range" assignor. The preferred assignor shifts to "range"
2670+
// (counts: uniform=1, range=2) so the group epoch is bumped to 11 and a new target
2671+
// assignment is computed using the range assignor.
2672+
// The mock range assignor is primed with the assignment it should return. Member 1 has
// not rejoined at this point, so it is still keyed by its original member id, consistent
// with the counts above which still include it.
rangeAssignor.prepareGroupAssignment(new GroupAssignment(Map.of(
2673+
memberId1, new MemberAssignmentImpl(mkAssignment(
2674+
mkTopicAssignment(fooTopicId, 0, 1)
2675+
)),
2676+
member2RejoinId, new MemberAssignmentImpl(mkAssignment(
2677+
mkTopicAssignment(fooTopicId, 2, 3)
2678+
)),
2679+
member3RejoinId, new MemberAssignmentImpl(mkAssignment(
2680+
mkTopicAssignment(fooTopicId, 4, 5)
2681+
))
2682+
)));
2683+
2684+
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> rejoinResult3 = context.consumerGroupHeartbeat(
2685+
new ConsumerGroupHeartbeatRequestData()
2686+
.setMemberId(member3RejoinId)
2687+
.setGroupId(groupId)
2688+
.setInstanceId("instance-id-3")
2689+
.setMemberEpoch(0)
2690+
.setRebalanceTimeoutMs(5000)
2691+
.setServerAssignor("range")
2692+
.setSubscribedTopicNames(List.of("foo"))
2693+
.setTopicPartitions(List.of()));
2694+
2695+
// Verify that the group epoch was bumped to 11 and the member got the new assignment.
2696+
assertResponseEquals(
2697+
new ConsumerGroupHeartbeatResponseData()
2698+
.setMemberId(member3RejoinId)
2699+
.setMemberEpoch(11)
2700+
.setHeartbeatIntervalMs(5000)
2701+
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
2702+
.setTopicPartitions(List.of(
2703+
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
2704+
.setTopicId(fooTopicId)
2705+
.setPartitions(List.of(4, 5))
2706+
))),
2707+
rejoinResult3.response()
2708+
);
2709+
}
2710+
25392711
@Test
25402712
public void testNoGroupEpochBumpWhenStaticMemberTemporarilyLeaves() {
25412713
String groupId = "fooup";

0 commit comments

Comments
 (0)