Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Duplicate
Description
I am trying to use `KTable.suppress()` and I am getting the following error:
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95)
    at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
    at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
My code is as follows:
    final KTable<Windowed<Object>, GenericRecord> groupTable = groupedStream
        .aggregate(lastAggregator, lastAggregator, materialized);

    final KTable<Windowed<Object>, GenericRecord> suppressedTable =
        groupTable.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

    // write the change-log stream to the topic
    suppressedTable.toStream((k, v) -> k.key())
        .mapValues(joinValueMapper::apply)
        .to(props.joinTopic());
The same code works when `suppressedTable` is not used. What am I doing wrong?
Someone else has encountered the same issue:
https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380
Slack conversation: https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556633088239800
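The stack trace shows a `StringSerializer` being handed a `Windowed` key. The following is a minimal, self-contained sketch of how that kind of failure can arise in plain Java; it does not use Kafka Streams itself. `Windowed`, `Serializer`, `StringSerializer`, `buffer`, and `tryBuffer` here are simplified, hypothetical stand-ins written for illustration, not the real Kafka classes. The sketch assumes the serializer is chosen independently of the key type (for example from a default configuration), so the mismatch only surfaces at runtime.

```java
import java.nio.charset.StandardCharsets;

public class SuppressCastSketch {

    // Hypothetical stand-in for a windowed key wrapper (only the key is modeled).
    public static final class Windowed<K> {
        private final K key;
        public Windowed(K key) { this.key = key; }
        public K key() { return key; }
    }

    // Simplified stand-in for a serializer interface.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-in for a serializer that only understands String keys.
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Mimics a processor that receives its key serializer from configuration.
    // The unchecked cast compiles because generics are erased; the type
    // mismatch is only detected when serialize() is actually invoked.
    @SuppressWarnings("unchecked")
    static <K> byte[] buffer(Serializer<?> configured, K key) {
        Serializer<K> serializer = (Serializer<K>) configured;
        return serializer.serialize("topic", key);
    }

    // Returns "ok" if the key could be serialized, or the name of the failure.
    public static String tryBuffer(Object key) {
        try {
            buffer(new StringSerializer(), key);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(tryBuffer("plain-key"));                 // prints "ok"
        System.out.println(tryBuffer(new Windowed<>("plain-key"))); // prints "ClassCastException"
    }
}
```

In the reported code the analogous situation would be the suppression step serializing `Windowed` keys with a String serializer. Whether supplying an explicit windowed key serde avoids the error depends on the Kafka Streams version; the linked KAFKA-9259 tracks the behavior with default serdes.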
Issue Links
- is duplicated by: KAFKA-9259 suppress() for windowed-Serdes does not work with default serdes (Open)
- relates to: KAFKA-9259 suppress() for windowed-Serdes does not work with default serdes (Open)