  1. Kafka
  2. KAFKA-5488

KStream.branch should not return an array of streams that has to be accessed by known index


Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.8.0
    • Component/s: streams

    Description

      KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream

      Long story short: it's a mess to get a KStream<>[] out of KStream<>.branch(Predicate<>...). It breaks the fluent API and produces code that is hard to maintain, because you have to know the right array index for each unnamed branch stream.

      Example

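      import org.apache.kafka.streams.kstream.KStream;
      import org.apache.kafka.streams.kstream.KStreamBuilder;
      
      public class StreamAppWithBranches {
          public static void main(String... args) {
              // branch() returns one stream per predicate, in argument order.
              // EventType is an application class; validData() is assumed to be
              // a boolean instance method on it.
              KStream<byte[], EventType>[] branchedStreams = new KStreamBuilder()
                  .<byte[], EventType>stream("eventTopic")
                  .branch(
                      (k, v) -> v.validData(), // index 0: valid records
                      (k, v) -> true           // index 1: everything else
                  );
              
              // The caller has to remember which index belongs to which predicate.
              branchedStreams[0].to("topicValidData");
              branchedStreams[1].to("topicInvalidData");
          }
      }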
      

      Quick idea: something like void branch(final BranchDefinition<Predicate<>, Consumer<KStream<>>>... branchPredicatesAndHandlers), so that the code for each branch/stream can be written nested where it belongs.

      It would then be possible to write code like

              // Proposed API: each predicate is paired with the handler for its stream.
              new KStreamBuilder()
                  .<byte[], EventType>stream("eventTopic")
                  .branch(
                      Branch.create(
                          (k, v) -> v.validData(),
                          stream -> stream.to("topicValidData")
                      ),
                      Branch.create(
                          (k, v) -> true,
                          stream -> stream.to("topicInvalidData")
                      )
                  );
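
      To make the idea concrete without depending on Kafka, here is a minimal sketch of the predicate-plus-handler pairing. It is only an illustration under stated assumptions: a plain List stands in for KStream, and the names BranchSketch, Branch and branch are hypothetical, not part of the Kafka Streams API. As with KStream.branch, a record goes to the first branch whose predicate matches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class BranchSketch {

    /** Hypothetical pairing of a predicate with the handler for the records it selects. */
    static final class Branch<T> {
        final Predicate<T> predicate;
        final Consumer<List<T>> handler;

        private Branch(Predicate<T> predicate, Consumer<List<T>> handler) {
            this.predicate = predicate;
            this.handler = handler;
        }

        static <T> Branch<T> create(Predicate<T> predicate, Consumer<List<T>> handler) {
            return new Branch<>(predicate, handler);
        }
    }

    /**
     * Routes every record to the first branch whose predicate matches,
     * then hands each branch its records. Unmatched records are dropped.
     */
    @SafeVarargs
    static <T> void branch(List<T> records, Branch<T>... branches) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < branches.length; i++) {
            buckets.add(new ArrayList<>());
        }
        for (T record : records) {
            for (int i = 0; i < branches.length; i++) {
                if (branches[i].predicate.test(record)) {
                    buckets.get(i).add(record);
                    break; // first match wins
                }
            }
        }
        for (int i = 0; i < branches.length; i++) {
            branches[i].handler.accept(buckets.get(i));
        }
    }

    public static void main(String[] args) {
        List<String> valid = new ArrayList<>();
        List<String> invalid = new ArrayList<>();

        // No array index appears at the call site: each handler sits next to its predicate.
        branch(Arrays.asList("ok:1", "bad", "ok:2"),
            Branch.create(v -> v.startsWith("ok:"), valid::addAll),
            Branch.create(v -> true, invalid::addAll));

        System.out.println(valid + " " + invalid); // prints "[ok:1, ok:2] [bad]"
    }
}
```

      The point of the design is that the predicate and the code consuming its branch are declared together, so reordering or adding a branch cannot silently shift an index somewhere else in the program.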
      

      I'll go ahead and evaluate some ideas:
      https://gitlab.com/childno.de/apache_kafka/snippets/1665655


          People

            Assignee: Ivan Ponomarev (iponomarev)
            Reporter: Marcel "childNo͡.de" Trautwein (childnode)
            Votes: 2
            Watchers: 5

