SPARK-22828

Data corruption happens when the same RDD is repeatedly used as a parent RDD of a custom RDD that reads each parent RDD in concurrent threads


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: Spark Core

    Description

      I have defined a custom RDD that

      • computes the output from the input data using our traditional data transformation code; to give an extreme example, this custom RDD can behave as a union, a joiner, etc.
      • takes one or more parent RDDs as input, where some or all of the parent RDDs can be the same RDD
      • reads the input parent RDDs in concurrent threads (i.e. reader threads; see the sketch after this list)
      • computes the data in one or more transformation threads that run concurrently with the reader threads
      • ...
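
      For illustration, here is a heavily simplified sketch of such a custom RDD. The class name MyRdd, the per-parent reader threads, and the plain concatenation in compute() are illustrative placeholders, not our actual transformation code:

      import java.util.concurrent.ConcurrentLinkedQueue

      import scala.collection.JavaConverters._
      import scala.reflect.ClassTag

      import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
      import org.apache.spark.rdd.RDD

      // Hypothetical stand-in for the custom RDD: two (possibly identical)
      // parent RDDs, each read by its own reader thread inside compute().
      class MyRdd[T: ClassTag](parent1: RDD[T], parent2: RDD[T])
        extends RDD[T](parent1.sparkContext,
          Seq(new OneToOneDependency(parent1), new OneToOneDependency(parent2))) {

        override def getPartitions: Array[Partition] = parent1.partitions

        override def compute(split: Partition, context: TaskContext): Iterator[T] = {
          val out = new ConcurrentLinkedQueue[T]()
          // One reader thread per parent; both iterate the same partition index.
          val readers = Seq(parent1, parent2).map { p =>
            val t = new Thread(new Runnable {
              override def run(): Unit =
                p.iterator(p.partitions(split.index), context).foreach(out.add)
            })
            t.start()
            t
          }
          readers.foreach(_.join())
          // The real RDD transforms the rows concurrently; here we just concatenate.
          out.iterator().asScala
        }
      }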

      In certain cases, we see the data being corrupted as our reader threads read it in. The corruption happens only when all of the following conditions are met:

      • Multiple parent RDDs of the custom RDD are actually the same RDD, e.g. a same-source union.
      The Scala code is roughly like this:

      val rdd1 = ...
      val customRdd = new MyRdd(rdd1, rdd1, ...)

      • The parent RDD is not the result of a repartition or a sort-within-partitions.
      • The shared parent RDD is not persisted.
      • spark.sql.shuffle.partitions is set to 1. We saw the corruption as well when the value is set to a small number such as 2, which is also the source partition count.

      This data corruption happens even when the number of executors and cores is set to 1, which means the corruption is not related to multiple partitions running concurrently.
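
      For a minimal reproduction, the session settings would look roughly like this (the local master and the app name are placeholders; on a cluster the equivalent would be one executor with one core):

      import org.apache.spark.sql.SparkSession

      // Settings under which we observe the corruption: a single core
      // (one executor, one core on a cluster) and one shuffle partition.
      val spark = SparkSession.builder()
        .appName("MyRddCorruptionRepro")
        .master("local[1]")
        .config("spark.sql.shuffle.partitions", "1")
        .getOrCreate()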

      Data corruption doesn't happen when any of the following conditions is met:
      1. Instead of passing the same parent RDD multiple times as input to the custom RDD, we do a select (of all columns) on that parent RDD and pass the resulting, distinct select RDDs as input.

      The Scala code is like this:

      val rdd1 = ...
      val customRdd = new MyRdd(rdd1.select($1, $2, ...), rdd1.select($1, $2), ...)

      2. We persist the parent RDD:

      val rdd1 = ...
      rdd1.persist(...)
      val customRdd = new MyRdd(rdd1, rdd1, ...)

      3. We use a single thread to read the parent RDDs in the custom RDD implementation.
      4. We use our default value (100) for spark.sql.shuffle.partitions (a one-line configuration change, shown below).
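
      Workaround 4 is only a configuration change; assuming the spark session from the sketch above, it amounts to the following (note that Spark's built-in default is 200, while 100 is our site default):

      // Workaround 4: raise spark.sql.shuffle.partitions back to our default.
      spark.conf.set("spark.sql.shuffle.partitions", "100")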

          People

            • Assignee: Unassigned
            • Reporter: Yongqin Xiao (yxiao)
            • jiangxb
            • Votes: 0
            • Watchers: 1
