  1. Spark
  2. SPARK-32461 Shuffled hash join improvement
  3. SPARK-32634

Introduce sort-based fallback mechanism for shuffled hash join



    • Sub-task
    • Status: In Progress
    • Minor
    • Resolution: Unresolved
    • 3.1.0
    • None
    • SQL
    • None


      A major reason Spark users avoid shuffled hash join (SHJ) is its tendency to run out of memory. SHJ is prone to OOM because it allocates an in-memory hashed relation (`UnsafeHashedRelation` or `LongHashedRelation`) for the build side, and there is no recovery path (e.g. fallback or spill) once the hashed relation grows too large to fit in memory. On the other hand, SHJ is more CPU- and IO-efficient than sort merge join (SMJ) when joining a large table with a small table that is still too large to be broadcast, because SHJ does not sort the large table, whereas SMJ must.

      To improve the reliability of shuffled hash join, a fallback mechanism can be introduced that avoids the shuffled hash join OOM issue completely. This is similar to the fallback to sort-based aggregation that hash aggregate already has. The idea is:

      (1). Build the hashed relation as is done today, but monitor its size while inserting each build-side row. If the size of the hashed relation always stays below a configurable threshold, go to (2.1); otherwise go to (2.2).

      (2.1). Current shuffled hash join logic: read the stream-side rows and probe the hashed relation.

      (2.2). Fall back to sort merge join: sort the stream-side rows, and sort the build-side rows (first iterate over the rows already in the hashed relation, e.g. through `BytesToBytesMap.destructiveIterator`, then iterate over the remaining unread build-side rows). Then perform a sort merge join over the stream-side and build-side rows.
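
      Steps (1), (2.1) and (2.2) can be sketched in plain Python as a toy model. This is not Spark code: the function names, the `(key, payload)` row shape, and the use of a row count as a stand-in for the hashed relation's size are all assumptions made for illustration.

```python
from typing import Dict, Iterable, List, Tuple

Row = Tuple[int, str]           # (join key, payload); hypothetical row shape
Joined = Tuple[int, str, str]   # (join key, build payload, stream payload)


def sort_merge_join(build_sorted: List[Row], stream_sorted: List[Row]) -> List[Joined]:
    """Inner equi-join of two lists that are already sorted by key."""
    out: List[Joined] = []
    i, n = 0, len(build_sorted)
    for s_key, s_val in stream_sorted:
        # Skip build rows whose key is smaller than the current stream key.
        while i < n and build_sorted[i][0] < s_key:
            i += 1
        # Emit every build row with a matching key; `i` stays at the start of
        # the group so duplicate stream keys rescan the same build rows.
        j = i
        while j < n and build_sorted[j][0] == s_key:
            out.append((s_key, build_sorted[j][1], s_val))
            j += 1
    return out


def hash_join_with_fallback(
    build_rows: Iterable[Row],
    stream_rows: Iterable[Row],
    threshold: int,  # assumed to be a row count; a real threshold would likely be in bytes
) -> Tuple[List[Joined], str]:
    build_iter = iter(build_rows)
    hashed: Dict[int, List[str]] = {}
    size = 0
    fell_back = False

    # (1) Build the hashed relation, checking its size after each insert.
    for key, value in build_iter:
        hashed.setdefault(key, []).append(value)
        size += 1
        if size >= threshold:
            fell_back = True
            break

    if not fell_back:
        # (2.1) Regular hash join: probe the hashed relation per stream row.
        joined = [(k, b, s) for k, s in stream_rows for b in hashed.get(k, [])]
        return joined, "hash"

    # (2.2) Fallback: drain the hashed relation destructively, append the
    # build rows that were never read, sort both sides, then merge.
    drained: List[Row] = []
    while hashed:
        k, values = hashed.popitem()
        drained.extend((k, v) for v in values)
    build_sorted = sorted(drained + list(build_iter), key=lambda r: r[0])
    stream_sorted = sorted(stream_rows, key=lambda r: r[0])
    return sort_merge_join(build_sorted, stream_sorted), "sort_merge"
```

      Both paths return the same set of joined rows, only in a different order. Because the decision is made inside a single call, each caller (in Spark terms, each task) decides independently whether it falls back.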



      Note:

      (1). The fallback is dynamic and happens per task: task 0 may fall back, e.g. if it has a large build side, while tasks 1 and 2 need not, depending on the size of their hashed relations.

      (2). No major code change is needed in SHJ or SMJ. The main change is in `HashedRelation`, which gains some new methods, e.g. `HashedRelation.destructiveValues()`, which returns an Iterator over the build-side rows in the hashed relation and cleans up the hashed relation along the way.

      (3). We have run this feature by default in our internal fork for more than 2 years and have benefited a lot from it: users can choose SHJ, and we do not need to worry about SHJ reliability (see https://issues.apache.org/jira/browse/SPARK-21505 for our original proposal; I have tweaked it here to make it less intrusive and more acceptable, e.g. by doing the fallback automatically inside the SHJ operator itself rather than introducing a separate join operator).
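
      The "return rows and clean up along the way" behaviour proposed for `HashedRelation.destructiveValues()` can be illustrated with a small Python generator. `ToyHashedRelation` and its methods are hypothetical stand-ins, not the Spark classes; the sketch only shows that entries are released as they are consumed rather than after iteration finishes.

```python
from typing import Dict, Iterator, List, Tuple


class ToyHashedRelation:
    """Hypothetical in-memory key -> rows map standing in for a hashed relation."""

    def __init__(self) -> None:
        self._map: Dict[int, List[str]] = {}

    def append(self, key: int, row: str) -> None:
        self._map.setdefault(key, []).append(row)

    def num_keys(self) -> int:
        return len(self._map)

    def destructive_values(self) -> Iterator[Tuple[int, str]]:
        # Remove one key at a time from the map before yielding its rows,
        # so the relation shrinks while the caller is still iterating.
        while self._map:
            key, rows = self._map.popitem()
            for row in rows:
                yield key, row


relation = ToyHashedRelation()
for k, r in [(1, "a"), (2, "b"), (2, "c")]:
    relation.append(k, r)

values = relation.destructive_values()
first = next(values)                  # one key has already been removed here
keys_left_after_first = relation.num_keys()
rest = list(values)                   # draining the iterator empties the relation
```

      In the fallback path, an iterator like this would feed the build-side sort, so the memory held by the hashed relation can be handed back while the sorter fills up.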




            Assignee: Unassigned
            Reporter: Cheng Su (chengsu)