Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 3.1.1
- Labels: None
Description
Recently, while evaluating the code in HadoopMapReduceCommitProtocol#commitJob, I found a problematic code path under the dynamicPartitionOverwrite == true scenario:
// BLOCK 1
if (dynamicPartitionOverwrite) {
  val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
  logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
  absPartitionPaths.foreach(fs.delete(_, true))
}

// BLOCK 2
for ((src, dst) <- filesToMove) {
  fs.rename(new Path(src), new Path(dst))
}

// BLOCK 3
if (dynamicPartitionOverwrite) {
  val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
  logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
  for (part <- partitionPaths) {
    val finalPartPath = new Path(path, part)
    if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
      // According to the official hadoop FileSystem API spec, delete op should assume
      // the destination is no longer present regardless of return value, thus we do not
      // need to double check if finalPartPath exists before rename.
      // Also in our case, based on the spec, delete returns false only when finalPartPath
      // does not exist. When this happens, we need to take action if parent of finalPartPath
      // also does not exist(e.g. the scenario described on SPARK-23815), because
      // FileSystem API spec on rename op says the rename dest(finalPartPath) must have
      // a parent that exists, otherwise we may get unexpected result on the rename.
      fs.mkdirs(finalPartPath.getParent)
    }
    fs.rename(new Path(stagingDir, part), finalPartPath)
  }
}
Assuming dynamicPartitionOverwrite == true, we have the following sequence of events:
- Block 1 deletes all parent directories of filesToMove.values
- Block 2 attempts to rename all filesToMove.keys to filesToMove.values
- Block 3 does directory-level renames to place files into their final locations
All renames in Block 2 will always fail, since all parent directories of filesToMove.values were just deleted in Block 1. On a normal HDFS deployment, the contract of fs.rename is to return false for such a failure rather than throw an exception. There is a separate issue here in that Block 2 should probably be checking for those false return values; ignoring them is what allows dynamicPartitionOverwrite to "work", albeit with a bunch of silently failed renames in the middle. Really, we should only run Block 2 in the dynamicPartitionOverwrite == false case, and consolidate Blocks 1 and 3 to run in the true case.
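To make that proposal concrete, here is a rough sketch of what the restructuring could look like, reusing the names from the snippet above. This is only an illustration, not a patch; in particular it glosses over how files staged for absolute partition locations should be moved in the true branch.

if (dynamicPartitionOverwrite) {
  // Former Block 1: clear out the absolute partition directories being overwritten.
  val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
  absPartitionPaths.foreach(fs.delete(_, true))

  // Former Block 3: move each staged partition directory into its final location.
  val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
  for (part <- partitionPaths) {
    val finalPartPath = new Path(path, part)
    if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
      fs.mkdirs(finalPartPath.getParent)
    }
    fs.rename(new Path(stagingDir, part), finalPartPath)
  }
} else {
  // Former Block 2: no directory cleanup happened beforehand, so the destination
  // parents still exist and these file-level renames can succeed.
  for ((src, dst) <- filesToMove) {
    fs.rename(new Path(src), new Path(dst))
  }
}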
We discovered this issue when testing against a FileSystem implementation that throws an exception for this failed-rename scenario instead of returning false, which escalated the silently ignored rename failures into actual job failures.
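Independently of the restructuring, checking the boolean returned by rename would surface these failures on any FileSystem, not just on implementations that throw. A hypothetical helper along these lines (not existing Spark code) is all it would take:

import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

object RenameUtil {
  // Hypothetical helper, not part of Spark: fail fast when rename reports failure
  // instead of silently dropping the false return value.
  def renameOrFail(fs: FileSystem, src: Path, dst: Path): Unit = {
    if (!fs.rename(src, dst)) {
      throw new IOException(s"Failed to rename $src to $dst")
    }
  }
}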