Details
-
Bug
-
Status: Resolved
-
Critical
-
Resolution: Cannot Reproduce
-
None
-
None
Description
With defined serdes for intermediate topic in aggregation catch the ClassCastException: from custom class to the ByteArray.
In debug I saw that defined serde isn't used for creation sinkNode (incide `org.apache.kafka.streams.kstream.internals.KGroupedTableImpl#doAggregate`)
Instead defined serde inside aggregation call is used default Impl with empty plugs instead of implementations
Unable to find source-code formatter for language: koltin. Available languages are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, perl, php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, yaml
userTable.join( skicardsTable.groupBy { key, value -> KeyValue(value.skicardInfo.ownerId, value.skicardInfo) } .aggregate( { mutableSetOf<SkicardInfo>() }, { ownerId, skicardInfo, accumulator -> accumulator.put(skicardInfo) }, { ownerId, skicardInfo, accumulator -> accumulator }, skicardByOwnerIdSerde, skicardByOwnerIdTopicName ), { userCreatedOrUpdated, skicardInfoSet -> UserWithSkicardsCreatedOrUpdated(userCreatedOrUpdated.user, skicardInfoSet) } ).to( userWithSkicardsTable )
I think current behavior of `doAggregate` with serdes and/or stores setting up should be changed because that is incorrect in release 0.10.0.1-cp1 to.