Affects Version/s: 1.4.0
Fix Version/s: None
Component/s: Spark Core
In my Spark Summit 2015 presentation I touted sorted joins. It would be a shame to talk about how great they are and then not try to introduce them into Spark.
When joining co-partitioned RDDs, the current Spark implementation builds a hash map that holds the contents of both partitions grouped by key, and then emits the matching pairs for each key. (https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala using AppendOnlyMap, or ExternalAppendOnlyMap when spilling is enabled.)
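For contrast, here is a simplified, Spark-free model of the hash-based approach in plain Scala. It assumes each partition arrives as an `Iterator` of key-value pairs; `HashJoin` and `hashJoin` are hypothetical names, and unlike Spark's real code this model never spills to disk. It mirrors the cogroup-then-emit structure: values from both sides are grouped by key in a hash map, and only then are matching pairs produced.

```scala
import scala.collection.mutable

object HashJoin {
  // Hypothetical model of a hash-based join of two co-partitioned partitions.
  def hashJoin[K, V, W](left: Iterator[(K, V)], right: Iterator[(K, W)]): Iterator[(K, (V, W))] = {
    // One entry per distinct key, holding every value seen from each side.
    // LinkedHashMap keeps insertion order so the output is deterministic.
    val groups = mutable.LinkedHashMap.empty[K, (mutable.ArrayBuffer[V], mutable.ArrayBuffer[W])]
    def slot(k: K) =
      groups.getOrElseUpdate(k, (mutable.ArrayBuffer.empty[V], mutable.ArrayBuffer.empty[W]))

    // Both partitions are fully consumed into the map before anything is emitted.
    left.foreach { case (k, v) => slot(k)._1 += v }
    right.foreach { case (k, w) => slot(k)._2 += w }

    // Emit the cross product of the two value lists for every key.
    groups.iterator.flatMap { case (k, (vs, ws)) =>
      for (v <- vs.iterator; w <- ws.iterator) yield (k, (v, w))
    }
  }
}
```

Every record of both partitions stays resident in `groups` until the last input record has been read, which is the memory cost that grows with partition size.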
Another way to line up the keys from the two partitions is to sort both and then merge them. That change alone may already improve performance.
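A minimal sketch of the merge step, assuming both partitions arrive as iterators already sorted by key under the same `Ordering`. `MergeJoin.mergeJoin` is a hypothetical helper, not a Spark API. Instead of a map over the whole partition, it buffers only the run of records that share the current key, so when keys are mostly distinct the extra memory stays small regardless of partition size.

```scala
import scala.collection.mutable.ArrayBuffer

object MergeJoin {
  // Hypothetical inner join of two partitions that are both sorted by key.
  def mergeJoin[K, V, W](left: Iterator[(K, V)], right: Iterator[(K, W)])(
      implicit ord: Ordering[K]): Iterator[(K, (V, W))] = new Iterator[(K, (V, W))] {
    private val l = left.buffered
    private val r = right.buffered
    // Output still to be emitted for the most recently matched key.
    private var pending: Iterator[(K, (V, W))] = Iterator.empty

    // Advance both inputs until a matching key is found or one side runs out.
    private def fill(): Unit =
      while (!pending.hasNext && l.hasNext && r.hasNext) {
        val c = ord.compare(l.head._1, r.head._1)
        if (c < 0) l.next()      // left key is smaller: it has no partner
        else if (c > 0) r.next() // right key is smaller: it has no partner
        else {
          val key = l.head._1
          // Buffer only the records that share this key on each side.
          val vs = ArrayBuffer.empty[V]
          while (l.hasNext && ord.equiv(l.head._1, key)) vs += l.next()._2
          val ws = ArrayBuffer.empty[W]
          while (r.hasNext && ord.equiv(r.head._1, key)) ws += r.next()._2
          pending = for (v <- vs.iterator; w <- ws.iterator) yield (key, (v, w))
        }
      }

    def hasNext: Boolean = { fill(); pending.hasNext }
    def next(): (K, (V, W)) = { fill(); pending.next() }
  }
}
```

The output is itself sorted by key, so its result can feed further sorted operations without another sort.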
What we do instead is sort the partitions up front and then enjoy the benefits across many operations. Our joins are 10x faster than normal Spark joins and don't trigger GC. The hash-based join builds a large hashmap (the size of the partition), while the sorted join builds no such structure. The sorted partitions also benefit other operations, such as distinct, where we likewise avoid building a hashmap. (I think the logic is similar to sort-based shuffle, just applied at a later stage of the process.)
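With sorted partitions, distinct only has to remember the previous element, because duplicates are adjacent. The sketch below assumes each partition is sorted by an `Ordering` on the whole element (for pairs: by key, then by value); a partition sorted by key alone does not guarantee that equal pairs are adjacent. `SortedDistinct.sortedDistinct` is a hypothetical name.

```scala
object SortedDistinct {
  // Hypothetical distinct() for a partition already sorted by `ord`:
  // an element is a duplicate exactly when it equals the one before it.
  def sortedDistinct[T](sorted: Iterator[T])(implicit ord: Ordering[T]): Iterator[T] = {
    val it = sorted.buffered
    new Iterator[T] {
      def hasNext: Boolean = it.hasNext
      def next(): T = {
        val x = it.next()
        // Skip the rest of the run of elements equal to x; no hash set is built.
        while (it.hasNext && ord.equiv(it.head, x)) it.next()
        x
      }
    }
  }
}
```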
Our implementation is based on zipPartitions, and this is entirely workable. We have a custom RDD subclass (SortedRDD) that overrides a number of methods, and an implicit class that adds a toSortedRDD method to pair RDDs.
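The overall shape can be modeled without Spark. In this hypothetical sketch a `Vector` of `Vector`s stands in for a pair RDD's partitions, `SortedParts` plays the role of SortedRDD, the implicit class adds `toSortedParts` in place of toSortedRDD, and zipping the two partition vectors stands in for `zipPartitions`. It assumes both inputs are already co-partitioned with the same number of partitions.

```scala
object SortedPartitionsModel {
  // Stand-in for SortedRDD: every inner Vector is one partition, sorted by key.
  final class SortedParts[K, V](val parts: Vector[Vector[(K, V)]])(implicit ord: Ordering[K]) {
    // Like zipPartitions: partition i of this side is merged with partition i of the other.
    def join[W](other: SortedParts[K, W]): SortedParts[K, (V, W)] = {
      require(parts.size == other.parts.size, "inputs must be co-partitioned")
      new SortedParts(parts.zip(other.parts).map { case (a, b) => mergeJoin(a, b) })
    }
  }

  // Stand-in for the implicit class that adds toSortedRDD to pair RDDs.
  implicit class ToSortedOps[K, V](parts: Vector[Vector[(K, V)]]) {
    // Sort each partition once, up front.
    def toSortedParts(implicit ord: Ordering[K]): SortedParts[K, V] =
      new SortedParts(parts.map(_.sortBy(_._1)))
  }

  // Index-based merge join of two key-sorted partitions; only the output is allocated.
  def mergeJoin[K, V, W](a: Vector[(K, V)], b: Vector[(K, W)])(
      implicit ord: Ordering[K]): Vector[(K, (V, W))] = {
    val out = Vector.newBuilder[(K, (V, W))]
    var i = 0
    var j = 0
    while (i < a.size && j < b.size) {
      val c = ord.compare(a(i)._1, b(j)._1)
      if (c < 0) i += 1
      else if (c > 0) j += 1
      else {
        val k = a(i)._1
        // Find the end of the run of right-side records with key k.
        var jEnd = j
        while (jEnd < b.size && ord.equiv(b(jEnd)._1, k)) jEnd += 1
        // Pair each left record with key k against that whole run.
        while (i < a.size && ord.equiv(a(i)._1, k)) {
          for (t <- j until jEnd) out += ((k, (a(i)._2, b(t)._2)))
          i += 1
        }
        j = jEnd
      }
    }
    out.result()
  }
}
```

Because a merge join emits keys in sorted order, `join` can return another `SortedParts`, so the sort is paid once and reused by later operations.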
But I think integrating this into Spark could take it a step further. What we have not investigated is the cases where the sorting could be skipped. For example, when an RDD comes out of a sort-based shuffle, its partitions will be sorted, right? So even if the user never asks for sorted partitions, they may already be sorted, and the faster sorted implementations of join, distinct, etc. could kick in automatically. This would speed up applications without any change to their code.
Instead of a subclass, it would probably be best to do this with a simple "hasSortedPartitions" variable in the RDD. Operations could then take a "preservesPartitionOrder" parameter, as is done now with "partitioner" and "preservesPartitioning". (For example, filter(), mapValues(), join(), and distinct() all keep the partitions sorted.)
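A toy model of the proposed flag, using a Spark-free stand-in for partitions. `hasSortedPartitions` and `preservesPartitionOrder` are the names proposed above; `PairParts` and `joinStrategy` are hypothetical. Each transformation computes the new flag the way `preservesPartitioning` decides whether the `partitioner` is kept, and a join can pick the sorted path automatically when both sides allow it.

```scala
object SortedFlagModel {
  // Stand-in for a pair RDD: one Vector per partition, plus the proposed flag
  // (true when every partition is sorted by key).
  final case class PairParts[K, V](parts: Vector[Vector[(K, V)]], hasSortedPartitions: Boolean) {

    // Analogue of mapPartitions(f, preservesPartitioning): the caller states
    // whether f keeps each partition's key order.
    def mapPartitions[K2, V2](f: Vector[(K, V)] => Vector[(K2, V2)],
                              preservesPartitionOrder: Boolean): PairParts[K2, V2] =
      PairParts(parts.map(f), hasSortedPartitions && preservesPartitionOrder)

    // Removing records never reorders the remaining ones.
    def filter(p: ((K, V)) => Boolean): PairParts[K, V] =
      mapPartitions[K, V](_.filter(p), preservesPartitionOrder = true)

    // Keys are untouched, so the key order survives.
    def mapValues[W](f: V => W): PairParts[K, W] =
      mapPartitions[K, W](_.map { case (k, v) => (k, f(v)) }, preservesPartitionOrder = true)

    // An arbitrary function may rewrite keys, so the flag is cleared.
    def map[K2, V2](f: ((K, V)) => (K2, V2)): PairParts[K2, V2] =
      mapPartitions[K2, V2](_.map(f), preservesPartitionOrder = false)

    // The sorted implementation kicks in without the user asking for it.
    def joinStrategy[W](other: PairParts[K, W]): String =
      if (hasSortedPartitions && other.hasSortedPartitions) "sort-merge" else "hash"
  }
}
```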
What do you think about all this?