Skip to content

Commit c75e10d

Browse files
authored
KAFKA-20424 : clients: Update KafkaConsumerTest comments,tests with relevant protocol (#22144)
Ref : https://issues.apache.org/jira/browse/KAFKA-20424 - 18 tests with updated comments (classic only) - 1 test (testAutoCommitSentBeforePositionUpdate) now runs for both protocols. Reviewers: Andrew Schofield <aschofield@confluent.io>
1 parent 90a4c93 commit c75e10d

1 file changed

Lines changed: 20 additions & 39 deletions

File tree

clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

Lines changed: 20 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -485,8 +485,7 @@ public void testExplicitlyOnlyEnableClientTelemetryReporter(GroupProtocol groupP
485485
assertInstanceOf(ClientTelemetryReporter.class, consumer.metricsRegistry().reporters().get(0));
486486
}
487487

488-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
489-
// Once it is implemented, this should use both group protocols.
488+
// NOTE: the assertion path is specific to the CLASSIC consumer.
490489
@ParameterizedTest
491490
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
492491
@SuppressWarnings("unchecked")
@@ -502,8 +501,7 @@ public void testPollReturnsRecords(GroupProtocol groupProtocol) {
502501
assertEquals(new OffsetAndMetadata(5), records.nextOffsets().get(tp0));
503502
}
504503

505-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
506-
// Once it is implemented, this should use both group protocols.
504+
// NOTE: the assertion path is specific to the CLASSIC consumer.
507505
@ParameterizedTest
508506
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
509507
@SuppressWarnings("unchecked")
@@ -790,6 +788,7 @@ public void testSubscriptionOnEmptyPattern(GroupProtocol groupProtocol) {
790788
() -> consumer.subscribe(Pattern.compile("")));
791789
}
792790

