Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 1.1.1
- Component/s: None
Description
The current implementation of the join operation does not use an iterator (i.e. lazy evaluation); it eagerly evaluates the co-grouped values. In big data applications these value collections can be very large. As a result, the Cartesian product of all co-grouped values for a given key of both RDDs is kept in memory during the flatMapValues operation, resulting in O(size(pair._1) * size(pair._2)) memory consumption instead of O(1). Very large value collections will therefore cause "GC overhead limit exceeded" exceptions and fail the task, or at least slow down execution dramatically.
// ...
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    // strict: builds the whole per-key cross product in memory
    for (v <- pair._1; w <- pair._2) yield (v, w)
  )
}
// ...
Since cogroup returns the grouped values as Iterable instances (backed by Arrays), the join implementation could be changed to the following, which uses lazy evaluation instead and has almost no memory overhead:
// ...
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    // lazy: the iterators produce one (v, w) pair at a time
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}
// ...
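To make the difference concrete, here is a small, self-contained sketch (plain Scala, no Spark required; the collection contents are made up for illustration) contrasting the two for-comprehensions. Over the Iterables themselves the cross product is materialized as a whole collection; over their iterators the pairs are produced one at a time:

object LazyCrossProductDemo {
  def main(args: Array[String]): Unit = {
    val values: Iterable[Int]    = Seq(1, 2, 3)  // stands in for pair._1
    val others: Iterable[String] = Seq("a", "b") // stands in for pair._2

    // Strict: a for-comprehension over Iterables builds the entire
    // cross product (values.size * others.size elements) in memory.
    val strict: Iterable[(Int, String)] =
      for (v <- values; w <- others) yield (v, w)
    println(strict.mkString(", "))

    // Lazy: the same comprehension over iterators yields an Iterator;
    // each (v, w) pair is computed only when the consumer asks for it.
    // Note that others.iterator is re-evaluated for every v, so the
    // inner values are traversed afresh for each outer element.
    val lazyPairs: Iterator[(Int, String)] =
      for (v <- values.iterator; w <- others.iterator) yield (v, w)
    lazyPairs.foreach(println)
  }
}

The strict variant must hold all values.size * others.size pairs at once; the lazy variant only ever holds the single pair currently being consumed.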
Alternatively, if the current implementation intentionally avoids lazy evaluation for some reason, a lazyJoin() method that utilizes lazy evaluation could be added next to the original join implementation (sketched below). This of course applies to the other join operations as well.
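For illustration, a minimal sketch of what such a lazyJoin could look like as an enrichment defined outside Spark. LazyJoinOps and lazyJoin are hypothetical names, not part of the Spark API, and the sketch assumes Spark 1.3+, where the pair-RDD implicits are in scope automatically:

import scala.reflect.ClassTag
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

object LazyJoinOps {
  // Hypothetical enrichment adding lazyJoin next to the built-in join.
  implicit class LazyJoin[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) {
    def lazyJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] =
      self.cogroup(other, partitioner).flatMapValues { pair =>
        // Iterators keep the per-key cross product lazy instead of
        // materializing it inside flatMapValues.
        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
      }
  }
}

With import LazyJoinOps._ in scope, callers could then write rdd1.lazyJoin(rdd2, new HashPartitioner(8)) next to the regular join.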
Thanks!
Issue Links
- is duplicated by: SPARK-4824 Join should use `Iterator` rather than `Iterable` (Closed)