Skip to content

Commit 20c2450

Browse files
authored
MINOR: Reshape TxnOffsetCommitResponse.Builder (#22146)
This patch refactors `TxnOffsetCommitResponse.Builder` to mirror the class hierarchy used by `OffsetCommitResponse.Builder`: the `Builder` becomes abstract and the existing topic-name-keyed logic moves to a `TopicNameBuilder` subclass. A new `newBuilder()` factory replaces the only direct `new Builder()` call site in `KafkaApis`. Unit tests covering `addPartition`, `addPartitions`, `merge`, and the null-name guard are added in `TxnOffsetCommitResponseTest`. The change is behavior-preserving. It prepares the ground for a future `TopicIdBuilder` subclass that will be added when `TxnOffsetCommit` gains topic ID support. Reviewers: Andrew Schofield <aschofield@confluent.io>
1 parent 2dfdb70 commit 20c2450

3 files changed

Lines changed: 201 additions & 34 deletions

File tree

clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java

Lines changed: 55 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -49,33 +49,34 @@
4949
*/
5050
public class TxnOffsetCommitResponse extends AbstractResponse {
5151

52-
public static class Builder {
53-
TxnOffsetCommitResponseData data = new TxnOffsetCommitResponseData();
54-
HashMap<String, TxnOffsetCommitResponseTopic> byTopicName = new HashMap<>();
52+
public static Builder newBuilder() {
53+
return new TopicNameBuilder();
54+
}
55+
56+
public abstract static class Builder {
57+
protected TxnOffsetCommitResponseData data = new TxnOffsetCommitResponseData();
58+
59+
protected abstract void add(
60+
TxnOffsetCommitResponseTopic topic
61+
);
5562

56-
private TxnOffsetCommitResponseTopic getOrCreateTopic(
63+
protected abstract TxnOffsetCommitResponseTopic get(
5764
String topicName
58-
) {
59-
TxnOffsetCommitResponseTopic topic = byTopicName.get(topicName);
60-
if (topic == null) {
61-
topic = new TxnOffsetCommitResponseTopic().setName(topicName);
62-
data.topics().add(topic);
63-
byTopicName.put(topicName, topic);
64-
}
65-
return topic;
66-
}
65+
);
66+
67+
protected abstract TxnOffsetCommitResponseTopic getOrCreate(
68+
String topicName
69+
);
6770

6871
public Builder addPartition(
6972
String topicName,
7073
int partitionIndex,
7174
Errors error
7275
) {
73-
final TxnOffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
74-
76+
final TxnOffsetCommitResponseTopic topicResponse = getOrCreate(topicName);
7577
topicResponse.partitions().add(new TxnOffsetCommitResponsePartition()
7678
.setPartitionIndex(partitionIndex)
7779
.setErrorCode(error.code()));
78-
7980
return this;
8081
}
8182

@@ -85,14 +86,12 @@ public <P> Builder addPartitions(
8586
Function<P, Integer> partitionIndex,
8687
Errors error
8788
) {
88-
final TxnOffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
89-
89+
final TxnOffsetCommitResponseTopic topicResponse = getOrCreate(topicName);
9090
partitions.forEach(partition ->
9191
topicResponse.partitions().add(new TxnOffsetCommitResponsePartition()
9292
.setPartitionIndex(partitionIndex.apply(partition))
9393
.setErrorCode(error.code()))
9494
);
95-
9695
return this;
9796
}
9897

@@ -105,11 +104,10 @@ public Builder merge(
105104
} else {
106105
// Otherwise, we have to merge them together.
107106
newData.topics().forEach(newTopic -> {
108-
TxnOffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name());
107+
TxnOffsetCommitResponseTopic existingTopic = get(newTopic.name());
109108
if (existingTopic == null) {
110109
// If no topic exists, we can directly copy the new topic data.
111-
data.topics().add(newTopic);
112-
byTopicName.put(newTopic.name(), newTopic);
110+
add(newTopic);
113111
} else {
114112
// Otherwise, we add the partitions to the existing one. Note we
115113
// expect non-overlapping partitions here as we don't verify
@@ -118,7 +116,6 @@ public Builder merge(
118116
}
119117
});
120118
}
121-
122119
return this;
123120
}
124121

@@ -127,6 +124,41 @@ public TxnOffsetCommitResponse build() {
127124
}
128125
}
129126

127+
public static class TopicNameBuilder extends Builder {
128+
private final HashMap<String, TxnOffsetCommitResponseTopic> byTopicName = new HashMap<>();
129+
130+
@Override
131+
protected void add(TxnOffsetCommitResponseTopic topic) {
132+
throwIfTopicNameIsNull(topic.name());
133+
data.topics().add(topic);
134+
byTopicName.put(topic.name(), topic);
135+
}
136+
137+
@Override
138+
protected TxnOffsetCommitResponseTopic get(String topicName) {
139+
throwIfTopicNameIsNull(topicName);
140+
return byTopicName.get(topicName);
141+
}
142+
143+
@Override
144+
protected TxnOffsetCommitResponseTopic getOrCreate(String topicName) {
145+
throwIfTopicNameIsNull(topicName);
146+
TxnOffsetCommitResponseTopic topic = byTopicName.get(topicName);
147+
if (topic == null) {
148+
topic = new TxnOffsetCommitResponseTopic().setName(topicName);
149+
data.topics().add(topic);
150+
byTopicName.put(topicName, topic);
151+
}
152+
return topic;
153+
}
154+
155+
private static void throwIfTopicNameIsNull(String topicName) {
156+
if (topicName == null) {
157+
throw new IllegalArgumentException("TopicName cannot be null.");
158+
}
159+
}
160+
}
161+
130162
private final TxnOffsetCommitResponseData data;
131163

132164
public TxnOffsetCommitResponse(TxnOffsetCommitResponseData data) {

clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java

Lines changed: 145 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,17 @@
1717
package org.apache.kafka.common.requests;
1818

1919
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
20+
import org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
21+
import org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic;
2022
import org.apache.kafka.common.protocol.ApiKeys;
2123
import org.apache.kafka.common.protocol.MessageUtil;
2224

2325
import org.junit.jupiter.api.Test;
2426

25-
import java.util.Arrays;
26-
import java.util.Collections;
27+
import java.util.List;
2728

2829
import static org.junit.jupiter.api.Assertions.assertEquals;
30+
import static org.junit.jupiter.api.Assertions.assertThrows;
2931

3032
public class TxnOffsetCommitResponseTest extends OffsetCommitResponseTest {
3133

@@ -44,16 +46,15 @@ public void testConstructorWithErrorResponse() {
4446
public void testParse() {
4547
TxnOffsetCommitResponseData data = new TxnOffsetCommitResponseData()
4648
.setThrottleTimeMs(throttleTimeMs)
47-
.setTopics(Arrays.asList(
48-
new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic().setPartitions(
49-
Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
49+
.setTopics(List.of(
50+
new TxnOffsetCommitResponseTopic().setPartitions(List.of(
51+
new TxnOffsetCommitResponsePartition()
5052
.setPartitionIndex(partitionOne)
5153
.setErrorCode(errorOne.code()))),
52-
new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic().setPartitions(
53-
Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
54-
.setPartitionIndex(partitionTwo)
55-
.setErrorCode(errorTwo.code())))
56-
));
54+
new TxnOffsetCommitResponseTopic().setPartitions(List.of(
55+
new TxnOffsetCommitResponsePartition()
56+
.setPartitionIndex(partitionTwo)
57+
.setErrorCode(errorTwo.code())))));
5758

5859
for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
5960
TxnOffsetCommitResponse response = TxnOffsetCommitResponse.parse(
@@ -64,4 +65,138 @@ public void testParse() {
6465
}
6566
}
6667

68+
@Test
69+
public void testBuilderAddPartition() {
70+
TxnOffsetCommitResponse.Builder builder = TxnOffsetCommitResponse.newBuilder();
71+
builder.addPartition(topicOne, partitionOne, errorOne);
72+
builder.addPartition(topicOne, partitionTwo, errorTwo);
73+
builder.addPartition(topicTwo, partitionOne, errorOne);
74+
75+
TxnOffsetCommitResponseData expected = new TxnOffsetCommitResponseData()
76+
.setTopics(List.of(
77+
new TxnOffsetCommitResponseTopic()
78+
.setName(topicOne)
79+
.setPartitions(List.of(
80+
new TxnOffsetCommitResponsePartition()
81+
.setPartitionIndex(partitionOne)
82+
.setErrorCode(errorOne.code()),
83+
new TxnOffsetCommitResponsePartition()
84+
.setPartitionIndex(partitionTwo)
85+
.setErrorCode(errorTwo.code()))),
86+
new TxnOffsetCommitResponseTopic()
87+
.setName(topicTwo)
88+
.setPartitions(List.of(
89+
new TxnOffsetCommitResponsePartition()
90+
.setPartitionIndex(partitionOne)
91+
.setErrorCode(errorOne.code())))));
92+
93+
assertEquals(expected, builder.build().data());
94+
}
95+
96+
@Test
97+
public void testBuilderAddPartitions() {
98+
TxnOffsetCommitResponse.Builder builder = TxnOffsetCommitResponse.newBuilder();
99+
builder.addPartitions(topicOne, List.of(partitionOne, partitionTwo), p -> p, errorOne);
100+
101+
TxnOffsetCommitResponseData expected = new TxnOffsetCommitResponseData()
102+
.setTopics(List.of(
103+
new TxnOffsetCommitResponseTopic()
104+
.setName(topicOne)
105+
.setPartitions(List.of(
106+
new TxnOffsetCommitResponsePartition()
107+
.setPartitionIndex(partitionOne)
108+
.setErrorCode(errorOne.code()),
109+
new TxnOffsetCommitResponsePartition()
110+
.setPartitionIndex(partitionTwo)
111+
.setErrorCode(errorOne.code())))));
112+
113+
assertEquals(expected, builder.build().data());
114+
}
115+
116+
@Test
117+
public void testBuilderMergeIntoEmpty() {
118+
TxnOffsetCommitResponseData newData = new TxnOffsetCommitResponseData()
119+
.setTopics(List.of(
120+
new TxnOffsetCommitResponseTopic()
121+
.setName(topicOne)
122+
.setPartitions(List.of(
123+
new TxnOffsetCommitResponsePartition()
124+
.setPartitionIndex(partitionOne)
125+
.setErrorCode(errorOne.code())))));
126+
127+
TxnOffsetCommitResponse response = TxnOffsetCommitResponse.newBuilder()
128+
.merge(newData)
129+
.build();
130+
131+
assertEquals(newData, response.data());
132+
}
133+
134+
@Test
135+
public void testBuilderMergeAddsNewTopic() {
136+
TxnOffsetCommitResponse.Builder builder = TxnOffsetCommitResponse.newBuilder();
137+
builder.addPartition(topicOne, partitionOne, errorOne);
138+
139+
TxnOffsetCommitResponseData newData = new TxnOffsetCommitResponseData()
140+
.setTopics(List.of(
141+
new TxnOffsetCommitResponseTopic()
142+
.setName(topicTwo)
143+
.setPartitions(List.of(
144+
new TxnOffsetCommitResponsePartition()
145+
.setPartitionIndex(partitionTwo)
146+
.setErrorCode(errorTwo.code())))));
147+
148+
TxnOffsetCommitResponseData expected = new TxnOffsetCommitResponseData()
149+
.setTopics(List.of(
150+
new TxnOffsetCommitResponseTopic()
151+
.setName(topicOne)
152+
.setPartitions(List.of(
153+
new TxnOffsetCommitResponsePartition()
154+
.setPartitionIndex(partitionOne)
155+
.setErrorCode(errorOne.code()))),
156+
new TxnOffsetCommitResponseTopic()
157+
.setName(topicTwo)
158+
.setPartitions(List.of(
159+
new TxnOffsetCommitResponsePartition()
160+
.setPartitionIndex(partitionTwo)
161+
.setErrorCode(errorTwo.code())))));
162+
163+
assertEquals(expected, builder.merge(newData).build().data());
164+
}
165+
166+
@Test
167+
public void testBuilderMergeAppendsToExistingTopic() {
168+
TxnOffsetCommitResponse.Builder builder = TxnOffsetCommitResponse.newBuilder();
169+
builder.addPartition(topicOne, partitionOne, errorOne);
170+
171+
TxnOffsetCommitResponseData newData = new TxnOffsetCommitResponseData()
172+
.setTopics(List.of(
173+
new TxnOffsetCommitResponseTopic()
174+
.setName(topicOne)
175+
.setPartitions(List.of(
176+
new TxnOffsetCommitResponsePartition()
177+
.setPartitionIndex(partitionTwo)
178+
.setErrorCode(errorTwo.code())))));
179+
180+
TxnOffsetCommitResponseData expected = new TxnOffsetCommitResponseData()
181+
.setTopics(List.of(
182+
new TxnOffsetCommitResponseTopic()
183+
.setName(topicOne)
184+
.setPartitions(List.of(
185+
new TxnOffsetCommitResponsePartition()
186+
.setPartitionIndex(partitionOne)
187+
.setErrorCode(errorOne.code()),
188+
new TxnOffsetCommitResponsePartition()
189+
.setPartitionIndex(partitionTwo)
190+
.setErrorCode(errorTwo.code())))));
191+
192+
assertEquals(expected, builder.merge(newData).build().data());
193+
}
194+
195+
@Test
196+
public void testTopicNameBuilderRejectsNullTopicName() {
197+
TxnOffsetCommitResponse.Builder builder = TxnOffsetCommitResponse.newBuilder();
198+
assertThrows(IllegalArgumentException.class,
199+
() -> builder.addPartition(null, partitionOne, errorOne));
200+
}
201+
67202
}

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2057,7 +2057,7 @@ class KafkaApis(val requestChannel: RequestChannel,
20572057
txnOffsetCommitRequest.data.topics.asScala
20582058
)(_.name)
20592059

2060-
val responseBuilder = new TxnOffsetCommitResponse.Builder()
2060+
val responseBuilder = TxnOffsetCommitResponse.newBuilder()
20612061
val authorizedTopicCommittedOffsets = new mutable.ArrayBuffer[TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic]()
20622062
txnOffsetCommitRequest.data.topics.forEach { topic =>
20632063
if (!authorizedTopics.contains(topic.name)) {

0 commit comments

Comments
 (0)