Description
A topology that calls groupBy after a suppress operation fails with a ClassCastException.
The following sample topology
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appid");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost");

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("topic")
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
    .count()
    .suppress(Suppressed.untilTimeLimit(Duration.ofHours(1), BufferConfig.unbounded()))
    .groupBy((k, v) -> KeyValue.pair(k, v))
    .count()
    .toStream();

builder.build(properties);
results in this exception:
java.lang.ClassCastException: org.apache.kafka.streams.kstream.internals.KTableImpl$$Lambda$4/2084435065 cannot be cast to org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier
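The message says that a lambda defined in KTableImpl was cast to KTableProcessorSupplier. The sketch below reproduces that kind of failure in plain Java, without Kafka on the classpath. The interfaces ProcessorSupplier and TableProcessorSupplier and the class LambdaCastDemo are hypothetical stand-ins, not the real Kafka Streams types, and the sketch only illustrates the mechanism. It is not a confirmed diagnosis of this issue.

```java
public class LambdaCastDemo {
    // Hypothetical stand-in for a general supplier interface (one abstract method,
    // so a lambda can implement it).
    interface ProcessorSupplier {
        String get();
    }

    // Hypothetical stand-in for a richer sub-interface that downstream code expects.
    interface TableProcessorSupplier extends ProcessorSupplier {
        void enableSendingOldValues();
    }

    // A supplier written as a lambda: its runtime class implements only ProcessorSupplier.
    static ProcessorSupplier lambdaSupplier() {
        return () -> "suppress";
    }

    // A supplier written as a full implementation of the sub-interface.
    static ProcessorSupplier fullSupplier() {
        return new TableProcessorSupplier() {
            @Override
            public String get() {
                return "count";
            }

            @Override
            public void enableSendingOldValues() {
                // no-op in this sketch
            }
        };
    }

    // Mimics downstream code that assumes every parent supplier is a TableProcessorSupplier.
    static String tryCast(ProcessorSupplier supplier) {
        try {
            TableProcessorSupplier table = (TableProcessorSupplier) supplier;
            table.enableSendingOldValues();
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(tryCast(fullSupplier()));   // prints "ok"
        System.out.println(tryCast(lambdaSupplier())); // prints "ClassCastException"
    }
}
```

The cast compiles because both types are interfaces, so the mismatch only surfaces at runtime, which matches the exception being thrown while the topology is built.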
Issue Links
- Blocked: KAFKA-8289 KTable<Windowed<String>, Long> can't be suppressed (Resolved)