Flink / FLINK-23470

Use blocking shuffles but pipeline within a slot for batch mode



    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: API / DataStream
    • Labels:


      As discussed in FLINK-23402, we would like to introduce a good default shuffle mode for batch runtime mode that is a trade-off between all pipelined and all blocking shuffles.

      From the discussion in FLINK-23402:

      For the shuffle modes, I think these three settings are actually sufficient:

      1. pipeline all, for batch execution that wants pipelined shuffles. (Still batch recovery, no checkpoints, batch operators)
      2. batch all, just in case you want to.
      3. batch shuffles, pipeline within a slot. (DEFAULT)

      This should be the default, and it means we batch whenever a slot has a dependency on another slot.

      A dependency between slots is:

      • any all-to-all connection (keyBy, broadcast, rebalance, random)
      • any pointwise connection (rescale)
      • any forward connection between different slot sharing groups

      Effectively, only FORWARD connections within the same slot sharing group have no dependency on another slot.
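
      The dependency rule above can be sketched as a small decision function. This is only an illustration under stated assumptions: the enum names (Mode, Partitioning, Exchange) and the method exchangeFor are hypothetical and are not part of Flink's API, and slot sharing groups are modeled as plain strings.

```java
import java.util.Objects;

public class ShuffleModeSketch {

    /** How an edge distributes records (hypothetical names mirroring the list above). */
    public enum Partitioning { FORWARD, RESCALE, KEY_BY, BROADCAST, REBALANCE, RANDOM }

    /** The kind of data exchange chosen for an edge. */
    public enum Exchange { PIPELINED, BLOCKING }

    /** The three proposed settings (hypothetical names, not Flink identifiers). */
    public enum Mode { PIPELINE_ALL, BATCH_ALL, BATCH_SHUFFLES_PIPELINE_WITHIN_SLOT }

    /**
     * Decides the exchange type for one edge.
     * In mode (3), an edge stays pipelined only if it is a FORWARD connection
     * whose two endpoints are in the same slot sharing group. Every other edge
     * is treated as a dependency between slots and becomes blocking.
     */
    public static Exchange exchangeFor(
            Mode mode, Partitioning partitioning, String sourceGroup, String targetGroup) {
        switch (mode) {
            case PIPELINE_ALL:
                return Exchange.PIPELINED;
            case BATCH_ALL:
                return Exchange.BLOCKING;
            default:
                boolean withinSlot =
                        partitioning == Partitioning.FORWARD
                                && Objects.equals(sourceGroup, targetGroup);
                return withinSlot ? Exchange.PIPELINED : Exchange.BLOCKING;
        }
    }

    public static void main(String[] args) {
        Mode mode = Mode.BATCH_SHUFFLES_PIPELINE_WITHIN_SLOT;
        // Forward edge inside one slot sharing group.
        System.out.println(exchangeFor(mode, Partitioning.FORWARD, "default", "default"));
        // Forward edge crossing slot sharing groups.
        System.out.println(exchangeFor(mode, Partitioning.FORWARD, "default", "other"));
        // All-to-all and pointwise edges.
        System.out.println(exchangeFor(mode, Partitioning.KEY_BY, "default", "default"));
        System.out.println(exchangeFor(mode, Partitioning.RESCALE, "default", "default"));
    }
}
```

      Under this sketch, each group of operators connected by pipelined edges sits within a single slot sharing group and is linked only by forward connections, which is the property that lets the job make progress with a single slot.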

      That mode makes a lot of sense as the default, because it guarantees that we can always run the program as long as we have at least one slot. No resource starvation ever. But it retains pipelining where we don't chain operators due to missing chaining logic (but we still slot-share them).

      Compared to this (3) mode, FORWARD_EDGES_PIPELINED and POINTWISE_EDGES_PIPELINED are not well-defined.

      POINTWISE_EDGES_PIPELINED is a gamble: it only works if you have a certain amount of resources, related to the rescale factor. Otherwise the job may fail with resource starvation. That is hard for users to understand and debug; not a great option in my opinion.

      FORWARD_EDGES_PIPELINED can also lead to job failure with resource starvation when the forward connection connects different slot sharing groups.
      That's why I would drop those two modes (they make it confusing for users), not reuse GlobalDataExchangeMode, and instead introduce option (3) above, which batches most exchanges, except when they are guaranteed to be in the same slot.

      As a side note: the difference between (3) and (2) should already be relatively small in SQL jobs and will become smaller over time, as more and more operators can be chained together.




            • Assignee:
              twalthr Timo Walther


              • Created: