Spark / SPARK-6551

Incorrect aggregate results if seqOp(...) mutates its first argument


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version: 1.3.0
    • Fix Version: None
    • Component: PySpark
    • Labels: None
    • Environment: Amazon EMR, AMI version 3.5

    Description

  The Python RDD.aggregate method doesn't behave as documented when seqOp mutates its first argument:

      • the results are incorrect if seqOp mutates its first argument
      • additionally, the zero value is modified if combOp mutates its first argument (this is slightly surprising, and would be nice to document)

      I'm aggregating the RDD into a nontrivial data structure, and it would be wasteful to copy the whole data structure into a new instance in every seqOp, so mutation is an important feature.

      I'm seeing the following behavior:

      def inc_mutate(counter, item):
          counter[0] += 1
          return counter
      
      def inc_pure(counter, item):
          return [counter[0] + 1]
      
      def merge_mutate(c1, c2):
          c1[0] += c2[0]
          return c1
      
      def merge_pure(c1, c2):
          return [c1[0] + c2[0]]
      
      # correct answer, when neither function mutates its arguments
      init = [0]
      sc.parallelize(range(10)).aggregate(init, inc_pure, merge_pure)
      # [10]
      init
      # [0]
      
      # incorrect answer if seqOp mutates its first argument
      init = [0]
      sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_pure)
      # [20] <- WRONG
      init
      # [0]
      
      # zero value is modified if combOp mutates its first argument
      init = [0]
      sc.parallelize(range(10)).aggregate(init, inc_pure, merge_mutate)
      # [10]
      init
      # [10]
      
      # for completeness
      init = [0]
      sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_mutate)
      # [20]
      init
      # [20]
      
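      The doubling above suggests that the partitions share a single accumulator object. The following toy model of that aliasing failure mode is my assumption about the mechanism, not PySpark's actual code path, and it does not reproduce the exact [20] above, only the same class of corruption: seeding every partition (and the final merge) with the same zero object makes a mutating seqOp accumulate into one shared list.

      ```python
      def broken_aggregate(partitions, zero, seq_op, comb_op):
          # Toy model of the suspected bug: every partition aliases the
          # caller's zero object instead of getting a fresh copy.
          partials = []
          for part in partitions:
              acc = zero                      # BUG: aliases the shared zero
              for item in part:
                  acc = seq_op(acc, item)
              partials.append(acc)
          result = zero                       # the merge starts from it, too
          for partial in partials:
              result = comb_op(result, partial)
          return result

      # same helpers as in the repro above
      def inc_mutate(counter, item):
          counter[0] += 1
          return counter

      def merge_pure(c1, c2):
          return [c1[0] + c2[0]]

      parts = [list(range(5)), list(range(5, 10))]
      print(broken_aggregate(parts, [0], inc_mutate, merge_pure))
      # [30], not the correct [10]: both partitions fold into the same list
      ```

      With the pure seqOp the aliasing is harmless, because each call returns a new list instead of writing into the shared zero.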

      I'm running on an EMR cluster launched with:

      aws emr create-cluster --name jarno-spark \
       --ami-version 3.5 \
       --instance-type c3.8xlarge \
       --instance-count 5 \
       --ec2-attributes KeyName=foo \
       --applications Name=Ganglia \
       --log-uri s3://foo/log \
       --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args=[-g,-x,-l,ERROR]
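
      Until the semantics are fixed, one way to keep in-place mutation safe is to bypass aggregate and build a fresh accumulator per partition from a factory function. This is a workaround sketch under my assumptions (aggregate_mutating and zero_factory are names I made up, not PySpark API); it relies only on the existing mapPartitions and collect methods:

      ```python
      def aggregate_mutating(rdd, zero_factory, seq_op, comb_op):
          # Workaround sketch: each partition gets its own accumulator from
          # zero_factory(), so seq_op may freely mutate it in place without
          # ever touching a shared zero object.
          def fold_partition(items):
              acc = zero_factory()            # fresh accumulator per partition
              for item in items:
                  acc = seq_op(acc, item)
              yield acc

          partials = rdd.mapPartitions(fold_partition).collect()
          result = zero_factory()             # fresh zero for the final merge, too
          for partial in partials:
              result = comb_op(result, partial)
          return result

      # e.g. aggregate_mutating(sc.parallelize(range(10)), lambda: [0],
      #                         inc_mutate, merge_mutate)
      ```

      Because every accumulator is freshly constructed, both seq_op and comb_op may mutate their first argument, and the driver-side zero value is never aliased; the per-partition copy cost is one zero_factory() call rather than a copy per element.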
      


          People

            Assignee: Unassigned
            Reporter: Jarno Seppanen (jseppanen)
