Skip to content

Commit 3a3646a

Browse files
committed
Merge branch 'trunk' into KAFKA-20167
2 parents d6afb13 + 84f810e commit 3a3646a

84 files changed

Lines changed: 4823 additions & 556 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

.github/workflows/docker_scan.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ jobs:
2626
strategy:
2727
matrix:
2828
# This is an array of supported tags. Make sure this array only contains the supported tags
29-
supported_image_tag: ['latest', '3.9.1', '4.0.1', '4.1.1', '4.2.0']
29+
supported_image_tag: ['latest', '3.9.2', '4.0.1', '4.1.1', '4.2.0']
3030
steps:
3131
- name: Run CVE scan
3232
uses: aquasecurity/trivy-action@b6643a29fecd7f34b3597bc6acb0a98b03d33ff8 # v0.33.1

LICENSE-binary

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -225,16 +225,16 @@ License Version 2.0:
225225
- jakarta.inject-api-2.0.1
226226
- jakarta.validation-api-3.0.2
227227
- javassist-3.30.2-GA
228-
- jetty-alpn-client-12.0.32
229-
- jetty-client-12.0.32
230-
- jetty-ee10-servlet-12.0.32
231-
- jetty-ee10-servlets-12.0.32
232-
- jetty-http-12.0.32
233-
- jetty-io-12.0.32
234-
- jetty-security-12.0.32
235-
- jetty-server-12.0.32
236-
- jetty-session-12.0.32
237-
- jetty-util-12.0.32
228+
- jetty-alpn-client-12.0.25
229+
- jetty-client-12.0.25
230+
- jetty-ee10-servlet-12.0.25
231+
- jetty-ee10-servlets-12.0.25
232+
- jetty-http-12.0.25
233+
- jetty-io-12.0.25
234+
- jetty-security-12.0.25
235+
- jetty-server-12.0.25
236+
- jetty-session-12.0.25
237+
- jetty-util-12.0.25
238238
- jose4j-0.9.6
239239
- jspecify-1.0.0
240240
- log4j-api-2.25.3

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ plugins {
3939

4040
id "com.github.spotbugs" version '6.4.4' apply false
4141
id 'org.scoverage' version '8.1' apply false
42-
id 'com.gradleup.shadow' version '8.3.9' apply false
42+
id 'com.gradleup.shadow' version '9.3.1' apply false
4343
id 'com.diffplug.spotless' version "8.0.0"
4444
}
4545

clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> re
3939
this(error, replicaInfos, UNKNOWN_VOLUME_BYTES, UNKNOWN_VOLUME_BYTES, false);
4040
}
4141

42+
public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos, long totalBytes, long usableBytes) {
43+
this(error, replicaInfos, totalBytes, usableBytes, false);
44+
}
45+
4246
public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos, long totalBytes, long usableBytes, boolean isCordoned) {
4347
this.error = error;
4448
this.replicaInfos = replicaInfos;

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3172,6 +3172,12 @@ class KafkaApis(val requestChannel: RequestChannel,
31723172

31733173
val groupId = shareFetchRequest.data.groupId
31743174

3175+
if (groupId == null) {
3176+
requestHelper.sendMaybeThrottle(request,
3177+
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.INVALID_REQUEST.exception("Invalid group id in the request.")))
3178+
return CompletableFuture.completedFuture[Unit](())
3179+
}
3180+
31753181
// Share Fetch needs permission to perform the READ action on the named group resource (groupId)
31763182
if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
31773183
requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception))
@@ -3539,6 +3545,12 @@ class KafkaApis(val requestChannel: RequestChannel,
35393545

35403546
val groupId = shareAcknowledgeRequest.data.groupId
35413547

3548+
if (groupId == null) {
3549+
requestHelper.sendMaybeThrottle(request,
3550+
shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.INVALID_REQUEST.exception("Invalid group id in the request.")))
3551+
return CompletableFuture.completedFuture[Unit](())
3552+
}
3553+
35423554
// Share Acknowledge needs permission to perform READ action on the named group resource (groupId)
35433555
if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
35443556
requestHelper.sendMaybeThrottle(request,
@@ -4222,7 +4234,7 @@ class KafkaApis(val requestChannel: RequestChannel,
42224234
* @return boolean if the member id in the RPC is valid or not.
42234235
*/
42244236
def isMemberIdValid(memberId: String): Boolean = {
4225-
memberId.nonEmpty && memberId.length <= 36
4237+
memberId != null && memberId.nonEmpty && memberId.length <= 36
42264238
}
42274239

42284240
private def updateRecordConversionStats(request: RequestChannel.Request,

core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala

Lines changed: 0 additions & 130 deletions
This file was deleted.

core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,42 +1070,42 @@ class DynamicBrokerConfigTest {
10701070
// Cordoning 1 new log dir, so 1 new handleCordoned invocation
10711071
val props = new Properties()
10721072
props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, logDirs.get(0))
1073-
ctx.config.dynamicConfig.updateDefaultConfig(props)
1073+
ctx.config.dynamicConfig.updateBrokerConfig(0, props)
10741074
assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
10751075
verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
10761076
verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
10771077

10781078
// When using *, no other entries must be specified, so no new invocations
10791079
props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "*,/invalid/log/dir")
1080-
ctx.config.dynamicConfig.updateDefaultConfig(props)
1080+
ctx.config.dynamicConfig.updateBrokerConfig(0, props)
10811081
assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
10821082
verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
10831083
verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
10841084

10851085
// Invalid log dir, so no new invocations
10861086
props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "/invalid/log/dir")
1087-
ctx.config.dynamicConfig.updateDefaultConfig(props)
1087+
ctx.config.dynamicConfig.updateBrokerConfig(0, props)
10881088
assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
10891089
verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
10901090
verify(ctx.directoryEventHandler, times(0)).handleUncordoned(anySet)
10911091

10921092
// * cordons the 2nd log dir, so 1 new handleCordoned invocation
10931093
props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "*")
1094-
ctx.config.dynamicConfig.updateDefaultConfig(props)
1094+
ctx.config.dynamicConfig.updateBrokerConfig(0, props)
10951095
assertEquals(logDirs, ctx.config.cordonedLogDirs)
10961096
verify(ctx.directoryEventHandler, times(2)).handleCordoned(anySet)
10971097
verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
10981098

