@@ -56,7 +56,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
5656import org .apache .kafka .common .serialization .{StringDeserializer , StringSerializer }
5757import org .apache .kafka .coordinator .transaction .TransactionLogConfig
5858import org .apache .kafka .network .SocketServerConfigs
59- import org .apache .kafka .server .config .{ReplicationConfigs , ServerConfigs , ServerLogConfigs , ServerTopicConfigSynonyms }
59+ import org .apache .kafka .server .config .{KRaftConfigs , ReplicationConfigs , ServerConfigs , ServerLogConfigs , ServerTopicConfigSynonyms }
6060import org .apache .kafka .server .metrics .{KafkaYammerMetrics , MetricConfigs }
6161import org .apache .kafka .server .record .BrokerCompressionType
6262import org .apache .kafka .server .util .ShutdownableThread
@@ -116,30 +116,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
116116 clearLeftOverProcessorMetrics() // clear metrics left over from other tests so that new ones can be tested
117117
118118 (0 until numServers).foreach { brokerId =>
119-
120- val props = TestUtils .createBrokerConfig(brokerId)
121- props.put(SocketServerConfigs .ADVERTISED_LISTENERS_CONFIG , s " $SecureInternal://localhost:0, $SecureExternal://localhost:0 " )
122- props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS )
123- // Ensure that we can support multiple listeners per security protocol and multiple security protocols
124- props.put(SocketServerConfigs .LISTENERS_CONFIG , s " $SecureInternal://localhost:0, $SecureExternal://localhost:0 " )
125- props.put(SocketServerConfigs .LISTENER_SECURITY_PROTOCOL_MAP_CONFIG , s " PLAINTEXT:PLAINTEXT, $SecureInternal:SSL, $SecureExternal:SASL_SSL, CONTROLLER: $controllerListenerSecurityProtocol" )
126- props.put(ReplicationConfigs .INTER_BROKER_LISTENER_NAME_CONFIG , SecureInternal )
127- props.put(BrokerSecurityConfigs .SSL_CLIENT_AUTH_CONFIG , " requested" )
128- props.put(BrokerSecurityConfigs .SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG , " PLAIN" )
129- props.put(BrokerSecurityConfigs .SASL_ENABLED_MECHANISMS_CONFIG , kafkaServerSaslMechanisms.mkString(" ," ))
130- props.put(ServerLogConfigs .LOG_SEGMENT_BYTES_CONFIG , " 1048576" ) // low value to test log rolling on config update
131- props.put(ReplicationConfigs .NUM_REPLICA_FETCHERS_CONFIG , " 2" ) // greater than one to test reducing threads
132- props.put(ServerLogConfigs .LOG_RETENTION_TIME_MILLIS_CONFIG , 1680000000 .toString)
133- props.put(ServerLogConfigs .LOG_RETENTION_TIME_HOURS_CONFIG , 168 .toString)
134-
135- props ++= sslProperties1
136- props ++= securityProps(sslProperties1, KEYSTORE_PROPS , listenerPrefix(SecureInternal ))
137-
138- // Set invalid top-level properties to ensure that listener config is used
139- // Don't set any dynamic configs here since they get overridden in tests
140- props ++= invalidSslProperties
141- props ++= securityProps(invalidSslProperties, KEYSTORE_PROPS )
142- props ++= securityProps(sslProperties1, KEYSTORE_PROPS , listenerPrefix(SecureExternal ))
119+ val props = defaultStaticConfig(brokerId)
143120
144121 val kafkaConfig = KafkaConfig .fromProps(props)
145122
@@ -157,6 +134,33 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
157134 TestMetricsReporter .testReporters.clear()
158135 }
159136
137+ def defaultStaticConfig (brokerId : Int ): Properties = {
138+ val props = TestUtils .createBrokerConfig(brokerId)
139+ props.put(SocketServerConfigs .ADVERTISED_LISTENERS_CONFIG , s " $SecureInternal://localhost:0, $SecureExternal://localhost:0 " )
140+ props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS )
141+ // Ensure that we can support multiple listeners per security protocol and multiple security protocols
142+ props.put(SocketServerConfigs .LISTENERS_CONFIG , s " $SecureInternal://localhost:0, $SecureExternal://localhost:0 " )
143+ props.put(SocketServerConfigs .LISTENER_SECURITY_PROTOCOL_MAP_CONFIG , s " PLAINTEXT:PLAINTEXT, $SecureInternal:SSL, $SecureExternal:SASL_SSL, CONTROLLER: $controllerListenerSecurityProtocol" )
144+ props.put(ReplicationConfigs .INTER_BROKER_LISTENER_NAME_CONFIG , SecureInternal )
145+ props.put(BrokerSecurityConfigs .SSL_CLIENT_AUTH_CONFIG , " requested" )
146+ props.put(BrokerSecurityConfigs .SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG , " PLAIN" )
147+ props.put(BrokerSecurityConfigs .SASL_ENABLED_MECHANISMS_CONFIG , kafkaServerSaslMechanisms.mkString(" ," ))
148+ props.put(ServerLogConfigs .LOG_SEGMENT_BYTES_CONFIG , " 1048576" ) // low value to test log rolling on config update
149+ props.put(ReplicationConfigs .NUM_REPLICA_FETCHERS_CONFIG , " 2" ) // greater than one to test reducing threads
150+ props.put(ServerLogConfigs .LOG_RETENTION_TIME_MILLIS_CONFIG , 1680000000 .toString)
151+ props.put(ServerLogConfigs .LOG_RETENTION_TIME_HOURS_CONFIG , 168 .toString)
152+
153+ props ++= sslProperties1
154+ props ++= securityProps(sslProperties1, KEYSTORE_PROPS , listenerPrefix(SecureInternal ))
155+
156+ // Set invalid top-level properties to ensure that listener config is used
157+ // Don't set any dynamic configs here since they get overridden in tests
158+ props ++= invalidSslProperties
159+ props ++= securityProps(invalidSslProperties, KEYSTORE_PROPS )
160+ props ++= securityProps(sslProperties1, KEYSTORE_PROPS , listenerPrefix(SecureExternal ))
161+ props
162+ }
163+
160164 @ AfterEach
161165 override def tearDown (): Unit = {
162166 clientThreads.foreach(_.interrupt())
@@ -1090,6 +1094,38 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
10901094 verifyConfiguration(true )
10911095 }
10921096
1097+ @ ParameterizedTest (name = TestInfoUtils .TestWithParameterizedQuorumAndGroupProtocolNames )
1098+ @ MethodSource (Array (" getTestQuorumAndGroupProtocolParametersAll" ))
1099+ def testServersCanStartWithInvalidStaticConfigsAndValidDynamicConfigs (quorum : String , groupProtocol : String ): Unit = {
1100+ // modify snapshot interval config to explicitly take snapshot on a broker with valid dynamic configs
1101+ val props = defaultStaticConfig(numServers)
1102+ props.put(KRaftConfigs .METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG , " 10000" )
1103+
1104+ val kafkaConfig = KafkaConfig .fromProps(props)
1105+ val newBroker = createBroker(kafkaConfig).asInstanceOf [BrokerServer ]
1106+ servers += newBroker
1107+
1108+ alterSslKeystoreUsingConfigCommand(sslProperties1, listenerPrefix(SecureExternal ))
1109+
1110+ TestUtils .ensureConsistentKRaftMetadata(servers, controllerServer)
1111+
1112+ TestUtils .waitUntilTrue(
1113+ () => newBroker.raftManager.replicatedLog.latestSnapshotId().isPresent,
1114+ " metadata snapshot not present on broker" ,
1115+ 30000L
1116+ )
1117+
1118+ // shutdown broker and attempt to restart it after invalidating its static configurations
1119+ newBroker.shutdown()
1120+ newBroker.awaitShutdown()
1121+
1122+ val invalidStaticConfigs = defaultStaticConfig(newBroker.config.brokerId)
1123+ invalidStaticConfigs.putAll(securityProps(invalidSslConfigs, KEYSTORE_PROPS , listenerPrefix(SecureExternal )))
1124+ newBroker.config.updateCurrentConfig(KafkaConfig .fromProps(invalidStaticConfigs))
1125+
1126+ newBroker.startup()
1127+ }
1128+
10931129 private def awaitInitialPositions (consumer : Consumer [_, _]): Unit = {
10941130 TestUtils .pollUntilTrue(consumer, () => ! consumer.assignment.isEmpty, " Timed out while waiting for assignment" )
10951131 consumer.assignment.forEach(tp => consumer.position(tp))
0 commit comments