Details
-
Bug
-
Status: Resolved
-
Minor
-
Resolution: Fixed
-
None
-
None
Description
In the doc, for aggregating function, the example is incorrect
https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#aggregating
It says
KTable<byte[], Long> aggregatedStream = groupedStream.aggregate( () -> 0L, /* initializer */ (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */ Materialized.as("aggregated-stream-store") /* state store name */ .withValueSerde(Serdes.Long()); /* serde for aggregate value */
Generic types are missing. Instead, it should be
KTable<byte[], Long> aggregatedStream = groupedStream.aggregate( () -> 0L, /* initializer */ (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */ Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store") /* state store name */ .withValueSerde(Serdes.Long()); /* serde for aggregate value */
Otherwise, code won't work. I myself verified it.
Reference
Attachments
Issue Links
- links to