KAFKA-20158: Add AggregationWithHeaders, serialization support and tests (1/N) #21511
Conversation
aliehsaeedii
left a comment
Thanks @bbejeck. I left some minor comments.
    return baos.toByteArray();
} catch (final IOException e) {
    throw new SerializationException("Failed to serialize AggregationWithHeaders", e);
Should we add the topic to the exception message for better debugging?
If so, we should also add it to ValueTimestampHeadersSerializer.
 * @return the byte array containing the read bytes
 * @throws SerializationException if buffer doesn't have enough bytes or length is negative
 */
private static byte[] readBytes(final ByteBuffer buffer, final int length) {
This may not be directly related to this PR, but we could refactor the code so that this method lives in a shared place and can be reused by other classes as well.
Great idea! But we currently don't have a Utils class (at least that I know of). Can we defer this to a follow-up PR and maybe consider other code as well?
Seems in HeaderDeserializer we don't have such a helper at all, and just use
final byte[] bytes = new byte[length];
buffer.get(bytes);
Might be good to also start using it there for better error messages.
What would be a good location for a Utils class for this - maybe just a standalone one at the root of org.apache.kafka.streams.state.internals ?
 * @throws SerializationException if buffer doesn't have enough bytes or length is negative
 */
private static byte[] readBytes(final ByteBuffer buffer, final int length) {
    if (length < 0) {
Thanks. I like this check. We don't have it in other places.
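The helper under discussion can be sketched roughly as follows. This is a minimal reconstruction based only on the signature and checks visible in the diff; the PR throws Kafka's SerializationException, but a plain RuntimeException stands in here so the sketch compiles without Kafka on the classpath.

```java
import java.nio.ByteBuffer;

// Sketch of the readBytes helper discussed in this thread, with the
// negative-length guard reviewers liked and a clearer message than the
// raw BufferUnderflowException would give.
public class ReadBytesSketch {

    static byte[] readBytes(final ByteBuffer buffer, final int length) {
        if (length < 0) {
            // guard against a corrupted length prefix
            throw new RuntimeException("Invalid length: " + length);
        }
        if (buffer.remaining() < length) {
            // explicit error message instead of BufferUnderflowException
            throw new RuntimeException(
                "Not enough bytes: need " + length + ", have " + buffer.remaining());
        }
        final byte[] bytes = new byte[length];
        buffer.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        final ByteBuffer buf = ByteBuffer.wrap(new byte[]{1, 2, 3, 4});
        final byte[] first = readBytes(buf, 2);
        System.out.println(first[0] + "," + first[1]); // 1,2
        try {
            readBytes(buf, 10); // only 2 bytes remain
        } catch (RuntimeException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

A shared version of this method would also give HeaderDeserializer (which currently allocates the array and calls `buffer.get(bytes)` directly) the same error messages for free.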
 * Extract aggregation from serialized AggregationWithHeaders.
 */
static <T> T aggregation(final byte[] rawAggregationWithHeaders, final Deserializer<T> deserializer) {
    if (rawAggregationWithHeaders == null) {
Not sure why we need this method, but we can go ahead and remove it if it's not needed.
final byte[] rawAggregation = readBytes(buffer, buffer.remaining());
final AGG aggregation = aggregationDeserializer.deserialize(topic, headers, rawAggregation);

return AggregationWithHeaders.make(aggregation, headers);
Here, if aggregation is null, then make returns null. I don't think this should be the desired behaviour. Same for ValueTimestampDeserializer.deserialize. WDYT @frankvicky?
Hmmm...
Makes sense: even if we have a null value after deserialization, the headers might still be meaningful.
We should replace it with makeAllowNullable.
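The distinction the reviewers are making can be illustrated with a minimal sketch. The class shape and both factory names come from the thread, but the bodies are assumptions; Kafka's Headers type is replaced with a List&lt;String&gt; so the example is self-contained.

```java
import java.util.List;

// Illustrates make vs. makeAllowNullable as discussed above:
// make() collapses a null aggregation to a null wrapper (losing headers),
// while makeAllowNullable() keeps the wrapper so headers survive.
public class AggSketch<T> {
    final T aggregation;        // may be null (tombstone)
    final List<String> headers; // stand-in for Kafka's Headers

    private AggSketch(T aggregation, List<String> headers) {
        this.aggregation = aggregation;
        this.headers = headers;
    }

    // Returns null for a null aggregation -- drops the headers too.
    static <T> AggSketch<T> make(T aggregation, List<String> headers) {
        return aggregation == null ? null : new AggSketch<>(aggregation, headers);
    }

    // Keeps the wrapper even when the aggregation is null.
    static <T> AggSketch<T> makeAllowNullable(T aggregation, List<String> headers) {
        return new AggSketch<>(aggregation, headers);
    }

    public static void main(String[] args) {
        System.out.println(AggSketch.<String>make(null, List.of("h")) == null);           // true
        System.out.println(AggSketch.<String>makeAllowNullable(null, List.of("h")).headers); // [h]
    }
}
```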
frankvicky
left a comment
Overall LGTM.
Could you please rebase on the trunk?
There are some changes to the deserializer/serializer of headers.
Force-pushed from 23ff16b to 3110503
@aliehsaeedii @frankvicky All comments addressed; ready for another review.
 * @return the byte array containing the read bytes
 * @throws SerializationException if buffer doesn't have enough bytes or length is negative
 */
private static byte[] readBytes(final ByteBuffer buffer, final int length) {
    return null;
}

final ByteBuffer buffer = ByteBuffer.wrap(rawAggregationWithHeaders);
Seems this duplicates some code from above (inside deserialize). Should we unify it?
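One way to resolve the duplication would be a single private parser that both deserialize() and the static aggregation() helper call. The sketch below illustrates only the refactor pattern; the length-prefixed layout is an assumption for illustration, not the serialized form the PR actually defines.

```java
import java.nio.ByteBuffer;

// Sketch: extract the shared buffer-wrapping/reading step into one
// private method so the two call sites cannot drift apart.
// The 4-byte length prefix here is a hypothetical layout.
public class UnifiedParseSketch {

    // Shared parsing step used by both code paths.
    static byte[] rawAggregation(final byte[] serialized) {
        final ByteBuffer buffer = ByteBuffer.wrap(serialized);
        final int length = buffer.getInt();
        final byte[] bytes = new byte[length];
        buffer.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        final ByteBuffer out = ByteBuffer.allocate(8);
        out.putInt(4).put(new byte[]{1, 2, 3, 4});
        final byte[] payload = rawAggregation(out.array());
        System.out.println(payload.length); // 4
    }
}
```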
private AggregationWithHeaders(final AGG aggregation, final Headers headers) {
    this.aggregation = aggregation;
    this.headers = headers == null ? new RecordHeaders() : headers;
Should we allow null at all? For the aggregation it's different, as it might be a tombstone that we need to handle -- but for headers I am wondering if we need null or if we should just disallow it entirely?
We can disallow null headers here, which makes sense; given how instances get created, the headers really won't ever be null. But this is the same as ValueTimestampHeaders, which should probably follow the same pattern. \cc @frankvicky @aliehsaeedii
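The stricter constructor being suggested could look like the sketch below. This is an assumption about how the null check would be written, not the PR's code; Kafka's Headers/RecordHeaders are replaced by a List&lt;String&gt; so the sketch is self-contained.

```java
import java.util.List;
import java.util.Objects;

// Sketch of the constructor variant that disallows null headers:
// fail fast instead of silently substituting an empty RecordHeaders.
public class StrictHeadersSketch<T> {
    final T aggregation;        // may still be null (tombstone)
    final List<String> headers; // never null after construction

    StrictHeadersSketch(T aggregation, List<String> headers) {
        this.aggregation = aggregation;
        this.headers = Objects.requireNonNull(headers, "headers cannot be null");
    }

    public static void main(String[] args) {
        final StrictHeadersSketch<String> ok = new StrictHeadersSketch<>(null, List.of("traceId"));
        System.out.println(ok.headers); // [traceId]
        try {
            new StrictHeadersSketch<String>("v", null);
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```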
Force-pushed from 3110503 to 6573fa6
Merged #21511 into trunk
…sts (1/N) (apache#21511) This PR introduces `AggregationWithHeaders` and serialization support introduced in KIP-1271 for storing session aggregations with headers.