Description
KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
long story short: it's a mess to get a KStream<>[] out from KStream<>branch(Predicate<>...). It breaks the fluent API and it produces bad code which is not that good to maintain since you have to know the right index for an unnamed branching stream.
Example
import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStream; public class StreamAppWithBranches { public static void main(String... args) { KStream<byte[], EventType>[] branchedStreams= new KStreamBuilder() .<byte[], EventType>stream("eventTopic") .branch( (k, v) -> EventType::validData (k, v) -> true ); branchedStreams[0] .to("topicValidData"); branchedStreams[1] .to("topicInvalidData"); } }
Quick idea, s.th. like void branch(final BranchDefinition<Predicate<>, Consumer<KStream<>>>... branchPredicatesAndHandlers); where you can write branches/streams code nested where it belongs to
so it would be possible to write code like
new KStreamBuilder() .<byte[], EventType>stream("eventTopic") .branch( Branch.create( (k, v) -> EventType::validData, stream -> stream.to("topicValidData") ), Branch.create( (k, v) -> true, stream -> stream.to("topicInvalidData") ) );
I'll go forward to evaluate some ideas:
https://gitlab.com/childno.de/apache_kafka/snippets/1665655
Attachments
Issue Links
- fixes
-
KAFKA-8296 Kafka Streams branch method raises type warnings
- Resolved
- relates to
-
KAFKA-8296 Kafka Streams branch method raises type warnings
- Resolved
- supercedes
-
KAFKA-8651 Add a #branch overload that takes a Map of predicate names to predicates
- Resolved
- links to