Skip to content

Commit faa8b48

Browse files
KAFKA-20533: Correcting error response for topic deletion during share fetch (#22170)
The PR fixes a suppressed NPE when topic name cannot be resolved during share fetch. The metrics update triggers NPE when topic name is null and the error response gets mapped to UNKNOWN_SERVER_ERROR. Correcting the behaviour to return the correct error code per partition response. Reviewers: Andrew Schofield <aschofield@confluent.io>
1 parent 07e0cae commit faa8b48

2 files changed

Lines changed: 54 additions & 4 deletions

File tree

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4174,10 +4174,13 @@ class KafkaApis(val requestChannel: RequestChannel,
41744174
// record the bytes out metrics only when the response is being sent.
41754175
response.data.responses.forEach { topicResponse =>
41764176
topicResponse.partitions.forEach { data =>
4177-
// If the topic name was not known, we will have no bytes out.
4178-
if (topicResponse.topicId != null) {
4179-
val tp = new TopicIdPartition(topicResponse.topicId, new TopicPartition(topicIdNames.get(topicResponse.topicId), data.partitionIndex))
4180-
brokerTopicStats.updateBytesOut(tp.topic, false, false, ShareFetchResponse.recordsSize(data))
4177+
// If the topic name was not known, we will have no bytes out. This can happen if the topic
4178+
// was deleted and the fetch request was received, or if the topic id in the request was invalid.
4179+
// In both cases, the error code for the partition will be set accordingly, and we won't have
4180+
// a topic name to record metrics with.
4181+
val topicName = topicIdNames.get(topicResponse.topicId)
4182+
if (topicName != null) {
4183+
brokerTopicStats.updateBytesOut(topicName, false, false, ShareFetchResponse.recordsSize(data))
41814184
}
41824185
}
41834186
}

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5189,6 +5189,53 @@ class KafkaApisTest extends Logging {
51895189
assertEquals(Errors.UNKNOWN_SERVER_ERROR.code, responseData.errorCode)
51905190
}
51915191

5192+
@Test
5193+
def testHandleShareFetchRequestTopicDeletedDuringFetch(): Unit = {
5194+
val topicId = Uuid.randomUuid()
5195+
val partitionIndex = 0
5196+
metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
5197+
// Do NOT add the topic to metadata cache - simulating a deleted topic.
5198+
// topicIdNames will not contain this topic's mapping.
5199+
val memberId: String = Uuid.randomUuid().toString
5200+
5201+
val groupId = "group"
5202+
5203+
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any(), any())).thenReturn(
5204+
// Send the topic name that corresponds to the context response considering session existed.
5205+
// This is to simulate the scenario where the topic gets deleted after the context is created
5206+
// and the subsequent fetch is received.
5207+
new ShareSessionContext(0, util.List.of(
5208+
new TopicIdPartition(topicId, partitionIndex, "foo")
5209+
))
5210+
)
5211+
5212+
val shareFetchRequestData = new ShareFetchRequestData().
5213+
setGroupId(groupId).
5214+
setMemberId(memberId).
5215+
setShareSessionEpoch(0).
5216+
setTopics(new ShareFetchRequestData.FetchTopicCollection(util.List.of(new ShareFetchRequestData.FetchTopic().
5217+
setTopicId(topicId).
5218+
setPartitions(new ShareFetchRequestData.FetchPartitionCollection(util.List.of(
5219+
new ShareFetchRequestData.FetchPartition()
5220+
.setPartitionIndex(partitionIndex)).iterator))).iterator))
5221+
5222+
val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
5223+
val request = buildRequest(shareFetchRequest)
5224+
kafkaApis = createKafkaApis()
5225+
kafkaApis.handleShareFetchRequest(request)
5226+
val response = verifyNoThrottling[ShareFetchResponse](request)
5227+
val responseData = response.data()
5228+
5229+
assertEquals(Errors.NONE.code, responseData.errorCode)
5230+
val topicResponses = responseData.responses()
5231+
assertEquals(1, topicResponses.size())
5232+
val topicResponse = topicResponses.stream.findFirst.get
5233+
assertEquals(topicId, topicResponse.topicId)
5234+
assertEquals(1, topicResponse.partitions.size())
5235+
assertEquals(partitionIndex, topicResponse.partitions.get(0).partitionIndex)
5236+
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, topicResponse.partitions.get(0).errorCode)
5237+
}
5238+
51925239
@Test
51935240
def testHandleShareFetchRequestErrorInReadingPartition(): Unit = {
51945241
val topicName = "foo"

0 commit comments

Comments
 (0)