Skip to content

Commit cb7e3ab

Browse files
authored
KAFKA-20410: Register topic DLQ configs in LogConfig. [2/N] (#22167)
* Register TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG in LogConfig. * Update tests. Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
1 parent 8a1c198 commit cb7e3ab

2 files changed

Lines changed: 9 additions & 0 deletions

File tree

core/src/test/scala/unit/kafka/log/LogConfigTest.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ class LogConfigTest {
7373
case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-0.1")
7474
case TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0")
7575
case TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0")
76+
case TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG => assertPropertyInvalid(name, "not_a_boolean")
7677
case LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG => // no op
7778

7879
case _ => assertPropertyInvalid(name, "not_a_number", "-1")

storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ public Optional<String> serverConfigName(String configName) {
248248
TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
249249
.define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC)
250250
.define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC)
251+
.define(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_DOC)
251252
.defineInternal(INTERNAL_SEGMENT_BYTES_CONFIG, INT, null, null, MEDIUM, INTERNAL_SEGMENT_BYTES_DOC);
252253
}
253254

@@ -279,6 +280,7 @@ public Optional<String> serverConfigName(String configName) {
279280
public final BrokerCompressionType compressionType;
280281
public final Optional<Compression> compression;
281282
public final boolean preallocate;
283+
public final boolean errorsDeadletterqueueGroupEnable;
282284

283285
public final TimestampType messageTimestampType;
284286

@@ -335,6 +337,7 @@ public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
335337
this.messageTimestampAfterMaxMs = getLong(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
336338
this.leaderReplicationThrottledReplicas = Collections.unmodifiableList(getList(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
337339
this.followerReplicationThrottledReplicas = Collections.unmodifiableList(getList(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
340+
this.errorsDeadletterqueueGroupEnable = getBoolean(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG);
338341

339342
remoteLogConfig = new RemoteLogConfig(this);
340343
}
@@ -387,6 +390,10 @@ public int initFileSize() {
387390
return 0;
388391
}
389392

393+
public boolean errorsDeadletterqueueGroupEnable() {
394+
return errorsDeadletterqueueGroupEnable;
395+
}
396+
390397
public boolean remoteStorageEnable() {
391398
return remoteLogConfig.remoteStorageEnable;
392399
}
@@ -652,6 +659,7 @@ public String toString() {
652659
", followerReplicationThrottledReplicas=" + followerReplicationThrottledReplicas +
653660
", remoteLogConfig=" + remoteLogConfig +
654661
", maxMessageSize=" + maxMessageSize +
662+
", errorsDeadletterqueueGroupEnable=" + errorsDeadletterqueueGroupEnable +
655663
'}';
656664
}
657665

0 commit comments

Comments
 (0)