Description
Currently, in JDBCRelation (line 123), the stride size is calculated as follows:
val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
Because both divisions truncate, the stride size can fall short of what it should be. This can leave a large gap between the provided upper bound and the actual start of the last partition.
I propose this formula, as it is much more accurate and leads to better distribution:
val stride = (upperBound / numPartitions.toFloat - lowerBound / numPartitions.toFloat).toLong
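The two formulas can be compared side by side in a small sketch. The object and method names here (StrideFormulas, currentStride, proposedStride) are illustrative only and do not exist in Spark, and the bounds in the usage lines are hypothetical values picked to make the truncation easy to see.

```scala
object StrideFormulas {
  // Current formula: each Long division truncates toward zero
  // before the subtraction happens.
  def currentStride(lowerBound: Long, upperBound: Long, numPartitions: Int): Long =
    upperBound / numPartitions - lowerBound / numPartitions

  // Proposed formula: divide as Float, subtract, then truncate once at the end.
  // Note: Float carries limited precision, so very large bounds may lose accuracy.
  def proposedStride(lowerBound: Long, upperBound: Long, numPartitions: Int): Long =
    (upperBound / numPartitions.toFloat - lowerBound / numPartitions.toFloat).toLong

  // Hypothetical bounds: both Long divisions truncate to 0,
  // so the current formula produces a stride of 0.
  val current: Long = currentStride(-999L, 999L, 1000)   // 0
  val proposed: Long = proposedStride(-999L, 999L, 1000) // 1
}
```

With these hypothetical inputs the current formula loses the fractional part of both quotients, while the proposed one keeps them until the final conversion.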
An example (using a date column):
Say you're creating 1,000 partitions. If you provide a lower bound of 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 (translated to 18563), Spark determines the stride size as follows:
(18563L / 1000L) - (-15611 / 1000L) = 33
Starting from the lower bound and taking strides of 33, you end up at 2017-07-08. That leaves over 3 years of extra data for the last partition, which, depending on the shape of the data, could cause a very long-running task at the end of a job.
Using the formula I'm proposing, you'd get:
((18563L / 1000F) - (-15611 / 1000F)).toLong = 34
This would put the start of the last partition at 2020-04-02, which is much closer to the originally supplied upper bound. This is the closest you can get to the upper bound without adjusting the number of partitions. For example, a stride size of 35 would go well past the supplied upper bound (by over 2 years, to 2022-11-22).
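The arithmetic in this example can be reproduced with a short sketch that works in day offsets rather than calendar dates. It assumes the last partition starts numPartitions - 1 strides after the lower bound; that is an assumption about how the partitions are laid out, not something taken from the Spark source, and the names StrideExample, lastPartitionStart and gapToUpperBound are hypothetical.

```scala
object StrideExample {
  // Day offsets as given in the example above.
  val lowerBound: Long = -15611L
  val upperBound: Long = 18563L
  val numPartitions: Int = 1000

  // Current formula: two truncating Long divisions.
  val currentStride: Long =
    upperBound / numPartitions - lowerBound / numPartitions

  // Proposed formula: Float divisions, truncated once at the end.
  val proposedStride: Long =
    (upperBound / numPartitions.toFloat - lowerBound / numPartitions.toFloat).toLong

  // Assumption: the last partition begins (numPartitions - 1) strides
  // after the lower bound.
  def lastPartitionStart(stride: Long): Long =
    lowerBound + stride * (numPartitions - 1)

  // Days between the start of the last partition and the supplied upper bound.
  def gapToUpperBound(stride: Long): Long =
    upperBound - lastPartitionStart(stride)
}
```

Under that assumption, `StrideExample.gapToUpperBound(StrideExample.currentStride)` is 1207 days (a little over 3 years), while `StrideExample.gapToUpperBound(StrideExample.proposedStride)` is 208 days.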
In the above example, there is only a difference of 1 between the stride size using the current formula and the stride size using the proposed formula, but with greater distance between the lower and upper bounds, or a lower number of partitions, the difference can be much greater.
Issue Links
- relates to: SPARK-34844 JDBCRelation columnPartition function includes the first stride in the lower partition (Resolved)
- links to