  1. Spark
  2. SPARK-31412 New Adaptive Query Execution in Spark SQL
  3. SPARK-23128

The basic framework for the new Adaptive Query Execution



    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 3.0.0
    • Component/s: SQL
    • Labels: None


      SPARK-9850 proposed the basic idea of adaptive execution in Spark. In DAGScheduler, a new API was added to support submitting a single map stage. The current implementation of adaptive execution in Spark SQL supports changing the number of reducers at runtime. An ExchangeCoordinator determines the number of post-shuffle partitions for a stage that needs to fetch shuffle data from one or more stages. The current implementation adds ExchangeCoordinators while it is adding Exchanges. However, this approach has some limitations.

      First, it may introduce additional shuffles that degrade performance. This can be seen in the EnsureRequirements rule at the point where it adds an ExchangeCoordinator.

      Second, it is not a good idea to add ExchangeCoordinators while adding Exchanges, because at that point we don't have a global picture of all shuffle dependencies of a post-shuffle stage. For example, for a join of 3 tables in a single stage, the same ExchangeCoordinator should be shared by all three Exchanges, but currently two separate ExchangeCoordinators are added.

      Third, with the current framework it is not easy to flexibly implement other adaptive execution features, such as changing the execution plan or handling skewed joins at runtime.
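      As an illustration of how a coordinator can determine the number of post-shuffle partitions from the shuffle statistics of one or more upstream stages, here is a minimal sketch in plain Python. It assumes a simple greedy strategy: walk the pre-shuffle partitions in order and start a new post-shuffle partition whenever adding the next one would exceed a target input size. The function name, parameter names, and sample sizes are hypothetical and chosen for illustration; this is not the Spark source code.

```python
from typing import List, Sequence


def estimate_partition_start_indices(
    bytes_by_partition_per_stage: Sequence[Sequence[int]],
    target_size: int,
) -> List[int]:
    """Greedily merge contiguous pre-shuffle partitions.

    bytes_by_partition_per_stage holds, for each upstream map stage
    (hypothetical input), the output size in bytes of every pre-shuffle
    partition. All stages must use the same number of partitions.
    Returns the index of the first pre-shuffle partition in each
    post-shuffle partition.
    """
    if not bytes_by_partition_per_stage:
        raise ValueError("at least one upstream stage is required")

    num_partitions = len(bytes_by_partition_per_stage[0])
    if any(len(stage) != num_partitions for stage in bytes_by_partition_per_stage):
        raise ValueError("all stages must have the same number of partitions")

    start_indices = [0]
    current_size = 0
    for i in range(num_partitions):
        # Total input for partition i, summed over every upstream stage.
        next_size = sum(stage[i] for stage in bytes_by_partition_per_stage)
        if i > 0 and current_size + next_size > target_size:
            # Adding partition i would exceed the target: start a new group.
            start_indices.append(i)
            current_size = next_size
        else:
            current_size += next_size
    return start_indices


# Two upstream stages feeding one join stage, 5 pre-shuffle partitions each.
stage_a = [10, 20, 30, 40, 50]
stage_b = [5, 5, 5, 5, 5]
starts = estimate_partition_start_indices([stage_a, stage_b], target_size=64)
print(starts)       # [0, 2, 3, 4]
print(len(starts))  # number of post-shuffle partitions: 4
```

      Because the sizes are summed across every stage that feeds the post-shuffle stage, the result is only meaningful when a single coordinator sees all of those stages, which is the point of the second limitation above.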

      We'd like to introduce a new way to do adaptive execution in Spark SQL and address the limitations. The idea is described at https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing


        1. AdaptiveExecutioninBaidu.pdf
          539 kB
          Yuanjian Li




              Assignee: Carson Wang (carsonwang)
              Reporter: Carson Wang (carsonwang)
              Votes: 11
              Watchers: 67