Spark / SPARK-8406

Race condition when writing Parquet files

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.4.0
    • Fix Version/s: 1.4.1, 1.5.0
    • Component/s: SQL
    • Labels:
      None

      Description

      To support appending, the Parquet data source determines the max part number of existing part-files in the destination directory (the <id> in the output file name "part-r-<id>.gz.parquet") at the beginning of a write job. In 1.3.0 this step happens on the driver side before any files are written; in 1.4.0 it was moved to the task side. As a result, tasks scheduled later may observe a max part number that has already been bumped by files written by earlier tasks of the same job, which is a race condition. In most cases this only produces nonconsecutive IDs in output file names, but when the DataFrame has thousands of RDD partitions it becomes likely that two tasks choose the same part number, and one task's output is overwritten by the other's.
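
      For illustration only, here is a minimal sketch of the task-side naming scheme described above (assuming standard Hadoop FileSystem APIs; the helper name and regex are made up for this example and this is not the actual Spark code):

      import org.apache.hadoop.fs.{FileSystem, Path}

      // Hypothetical sketch: each task independently scans the destination
      // directory and derives its output part number from whatever files it
      // happens to see at that moment.
      def choosePartNumber(fs: FileSystem, dest: Path, taskId: Int): Int = {
        val PartFile = """part-r-(\d+)\..*""".r
        val maxExisting = fs.listStatus(dest)
          .map(_.getPath.getName)
          .collect { case PartFile(id) => id.toInt }
          .reduceOption(_ max _)
          .getOrElse(0)
        // Race: tasks scheduled at different times can observe different
        // maxExisting values, so two tasks can end up choosing the same number.
        maxExisting + taskId
      }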

      The following Spark shell snippet can reproduce nonconsecutive part numbers:

      sqlContext.range(0, 128).repartition(16).write.mode("overwrite").parquet("foo")
      

      "16" can be replaced with any integer that is greater than the default parallelism on your machine (usually it means core number, on my machine it's 8).

      -rw-r--r--   3 lian supergroup          0 2015-06-17 00:06 /user/lian/foo/_SUCCESS
      -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00001.gz.parquet
      -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00002.gz.parquet
      -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00003.gz.parquet
      -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00004.gz.parquet
      -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00005.gz.parquet
      -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00006.gz.parquet
      -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00007.gz.parquet
      -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00008.gz.parquet
      -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00017.gz.parquet
      -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00018.gz.parquet
      -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00019.gz.parquet
      -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00020.gz.parquet
      -rw-r--r--   3 lian supergroup        352 2015-06-17 00:06 /user/lian/foo/part-r-00021.gz.parquet
      -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00022.gz.parquet
      -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00023.gz.parquet
      -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 /user/lian/foo/part-r-00024.gz.parquet
      

      And here is another Spark shell snippet for reproducing overwriting:

      sqlContext.range(0, 10000).repartition(500).write.mode("overwrite").parquet("foo")
      sqlContext.read.parquet("foo").count()
      

      The expected answer is 10000, but you may see a number like 9960 due to overwriting. The actual number varies across runs and nodes.

      Notice that the newly added ORC data source is less likely to hit this issue because it uses the task ID and System.currentTimeMillis() to generate the output file name. Thus, the ORC data source may hit this issue only when two tasks with the same task ID (which means they are in two concurrent jobs) are writing to the same location within the same millisecond.
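
      Schematically, such a name could look like the following (illustrative only; this is not the exact file name format used by the ORC data source):

      def orcStyleFileName(taskId: Int): String =
        // Collides only if two tasks with the same task ID (i.e. tasks of two
        // concurrent jobs) write to the same directory in the same millisecond.
        f"part-r-$taskId%05d-${System.currentTimeMillis()}.orc"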

          Activity

          Yin Huai added a comment -

          Nathan McCarthy I have merged the fix to both master and branch 1.4.
          Yin Huai added a comment -

          Issue resolved by pull request 6864
          https://github.com/apache/spark/pull/6864
          Apache Spark added a comment -

          User 'liancheng' has created a pull request for this issue:
          https://github.com/apache/spark/pull/6932
          Cheng Lian added a comment -

          Nathan McCarthy, thanks again for the report. Here is a summary for better understanding of this issue.
          Apache Spark added a comment -

          User 'liancheng' has created a pull request for this issue:
          https://github.com/apache/spark/pull/6864
          Cheng Lian added a comment -

          Yeah, just updated the JIRA description. ORC may hit this issue only when two tasks with the same task ID (which means they are in two concurrent jobs) are writing to the same location within the same millisecond.
          Michael Armbrust added a comment -

          It seems to me that ORC is not free of this bug, but instead just more likely to avoid a problem, right?
          Cheng Lian added a comment - edited

          An example task execution order that causes overwriting (the collision arithmetic is sketched after the list):

          1. A DataFrame with 4 RDD partitions is written to an empty directory.
          2. Task 1 and task 2 get scheduled, while task 3 and task 4 are queued. Both task 1 and task 2 find the current max part number to be 0 (because the destination directory is empty).
          3. Task 1 finishes and generates part-r-00001.gz.parquet. The current max part number becomes 1.
          4. Task 4 gets scheduled and decides to write to part-r-00005.gz.parquet (5 = current max part number + task ID), but hasn't started writing the file yet.
          5. Task 2 finishes and generates part-r-00002.gz.parquet. The current max part number becomes 2.
          6. Task 3 gets scheduled and also decides to write to part-r-00005.gz.parquet, since task 4 hasn't started writing its output file and task 3 finds the current max part number is still 2.
          7. Task 4 finishes writing part-r-00005.gz.parquet.
          8. Task 3 finishes writing part-r-00005.gz.parquet.
          9. The output of task 4 is overwritten.
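
          The collision in the timeline above, in numbers (illustrative only):

          val task4Choice = 1 + 4  // task 4: max part number 1 + task ID 4 = 5
          val task3Choice = 2 + 3  // task 3: max part number 2 + task ID 3 = 5
          assert(task4Choice == task3Choice)  // both pick part-r-00005.gz.parquet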
          Nathan McCarthy added a comment -

          This is hitting us hard. Let me know if there is anything we can do to help on our end, whether contributing a fix or testing.

          FYI, here are the details from the mailing list.

          When trying to save a DataFrame with 569610608 rows:

          dfc.write.format("parquet").save("/data/map_parquet_file")

          We get different results between runs. Caching the DataFrame in memory makes no difference. It looks like the write misses some of the RDD partitions: we have an RDD with 6750 partitions, but when we write it out we get fewer files than partitions, and when reading the data back in and running a count we get a smaller number of rows.

          I've tried counting the rows in several different ways. All return the same result, 560214031 rows, i.e. about 9.4 million rows missing (~1.6%).

          qc.read.parquet("/data/map_parquet_file").count
          qc.read.parquet("/data/map_parquet_file").rdd.count
          qc.read.parquet("/data/map_parquet_file")
            .mapPartitions { itr => var c = 0; itr.foreach(_ => c = c + 1); Seq(c).toIterator }
            .reduce(_ + _)

          Looking at the files on HDFS, there are 6643 .parquet files, i.e. 107 partitions missing (about 1.6%).

          Then writing the same cached DataFrame out again to a new path gives 6717 files on HDFS (about 33 files missing, or 0.5%):

          dfc.write.parquet("/data/map_parquet_file_2")

          And we get 566670107 rows back (about 3 million missing, ~0.5%):

          qc.read.parquet("/data/map_parquet_file_2").count

          Writing the same DataFrame out to JSON produces the expected number (6750) of output files and returns the right number of rows, 569610608.

          dfc.write.format("json").save("/data/map_parquet_file_3")
          qc.read.format("json").load("/data/map_parquet_file_3").count

          One thing to note is that the Parquet part files on HDFS do not have the usual sequential part numbers, unlike the JSON output and the Parquet output in Spark 1.3.

          part-r-06151.gz.parquet part-r-118401.gz.parquet part-r-146249.gz.parquet part-r-196755.gz.parquet part-r-35811.gz.parquet part-r-55628.gz.parquet part-r-73497.gz.parquet part-r-97237.gz.parquet
          part-r-06161.gz.parquet part-r-118406.gz.parquet part-r-146254.gz.parquet part-r-196763.gz.parquet part-r-35826.gz.parquet part-r-55647.gz.parquet part-r-73500.gz.parquet _SUCCESS

          We are using MapR 4.0.2 for HDFS.


            People

            • Assignee:
              Cheng Lian
            • Reporter:
              Cheng Lian
            • Shepherd:
              Yin Huai
            • Votes:
              2
            • Watchers:
              11
