Skip to content

Commit e73719d

Browse files
authored
KAFKA-18819 StreamsGroupHeartbeat API and StreamsGroupDescribe API check topic describe (#19183)
This patch filters out the topic describe unauthorized topics from the StreamsGroupHeartbeat and StreamsGroupDescribe response. Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
1 parent fcca405 commit e73719d

6 files changed

Lines changed: 232 additions & 9 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ private void handleError(
238238
Set<CoordinatorKey> groupsToUnmap) {
239239
switch (error) {
240240
case GROUP_AUTHORIZATION_FAILED:
241+
case TOPIC_AUTHORIZATION_FAILED:
241242
log.debug("`DescribeStreamsGroups` request for group id {} failed due to error {}", groupId.idValue, error);
242243
failed.put(groupId, error.exception(errorMsg));
243244
break;

clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
* - {@link Errors#INVALID_REQUEST}
3636
* - {@link Errors#INVALID_GROUP_ID}
3737
* - {@link Errors#GROUP_ID_NOT_FOUND}
38+
* - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
3839
*/
3940
public class StreamsGroupDescribeResponse extends AbstractResponse {
4041

clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
// - INVALID_REQUEST (version 0+)
2828
// - INVALID_GROUP_ID (version 0+)
2929
// - GROUP_ID_NOT_FOUND (version 0+)
30+
// - TOPIC_AUTHORIZATION_FAILED (version 0+)
3031
"fields": [
3132
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
3233
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },

clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
// - FENCED_MEMBER_EPOCH (version 0+)
3131
// - UNRELEASED_INSTANCE_ID (version 0+)
3232
// - GROUP_MAX_SIZE_REACHED (version 0+)
33-
// - TOPIC_AUTHORIZATION_FAILED (version 0+)
33+
// - TOPIC_AUTHORIZATION_FAILED (version 0+)
3434
// - CLUSTER_AUTHORIZATION_FAILED (version 0+)
3535
// - STREAMS_INVALID_TOPOLOGY (version 0+)
3636
// - STREAMS_INVALID_TOPOLOGY_EPOCH (version 0+)

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2707,11 +2707,20 @@ class KafkaApis(val requestChannel: RequestChannel,
27072707
requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(errorResponse))
27082708
return CompletableFuture.completedFuture[Unit](())
27092709
}
2710+
2711+
if (requiredTopics.nonEmpty) {
2712+
val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, requiredTopics)(identity)
2713+
if (authorizedTopics.size < requiredTopics.size) {
2714+
val responseData = new StreamsGroupHeartbeatResponseData().setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
2715+
requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(responseData))
2716+
return CompletableFuture.completedFuture[Unit](())
2717+
}
2718+
}
27102719
}
27112720

27122721
groupCoordinator.streamsGroupHeartbeat(
27132722
request.context,
2714-
streamsGroupHeartbeatRequest.data,
2723+
streamsGroupHeartbeatRequest.data
27152724
).handle[Unit] { (response, exception) =>
27162725
if (exception != null) {
27172726
requestHelper.sendMaybeThrottle(request, streamsGroupHeartbeatRequest.getErrorResponse(exception))
@@ -2795,6 +2804,50 @@ class KafkaApis(val requestChannel: RequestChannel,
27952804
response.groups.addAll(results)
27962805
}
27972806

2807+
// Clients are not allowed to see topics that are not authorized for Describe.
2808+
if (authorizer.isDefined) {
2809+
val topicsToCheck = response.groups.stream()
2810+
.filter(group => group.topology != null)
2811+
.flatMap(group => group.topology.subtopologies.stream)
2812+
.flatMap(subtopology => java.util.stream.Stream.concat(
2813+
java.util.stream.Stream.concat(
2814+
java.util.stream.Stream.concat(
2815+
subtopology.sourceTopics.stream,
2816+
subtopology.repartitionSinkTopics.stream),
2817+
subtopology.repartitionSourceTopics.stream.map(_.name)),
2818+
subtopology.stateChangelogTopics.stream.map(_.name)))
2819+
.collect(Collectors.toSet[String])
2820+
.asScala
2821+
2822+
val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
2823+
topicsToCheck)(identity)
2824+
2825+
val updatedGroups = response.groups.stream.map { group =>
2826+
val hasUnauthorizedTopic = if (group.topology == null) false else
2827+
group.topology.subtopologies.stream()
2828+
.flatMap(subtopology => java.util.stream.Stream.concat(
2829+
java.util.stream.Stream.concat(
2830+
java.util.stream.Stream.concat(
2831+
subtopology.sourceTopics.stream,
2832+
subtopology.repartitionSinkTopics.stream),
2833+
subtopology.repartitionSourceTopics.stream.map(_.name)),
2834+
subtopology.stateChangelogTopics.stream.map(_.name)))
2835+
.anyMatch(topic => !authorizedTopics.contains(topic))
2836+
2837+
if (hasUnauthorizedTopic) {
2838+
new StreamsGroupDescribeResponseData.DescribedGroup()
2839+
.setGroupId(group.groupId)
2840+
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
2841+
.setErrorMessage("The described group uses topics that the client is not authorized to describe.")
2842+
.setMembers(List.empty.asJava)
2843+
.setTopology(null)
2844+
} else {
2845+
group
2846+
}
2847+
}.collect(Collectors.toList[StreamsGroupDescribeResponseData.DescribedGroup])
2848+
response.setGroups(updatedGroups)
2849+
}
2850+
27982851
requestHelper.sendMaybeThrottle(request, new StreamsGroupDescribeResponse(response))
27992852
}
28002853
}

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 174 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9883,7 +9883,7 @@ class KafkaApisTest extends Logging {
98839883
}
98849884

98859885
@Test
9886-
def testStreamsGroupHeartbeatRequestAuthorizationFailed(): Unit = {
9886+
def testStreamsGroupHeartbeatRequestGroupAuthorizationFailed(): Unit = {
98879887
metadataCache = mock(classOf[KRaftMetadataCache])
98889888

98899889
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
@@ -9903,6 +9903,58 @@ class KafkaApisTest extends Logging {
99039903
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
99049904
}
99059905

9906+
@Test
9907+
def testStreamsGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = {
9908+
metadataCache = mock(classOf[KRaftMetadataCache])
9909+
val groupId = "group"
9910+
val fooTopicName = "foo"
9911+
val barTopicName = "bar"
9912+
val zarTopicName = "zar"
9913+
val tarTopicName = "tar"
9914+
9915+
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId(groupId).setTopology(
9916+
new StreamsGroupHeartbeatRequestData.Topology()
9917+
.setEpoch(3)
9918+
.setSubtopologies(
9919+
util.List.of(new StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology")
9920+
.setSourceTopics(Collections.singletonList(fooTopicName))
9921+
.setRepartitionSinkTopics(Collections.singletonList(barTopicName))
9922+
.setRepartitionSourceTopics(Collections.singletonList(new StreamsGroupHeartbeatRequestData.TopicInfo().setName(zarTopicName)))
9923+
.setStateChangelogTopics(Collections.singletonList(new StreamsGroupHeartbeatRequestData.TopicInfo().setName(tarTopicName)))
9924+
)
9925+
)
9926+
)
9927+
9928+
val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
9929+
9930+
val authorizer: Authorizer = mock(classOf[Authorizer])
9931+
val acls = Map(
9932+
groupId -> AuthorizationResult.ALLOWED,
9933+
fooTopicName -> AuthorizationResult.ALLOWED,
9934+
barTopicName -> AuthorizationResult.DENIED,
9935+
zarTopicName -> AuthorizationResult.ALLOWED,
9936+
tarTopicName -> AuthorizationResult.ALLOWED
9937+
)
9938+
when(authorizer.authorize(
9939+
any[RequestContext],
9940+
any[util.List[Action]]
9941+
)).thenAnswer { invocation =>
9942+
val actions = invocation.getArgument(1, classOf[util.List[Action]])
9943+
actions.asScala.map { action =>
9944+
acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
9945+
}.asJava
9946+
}
9947+
9948+
kafkaApis = createKafkaApis(
9949+
authorizer = Some(authorizer),
9950+
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
9951+
)
9952+
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
9953+
9954+
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
9955+
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, response.data.errorCode)
9956+
}
9957+
99069958
@Test
99079959
def testStreamsGroupHeartbeatRequestProtocolDisabled(): Unit = {
99089960
metadataCache = mock(classOf[KRaftMetadataCache])
@@ -10230,6 +10282,8 @@ class KafkaApisTest extends Logging {
1023010282
@ValueSource(booleans = Array(true, false))
1023110283
def testStreamsGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
1023210284
metadataCache = mock(classOf[KRaftMetadataCache])
10285+
val fooTopicName = "foo"
10286+
val barTopicName = "bar"
1023310287

1023410288
val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
1023510289
val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
@@ -10247,10 +10301,32 @@ class KafkaApisTest extends Logging {
1024710301
)
1024810302
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
1024910303

10304+
val subtopology0 = new StreamsGroupDescribeResponseData.Subtopology()
10305+
.setSubtopologyId("subtopology0")
10306+
.setSourceTopics(Collections.singletonList(fooTopicName))
10307+
10308+
val subtopology1 = new StreamsGroupDescribeResponseData.Subtopology()
10309+
.setSubtopologyId("subtopology1")
10310+
.setRepartitionSinkTopics(Collections.singletonList(barTopicName))
10311+
10312+
val subtopology2 = new StreamsGroupDescribeResponseData.Subtopology()
10313+
.setSubtopologyId("subtopology2")
10314+
.setSourceTopics(Collections.singletonList(fooTopicName))
10315+
.setRepartitionSinkTopics(Collections.singletonList(barTopicName))
10316+
1025010317
future.complete(List(
10251-
new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
10252-
new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)),
10253-
new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2))
10318+
new StreamsGroupDescribeResponseData.DescribedGroup()
10319+
.setGroupId(groupIds.get(0))
10320+
.setTopology(new StreamsGroupDescribeResponseData.Topology()
10321+
.setSubtopologies(Collections.singletonList(subtopology0))),
10322+
new StreamsGroupDescribeResponseData.DescribedGroup()
10323+
.setGroupId(groupIds.get(1))
10324+
.setTopology(new StreamsGroupDescribeResponseData.Topology()
10325+
.setSubtopologies(Collections.singletonList(subtopology1))),
10326+
new StreamsGroupDescribeResponseData.DescribedGroup()
10327+
.setGroupId(groupIds.get(2))
10328+
.setTopology(new StreamsGroupDescribeResponseData.Topology()
10329+
.setSubtopologies(Collections.singletonList(subtopology2)))
1025410330
).asJava)
1025510331

1025610332
var authorizedOperationsInt = Int.MinValue;
@@ -10262,9 +10338,18 @@ class KafkaApisTest extends Logging {
1026210338

1026310339
// Can't reuse the above list here because we would not test the implementation in KafkaApis then
1026410340
val describedGroups = List(
10265-
new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
10266-
new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)),
10267-
new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2))
10341+
new StreamsGroupDescribeResponseData.DescribedGroup()
10342+
.setGroupId(groupIds.get(0))
10343+
.setTopology(new StreamsGroupDescribeResponseData.Topology()
10344+
.setSubtopologies(Collections.singletonList(subtopology0))),
10345+
new StreamsGroupDescribeResponseData.DescribedGroup()
10346+
.setGroupId(groupIds.get(1))
10347+
.setTopology(new StreamsGroupDescribeResponseData.Topology()
10348+
.setSubtopologies(Collections.singletonList(subtopology1))),
10349+
new StreamsGroupDescribeResponseData.DescribedGroup()
10350+
.setGroupId(groupIds.get(2))
10351+
.setTopology(new StreamsGroupDescribeResponseData.Topology()
10352+
.setSubtopologies(Collections.singletonList(subtopology2)))
1026810353
).map(group => group.setAuthorizedOperations(authorizedOperationsInt))
1026910354
val expectedStreamsGroupDescribeResponseData = new StreamsGroupDescribeResponseData()
1027010355
.setGroups(describedGroups.asJava)
@@ -10353,6 +10438,88 @@ class KafkaApisTest extends Logging {
1035310438
assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode)
1035410439
}
1035510440

