Query planning is one of the main factors in achieving high performance, but the current Spark engine requires the execution DAG for a job to be set in advance. Even with cost-based optimization, it is hard to know the behavior of data and user-defined functions well enough to always get good execution plans. This JIRA proposes adding adaptive query execution, so that the engine can change the plan for each query as it sees what data earlier stages produced.
We propose adding this to Spark SQL / DataFrames first, using a new API in the Spark engine that lets libraries run DAGs adaptively. In future JIRAs, the functionality could be extended to other libraries or the RDD API, but that is more difficult than adding it in SQL.
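To make the idea concrete, here is a minimal sketch in plain Python (not Spark code) of one adaptive decision: choosing a join strategy only after the map stages on both sides have run and their output sizes are known. The names `MapStageStats`, `choose_join_strategy`, and the threshold value are hypothetical and exist only for illustration; they are not part of the proposed engine API.

```python
from dataclasses import dataclass
from typing import List

# Illustrative threshold only (an assumption, not a Spark default).
BROADCAST_THRESHOLD_BYTES = 10 * 1024 * 1024


@dataclass
class MapStageStats:
    """Bytes written by a finished map stage, one entry per reduce partition."""
    bytes_by_partition: List[int]

    @property
    def total_bytes(self) -> int:
        return sum(self.bytes_by_partition)


def choose_join_strategy(left: MapStageStats,
                         right: MapStageStats,
                         threshold: int = BROADCAST_THRESHOLD_BYTES) -> str:
    """Pick a join strategy from observed sizes rather than up-front estimates."""
    smaller = min(left.total_bytes, right.total_bytes)
    # If the smaller side fits under the threshold, it could be broadcast;
    # otherwise fall back to a shuffle join.
    return "broadcast" if smaller <= threshold else "shuffle"
```

The point of the sketch is the ordering: the sizes come from stages that have already executed, so the decision does not depend on estimates of how a filter or user-defined function will behave.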
I've attached a design doc by Yin Huai and myself explaining how it would work in more detail.
|1.||Support submitting map stages individually in DAGScheduler||Resolved|
|2.||Let reduce tasks fetch multiple map output partitions||Resolved|
|3.||Optimize shuffle fetch of contiguous partition IDs||In Progress|
|4.||Introduce an ExchangeCoordinator to estimate the number of post-shuffle partitions||Resolved|
|5.||Aggregation: Determine the number of reducers at runtime||Resolved|
|6.||Join: Determine the join strategy (broadcast join or shuffle join) at runtime||In Progress||Unassigned|
|7.||Join: Determine the number of reducers used by a shuffle join operator at runtime||Resolved|
|8.||Join: Handling data skew||In Progress||Unassigned|
|9.||Improve the way that we plan shuffled join||Open||Unassigned|
|10.||Allow shuffle readers to request data from just one mapper||Open||Unassigned|
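Several of the sub-tasks above (reduce tasks fetching multiple map output partitions, estimating the number of post-shuffle partitions, and determining the number of reducers at runtime) fit together in one step: after the map stage finishes, contiguous partitions are grouped until each group reaches a target size. The following is a rough sketch of that step in plain Python, not the actual ExchangeCoordinator logic; the function name `coalesce_partitions` and the `target_bytes` parameter are assumptions made for illustration.

```python
from typing import List, Tuple


def coalesce_partitions(map_output_bytes: List[List[int]],
                        target_bytes: int) -> List[Tuple[int, int]]:
    """Group contiguous reduce partitions into [start, end) ranges.

    map_output_bytes[m][p] is the size mapper m wrote for partition p.
    Each returned range would be read by a single reduce task.
    """
    if not map_output_bytes or not map_output_bytes[0]:
        return []
    num_partitions = len(map_output_bytes[0])
    # Total size of each partition, summed over all mappers.
    totals = [sum(m[p] for m in map_output_bytes) for p in range(num_partitions)]

    ranges: List[Tuple[int, int]] = []
    start, current = 0, 0
    for p, size in enumerate(totals):
        # Close the current range if adding this partition would exceed the target.
        if p > start and current + size > target_bytes:
            ranges.append((start, p))
            start, current = p, 0
        current += size
    ranges.append((start, num_partitions))
    return ranges
```

With per-partition totals of 20, 40, 60, 80 and 10 bytes and a 100-byte target, this yields three reduce tasks covering partitions [0, 2), [2, 3) and [3, 5). A partition that is larger than the target on its own still gets a range to itself; splitting it further belongs to the data-skew sub-task, which this sketch does not attempt.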