Spark / SPARK-29419

Seq.toDS / spark.createDataset(Seq) is not thread-safe


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.0.2, 2.1.3, 2.2.3, 2.3.4, 2.4.0, 3.0.0
    • Fix Version/s: 2.4.6, 3.0.0
    • Component/s: SQL

    Description

      The Seq.toDS and spark.createDataset(Seq) code is not thread-safe. If the caller-supplied Encoder is used from multiple threads, createDataset's use of that encoder may produce incorrect answers, because the Encoder's internal mutable state is updated from multiple threads concurrently.

      Here is an example demonstrating the problem:

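      // Run in spark-shell, where `spark` is the predefined SparkSession.
      import org.apache.spark.sql._
      import spark.implicits._  // provides the implicit Encoder for (Int, Int)

      // A single Encoder instance, shared by every thread below.
      val enc = implicitly[Encoder[(Int, Int)]]

      // Create 100 Datasets concurrently via a parallel collection.
      val datasets = (1 to 100).par.map { _ =>
        val pairs = (1 to 100).map(x => (x, x))
        spark.createDataset(pairs)(enc)
      }

      // Every input record is (x, x), so both fields must always match.
      datasets.reduce(_ union _).collect().foreach {
        pair => require(pair._1 == pair._2, s"Pair elements are mismatched: $pair")
      }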

      Due to the thread-safety issue, the above example results in the creation of corrupted records where different input records' fields are co-mingled.

      This bug is similar to SPARK-22355, a related problem in Dataset.collect() (fixed in Spark 2.2.1+).

      Fortunately, this has a simple one-line fix (copy the encoder); I'll submit a patch for this shortly.
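To illustrate why copying the encoder is enough, here is a minimal, Spark-free sketch. `PairEncoder`, its reusable `buffer`, and `CopyEncoderDemo` are hypothetical names invented for this illustration. They are not Spark's `ExpressionEncoder` and only model the assumption that an encoder reuses one mutable output buffer across calls. Each worker thread calls `copy()` once and encodes only through its private instance, so no mutable state is shared between threads.

```scala
// Hypothetical stand-in for an encoder with internal mutable state.
// This is NOT Spark's ExpressionEncoder; it only models the hazard.
final class PairEncoder {
  // Scratch buffer reused by every toRow call on this instance.
  private val buffer = new Array[Int](2)

  def toRow(pair: (Int, Int)): Array[Int] = {
    buffer(0) = pair._1
    buffer(1) = pair._2
    buffer.clone() // correct only if no other thread writes `buffer` meanwhile
  }

  // A fresh instance with its own private buffer ("copy the encoder").
  def copy(): PairEncoder = new PairEncoder
}

object CopyEncoderDemo {
  // Encodes (x, x) for x in 1..n on `threads` threads, each using its own copy.
  def encodeConcurrently(shared: PairEncoder, threads: Int, n: Int): Vector[Array[Int]] = {
    // One result slot per thread; each slot is written by exactly one worker.
    val results = Array.fill(threads)(Vector.empty[Array[Int]])
    val workers = (0 until threads).map { t =>
      new Thread(() => {
        val local = shared.copy() // per-thread encoder: nothing mutable is shared
        results(t) = (1 to n).map(x => local.toRow((x, x))).toVector
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join()) // join() makes each worker's writes visible here
    results.toVector.flatten
  }

  def main(args: Array[String]): Unit = {
    val rows = encodeConcurrently(new PairEncoder, threads = 8, n = 10000)
    val mismatched = rows.count(r => r(0) != r(1))
    println(s"rows=${rows.size} mismatched=$mismatched")
  }
}
```

Replacing `local` with `shared` inside the worker reintroduces the shared buffer, and the mismatch count can then become non-zero depending on thread scheduling, which mirrors the co-mingled fields described above.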

People

    • Assignee: Josh Rosen
    • Reporter: Josh Rosen