Skip to content

Commit 7340eef

Browse files
authored
MINOR: Reshape TxnOffsetCommitRequest.Builder (#22147)
This patch refactors `TxnOffsetCommitRequest.Builder` to match the factory-method style used by `OffsetCommitRequest.Builder`: the public constructors are replaced by a private one plus a single static `forTopicNames(TxnOffsetCommitRequestData, boolean)` factory. The `getTopics(...)` helper is made public so callers can build the topic list themselves. All call sites in production and test code are migrated to build a `TxnOffsetCommitRequestData` and pass it to the factory. The change is behavior-preserving. It prepares the ground for a future `forTopicIdsOrNames(...)` factory that will be added when `TxnOffsetCommit` gains topic ID support. Reviewers: Andrew Schofield <aschofield@confluent.io>
1 parent 20c2450 commit 7340eef

7 files changed

Lines changed: 100 additions & 144 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.kafka.common.message.FindCoordinatorRequestData;
4949
import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator;
5050
import org.apache.kafka.common.message.InitProducerIdRequestData;
51+
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
5152
import org.apache.kafka.common.protocol.ApiKeys;
5253
import org.apache.kafka.common.protocol.Errors;
5354
import org.apache.kafka.common.record.internal.RecordBatch;
@@ -1248,17 +1249,17 @@ private TxnOffsetCommitHandler txnOffsetCommitHandler(TransactionalRequestResult
12481249
pendingTxnOffsetCommits.put(entry.getKey(), committedOffset);
12491250
}
12501251

1252+
final TxnOffsetCommitRequestData data = new TxnOffsetCommitRequestData()
1253+
.setTransactionalId(transactionalId)
1254+
.setGroupId(groupMetadata.groupId())
1255+
.setProducerId(producerIdAndEpoch.producerId)
1256+
.setProducerEpoch(producerIdAndEpoch.epoch)
1257+
.setMemberId(groupMetadata.memberId())
1258+
.setGenerationId(groupMetadata.generationId())
1259+
.setGroupInstanceId(groupMetadata.groupInstanceId().orElse(null))
1260+
.setTopics(TxnOffsetCommitRequest.getTopics(pendingTxnOffsetCommits));
12511261
final TxnOffsetCommitRequest.Builder builder =
1252-
new TxnOffsetCommitRequest.Builder(transactionalId,
1253-
groupMetadata.groupId(),
1254-
producerIdAndEpoch.producerId,
1255-
producerIdAndEpoch.epoch,
1256-
pendingTxnOffsetCommits,
1257-
groupMetadata.memberId(),
1258-
groupMetadata.generationId(),
1259-
groupMetadata.groupInstanceId(),
1260-
isTransactionV2Enabled()
1261-
);
1262+
TxnOffsetCommitRequest.Builder.forTopicNames(data, isTransactionV2Enabled());
12621263
if (result == null) {
12631264
// In this case, transaction V2 is in use.
12641265
return new TxnOffsetCommitHandler(builder);

clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java

Lines changed: 11 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -47,49 +47,20 @@ public static class Builder extends AbstractRequest.Builder<TxnOffsetCommitReque
4747
public final TxnOffsetCommitRequestData data;
4848
public final boolean isTransactionV2Enabled;
4949

50-
public Builder(final String transactionalId,
51-
final String consumerGroupId,
52-
final long producerId,
53-
final short producerEpoch,
54-
final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits,
55-
final boolean isTransactionV2Enabled) {
56-
this(transactionalId,
57-
consumerGroupId,
58-
producerId,
59-
producerEpoch,
60-
pendingTxnOffsetCommits,
61-
JoinGroupRequest.UNKNOWN_MEMBER_ID,
62-
JoinGroupRequest.UNKNOWN_GENERATION_ID,
63-
Optional.empty(),
64-
isTransactionV2Enabled);
65-
}
66-
67-
public Builder(final String transactionalId,
68-
final String consumerGroupId,
69-
final long producerId,
70-
final short producerEpoch,
71-
final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits,
72-
final String memberId,
73-
final int generationId,
74-
final Optional<String> groupInstanceId,
75-
final boolean isTransactionV2Enabled) {
50+
private Builder(
51+
final TxnOffsetCommitRequestData data,
52+
final boolean isTransactionV2Enabled
53+
) {
7654
super(ApiKeys.TXN_OFFSET_COMMIT);
55+
this.data = data;
7756
this.isTransactionV2Enabled = isTransactionV2Enabled;
78-
this.data = new TxnOffsetCommitRequestData()
79-
.setTransactionalId(transactionalId)
80-
.setGroupId(consumerGroupId)
81-
.setProducerId(producerId)
82-
.setProducerEpoch(producerEpoch)
83-
.setTopics(getTopics(pendingTxnOffsetCommits))
84-
.setMemberId(memberId)
85-
.setGenerationId(generationId)
86-
.setGroupInstanceId(groupInstanceId.orElse(null));
8757
}
8858

89-
public Builder(final TxnOffsetCommitRequestData data) {
90-
super(ApiKeys.TXN_OFFSET_COMMIT);
91-
this.data = data;
92-
this.isTransactionV2Enabled = true;
59+
public static Builder forTopicNames(
60+
final TxnOffsetCommitRequestData data,
61+
final boolean isTransactionV2Enabled
62+
) {
63+
return new Builder(data, isTransactionV2Enabled);
9364
}
9465

9566
@Override
@@ -136,7 +107,7 @@ public Map<TopicPartition, CommittedOffset> offsets() {
136107
return offsetMap;
137108
}
138109

139-
static List<TxnOffsetCommitRequestTopic> getTopics(Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits) {
110+
public static List<TxnOffsetCommitRequestTopic> getTopics(Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits) {
140111
Map<String, List<TxnOffsetCommitRequestPartition>> topicPartitionMap = new HashMap<>();
141112
for (Map.Entry<TopicPartition, CommittedOffset> entry : pendingTxnOffsetCommits.entrySet()) {
142113
TopicPartition topicPartition = entry.getKey();

clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

Lines changed: 24 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@
237237
import org.apache.kafka.common.message.SyncGroupRequestData;
238238
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
239239
import org.apache.kafka.common.message.SyncGroupResponseData;
240+
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
240241
import org.apache.kafka.common.message.UnregisterBrokerRequestData;
241242
import org.apache.kafka.common.message.UnregisterBrokerResponseData;
242243
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
@@ -2841,34 +2842,20 @@ private TxnOffsetCommitRequest createTxnOffsetCommitRequest(short version) {
28412842
offsets.put(new TopicPartition("topic", 74),
28422843
new TxnOffsetCommitRequest.CommittedOffset(100, "blah", Optional.of(27)));
28432844

2844-
if (version < 3) {
2845-
return new TxnOffsetCommitRequest.Builder("transactionalId",
2846-
"groupId",
2847-
21L,
2848-
(short) 42,
2849-
offsets,
2850-
false).build();
2851-
} else if (version < 5) {
2852-
return new TxnOffsetCommitRequest.Builder("transactionalId",
2853-
"groupId",
2854-
21L,
2855-
(short) 42,
2856-
offsets,
2857-
"member",
2858-
2,
2859-
Optional.of("instance"),
2860-
false).build(version);
2861-
} else {
2862-
return new TxnOffsetCommitRequest.Builder("transactionalId",
2863-
"groupId",
2864-
21L,
2865-
(short) 42,
2866-
offsets,
2867-
"member",
2868-
2,
2869-
Optional.of("instance"),
2870-
true).build(version);
2845+
TxnOffsetCommitRequestData data = new TxnOffsetCommitRequestData()
2846+
.setTransactionalId("transactionalId")
2847+
.setGroupId("groupId")
2848+
.setProducerId(21L)
2849+
.setProducerEpoch((short) 42)
2850+
.setTopics(TxnOffsetCommitRequest.getTopics(offsets));
2851+
2852+
if (version >= 3) {
2853+
data.setMemberId("member")
2854+
.setGenerationId(2)
2855+
.setGroupInstanceId("instance");
28712856
}
2857+
2858+
return TxnOffsetCommitRequest.Builder.forTopicNames(data, version >= 5).build(version);
28722859
}
28732860

28742861
private TxnOffsetCommitRequest createTxnOffsetCommitRequestWithAutoDowngrade() {
@@ -2878,15 +2865,16 @@ private TxnOffsetCommitRequest createTxnOffsetCommitRequestWithAutoDowngrade() {
28782865
offsets.put(new TopicPartition("topic", 74),
28792866
new TxnOffsetCommitRequest.CommittedOffset(100, "blah", Optional.of(27)));
28802867

2881-
return new TxnOffsetCommitRequest.Builder("transactionalId",
2882-
"groupId",
2883-
21L,
2884-
(short) 42,
2885-
offsets,
2886-
"member",
2887-
2,
2888-
Optional.of("instance"),
2889-
false).build();
2868+
TxnOffsetCommitRequestData data = new TxnOffsetCommitRequestData()
2869+
.setTransactionalId("transactionalId")
2870+
.setGroupId("groupId")
2871+
.setProducerId(21L)
2872+
.setProducerEpoch((short) 42)
2873+
.setMemberId("member")
2874+
.setGenerationId(2)
2875+
.setGroupInstanceId("instance")
2876+
.setTopics(TxnOffsetCommitRequest.getTopics(offsets));
2877+
return TxnOffsetCommitRequest.Builder.forTopicNames(data, false).build();
28902878
}
28912879

28922880
private TxnOffsetCommitResponse createTxnOffsetCommitResponse() {

clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java

Lines changed: 27 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.common.TopicPartition;
2020
import org.apache.kafka.common.errors.UnsupportedVersionException;
21+
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
2122
import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
2223
import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic;
2324
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
@@ -28,8 +29,6 @@
2829
import org.junit.jupiter.api.BeforeEach;
2930
import org.junit.jupiter.api.Test;
3031

31-
import java.util.Arrays;
32-
import java.util.Collections;
3332
import java.util.HashMap;
3433
import java.util.List;
3534
import java.util.Map;
@@ -65,27 +64,25 @@ public void setUp() {
6564
String transactionalId = "transactionalId";
6665
int producerId = 10;
6766
short producerEpoch = 1;
68-
builder = new TxnOffsetCommitRequest.Builder(
69-
transactionalId,
70-
groupId,
71-
producerId,
72-
producerEpoch,
73-
OFFSETS,
74-
true
75-
);
67+
TxnOffsetCommitRequestData data = new TxnOffsetCommitRequestData()
68+
.setTransactionalId(transactionalId)
69+
.setGroupId(groupId)
70+
.setProducerId(producerId)
71+
.setProducerEpoch(producerEpoch)
72+
.setTopics(TxnOffsetCommitRequest.getTopics(OFFSETS));
73+
builder = TxnOffsetCommitRequest.Builder.forTopicNames(data, true);
7674

7775
int generationId = 5;
78-
builderWithGroupMetadata = new TxnOffsetCommitRequest.Builder(
79-
transactionalId,
80-
groupId,
81-
producerId,
82-
producerEpoch,
83-
OFFSETS,
84-
memberId,
85-
generationId,
86-
Optional.of(groupInstanceId),
87-
true
88-
);
76+
TxnOffsetCommitRequestData dataWithGroupMetadata = new TxnOffsetCommitRequestData()
77+
.setTransactionalId(transactionalId)
78+
.setGroupId(groupId)
79+
.setProducerId(producerId)
80+
.setProducerEpoch(producerEpoch)
81+
.setMemberId(memberId)
82+
.setGenerationId(generationId)
83+
.setGroupInstanceId(groupInstanceId)
84+
.setTopics(TxnOffsetCommitRequest.getTopics(OFFSETS));
85+
builderWithGroupMetadata = TxnOffsetCommitRequest.Builder.forTopicNames(dataWithGroupMetadata, true);
8986
}
9087

9188
@Test
@@ -95,26 +92,23 @@ public void testConstructor() {
9592
errorsMap.put(new TopicPartition(topicOne, partitionOne), Errors.NOT_COORDINATOR);
9693
errorsMap.put(new TopicPartition(topicTwo, partitionTwo), Errors.NOT_COORDINATOR);
9794

98-
List<TxnOffsetCommitRequestTopic> expectedTopics = Arrays.asList(
95+
List<TxnOffsetCommitRequestTopic> expectedTopics = List.of(
9996
new TxnOffsetCommitRequestTopic()
10097
.setName(topicOne)
101-
.setPartitions(Collections.singletonList(
98+
.setPartitions(List.of(
10299
new TxnOffsetCommitRequestPartition()
103100
.setPartitionIndex(partitionOne)
104101
.setCommittedOffset(offset)
105102
.setCommittedLeaderEpoch(leaderEpoch)
106-
.setCommittedMetadata(metadata)
107-
)),
103+
.setCommittedMetadata(metadata))),
108104
new TxnOffsetCommitRequestTopic()
109105
.setName(topicTwo)
110-
.setPartitions(Collections.singletonList(
106+
.setPartitions(List.of(
111107
new TxnOffsetCommitRequestPartition()
112108
.setPartitionIndex(partitionTwo)
113109
.setCommittedOffset(offset)
114110
.setCommittedLeaderEpoch(leaderEpoch)
115-
.setCommittedMetadata(metadata)
116-
))
117-
);
111+
.setCommittedMetadata(metadata))));
118112

119113
for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
120114
final TxnOffsetCommitRequest request;
@@ -130,7 +124,7 @@ public void testConstructor() {
130124
request.getErrorResponse(throttleTimeMs, Errors.NOT_COORDINATOR.exception());
131125

132126
assertEquals(errorsMap, response.errors());
133-
assertEquals(Collections.singletonMap(Errors.NOT_COORDINATOR, 2), response.errorCounts());
127+
assertEquals(Map.of(Errors.NOT_COORDINATOR, 2), response.errorCounts());
134128
assertEquals(throttleTimeMs, response.throttleTimeMs());
135129
}
136130
}
@@ -139,16 +133,16 @@ public void testConstructor() {
139133
@Override
140134
public void testGetErrorResponse() {
141135
TxnOffsetCommitResponseData expectedResponse = new TxnOffsetCommitResponseData()
142-
.setTopics(Arrays.asList(
136+
.setTopics(List.of(
143137
new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
144138
.setName(topicOne)
145-
.setPartitions(Collections.singletonList(
139+
.setPartitions(List.of(
146140
new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
147141
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
148142
.setPartitionIndex(partitionOne))),
149143
new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
150144
.setName(topicTwo)
151-
.setPartitions(Collections.singletonList(
145+
.setPartitions(List.of(
152146
new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
153147
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
154148
.setPartitionIndex(partitionTwo)))));

core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
258258
expectedError: Errors,
259259
version: Short = ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
260260
): Unit = {
261-
val request = new TxnOffsetCommitRequest.Builder(
261+
val request = TxnOffsetCommitRequest.Builder.forTopicNames(
262262
new TxnOffsetCommitRequestData()
263263
.setGroupId(groupId)
264264
.setMemberId(memberId)
@@ -274,7 +274,8 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
274274
.setPartitionIndex(partition)
275275
.setCommittedOffset(offset)
276276
).asJava)
277-
).asJava)
277+
).asJava),
278+
true
278279
).build(version)
279280

280281
val expectedResponse = new TxnOffsetCommitResponseData()

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,14 +1475,13 @@ class KafkaApisTest extends Logging {
14751475

14761476
val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
14771477
val partitionOffsetCommitData = new TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty())
1478-
val offsetCommitRequest = new TxnOffsetCommitRequest.Builder(
1479-
"txnId",
1480-
"groupId",
1481-
15L,
1482-
0.toShort,
1483-
util.Map.of(invalidTopicPartition, partitionOffsetCommitData),
1484-
true
1485-
).build()
1478+
val data = new TxnOffsetCommitRequestData()
1479+
.setTransactionalId("txnId")
1480+
.setGroupId("groupId")
1481+
.setProducerId(15L)
1482+
.setProducerEpoch(0.toShort)
1483+
.setTopics(TxnOffsetCommitRequest.getTopics(util.Map.of(invalidTopicPartition, partitionOffsetCommitData)))
1484+
val offsetCommitRequest = TxnOffsetCommitRequest.Builder.forTopicNames(data, true).build()
14861485
val request = buildRequest(offsetCommitRequest)
14871486
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
14881487
any[Long])).thenReturn(0)
@@ -1521,7 +1520,7 @@ class KafkaApisTest extends Logging {
15211520
.setPartitionIndex(0)
15221521
.setCommittedOffset(10)))))
15231522

1524-
val requestChannelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build())
1523+
val requestChannelRequest = buildRequest(TxnOffsetCommitRequest.Builder.forTopicNames(txnOffsetCommitRequest, true).build())
15251524

15261525
val future = new CompletableFuture[TxnOffsetCommitResponseData]()
15271526
when(txnCoordinator.partitionFor(txnOffsetCommitRequest.transactionalId)).thenReturn(0)
@@ -1566,7 +1565,7 @@ class KafkaApisTest extends Logging {
15661565
.setPartitionIndex(0)
15671566
.setCommittedOffset(10)))))
15681567

1569-
val requestChannelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build())
1568+
val requestChannelRequest = buildRequest(TxnOffsetCommitRequest.Builder.forTopicNames(txnOffsetCommitRequest, true).build())
15701569

15711570
val future = new CompletableFuture[TxnOffsetCommitResponseData]()
15721571
when(txnCoordinator.partitionFor(txnOffsetCommitRequest.transactionalId)).thenReturn(0)
@@ -1638,7 +1637,7 @@ class KafkaApisTest extends Logging {
16381637
.setPartitionIndex(1)
16391638
.setCommittedOffset(70)))))
16401639

