The problem we are solving is the case where two large tables are partitioned by column X and sorted within partitions by column Y, and an expensive function must be calculated on the joined rows to decide which of them are kept (e.g. a condition based on a spatial distance calculation). In principle, the number of joined rows for which the calculation is performed could be reduced by first applying a range condition on the Y column. Something like this:
... WHERE t1.X = t2.X AND t1.Y BETWEEN t2.Y - d AND t2.Y + d AND <function calculation...>
However, during a sort-merge join with this range condition specified, Spark first cross-joins all the rows with the same X value and only then applies the range condition and any function calculations. This happens because, inside the generated sort-merge join (SMJ) code, these extra conditions are put in the same block as the function being calculated, and there is no way to evaluate them before all the rows to be checked have been read into memory (into an ExternalAppendOnlyUnsafeRowArray). If the two tables have a large number of rows per X value, this can result in a huge number of calculations and a huge number of rows held in executor memory, which can be infeasible.
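The behaviour described above can be illustrated with a small model. This is a simplified sketch in plain Python, not Spark's generated code: rows are assumed to be (x, y) tuples sorted by x, and `buffered_join` and its `expensive` argument are hypothetical names used only for illustration.

```python
from itertools import groupby
from operator import itemgetter

def buffered_join(left, right, d, expensive):
    """Model of the current behaviour for rows (x, y) sorted by x.

    Returns the matching pairs and the number of pairs on which the
    combined condition was evaluated.
    """
    # Simplification: group the right side by key up front instead of merging.
    right_groups = {x: list(rows) for x, rows in groupby(right, key=itemgetter(0))}
    out, evaluated = [], 0
    for x, left_rows in groupby(left, key=itemgetter(0)):
        buffered = right_groups.get(x, [])  # every right row with this X is buffered
        for l in left_rows:
            for r in buffered:
                evaluated += 1
                # The range check and the function sit in the same condition block.
                if r[1] - d <= l[1] <= r[1] + d and expensive(l, r):
                    out.append((l, r))
    return out, evaluated

left = [(1, y) for y in range(100)]
right = [(1, y) for y in range(100)]
pairs, evaluated = buffered_join(left, right, d=1, expensive=lambda l, r: True)
print(evaluated, len(pairs))  # prints "10000 298"
```

With 100 rows per side sharing one X value, all 10,000 pairs are examined even though only 298 satisfy the range condition.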
We therefore propose a change to the sort-merge join so that, when these extra conditions are specified, a queue is used instead of the ExternalAppendOnlyUnsafeRowArray class. This queue is then used as a moving window over the values from the right relation as the left row changes. You could call this a combination of an equi-join and a theta join; in the literature it is sometimes called an "epsilon join". We call it a "sort-merge inner range join".
This design uses much less memory, because not all rows with the same value of X need to be loaded into memory at once. It should also require far fewer comparisons, although the actual savings depend on the data and on the conditions used.
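The moving-window idea can be sketched as follows. This is a minimal illustration in plain Python under stated assumptions, not the proposed Spark implementation: both inputs are lists of (x, y) tuples sorted by (x, y), the range condition is symmetric with a constant d, and `range_window_join` and `expensive` are hypothetical names.

```python
from collections import deque

def range_window_join(left, right, d, expensive):
    """Join rows (x, y) sorted by (x, y) using a moving window on y."""
    out, evaluated = [], 0
    window = deque()   # right rows with the current x and y in [l.y - d, l.y + d]
    window_key = None
    i = 0              # next unread position in `right`
    for l in left:
        x, y = l
        if x != window_key:
            # New left key: the old window is useless, so skip ahead on the right.
            window.clear()
            window_key = x
            while i < len(right) and right[i][0] < x:
                i += 1
        # Evict rows that fell below the lower bound.
        while window and window[0][1] < y - d:
            window.popleft()
        # Admit rows up to the upper bound. Rows already below the lower bound
        # are dropped, since later left rows have an equal or larger y.
        while i < len(right) and right[i][0] == x and right[i][1] <= y + d:
            if right[i][1] >= y - d:
                window.append(right[i])
            i += 1
        for r in window:
            evaluated += 1  # the expensive function only sees in-range candidates
            if expensive(l, r):
                out.append((l, r))
    return out, evaluated

left = [(1, y) for y in range(100)]
right = [(1, y) for y in range(100)]
pairs, evaluated = range_window_join(left, right, d=1, expensive=lambda l, r: True)
print(evaluated, len(pairs))  # prints "298 298"
```

In this example the window never holds more than three rows, and the function is evaluated on 298 candidate pairs rather than on all 10,000 pairs sharing the same X value.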
To implement the described change, we propose changes to these classes:
- ExtractEquiJoinKeys – a pattern that needs to be extended to be able to recognize the case where a simple range condition with lower and upper limits is used on a secondary column (a column not included in the equi-join condition). The pattern also needs to extract the information later required for code generation etc.
- InMemoryUnsafeRowQueue – the moving window implementation to be used instead of the ExternalAppendOnlyUnsafeRowArray class. Rows need to be added to and removed from the structure as the left key (X) or the left secondary value (Y) changes, so the structure needs to be a queue. To make the change as unintrusive as possible, we propose implementing InMemoryUnsafeRowQueue as a subclass of ExternalAppendOnlyUnsafeRowArray.
- JoinSelection – a strategy that uses ExtractEquiJoinKeys and needs to be aware of the extracted range conditions
- SortMergeJoinExec – the main implementation of the optimization. Needs to support two code paths:
  - when whole-stage code generation is turned off (method doExecute, which uses sortMergeJoinInnerRangeScanner)
  - when whole-stage code generation is turned on (methods doProduce and genScanner)
- SortMergeJoinInnerRangeScanner – implements the SMJ with the inner-range optimization when whole-stage codegen is turned off
- InnerJoinSuite – functional tests
- JoinBenchmark – performance tests
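The recognition step described for ExtractEquiJoinKeys can be sketched roughly as below. This is a toy model in plain Python, not Catalyst code: `Pred`, `JoinPlan` and `extract_range_join` are hypothetical names, predicates are assumed to be already normalized to the form "left op right + offset", and BETWEEN is assumed to be expanded into a ">=" and a "<=" predicate.

```python
from typing import NamedTuple, Optional

class Pred(NamedTuple):
    op: str            # "=", ">=", "<=" or "other"
    left: str          # column of the left relation, e.g. "t1.Y"
    right: str         # column of the right relation, e.g. "t2.Y"
    offset: float = 0  # the right-hand side is `right + offset`

class JoinPlan(NamedTuple):
    equi_keys: list
    lower: Optional[Pred]
    upper: Optional[Pred]
    residual: list     # conditions still evaluated per candidate pair

def extract_range_join(preds):
    """Split join predicates into equi-keys, one range pair, and the rest."""
    equi = [p for p in preds if p.op == "="]
    equi_cols = {c for p in equi for c in (p.left, p.right)}
    lower = upper = None
    residual = []
    for p in preds:
        if p.op == "=":
            continue
        # A secondary column is one not used in the equi-join condition.
        secondary = p.left not in equi_cols and p.right not in equi_cols
        if p.op == ">=" and secondary and lower is None:
            lower = p
        elif p.op == "<=" and secondary and upper is None:
            upper = p
        else:
            residual.append(p)
    same_columns = (lower is not None and upper is not None
                    and (lower.left, lower.right) == (upper.left, upper.right))
    if not (equi and same_columns):
        # No usable range pair: fall back to a plain equi-join plus residual.
        return JoinPlan(equi, None, None, [p for p in preds if p.op != "="])
    return JoinPlan(equi, lower, upper, residual)

preds = [
    Pred("=", "t1.X", "t2.X"),
    Pred(">=", "t1.Y", "t2.Y", -5),   # t1.Y >= t2.Y - 5
    Pred("<=", "t1.Y", "t2.Y", 5),    # t1.Y <= t2.Y + 5
    Pred("other", "t1.geom", "t2.geom"),
]
plan = extract_range_join(preds)
print(plan.lower.offset, plan.upper.offset, len(plan.residual))  # prints "-5 5 1"
```

The point of the split is that only the residual conditions (here the stand-in for the expensive function) remain to be evaluated per pair, while the two bounds drive the moving window.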
The optimization should be triggered automatically when an equi-join expression is present AND lower and upper range conditions on a secondary column are specified. If the tables aren't sorted by both columns, appropriate sorts should be added.
To limit the impact of this change, we also propose adding a new parameter (tentatively named "spark.sql.join.smj.useInnerRangeOptimization") that can be used to switch off the optimization entirely.
Potential use-cases for this are joins based on spatial or temporal distance calculations.