10991099
// clearing all cordoned log dirs, so 1 new handleUncordoned invocation
11001100
props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "")
1101-
ctx.config.dynamicConfig.updateDefaultConfig(props)
1101+
ctx.config.dynamicConfig.updateBrokerConfig(0, props)
11021102
assertTrue(ctx.config.cordonedLogDirs.isEmpty)
11031103
verify(ctx.directoryEventHandler, times(2)).handleCordoned(anySet)
11041104
verify(ctx.directoryEventHandler, times(1)).handleUncordoned(anySet)
11051105

11061106
// * cordons all log dirs, so 1 new handleCordoned invocation
11071107
props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, String.join(",", logDirs))
1108-
ctx.config.dynamicConfig.updateDefaultConfig(props)
1108+
ctx.config.dynamicConfig.updateBrokerConfig(0, props)
11091109
assertEquals(logDirs, ctx.config.cordonedLogDirs)
11101110
verify(ctx.directoryEventHandler, times(3)).handleCordoned(anySet)
11111111
verify(ctx.directoryEventHandler, times(1)).handleUncordoned(anySet)

core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ class RequestQuotaTest extends BaseRequestTest {
329329
)
330330
)
331331
)
332+
332333
case ApiKeys.OFFSET_FETCH =>
333334
OffsetFetchRequest.Builder.forTopicNames(
334335
new OffsetFetchRequestData()
@@ -493,6 +494,7 @@ class RequestQuotaTest extends BaseRequestTest {
493494
.setHost("*")
494495
.setOperation(AclOperation.WRITE.code)
495496
.setPermissionType(AclPermissionType.DENY.code))))
497+
496498
case ApiKeys.DELETE_ACLS =>
497499
new DeleteAclsRequest.Builder(new DeleteAclsRequestData().setFilters(util.List.of(
498500
new DeleteAclsRequestData.DeleteAclsFilter()
@@ -503,6 +505,7 @@ class RequestQuotaTest extends BaseRequestTest {
503505
.setHostFilter("*")
504506
.setOperation(AclOperation.ANY.code)
505507
.setPermissionType(AclPermissionType.DENY.code))))
508+
506509
case ApiKeys.DESCRIBE_CONFIGS =>
507510
new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()
508511
.setResources(util.List.of(new DescribeConfigsRequestData.DescribeConfigsResource()
@@ -719,7 +722,10 @@ class RequestQuotaTest extends BaseRequestTest {
719722
).iterator)))
720723

