1818
1919import org .apache .kafka .clients .admin .AdminClientConfig ;
2020import org .apache .kafka .clients .consumer .ConsumerConfig ;
21- import org .apache .kafka .common .errors .TimeoutException ;
2221import org .apache .kafka .common .serialization .Serdes ;
2322import org .apache .kafka .common .utils .internals .Exit ;
2423import org .apache .kafka .streams .GroupProtocol ;
4645import java .util .Map ;
4746import java .util .Properties ;
4847import java .util .Set ;
49- import java .util .concurrent .ExecutionException ;
5048import java .util .concurrent .atomic .AtomicBoolean ;
5149import java .util .concurrent .atomic .AtomicReference ;
5250import java .util .stream .Collectors ;
@@ -92,6 +90,7 @@ public static void setup() throws Exception {
9290 @ AfterAll
9391 public static void closeCluster () {
9492 streams .close ();
93+ cluster .deleteTopics (INPUT_TOPIC , OUTPUT_TOPIC , INPUT_TOPIC_2 , OUTPUT_TOPIC_2 );
9594 cluster .stop ();
9695 cluster = null ;
9796 }
@@ -215,35 +214,37 @@ public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions() throw
215214 "Topic " + INPUT_TOPIC_2 + " not created"
216215 );
217216 KafkaStreams streams2 = new KafkaStreams (topology (INPUT_TOPIC_2 , OUTPUT_TOPIC_2 ), streamsProp (APP_ID_2 ));
218- startApplicationAndWaitUntilRunning (streams2 );
219-
220- final List <String > expectedHeader = List .of ("GROUP" , "TARGET-ASSIGNMENT-EPOCH" , "TOPOLOGY-EPOCH" , "MEMBER" , "MEMBER-PROTOCOL" , "MEMBER-EPOCH" , "PROCESS" , "CLIENT-ID" , "ASSIGNMENTS" );
221- final Set <List <String >> expectedRows1 = Set .of (
222- List .of (APP_ID , "" , "0" , "" , "streams" , "" , "" , "" , "ACTIVE:" , "0:[1];" , "1:[1];" , "TARGET-ACTIVE:" , "0:[1];" , "1:[1];" ),
223- List .of (APP_ID , "" , "0" , "" , "streams" , "" , "" , "" , "ACTIVE:" , "0:[0];" , "1:[0];" , "TARGET-ACTIVE:" , "0:[0];" , "1:[0];" ));
224- final Set <List <String >> expectedRows2 = Set .of (
225- List .of (APP_ID_2 , "" , "0" , "" , "streams" , "" , "" , "" , "ACTIVE:" , "1:[0];" , "TARGET-ACTIVE:" , "1:[0];" ),
226- List .of (APP_ID_2 , "" , "0" , "" , "streams" , "" , "" , "" , "ACTIVE:" , "0:[0];" , "TARGET-ACTIVE:" , "0:[0];" ));
227- final Map <String , Set <List <String >>> expectedRowsMap = new HashMap <>();
228- expectedRowsMap .put (APP_ID , expectedRows1 );
229- expectedRowsMap .put (APP_ID_2 , expectedRows2 );
230-
231- // The member and process names as well as client-id are not deterministic, so we don't care about them.
232- // The TARGET-ASSIGNMENT-EPOCH and MEMBER-EPOCH can vary due to rebalance timing, so we don't care about them either.
233- final List <Integer > dontCares = List .of (1 , 3 , 5 , 6 , 7 );
234-
235- validateDescribeOutput (
236- List .of ("--bootstrap-server" , bootstrapServers , "--describe" , "--members" , "--verbose" , "--group" , APP_ID , "--group" , APP_ID_2 ),
237- expectedHeader , expectedRowsMap , dontCares );
238- validateDescribeOutput (
239- List .of ("--bootstrap-server" , bootstrapServers , "--describe" , "--verbose" , "--members" , "--group" , APP_ID , "--group" , APP_ID_2 ),
240- expectedHeader , expectedRowsMap , dontCares );
241- validateDescribeOutput (
242- List .of ("--bootstrap-server" , bootstrapServers , "--describe" , "--verbose" , "--members" , "--all-groups" ),
243- expectedHeader , expectedRowsMap , dontCares );
244-
245- streams2 .close ();
246- streams2 .cleanUp ();
217+ try {
218+ startApplicationAndWaitUntilRunning (streams2 );
219+
220+ final List <String > expectedHeader = List .of ("GROUP" , "TARGET-ASSIGNMENT-EPOCH" , "TOPOLOGY-EPOCH" , "MEMBER" , "MEMBER-PROTOCOL" , "MEMBER-EPOCH" , "PROCESS" , "CLIENT-ID" , "ASSIGNMENTS" );
221+ final Set <List <String >> expectedRows1 = Set .of (
222+ List .of (APP_ID , "" , "0" , "" , "streams" , "" , "" , "" , "ACTIVE:" , "0:[1];" , "1:[1];" , "TARGET-ACTIVE:" , "0:[1];" , "1:[1];" ),
223+ List .of (APP_ID , "" , "0" , "" , "streams" , "" , "" , "" , "ACTIVE:" , "0:[0];" , "1:[0];" , "TARGET-ACTIVE:" , "0:[0];" , "1:[0];" ));
224+ final Set <List <String >> expectedRows2 = Set .of (
225+ List .of (APP_ID_2 , "" , "0" , "" , "streams" , "" , "" , "" , "ACTIVE:" , "1:[0];" , "TARGET-ACTIVE:" , "1:[0];" ),
226+ List .of (APP_ID_2 , "" , "0" , "" , "streams" , "" , "" , "" , "ACTIVE:" , "0:[0];" , "TARGET-ACTIVE:" , "0:[0];" ));
227+ final Map <String , Set <List <String >>> expectedRowsMap = new HashMap <>();
228+ expectedRowsMap .put (APP_ID , expectedRows1 );
229+ expectedRowsMap .put (APP_ID_2 , expectedRows2 );
230+
231+ // The member and process names as well as client-id are not deterministic, so we don't care about them.
232+ // The TARGET-ASSIGNMENT-EPOCH and MEMBER-EPOCH can vary due to rebalance timing, so we don't care about them either.
233+ final List <Integer > dontCares = List .of (1 , 3 , 5 , 6 , 7 );
234+
235+ validateDescribeOutput (
236+ List .of ("--bootstrap-server" , bootstrapServers , "--describe" , "--members" , "--verbose" , "--group" , APP_ID , "--group" , APP_ID_2 ),
237+ expectedHeader , expectedRowsMap , dontCares );
238+ validateDescribeOutput (
239+ List .of ("--bootstrap-server" , bootstrapServers , "--describe" , "--verbose" , "--members" , "--group" , APP_ID , "--group" , APP_ID_2 ),
240+ expectedHeader , expectedRowsMap , dontCares );
241+ validateDescribeOutput (
242+ List .of ("--bootstrap-server" , bootstrapServers , "--describe" , "--verbose" , "--members" , "--all-groups" ),
243+ expectedHeader , expectedRowsMap , dontCares );
244+ } finally {
245+ streams2 .close ();
246+ streams2 .cleanUp ();
247+ }
247248 }
248249
249250 @ Test
@@ -259,15 +260,6 @@ public void testDescribeNonExistingStreamsGroup() {
259260 List .of ("--bootstrap-server" , bootstrapServers , "--describe" , "--verbose" , "--members" , "--group" , nonExistingGroup ), errorMessage );
260261 }
261262
262- @ Test
263- public void testDescribeStreamsGroupWithShortTimeout () {
264- // Note: 1ms timeout may not always trigger timeout on fast machines with warm groups
265- // Using 0ms to ensure timeout
266- List <String > args = List .of ("--bootstrap-server" , bootstrapServers , "--describe" , "--members" , "--verbose" , "--group" , APP_ID , "--timeout" , "0" );
267- Throwable e = assertThrows (ExecutionException .class , () -> getStreamsGroupService (args .toArray (new String [0 ])).describeGroups ());
268- assertEquals (TimeoutException .class , e .getCause ().getClass ());
269- }
270-
271263 private static Topology topology (String inputTopic , String outputTopic ) {
272264 final StreamsBuilder builder = new StreamsBuilder ();
273265 builder .stream (inputTopic , Consumed .with (Serdes .String (), Serdes .String ()))
0 commit comments