Skip to content

Commit 22c1e44

Browse files
dajacclaude
andauthored
MINOR: Include numeric bound in group config validation errors (#22185)
When a dynamic group config value is out of range, the error message referenced the broker-level config name that defines the bound (e.g. `group.consumer.min.heartbeat.interval.ms`). Operators had to look up the broker config to know which value would be accepted. This patch updates the `validateIntRange`, `validateIntMin`, and `validateIntMax` helpers in `GroupConfig` to include the numeric bound directly. For example, setting `consumer.heartbeat.interval.ms` below the minimum now reports `consumer.heartbeat.interval.ms must be greater than or equal to 5`. A new parameterized test `testValidationErrorMessageIncludesBound` covers both directions for each range-bounded config and the max-only and min-only checks. Reviewers: Andrew Schofield <aschofield@confluent.io> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 474798a commit 22c1e44

4 files changed

Lines changed: 101 additions & 63 deletions

File tree

core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1154,7 +1154,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
11541154

11551155
assertFutureThrows(classOf[InvalidConfigurationException],
11561156
alterResult.values.get(groupResource),
1157-
"consumer.session.timeout.ms must be greater than or equal to group.consumer.min.session.timeout.ms")
1157+
"consumer.session.timeout.ms must be in the range 45000 to 60000 inclusive.")
11581158
}
11591159

11601160
@Test

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java

Lines changed: 22 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -446,119 +446,92 @@ private static void validateValues(
446446
parsed,
447447
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
448448
groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs(),
449-
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
450-
groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs(),
451-
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG
449+
groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs()
452450
);
453451
validateIntRange(
454452
parsed,
455453
CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
456454
groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs(),
457-
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
458-
groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs(),
459-
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG
455+
groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs()
460456
);
461457
validateIntRange(
462458
parsed,
463459
CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
464460
groupCoordinatorConfig.consumerGroupMinAssignmentIntervalMs(),
465-
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
466-
groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs(),
467-
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG
461+
groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs()
468462
);
469463

470464
// Share group configs.
471465
validateIntRange(
472466
parsed,
473467
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
474468
groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs(),
475-
GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
476-
groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs(),
477-
GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG
469+
groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs()
478470
);
479471
validateIntRange(
480472
parsed,
481473
SHARE_SESSION_TIMEOUT_MS_CONFIG,
482474
groupCoordinatorConfig.shareGroupMinSessionTimeoutMs(),
483-
GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
484-
groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs(),
485-
GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG
475+
groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs()
486476
);
487477
validateIntRange(
488478
parsed,
489479
SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
490480
shareGroupConfig.shareGroupMinRecordLockDurationMs(),
491-
ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG,
492-
shareGroupConfig.shareGroupMaxRecordLockDurationMs(),
493-
ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG
481+
shareGroupConfig.shareGroupMaxRecordLockDurationMs()
494482
);
495483
validateIntRange(
496484
parsed,
497485
SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
498486
shareGroupConfig.shareGroupMinDeliveryCountLimit(),
499-
ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG,
500-
shareGroupConfig.shareGroupMaxDeliveryCountLimit(),
501-
ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG
487+
shareGroupConfig.shareGroupMaxDeliveryCountLimit()
502488
);
503489
validateIntRange(
504490
parsed,
505491
SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
506492
shareGroupConfig.shareGroupMinPartitionMaxRecordLocks(),
507-
ShareGroupConfig.SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG,
508-
shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks(),
509-
ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_CONFIG
493+
shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks()
510494
);
511495
validateIntRange(
512496
parsed,
513497
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
514498
groupCoordinatorConfig.shareGroupMinAssignmentIntervalMs(),
515-
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
516-
groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs(),
517-
GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG
499+
groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs()
518500
);
519501

520502
// Streams group configs.
521503
validateIntRange(
522504
parsed,
523505
STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
524506
groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs(),
525-
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
526-
groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs(),
527-
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG
507+
groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs()
528508
);
529509
validateIntRange(
530510
parsed,
531511
STREAMS_SESSION_TIMEOUT_MS_CONFIG,
532512
groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs(),
533-
GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
534-
groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs(),
535-
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG
513+
groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs()
536514
);
537515
validateIntMax(
538516
parsed,
539517
STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
540-
groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas(),
541-
GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG
518+
groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas()
542519
);
543520
validateIntRange(
544521
parsed,
545522
STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
546523
groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs(),
547-
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
548-
groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs(),
549-
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG
524+
groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs()
550525
);
551526
validateIntMin(
552527
parsed,
553528
STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
554-
groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs(),
555-
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG
529+
groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()
556530
);
557531
validateIntMax(
558532
parsed,
559533
STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
560-
groupCoordinatorConfig.streamsGroupMaxWarmupReplicas(),
561-
GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG
534+
groupCoordinatorConfig.streamsGroupMaxWarmupReplicas()
562535
);
563536

