Description
The following Spark shell snippet creates a series of query plans that grow exponentially. The i-th plan is built by joining 4 cached copies of the (i-1)-th plan.
(0 until 6).foldLeft(Seq(1, 2, 3).toDS) { (plan, iteration) =>
  val start = System.currentTimeMillis()
  val result = plan.join(plan, "value").join(plan, "value").join(plan, "value").join(plan, "value")
  result.cache()
  System.out.println(s"Iteration $iteration takes time ${System.currentTimeMillis() - start} ms")
  result.as[Int]
}
We can see that although all intermediate plans are cached, the query planning time still grows exponentially and quickly becomes unbearable. Caching only avoids re-executing the cached data; the logical plan itself keeps growing, so the analyzer and optimizer must traverse an ever-larger tree on each iteration.
Iteration 0 takes time 9 ms
Iteration 1 takes time 19 ms
Iteration 2 takes time 61 ms
Iteration 3 takes time 219 ms
Iteration 4 takes time 830 ms
Iteration 5 takes time 4080 ms
Similar scenarios arise in iterative ML code, where this significantly affects usability.
This issue can be fixed by introducing a checkpoint() method for Dataset that truncates both the query plan and the lineage of the underlying RDD.
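As a sketch, the reproduction loop above could use the proposed checkpoint() method, assuming it eagerly materializes the Dataset and returns one whose plan is a simple scan of the checkpointed data (the checkpoint directory path here is only illustrative):

  // A checkpoint directory must be configured before checkpointing.
  spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

  (0 until 6).foldLeft(Seq(1, 2, 3).toDS) { (plan, iteration) =>
    val start = System.currentTimeMillis()
    val result = plan.join(plan, "value").join(plan, "value").join(plan, "value").join(plan, "value")
    // checkpoint() truncates both the query plan and the RDD lineage,
    // so the next iteration starts from a flat plan instead of one
    // containing all previous iterations' joins.
    val checkpointed = result.checkpoint()
    System.out.println(s"Iteration $iteration takes time ${System.currentTimeMillis() - start} ms")
    checkpointed.as[Int]
  }

With lineage truncated at each step, planning time per iteration should stay roughly constant instead of growing exponentially.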
Issue Links
- is duplicated by SPARK-11879 Checkpoint support for DataFrame/Dataset (Resolved)