SPARK-20236 introduced a new write mode that overwrites only the partitions a job actually writes to (dynamic partition overwrite). While using this feature on our production cluster, we found a bug when writing multi-level partitions on HDFS.
A simple test case to reproduce this issue:
val df = Seq(("1", "2", "3")).toDF("col1", "col2", "col3")
If HDFS location "/my/hdfs/location" does not exist, there will be no output.
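For context, a fuller reproduction might look like the sketch below. The original snippet shows only the DataFrame definition, so the write call and the partitionOverwriteMode setting are our assumptions about the minimal setup that reaches the SPARK-20236 code path.

import org.apache.spark.sql.SparkSession

// Dynamic partition overwrite (SPARK-20236) must be enabled to hit this path.
val spark = SparkSession.builder()
  .appName("multi-level-partition-repro")
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .getOrCreate()
import spark.implicits._

val df = Seq(("1", "2", "3")).toDF("col1", "col2", "col3")

// Overwrite with two partition levels to a location that does not exist yet;
// on HDFS the job commit fails and there is no output.
df.write
  .partitionBy("col1", "col2")
  .mode("overwrite")
  .save("/my/hdfs/location")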
This appears to be caused by the job commit change made in SPARK-20236 to HadoopMapReduceCommitProtocol.
In the job commit phase, the output has already been written to the staging directory /my/hdfs/location/.spark-staging.xxx/col1=1/col2=2, and the code then calls fs.rename to move /my/hdfs/location/.spark-staging.xxx/col1=1/col2=2 to /my/hdfs/location/col1=1/col2=2. In our case this rename fails on HDFS because /my/hdfs/location/col1=1 does not exist: HDFS rename does not create missing parent directories for the destination, so it cannot bridge more than one missing level.
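The failing step can be observed with the Hadoop FileSystem API alone; a minimal sketch, with paths illustrative and matching the example above:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val staged = new Path("/my/hdfs/location/.spark-staging.xxx/col1=1/col2=2")
val dest = new Path("/my/hdfs/location/col1=1/col2=2")

// On HDFS this returns false when the destination's parent
// /my/hdfs/location/col1=1 does not exist: rename never creates
// missing parent directories for the destination path.
val renamed = fs.rename(staged, dest)
println(s"rename succeeded: $renamed")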
This is not caught by the unit tests added in SPARK-20236, which run against the local file system.
We are proposing a fix: when cleaning up the current partition directory /my/hdfs/location/col1=1/col2=2 before the rename, if the delete fails (because /my/hdfs/location/col1=1/col2=2 may not exist), call mkdirs to create the parent directory /my/hdfs/location/col1=1 (if it does not already exist) so that the subsequent rename can succeed. A sketch follows.
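A minimal sketch of the proposed change, written as a standalone helper in the spirit of the rename logic in HadoopMapReduceCommitProtocol.commitJob; the method name and signature here are illustrative, not Spark's actual code:

import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

def moveStagedPartition(fs: FileSystem, staged: Path, dest: Path): Unit = {
  // Clean up the existing partition directory before overwriting it.
  if (!fs.delete(dest, true)) {
    // delete() returning false typically means dest did not exist. Ensure
    // the parent (e.g. /my/hdfs/location/col1=1) exists so the rename
    // below does not fail on a missing multi-level parent.
    val parent = dest.getParent
    if (!fs.exists(parent)) {
      fs.mkdirs(parent)
    }
  }
  if (!fs.rename(staged, dest)) {
    throw new IOException(s"Failed to rename $staged to $dest")
  }
}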
Reference: the official Hadoop filesystem documentation (https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html) lists preconditions for the rename operation; the one relevant here is that the destination must be root, or have a parent that exists. HDFS enforces this precondition, which is exactly what fails in our scenario.