Skip to content

Commit 9db5888

Browse files
authored
MINOR: FindCoordinator API does not lookup partition for share partition key correctly (#19273)
This patch fixes another bug in the FindCoordinator API handling for share partition key. `shareCoordinator.foreach` returns `Unit` so `shareCoordinator.foreach(coordinator => coordinator.partitionFor(SharePartitionKey.getInstance(key)))` does not return the partition for the key. Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent 15474e0 commit 9db5888

2 files changed

Lines changed: 32 additions & 2 deletions

File tree

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1180,7 +1180,8 @@ class KafkaApis(val requestChannel: RequestChannel,
11801180
(txnCoordinator.partitionFor(key), TRANSACTION_STATE_TOPIC_NAME)
11811181

11821182
case CoordinatorType.SHARE =>
1183-
(shareCoordinator.foreach(coordinator => coordinator.partitionFor(SharePartitionKey.getInstance(key))), SHARE_GROUP_STATE_TOPIC_NAME)
1183+
// We know that shareCoordinator is defined at this stage.
1184+
(shareCoordinator.get.partitionFor(SharePartitionKey.getInstance(key)), SHARE_GROUP_STATE_TOPIC_NAME)
11841185
}
11851186

11861187
val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName).asJava, request.context.listenerName, false, false).asScala

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import org.apache.kafka.common.config.ConfigResource
3333
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER}
3434
import org.apache.kafka.common.errors.{ClusterAuthorizationException, UnsupportedVersionException}
3535
import org.apache.kafka.common.internals.Topic
36+
import org.apache.kafka.common.internals.Topic.SHARE_GROUP_STATE_TOPIC_NAME
3637
import org.apache.kafka.common.memory.MemoryPool
3738
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
3839
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
@@ -88,7 +89,7 @@ import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authoriz
8889
import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, TransactionVersion}
8990
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
9091
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
91-
import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData}
92+
import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, SharePartitionKey}
9293
import org.apache.kafka.server.quota.ThrottleCallback
9394
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
9495
import org.apache.kafka.server.share.context.{FinalContext, ShareSessionContext}
@@ -789,6 +790,34 @@ class KafkaApisTest extends Logging {
789790
assertEquals(expectedResponse, response.data)
790791
}
791792

793+
@Test
794+
def testFindCoordinatorWithValidSharePartitionKey(): Unit = {
795+
addTopicToMetadataCache(SHARE_GROUP_STATE_TOPIC_NAME, 10, 3)
796+
val key = SharePartitionKey.getInstance("foo", Uuid.randomUuid(), 10)
797+
798+
val request = new FindCoordinatorRequestData()
799+
.setKeyType(CoordinatorType.SHARE.id)
800+
.setCoordinatorKeys(asList(key.asCoordinatorKey))
801+
802+
val requestChannelRequest = buildRequest(new FindCoordinatorRequest.Builder(request).build())
803+
804+
kafkaApis = createKafkaApis()
805+
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
806+
807+
when(shareCoordinator.partitionFor(ArgumentMatchers.eq(key))).thenReturn(10)
808+
809+
val expectedResponse = new FindCoordinatorResponseData()
810+
.setCoordinators(asList(
811+
new FindCoordinatorResponseData.Coordinator()
812+
.setKey(key.asCoordinatorKey)
813+
.setNodeId(0)
814+
.setHost("broker0")
815+
.setPort(9092)))
816+
817+
val response = verifyNoThrottling[FindCoordinatorResponse](requestChannelRequest)
818+
assertEquals(expectedResponse, response.data)
819+
}
820+
792821
@Test
793822
def testMetadataAutoTopicCreationForOffsetTopic(): Unit = {
794823
testMetadataAutoTopicCreation(Topic.GROUP_METADATA_TOPIC_NAME, enableAutoTopicCreation = true,

0 commit comments

Comments
 (0)