Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.0.0
    • Component/s: SQL
    • Labels: None
    • Target Version/s:

      Description

      When we use DirectParquetOutputCommitter on S3 and speculation is enabled, there is a chance that we can lose data.

      Here is the code to reproduce the problem.

      import org.apache.spark.sql.functions._
      val failSpeculativeTask = sqlContext.udf.register("failSpeculativeTask", (i: Int, partitionId: Int, attemptNumber: Int) => {
        if (partitionId == 0 && i == 5) {
          if (attemptNumber > 0) {
            Thread.sleep(15000)
            throw new Exception("new exception")
          } else {
            Thread.sleep(10000)
          }
        }
        
        i
      })
      
      val df = sc.parallelize((1 to 100), 20).mapPartitions { iter =>
        val context = org.apache.spark.TaskContext.get()
        val partitionId = context.partitionId
        val attemptNumber = context.attemptNumber
        iter.map(i => (i, partitionId, attemptNumber))
      }.toDF("i", "partitionId", "attemptNumber")
      
      df
        .select(failSpeculativeTask($"i", $"partitionId", $"attemptNumber").as("i"), $"partitionId", $"attemptNumber")
        .write.mode("overwrite").format("parquet").save("/home/yin/outputCommitter")
      
      sqlContext.read.load("/home/yin/outputCommitter").count
      // The result is 99 and 5 is missing from the output.
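
      The reproduction above assumes speculation is on and the direct Parquet committer is selected. A sketch of the settings assumed (these are the Spark 1.x-era config keys, and the committer's package has moved between 1.x releases, so the class name may need adjusting for your version):

      // spark.speculation must be enabled before the SparkContext starts, e.g. via
      //   --conf spark.speculation=true
      // The direct committer is chosen through the Parquet output committer key:
      sqlContext.setConf(
        "spark.sql.parquet.output.committer.class",
        "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter")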
      

      What happens is that the original task finishes first and uploads its output file to S3, and then the speculative task fails. Because we still have to call the output stream's close method, which uploads data to S3, we end up uploading the partial result generated by the failed speculative task, and this file overwrites the correct file generated by the original task.
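
      As an illustrative sketch of the race (local files stand in for S3 objects; the path and contents are made up): a direct committer gives every attempt the same final path, so there is no per-attempt staging directory to throw away when an attempt fails.

      import java.nio.file.{Files, Paths}

      // Attempt 0 (the original task) finishes and "commits" the complete file.
      val finalPath = Paths.get("/tmp/part-00000.parquet")   // stands in for the S3 key
      Files.write(finalPath, "complete output".getBytes)

      // Attempt 1 (the speculative task) writes to the very same path, fails midway,
      // but its stream still has to be closed, which uploads the partial object and
      // clobbers attempt 0's file.
      Files.write(finalPath, "partial ou".getBytes)

      new String(Files.readAllBytes(finalPath))   // "partial ou" -- the good output is gone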

        Issue Links

          Activity

          rxin Reynold Xin added a comment -

          Let's not remove it for now until we have a better alternative.

          stevel@apache.org Steve Loughran added a comment -

          The fault is having speculation turned on, rather than the committer itself. Best to add a way for the system to detect that the output is going to an object store with potential consistency issues, and reject.

          In HADOOP-9545 we've been considering an explicit object store API, one which uses PUT to write stuff, rather than pretend that the output stream is writing stuff and that close() is a low-cost, minimal side-effect operation.

          yhuai Yin Huai added a comment -

          Steve Loughran Seems HADOOP-9545 is not the right jira?

          stevel@apache.org Steve Loughran added a comment -

          sorry! HADOOP-9565

          stevel@apache.org Steve Loughran added a comment -

          I should add that, as the default committer uses rename(), on some object stores (s3n, swift) a client-side copy may be taking place; on s3a a server-side copy happens. After all of these, a recursive delete kicks in. So the FileOutputCommitter is equally prone to race conditions, and uses significantly more IO; rename() is likely to take time O(data) rather than O(1). I'd go for the direct committer if you are planning to use S3 as the direct output of an operation. For speculation, it's better to write to HDFS and then copy afterwards.
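
          A minimal sketch of that write-to-HDFS-then-copy pattern (df here is any DataFrame; the paths and bucket name are illustrative, and it assumes the S3 filesystem and credentials are already configured in the Hadoop configuration):

          // Stage the output on HDFS first, where the rename-based commit is cheap.
          df.write.mode("overwrite").parquet("hdfs:///tmp/staging/output")

          // Then copy the finished result to S3 as a separate, non-speculative step.
          import org.apache.hadoop.fs.{FileUtil, Path}
          val hadoopConf = sc.hadoopConfiguration
          val src = new Path("hdfs:///tmp/staging/output")
          val dst = new Path("s3n://my-bucket/output")   // hypothetical bucket
          FileUtil.copy(src.getFileSystem(hadoopConf), src,
                        dst.getFileSystem(hadoopConf), dst,
                        false /* deleteSource */, hadoopConf)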

          rxin Reynold Xin added a comment -

          Note that this is a problem whenever there are multiple attempts of the same task due to failures, even when speculation is off. Let's remove it in Spark 2.0 so users don't run into corrupted outputs.

          apachespark Apache Spark added a comment -

          User 'rxin' has created a pull request for this issue:
          https://github.com/apache/spark/pull/12229

          stevel@apache.org Steve Loughran added a comment -

          The problem with returning to the classic committer is that it assumes that rename is O(1) & atomic, neither condition holding against s3 or swift. It can fail too, except that the failure window is smaller: the O(output-size) rename phase rather than the whole app.

          Someone (and I suspect it will be me, unless there are other volunteers) will have to do something that works better. Ideally something like direct output with a commit protocol that either works well on an object store with create consistency (as all S3 installations now do), or at least can outsource the consistency requirements to something else (as s3mper does).

          At the very least, it can do recovery of previous failures on startup through some cleanup mechanism.

          mkim Mingyu Kim added a comment -

          Reynold Xin, can you clarify why this is a problem even when speculation is off? I've been using direct output committers without any problem with speculation off. DataFrame knew how to clean up left-over files from failed tasks as long as the two task runs don't overlap (i.e. the retry starts after the former try finishes).

          rxin Reynold Xin added a comment -

          I think Josh et al already replied, but to close the loop: the direct committer is not safe when there is a network partition, e.g. the Spark driver might not be aware of a task that's still running on an executor.

          matt.martin Matt Martin added a comment - edited

          For what it's worth, I gather that the Netflix folks have their own S3-specific solution (based purely on the couple of minutes they spend talking about it in this Hadoop Summit talk: https://youtu.be/85sew9OFaYc?t=8m39s).

          andyd88 Andy Dang added a comment -

          What's the alternative to this? We rely on it for publishing our parquet files to S3, and without it the normal output committer takes forever to rename the files in S3.

          stevel@apache.org Steve Loughran added a comment -

          The solution for this is going to be s3guard, HADOOP-13345, which adds a Dynamo-backed metadata store for atomic/consistent operations, plus, as an added bonus, the ability to skip S3 HTTP calls in getFileStatus(). That'll be the foundation for an output committer that can handle speculative commits and bypass the rename.

          That work is just starting up; I would strongly encourage you to get involved, make sure your needs are represented, and help test it.

          Until then:

          1. Switch your code to using s3a and Hadoop 2.7.2+; it's better all round, gets better in Hadoop 2.8, and is the basis for s3guard.
          2. Use the Hadoop FileOutputCommitter and set mapreduce.fileoutputcommitter.algorithm.version to 2 (see the sketch below).
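
          As an illustration of option 2, one way to apply that setting is through Spark's spark.hadoop.* passthrough, which copies the key into the Hadoop Configuration used when writing. This is only a sketch; the application name is made up:

          import org.apache.spark.{SparkConf, SparkContext}

          // Keys prefixed with spark.hadoop. are copied into the Hadoop Configuration,
          // so FileOutputCommitter picks up the v2 commit algorithm when the job writes.
          val conf = new SparkConf()
            .setAppName("parquet-to-s3")   // hypothetical application name
            .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
          val sc = new SparkContext(conf)

          The same key can also be passed with --conf when submitting the job.
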
          Mayank-Shete Mayank Shete added a comment -

          How can you achieve this in AWS EMR 5.0 while using Spark 2.0?

          stevel@apache.org Steve Loughran added a comment -

          Amazon EMR's S3 support is its own codebase; I'm afraid you'll have to talk to the EMR team there.

          aid129 Ai Deng added a comment -

          You can add this line for your SparkContext; it changes EMR's Hadoop config:

          sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
          
          yhuai Yin Huai added a comment -

          Steve Loughran I took a quick look at hadoop 1 (https://github.com/apache/hadoop/blob/release-1.2.1/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java#L111) and hadoop 2 (https://github.com/apache/hadoop/blob/branch-2.7.3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java#L326). Seems Hadoop 1 actually uses algorithm 2. Is my understanding correct?

          stevel@apache.org Steve Loughran added a comment -

          Looking at the git logs to see which code changed, MAPREDUCE-4815 (https://issues.apache.org/jira/browse/MAPREDUCE-4815) implies that you are correct, but that at some point in Hadoop 0.23 (the one between 1.x and 2.x) the commit algorithm changed and slowed down.

          yhuai Yin Huai added a comment - edited

          Thanks! Seems https://issues.apache.org/jira/browse/MAPREDUCE-2702 introduced the change (diff).

          chiragvaya Chirag Vaya added a comment -

          Mingyu Kim Can you please tell us in what environment (standalone Spark on a single node, multiple nodes, or AWS EMR) you were using the direct output committer? According to Reynold Xin, any environment that can have a network partition (e.g. AWS EMR) would lead to inconsistencies. Please correct me if I am wrong on this.

          stevel@apache.org Steve Loughran added a comment -

          HADOOP-13786 covers adding a committer for direct output to S3, provided S3 adds the ability to fail any PUT which would overwrite an existing blob, that is, an atomic PUT-without-overwrite. Dynamo can add this for AWS S3; other implementations of the API may be able to provide something similar.

          apachespark Apache Spark added a comment -

          User 'rxin' has created a pull request for this issue:
          https://github.com/apache/spark/pull/16796

          apachespark Apache Spark added a comment -

          User 'cloud-fan' has created a pull request for this issue:
          https://github.com/apache/spark/pull/18689

          apachespark Apache Spark added a comment -

          User 'cloud-fan' has created a pull request for this issue:
          https://github.com/apache/spark/pull/18716


            People

            • Assignee:
              rxin Reynold Xin
            • Reporter:
              yhuai Yin Huai
            • Votes:
              0
            • Watchers:
              18

              Dates

              • Created:
              • Updated:
              • Resolved:

                Development