Spark / SPARK-29419

Seq.toDS / spark.createDataset(Seq) is not thread-safe

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.4.0, 3.0.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels:

      Description

      The Seq.toDS and spark.createDataset(Seq) code paths are not thread-safe: if the caller-supplied Encoder is used from multiple threads, createDataset's use of the encoder may produce incorrect results, because the Encoder's internal mutable state is updated concurrently by multiple threads.

      Here is an example demonstrating the problem:

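      import org.apache.spark.sql._
      import spark.implicits._  // already in scope in spark-shell; needed elsewhere for the tuple Encoder

      // A single Encoder instance, shared by every thread below.
      val enc = implicitly[Encoder[(Int, Int)]]

      // Build 100 Datasets in parallel, all passing the same shared encoder.
      val datasets = (1 to 100).par.map { _ =>
        val pairs = (1 to 100).map(x => (x, x))
        spark.createDataset(pairs)(enc)
      }

      // Every input pair has equal elements, so any mismatch means a record was corrupted.
      datasets.reduce(_ union _).collect().foreach {
        pair => require(pair._1 == pair._2, s"Pair elements are mismatched: $pair")
      }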

      Due to the thread-safety issue, the above example produces corrupted records in which fields from different input records are commingled.
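To make the failure mode concrete, here is a minimal, Spark-free sketch of how fields from two records can end up mixed. It is a toy model only: `ReusingEncoder` and `SharedBufferDemo` are hypothetical names invented for this illustration and are not Spark's Encoder implementation. The sketch assumes an encoder that writes each field into one reusable buffer, and it replays on a single thread an interleaving that two threads sharing the encoder could produce.

```scala
// Toy stand-in for an encoder that reuses one output buffer.
// Hypothetical class for illustration; NOT Spark's ExpressionEncoder.
final class ReusingEncoder {
  private val buffer = new Array[Int](2) // internal mutable state

  def writeFirst(x: Int): Unit = { buffer(0) = x }
  def writeSecond(y: Int): Unit = { buffer(1) = y }

  // Returns an immutable copy of the buffer's current contents.
  def snapshot(): (Int, Int) = (buffer(0), buffer(1))
}

object SharedBufferDemo {
  // Caller A encodes (1, 1) and caller B encodes (2, 2) through ONE encoder.
  // The calls are ordered by hand to mimic an unlucky thread interleaving.
  def sharedInterleaving(): (Int, Int) = {
    val enc = new ReusingEncoder
    enc.writeFirst(1)  // A writes its first field
    enc.writeFirst(2)  // B overwrites the first field
    enc.writeSecond(2) // B writes its second field
    enc.writeSecond(1) // A writes its second field
    enc.snapshot()     // A reads back a record mixing B's and A's fields
  }

  // The same call order, but each caller owns its own encoder instance.
  def separateEncoders(): ((Int, Int), (Int, Int)) = {
    val encA = new ReusingEncoder
    val encB = new ReusingEncoder
    encA.writeFirst(1)
    encB.writeFirst(2)
    encB.writeSecond(2)
    encA.writeSecond(1)
    (encA.snapshot(), encB.snapshot())
  }
}
```

In this model, `SharedBufferDemo.sharedInterleaving()` yields a pair whose elements differ, which is the kind of record the `require` check in the reproduction above rejects, while `separateEncoders()` keeps both records intact.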

      This bug is similar to SPARK-22355, a related problem in Dataset.collect() (fixed in Spark 2.2.1+).

      Fortunately, this has a simple one-line fix (copy the encoder); I'll submit a patch for this shortly.
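Until a fix lands, one possible caller-side workaround is to avoid sharing a single Encoder instance across threads. The sketch below is not the proposed patch; it assumes that resolving `implicitly[Encoder[(Int, Int)]]` through `spark.implicits._` builds a new encoder instance on each resolution, and that the code runs on a Scala version where `.par` is available without an extra module (2.11 or 2.12). `PerTaskEncoderWorkaround` and `countMismatches` are hypothetical names used only for this example.

```scala
import org.apache.spark.sql.{Encoder, SparkSession}

// Hypothetical helper illustrating a caller-side workaround; not part of Spark.
object PerTaskEncoderWorkaround {
  // Builds 100 Datasets in parallel and returns how many collected pairs are corrupted.
  def countMismatches(): Int = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-29419-workaround")
      .getOrCreate()
    import spark.implicits._

    try {
      val datasets = (1 to 100).par.map { _ =>
        val pairs = (1 to 100).map(x => (x, x))
        // Resolve the encoder inside the task so each thread uses its own instance
        // (assumption: each implicit resolution constructs a new encoder).
        val enc = implicitly[Encoder[(Int, Int)]]
        spark.createDataset(pairs)(enc)
      }
      // Count pairs whose elements differ; 0 means no corruption was observed.
      datasets.reduce(_ union _).collect().count(pair => pair._1 != pair._2)
    } finally {
      spark.stop()
    }
  }
}
```

Because each task resolves its own encoder, no encoder state is shared between threads, so `countMismatches()` is expected to return 0 under the stated assumptions. This trades a little per-task setup cost for isolation.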

              People

              • Assignee: Josh Rosen (joshrosen)
              • Reporter: Josh Rosen (joshrosen)
              • Votes: 0
              • Watchers: 1
