Skip to content

Commit 07e0cae

Browse files
authored
KAFKA-18652: Add num.warmup.replicas config (#21801)
KIP-1071 adds new broker side configs: - num.warmup.replicas - max.warmup.replicas This PR adds both configs to the broker. Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Lan Ding <isDing_L@163.com>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent e6653a9 commit 07e0cae

7 files changed

Lines changed: 177 additions & 5 deletions

File tree

checkstyle/suppressions.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@
269269

270270
<!-- group coordinator -->
271271
<suppress checks="CyclomaticComplexity"
272-
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorRecordSerde|GroupMetadataManagerTestContext).java"/>
272+
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorRecordSerde|GroupMetadataManagerTestContext|GroupConfigTest).java"/>
273273
<suppress checks="(NPathComplexity|MethodLength)"
274274
files="(GroupMetadataManager|GroupMetadataManagerTest|GroupMetadataManagerTestContext|GroupCoordinatorShard).java"/>
275275
<suppress checks="ClassFanOutComplexity"

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerd
7777
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
7878
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
7979
import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection
80-
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_SESSION_TIMEOUT_MS_CONFIG, STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG}
80+
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_SESSION_TIMEOUT_MS_CONFIG, STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, STREAMS_NUM_WARMUP_REPLICAS_CONFIG, ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG}
8181
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
8282
import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager, GroupCoordinator, GroupCoordinatorConfig}
8383
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -379,9 +379,9 @@ class KafkaApisTest extends Logging {
379379
cgConfigs.put(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
380380
cgConfigs.put(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
381381
cgConfigs.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT.toString)
382+
cgConfigs.put(STREAMS_NUM_WARMUP_REPLICAS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_DEFAULT.toString)
382383
cgConfigs.put(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "")
383384
cgConfigs.put(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, "false")
384-
385385
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
386386

387387
val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ public final class GroupConfig extends AbstractConfig {
105105

106106
public static final String STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG = "streams.task.offset.interval.ms";
107107

108+
public static final String STREAMS_NUM_WARMUP_REPLICAS_CONFIG = "streams.num.warmup.replicas";
109+
108110
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG = "errors.deadletterqueue.topic.name";
109111
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DEFAULT = "";
110112
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DOC = "The name of the topic to be used as the dead-letter queue (DLQ) topic for this share group. If blank (the default), the group does not have a DLQ topic.";
@@ -151,6 +153,8 @@ public final class GroupConfig extends AbstractConfig {
151153

152154
private final Optional<Integer> streamsTaskOffsetIntervalMs;
153155

156+
private final Optional<Integer> streamsNumWarmupReplicas;
157+
154158
private final Optional<IsolationLevel> shareIsolationLevel;
155159

156160
private final Optional<Boolean> shareRenewAcknowledgeEnable;
@@ -282,6 +286,12 @@ public final class GroupConfig extends AbstractConfig {
282286
atLeast(1),
283287
MEDIUM,
284288
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC)
289+
.define(STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
290+
INT,
291+
GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_DEFAULT,
292+
atLeast(0),
293+
MEDIUM,
294+
GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_DOC)
285295

286296
// DLQ configurations (KIP-1191)
287297
.define(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG,
@@ -326,6 +336,7 @@ public final class GroupConfig extends AbstractConfig {
326336
Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
327337
Map.entry(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
328338
Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG)),
339+
Map.entry(STREAMS_NUM_WARMUP_REPLICAS_CONFIG, Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG)),
329340

330341
// DLQ configs
331342
Map.entry(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, Optional.empty()),
@@ -362,6 +373,7 @@ public GroupConfig(Map<?, ?> props) {
362373
this.streamsAssignmentIntervalMs = optionalInt(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG);
363374
this.streamsAssignorOffloadEnable = optionalBoolean(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
364375
this.streamsTaskOffsetIntervalMs = optionalInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
376+
this.streamsNumWarmupReplicas = optionalInt(STREAMS_NUM_WARMUP_REPLICAS_CONFIG);
365377
this.shareIsolationLevel = optionalString(SHARE_ISOLATION_LEVEL_CONFIG)
366378
.map(s -> IsolationLevel.valueOf(s.toUpperCase(Locale.ROOT)));
367379
this.shareRenewAcknowledgeEnable = optionalBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
@@ -542,6 +554,12 @@ private static void validateValues(
542554
groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs(),
543555
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG
544556
);
557+
validateIntMax(
558+
parsed,
559+
STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
560+
groupCoordinatorConfig.streamsGroupMaxWarmupReplicas(),
561+
GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG
562+
);
545563

546564
// Cross-field validations: session timeout must be greater than heartbeat interval.
547565
validateSessionExceedsHeartbeat(
@@ -782,6 +800,12 @@ private static void evaluateValues(
782800
STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
783801
groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()
784802
);
803+
clampToMax(
804+
props,
805+
groupId,
806+
STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
807+
groupCoordinatorConfig.streamsGroupMaxWarmupReplicas()
808+
);
785809

786810
// Verify that clamping did not break the session > heartbeat invariant.
787811
checkSessionExceedsHeartbeat(
@@ -1084,6 +1108,13 @@ public Optional<Integer> streamsTaskOffsetIntervalMs() {
10841108
return streamsTaskOffsetIntervalMs;
10851109
}
10861110

1111+
/**
1112+
* The number of warmup replicas for each task.
1113+
*/
1114+
public Optional<Integer> streamsNumWarmupReplicas() {
1115+
return streamsNumWarmupReplicas;
1116+
}
1117+
10871118
/**
10881119
* The share group isolation level.
10891120
*/

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,14 @@ public class GroupCoordinatorConfig {
392392
public static final int STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT = 15000;
393393
public static final String STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC = "The minimum allowed value for the group-level configuration of " + GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG;
394394

395+
public static final String STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG = "group.streams.num.warmup.replicas";
396+
public static final int STREAMS_GROUP_NUM_WARMUP_REPLICAS_DEFAULT = 2;
397+
public static final String STREAMS_GROUP_NUM_WARMUP_REPLICAS_DOC = "The maximum number of warmup task replicas.";
398+
399+
public static final String STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG = "group.streams.max.warmup.replicas";
400+
public static final int STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT = 20;
401+
public static final String STREAMS_GROUP_MAX_WARMUP_REPLICAS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG;
402+
395403
public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
396404
CACHED_BUFFER_MAX_BYTES_CONFIG,
397405
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
@@ -482,7 +490,9 @@ public class GroupCoordinatorConfig {
482490
.define(STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
483491
.define(STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN, STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM, STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
484492
.define(STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC)
485-
.define(STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC);
493+
.define(STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC)
494+
.define(STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG, INT, STREAMS_GROUP_NUM_WARMUP_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_NUM_WARMUP_REPLICAS_DOC)
495+
.define(STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_WARMUP_REPLICAS_DOC);
486496

487497

488498
/**
@@ -548,6 +558,8 @@ public class GroupCoordinatorConfig {
548558
private final int streamsGroupMaxAssignmentIntervalMs;
549559
private final int streamsGroupTaskOffsetIntervalMs;
550560
private final int streamsGroupMinTaskOffsetIntervalMs;
561+
private final int streamsGroupNumWarmupReplicas;
562+
private final int streamsGroupMaxWarmupReplicas;
551563

552564
private final AbstractConfig config;
553565

@@ -615,6 +627,8 @@ public GroupCoordinatorConfig(AbstractConfig config) {
615627
this.streamsGroupMaxAssignmentIntervalMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
616628
this.streamsGroupTaskOffsetIntervalMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG);
617629
this.streamsGroupMinTaskOffsetIntervalMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG);
630+
this.streamsGroupNumWarmupReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG);
631+
this.streamsGroupMaxWarmupReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG);
618632
this.config = config;
619633

620634
// New group coordinator configs validation.
@@ -714,9 +728,10 @@ public GroupCoordinatorConfig(AbstractConfig config) {
714728

715729
require(streamsGroupNumStandbyReplicas <= streamsGroupMaxStandbyReplicas,
716730
String.format("%s must be less than or equal to %s", STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG));
717-
718731
require(streamsGroupTaskOffsetIntervalMs >= streamsGroupMinTaskOffsetIntervalMs,
719732
String.format("%s must be greater than or equal to %s", STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG, STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG));
733+
require(streamsGroupNumWarmupReplicas <= streamsGroupMaxWarmupReplicas,
734+
String.format("%s must be less than or equal to %s", STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG, STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG));
720735

721736
}
722737

@@ -1334,4 +1349,18 @@ public int streamsGroupTaskOffsetIntervalMs() {
13341349
public int streamsGroupMinTaskOffsetIntervalMs() {
13351350
return streamsGroupMinTaskOffsetIntervalMs;
13361351
}
1352+
1353+
/**
1354+
* The maximum number of warmup replicas for streams groups.
1355+
*/
1356+
public int streamsGroupNumWarmupReplicas() {
1357+
return streamsGroupNumWarmupReplicas;
1358+
}
1359+
1360+
/**
1361+
* The maximum allowed number of warmup replicas to be configured for streams groups
1362+
*/
1363+
public int streamsGroupMaxWarmupReplicas() {
1364+
return streamsGroupMaxWarmupReplicas;
1365+
}
13371366
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT;
5151
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT;
5252
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT;
53+
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT;
5354
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT;
5455
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT;
5556
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT;
@@ -127,6 +128,8 @@ public void testFromPropsInvalid() {
127128
assertPropertyInvalid(name, "not_a_boolean");
128129
} else if (GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG.equals(name)) {
129130
assertPropertyInvalid(name, "not_a_number", "1.0");
131+
} else if (GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG.equals(name)) {
132+
assertPropertyInvalid(name, "not_a_number", "1.0");
130133
} else if (GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG.equals(name)) {
131134
assertPropertyInvalid(name, "not_a_boolean");
132135
} else if (!GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG.equals(name)) {
@@ -322,6 +325,11 @@ public void testInvalidProps() {
322325
doTestInvalidProps(props, InvalidConfigurationException.class);
323326
props = createValidGroupConfig();
324327

328+
// Check for invalid streamsNumWarmupReplicas, > MAX
329+
props.put(GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG, "50");
330+
doTestInvalidProps(props, InvalidConfigurationException.class);
331+
props = createValidGroupConfig();
332+
325333
// Check for invalid shareIsolationLevel.
326334
props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_commit");
327335
doTestInvalidProps(props, ConfigException.class);
@@ -368,6 +376,7 @@ public void testFromPropsWithDefaultValue() {
368376
defaultValue.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1250");
369377
defaultValue.put(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
370378
defaultValue.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "30000");
379+
defaultValue.put(GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG, "5");
371380
defaultValue.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, "true");
372381

373382
Properties props = new Properties();
@@ -394,6 +403,7 @@ public void testFromPropsWithDefaultValue() {
394403
assertEquals(1250, config.getInt(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG));
395404
assertEquals(false, config.getBoolean(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG));
396405
assertEquals(30000, config.getInt(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG));
406+
assertEquals(5, config.getInt(GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG));
397407
assertEquals(true, config.getBoolean(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG));
398408
}
399409

@@ -640,6 +650,10 @@ private static Stream<Arguments> maxBoundedConfigs() {
640650
Arguments.of(
641651
GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
642652
5, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT
653+
),
654+
Arguments.of(
655+
GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
656+
25, STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT
643657
)
644658
);
645659
}
@@ -766,6 +780,7 @@ private Map<String, String> createValidGroupConfig() {
766780
props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
767781
props.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, "3000");
768782
props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "45000");
783+
props.put(GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG, "3");
769784
props.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, "true");
770785
return props;
771786
}

0 commit comments

Comments
 (0)