1641-
val requestChannelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build())
1640+
val requestChannelRequest = buildRequest(TxnOffsetCommitRequest.Builder.forTopicNames(txnOffsetCommitRequest, true).build())
16421641

16431642
// This is the request expected by the group coordinator.
16441643
val expectedTxnOffsetCommitRequest = new TxnOffsetCommitRequestData()
@@ -1757,12 +1756,14 @@ class KafkaApisTest extends Logging {
17571756
val producerId = 15L
17581757
val epoch = 0.toShort
17591758

1760-
val offsetCommitRequest = new TxnOffsetCommitRequest.Builder(
1761-
"txnId",
1762-
groupId,
1763-
producerId,
1764-
epoch,
1765-
util.Map.of(topicPartition, partitionOffsetCommitData),
1759+
val data = new TxnOffsetCommitRequestData()
1760+
.setTransactionalId("txnId")
1761+
.setGroupId(groupId)
1762+
.setProducerId(producerId)
1763+
.setProducerEpoch(epoch)
1764+
.setTopics(TxnOffsetCommitRequest.getTopics(util.Map.of(topicPartition, partitionOffsetCommitData)))
1765+
val offsetCommitRequest = TxnOffsetCommitRequest.Builder.forTopicNames(
1766+
data,
17661767
version >= TxnOffsetCommitRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
17671768
).build(version)
17681769
val request = buildRequest(offsetCommitRequest)

0 commit comments

Comments
 (0)