SPARK-35106

HadoopMapReduceCommitProtocol performs bad rename when dynamic partition overwrite is used

Details

    Description

      While recently reviewing the code in HadoopMapReduceCommitProtocol#commitJob, I found a bad code path in the dynamicPartitionOverwrite == true scenario:

            // BLOCK 1
            if (dynamicPartitionOverwrite) {
              val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
              logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
              absPartitionPaths.foreach(fs.delete(_, true))
            }
            // BLOCK 2
            for ((src, dst) <- filesToMove) {
              fs.rename(new Path(src), new Path(dst))
            }
      
            // BLOCK 3
            if (dynamicPartitionOverwrite) {
              val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
              logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
              for (part <- partitionPaths) {
                val finalPartPath = new Path(path, part)
                if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
                  // According to the official hadoop FileSystem API spec, delete op should assume
                  // the destination is no longer present regardless of return value, thus we do not
                  // need to double check if finalPartPath exists before rename.
                  // Also in our case, based on the spec, delete returns false only when finalPartPath
                  // does not exist. When this happens, we need to take action if parent of finalPartPath
                  // also does not exist(e.g. the scenario described on SPARK-23815), because
                  // FileSystem API spec on rename op says the rename dest(finalPartPath) must have
                  // a parent that exists, otherwise we may get unexpected result on the rename.
                  fs.mkdirs(finalPartPath.getParent)
                }
                fs.rename(new Path(stagingDir, part), finalPartPath)
              }
            }
      

      Assuming dynamicPartitionOverwrite == true, we have the following sequence of events:

      1. Block 1 deletes all parent directories of filesToMove.values
      2. Block 2 attempts to rename all filesToMove.keys to filesToMove.values
      3. Block 3 does directory-level renames to place files into their final locations

      Every rename in Block 2 fails, because Block 1 has just deleted the parent directories of all filesToMove.values. On HDFS, the contract of fs.rename is to return false in this situation rather than throw an exception, and Block 2 ignores the return value, so dynamicPartitionOverwrite appears to "work", albeit with a batch of failed renames in the middle. That Block 2 does not check for those false return values is arguably a separate issue. Really, we should run Block 2 only in the dynamicPartitionOverwrite == false case and consolidate Blocks 1 and 3 to run in the true case.
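
      The restructuring suggested here could look roughly like the sketch below. It assumes hadoop-common is on the classpath. CommitSketch, commitOutputs, and renameOrFail are hypothetical names that do not exist in Spark; the parameters mirror the variables in the snippet above. This is not the actual patch, and it leaves open how the entries of filesToMove should reach their destinations in the overwrite case.

```scala
import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: hypothetical helper names, parameters mirror the snippet above.
object CommitSketch {

  // Turn a `false` return value from rename into an explicit failure,
  // so that a failed rename can no longer pass silently.
  def renameOrFail(fs: FileSystem, src: Path, dst: Path): Unit = {
    if (!fs.rename(src, dst)) {
      throw new IOException(s"Failed to rename $src to $dst")
    }
  }

  def commitOutputs(
      fs: FileSystem,
      path: String,
      stagingDir: Path,
      filesToMove: Map[String, String],
      allPartitionPaths: Seq[Set[String]],
      dynamicPartitionOverwrite: Boolean): Unit = {
    if (dynamicPartitionOverwrite) {
      // Former Block 1: clear the absolute partition directories.
      val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
      absPartitionPaths.foreach(fs.delete(_, true))

      // Former Block 3: directory-level renames out of the staging directory.
      val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
      for (part <- partitionPaths) {
        val finalPartPath = new Path(path, part)
        if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
          // rename requires the destination's parent to exist.
          fs.mkdirs(finalPartPath.getParent)
        }
        renameOrFail(fs, new Path(stagingDir, part), finalPartPath)
      }
    } else {
      // Former Block 2: file-level renames, now only without overwrite.
      for ((src, dst) <- filesToMove) {
        renameOrFail(fs, new Path(src), new Path(dst))
      }
    }
  }
}
```

      Checking the rename result in both branches makes a lenient file system (one that returns false) and a strict one (one that throws) fail in the same way.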

      We discovered this issue while testing against a FileSystem implementation that throws an exception for this failed rename instead of returning false, which escalates the silently ignored rename failures into actual failures.
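
      The difference between the two behaviours can be reproduced with a toy model. The sketch below uses only the Scala standard library. ToyFs is a hypothetical in-memory stand-in, not a Hadoop class, and it models just one rule taken from the description: a rename fails when the parent of the destination does not exist. The paths are made up for illustration.

```scala
import java.io.IOException

// Hypothetical in-memory model of a file system (not a Hadoop class).
// strict = false: a failed rename returns false, as the description says HDFS does.
// strict = true:  a failed rename throws, like the implementation we tested against.
final class ToyFs(strict: Boolean) {
  private var dirs = Set("/")
  private var files = Set.empty[String]

  private def parent(p: String): String = {
    val i = p.lastIndexOf('/')
    if (i <= 0) "/" else p.substring(0, i)
  }

  def mkdirs(dir: String): Unit = {
    var d = dir
    while (d != "/") { dirs += d; d = parent(d) }
  }

  def create(file: String): Unit = { mkdirs(parent(file)); files += file }

  // Recursive delete; returns false when nothing existed at the path.
  def delete(path: String): Boolean = {
    val existed = dirs(path) || files(path)
    dirs = dirs.filterNot(d => d == path || d.startsWith(path + "/"))
    files = files.filterNot(f => f == path || f.startsWith(path + "/"))
    existed
  }

  // File rename; fails when the source or the destination's parent is missing.
  def rename(src: String, dst: String): Boolean = {
    val ok = files(src) && dirs(parent(dst))
    if (ok) { files = files - src + dst }
    else if (strict) throw new IOException(s"rename $src -> $dst failed")
    ok
  }
}

object Block2Demo {
  // Runs Block 1 and then Block 2 for a single file; returns the rename results.
  def run(fs: ToyFs): Seq[Boolean] = {
    fs.create("/staging/part-0000")    // task output waiting to be committed
    fs.create("/custom/p=1/old-part")  // existing data that is being overwritten
    val filesToMove = Map("/staging/part-0000" -> "/custom/p=1/part-0000")

    // Block 1: delete the parent directory of every destination.
    val absPartitionPaths: Set[String] =
      filesToMove.values.map(d => d.substring(0, d.lastIndexOf('/'))).toSet
    absPartitionPaths.foreach(p => fs.delete(p))

    // Block 2: the destination's parent is gone, so the rename cannot succeed.
    filesToMove.toSeq.map { case (src, dst) => fs.rename(src, dst) }
  }

  def main(args: Array[String]): Unit = {
    println(run(new ToyFs(strict = false))) // the failure is reported only as false
    try run(new ToyFs(strict = true))
    catch { case e: IOException => println(s"commit failed: ${e.getMessage}") }
  }
}
```

      In the lenient mode the only trace of the problem is a false return value that Block 2 never inspects; in the strict mode the same sequence aborts the commit.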

          People

            yuzhousun Yuzhou Sun
            xkrogen Erik Krogen