Description
Here's a weird behavior where RDD.zip or DataFrame.withColumn after a repartition produces "misaligned" data: column values that belong to different rows end up in the same row, as if zip had shuffled the collections before zipping them. It's difficult to reproduce because it's nondeterministic, doesn't occur in local mode, and requires ≥2 workers (≥3 in one case). I was able to reproduce it using pyspark 1.3.0 (cdh5.4.5), 1.4.1 (bin-without-hadoop), and 1.5.0 (bin-without-hadoop).
Here's the most similar issue I was able to find. It appears to have been closed optimistically without ever being reproduced, and it smells like it could have had the same underlying cause, which was never fixed:
Also, this DataFrame.zip issue is related in spirit, since we were trying to build it ourselves when we ran into this problem. Let me put in my vote for reopening the issue and supporting DataFrame.zip in the standard lib.
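One way to picture the symptom without Spark: zip pairs elements by position, so the two sides only line up if both enumerate a partition in the same order. The sketch below is plain Python and rests on an assumption this report does not confirm, namely that the repartitioned data is evaluated twice and the two evaluations can return the same elements in different orders. `evaluate_partition` and its `seed` argument are hypothetical stand-ins, not Spark APIs.

```python
import random

def evaluate_partition(elements, seed):
    """Hypothetical stand-in for one evaluation of a shuffled partition.

    Returns the same elements every time, but in an order that depends on
    `seed` (modelling an arrival order that is not guaranteed to repeat).
    """
    out = list(elements)
    random.Random(seed).shuffle(out)
    return out

def zip_two_evaluations(elements, seed_left, seed_right):
    """Zip two independent evaluations of the same partition by position."""
    left = evaluate_partition(elements, seed_left)
    right = evaluate_partition(elements, seed_right)
    return list(zip(left, right))

partition = list(range(100))

# Same order on both sides: every pair matches.
aligned = zip_two_evaluations(partition, seed_left=1, seed_right=1)
print([p for p in aligned if p[0] != p[1]][:3])

# Different order on each side: same elements overall, but pairs disagree.
misaligned = zip_two_evaluations(partition, seed_left=1, seed_right=2)
print([p for p in misaligned if p[0] != p[1]][:3])
```

If something like this is what happens on a cluster, it would be consistent with the failures being nondeterministic, but that is a guess rather than a diagnosis.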
Brief repro
Fail: withColumn(udf) after DataFrame.repartition
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
[r for r in df.collect() if r.a != r.b][:3] # Should be []
Sample outputs (nondeterministic):
[Row(a=39, b=639), Row(a=139, b=739), Row(a=239, b=839)]
[Row(a=639, b=39), Row(a=739, b=139), Row(a=839, b=239)]
[]
[Row(a=641, b=41), Row(a=741, b=141), Row(a=841, b=241)]
[Row(a=641, b=1343), Row(a=741, b=1443), Row(a=841, b=1543)]
[Row(a=639, b=39), Row(a=739, b=139), Row(a=839, b=239)]
Fail: RDD.zip after DataFrame.repartition
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
[r for r in rdd.collect() if r.a != r.b][:3] # Should be []
Sample outputs (nondeterministic):
[]
[Row(a=50, b=6947), Row(a=150, b=7047), Row(a=250, b=7147)]
[]
[]
[Row(a=44, b=644), Row(a=144, b=744), Row(a=244, b=844)]
[]
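To compare runs, it can help to look at how far off each mismatched row is rather than only whether a mismatch exists. A minimal sketch in plain Python follows; the `Row` namedtuple is a stand-in for `pyspark.sql.Row` so it runs without Spark, and `mismatches` and `offsets` are hypothetical helper names. The sample rows are copied from one of the withColumn outputs in this report, plus one aligned row for contrast.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row so the sketch runs without Spark.
Row = namedtuple("Row", ["a", "b"])

def mismatches(rows):
    """Return the rows whose two columns disagree."""
    return [r for r in rows if r.a != r.b]

def offsets(rows):
    """Return b - a for each mismatched row."""
    return [r.b - r.a for r in mismatches(rows)]

sample = [Row(a=39, b=639), Row(a=139, b=739), Row(a=239, b=839), Row(a=7, b=7)]
print(mismatches(sample)[:3])
print(offsets(sample))  # [600, 600, 600]
```

In the samples shown here the offset is constant within each run (600 in this one), which may hint that whole blocks of rows are displaced together; the report does not establish that.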
Test setup:
- local[8]: MASTER=local[8]
- dist[N]: 1 driver + 1 master + N workers
"Fail" tests pass? cluster mode spark version
----------------------------------------------------
yes                local[8]     1.3.0-cdh5.4.5
no                 dist[4]      1.3.0-cdh5.4.5
yes                local[8]     1.4.1
yes                dist[1]      1.4.1
no                 dist[2]      1.4.1
no                 dist[4]      1.4.1
yes                local[8]     1.5.0
yes                dist[1]      1.5.0
no                 dist[2]      1.5.0
no                 dist[4]      1.5.0
Detailed repro
Start `pyspark` and run these imports:
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StructType, StructField
Fail: withColumn(udf) after DataFrame.repartition
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
Ok: withColumn(udf) after DataFrame.repartition(100) after 1 starting partition
df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=1))
df = df.repartition(100)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
Fail: withColumn(udf) after DataFrame.repartition(100) after 100 starting partitions
df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
df = df.repartition(100)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
Fail: withColumn(udf) after DataFrame.repartition(1) after 100 starting partitions
df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
df = df.repartition(1)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
Ok: withColumn(udf) after DataFrame.coalesce(10) after 100 starting partitions
df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
df = df.coalesce(10)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
Ok: withColumn without udf
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
df = df.withColumn('b', df.a)
len([r for r in df.collect() if r.a != r.b]) # Should be 0
Ok: createDataFrame(RDD.map) instead of withColumn(udf)
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
rdd = df.map(lambda r: Row(a=r.a, b=r.a))
df = sqlCtx.createDataFrame(rdd, StructType(df.schema.fields + [StructField('b', IntegerType())]))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
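A possible reading of why this case is Ok: the new column is derived in the same pass that reads each row, so nothing has to be lined up by position afterwards. The plain-Python sketch below illustrates that idea only; `read_partition` and `with_derived_column` are hypothetical names, and the seed-dependent ordering is an assumption about how a repartitioned partition might behave, not something this report verifies.

```python
import random

def read_partition(elements, seed):
    """Hypothetical stand-in: same elements, order depends on `seed`."""
    out = list(elements)
    random.Random(seed).shuffle(out)
    return out

def with_derived_column(elements, seed, fn):
    """Build (a, b) pairs in a single pass, so b is computed from the very
    element it is paired with, whatever order the elements arrive in."""
    return [(a, fn(a)) for a in read_partition(elements, seed)]

partition = list(range(100))
for seed in (1, 2, 3):
    rows = with_derived_column(partition, seed, lambda a: a)
    print(len([r for r in rows if r[0] != r[1]]))  # 0 for every seed
```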
Fail: createDataFrame(RDD.zip) instead of withColumn(udf)
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
df = sqlCtx.createDataFrame(rdd, StructType(df.schema.fields + [StructField('b', IntegerType())]))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
Fail: RDD.zip after DataFrame.repartition
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
Fail: RDD.zip after RDD.repartition after 100 starting partitions
- Failure requires ≥3 workers (whether dist or pseudo-dist)
rdd = sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100)
rdd = rdd.repartition(100)
rdd = rdd.zip(rdd.map(lambda a: a)).map(lambda (a,b): Row(a=a,b=b))
len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
Ok: RDD.zip after RDD.repartition after 1 starting partition
rdd = sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=1)
rdd = rdd.repartition(100)
rdd = rdd.zip(rdd.map(lambda a: a)).map(lambda (a,b): Row(a=a,b=b))
len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
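Because the failures are nondeterministic, a single passing run of any "Fail" case says little. A small harness that repeats a check and counts failing trials makes the results easier to compare across configurations. This is a sketch in plain Python: `count_failing_trials` and `flaky_check` are hypothetical names, and `flaky_check` only simulates a flaky repro. In real use the callable would wrap one of the snippets in this report and return its mismatch count.

```python
import random

def count_failing_trials(check, trials):
    """Run `check` `trials` times; return how many runs reported mismatches.

    `check` is any zero-argument callable returning the number of
    mismatched rows for one run (0 means the run passed).
    """
    return sum(1 for _ in range(trials) if check() != 0)

# Hypothetical stand-in for a Spark repro: fails on roughly half of the runs.
rng = random.Random(0)

def flaky_check():
    return 100 if rng.random() < 0.5 else 0

failures = count_failing_trials(flaky_check, trials=20)
print("%d of 20 trials failed" % failures)
```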
Test setup:
- local[8]: MASTER=local[8]
- pseudo-dist[N]: 1 driver + 1 master + N workers; master and workers all on same OS
- dist[N]: 1 driver + 1 master + N workers; master and workers all on separate OS's
- Spark 1.3.0-cdh5.4.5 with dist[4] didn't trip any of the withColumn failures, but did trip the zip failures
- "-" indicates a configuration I didn't try
"Ok" tests pass?  "Fail" tests pass?        platform  cluster mode    spark version
-----------------------------------------------------------------------------------
yes               yes                       ubuntu    local[8]        1.3.0-cdh5.4.5
-                 -                         ubuntu    pseudo-dist[1]  1.3.0-cdh5.4.5
-                 -                         ubuntu    pseudo-dist[2]  1.3.0-cdh5.4.5
yes               no[zip], yes[withColumn]  ubuntu    dist[4]         1.3.0-cdh5.4.5
yes               yes                       osx       local[8]        1.4.1
yes               yes                       ubuntu    local[8]        1.4.1
yes               yes                       osx       pseudo-dist[1]  1.4.1
-                 -                         ubuntu    pseudo-dist[1]  1.4.1
yes               no                        osx       pseudo-dist[2]  1.4.1
-                 -                         ubuntu    pseudo-dist[2]  1.4.1
-                 -                         osx       dist[4]         1.4.1
yes               no                        ubuntu    dist[4]         1.4.1
yes               yes                       osx       local[8]        1.5.0
yes               yes                       ubuntu    local[8]        1.5.0
yes               yes                       osx       pseudo-dist[1]  1.5.0
yes               yes                       ubuntu    pseudo-dist[1]  1.5.0
yes               no                        osx       pseudo-dist[2]  1.5.0
yes               no                        ubuntu    pseudo-dist[2]  1.5.0
-                 -                         osx       dist[4]         1.5.0
yes               no                        ubuntu    dist[4]         1.5.0
Issue Links
- duplicates SPARK-8632 Poor Python UDF performance because of RDD caching (Resolved)
- is duplicated by SPARK-10494 Multiple Python UDFs together with aggregation or sort merge join may cause OOM (failed to acquire memory) (Resolved)