Currently the TimeWindowedSerde does not properly deserialize windowed keys from a changelog topic. A few assumptions made in the TimeWindowedDeserializer prevent the changelog windowed keys from being correctly deserialized.
1) In the from method of WindowKeySchema (called from deserialize in TimeWindowedDeserializer), we extract the window from the binary key by calling getLong(binaryKey.length - TIMESTAMP_SIZE), which assumes the timestamp occupies the trailing bytes of the key. However, the changelog for ChangeLoggingWindowBytesStore will log the windowed key as:
In toStoreKeyBinary, we store the key in
with the seqnum (used for de-duping). The result is that when we deserialize, we do not account for the seqnum in the windowed changelog key, and the extracted window will contain gibberish values because the bytes read are not aligned.
The fix here is to introduce a new Serde in WindowSerdes that explicitly handles a windowed changelog input topic.
2) In the constructor of TimeWindowedDeserializer, the windowSize is fixed to Long.MAX_VALUE:
This will cause the end times to be gibberish when we extract the window, since the windowSize is added to the start time in:
So in the new serde, we will make windowSize a constructor param that can be supplied.
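To make the two problems concrete, here is a minimal, self-contained sketch (not the actual Kafka code). It assumes the changelog key layout is [key bytes][8-byte timestamp][4-byte seqnum] and that the window end is derived as start + windowSize. The class ChangelogWindowKeySketch and its methods are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; not part of Kafka's API.
public class ChangelogWindowKeySketch {
    static final int TIMESTAMP_SIZE = 8; // a long timestamp
    static final int SEQNUM_SIZE = 4;    // an int sequence number (assumed size)

    private final long windowSize;

    // windowSize is supplied by the caller instead of being fixed to Long.MAX_VALUE.
    ChangelogWindowKeySketch(long windowSize) {
        this.windowSize = windowSize;
    }

    // Assumed changelog layout: [key bytes][timestamp][seqnum].
    static byte[] toStoreKey(byte[] key, long timestamp, int seqnum) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE + SEQNUM_SIZE)
                .put(key)
                .putLong(timestamp)
                .putInt(seqnum)
                .array();
    }

    // Problem 1: treats the last 8 bytes as the timestamp, so it reads
    // half of the timestamp plus the seqnum when a seqnum is present.
    static long startIgnoringSeqnum(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
    }

    // Skips the trailing seqnum before reading the timestamp, then derives
    // the end from the caller-supplied window size. Returns {start, end}.
    long[] window(byte[] binaryKey) {
        long start = ByteBuffer.wrap(binaryKey)
                .getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
        return new long[] {start, start + windowSize};
    }

    public static void main(String[] args) {
        byte[] stored = toStoreKey("k".getBytes(StandardCharsets.UTF_8), 1000L, 7);

        System.out.println("misaligned start: " + startIgnoringSeqnum(stored));

        long[] ok = new ChangelogWindowKeySketch(60_000L).window(stored);
        System.out.println("aligned window: [" + ok[0] + ", " + ok[1] + ")");

        // Problem 2: with Long.MAX_VALUE as the size, start + size overflows.
        long[] overflow = new ChangelogWindowKeySketch(Long.MAX_VALUE).window(stored);
        System.out.println("end with Long.MAX_VALUE size: " + overflow[1]);
    }
}
```

In this sketch the seqnum offset is hard-coded for the changelog case; a real serde would presumably need to distinguish changelog keys from regular windowed keys, which is why a separate Serde is proposed above.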
I've started a patch, and will prepare a PR for the fix for 1) and 2) above. Let me know if this sounds reasonable.