Skip to content

Commit 84f810e

Browse files
MINOR: Resolve hidden NPE in RequestQuotaTest (#21587)
`RequestQuotaTest` was silently experiencing NPE when testing `SHARE_ACKNOWLEDGE`. This is because the default for the group ID in this request is null, even though this is never actually used in practice by a real client. The construction of `ShareAcknowledgeRequestData` in this test did not initialize a specific value for group ID, and this means it was left as null. The result was an NPE handling the request in the broker, which was not the intended action of the test. The PR explicitly handles null for group ID and member ID in `SHARE_FETCH` and `SHARE_ACKNOWLEDGE` requests so that we are not relying on the overall exception handling for this situation. In practice, this would not be necessary for a real client, but the defensive code makes sense for this test (or a poorly written client). It also initialises the request in the test case with a non-null group ID and member ID for `SHARE_ACKNOWLEDGE` which aligns with what already exists for `SHARE_FETCH`. Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent c4631a2 commit 84f810e

2 files changed

Lines changed: 20 additions & 2 deletions

File tree

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3172,6 +3172,12 @@ class KafkaApis(val requestChannel: RequestChannel,
31723172

31733173
val groupId = shareFetchRequest.data.groupId
31743174

3175+
if (groupId == null) {
3176+
requestHelper.sendMaybeThrottle(request,
3177+
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.INVALID_REQUEST.exception("Invalid group id in the request.")))
3178+
return CompletableFuture.completedFuture[Unit](())
3179+
}
3180+
31753181
// Share Fetch needs permission to perform the READ action on the named group resource (groupId)
31763182
if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
31773183
requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception))
@@ -3539,6 +3545,12 @@ class KafkaApis(val requestChannel: RequestChannel,
35393545

35403546
val groupId = shareAcknowledgeRequest.data.groupId
35413547

3548+
if (groupId == null) {
3549+
requestHelper.sendMaybeThrottle(request,
3550+
shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.INVALID_REQUEST.exception("Invalid group id in the request.")))
3551+
return CompletableFuture.completedFuture[Unit](())
3552+
}
3553+
35423554
// Share Acknowledge needs permission to perform READ action on the named group resource (groupId)
35433555
if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
35443556
requestHelper.sendMaybeThrottle(request,
@@ -4222,7 +4234,7 @@ class KafkaApis(val requestChannel: RequestChannel,
42224234
* @return boolean if the member id in the RPC is valid or not.
42234235
*/
42244236
def isMemberIdValid(memberId: String): Boolean = {
4225-
memberId.nonEmpty && memberId.length <= 36
4237+
memberId != null && memberId.nonEmpty && memberId.length <= 36
42264238
}
42274239

42284240
private def updateRecordConversionStats(request: RequestChannel.Request,

core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ class RequestQuotaTest extends BaseRequestTest {
329329
)
330330
)
331331
)
332+
332333
case ApiKeys.OFFSET_FETCH =>
333334
OffsetFetchRequest.Builder.forTopicNames(
334335
new OffsetFetchRequestData()
@@ -493,6 +494,7 @@ class RequestQuotaTest extends BaseRequestTest {
493494
.setHost("*")
494495
.setOperation(AclOperation.WRITE.code)
495496
.setPermissionType(AclPermissionType.DENY.code))))
497+
496498
case ApiKeys.DELETE_ACLS =>
497499
new DeleteAclsRequest.Builder(new DeleteAclsRequestData().setFilters(util.List.of(
498500
new DeleteAclsRequestData.DeleteAclsFilter()
@@ -503,6 +505,7 @@ class RequestQuotaTest extends BaseRequestTest {
503505
.setHostFilter("*")
504506
.setOperation(AclOperation.ANY.code)
505507
.setPermissionType(AclPermissionType.DENY.code))))
508+
506509
case ApiKeys.DESCRIBE_CONFIGS =>
507510
new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()
508511
.setResources(util.List.of(new DescribeConfigsRequestData.DescribeConfigsResource()
@@ -719,7 +722,10 @@ class RequestQuotaTest extends BaseRequestTest {
719722
).iterator)))
720723

721724
case ApiKeys.SHARE_ACKNOWLEDGE =>
722-
new ShareAcknowledgeRequest.Builder(new ShareAcknowledgeRequestData())
725+
new ShareAcknowledgeRequest.Builder(
726+
new ShareAcknowledgeRequestData()
727+
.setGroupId("test-share-group")
728+
.setMemberId(Uuid.randomUuid().toString))
723729

724730
case ApiKeys.ADD_RAFT_VOTER =>
725731
new AddRaftVoterRequest.Builder(new AddRaftVoterRequestData())

0 commit comments

Comments
 (0)