SPARK-23298: distinct.count on Dataset/DataFrame yields non-deterministic results



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.1.0, 2.2.0
    • Fix Version/s: None
    • Component/s: Shuffle, Spark Core, SQL, YARN


      This is what happens (EDIT: I managed to get a reproducible example):

      /* Exemplary spark-shell starting command
      /opt/spark/bin/spark-shell \
      --num-executors 269 \
      --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
      --conf spark.kryoserializer.buffer.max=512m
      */
      // spark.sql.shuffle.partitions is 2154 here, if that matters
      import org.apache.spark.sql.functions.rand
      // Generate 10M rows with two random long columns, each in [0, 1000), and save them
      val df = spark.range(10000000).withColumn("col1", (rand() * 1000).cast("long")).withColumn("col2", (rand() * 1000).cast("long")).drop("id")
      df.write.parquet("/test.parquet")
      // Then, ideally in a new session, read the data back and count distinct rows repeatedly
      val df = spark.read.parquet("/test.parquet")
      df.distinct.count
      // res1: Long = 1001256
      df.distinct.count
      // res2: Long = 999955
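As a sanity check on these two results, assume `rand()` is uniform on [0, 1) and the two columns are independent, so each column takes integer values 0 to 999. Then there are at most 1000 × 1000 = 1,000,000 distinct (col1, col2) pairs, and with 10,000,000 rows the expected number of distinct pairs is 1,000,000 × (1 − (1 − 10⁻⁶)^10⁷), roughly 999,955. The object `DistinctPairsEstimate` below is a hypothetical helper (not part of Spark) that computes both numbers; it is a back-of-the-envelope sketch under those assumptions.

```scala
// Hypothetical helper (not part of Spark) for the reproduction above.
// Assumes col1 and col2 are independent and uniform over 0..999.
object DistinctPairsEstimate {
  // Number of possible (col1, col2) pairs: an upper bound on distinct.count.
  def maxDistinct(valuesPerColumn: Long): Long = valuesPerColumn * valuesPerColumn

  // Expected number of distinct pairs after drawing `rows` pairs uniformly at
  // random from k possible pairs: k * (1 - (1 - 1/k)^rows).
  def expectedDistinct(rows: Long, valuesPerColumn: Long): Double = {
    val k = maxDistinct(valuesPerColumn).toDouble
    // log1p keeps (1 - 1/k)^rows accurate when 1/k is tiny.
    k * (1.0 - math.exp(rows * math.log1p(-1.0 / k)))
  }
}
```

`DistinctPairsEstimate.expectedDistinct(10000000L, 1000L)` evaluates to about 999954.6 and `maxDistinct(1000L)` is 1000000. Under these assumptions res2 is plausible, while res1 exceeds the largest possible distinct count and so cannot be correct.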

      The text_dataset.out file from my original example is a dataset with one string per line. The strings contain alphanumeric characters as well as colons and spaces, and no line exceeds 1200 characters. I don't think that's important, though: the issue appeared on various other datasets, and I just tried to narrow it down to the simplest possible case. (The case is now fully reproducible with the code above.)

      The observations regarding the issue are as follows:

      • I managed to reproduce it on both Spark 2.2 and Spark 2.1.
      • The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
      • The issue is not reproducible on a single machine (e.g. a laptop) in Spark local mode.
      • It seems that once the correct count has been computed, the issue cannot be reproduced in the same Spark session. In other words, I was able to get 2-3 incorrect distinct.count results in a row, but once the result was correct, it stayed correct. I had to restart spark-shell to observe the problem again.
      • The issue appears on both Dataset and DataFrame (i.e. when reading with read.textFile or read.text, respectively).
      • The issue is not reproducible on RDD (i.e. dataset.rdd.distinct.count).
      • None of the containers failed in any of the invalid executions.
      • YARN doesn't show any warnings or errors in those invalid executions.
      • The execution plan determined for both valid and invalid executions was always the same (it's shown in the SQL tab of the UI).
      • The number returned in the invalid executions was always greater than the correct number (24 014 227).
      • This occurs even though the input is already completely deduplicated (i.e. distinct.count shouldn't change anything).
      • The input isn't replicated (i.e. there's only one copy of each file block on the HDFS).
      • The problem is probably not related to reading from HDFS. Spark always read all input records correctly (as shown in the UI), and the record count changed only after the exchange phase:
        • correct execution:
          Input Size / Records: 3.9 GB / 24014227 (first stage)
          Shuffle Write: 3.3 GB / 24014227 (first stage)
          Shuffle Read: 3.3 GB / 24014227 (second stage)
        • incorrect execution:
          Input Size / Records: 3.9 GB / 24014227 (first stage)
          Shuffle Write: 3.3 GB / 24014227 (first stage)
          Shuffle Read: 3.3 GB / 24020150 (second stage)
      • The problem might be related to the way Encoders hash data internally, because:
        • in a simple `distinct.count` invocation, there are in total three hash-related stages (called `HashAggregate`),
        • excerpt from scaladoc for `distinct` method says:
             * @note Equality checking is performed directly on the encoded representation of the data
             * and thus is not affected by a custom `equals` function defined on `T`.
      • One of my suspicions was the number of partitions we're using (2154). This is greater than 2000, which means that a different data structure (i.e. HighlyCompressedMapStatus instead of CompressedMapStatus) is used for bookkeeping during the shuffle. Unfortunately, the problem still occurs after decreasing the number below this threshold.
      • It's easier to reproduce the issue with a large number of partitions.
      • Another suspicion was that it's somehow related to the number of blocks on HDFS (974). I was able to reproduce the problem with both fewer and more partitions than this value, so I don't think this is the case.
      • Final note: it looks like, for some reason, the data gets duplicated during the exchange in the shuffle (the shuffle read sees more records than the shuffle write has written).
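To make the partition-count suspicion concrete, here is a simplified toy model of the two bookkeeping structures mentioned above. It is a sketch under stated assumptions, not Spark's code: `MapStatusSketch`, `PerPartition` and `Averaged` are hypothetical names, the "more than 2000 partitions" threshold is taken from the observation above, and details such as Spark's lossy one-byte size encoding are ignored. Each map task reports the size of its output for every reduce partition; above the threshold, only the set of empty partitions and one average size are kept.

```scala
// Toy model (hypothetical, not Spark's classes) of the two MapStatus variants.
object MapStatusSketch {
  // Assumed threshold: more than 2000 reduce partitions selects the averaged form.
  val HighlyCompressThreshold = 2000

  sealed trait Status { def estimatedSize(reduceId: Int): Long }

  // Like CompressedMapStatus: keeps one size per reduce partition.
  final case class PerPartition(sizes: Array[Long]) extends Status {
    def estimatedSize(reduceId: Int): Long = sizes(reduceId)
  }

  // Like HighlyCompressedMapStatus: keeps empty partitions plus an average size.
  final case class Averaged(empty: Set[Int], avgSize: Long) extends Status {
    def estimatedSize(reduceId: Int): Long =
      if (empty.contains(reduceId)) 0L else avgSize
  }

  // Chooses the representation based on the number of reduce partitions.
  def apply(sizes: Array[Long]): Status =
    if (sizes.length > HighlyCompressThreshold) {
      val nonEmpty = sizes.filter(_ > 0)
      val avg = if (nonEmpty.isEmpty) 0L else nonEmpty.sum / nonEmpty.length
      Averaged(sizes.indices.filter(sizes(_) == 0).toSet, avg)
    } else PerPartition(sizes)
}
```

In this model the status only describes block sizes; which records go to which reduce partition is decided elsewhere, by the partitioner on the map side.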

      Please let me know if you have any other questions.

      I couldn't find much about similar problems on the Web. The only related report I found was on the Spark mailing list, where someone using PySpark found that one of their executors was hashing values differently from the others, which caused a similar issue.
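Why inconsistent hashing inflates a distinct count can be shown with a toy model of a two-phase, hash-partitioned distinct. This is an illustration of the hypothesis, not Spark code, and `HashPartitionedDistinct` is a hypothetical name. Each map task deduplicates its own records (roughly like a partial aggregate) and routes each record to a reduce partition using its own partitioner; each reduce partition deduplicates only what it receives. If two tasks hash the same value to different partitions, that value survives in both and is counted twice.

```scala
// Toy model (hypothetical, not Spark code) of a hash-partitioned distinct count.
object HashPartitionedDistinct {
  // Maps a record to a reduce-partition index.
  type Partitioner = String => Int

  // Each map task deduplicates locally, then routes records with its own
  // partitioner; reduce partitions deduplicate locally and the counts are summed.
  def countDistinct(mapTasks: Seq[(Seq[String], Partitioner)]): Int = {
    val shuffled: Seq[(Int, String)] = for {
      (records, partitioner) <- mapTasks
      record <- records.distinct
    } yield (partitioner(record), record)
    shuffled.groupBy(_._1).values.toSeq.map(_.map(_._2).distinct.size).sum
  }
}
```

For example, with two partitions and two tasks that both hold `Seq("a", "b")`, using `r => java.lang.Math.floorMod(r.hashCode, 2)` in both tasks yields 2, but shifting the hash by one in the second task yields 4. Note that this mechanism leaves the number of shuffled records unchanged, so on its own it would not explain shuffle read seeing more records than shuffle write.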

      I originally didn't include a reproducible example, because the input was just a long file of strings and, as this occurred on many different datasets, I doubted it was data-related. The EDIT at the top now provides a fully reproducible example.




            Assignee: Unassigned
            Reporter: Mateusz Jukiewicz (mjukiewicz)
            Votes: 1
            Watchers: 9