Details
- Type: Bug
- Status: Resolved
- Priority: Minor
- Resolution: Fixed
- Fix Version: 1.4.0
- Labels: None
Description
Spark's RDD.fold operation has some confusing behaviors when a non-commutative reduce function is used.
Here's an example, which was originally reported on StackOverflow (https://stackoverflow.com/questions/29150202/pyspark-fold-method-output):
>>> sc.parallelize([1, 25, 8, 4, 2]).fold(0, lambda a, b: a + 1)
8
To understand what's going on here, let's look at the definition of Spark's `fold` operation.
I'm going to show the Python version of the code, but the Scala version exhibits the exact same behavior (you can also browse the source on GitHub):
def fold(self, zeroValue, op):
    """
    Aggregate the elements of each partition, and then the results for
    all the partitions, using a given associative function and a neutral
    "zero value."

    The function C{op(t1, t2)} is allowed to modify C{t1} and return it
    as its result value to avoid object allocation; however, it should
    not modify C{t2}.

    >>> from operator import add
    >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
    15
    """
    def func(iterator):
        acc = zeroValue
        for obj in iterator:
            acc = op(obj, acc)
        yield acc
    vals = self.mapPartitions(func).collect()
    return reduce(op, vals, zeroValue)
(For comparison, see the Scala implementation of `RDD.fold`).
Spark's `fold` operates by first folding each partition and then folding the results. The problem is that an empty partition gets folded down to the zero element, so the final driver-side fold ends up folding one value for every partition rather than one value for each non-empty partition. This means that the result of `fold` is sensitive to the number of partitions:
>>> sc.parallelize([1, 25, 8, 4, 2], 100).fold(0, lambda a, b: a + 1)
100
>>> sc.parallelize([1, 25, 8, 4, 2], 50).fold(0, lambda a, b: a + 1)
50
>>> sc.parallelize([1, 25, 8, 4, 2], 1).fold(0, lambda a, b: a + 1)
1
In this last case, what's happening is that the single partition is being folded down to the correct value, then that value is folded with the zero-value at the driver to yield 1.
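To make the mechanics concrete, here's a minimal local sketch in plain Python (not Spark code; simulate_fold and the partition layouts are hypothetical, chosen only to mirror the examples above) of the two-level fold that RDD.fold performs:

from functools import reduce

def simulate_fold(partitions, zero, op):
    # Fold each partition locally, starting from the zero value...
    per_partition = []
    for part in partitions:
        acc = zero
        for obj in part:
            acc = op(obj, acc)  # same argument order as PySpark's func()
        per_partition.append(acc)
    # ...then fold the per-partition results on the "driver", seeding the
    # reduce with the zero value once more.
    return reduce(op, per_partition, zero)

count = lambda a, b: a + 1  # the non-commutative "count" operator from above

# One partition holding all five elements: mirrors the last example above.
print(simulate_fold([[1, 25, 8, 4, 2]], 0, count))                   # 1

# Five singleton partitions plus two empty ones: each empty partition still
# contributes a zero value, so the driver-side reduce sees seven inputs.
print(simulate_fold([[1], [25], [8], [4], [2], [], []], 0, count))   # 7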
I think the underlying problem here is that our fold() operation implicitly requires the operator to be commutative in addition to associative, but this isn't documented anywhere. Due to ordering non-determinism elsewhere in Spark, such as SPARK-5750, I don't think there's an easy way to fix this. Therefore, I think we should update the documentation and examples to clarify this requirement and explain that our fold acts more like a reduce with a default value than the type of ordering-sensitive fold() that users may expect in functional languages.
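For contrast, and reusing the hypothetical simulate_fold sketch above, a genuinely commutative and associative operator such as operator.add produces the same result no matter how the data is partitioned, which is the "reduce with a default value" behavior described here:

from operator import add

# 1 + 2 + 3 + 4 + 5 == 15 regardless of grouping, and the extra zero values
# contributed by empty partitions and the driver-side seed are harmless.
print(simulate_fold([[1, 2, 3, 4, 5]], 0, add))              # 15
print(simulate_fold([[1], [2, 3], [], [4, 5], []], 0, add))  # 15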
Issue Links
- is related to SPARK-7683: Confusing behavior of fold function of RDD in pyspark (Resolved)