SPARK-10685

Misaligned data with RDD.zip and DataFrame.withColumn after repartition


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.3.0, 1.4.1, 1.5.0
    • Fix Version/s: 1.5.1, 1.6.0
    • Component/s: PySpark, SQL
    • Labels:
      None
    • Environment:
      • OSX 10.10.4, java 1.7.0_51, hadoop 2.6.0-cdh5.4.5
      • Ubuntu 12.04, java 1.7.0_80, hadoop 2.6.0-cdh5.4.5

      Description

      Here's a weird behavior where RDD.zip or DataFrame.withColumn after a repartition produces "misaligned" data, meaning different column values in the same row aren't matched, as if a zip shuffled the collections before zipping them. It's difficult to reproduce because it's nondeterministic, doesn't occur in local mode, and requires ≥2 workers (≥3 in one case). I was able to repro it using pyspark 1.3.0 (cdh5.4.5), 1.4.1 (bin-without-hadoop), and 1.5.0 (bin-without-hadoop).
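
To make the symptom concrete, here is a minimal pure-Python sketch, not Spark code and not a confirmed diagnosis. It assumes, purely for illustration, that the same repartitioned dataset is evaluated twice and that its partitions come back in a different order each time. The names `evaluate`, `partitions`, and `start` are hypothetical and are not part of any Spark API.

```python
def evaluate(partitions, start):
    """Hypothetical stand-in for one evaluation of a repartitioned dataset.

    The partitions are returned starting at index `start`, which models an
    evaluation whose partition order is not stable between runs.
    """
    order = partitions[start:] + partitions[:start]
    return [x for part in order for x in part]

# 10 partitions holding the values 0..99, 10 values each.
partitions = [list(range(p * 10, (p + 1) * 10)) for p in range(10)]

first = evaluate(partitions, start=0)
second = evaluate(partitions, start=6)  # same data, different arrival order

# Zipping a single evaluation with itself is always aligned.
same_run = [(a, b) for a, b in zip(first, first) if a != b]

# Zipping two evaluations pairs up unrelated elements.
two_runs = [(a, b) for a, b in zip(first, second) if a != b]

print(same_run)      # []
print(two_runs[:3])  # [(0, 60), (1, 61), (2, 62)]
```

Both evaluations contain exactly the same elements, so nothing is lost or duplicated. Only the pairing is wrong, and the constant offset between the two sides resembles the sample outputs in the brief repro.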

      Here's the most similar issue I was able to find. It appears never to have been reproduced and was then closed optimistically, and it smells like it could have had the same underlying cause, which was never fixed:

      Also, this DataFrame.zip issue is related in spirit, since we were trying to build it ourselves when we ran into this problem. Let me put in my vote for reopening the issue and supporting DataFrame.zip in the standard lib.

      Brief repro

      Fail: withColumn(udf) after DataFrame.repartition

      df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
      df = df.repartition(100)
      df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
      [r for r in df.collect() if r.a != r.b][:3] # Should be []
      

      Sample outputs (nondeterministic):

      [Row(a=39, b=639), Row(a=139, b=739), Row(a=239, b=839)]
      [Row(a=639, b=39), Row(a=739, b=139), Row(a=839, b=239)]
      []
      [Row(a=641, b=41), Row(a=741, b=141), Row(a=841, b=241)]
      [Row(a=641, b=1343), Row(a=741, b=1443), Row(a=841, b=1543)]
      [Row(a=639, b=39), Row(a=739, b=139), Row(a=839, b=239)]
      

      Fail: RDD.zip after DataFrame.repartition

      df  = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
      df  = df.repartition(100)
      rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
      [r for r in rdd.collect() if r.a != r.b][:3] # Should be []
      

      Sample outputs (nondeterministic):

      []
      [Row(a=50, b=6947), Row(a=150, b=7047), Row(a=250, b=7147)]
      []
      []
      [Row(a=44, b=644), Row(a=144, b=744), Row(a=244, b=844)]
      []
      

      Test setup:

      • local[8]: MASTER=local[8]
      • dist[N]: 1 driver + 1 master + N workers

      "Fail" tests pass?  cluster mode  spark version
      ----------------------------------------------------
      yes                 local[8]      1.3.0-cdh5.4.5
      no                  dist[4]       1.3.0-cdh5.4.5
      yes                 local[8]      1.4.1
      yes                 dist[1]       1.4.1
      no                  dist[2]       1.4.1
      no                  dist[4]       1.4.1
      yes                 local[8]      1.5.0
      yes                 dist[1]       1.5.0
      no                  dist[2]       1.5.0
      no                  dist[4]       1.5.0
      

      Detailed repro

      Start `pyspark` and run these imports:

      from pyspark.sql import Row
      from pyspark.sql.functions import udf
      from pyspark.sql.types import IntegerType, StructType, StructField
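
Every test below repeats the same check: collect the rows and count those where `a` and `b` differ. As a convenience, that check could be factored into a small helper. This is only a sketch; the name `misaligned` and the `Pair` stand-in are hypothetical and are not part of the original repro. `Pair` is a namedtuple used here so the example runs without Spark, and the helper only assumes each row exposes `.a` and `.b`.

```python
from collections import namedtuple

def misaligned(rows, limit=None):
    """Return the rows whose `a` and `b` fields differ (optionally the first `limit`)."""
    bad = [r for r in rows if r.a != r.b]
    return bad if limit is None else bad[:limit]

# Stand-in for pyspark.sql.Row so the sketch runs without a cluster.
Pair = namedtuple('Pair', ['a', 'b'])

rows = [Pair(1, 1), Pair(2, 3), Pair(4, 4), Pair(5, 6)]
print(misaligned(rows))       # [Pair(a=2, b=3), Pair(a=5, b=6)]
print(len(misaligned(rows)))  # 2
```

In a `pyspark` session the same helper would be called as `misaligned(df.collect(), 3)` or `len(misaligned(rdd.collect()))`, matching the expressions used in the tests.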
      

      Fail: withColumn(udf) after DataFrame.repartition

      df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
      df = df.repartition(100)
      df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
      len([r for r in df.collect() if r.a != r.b]) # Should be 0
      

      Ok: withColumn(udf) after DataFrame.repartition(100) after 1 starting partition

      df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=1))
      df = df.repartition(100)
      df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
      len([r for r in df.collect() if r.a != r.b]) # Should be 0
      

      Fail: withColumn(udf) after DataFrame.repartition(100) after 100 starting partitions

      df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
      df = df.repartition(100)
      df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
      len([r for r in df.collect() if r.a != r.b]) # Should be 0
      

      Fail: withColumn(udf) after DataFrame.repartition(1) after 100 starting partitions

      df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
      df = df.repartition(1)
      df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
      len([r for r in df.collect() if r.a != r.b]) # Should be 0
      

      Ok: withColumn(udf) after DataFrame.coalesce(10) after 100 starting partitions

      df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
      df = df.coalesce(10)
      df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
      len([r for r in df.collect() if r.a != r.b]) # Should be 0
      

      Ok: withColumn without udf

      df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
      df = df.repartition(100)
      df = df.withColumn('b', df.a)
      len([r for r in df.collect() if r.a != r.b]) # Should be 0
      

      Ok: createDataFrame(RDD.map) instead of withColumn(udf)

      df  = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
      df  = df.repartition(100)
      rdd = df.map(lambda r: Row(a=r.a, b=r.a))
      df  = sqlCtx.createDataFrame(rdd, StructType(df.schema.fields + [StructField('b', IntegerType())]))
      len([r for r in df.collect() if r.a != r.b]) # Should be 0
      

      Fail: createDataFrame(RDD.zip) instead of withColumn(udf)

      df  = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
      df  = df.repartition(100)
      rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
      df  = sqlCtx.createDataFrame(rdd, StructType(df.schema.fields + [StructField('b', IntegerType())]))
      len([r for r in df.collect() if r.a != r.b]) # Should be 0
      

      Fail: RDD.zip after DataFrame.repartition

      df  = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
      df  = df.repartition(100)
      rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
      len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
      

      Fail: RDD.zip after RDD.repartition after 100 starting partitions

      • Failure requires ≥3 workers (whether dist or pseudo-dist)

      rdd = sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100)
      rdd = rdd.repartition(100)
      rdd = rdd.zip(rdd.map(lambda a: a)).map(lambda (a,b): Row(a=a,b=b))
      len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
      

      Ok: RDD.zip after RDD.repartition after 1 starting partition

      rdd = sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=1)
      rdd = rdd.repartition(100)
      rdd = rdd.zip(rdd.map(lambda a: a)).map(lambda (a,b): Row(a=a,b=b))
      len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
      

      Test setup:

      • local[8]: MASTER=local[8]
      • pseudo-dist[N]: 1 driver + 1 master + N workers; master and workers all on same OS
      • dist[N]: 1 driver + 1 master + N workers; master and workers all on separate OS's
      • Spark 1.3.0-cdh5.4.5 with dist[4] didn't trip any of the withColumn failures, but did trip the zip failures
      • - indicates a configuration I didn't try

      "Ok" tests pass?  "Fail" tests pass?        platform  cluster mode    spark version
      ------------------------------------------------------------------------------------
      yes               yes                       ubuntu    local[8]        1.3.0-cdh5.4.5
      -                 -                         ubuntu    pseudo-dist[1]  1.3.0-cdh5.4.5
      -                 -                         ubuntu    pseudo-dist[2]  1.3.0-cdh5.4.5
      yes               no[zip], yes[withColumn]  ubuntu    dist[4]         1.3.0-cdh5.4.5
      yes               yes                       osx       local[8]        1.4.1
      yes               yes                       ubuntu    local[8]        1.4.1
      yes               yes                       osx       pseudo-dist[1]  1.4.1
      -                 -                         ubuntu    pseudo-dist[1]  1.4.1
      yes               no                        osx       pseudo-dist[2]  1.4.1
      -                 -                         ubuntu    pseudo-dist[2]  1.4.1
      -                 -                         osx       dist[4]         1.4.1
      yes               no                        ubuntu    dist[4]         1.4.1
      yes               yes                       osx       local[8]        1.5.0
      yes               yes                       ubuntu    local[8]        1.5.0
      yes               yes                       osx       pseudo-dist[1]  1.5.0
      yes               yes                       ubuntu    pseudo-dist[1]  1.5.0
      yes               no                        osx       pseudo-dist[2]  1.5.0
      yes               no                        ubuntu    pseudo-dist[2]  1.5.0
      -                 -                         osx       dist[4]         1.5.0
      yes               no                        ubuntu    dist[4]         1.5.0
      

      People

      • Assignee: Reynold Xin (rxin)
      • Reporter: Dan Brown (jdanbrown)