1717package org .apache .kafka .clients ;
1818
1919import org .apache .kafka .clients .admin .NewTopic ;
20+ import org .apache .kafka .clients .consumer .ConsumerConfig ;
21+ import org .apache .kafka .clients .consumer .GroupProtocol ;
22+ import org .apache .kafka .clients .producer .ProducerConfig ;
2023import org .apache .kafka .clients .producer .ProducerRecord ;
24+ import org .apache .kafka .common .TopicPartition ;
2125import org .apache .kafka .common .config .TopicConfig ;
2226import org .apache .kafka .common .test .ClusterInstance ;
27+ import org .apache .kafka .common .test .TestUtils ;
2328import org .apache .kafka .common .test .api .ClusterConfigProperty ;
2429import org .apache .kafka .common .test .api .ClusterTest ;
2530import org .apache .kafka .common .test .api .Type ;
3843
3944public class ClientRebootstrapTest {
4045 private static final String TOPIC = "topic" ;
46+ private static final int PARTITIONS = 1 ;
4147 private static final int REPLICAS = 2 ;
4248
4349 @ ClusterTest (
@@ -55,7 +61,7 @@ public void testAdminRebootstrap(ClusterInstance clusterInstance) {
5561 clusterInstance .shutdownBroker (broker0 );
5662
5763 try (var admin = clusterInstance .admin ()) {
58- admin .createTopics (List .of (new NewTopic (TOPIC , 1 , (short ) REPLICAS )));
64+ admin .createTopics (List .of (new NewTopic (TOPIC , PARTITIONS , (short ) REPLICAS )));
5965
6066 // Only the broker 1 is available for the admin client during the bootstrap.
6167 assertDoesNotThrow (() -> admin .listTopics ().names ().get (timeout , TimeUnit .SECONDS ).contains (TOPIC ));
@@ -84,7 +90,7 @@ public void testAdminRebootstrapDisabled(ClusterInstance clusterInstance) {
8490 clusterInstance .shutdownBroker (broker0 );
8591
8692 var admin = clusterInstance .admin (Map .of (CommonClientConfigs .METADATA_RECOVERY_STRATEGY_CONFIG , "none" ));
87- admin .createTopics (List .of (new NewTopic (TOPIC , 1 , (short ) REPLICAS )));
93+ admin .createTopics (List .of (new NewTopic (TOPIC , PARTITIONS , (short ) REPLICAS )));
8894
8995 // Only the broker 1 is available for the admin client during the bootstrap.
9096 assertDoesNotThrow (() -> admin .listTopics ().names ().get (60 , TimeUnit .SECONDS ).contains (TOPIC ));
@@ -109,7 +115,7 @@ public void testAdminRebootstrapDisabled(ClusterInstance clusterInstance) {
109115 )
110116 public void testProducerRebootstrap (ClusterInstance clusterInstance ) throws ExecutionException , InterruptedException {
111117 try (var admin = clusterInstance .admin ()) {
112- admin .createTopics (List .of (new NewTopic (TOPIC , 1 , (short ) REPLICAS )));
118+ admin .createTopics (List .of (new NewTopic (TOPIC , PARTITIONS , (short ) REPLICAS )));
113119 }
114120
115121 var broker0 = 0 ;
@@ -144,7 +150,7 @@ public void testProducerRebootstrap(ClusterInstance clusterInstance) throws Exec
144150 )
145151 public void testProducerRebootstrapDisabled (ClusterInstance clusterInstance ) throws ExecutionException , InterruptedException {
146152 try (var admin = clusterInstance .admin ()) {
147- admin .createTopics (List .of (new NewTopic (TOPIC , 1 , (short ) REPLICAS )));
153+ admin .createTopics (List .of (new NewTopic (TOPIC , PARTITIONS , (short ) REPLICAS )));
148154 }
149155
150156 var broker0 = 0 ;
@@ -168,4 +174,123 @@ public void testProducerRebootstrapDisabled(ClusterInstance clusterInstance) thr
168174 // Since the brokers cached during the bootstrap are offline, the producer needs to wait the default timeout for other threads.
169175 producer .close (Duration .ZERO );
170176 }
177+
178+ public void consumerRebootstrap (ClusterInstance clusterInstance , GroupProtocol groupProtocol ) throws InterruptedException , ExecutionException {
179+ clusterInstance .createTopic (TOPIC , PARTITIONS , (short ) REPLICAS );
180+
181+ var broker0 = 0 ;
182+ var broker1 = 1 ;
183+ var partitions = List .of (new TopicPartition (TOPIC , 0 ));
184+
185+ try (var producer = clusterInstance .producer (Map .of (ProducerConfig .ACKS_CONFIG , "-1" ))) {
186+ var recordMetadata = producer .send (new ProducerRecord <>(TOPIC , "value 0" .getBytes ())).get ();
187+ assertEquals (0 , recordMetadata .offset ());
188+ }
189+
190+ clusterInstance .shutdownBroker (broker0 );
191+
192+ try (var consumer = clusterInstance .consumer (Map .of (ConsumerConfig .GROUP_PROTOCOL_CONFIG , groupProtocol .name ))) {
193+ // Only the server 1 is available for the consumer during the bootstrap.
194+ consumer .assign (partitions );
195+ consumer .seekToBeginning (partitions );
196+ TestUtils .waitForCondition (() -> consumer .poll (Duration .ofMillis (100 )).count () == 1 , 10 * 1000 , "Failed to poll data." );
197+
198+ // Bring back the server 0 and shut down 1.
199+ clusterInstance .shutdownBroker (broker1 );
200+ clusterInstance .startBroker (broker0 );
201+
202+ try (var producer = clusterInstance .producer (Map .of (ProducerConfig .ACKS_CONFIG , "-1" ))) {
203+ var recordMetadata = producer .send (new ProducerRecord <>(TOPIC , "value 1" .getBytes ())).get ();
204+ assertEquals (1 , recordMetadata .offset ());
205+ }
206+
207+ // The server 1 originally cached during the bootstrap, is offline.
208+ // However, the server 0 from the bootstrap list is online.
209+ TestUtils .waitForCondition (() -> consumer .poll (Duration .ofMillis (100 )).count () == 1 , 10 * 1000 , "Failed to poll data." );
210+ }
211+ }
212+
213+ @ ClusterTest (
214+ brokers = REPLICAS ,
215+ types = {Type .KRAFT },
216+ serverProperties = {
217+ @ ClusterConfigProperty (key = TopicConfig .UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG , value = "true" ),
218+ @ ClusterConfigProperty (key = GroupCoordinatorConfig .OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG , value = "2" ),
219+ })
220+ public void testClassicConsumerRebootstrap (ClusterInstance clusterInstance ) throws InterruptedException , ExecutionException {
221+ consumerRebootstrap (clusterInstance , GroupProtocol .CLASSIC );
222+ }
223+
224+ @ ClusterTest (
225+ brokers = REPLICAS ,
226+ types = {Type .KRAFT },
227+ serverProperties = {
228+ @ ClusterConfigProperty (key = TopicConfig .UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG , value = "true" ),
229+ @ ClusterConfigProperty (key = GroupCoordinatorConfig .OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG , value = "2" ),
230+ })
231+ public void testConsumerRebootstrap (ClusterInstance clusterInstance ) throws InterruptedException , ExecutionException {
232+ consumerRebootstrap (clusterInstance , GroupProtocol .CONSUMER );
233+ }
234+
235+ public void consumerRebootstrapDisabled (ClusterInstance clusterInstance , GroupProtocol groupProtocol ) throws InterruptedException , ExecutionException {
236+ clusterInstance .createTopic (TOPIC , PARTITIONS , (short ) REPLICAS );
237+
238+ var broker0 = 0 ;
239+ var broker1 = 1 ;
240+ var tp = new TopicPartition (TOPIC , 0 );
241+
242+ try (var producer = clusterInstance .producer (Map .of (ProducerConfig .ACKS_CONFIG , "-1" ))) {
243+ var recordMetadata = producer .send (new ProducerRecord <>(TOPIC , "value 0" .getBytes ())).get ();
244+ assertEquals (0 , recordMetadata .offset ());
245+ }
246+
247+ clusterInstance .shutdownBroker (broker0 );
248+
249+ try (var consumer = clusterInstance .consumer (Map .of (
250+ CommonClientConfigs .METADATA_RECOVERY_STRATEGY_CONFIG , "none" ,
251+ ConsumerConfig .GROUP_PROTOCOL_CONFIG , groupProtocol .name )
252+ )) {
253+ // Only the server 1 is available for the consumer during the bootstrap.
254+ consumer .assign (List .of (tp ));
255+ consumer .seekToBeginning (List .of (tp ));
256+ TestUtils .waitForCondition (() -> consumer .poll (Duration .ofMillis (100 )).count () == 1 , 10 * 1000 , "Failed to poll data." );
257+
258+ // Bring back the server 0 and shut down 1.
259+ clusterInstance .shutdownBroker (broker1 );
260+ clusterInstance .startBroker (broker0 );
261+
262+ try (var producer = clusterInstance .producer (Map .of (ProducerConfig .ACKS_CONFIG , "-1" ))) {
263+ var recordMetadata = producer .send (new ProducerRecord <>(TOPIC , "value 1" .getBytes ())).get ();
264+ assertEquals (1 , recordMetadata .offset ());
265+ }
266+
267+ // The server 1 originally cached during the bootstrap, is offline.
268+ // However, the server 0 from the bootstrap list is online.
269+ assertEquals (0 , consumer .poll (Duration .ofMillis (100 )).count ());
270+ }
271+ }
272+
273+ @ ClusterTest (
274+ brokers = REPLICAS ,
275+ types = {Type .KRAFT },
276+ serverProperties = {
277+ @ ClusterConfigProperty (key = TopicConfig .UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG , value = "true" ),
278+ @ ClusterConfigProperty (key = GroupCoordinatorConfig .OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG , value = "2" )
279+ }
280+ )
281+ public void testClassicConsumerRebootstrapDisabled (ClusterInstance clusterInstance ) throws InterruptedException , ExecutionException {
282+ consumerRebootstrapDisabled (clusterInstance , GroupProtocol .CLASSIC );
283+ }
284+
285+ @ ClusterTest (
286+ brokers = REPLICAS ,
287+ types = {Type .KRAFT },
288+ serverProperties = {
289+ @ ClusterConfigProperty (key = TopicConfig .UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG , value = "true" ),
290+ @ ClusterConfigProperty (key = GroupCoordinatorConfig .OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG , value = "2" )
291+ }
292+ )
293+ public void testConsumerRebootstrapDisabled (ClusterInstance clusterInstance ) throws InterruptedException , ExecutionException {
294+ consumerRebootstrapDisabled (clusterInstance , GroupProtocol .CONSUMER );
295+ }
171296}
0 commit comments