Spark / SPARK-17962

DataFrame/Dataset join not producing correct results in Spark 2.0/Yarn


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.0.0
    • Fix Version/s: None
    • Component/s: Spark Core, YARN
    • Labels: None
    • Environment: CentOS 7.2, Hadoop 2.7.2, Spark 2.0.0

    Description

      The environment can be reproduced with the Deploy to Azure button in this git repo: https://github.com/shankinson/spark

      For the deployment to launch properly, the cluster name must be the same as the resource group name. Log in with the username hadoop and launch the shell with:

      /home/hadoop/spark-2.0.0-bin-hadoop2.7/bin/spark-shell --master yarn --deploy-mode client --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.catalogImplementation=in-memory --driver-memory 10g --driver-cores 4

      We have a cluster running Apache Spark 2.0 on Hadoop 2.7.2 and CentOS 7.2. We wrote some new code using the Spark DataFrame/Dataset APIs, but we are seeing incorrect join results after writing data to, and reading it back from, Windows Azure Storage Blobs (the default HDFS location). I've been able to reproduce the issue with the following snippet running on the cluster:

      // Run in spark-shell (Spark 2.0), where spark.implicits._ is already imported,
      // so .toDS is available on RDDs of case classes.
      case class UserDimensions(user: Long, dimension: Long, score: Double)
      case class CentroidClusterScore(dimension: Long, cluster: Int, score: Double)

      val dims = sc.parallelize(Array(UserDimensions(12345, 0, 1.0))).toDS
      val cent = sc.parallelize(Array(
        CentroidClusterScore(0, 1, 1.0),
        CentroidClusterScore(1, 0, 1.0),
        CentroidClusterScore(2, 2, 1.0))).toDS

      dims.show
      cent.show
      // Inner join on the dimension column
      dims.join(cent, dims("dimension") === cent("dimension")).show
      This outputs:

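      +-----+---------+-----+
      | user|dimension|score|
      +-----+---------+-----+
      |12345|        0|  1.0|
      +-----+---------+-----+

      +---------+-------+-----+
      |dimension|cluster|score|
      +---------+-------+-----+
      |        0|      1|  1.0|
      |        1|      0|  1.0|
      |        2|      2|  1.0|
      +---------+-------+-----+

      +-----+---------+-----+---------+-------+-----+
      | user|dimension|score|dimension|cluster|score|
      +-----+---------+-----+---------+-------+-----+
      |12345|        0|  1.0|        0|      1|  1.0|
      +-----+---------+-----+---------+-------+-----+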

      This is correct. However, after writing the data out and reading it back, we see this:

      // Round-trip both Datasets through Parquet (the default format for save/load).
      dims.write.mode("overwrite").save("/tmp/dims2.parquet")
      cent.write.mode("overwrite").save("/tmp/cent2.parquet")

      val dims2 = spark.read.load("/tmp/dims2.parquet").as[UserDimensions]
      val cent2 = spark.read.load("/tmp/cent2.parquet").as[CentroidClusterScore]

      dims2.show
      cent2.show

      // The same inner join as before, now on the reloaded data
      dims2.join(cent2, dims2("dimension") === cent2("dimension")).show
      This outputs:

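      +-----+---------+-----+
      | user|dimension|score|
      +-----+---------+-----+
      |12345|        0|  1.0|
      +-----+---------+-----+

      +---------+-------+-----+
      |dimension|cluster|score|
      +---------+-------+-----+
      |        0|      1|  1.0|
      |        1|      0|  1.0|
      |        2|      2|  1.0|
      +---------+-------+-----+

      +-----+---------+-----+---------+-------+-----+
      | user|dimension|score|dimension|cluster|score|
      +-----+---------+-----+---------+-------+-----+
      |12345|        0|  1.0|     null|   null| null|
      +-----+---------+-----+---------+-------+-----+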
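Stated as an invariant, an inner join on dimension should emit either a row whose right-hand dimension equals the left-hand one, or no row at all; a row with null right-hand columns violates this. The plain-Scala sketch below (no Spark needed) encodes that check over the two join outputs reported here, transcribed by hand. JoinedRow and InnerJoinCheck are hypothetical names, and modelling Spark's printed nulls as Option is an assumption of the sketch.

```scala
object InnerJoinCheck {
  // One printed row of the join output. The right-hand columns are Option
  // because Spark rendered them as null after the write/read round trip
  // (an assumption of this model, not a Spark type).
  final case class JoinedRow(
      user: Long,
      leftDimension: Long,
      leftScore: Double,
      rightDimension: Option[Long],
      cluster: Option[Int],
      rightScore: Option[Double])

  // In an inner equi-join on `dimension`, every output row must carry a
  // right-hand key equal to its left-hand key; any other row is a wrong result.
  def invalidRows(rows: Seq[JoinedRow]): Seq[JoinedRow] =
    rows.filterNot(r => r.rightDimension.contains(r.leftDimension))

  // The two outputs reported above, transcribed by hand.
  val beforeRoundTrip = Seq(JoinedRow(12345L, 0L, 1.0, Some(0L), Some(1), Some(1.0)))
  val afterRoundTrip  = Seq(JoinedRow(12345L, 0L, 1.0, None, None, None))

  def main(args: Array[String]): Unit = {
    println(invalidRows(beforeRoundTrip).size) // prints 0
    println(invalidRows(afterRoundTrip).size)  // prints 1
  }
}
```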

      However, the RDD API produces the correct result:

      dims2.rdd.map( row => (row.dimension, row) ).join( cent2.rdd.map( row => (row.dimension, row) ) ).take(5)

      res5: Array[(Long, (UserDimensions, CentroidClusterScore))] = Array((0,(UserDimensions(12345,0,1.0),CentroidClusterScore(0,1,1.0))))
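The pair join above returns one (key, (left, right)) tuple for each pair of values that share a key. As a rough plain-Scala model of that result shape only (Spark's PairRDDFunctions.join is actually built on cogroup, which this does not reproduce), the sketch below indexes the right side by key and looks up each left key. PairJoin and its join method are hypothetical names introduced for illustration.

```scala
object PairJoin {
  final case class UserDimensions(user: Long, dimension: Long, score: Double)
  final case class CentroidClusterScore(dimension: Long, cluster: Int, score: Double)

  // Key-based inner join over (key, value) pairs: one (k, (v, w)) per matching
  // pair. The right side is grouped by key first, then probed per left row.
  def join[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    val rightByKey: Map[K, Seq[W]] =
      right.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }
    left.flatMap { case (k, v) =>
      rightByKey.getOrElse(k, Seq.empty).map(w => (k, (v, w)))
    }
  }

  val dims = Seq(UserDimensions(12345L, 0L, 1.0))
  val cent = Seq(
    CentroidClusterScore(0L, 1, 1.0),
    CentroidClusterScore(1L, 0, 1.0),
    CentroidClusterScore(2L, 2, 1.0))

  // Mirrors dims2.rdd.map(row => (row.dimension, row)) and the same for cent2.
  def joined: Seq[(Long, (UserDimensions, CentroidClusterScore))] =
    join(dims.map(d => (d.dimension, d)), cent.map(c => (c.dimension, c)))

  def main(args: Array[String]): Unit =
    println(joined)
    // prints List((0,(UserDimensions(12345,0,1.0),CentroidClusterScore(0,1,1.0))))
}
```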

      We tried writing ORC instead of Parquet and saw the same results. Running Spark 2.0 locally, rather than on a cluster, does not show the issue, and running Spark in local mode on the master node of the Hadoop cluster also works. We see the issue only when running on YARN.

      This also seems very similar to this issue: https://issues.apache.org/jira/browse/SPARK-10896

      We have also found that the issue appears to be related to the cluster's memory settings. It occurs when the worker machines have 56000 MB available, the NodeManager memory is set to 54784 MB, and the executor memory is set to 48407 MB. Lowering the executor memory to something like 28407 MB prevents it.
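Both reported executor settings can be checked against the NodeManager limit with a little arithmetic. The sketch below assumes spark.yarn.executor.memoryOverhead is left at its Spark 2.0 default of max(384 MB, 10% of executor memory), and it ignores YARN's rounding of requests to the minimum allocation size. Under those assumptions both the failing (48407 MB) and the working (28407 MB) executor sizes fit within 54784 MB, so the container request alone does not appear to explain the difference. ExecutorContainerSizing is a hypothetical name.

```scala
object ExecutorContainerSizing {
  // Assumption: spark.yarn.executor.memoryOverhead is unset, so Spark 2.0 adds
  // max(384 MB, 10% of executor memory) on top of the executor heap.
  val MinOverheadMb = 384
  val OverheadFactor = 0.10

  def overheadMb(executorMemoryMb: Int): Int =
    math.max((OverheadFactor * executorMemoryMb).toInt, MinOverheadMb)

  // Total memory requested from YARN for one executor container.
  def containerRequestMb(executorMemoryMb: Int): Int =
    executorMemoryMb + overheadMb(executorMemoryMb)

  // Whether one executor container fits in the NodeManager's memory.
  // Rounding to yarn.scheduler.minimum-allocation-mb is not modelled.
  def fits(executorMemoryMb: Int, nodeManagerMb: Int): Boolean =
    containerRequestMb(executorMemoryMb) <= nodeManagerMb

  def main(args: Array[String]): Unit = {
    val nodeManagerMb = 54784
    for (exec <- Seq(48407, 28407))
      println(s"executor=$exec MB request=${containerRequestMb(exec)} MB fits=${fits(exec, nodeManagerMb)}")
    // prints executor=48407 MB request=53247 MB fits=true
    // prints executor=28407 MB request=31247 MB fits=true
  }
}
```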


People

    • Assignee: Unassigned
    • Reporter: Stephen Hankinson (shankinson)
    • Votes: 0
    • Watchers: 0