721724
case ApiKeys.SHARE_ACKNOWLEDGE =>
722-
new ShareAcknowledgeRequest.Builder(new ShareAcknowledgeRequestData())
725+
new ShareAcknowledgeRequest.Builder(
726+
new ShareAcknowledgeRequestData()
727+
.setGroupId("test-share-group")
728+
.setMemberId(Uuid.randomUuid().toString))
723729

724730
case ApiKeys.ADD_RAFT_VOTER =>
725731
new AddRaftVoterRequest.Builder(new AddRaftVoterRequestData())

docs/design/protocol.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ Kafka is a partitioned system so not all servers have the complete data set. Ins
5252

5353
All systems of this nature have the question of how a particular piece of data is assigned to a particular partition. Kafka clients directly control this assignment, the brokers themselves enforce no particular semantics of which messages should be published to a particular partition. Rather, to publish messages the client directly addresses messages to a particular partition, and when fetching messages, fetches from a particular partition. If two clients want to use the same partitioning scheme they must use the same method to compute the mapping of key to partition.
5454

55-
These requests to publish or fetch data must be sent to the broker that is currently acting as the leader for a given partition. This condition is enforced by the broker, so a request for a particular partition to the wrong broker will result in an the NotLeaderForPartition error code (described below).
55+
These requests to publish or fetch data must be sent to the broker that is currently acting as the leader for a given partition. This condition is enforced by the broker, so a request for a particular partition to the wrong broker will result in the NotLeaderForPartition error code (described below).
5656

5757
How can the client find out which topics exist, what partitions they have, and which brokers currently host those partitions so that it can direct its requests to the right hosts? This information is dynamic, so you can't just configure each client with some static mapping file. Instead all Kafka brokers can answer a metadata request that describes the current state of the cluster: what topics there are, which partitions those topics have, which broker is the leader for those partitions, and the host and port information for these brokers.
5858

@@ -79,7 +79,7 @@ Partitioning really serves two purposes in Kafka:
7979

8080
For a given use case you may care about only one of these or both.
8181

82-
To accomplish simple load balancing a simple approach would be for the client to just round robin requests over all brokers. Another alternative, in an environment where there are many more producers than brokers, would be to have each client chose a single partition at random and publish to that. This later strategy will result in far fewer TCP connections.
82+
To accomplish simple load balancing a simple approach would be for the client to just round robin requests over all brokers. Another alternative, in an environment where there are many more producers than brokers, would be to have each client choose a single partition at random and publish to that. This latter strategy will result in far fewer TCP connections.
8383

8484
Semantic partitioning means using some key in the message to assign messages to partitions. For example if you were processing a click message stream you might want to partition the stream by the user id so that all data for a particular user would go to a single consumer. To accomplish this the client can take a key associated with the message and use some hash of this key to choose the partition to which to deliver the message.
8585

@@ -113,7 +113,7 @@ The following sequence may be used by a client to obtain supported API versions
113113
2. On receiving `ApiVersionsRequest`, a broker returns its full list of supported ApiKeys and versions regardless of current authentication state (e.g., before SASL authentication on an SASL listener, do note that no Kafka protocol requests may take place on an SSL listener before the SSL handshake is finished). If this is considered to leak information about the broker version a workaround is to use SSL with client authentication which is performed at an earlier stage of the connection where the `ApiVersionRequest` is not available. Also, note that broker versions older than 0.10.0.0 do not support this API and will either ignore the request or close connection in response to the request. Also note that if the client `ApiVersionsRequest` version is unsupported by the broker (client is ahead), and the broker version is 2.4.0 or greater, then the broker will respond with a version 0 ApiVersionsResponse with the error code set to `UNSUPPORTED_VERSION` and the `api_versions` field populated with the supported version of the `ApiVersionsRequest`. It is then up to the client to retry, making another `ApiVersionsRequest` using the highest version supported by the client and broker. See [KIP-511: Collect and Expose Client's Name and Version in the Brokers](https://cwiki.apache.org/confluence/x/qRJ4Bw)
114114
3. If multiple versions of an API are supported by broker and client, clients are recommended to use the latest version supported by the broker and itself.
115115
4. Deprecation of a protocol version is done by marking an API version as deprecated in the protocol documentation.
116-
5. Supported API versions obtained from a broker are only valid for the connection on which that information is obtained. In the event of disconnection, the client should obtain the information from the broker again, as the broker might have been upgraded/downgraded in the mean time.
116+
5. Supported API versions obtained from a broker are only valid for the connection on which that information is obtained. In the event of disconnection, the client should obtain the information from the broker again, as the broker might have been upgraded/downgraded in the meantime.
117117

118118

119119

0 commit comments

Comments (0)