Description
I have a stream that I group and aggregate (this results in a KTable). When aggregating, I explicitly request that the result table be materialized using a regular (non-timestamped) store.
After that, the KTable is filtered and converted back into a stream. This stream is processed by a processor that accesses the store.
The problem/bug is that even though I request a non-timestamped store, a timestamped one is used. This leads to a ClassCastException in the processor: it iterates over the store and expects the entries to be of type `KeyValue`, but the values are actually of type `ValueAndTimestamp`.
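To make the failure mode concrete, here is a minimal, self-contained sketch of why the cast fails. The `ValueAndTimestamp` record below is a stand-in for illustration only, not Kafka's real `org.apache.kafka.streams.state.ValueAndTimestamp`; the `Map` plays the role of the store:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for Kafka's ValueAndTimestamp wrapper (illustration only).
record ValueAndTimestamp<V>(V value, long timestamp) {}

public class CastFailureDemo {
    public static void main(String[] args) {
        // The processor believes the store maps String -> Long ...
        Map<String, Object> store = new HashMap<>();
        // ... but the runtime actually stored a timestamped wrapper.
        store.put("key", new ValueAndTimestamp<>(42L, 1_000L));

        try {
            Long value = (Long) store.get("key"); // ClassCastException here
            System.out.println(value);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as seen in the processor");
        }
    }
}
```

The cast compiles fine because the store's value type is erased at the access site; the mismatch only surfaces at runtime, which matches the behaviour described above.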
Here is the code (schematically).
First, I define the topology:
```java
KTable table = ...aggregate(
    initializer, // initializer for the KTable row
    aggregator,  // aggregator
    Materialized.as(Stores.persistentKeyValueStore("MyStore")) // <-- Non-Timestamped!
        .withKeySerde(...)
        .withValueSerde(...));

table.toStream().process(theProcessor);
```
In the class for the processor:
```java
public void init(ProcessorContext context) {
    var store = context.getStateStore("MyStore"); // Returns a TimestampedKeyValueStore!
}
```
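Until the underlying issue is resolved, one possible workaround (a sketch under the assumption that one is willing to treat the store as timestamped and unwrap each value explicitly) looks like this. The `ValueAndTimestamp` record is again a stand-in mimicking Kafka's wrapper, and the `Map` stands in for the store:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for Kafka's ValueAndTimestamp wrapper (illustration only).
record ValueAndTimestamp<V>(V value, long timestamp) {}

public class UnwrapDemo {
    public static void main(String[] args) {
        // The store actually holds timestamped wrappers, not plain values.
        Map<String, ValueAndTimestamp<Long>> store = new HashMap<>();
        store.put("key", new ValueAndTimestamp<>(42L, 1_000L));

        // Workaround: accept the wrapper type and unwrap it,
        // instead of casting the value to the plain type.
        Long plain = store.get("key").value();
        System.out.println(plain); // 42
    }
}
```

In the real processor this would mean typing the store as a `TimestampedKeyValueStore` and calling `value()` on each entry, at the cost of the processor code depending on a store type that was never requested.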
A timestamped store is returned even though I explicitly asked for a non-timestamped one!
I tried to find the cause of this behaviour and think I've found it. It lies in this line: https://github.com/apache/kafka/blob/cfc813537e955c267106eea989f6aec4879e14d7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java#L241
There, a TimestampedKeyValueStoreMaterializer is used regardless of whether the supplied materialization is timestamped or not.
I think this is a bug.