Skip to content

Commit 90a4c93

Browse files
MINOR: Fix testRackAwareAssignment flake (#22154)
The last part of testRackAwareAssignment was found to be flaky. This part moves all topic partitions to different racks and waits for consumer assignments to settle. Each of the three consumers is expected to revoke all its partitions and be assigned partitions previously held by another within a 15 second timeout. This timeout is not always sufficient. The consumer heartbeat interval is left at the default of 5,000 ms and each consumer polls every 3,000 ms. In the worst case, it takes a consumer around 7,000 ms to reconcile an assignment change. An additional 3,000 ms round of polling may be required when a consumer needs to auto-commit offsets. Two rounds of reconciliation must happen within 15,000 ms. The timeline of an example failing run looks like: -02.956 Group coordinator computes target assignment at epoch=6 consumer0=[0] consumer1=[1, 2] consumer2=[3, 4, 5] +00.000 15 second timeout starts +03.179 consumer0 heartbeats This is the first heartbeat since the rack reassignments. +03.179 Group coordinator computes target assignment at epoch=7 consumer0=[5] consumer1=[3, 4] consumer2=[0, 1, 2] +03.186 consumer0 heartbeat receives assignment [] +04.151 consumer1 starts poll() +04.877 consumer1 heartbeats +04.878 consumer1 heartbeat receives assignment [] +05.155 consumer1 ends poll() +07.259 consumer1 starts poll() +07.259 consumer1 sends auto-commit with offsets for [1, 2] +07.288 consumer1 receives auto-commit response +08.263 consumer1 ends poll() +10.379 consumer1 starts poll() +10.379 consumer1 calls onPartitionsRevoked with [1, 2] +10.379 consumer1 calls onPartitionsAssigned with [] +10.382 consumer1 heartbeats with owned partitions [] +10.387 consumer1 heartbeat receives assignment [3, 4] +10.483 consumer1 calls onPartitionsAssigned [3, 4] +10.483 consumer1 heartbeats with owned partitions [3, 4] +11.384 consumer1 ends poll() +15.000 15 second timeout elapses and the test fails +15.300 consumer2 heartbeat receives assignment [0, 1, 2] To fix the test we: * Make config changes to reduce the reconciliation time. This also reduces the test duration from 60 seconds to 20 seconds. * Disable auto-commit, since the consumers do not consume any records. * Reduce the heartbeat interval to 1,000 ms. * Reduce the poll timeouts to 100 ms, so that polls happen every 300 ms. * Raise the final timeout to 30 seconds, since under heavy CI load, the reduced intervals above aren't effective. Reviewers: Lianet Magrans <lmagrans@confluent.io>, David Jacot <djacot@confluent.io>
1 parent 44bafc6 commit 90a4c93

1 file changed

Lines changed: 20 additions & 13 deletions

File tree

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,8 @@ public void testLeaderEpoch(ClusterInstance clusterInstance) throws Exception {
240240
@ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"),
241241
@ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack1"),
242242
@ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack2"),
243+
@ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = "1000"),
244+
@ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "1000"),
243245
@ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value = "org.apache.kafka.clients.consumer.RackAwareAssignor"),
244246
@ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
245247
}
@@ -251,6 +253,8 @@ public void testLeaderEpoch(ClusterInstance clusterInstance) throws Exception {
251253
@ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"),
252254
@ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack1"),
253255
@ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack2"),
256+
@ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = "1000"),
257+
@ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "1000"),
254258
@ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value = "org.apache.kafka.clients.consumer.RackAwareAssignor"),
255259
@ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
256260
}
@@ -263,16 +267,19 @@ public void testRackAwareAssignment(ClusterInstance clusterInstance) throws Exec
263267
Consumer<byte[], byte[]> consumer0 = clusterInstance.consumer(Map.of(
264268
ConsumerConfig.GROUP_ID_CONFIG, "group0",
265269
ConsumerConfig.CLIENT_RACK_CONFIG, "rack0",
270+
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
266271
ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()
267272
));
268273
Consumer<byte[], byte[]> consumer1 = clusterInstance.consumer(Map.of(
269274
ConsumerConfig.GROUP_ID_CONFIG, "group0",
270275
ConsumerConfig.CLIENT_RACK_CONFIG, "rack1",
276+
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
271277
ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()
272278
));
273279
Consumer<byte[], byte[]> consumer2 = clusterInstance.consumer(Map.of(
274280
ConsumerConfig.GROUP_ID_CONFIG, "group0",
275281
ConsumerConfig.CLIENT_RACK_CONFIG, "rack2",
282+
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
276283
ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()
277284
))
278285
) {
@@ -288,9 +295,9 @@ public void testRackAwareAssignment(ClusterInstance clusterInstance) throws Exec
288295
consumer2.subscribe(List.of(topic));
289296

290297
TestUtils.waitForCondition(() -> {
291-
consumer0.poll(Duration.ofMillis(1000));
292-
consumer1.poll(Duration.ofMillis(1000));
293-
consumer2.poll(Duration.ofMillis(1000));
298+
consumer0.poll(Duration.ofMillis(100));
299+
consumer1.poll(Duration.ofMillis(100));
300+
consumer2.poll(Duration.ofMillis(100));
294301
return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
295302
consumer1.assignment().isEmpty() &&
296303
consumer2.assignment().isEmpty();
@@ -305,9 +312,9 @@ public void testRackAwareAssignment(ClusterInstance clusterInstance) throws Exec
305312
);
306313
clusterInstance.waitTopicCreation(topic, 3);
307314
TestUtils.waitForCondition(() -> {
308-
consumer0.poll(Duration.ofMillis(1000));
309-
consumer1.poll(Duration.ofMillis(1000));
310-
consumer2.poll(Duration.ofMillis(1000));
315+
consumer0.poll(Duration.ofMillis(100));
316+
consumer1.poll(Duration.ofMillis(100));
317+
consumer2.poll(Duration.ofMillis(100));
311318
return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
312319
consumer1.assignment().equals(Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
313320
consumer2.assignment().isEmpty();
@@ -322,9 +329,9 @@ public void testRackAwareAssignment(ClusterInstance clusterInstance) throws Exec
322329
);
323330
clusterInstance.waitTopicCreation(topic, 6);
324331
TestUtils.waitForCondition(() -> {
325-
consumer0.poll(Duration.ofMillis(1000));
326-
consumer1.poll(Duration.ofMillis(1000));
327-
consumer2.poll(Duration.ofMillis(1000));
332+
consumer0.poll(Duration.ofMillis(100));
333+
consumer1.poll(Duration.ofMillis(100));
334+
consumer2.poll(Duration.ofMillis(100));
328335
return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
329336
consumer1.assignment().equals(Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
330337
consumer2.assignment().equals(Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4), new TopicPartition(topic, 5)));
@@ -346,13 +353,13 @@ public void testRackAwareAssignment(ClusterInstance clusterInstance) throws Exec
346353
new TopicPartition(topic, 5), Optional.of(new NewPartitionReassignment(List.of(0)))
347354
)).all().get();
348355
TestUtils.waitForCondition(() -> {
349-
consumer0.poll(Duration.ofMillis(1000));
350-
consumer1.poll(Duration.ofMillis(1000));
351-
consumer2.poll(Duration.ofMillis(1000));
356+
consumer0.poll(Duration.ofMillis(100));
357+
consumer1.poll(Duration.ofMillis(100));
358+
consumer2.poll(Duration.ofMillis(100));
352359
return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 5))) &&
353360
consumer1.assignment().equals(Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4))) &&
354361
consumer2.assignment().equals(Set.of(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2)));
355-
}, "Consumer with topic partition mapping should be 0 -> 5 | 1 -> 3, 4 | 2 -> 0, 1, 2");
362+
}, 30000, "Consumer with topic partition mapping should be 0 -> 5 | 1 -> 3, 4 | 2 -> 0, 1, 2");
356363
}
357364
}
358365

0 commit comments

Comments
 (0)