Details
-
Improvement
-
Status: Resolved
-
Major
-
Resolution: Incomplete
-
None
-
None
Description
Currently CartesianProduct relies on RDD.cartesian, in which the computation is realized as follows
override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] =
{ val currSplit = split.asInstanceOf[CartesianPartition] for (x <- rdd1.iterator(currSplit.s1, context); y <- rdd2.iterator(currSplit.s2, context)) yield (x, y) }From the above loop, if rdd1.count is n, rdd2 needs to be recomputed n times. Which is really heavy and may never finished if n is large, especially when rdd2 is coming from ShuffleRDD.
We should have some optimization on CartesianProduct by caching rightResults. The problem is that we don’t have cleanup hook to unpersist rightResults AFAIK. I think we should have some cleanup hook after query execution.
With the hook available, we can easily optimize such Cartesian join. I believe such cleanup hook may also benefit other query optimizations.
Attachments
Issue Links
- duplicates
-
SPARK-11982 Improve performance of CartesianProduct
- Resolved