Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Duplicate
Affects Version/s: 1.3.0
None
None
Environment: Amazon EMR, AMI version 3.5
Description
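The Python RDD.aggregate method doesn't match its documentation with respect to seqOp mutating its first argument. The documentation allows the functions to modify their first argument and return it as the result, to avoid object allocation, but in practice: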
- The result is incorrect if seqOp mutates its first argument.
- Additionally, the zero value itself is modified if combOp mutates its first argument. This is slightly surprising and would be worth documenting.
I'm aggregating the RDD into a nontrivial data structure, and copying the whole structure into a new instance on every seqOp call would be wasteful, so mutation is an important feature.
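For comparison, here is a minimal pure-Python model of what the documented contract implies, runnable without Spark. It assumes that every partition and the final merge start from a private copy of the zero value, which is what would make mutating seqOp and combOp safe; keeping the caller's zero untouched in the final merge is a design choice of this sketch, not something the documentation states. The names `add_item`, `merge_into` and `aggregate_reference` are hypothetical, and the dict-like accumulator stands in for the "nontrivial data structure" described above.

```python
import copy
import functools
from collections import Counter

# Hypothetical accumulator functions: both mutate and return their first
# argument, so no new structure is allocated per element.
def add_item(acc, item):
    acc[item % 3] += 1          # update the accumulator in place
    return acc

def merge_into(acc1, acc2):
    acc1.update(acc2)           # Counter.update adds counts in place
    return acc1

def aggregate_reference(partitions, zero, seq_op, comb_op):
    """Model of the documented contract (an assumption, not Spark's code):
    each partition and the final merge start from a private copy of zero,
    so seq_op and comb_op may mutate their first argument safely."""
    partials = [functools.reduce(seq_op, part, copy.deepcopy(zero))
                for part in partitions]
    return functools.reduce(comb_op, partials, copy.deepcopy(zero))

partitions = [[0, 1, 2], [3, 4, 5, 6], [7, 8, 9]]   # some split of range(10)
zero = Counter()
result = aggregate_reference(partitions, zero, add_item, merge_into)
print(dict(result))  # {0: 4, 1: 3, 2: 3}
print(zero)          # Counter(), the zero value is left unchanged
```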
I'm seeing the following behavior:
```python
# Run in the pyspark shell, where sc is the predefined SparkContext.
def inc_mutate(counter, item):
    counter[0] += 1
    return counter

def inc_pure(counter, item):
    return [counter[0] + 1]

def merge_mutate(c1, c2):
    c1[0] += c2[0]
    return c1

def merge_pure(c1, c2):
    return [c1[0] + c2[0]]

# correct answer when neither function mutates its arguments
init = [0]
sc.parallelize(range(10)).aggregate(init, inc_pure, merge_pure)
# [10]
init
# [0]

# incorrect answer if seqOp mutates its first argument
init = [0]
sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_pure)
# [20] <- WRONG
init
# [0]

# zero value is modified if combOp mutates its first argument
init = [0]
sc.parallelize(range(10)).aggregate(init, inc_pure, merge_mutate)
# [10]
init
# [10]

# for completeness
init = [0]
sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_mutate)
# [20]
init
# [20]
```
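The reported numbers can be reproduced without a cluster by a small pure-Python model. It assumes, consistent with the title of the linked SPARK-9021, that 1.3.0 implements aggregate as mapPartitions(seq stage).fold(zeroValue, combOp); that both per-partition stages run in the same Python worker on a single deserialized copy of the zero value; and that fold's final merge on the driver starts from the caller's zero object. `aggregate_as_in_1_3` is a hypothetical name for this model, not Spark code.

```python
import copy
import functools

# The reporter's functions from the reproduction above.
def inc_mutate(counter, item):
    counter[0] += 1
    return counter

def inc_pure(counter, item):
    return [counter[0] + 1]

def merge_mutate(c1, c2):
    c1[0] += c2[0]
    return c1

def merge_pure(c1, c2):
    return [c1[0] + c2[0]]

def aggregate_as_in_1_3(partitions, zero, seq_op, comb_op):
    """Hypothetical model: both per-partition stages share ONE copy of zero."""
    vals = []
    for part in partitions:
        worker_zero = copy.deepcopy(zero)   # one deserialized copy per partition
        acc = worker_zero
        for item in part:                   # aggregate's seqOp stage
            acc = seq_op(acc, item)
        # fold's stage restarts from the SAME worker_zero object; if seq_op
        # mutated it, acc and worker_zero are one list and get merged twice
        vals.append(comb_op(acc, worker_zero))
    # fold's final merge on the driver starts from the caller's own zero
    return functools.reduce(comb_op, vals, zero)

partitions = [[0, 1, 2], [3, 4, 5, 6], [7, 8, 9]]   # some split of range(10)
for seq_op, comb_op in [(inc_pure, merge_pure), (inc_mutate, merge_pure),
                        (inc_pure, merge_mutate), (inc_mutate, merge_mutate)]:
    init = [0]
    result = aggregate_as_in_1_3(partitions, init, seq_op, comb_op)
    print(seq_op.__name__, comb_op.__name__, result, init)
# inc_pure merge_pure [10] [0]
# inc_mutate merge_pure [20] [0]
# inc_pure merge_mutate [10] [10]
# inc_mutate merge_mutate [20] [20]
```

Under this model, a mutating seqOp turns the shared zero into the partition's partial result, and fold's stage then merges that partial with itself, doubling it; a mutating combOp mutates the caller's `init` during the final merge on the driver.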
I'm running on an EMR cluster launched with:
```shell
aws emr create-cluster --name jarno-spark \
    --ami-version 3.5 \
    --instance-type c3.8xlarge \
    --instance-count 5 \
    --ec2-attributes KeyName=foo \
    --applications Name=Ganglia \
    --log-uri s3://foo/log \
    --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args=[-g,-x,-l,ERROR]
```
Issue Links
- is duplicated by
  - SPARK-9021 Change RDD.aggregate() to do reduce(mapPartitions()) instead of mapPartitions.fold() (Resolved)
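Following the approach named in the SPARK-9021 title above, a user-side workaround on 1.3.0 could collect the per-partition results first and merge them on the driver, so the seqOp stage and the merge never share a zero object. This is a sketch under that assumption, not the actual fix; `aggregate_copying_zero` is a hypothetical helper, and only `RDD.mapPartitions` and `RDD.collect` are real PySpark calls. The `copy.deepcopy` inside each partition is belt-and-braces, since each task already deserializes its own copy of the closure.

```python
import copy
import functools

def aggregate_copying_zero(rdd, zero, seq_op, comb_op):
    """Hypothetical workaround: reduce(mapPartitions()) instead of
    mapPartitions().fold(), with private copies of the zero value."""
    def per_partition(iterator):
        acc = copy.deepcopy(zero)          # private zero copy per partition
        for item in iterator:
            acc = seq_op(acc, item)
        yield acc
    partials = rdd.mapPartitions(per_partition).collect()
    # merge on the driver from a copy, so the caller's zero is never mutated
    return functools.reduce(comb_op, partials, copy.deepcopy(zero))
```

In the pyspark shell, with the reporter's functions defined, `aggregate_copying_zero(sc.parallelize(range(10)), init, inc_mutate, merge_mutate)` should return `[10]` and leave `init` as `[0]`.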