Spark / SPARK-6416

Document that RDD.fold() requires the operator to be commutative


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.4.0
    • Fix Version/s: 1.5.0
    • Component/s: Documentation, Spark Core
    • Labels: None

    Description

      Spark's RDD.fold operation has some confusing behaviors when a non-commutative reduce function is used.

      Here's an example, which was originally reported on StackOverflow (https://stackoverflow.com/questions/29150202/pyspark-fold-method-output):

      >>> sc.parallelize([1, 25, 8, 4, 2]).fold(0, lambda a, b: a + 1)
      8
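
      The value 8 here is not arbitrary: as explained below, with this operator the result equals the number of partitions that parallelize() created, which defaults to the local parallelism of the reporter's machine (presumably 8 in this case). As a quick sanity check, here is a sketch that assumes a local SparkContext bound to `sc`:

          # Sketch (assumes `sc` is a local SparkContext): inspect the partition count.
          print(sc.defaultParallelism)                                 # default slices for parallelize()
          print(sc.parallelize([1, 25, 8, 4, 2]).getNumPartitions())  # partitions actually used above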
      

      To understand what's going on here, let's look at the definition of Spark's `fold` operation.

      I'm going to show the Python version of the code, but the Scala version exhibits the exact same behavior (you can also browse the source on GitHub):

          def fold(self, zeroValue, op):
              """
              Aggregate the elements of each partition, and then the results for all
              the partitions, using a given associative function and a neutral "zero
              value."
              The function C{op(t1, t2)} is allowed to modify C{t1} and return it
              as its result value to avoid object allocation; however, it should not
              modify C{t2}.
              >>> from operator import add
              >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
              15
              """
              def func(iterator):
                  acc = zeroValue
                  for obj in iterator:
                      acc = op(obj, acc)
                  yield acc
              vals = self.mapPartitions(func).collect()
              return reduce(op, vals, zeroValue)
      

      (For comparison, see the Scala implementation of `RDD.fold`).

      Spark's `fold` operates by first folding each partition and then folding the results. The problem is that an empty partition gets folded down to the zero element, so the final driver-side fold ends up folding one value for every partition rather than one value for each non-empty partition. This means that the result of `fold` is sensitive to the number of partitions:

          >>> sc.parallelize([1, 25, 8, 4, 2], 100).fold(0, lambda a, b: a + 1)
          100
          >>> sc.parallelize([1, 25, 8, 4, 2], 50).fold(0, lambda a, b: a + 1)
          50
          >>> sc.parallelize([1, 25, 8, 4, 2], 1).fold(0, lambda a, b: a + 1)
          1
      

      In this last case, what's happening is that the single partition is being folded down to the correct value, then that value is folded with the zero-value at the driver to yield 1.
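
      To make the partition-count dependence concrete without a SparkContext, here is a minimal pure-Python sketch of the same two-level structure (the helper name and the round-robin partitioning are illustrative, not Spark's actual code):

          from functools import reduce

          def simulated_fold(data, num_partitions, zero_value, op):
              """Sketch of fold(): fold each partition starting from zero_value, then
              fold the per-partition results on the 'driver', seeded with zero_value again."""
              # Round-robin split; some partitions may be empty, just as in Spark.
              partitions = [data[i::num_partitions] for i in range(num_partitions)]
              partial_results = [reduce(op, part, zero_value) for part in partitions]
              return reduce(op, partial_results, zero_value)

          count_op = lambda a, b: a + 1   # the counting operator from the examples above
          for n in (100, 50, 8, 1):
              print(n, simulated_fold([1, 25, 8, 4, 2], n, 0, count_op))
          # The result equals the partition count: the driver-side fold applies
          # count_op once for every per-partition value, including the zero values
          # contributed by empty partitions.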

      I think the underlying problem here is that our fold() operation implicitly requires the operator to be commutative in addition to associative, but this isn't documented anywhere. Due to ordering non-determinism elsewhere in Spark, such as SPARK-5750, I don't think there's an easy way to fix this. Therefore, I think we should update the documentation and examples to clarify this requirement and explain that our fold acts more like a reduce with a default value than the type of ordering-sensitive fold() that users may expect in functional languages.
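
      For illustration, here is a hedged sketch (assuming a local SparkContext bound to `sc`) of that contract in action: with an operator that is both commutative and associative, fold() is insensitive to partitioning, and a "count"-style computation can instead be expressed with aggregate(), whose separate seqOp/combOp arguments let the merge operator stay commutative and associative:

          from operator import add

          data = [1, 25, 8, 4, 2]

          # Commutative + associative operator: the same answer for any partitioning.
          for n in (1, 5, 100):
              assert sc.parallelize(data, n).fold(0, add) == 40

          # Counting elements: keep the per-partition step (seqOp) and the merge
          # step (combOp) separate so the merge stays commutative and associative.
          for n in (1, 5, 100):
              assert sc.parallelize(data, n).aggregate(
                  0,
                  lambda acc, _: acc + 1,   # seqOp: count within a partition
                  add,                      # combOp: merge partition counts
              ) == 5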

      Attachments

      Issue Links

      Activity

      People

        Assignee: Sean R. Owen (srowen)
        Reporter: Josh Rosen (joshrosen)
        Votes: 0
        Watchers: 3

      Dates

        Created:
        Updated:
        Resolved: