Description
When ascending=False, pyspark.rdd.sortByKey always fills only two partitions: the first and the last.
A simple example that sorts the numbers 0..999 into 10 partitions in descending order:
sc.parallelize(range(1000)).keyBy(lambda x:x).sortByKey(ascending=False, numPartitions=10).glom().map(len).collect()
returns the following partition sizes:
[469, 0, 0, 0, 0, 0, 0, 0, 0, 531]
When ascending=True, everything works as expected:
sc.parallelize(range(1000)).keyBy(lambda x:x).sortByKey(ascending=True, numPartitions=10).glom().map(len).collect()
returns:
[116, 96, 100, 87, 132, 101, 101, 95, 87, 85]
The problem is caused by line 565 of rdd.py:
samples = sorted(samples, reverse=(not ascending), key=keyfunc)
This sorts the samples in descending order when ascending=False. However, the samples must always be in ascending order. They are subsampled into the variable bounds, which is then searched with bisect_left, and bisect_left assumes an ascending list:
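To illustrate that precondition, the following standalone snippet (plain Python, no Spark) runs bisect_left against the same nine boundaries in both orders. The boundary values are made up for illustration and are not taken from rdd.py.

```python
import bisect

# Hypothetical boundaries for 10 partitions over keys 0..999.
ascending_bounds = [100, 200, 300, 400, 500, 600, 700, 800, 900]
descending_bounds = sorted(ascending_bounds, reverse=True)

# On an ascending list, bisect_left returns the number of bounds < key,
# i.e. the index of the range the key falls into.
print([bisect.bisect_left(ascending_bounds, k) for k in (50, 250, 550, 950)])
# → [0, 2, 5, 9]

# On a descending list the binary search's precondition is violated;
# in this example every key ends up at index 0 or 9.
print([bisect.bisect_left(descending_bounds, k) for k in (50, 250, 550, 950)])
# → [0, 0, 9, 9]
```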
def rangePartitioner(k):
    p = bisect.bisect_left(bounds, keyfunc(k))
    if ascending:
        return p
    else:
        return numPartitions - 1 - p
As you can see, rangePartitioner already handles ascending=False itself by mirroring the partition index, so the fix for the whole problem is trivial: change line 565 of rdd.py to
samples = sorted(samples, key=keyfunc)
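The following is a minimal local sketch of the partitioning step, written to mimic the rdd.py logic quoted above rather than to reproduce it exactly. It assumes every key is used as a sample (no random subsampling) and an identity keyfunc; partition_sizes and its buggy flag are hypothetical names introduced only for this illustration.

```python
import bisect
from collections import Counter


def partition_sizes(keys, num_partitions, ascending, buggy):
    """Simulate sortByKey's range partitioning locally (no Spark).

    Assumptions: all keys serve as samples and keyfunc is the identity.
    """
    # buggy=True reproduces the old line 565 (reverse sort for descending);
    # buggy=False always sorts ascending, as bisect_left requires.
    samples = sorted(keys, reverse=(buggy and not ascending))
    # One boundary per partition except the last, which is implicit.
    bounds = [samples[int(len(samples) * (i + 1) / num_partitions)]
              for i in range(num_partitions - 1)]

    def range_partitioner(k):
        p = bisect.bisect_left(bounds, k)
        # Descending order is handled here by mirroring the index.
        return p if ascending else num_partitions - 1 - p

    counts = Counter(range_partitioner(k) for k in keys)
    return [counts.get(i, 0) for i in range(num_partitions)]


print(partition_sizes(range(1000), 10, ascending=False, buggy=True))
# → [500, 0, 0, 0, 0, 0, 0, 0, 0, 500]
print(partition_sizes(range(1000), 10, ascending=False, buggy=False))
# → [99, 100, 100, 100, 100, 100, 100, 100, 100, 101]
```

Because sampling is skipped, the sizes differ from the [469, ..., 531] reported above, but the two-partition pattern of the old line and the even split after the fix are the same.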