791+
// NOTE: this test configures partition.assignment.strategy, which only applies to the CLASSIC group protocol.
793792
@ParameterizedTest
794793
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
795794
public void testSubscriptionWithEmptyPartitionAssignment(GroupProtocol groupProtocol) {
@@ -968,8 +967,7 @@ private <K, V> KafkaConsumer<K, V> newConsumer(Properties props,
968967
return newConsumer(propsToMap(props), keyDeserializer, valueDeserializer);
969968
}
970969

971-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
972-
// Once it is implemented, this should use both group protocols.
970+
// NOTE: this test exercises the Heartbeat RPC, which does not exist in the CONSUMER group protocol.
973971
@ParameterizedTest
974972
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
975973
public void verifyHeartbeatSent(GroupProtocol groupProtocol) throws Exception {
@@ -1001,8 +999,7 @@ public void verifyHeartbeatSent(GroupProtocol groupProtocol) throws Exception {
1001999
assertTrue(heartbeatReceived.get());
10021000
}
10031001

1004-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
1005-
// Once it is implemented, this should use both group protocols.
1002+
// NOTE: this test exercises the Heartbeat RPC, which does not exist in the CONSUMER group protocol.
10061003
@ParameterizedTest
10071004
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
10081005
public void verifyHeartbeatSentWhenFetchedDataReady(GroupProtocol groupProtocol) throws Exception {
@@ -1410,10 +1407,8 @@ public void testNoCommittedOffsets(GroupProtocol groupProtocol) {
14101407
assertNull(committed.get(tp1));
14111408
}
14121409

1413-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
1414-
// Once it is implemented, this should use both group protocols.
14151410
@ParameterizedTest
1416-
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
1411+
@EnumSource(GroupProtocol.class)
14171412
public void testAutoCommitSentBeforePositionUpdate(GroupProtocol groupProtocol) {
14181413
ConsumerMetadata metadata = createMetadata(subscription);
14191414
MockClient client = new MockClient(time, metadata);
@@ -1444,8 +1439,7 @@ public void testAutoCommitSentBeforePositionUpdate(GroupProtocol groupProtocol)
14441439
assertTrue(commitReceived.get());
14451440
}
14461441

1447-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
1448-
// Once it is implemented, this should use both group protocols.
1442+
// NOTE: the assertion path is specific to the CLASSIC consumer.
14491443
@ParameterizedTest
14501444
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
14511445
public void testRegexSubscription(GroupProtocol groupProtocol) {
@@ -1473,8 +1467,7 @@ public void testRegexSubscription(GroupProtocol groupProtocol) {
14731467
assertEquals(Set.of(tp0), consumer.assignment());
14741468
}
14751469

1476-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
1477-
// Once it is implemented, this should use both group protocols.
1470+
// NOTE: the assertion path is specific to the CLASSIC consumer.
14781471
@ParameterizedTest
14791472
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
14801473
public void testChangingRegexSubscription(GroupProtocol groupProtocol) {
@@ -1510,8 +1503,7 @@ public void testChangingRegexSubscription(GroupProtocol groupProtocol) {
15101503
assertEquals(Set.of(otherTopic), consumer.subscription());
15111504
}
15121505

1513-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
1514-
// Once it is implemented, this should use both group protocols.
1506+
// NOTE: the assertion path is specific to the CLASSIC consumer.
15151507
@ParameterizedTest
15161508
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
15171509
public void testWakeupWithFetchDataAvailable(GroupProtocol groupProtocol) throws Exception {
@@ -1614,8 +1606,7 @@ public void fetchResponseWithUnexpectedPartitionIsIgnored(GroupProtocol groupPro
16141606
* Upon unsubscribing from subscribed topics the consumer subscription and assignment
16151607
* are both updated right away but its consumed offsets are not auto committed.
16161608
*/
1617-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
1618-
// Once it is implemented, this should use both group protocols.
1609+
// NOTE: the assertion path is specific to the CLASSIC consumer.
16191610
@ParameterizedTest
16201611
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
16211612
@SuppressWarnings("unchecked")
@@ -1733,8 +1724,7 @@ public void testSubscriptionChangesWithAutoCommitEnabled(GroupProtocol groupProt
17331724
* Upon unsubscribing from subscribed topics, the assigned partitions immediately
17341725
* change but if auto-commit is disabled the consumer offsets are not committed.
17351726
*/
1736-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
1737-
// Once it is implemented, this should use both group protocols.
1727+
// NOTE: the assertion path is specific to the CLASSIC consumer.
17381728
@ParameterizedTest
17391729
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
17401730
public void testSubscriptionChangesWithAutoCommitDisabled(GroupProtocol groupProtocol) {
@@ -1852,8 +1842,7 @@ public void testUnsubscribeDoesNotCommitOffsetsEvenWithAutoCommitEnabled(GroupPr
18521842
client.requests().clear();
18531843
}
18541844

1855-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
1856-
// Once it is implemented, this should use both group protocols.
1845+
// NOTE: the assertion path is specific to the CLASSIC consumer.
18571846
@ParameterizedTest
18581847
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
18591848
public void testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration(GroupProtocol groupProtocol) {
@@ -1878,8 +1867,7 @@ public void testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration(Gro
18781867
assertEquals(partitionRevoked + singleTopicPartition, unsubscribeException.getCause().getMessage());
18791868
}
18801869

1881-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
1882-
// Once it is implemented, this should use both group protocols.
1870+
// NOTE: the assertion path is specific to the CLASSIC consumer.
18831871
@ParameterizedTest
18841872
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
18851873
public void testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration(GroupProtocol groupProtocol) throws Exception {
@@ -2138,16 +2126,14 @@ public void testCloseTimeoutDueToNoResponseForCloseFetchRequest(GroupProtocol gr
21382126
consumerCloseTest(groupProtocol, closeTimeoutMs, serverResponsesWithoutCloseResponse, waitForCloseCompletionMs, false);
21392127
}
21402128

2141-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
2142-
// Once it is implemented, this should use both group protocols.
2129+
// NOTE: this test drives consumerCloseTest, whose close/rebalance mock setup is Classic-specific.
21432130
@ParameterizedTest
21442131
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
21452132
public void testCloseTimeout(GroupProtocol groupProtocol) throws Exception {
21462133
consumerCloseTest(groupProtocol, 5000, Collections.emptyList(), 5000, false);
21472134
}
21482135

2149-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
2150-
// Once it is implemented, this should use both group protocols.
2136+
// NOTE: this test drives consumerCloseTest with an OffsetCommit + LeaveGroup flow that is Classic-specific.
21512137
@ParameterizedTest
21522138
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
21532139
public void testLeaveGroupTimeout(GroupProtocol groupProtocol) throws Exception {
@@ -2157,16 +2143,14 @@ public void testLeaveGroupTimeout(GroupProtocol groupProtocol) throws Exception
21572143
consumerCloseTest(groupProtocol, 5000, List.of(commitResponse), 5000, false);
21582144
}
21592145

2160-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
2161-
// Once it is implemented, this should use both group protocols.
2146+
// NOTE: this test drives consumerCloseTest, whose close/rebalance mock setup is Classic-specific.
21622147
@ParameterizedTest
21632148
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
21642149
public void testCloseNoWait(GroupProtocol groupProtocol) throws Exception {
21652150
consumerCloseTest(groupProtocol, 0, Collections.emptyList(), 0, false);
21662151
}
21672152

2168-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
2169-
// Once it is implemented, this should use both group protocols.
2153+
// NOTE: this test drives consumerCloseTest, whose close/rebalance mock setup is Classic-specific.
21702154
@ParameterizedTest
21712155
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
21722156
public void testCloseInterrupt(GroupProtocol groupProtocol) throws Exception {
@@ -2600,8 +2584,7 @@ public void testMeasureCommittedDuration(GroupProtocol groupProtocol) {
26002584
assertTrue((Double) metric.metricValue() >= Duration.ofMillis(999).toNanos());
26012585
}
26022586

2603-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
2604-
// Once it is implemented, this should use both group protocols.
2587+
// NOTE: the assertion path is specific to the CLASSIC consumer.
26052588
@ParameterizedTest
26062589
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
26072590
public void testRebalanceException(GroupProtocol groupProtocol) {
@@ -2637,8 +2620,7 @@ public void testRebalanceException(GroupProtocol groupProtocol) {
26372620
assertTrue(subscription.assignedPartitions().isEmpty());
26382621
}
26392622

2640-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
2641-
// Once it is implemented, this should use both group protocols.
2623+
// NOTE: the assertion path is specific to the CLASSIC consumer.
26422624
@ParameterizedTest
26432625
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
26442626
public void testReturnRecordsDuringRebalance(GroupProtocol groupProtocol) throws InterruptedException {
@@ -2779,8 +2761,7 @@ public void testReturnRecordsDuringRebalance(GroupProtocol groupProtocol) throws
27792761
consumer.close(CloseOptions.timeout(Duration.ZERO));
27802762
}
27812763

2782-
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
2783-
// Once it is implemented, this should use both group protocols.
2764+
// NOTE: the assertion path is specific to the CLASSIC consumer.
27842765
@ParameterizedTest
27852766
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
27862767
public void testGetGroupMetadata(GroupProtocol groupProtocol) {

0 commit comments

Comments
 (0)