SPARK-9850 proposed the basic idea of adaptive execution in Spark. In DAGScheduler, a new API was added to support submitting a single map stage. The current implementation of adaptive execution in Spark SQL supports changing the number of reducers at runtime. An ExchangeCoordinator determines the number of post-shuffle partitions for a stage that needs to fetch shuffle data from one or more upstream stages. The current implementation adds ExchangeCoordinators at the same time as it adds Exchanges. This approach has several limitations. First, it may introduce additional shuffles that degrade performance, as can be seen in how the EnsureRequirements rule adds an ExchangeCoordinator. Second, adding ExchangeCoordinators while adding Exchanges is not a good idea, because at that point we do not have a global picture of all shuffle dependencies of a post-shuffle stage. For example, for a join of three tables in a single stage, the same ExchangeCoordinator should be shared by all three Exchanges, but currently two separate ExchangeCoordinators are added. Third, the current framework makes it hard to flexibly implement other adaptive execution features, such as changing the execution plan or handling skewed joins at runtime.
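To make the reducer-number adjustment concrete, here is a minimal sketch of how post-shuffle partitions could be chosen from map output sizes. It is not Spark's code: the function name `estimate_partition_start_indices`, its parameters, and the greedy packing rule are assumptions made for illustration. It only shows the general idea of merging adjacent pre-shuffle partitions until a target input size is reached, using sizes summed across all upstream stages.

```python
from typing import List, Sequence


def estimate_partition_start_indices(
    map_output_sizes: Sequence[Sequence[int]],
    target_size: int,
) -> List[int]:
    """Return the start index of each coalesced post-shuffle partition.

    map_output_sizes[i][p] is the number of bytes that upstream stage i
    produced for pre-shuffle partition p (hypothetical input layout).
    All stages must use the same number of pre-shuffle partitions.
    """
    if not map_output_sizes:
        return []
    num_partitions = len(map_output_sizes[0])
    if any(len(sizes) != num_partitions for sizes in map_output_sizes):
        raise ValueError("all stages must have the same number of partitions")
    if num_partitions == 0:
        return []

    start_indices = [0]
    current_size = 0
    for p in range(num_partitions):
        # Size of partition p summed over every upstream stage, so that
        # all inputs of the post-shuffle stage are split identically.
        partition_size = sum(sizes[p] for sizes in map_output_sizes)
        if p > 0 and current_size + partition_size > target_size:
            # Adding p would exceed the target: start a new partition at p.
            start_indices.append(p)
            current_size = partition_size
        else:
            current_size += partition_size
    return start_indices


# Two upstream stages feeding one join stage, with a 64-byte target.
stage_a = [10, 10, 10, 60, 5]
stage_b = [5, 5, 5, 10, 5]
print(estimate_partition_start_indices([stage_a, stage_b], 64))
```

Because the sizes are summed across stages before packing, every input of the post-shuffle stage gets the same partition boundaries. A three-table join in one stage would pass three size lists to a single call, which is why a single shared coordinator is needed rather than two separate ones.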
We'd like to introduce a new way to do adaptive execution in Spark SQL that addresses these limitations. The idea is described at https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing
Sub-task: collect the runtime statistics of row count in map stage (Status: Open, Assignee: Unassigned)