Skip to content

Commit 7b46af6

Browse files
authored
KAFKA-20158: Add AggregationWithHeaders, serialization support and tests (1/N) (#21511)
This PR introduces `AggregationWithHeaders` and serialization support introduced in KIP-1271 for storing session aggregations with headers.
1 parent d9c77be commit 7b46af6

6 files changed

Lines changed: 690 additions & 0 deletions

File tree

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.state;
18+
19+
import org.apache.kafka.common.header.Headers;
20+
21+
import java.util.Objects;
22+
23+
/**
24+
* Combines an aggregated value with its associated record headers.
25+
* This is used by SessionStoreWithHeaders to store session aggregations along with headers.
26+
*
27+
* @param <AGG> the aggregation type
28+
*/
29+
public final class AggregationWithHeaders<AGG> {
30+
31+
private final AGG aggregation;
32+
private final Headers headers;
33+
34+
private AggregationWithHeaders(final AGG aggregation, final Headers headers) {
35+
Objects.requireNonNull(headers, "headers must not be null");
36+
this.aggregation = aggregation;
37+
this.headers = headers;
38+
}
39+
40+
/**
41+
* Create a new {@link AggregationWithHeaders} instance if the provided {@code aggregation} is not {@code null}.
42+
*
43+
* @param aggregation the aggregation
44+
* @param headers the headers (may be {@code null}, treated as empty)
45+
* @param <AGG> the type of the aggregation
46+
* @return a new {@link AggregationWithHeaders} instance if the provided {@code aggregation} is not {@code null};
47+
* otherwise {@code null} is returned
48+
*/
49+
public static <AGG> AggregationWithHeaders<AGG> make(final AGG aggregation, final Headers headers) {
50+
if (aggregation == null) {
51+
return null;
52+
}
53+
return new AggregationWithHeaders<>(aggregation, headers);
54+
}
55+
56+
/**
57+
* Create a new {@link AggregationWithHeaders} instance.
58+
* The provided {@code aggregation} may be {@code null}.
59+
*
60+
* @param aggregation the aggregation (may be {@code null})
61+
* @param headers the headers (may be {@code null}, treated as empty)
62+
* @param <AGG> the type of the aggregation
63+
* @return a new {@link AggregationWithHeaders} instance
64+
*/
65+
public static <AGG> AggregationWithHeaders<AGG> makeAllowNullable(final AGG aggregation, final Headers headers) {
66+
return new AggregationWithHeaders<>(aggregation, headers);
67+
}
68+
69+
/**
70+
* Return the wrapped {@code aggregation} of the given {@code aggregationWithHeaders} parameter
71+
* if the parameter is not {@code null}.
72+
*
73+
* @param aggregationWithHeaders an {@link AggregationWithHeaders} instance; can be {@code null}
74+
* @param <AGG> the type of the aggregation
75+
* @return the wrapped {@code aggregation} of {@code aggregationWithHeaders} if not {@code null}; otherwise {@code null}
76+
*/
77+
public static <AGG> AGG getAggregationOrNull(final AggregationWithHeaders<AGG> aggregationWithHeaders) {
78+
return aggregationWithHeaders == null ? null : aggregationWithHeaders.aggregation;
79+
}
80+
81+
public AGG aggregation() {
82+
return aggregation;
83+
}
84+
85+
public Headers headers() {
86+
return headers;
87+
}
88+
89+
@Override
90+
public boolean equals(final Object o) {
91+
if (this == o) {
92+
return true;
93+
}
94+
if (!(o instanceof AggregationWithHeaders)) {
95+
return false;
96+
}
97+
final AggregationWithHeaders<?> that = (AggregationWithHeaders<?>) o;
98+
return Objects.equals(aggregation, that.aggregation)
99+
&& Objects.equals(this.headers, that.headers);
100+
}
101+
102+
@Override
103+
public int hashCode() {
104+
return Objects.hash(aggregation, headers);
105+
}
106+
107+
@Override
108+
public String toString() {
109+
return "AggregationWithHeaders{" +
110+
"aggregation=" + aggregation +
111+
", headers=" + headers +
112+
'}';
113+
}
114+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.state.internals;
18+
19+
import org.apache.kafka.common.errors.SerializationException;
20+
import org.apache.kafka.common.header.Headers;
21+
import org.apache.kafka.common.serialization.Deserializer;
22+
import org.apache.kafka.common.utils.ByteUtils;
23+
import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
24+
import org.apache.kafka.streams.processor.internals.SerdeGetter;
25+
import org.apache.kafka.streams.state.AggregationWithHeaders;
26+
27+
import java.nio.ByteBuffer;
28+
import java.util.Map;
29+
import java.util.Objects;
30+
31+
import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
32+
33+
/**
34+
* Deserializer for AggregationWithHeaders.
35+
* Deserialization format (per KIP-1271):
36+
* [headersSize(varint)][headersBytes][aggregation]
37+
* <p>
38+
* Where:
39+
* - headersSize: Size of the headersBytes section in bytes, encoded as varint
40+
* - headersBytes:
41+
* - For null/empty headers: headersSize = 0, headersBytes is omitted (0 bytes)
42+
* - For non-empty headers: headersSize > 0, serialized headers in the format [count(varint)][header1][header2]... to be processed by HeadersDeserializer.
43+
* - aggregation: Serialized aggregation to be deserialized with the provided aggregation deserializer
44+
* <p>
45+
* This is used by KIP-1271 to deserialize aggregations with headers from session state stores.
46+
*/
47+
class AggregationWithHeadersDeserializer<AGG> implements WrappingNullableDeserializer<AggregationWithHeaders<AGG>, Void, AGG> {
48+
49+
public final Deserializer<AGG> aggregationDeserializer;
50+
51+
AggregationWithHeadersDeserializer(final Deserializer<AGG> aggregationDeserializer) {
52+
Objects.requireNonNull(aggregationDeserializer);
53+
this.aggregationDeserializer = aggregationDeserializer;
54+
}
55+
56+
@Override
57+
public void configure(final Map<String, ?> configs, final boolean isKey) {
58+
aggregationDeserializer.configure(configs, isKey);
59+
}
60+
61+
@Override
62+
public AggregationWithHeaders<AGG> deserialize(final String topic, final byte[] aggregationWithHeaders) {
63+
if (aggregationWithHeaders == null) {
64+
return null;
65+
}
66+
67+
final ByteBuffer buffer = ByteBuffer.wrap(aggregationWithHeaders);
68+
final Headers headers = readHeaders(buffer);
69+
final byte[] rawAggregation = readBytes(buffer, buffer.remaining());
70+
final AGG aggregation = aggregationDeserializer.deserialize(topic, headers, rawAggregation);
71+
72+
return AggregationWithHeaders.makeAllowNullable(aggregation, headers);
73+
}
74+
75+
@Override
76+
public void close() {
77+
aggregationDeserializer.close();
78+
}
79+
80+
@Override
81+
public void setIfUnset(final SerdeGetter getter) {
82+
initNullableDeserializer(aggregationDeserializer, getter);
83+
}
84+
85+
86+
/**
87+
* Reads the specified number of bytes from the buffer with validation.
88+
*
89+
* @param buffer the ByteBuffer to read from
90+
* @param length the number of bytes to read
91+
* @return the byte array containing the read bytes
92+
* @throws SerializationException if buffer doesn't have enough bytes or length is negative
93+
*/
94+
private static byte[] readBytes(final ByteBuffer buffer, final int length) {
95+
if (length < 0) {
96+
throw new SerializationException(
97+
"Invalid AggregationWithHeaders format: negative length " + length
98+
);
99+
}
100+
if (buffer.remaining() < length) {
101+
throw new SerializationException(
102+
"Invalid AggregationWithHeaders format: expected " + length +
103+
" bytes but only " + buffer.remaining() + " bytes remaining"
104+
);
105+
}
106+
final byte[] bytes = new byte[length];
107+
buffer.get(bytes);
108+
return bytes;
109+
}
110+
111+
/**
112+
* Extract headers from serialized AggregationWithHeaders.
113+
*/
114+
static Headers headers(final byte[] rawAggregationWithHeaders) {
115+
if (rawAggregationWithHeaders == null) {
116+
return null;
117+
}
118+
119+
final ByteBuffer buffer = ByteBuffer.wrap(rawAggregationWithHeaders);
120+
return readHeaders(buffer);
121+
}
122+
123+
private static Headers readHeaders(final ByteBuffer buffer) {
124+
final int headersSize = ByteUtils.readVarint(buffer);
125+
final byte[] rawHeaders = readBytes(buffer, headersSize);
126+
return HeadersDeserializer.deserialize(rawHeaders);
127+
}
128+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.state.internals;
18+
19+
import org.apache.kafka.common.errors.SerializationException;
20+
import org.apache.kafka.common.header.Headers;
21+
import org.apache.kafka.common.serialization.Serializer;
22+
import org.apache.kafka.common.utils.ByteUtils;
23+
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
24+
import org.apache.kafka.streams.processor.internals.SerdeGetter;
25+
import org.apache.kafka.streams.state.AggregationWithHeaders;
26+
27+
import java.io.ByteArrayOutputStream;
28+
import java.io.DataOutputStream;
29+
import java.io.IOException;
30+
import java.util.Map;
31+
import java.util.Objects;
32+
33+
import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
34+
35+
/**
36+
* Serializer for AggregationWithHeaders.
37+
<p>
38+
* Serialization format (per KIP-1271):
39+
* [headersSize(varint)][headersBytes][aggregation]
40+
* <p>
41+
* Where:
42+
* - headersSize: Size of the headersBytes section in bytes, encoded as varint
43+
* - headersBytes:
44+
* - For null/empty headers: headersSize = 0, headersBytes is omitted (0 bytes)
45+
* - For non-empty headers: headersSize > 0, serialized headers ([count(varint)][header1][header2]...) from HeadersSerializer
46+
* - aggregation: Serialized aggregation using the provided aggregation serializer
47+
* <p>
48+
* This is used by KIP-1271 to serialize aggregations with headers for session state stores.
49+
*/
50+
class AggregationWithHeadersSerializer<AGG> implements WrappingNullableSerializer<AggregationWithHeaders<AGG>, Void, AGG> {
51+
public final Serializer<AGG> aggregationSerializer;
52+
53+
AggregationWithHeadersSerializer(final Serializer<AGG> aggregationSerializer) {
54+
Objects.requireNonNull(aggregationSerializer);
55+
this.aggregationSerializer = aggregationSerializer;
56+
}
57+
58+
@Override
59+
public void configure(final Map<String, ?> configs, final boolean isKey) {
60+
aggregationSerializer.configure(configs, isKey);
61+
}
62+
63+
@Override
64+
public byte[] serialize(final String topic, final AggregationWithHeaders<AGG> aggregationWithHeaders) {
65+
if (aggregationWithHeaders == null) {
66+
return null;
67+
}
68+
return serialize(topic, aggregationWithHeaders.aggregation(), aggregationWithHeaders.headers());
69+
}
70+
71+
private byte[] serialize(final String topic, final AGG plainAggregation, final Headers headers) {
72+
if (plainAggregation == null) {
73+
return null;
74+
}
75+
76+
final byte[] rawAggregation = aggregationSerializer.serialize(topic, headers, plainAggregation);
77+
78+
if (rawAggregation == null) {
79+
return null;
80+
}
81+
82+
final byte[] rawHeaders = HeadersSerializer.serialize(headers);
83+
84+
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
85+
final DataOutputStream out = new DataOutputStream(baos)) {
86+
87+
ByteUtils.writeVarint(rawHeaders.length, out);
88+
out.write(rawHeaders);
89+
out.write(rawAggregation);
90+
91+
return baos.toByteArray();
92+
} catch (final IOException e) {
93+
throw new SerializationException("Failed to serialize AggregationWithHeaders on topic: " + topic, e);
94+
}
95+
}
96+
97+
@Override
98+
public void close() {
99+
aggregationSerializer.close();
100+
}
101+
102+
@Override
103+
public void setIfUnset(final SerdeGetter getter) {
104+
initNullableSerializer(aggregationSerializer, getter);
105+
}
106+
}

0 commit comments

Comments
 (0)