564537
// Cross-field validations: session timeout must be greater than heartbeat interval.
@@ -601,16 +574,12 @@ private static void validateIntRange(
601574
Map<String, Object> parsed,
602575
String key,
603576
int min,
604-
String minConfigName,
605-
int max,
606-
String maxConfigName
577+
int max
607578
) {
608579
if (!parsed.containsKey(key)) return;
609580
int value = (Integer) parsed.get(key);
610-
if (value < min)
611-
throw new InvalidConfigurationException(key + " must be greater than or equal to " + minConfigName);
612-
if (value > max)
613-
throw new InvalidConfigurationException(key + " must be less than or equal to " + maxConfigName);
581+
if (value < min || value > max)
582+
throw new InvalidConfigurationException(key + " must be in the range " + min + " to " + max + " inclusive.");
614583
}
615584

616585
/**
@@ -620,13 +589,12 @@ private static void validateIntRange(
620589
private static void validateIntMax(
621590
Map<String, Object> parsed,
622591
String key,
623-
int max,
624-
String maxConfigName
592+
int max
625593
) {
626594
if (!parsed.containsKey(key)) return;
627595
int value = (Integer) parsed.get(key);
628596
if (value > max)
629-
throw new InvalidConfigurationException(key + " must be less than or equal to " + maxConfigName);
597+
throw new InvalidConfigurationException(key + " must be less than or equal to " + max);
630598
}
631599

632600
/**
@@ -636,13 +604,12 @@ private static void validateIntMax(
636604
private static void validateIntMin(
637605
Map<String, Object> parsed,
638606
String key,
639-
int min,
640-
String minConfigName
607+
int min
641608
) {
642609
if (!parsed.containsKey(key)) return;
643610
int value = (Integer) parsed.get(key);
644611
if (value < min)
645-
throw new InvalidConfigurationException(key + " must be greater than or equal to " + minConfigName);
612+
throw new InvalidConfigurationException(key + " must be greater than or equal to " + min);
646613
}
647614

648615
/**

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,77 @@ private void doTestValidProps(Map<String, String> props) {
353353
assertDoesNotThrow(() -> GroupConfig.validate(props, createGroupCoordinatorConfig(), createShareGroupConfig()));
354354
}
355355

356+
private static Stream<Arguments> outOfRangeValuesAndExpectedMessages() {
357+
return Stream.of(
358+
// Consumer group configs.
359+
Arguments.of(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "1",
360+
"consumer.heartbeat.interval.ms must be in the range 5 to 15000 inclusive."),
361+
Arguments.of(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "20000",
362+
"consumer.heartbeat.interval.ms must be in the range 5 to 15000 inclusive."),
363+
Arguments.of(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "1",
364+
"consumer.session.timeout.ms must be in the range 45 to 60000 inclusive."),
365+
Arguments.of(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "70000",
366+
"consumer.session.timeout.ms must be in the range 45 to 60000 inclusive."),
367+
Arguments.of(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "500",
368+
"consumer.assignment.interval.ms must be in the range 1000 to 15000 inclusive."),
369+
Arguments.of(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "20000",
370+
"consumer.assignment.interval.ms must be in the range 1000 to 15000 inclusive."),
371+
372+
// Share group configs.
373+
Arguments.of(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "1",
374+
"share.heartbeat.interval.ms must be in the range 5 to 15000 inclusive."),
375+
Arguments.of(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "20000",
376+
"share.heartbeat.interval.ms must be in the range 5 to 15000 inclusive."),
377+
Arguments.of(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "1",
378+
"share.session.timeout.ms must be in the range 45 to 60000 inclusive."),
379+
Arguments.of(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "70000",
380+
"share.session.timeout.ms must be in the range 45 to 60000 inclusive."),
381+
Arguments.of(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "10000",
382+
"share.record.lock.duration.ms must be in the range 15000 to 60000 inclusive."),
383+
Arguments.of(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "70000",
384+
"share.record.lock.duration.ms must be in the range 15000 to 60000 inclusive."),
385+
Arguments.of(GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, "11",
386+
"share.delivery.count.limit must be in the range 2 to 10 inclusive."),
387+
Arguments.of(GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, "11000",
388+
"share.partition.max.record.locks must be in the range 100 to 10000 inclusive."),
389+
Arguments.of(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, "500",
390+
"share.assignment.interval.ms must be in the range 1000 to 15000 inclusive."),
391+
Arguments.of(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, "20000",
392+
"share.assignment.interval.ms must be in the range 1000 to 15000 inclusive."),
393+
394+
// Streams group configs.
395+
Arguments.of(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "1000",
396+
"streams.heartbeat.interval.ms must be in the range 5000 to 15000 inclusive."),
397+
Arguments.of(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "20000",
398+
"streams.heartbeat.interval.ms must be in the range 5000 to 15000 inclusive."),
399+
Arguments.of(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "1",
400+
"streams.session.timeout.ms must be in the range 45000 to 60000 inclusive."),
401+
Arguments.of(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "70000",
402+
"streams.session.timeout.ms must be in the range 45000 to 60000 inclusive."),
403+
Arguments.of(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "5",
404+
"streams.num.standby.replicas must be less than or equal to 2"),
405+
Arguments.of(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "500",
406+
"streams.assignment.interval.ms must be in the range 1000 to 15000 inclusive."),
407+
Arguments.of(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "20000",
408+
"streams.assignment.interval.ms must be in the range 1000 to 15000 inclusive."),
409+
Arguments.of(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "1000",
410+
"streams.task.offset.interval.ms must be greater than or equal to 15000"),
411+
Arguments.of(GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG, "50",
412+
"streams.num.warmup.replicas must be less than or equal to 20")
413+
);
414+
}
415+
416+
@ParameterizedTest(name = "testValidationErrorMessageIncludesBound[{0}={1}]")
417+
@MethodSource("outOfRangeValuesAndExpectedMessages")
418+
public void testValidationErrorMessageIncludesBound(String key, String value, String expectedMessage) {
419+
var props = Map.of(key, value);
420+
var exception = assertThrows(
421+
InvalidConfigurationException.class,
422+
() -> GroupConfig.validate(props, createGroupCoordinatorConfig(), createShareGroupConfig())
423+
);
424+
assertEquals(expectedMessage, exception.getMessage());
425+
}
426+
356427
@Test
357428
public void testFromPropsWithDefaultValue() {
358429
Map<String, String> defaultValue = new HashMap<>();

tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -316,15 +316,15 @@ public void testAlterStreamsGroupSessionTimeout() {
316316
"--entity-name", "group",
317317
"--alter", "--add-config", "streams.session.timeout.ms=1"));
318318
message = captureStandardErr(run(command));
319-
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException: streams.session.timeout.ms must be greater than or equal to group.streams.min.session.timeout.ms"));
319+
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException: streams.session.timeout.ms must be in the range 45000 to 60000 inclusive."));
320320

321321
// Should fail to set above max
322322
command = Stream.concat(quorumArgs(), Stream.of(
323323
"--entity-type", "groups",
324324
"--entity-name", "group",
325325
"--alter", "--add-config", "streams.session.timeout.ms=100000"));
326326
message = captureStandardErr(run(command));
327-
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException: streams.session.timeout.ms must be less than or equal to group.streams.max.session.timeout.ms"));
327+
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException: streams.session.timeout.ms must be in the range 45000 to 60000 inclusive."));
328328
}
329329

330330
@ClusterTest(serverProperties = {
@@ -347,15 +347,15 @@ public void testAlterStreamsGroupHeartbeatInterval() {
347347
"--entity-name", "group",
348348
"--alter", "--add-config", "streams.heartbeat.interval.ms=1"));
349349
message = captureStandardErr(run(command));
350-
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException: streams.heartbeat.interval.ms must be greater than or equal to group.streams.min.heartbeat.interval.ms"));
350+
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException: streams.heartbeat.interval.ms must be in the range 5000 to 55000 inclusive."));
351351

352352
// Should fail to set above max
353353
command = Stream.concat(quorumArgs(), Stream.of(
354354
"--entity-type", "groups",
355355
"--entity-name", "group",
356356
"--alter", "--add-config", "streams.heartbeat.interval.ms=100000"));
357357
message = captureStandardErr(run(command));
358-
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException: streams.heartbeat.interval.ms must be less than or equal to group.streams.max.heartbeat.interval.ms"));
358+
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException: streams.heartbeat.interval.ms must be in the range 5000 to 55000 inclusive."));
359359

360360
// Should fail to set above session timeout
361361
command = Stream.concat(quorumArgs(), Stream.of(
@@ -398,7 +398,7 @@ public void testAlterStreamsGroupNumOfStandbyReplicas() {
398398
"--entity-name", "group",
399399
"--alter", "--add-config", "streams.num.standby.replicas=3"));
400400
message = captureStandardErr(run(command));
401-
assertTrue(message.contains("streams.num.standby.replicas must be less than or equal to group.streams.max.standby.replicas"));
401+
assertTrue(message.contains("streams.num.standby.replicas must be less than or equal to 2"));
402402
}
403403

404404
@ClusterTest
@@ -432,7 +432,7 @@ public void testAlterStreamsGroupTaskOffsetInterval() {
432432
"--entity-name", "group",
433433
"--alter", "--add-config", "streams.task.offset.interval.ms=1"));
434434
message = captureStandardErr(run(command));
435-
assertTrue(message.contains("streams.task.offset.interval.ms must be greater than or equal to group.streams.min.task.offset.interval.ms"));
435+
assertTrue(message.contains("streams.task.offset.interval.ms must be greater than or equal to 15000"));
436436

437437
}
438438

@@ -467,7 +467,7 @@ public void testAlterStreamsGroupNumWarmupReplicas() {
467467
"--entity-name", "group",
468468
"--alter", "--add-config", "streams.num.warmup.replicas=25"));
469469
message = captureStandardErr(run(command));
470-
assertTrue(message.contains("streams.num.warmup.replicas must be less than or equal to group.streams.max.warmup.replicas"));
470+
assertTrue(message.contains("streams.num.warmup.replicas must be less than or equal to 20"));
471471
}
472472

473473
private void verifyGroupConfigUpdate(List<String> alterOpts) throws Exception {

0 commit comments

Comments
 (0)