KAFKA-6034: Streams DSL to Processor Topology Translation Improvements



    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: streams
    • Labels:


      Today our DSL-to-topology translation mechanism works operator by operator, so the resulting topology is often sub-optimal. Some known issues:

      1. The repartition topics for aggregations and joins may be duplicated, i.e. they contain exactly the same data as other topics.
      2. State stores' changelog topics may be duplicates of other repartition / to / through topics.

      We should improve our DSL translation with a global optimization goal of minimizing the number of internal topics as well as state stores, i.e. "logical plan optimization".


      Addendum: Possible Optimization Techniques from DSMS

      The following are techniques that we can leverage from the DSMS literature (http://hirzels.com/martin/papers/debs13-tutorial.pdf, http://db.cs.berkeley.edu/papers/sigmod00-eddy.pdf):

      1. Operator Separation / Fission: break an operator into consecutive sub-operators, or replicate an operator in parallel in the topology. Both are ways to make better use of parallel processing power.

      Example: stream.map(“mapper”) => stream.map(“mapper1”).map(“mapper2”)
      where “mapper” = “mapper1” * “mapper2”

      • In Streams, consider the scenario where parts of a sub-topology have different parallelism requirements, e.g. mapper1 may need N workers while mapper2 may need M workers, where M >> N.
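
      A minimal sketch of the decomposition above, using plain java.util.function rather than the Streams DSL. The two stages (parsing, then squaring) and all names are hypothetical, and it assumes “mapper1” * “mapper2” means "apply mapper1 first, then mapper2", as the map(“mapper1”).map(“mapper2”) form suggests.

```java
import java.util.function.Function;

public class MapperFission {
    // Hypothetical stages: mapper1 is a cheap parse, mapper2 is the heavier step.
    static final Function<String, Integer> MAPPER1 = Integer::parseInt;
    static final Function<Integer, Integer> MAPPER2 = x -> x * x;

    // The original single operator: mapper1 followed by mapper2.
    static final Function<String, Integer> MAPPER = MAPPER1.andThen(MAPPER2);

    // One operator doing all the work.
    static int fused(String in) {
        return MAPPER.apply(in);
    }

    // Two consecutive operators; each could be given its own parallelism.
    static int split(String in) {
        Integer intermediate = MAPPER1.apply(in);
        return MAPPER2.apply(intermediate);
    }

    public static void main(String[] args) {
        System.out.println(fused("7") + " " + split("7"));
    }
}
```

      Both forms compute the same result for every input; the split form only exposes a boundary at which the two stages could be scaled independently.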

      2. Operator Fusion / Scheduling: we can either pass an incoming tuple through the whole topology before processing the next tuple (i.e. Depth-First, or Item Model, or FIFO scheduling), or block until we have processed all the queued tuples at one operator and collected all of its intermediate outputs before moving on to the next operator (i.e. Breadth-First, or Block Model, or Greedy scheduling; note that this requires some buffering).

      Example: suppose we have a topology with three consecutive operators O1 -> O2 -> O3, where each operator takes one tuple and generates 2 outputs, and only one tuple is queued for O1 from the source. With Depth-First we execute in the order O1, O2, O3, O3, O2, O3, O3; with Breadth-First we execute O1, O2, O2, O3, O3, O3, O3. We can also do something in between these two mechanisms (i.e. dynamic scheduling): if new tuples arrive for O1 during processing, we may decide to go back to O1 (i.e. give it a higher priority).
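
      The two orderings in the example can be reproduced with a small simulation. This is only a sketch under the example's assumptions (a linear chain, a fixed fan-out per operator, a single source tuple); the class and method names are hypothetical and nothing here reflects how Streams schedules work internally.

```java
import java.util.ArrayList;
import java.util.List;

public class SchedulingOrder {
    // Depth-first: push each output through all remaining operators
    // before handling its sibling output.
    private static void depthFirst(int op, int numOps, int fanOut, List<String> trace) {
        trace.add("O" + op);
        if (op == numOps) {
            return; // outputs of the last operator leave the topology
        }
        for (int i = 0; i < fanOut; i++) {
            depthFirst(op + 1, numOps, fanOut, trace);
        }
    }

    static List<String> depthFirst(int numOps, int fanOut) {
        List<String> trace = new ArrayList<>();
        depthFirst(1, numOps, fanOut, trace);
        return trace;
    }

    // Breadth-first: drain everything queued at one operator, buffering
    // its outputs, before moving on to the next operator.
    static List<String> breadthFirst(int numOps, int fanOut) {
        List<String> trace = new ArrayList<>();
        int queued = 1; // a single tuple from the source
        for (int op = 1; op <= numOps; op++) {
            for (int i = 0; i < queued; i++) {
                trace.add("O" + op);
            }
            queued *= fanOut; // size of the buffer handed to the next operator
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", depthFirst(3, 2)));
        System.out.println(String.join(", ", breadthFirst(3, 2)));
    }
}
```

      Note how the breadth-first variant has to hold up to 4 buffered tuples before O3 runs, while the depth-first recursion never holds more than one pending sibling per level.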

      The main motivation is to reduce the memory footprint of the operators' queues and, when multiple threads are available, to balance load across them. A related optimization technique is "load shedding": when the processing capacity cannot keep up with the incoming rate, decide whether and how many records to drop at the head of the topology while scheduling the currently queued records.

      • In Streams, since we use Kafka as a persistent "buffer" and control the queue size through the consumer polling frequency, we always use Depth-First to reduce the memory footprint of intermediate results, so this optimization category does not apply. However, when one operation of a sub-topology is IO heavy or needs to access remote data, then instead of applying some async processing policy we could break that operation out into its own sub-topology, with threads that can suspend / resume around those remote calls.

      3. Non-Stateful Operator Reordering (or "Selection Pushing Forward"): we want to push the operators with low selectivity and low cost to the front of the topology. For example, if we have a topology of O1 -> O2, where O1's selectivity is 1.2 and cost (i.e. time to process one tuple) is 2, and O2's selectivity is 0.1 and cost is 1, we would like to re-order it to O2 -> O1.
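
      To see why the reordering pays off, here is a sketch of a simple cost model. It is an assumption for illustration, not something defined in this ticket: the expected cost per source tuple is the sum, over operators, of the operator's cost weighted by the product of the selectivities of all operators in front of it, and selectivities are assumed not to change when operators are swapped.

```java
public class ReorderCost {
    // Expected processing time per source tuple for a linear chain.
    // selectivity[i] = expected outputs per input of operator i,
    // cost[i]        = time for operator i to process one tuple.
    static double chainCost(double[] selectivity, double[] cost) {
        double rate = 1.0;  // expected tuples reaching the current operator
        double total = 0.0;
        for (int i = 0; i < cost.length; i++) {
            total += rate * cost[i];
            rate *= selectivity[i];
        }
        return total;
    }

    public static void main(String[] args) {
        // O1: selectivity 1.2, cost 2; O2: selectivity 0.1, cost 1.
        double original = chainCost(new double[] {1.2, 0.1}, new double[] {2, 1});
        double reordered = chainCost(new double[] {0.1, 1.2}, new double[] {1, 2});
        System.out.println("O1 -> O2: " + original);
        System.out.println("O2 -> O1: " + reordered);
    }
}
```

      Under this model O1 -> O2 costs 2 + 1.2 * 1 = 3.2 time units per source tuple, while O2 -> O1 costs 1 + 0.1 * 2 = 1.2, because O2 discards most tuples before the more expensive O1 sees them.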

      Operator reordering is one of the most common low-hanging fruits in stream optimization.
      Example: if you have a Source -> Map(“mapper”) -> Filter(“predicate”) topology, it is almost always better to reorder it to Source -> Filter(“predicate1”) -> Map(“mapper1”), provided you can translate the original mapper and predicate into the new predicate and mapper.

      • In Streams, since the runtime is schema agnostic, it is tricky to find the matching predicate1 / mapper1.
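
      A small illustration of the Map -> Filter rewrite, written with java.util.stream instead of the Streams DSL. The mapper (doubling) and predicate (greater than 10) are hypothetical, and predicate1 is derived by hand here, which is exactly the step a schema-agnostic runtime cannot do automatically.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FilterPushdown {
    static int mapper(int x) { return x * 2; }

    // Original predicate, defined on mapped values.
    static boolean predicate(int y) { return y > 10; }

    // Hand-derived equivalent on unmapped values: 2x > 10  <=>  x > 5.
    static boolean predicate1(int x) { return x > 5; }

    // Source -> Map(mapper) -> Filter(predicate)
    static List<Integer> original(List<Integer> source) {
        return source.stream()
                .map(FilterPushdown::mapper)
                .filter(FilterPushdown::predicate)
                .collect(Collectors.toList());
    }

    // Source -> Filter(predicate1) -> Map(mapper1), where mapper1 == mapper here.
    static List<Integer> reordered(List<Integer> source) {
        return source.stream()
                .filter(FilterPushdown::predicate1)
                .map(FilterPushdown::mapper)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 5, 6, 9);
        System.out.println(original(source) + " " + reordered(source));
    }
}
```

      Both pipelines emit the same records, but in the reordered one the mapper only runs on records that survive the filter.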

      4. Redundancy Elimination: For example, if we have a topology of Branch -> [O1 -> O2, O1 -> O3], we may want to rewrite it to O1 -> Branch -> [O2, O3].
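
      A rough sketch of the saving, under two assumptions that are not stated in the ticket: "Branch" is treated as a fan-out where both downstream paths see every record, and O1 is deterministic and side-effect free so that sharing its result is safe. The operators themselves are hypothetical arithmetic placeholders.

```java
public class SharedPrefix {
    static int o1Calls = 0; // counts how often O1 actually runs

    static int o1(int x) { o1Calls++; return x + 1; }
    static int o2(int x) { return x * 2; }
    static int o3(int x) { return x * 3; }

    // Fan-out -> [O1 -> O2, O1 -> O3]: O1 runs once per path.
    static int[] duplicated(int x) {
        return new int[] { o2(o1(x)), o3(o1(x)) };
    }

    // O1 -> fan-out -> [O2, O3]: O1 runs once and its output is shared.
    static int[] shared(int x) {
        int y = o1(x);
        return new int[] { o2(y), o3(y) };
    }

    public static void main(String[] args) {
        duplicated(4);
        System.out.println("O1 calls, duplicated: " + o1Calls);
        o1Calls = 0;
        shared(4);
        System.out.println("O1 calls, shared: " + o1Calls);
    }
}
```

      The outputs of the two topologies are identical; only the number of O1 invocations per record differs.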

      5. Multi-Join Operator Reordering: with multi-joins like "stream1.join(stream2).join(stream3)", we can try to reorder the joins based on join selectivity and cost. We can also decide whether or not to materialize intermediate results, for example, whether to store the results of "stream1.join(stream2)" in a separate state store or to always recompute the same join operation. A similar optimization is called "join operator sharing": for example, if we have one sub-topology with "stream1.join(stream2).join(stream3)" and another sub-topology with "stream1.join(stream2)", we may want to avoid materializing the results of "stream1.join(stream2)" in both topologies.

      • This may be related to KIP-150.





            • Assignee:
              Guozhang Wang

