Skip to content

Commit 3be19e4

Browse files
0xffff-zhiyantzy-0x7cf
authored andcommitted
KAFKA-19851; Delete dynamic configs that were removed by Kafka (apache#21053)
When upgrading from Kafka 3.x to 4.0, the metadata log may contain dynamic configurations that were removed in 4.0 (e.g., message.format.version per KIP-724). These removed configs cause InvalidConfigurationException when users attempt to modify any configuration, because validation checks all existing configs including the removed ones. Adds filtering to prevent unsupported or invalid configurations from being applied during metadata replay. The filtering is implemented using a SupportedConfigChecker interface that is injected via dependency injection through Builder patterns. When a ConfigRecord is replayed, the checker validates whether the configuration name is supported for the given resource type. Unsupported configurations are silently ignored during replay, ensuring that only valid configurations enter the in-memory state. The SupportedConfigChecker interface provides a default TRUE implementation that accepts all configurations. The actual filtering logic is implemented by DefaultSupportedConfigChecker, which maintains a whitelist of valid configuration names per resource type (TOPIC, CLIENT_METRICS, GROUP) based on the actual config definitions. The filtering occurs in both ConfigurationDelta#replay and ConfigurationControlManager#replay methods. Added unit tests to ensure: - Removed configurations are filtered during the replay operations - Only supported configurations appear in the resulting metadata images - The filtering works correctly for all resource types (TOPIC, BROKER, CLIENT_METRICS, GROUP) - DefaultSupportedConfigChecker correctly identifies supported vs unsupported configurations for each resource type Reviewers: José Armando García Sancio <jsancio@apache.org>, Jun Rao <junrao@apache.org>, Alyssa Huang <ahuang@confluent.io>, Kevin Wu <kevin.wu2412@gmail.com>, Andrew Grant <agrant@confluent.io> (cherry picked from commit a35d649)
1 parent a7f0585 commit 3be19e4

36 files changed

Lines changed: 654 additions & 64 deletions

File tree

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2629,6 +2629,7 @@ project(':shell') {
26292629
implementation project(':core')
26302630
implementation project(':metadata')
26312631
implementation project(':raft')
2632+
implementation project(':server')
26322633

26332634
implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
26342635
implementation libs.jacksonJakartarsJsonProvider

checkstyle/import-control.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@
272272
<allow pkg="org.apache.kafka.queue"/>
273273
<allow pkg="org.apache.kafka.raft"/>
274274
<allow pkg="org.apache.kafka.server.common" />
275+
<allow pkg="org.apache.kafka.server.config" />
275276
<allow pkg="org.apache.kafka.server.fault" />
276277
<allow pkg="org.apache.kafka.server.util" />
277278
<allow pkg="org.apache.kafka.shell"/>

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataImage.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,13 @@ public Optional<TopicMetadata> topicMetadata(String topicName) {
7575

7676
@Override
7777
public CoordinatorMetadataDelta emptyDelta() {
78-
return new KRaftCoordinatorMetadataDelta(new MetadataDelta(metadataImage));
78+
// Note: supportedConfigChecker is not set because CoordinatorMetadataDelta only exposes topic-related methods.
79+
// No ConfigRecord replay happens through this path, so the checker is never invoked.
80+
return new KRaftCoordinatorMetadataDelta(
81+
new MetadataDelta.Builder()
82+
.setImage(metadataImage)
83+
.build()
84+
);
7985
}
8086

8187
@Override

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2107,7 +2107,11 @@ public void testOnNewMetadataImage() {
21072107
verify(coordinator0).onLoaded(CoordinatorMetadataImage.EMPTY);
21082108

21092109
// Publish a new image.
2110-
CoordinatorMetadataDelta delta = new KRaftCoordinatorMetadataDelta(new MetadataDelta(MetadataImage.EMPTY));
2110+
CoordinatorMetadataDelta delta = new KRaftCoordinatorMetadataDelta(
2111+
new MetadataDelta.Builder()
2112+
.setImage(MetadataImage.EMPTY)
2113+
.build()
2114+
);
21112115
CoordinatorMetadataImage newImage = CoordinatorMetadataImage.EMPTY;
21122116
runtime.onNewMetadataImage(newImage, delta);
21132117

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataDeltaTest.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ public void testKRaftCoordinatorDelta() {
6363
.addTopic(deletedTopicId, deletedTopicName, 1)
6464
.addTopic(changedTopicId, changedTopicName, 1)
6565
.build();
66-
MetadataDelta delta = new MetadataDelta(image);
66+
MetadataDelta delta = new MetadataDelta.Builder()
67+
.setImage(image)
68+
.build();
6769
delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName));
6870
delta.replay(new TopicRecord().setTopicId(topicId2).setName(topicName2));
6971
delta.replay(new RemoveTopicRecord().setTopicId(deletedTopicId));
@@ -113,14 +115,18 @@ public void testEqualsAndHashcode() {
113115
Uuid topicId3 = Uuid.randomUuid();
114116
String topicName3 = "test-topic3";
115117

116-
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
118+
MetadataDelta delta = new MetadataDelta.Builder()
119+
.setImage(MetadataImage.EMPTY)
120+
.build();
117121
delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName));
118122
delta.replay(new TopicRecord().setTopicId(topicId2).setName(topicName2));
119123

120124
KRaftCoordinatorMetadataDelta coordinatorDelta = new KRaftCoordinatorMetadataDelta(delta);
121125
KRaftCoordinatorMetadataDelta coordinatorDeltaCopy = new KRaftCoordinatorMetadataDelta(delta);
122126

123-
MetadataDelta delta2 = new MetadataDelta(MetadataImage.EMPTY);
127+
MetadataDelta delta2 = new MetadataDelta.Builder()
128+
.setImage(MetadataImage.EMPTY)
129+
.build();
124130
delta.replay(new TopicRecord().setTopicId(topicId3).setName(topicName3));
125131
KRaftCoordinatorMetadataDelta coordinatorDelta2 = new KRaftCoordinatorMetadataDelta(delta2);
126132

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MetadataImageBuilder.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ public MetadataImageBuilder() {
3434
}
3535

3636
public MetadataImageBuilder(MetadataImage image) {
37-
this.delta = new MetadataDelta(image);
37+
this.delta = new MetadataDelta.Builder()
38+
.setImage(image)
39+
.build();
3840
}
3941

4042
public MetadataImageBuilder addTopic(

core/src/main/scala/kafka/server/ControllerServer.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ class ControllerServer(
246246
setCreateTopicPolicy(createTopicPolicy.toJava).
247247
setAlterConfigPolicy(alterConfigPolicy.toJava).
248248
setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)).
249+
setSupportedConfigChecker(sharedServer.supportedConfigChecker).
249250
setStaticConfig(config.originals).
250251
setBootstrapMetadata(bootstrapMetadata).
251252
setFatalFaultHandler(sharedServer.fatalQuorumControllerFaultHandler).

core/src/main/scala/kafka/server/SharedServer.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@ import org.apache.kafka.image.loader.MetadataLoader
3030
import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics
3131
import org.apache.kafka.image.publisher.metrics.SnapshotEmitterMetrics
3232
import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator}
33-
import org.apache.kafka.metadata.ListenerInfo
34-
import org.apache.kafka.metadata.MetadataRecordSerde
33+
import org.apache.kafka.metadata.{SupportedConfigChecker, ListenerInfo, MetadataRecordSerde}
3534
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
3635
import org.apache.kafka.raft.Endpoints
3736
import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
37+
import org.apache.kafka.server.config.DefaultSupportedConfigChecker
3838
import org.apache.kafka.server.common.ApiMessageAndVersion
3939
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler}
4040
import org.apache.kafka.server.metrics.{BrokerServerMetrics, KafkaYammerMetrics, NodeMetrics}
@@ -112,6 +112,7 @@ class SharedServer(
112112
private var usedByController: Boolean = false
113113
val brokerConfig = new KafkaConfig(sharedServerConfig.props, false)
114114
val controllerConfig = new KafkaConfig(sharedServerConfig.props, false)
115+
val supportedConfigChecker: SupportedConfigChecker = new DefaultSupportedConfigChecker()
115116

116117
// Factory for creating request handler pools with shared aggregate thread counter
117118
val requestHandlerPoolFactory = new KafkaRequestHandlerPoolFactory()
@@ -323,7 +324,8 @@ class SharedServer(
323324
setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
324325
setFaultHandler(metadataLoaderFaultHandler).
325326
setHighWaterMarkAccessor(() => _raftManager.client.highWatermark()).
326-
setMetrics(metadataLoaderMetrics)
327+
setMetrics(metadataLoaderMetrics).
328+
setSupportedConfigChecker(supportedConfigChecker)
327329
loader = loaderBuilder.build()
328330
snapshotEmitter = new SnapshotEmitter.Builder().
329331
setNodeId(nodeId).

core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ class LocalLeaderEndPointTest extends Logging {
7878
alterPartitionManager = alterPartitionManager
7979
)
8080

81-
val delta = new MetadataDelta(MetadataImage.EMPTY)
81+
val delta = new MetadataDelta.Builder()
82+
.setImage(MetadataImage.EMPTY)
83+
.build()
8284
delta.replay(new FeatureLevelRecord()
8385
.setName(MetadataVersion.FEATURE_NAME)
8486
.setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())
@@ -254,7 +256,9 @@ class LocalLeaderEndPointTest extends Logging {
254256
}
255257

256258
private def bumpLeaderEpoch(): Unit = {
257-
val delta = new MetadataDelta(image)
259+
val delta = new MetadataDelta.Builder()
260+
.setImage(image)
261+
.build()
258262
delta.replay(new PartitionChangeRecord()
259263
.setTopicId(topicId)
260264
.setPartitionId(partition)

core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ class DefaultApiVersionManagerTest {
3838
private val brokerFeatures = BrokerFeatures.createDefault(true)
3939
private val metadataCache = {
4040
val cache = new KRaftMetadataCache(1, () => KRaftVersion.LATEST_PRODUCTION)
41-
val delta = new MetadataDelta(MetadataImage.EMPTY)
41+
val delta = new MetadataDelta.Builder()
42+
.setImage(MetadataImage.EMPTY)
43+
.build()
4244
delta.replay(new FeatureLevelRecord()
4345
.setName(MetadataVersion.FEATURE_NAME)
4446
.setFeatureLevel(MetadataVersion.latestProduction().featureLevel())

0 commit comments

Comments
 (0)