Kafka / KAFKA-5488

KStream.branch should not return an array of streams that we have to access by known index

Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.8.0
    • Component/s: streams

    Description

      KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream

      Long story short: it's a mess to get a KStream<>[] out of KStream<>.branch(Predicate<>...). It breaks the fluent API and produces code that is hard to maintain, because you have to know the right array index for each unnamed branch stream.

      Example

      import org.apache.kafka.streams.kstream.KStreamBuilder;
      import org.apache.kafka.streams.kstream.KStream;

      public class StreamAppWithBranches {
          public static void main(String... args) {
              // EventType is an application class; validData() is assumed to return a boolean.
              KStream<byte[], EventType>[] branchedStreams = new KStreamBuilder()
              .<byte[], EventType>stream("eventTopic")
              .branch(
                  (k, v) -> v.validData(), // index 0: valid events
                  (k, v) -> true           // index 1: everything else
              );

              // Each branch is reachable only by its position in the predicate list.
              branchedStreams[0]
              .to("topicValidData");

              branchedStreams[1]
              .to("topicInvalidData");
          }
      }
      

      Quick idea: something like void branch(final BranchDefinition<Predicate<>, Consumer<KStream<>>>... branchPredicatesAndHandlers), where the code for each branch stream can be written nested, right where it belongs.

      That would make it possible to write code like this:

              // Branch.create is the proposed (not yet existing) factory;
              // validData() is assumed to return a boolean.
              new KStreamBuilder()
              .<byte[], EventType>stream("eventTopic")
              .branch(
                  Branch.create(
                      (k, v) -> v.validData(),
                      stream -> stream.to("topicValidData")
                  ),
                  Branch.create(
                      (k, v) -> true,
                      stream -> stream.to("topicInvalidData")
                  )
              );
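
      To make the idea concrete without depending on Kafka, here is a minimal, self-contained sketch of the predicate-plus-handler pairing. It is a toy model under stated assumptions: a plain List<T> stands in for a KStream, and BranchSketch, Branch, create and branch are hypothetical names chosen for illustration, not Kafka API. Like KStream.branch, it sends each record to the first branch whose predicate matches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Toy model only: a List<T> stands in for a KStream; none of this is Kafka API. */
final class BranchSketch {

    /** Hypothetical pairing of a predicate with the handler for its sub-stream. */
    static final class Branch<T> {
        final Predicate<T> predicate;
        final Consumer<List<T>> handler;

        private Branch(Predicate<T> predicate, Consumer<List<T>> handler) {
            this.predicate = predicate;
            this.handler = handler;
        }

        static <T> Branch<T> create(Predicate<T> predicate, Consumer<List<T>> handler) {
            return new Branch<>(predicate, handler);
        }
    }

    /** Routes each record to the first matching branch, then runs every handler once. */
    @SafeVarargs
    static <T> void branch(List<T> records, Branch<T>... branches) {
        // One bucket per branch, in declaration order.
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < branches.length; i++) {
            buckets.add(new ArrayList<>());
        }
        for (T record : records) {
            for (int i = 0; i < branches.length; i++) {
                if (branches[i].predicate.test(record)) {
                    buckets.get(i).add(record);
                    break; // first match wins; records matching no predicate are dropped
                }
            }
        }
        // The caller never touches an index: each handler receives its own bucket.
        for (int i = 0; i < branches.length; i++) {
            branches[i].handler.accept(buckets.get(i));
        }
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("ok:1", "broken", "ok:2");
        branch(
            events,
            Branch.<String>create(e -> e.startsWith("ok:"), valid -> System.out.println("valid: " + valid)),
            Branch.<String>create(e -> true, invalid -> System.out.println("invalid: " + invalid))
        );
    }
}
```

      The index bookkeeping still exists, but it lives inside branch(...) instead of in every caller, which is the maintainability gain the proposal is after.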
      

      I'll go ahead and evaluate some ideas:
      https://gitlab.com/childno.de/apache_kafka/snippets/1665655

            People

              Assignee: iponomarev Ivan Ponomarev
              Reporter: childnode Marcel "childNo͡.de" Trautwein
              Votes: 2
              Watchers: 5
