Details
Type: Improvement
Status: Open
Priority: Major
Resolution: Unresolved
Description
In Streams, windowing a stream by either time or session windows causes the stream's keys to be transformed from `K` to `Windowed<K>`.
Since this is a well-defined transition, developers do not need to explicitly provide a `Serde<Windowed<K>>`. For convenience, Streams, which already knows the key serde (`Serde<K>`), automatically wraps it in a windowed serde in case downstream operators need it.
However, this automatic wrapping only takes place if the key serde has been explicitly provided in the topology. If the topology relies on the `default.key.serde` configuration, no wrapping takes place, and downstream operators fall back to the default serde for the windowed key. They then encounter a `ClassCastException` when trying to cast a `Windowed` (the windowed key) to the type the default serde handles (which is the type of the key wrapped inside the windowed key).
Specifically, the key serde forwarding logic is:
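The failure mode can be reproduced without Kafka at all. The following is a minimal, self-contained sketch, not Streams code: `Windowed`, `Serializer`, and `StringSerializer` here are simplified, hypothetical stand-ins for the real classes, and `serializeWithDefault` only imitates an operator that falls back to the default serializer because no windowed serde was forwarded to it.

```java
import java.nio.charset.StandardCharsets;

final class WindowedKeyCastDemo {
    /** Hypothetical stand-in for Windowed<K>: a key plus its window bounds. */
    static final class Windowed<K> {
        final K key;
        final long startMs;
        final long endMs;

        Windowed(K key, long startMs, long endMs) {
            this.key = key;
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    /** Hypothetical stand-in for a serializer interface. */
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    /** Plays the role of the serializer configured through default.key.serde. */
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    /**
     * Imitates a downstream operator that received no key serde and therefore
     * uses the default one. The unchecked cast compiles, but the serializer
     * still expects a String at runtime.
     */
    @SuppressWarnings("unchecked")
    static <K> byte[] serializeWithDefault(Serializer<?> defaultSerializer, K key) {
        return ((Serializer<K>) defaultSerializer).serialize(key);
    }

    /** Returns true if serializing the given key with the default fails with a ClassCastException. */
    static boolean failsWithClassCast(Object key) {
        try {
            serializeWithDefault(new StringSerializer(), key);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A plain key matches the default serializer's type.
        System.out.println(failsWithClassCast("user-1"));
        // A windowed key does not: the serializer tries to cast Windowed to String.
        System.out.println(failsWithClassCast(new Windowed<>("user-1", 0L, 60_000L)));
    }
}
```

The plain key serializes, while the windowed key fails inside the serializer, which is the same shape of error that a downstream operator reports in the scenario described above.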
in `org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl`:
`materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null`
and in `org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImpl`:
`materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null`
This pattern of not "solidifying" the default key serde is common in Streams. Not all operators need a serde, and the default serde may not be applicable to all operators. So, it would be a mistake for arbitrary operators to grab the default serde and pass it downstream as if it had been explicitly set.
However, in this case specifically, all windowed aggregations are stateful, so if we don't have an explicit key serde at this point, we know that we have used the default serde in the window store. If the default serde were incorrect, an exception would be thrown by the windowed aggregation itself. So it actually is safe to wrap the default serde in a windowed serde and pass it downstream, which would result in a better development experience.
Unfortunately, the default serde is set via config, but the windowed serde wrapping happens during DSL building, when the config is not generally available. Therefore, we would need a special windowed serde wrapper that signals that it wraps the default serde, which would be fully resolved during operators' init call.
For example, something of this nature:
`materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : FullTimeWindowedSerde.wrapDefault(windows.size())`
etc.
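To make the proposal more concrete, here is a minimal sketch of a windowed serde wrapper whose inner serde can be left unresolved at build time and filled in during an init step. Everything in it is an assumption for illustration: `Serde` and `WindowedSerde` are simplified stand-ins rather than the real Kafka interfaces, `wrapDefault` mirrors the hypothetical factory named above, `init` is a made-up hook that receives the already-resolved default key serde, and the byte layout (key bytes followed by the window start) is chosen only to give the example something to serialize.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

final class DeferredWindowedSerdeSketch {
    /** Simplified, hypothetical serde: serialization only. */
    interface Serde<T> {
        byte[] serialize(T data);
    }

    /** Plays the role of the serde configured through default.key.serde. */
    static final Serde<String> STRING_SERDE = s -> s.getBytes(StandardCharsets.UTF_8);

    static final class WindowedSerde<K> {
        private Serde<K> inner;            // null means "wrap the default key serde, resolve at init"
        private final long windowSizeMs;

        private WindowedSerde(Serde<K> inner, long windowSizeMs) {
            this.inner = inner;
            this.windowSizeMs = windowSizeMs;
        }

        /** Wraps an explicitly provided key serde (the case that already works today). */
        static <K> WindowedSerde<K> of(Serde<K> inner, long windowSizeMs) {
            return new WindowedSerde<>(Objects.requireNonNull(inner, "inner"), windowSizeMs);
        }

        /** Signals that the default key serde should be wrapped once it is known. */
        static <K> WindowedSerde<K> wrapDefault(long windowSizeMs) {
            return new WindowedSerde<>(null, windowSizeMs);
        }

        boolean isResolved() {
            return inner != null;
        }

        long windowSizeMs() {
            return windowSizeMs;
        }

        /** Called when the operator initializes and the config is available. */
        void init(Serde<K> defaultKeySerde) {
            if (inner == null) {
                inner = Objects.requireNonNull(defaultKeySerde, "default.key.serde is not set");
            }
        }

        /** Illustrative layout only: key bytes followed by the 8-byte window start. */
        byte[] serialize(K key, long windowStartMs) {
            if (inner == null) {
                throw new IllegalStateException("init() must be called before use");
            }
            byte[] keyBytes = inner.serialize(key);
            return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                    .put(keyBytes)
                    .putLong(windowStartMs)
                    .array();
        }
    }

    /** Mirrors the proposed forwarding logic: never forward null downstream. */
    static <K> WindowedSerde<K> forwardKeySerde(Serde<K> materializedKeySerde, long windowSizeMs) {
        return materializedKeySerde != null
                ? WindowedSerde.of(materializedKeySerde, windowSizeMs)
                : WindowedSerde.wrapDefault(windowSizeMs);
    }

    public static void main(String[] args) {
        // DSL build time: no explicit key serde, config not yet available.
        WindowedSerde<String> serde = forwardKeySerde(null, 60_000L);
        System.out.println("resolved at build time: " + serde.isResolved());

        // Operator init: the default key serde is now known.
        serde.init(STRING_SERDE);
        System.out.println("serialized length: " + serde.serialize("user-1", 0L).length);
    }
}
```

The point of the design is that downstream operators always receive a non-null windowed serde, so they never fall back to casting a `Windowed` key to the default serde's type. An explicitly provided serde is left untouched by `init`.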
Complicating the situation slightly, all the windowed serializers and deserializers will resolve a runtime inner class using `default.windowed.key.serde.inner` if given a null inner serde to wrap. However, at this point in the topology build, we do know that the windowed aggregation has specifically used the `default.key.serde`, not the `default.windowed.key.serde.inner`, to persist its state to the window store. Therefore, it should be correct to wrap the default key serde specifically and not use the `default.windowed.key.serde.inner`.
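The intended precedence can be written down as a small lookup. This is only a sketch of the rule described above, not existing Streams code: `resolveInner` and its `wrapsDefaultKeySerde` flag are hypothetical, and the config is modelled as a plain map from property name to serde class name.

```java
import java.util.Map;

final class InnerSerdeResolutionSketch {
    static final String DEFAULT_KEY_SERDE = "default.key.serde";
    static final String DEFAULT_WINDOWED_INNER = "default.windowed.key.serde.inner";

    /**
     * Picks the inner serde class name for a windowed serde:
     * 1. an explicitly provided inner serde always wins;
     * 2. a wrapper created after a windowed aggregation uses default.key.serde,
     *    because that is what the window store used;
     * 3. any other null inner serde keeps the existing fallback.
     */
    static String resolveInner(String explicitInner,
                               boolean wrapsDefaultKeySerde,
                               Map<String, String> config) {
        if (explicitInner != null) {
            return explicitInner;
        }
        String property = wrapsDefaultKeySerde ? DEFAULT_KEY_SERDE : DEFAULT_WINDOWED_INNER;
        String resolved = config.get(property);
        if (resolved == null) {
            throw new IllegalArgumentException(property + " is not configured");
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of(
                DEFAULT_KEY_SERDE, "org.apache.kafka.common.serialization.Serdes$StringSerde",
                DEFAULT_WINDOWED_INNER, "org.apache.kafka.common.serialization.Serdes$LongSerde");

        // Wrapper forwarded from a windowed aggregation: follows default.key.serde.
        System.out.println(resolveInner(null, true, config));
        // Null inner serde from anywhere else: existing fallback applies.
        System.out.println(resolveInner(null, false, config));
    }
}
```

Keeping the two cases separate means the existing behavior for a null inner serde is unchanged, and only wrappers created by the windowed aggregation follow `default.key.serde`.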
In addition to fixing this for TimeWindowed and SessionWindowed streams, we need to have good test coverage of the new code. There is clearly a blind spot in the tests, or we would have noticed this sooner.