Index: core/src/main/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java =================================================================== --- core/src/main/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (revision 1427404) +++ core/src/main/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (working copy) @@ -67,6 +67,9 @@ */ class FileOutputCommitterContainer extends OutputCommitterContainer { + private static final String TEMP_DIR_NAME = "_temporary"; + private static final String LOGS_DIR_NAME = "_logs"; + private static final Logger LOG = LoggerFactory.getLogger(FileOutputCommitterContainer.class); private final boolean dynamicPartitioningUsed; private boolean partitionsDiscovered; @@ -405,6 +408,9 @@ Path srcDir, Path destDir, final boolean dryRun) throws IOException { + if (file.getName().equals(TEMP_DIR_NAME) || file.getName().equals(LOGS_DIR_NAME) || file.getName().equals(SUCCEEDED_FILE_NAME)) { + return; + } final Path finalOutputPath = getFinalPath(file, srcDir, destDir); if (fs.isFile(file)) { if (dryRun){ @@ -434,45 +440,58 @@ } } else if(fs.getFileStatus(file).isDir()) { FileStatus[] children = fs.listStatus(file); - if (children != null && children.length > 0) { - FileStatus firstChild = children[0]; - if(firstChild.isDir()) { - // If the first child is directory, then rest would be directory too according to HCatalog dir structure - // recurse in that case - for (FileStatus child : children) { - moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun); + FileStatus firstChild = null; + if (children != null) { + int index=0; + while (index < children.length) { + if (!children[index].getPath().getName().equals(TEMP_DIR_NAME) && !children[index].getPath().getName().equals(LOGS_DIR_NAME) && !children[index].getPath().getName().equals(SUCCEEDED_FILE_NAME)) { + firstChild = children[index]; + break; } - } else { + index++; + } + } + if(firstChild!=null && firstChild.isDir()) { + // If the first child is directory, then rest would be directory too according to HCatalog dir structure + // recurse in that case + for (FileStatus child : children) { + moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun); + } + } else { + if (!dryRun) { + if (dynamicPartitioningUsed) { + // Optimization: if the first child is file, we have reached the leaf directory, move the parent directory itself + // instead of moving each file under the directory. See HCATALOG-538 - if (!dryRun) { - if (dynamicPartitioningUsed) { - // Optimization: if the first child is file, we have reached the leaf directory, move the parent directory itself - // instead of moving each file under the directory. See HCATALOG-538 - - final Path parentDir = finalOutputPath.getParent(); - // Create the directory - fs.mkdirs(parentDir); - if (LOG.isDebugEnabled()) { - LOG.debug("Moving directory: " + file + " to " + parentDir); - } - if (!fs.rename(file, parentDir)) { - final String msg = "Failed to move file: " + file + " to " + parentDir; - LOG.error(msg); - throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg); - } - } else { - // In case of no partition we have to move each file - for (FileStatus child : children) { - moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun); - } + final Path parentDir = finalOutputPath.getParent(); + // Create the directory + Path placeholder = new Path(parentDir, "_placeholder"); + if (fs.mkdirs(parentDir)) { + // It is weired but we need a placeholder, + // otherwise rename cannot move file to the right place + fs.create(placeholder).close(); } + if (LOG.isDebugEnabled()) { + LOG.debug("Moving directory: " + file + " to " + parentDir); + } + if (!fs.rename(file, parentDir)) { + final String msg = "Failed to move file: " + file + " to " + parentDir; + LOG.error(msg); + throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg); + } + fs.delete(placeholder, false); } else { - if(fs.exists(finalOutputPath)) { - throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath - + ", duplicate publish not possible."); + // In case of no partition we have to move each file + for (FileStatus child : children) { + moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun); } } + } else { + if(fs.exists(finalOutputPath)) { + throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath + + ", duplicate publish not possible."); + } } } } else {