10441+
@ParameterizedTest
10442+
@ValueSource(booleans = Array(true, false))
10443+
def testStreamsGroupDescribeFilterUnauthorizedTopics(includeAuthorizedOperations: Boolean): Unit = {
10444+
val fooTopicName = "foo"
10445+
val barTopicName = "bar"
10446+
val errorMessage = "The described group uses topics that the client is not authorized to describe."
10447+
10448+
metadataCache = mock(classOf[KRaftMetadataCache])
10449+
10450+
val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
10451+
val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
10452+
.setIncludeAuthorizedOperations(includeAuthorizedOperations)
10453+
streamsGroupDescribeRequestData.groupIds.addAll(groupIds)
10454+
val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build())
10455+
10456+
val authorizer: Authorizer = mock(classOf[Authorizer])
10457+
val acls = Map(
10458+
groupIds.get(0) -> AuthorizationResult.ALLOWED,
10459+
groupIds.get(1) -> AuthorizationResult.ALLOWED,
10460+
groupIds.get(2) -> AuthorizationResult.ALLOWED,
10461+
fooTopicName -> AuthorizationResult.ALLOWED,
10462+
barTopicName -> AuthorizationResult.DENIED,
10463+
)
10464+
when(authorizer.authorize(
10465+
any[RequestContext],
10466+
any[util.List[Action]]
10467+
)).thenAnswer { invocation =>
10468+
val actions = invocation.getArgument(1, classOf[util.List[Action]])
10469+
actions.asScala.map { action =>
10470+
acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
10471+
}.asJava
10472+
}
10473+
10474+
val future = new CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]()
10475+
when(groupCoordinator.streamsGroupDescribe(
10476+
any[RequestContext],
10477+
any[util.List[String]]
10478+
)).thenReturn(future)
10479+
kafkaApis = createKafkaApis(
10480+
authorizer = Some(authorizer),
10481+
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
10482+
)
10483+
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
10484+
10485+
val subtopology0 = new StreamsGroupDescribeResponseData.Subtopology()
10486+
.setSubtopologyId("subtopology0")
10487+
.setSourceTopics(Collections.singletonList(fooTopicName))
10488+
10489+
val subtopology1 = new StreamsGroupDescribeResponseData.Subtopology()
10490+
.setSubtopologyId("subtopology1")
10491+
.setRepartitionSinkTopics(Collections.singletonList(barTopicName))
10492+
10493+
val subtopology2 = new StreamsGroupDescribeResponseData.Subtopology()
10494+
.setSubtopologyId("subtopology2")
10495+
.setSourceTopics(Collections.singletonList(fooTopicName))
10496+
.setRepartitionSinkTopics(Collections.singletonList(barTopicName))
10497+
10498+
future.complete(List(
10499+
new StreamsGroupDescribeResponseData.DescribedGroup()
10500+
.setGroupId(groupIds.get(0))
10501+
.setTopology(new StreamsGroupDescribeResponseData.Topology()
10502+
.setSubtopologies(Collections.singletonList(subtopology0))),
10503+
new StreamsGroupDescribeResponseData.DescribedGroup()
10504+
.setGroupId(groupIds.get(1))
10505+
.setTopology(new StreamsGroupDescribeResponseData.Topology()
10506+
.setSubtopologies(Collections.singletonList(subtopology1))),
10507+
new StreamsGroupDescribeResponseData.DescribedGroup()
10508+
.setGroupId(groupIds.get(2))
10509+
.setTopology(new StreamsGroupDescribeResponseData.Topology()
10510+
.setSubtopologies(Collections.singletonList(subtopology2)))
10511+
).asJava)
10512+
10513+
val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
10514+
assertNotNull(response.data)
10515+
assertEquals(3, response.data.groups.size)
10516+
assertEquals(Errors.NONE.code(), response.data.groups.get(0).errorCode())
10517+
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), response.data.groups.get(1).errorCode())
10518+
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), response.data.groups.get(2).errorCode())
10519+
assertEquals(errorMessage, response.data.groups.get(1).errorMessage())
10520+
assertEquals(errorMessage, response.data.groups.get(2).errorMessage())
10521+
}
10522+
1035610523
@Test
1035710524
def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = {
1035810525
val fooTopicName = "foo"

0 commit comments

Comments
 (0)