Index: shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java
===================================================================
--- shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java	(revision 1406021)
+++ shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java	(working copy)
@@ -2,9 +2,12 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -125,4 +128,10 @@
         return "";
     }
+
+    @Override
+    public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+        // In hadoop 1.x.x the file system URI is sufficient to determine the filesystem of the file
+        return "hdfs".equals(fs.getUri().getScheme());
+    }
 }
Index: shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java
===================================================================
--- shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java	(revision 1406021)
+++ shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java	(working copy)
@@ -33,7 +33,10 @@
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.util.Progressable;
 import org.apache.pig.ResourceSchema;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.net.NetUtils;
@@ -118,4 +121,11 @@
         return "";
     }
+
+    @Override
+    public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+        // In case of viewfs we need to look up where the actual file is to know the filesystem in use.
+        // resolvePath is a sure-shot way of knowing which file system the file is in.
+        return "hdfs".equals(fs.resolvePath(path).toUri().getScheme());
+    }
 }
Index: src/java/org/apache/hcatalog/shims/HCatHadoopShims.java
===================================================================
--- src/java/org/apache/hcatalog/shims/HCatHadoopShims.java	(revision 1406021)
+++ src/java/org/apache/hcatalog/shims/HCatHadoopShims.java	(working copy)
@@ -21,13 +21,14 @@
 import java.net.InetSocketAddress;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.util.Progressable;
@@ -89,4 +90,12 @@
     public InetSocketAddress getResourceManagerAddress(Configuration conf);
 
     public String getPropertyName(PropertyName name);
+
+    /** Checks if a file is in the HDFS filesystem.
+     *
+     * @param fs
+     * @param path
+     * @return true if the file is in HDFS, false if the file is in other file systems.
+     */
+    public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException;
 }
Index: src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java	(revision 1406021)
+++ src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java	(working copy)
@@ -35,12 +35,9 @@
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapred.HCatMapRedUtil;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
@@ -54,11 +51,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedWriter;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.Writer;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -176,18 +169,16 @@
             OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
             if (getOutputDirMarking(jobContext.getConfiguration())) {
                 Path outputPath = new Path(jobInfo.getLocation());
-                if (outputPath != null) {
-                    FileSystem fileSys = outputPath.getFileSystem(jobContext
-                            .getConfiguration());
-                    // create a file in the folder to mark it
-                    if (fileSys.exists(outputPath)) {
-                        Path filePath = new Path(outputPath,
-                                SUCCEEDED_FILE_NAME);
-                        if (!fileSys.exists(filePath)) { // may have been
-                            // created by
-                            // baseCommitter.commitJob()
-                            fileSys.create(filePath).close();
-                        }
+                FileSystem fileSys = outputPath.getFileSystem(jobContext
+                        .getConfiguration());
+                // create a file in the folder to mark it
+                if (fileSys.exists(outputPath)) {
+                    Path filePath = new Path(outputPath,
+                            SUCCEEDED_FILE_NAME);
+                    if (!fileSys.exists(filePath)) { // may have been
+                        // created by
+                        // baseCommitter.commitJob()
+                        fileSys.create(filePath).close();
                     }
                 }
             }
@@ -266,7 +257,10 @@
                 partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
             }
             // Apply the group and permissions to the leaf partition and files.
-            applyGroupAndPerms(fs, partPath, perms, grpName, true);
+            // Need not bother in case of HDFS as permission is taken care of by setting UMask
+            if(!HCatHadoopShims.Instance.get().isFileInHDFS(fs, partPath)) {
+                applyGroupAndPerms(fs, partPath, perms, grpName, true);
+            }
             if (dynamicPartitioningUsed){
                 String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table,partKVs);
                 if (harProcessor.isEnabled()){
@@ -287,23 +281,11 @@
                                      String group, boolean recursive) throws IOException {
         fs.setPermission(dir, permission);
-        try {
-            fs.setOwner(dir, null, group);
-        } catch (AccessControlException ace) {
-            LOG.warn("Error changing group of " + dir, ace);
-        }
-        if (recursive) {
-            for (FileStatus fileStatus : fs.listStatus(dir)) {
-                if (fileStatus.isDir()) {
-                    applyGroupAndPerms(fs, fileStatus.getPath(), permission, group, recursive);
-                } else {
-                    fs.setPermission(fileStatus.getPath(), permission);
-                    try {
-                        fs.setOwner(dir, null, group);
-                    } catch (AccessControlException ace) {
-                        LOG.warn("Error changing group of " + dir, ace);
-                    }
-                }
+        for (FileStatus fileStatus : fs.listStatus(dir)) {
+            if (fileStatus.isDir()) {
+                applyGroupAndPerms(fs, fileStatus.getPath(), permission, group, true);
+            } else {
+                fs.setPermission(fileStatus.getPath(), permission);
             }
         }
     }
@@ -366,46 +348,88 @@
      * Move all of the files from the temp directory to the final location
      * @param fs the output file system
      * @param file the file to move
-     * @param src the source directory
-     * @param dest the target directory
+     * @param srcDir the source directory
+     * @param destDir the target directory
      * @param dryRun - a flag that simply tests if this move would succeed or not based
      *                 on whether other files exist where we're trying to copy
      * @throws java.io.IOException
      */
     private void moveTaskOutputs(FileSystem fs, Path file,
-                                 Path src,
-                                 Path dest, boolean dryRun) throws IOException {
+                                 Path srcDir,
+                                 Path destDir, final boolean dryRun) throws IOException {
+
+        final Path finalOutputPath = getFinalPath(file, srcDir, destDir);
         if (fs.isFile(file)) {
-            Path finalOutputPath = getFinalPath(file, src, dest);
-            if (dryRun){
-//                LOG.info("Testing if moving ["+file+"] to ["+finalOutputPath+"] would cause a problem");
-                if (fs.exists(finalOutputPath)){
-                    throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath + ", duplicate publish possible.");
+            if (dryRun) {
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Testing if moving file: [" + file + "] to ["
+                        + finalOutputPath + "] would cause a problem");
                 }
-            }else{
-//                LOG.info("Moving ["+file+"] to ["+finalOutputPath+"]");
+                if (fs.exists(finalOutputPath)) {
+                    throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath
+                        + ", duplicate publish not possible.");
+                }
+            } else {
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Moving file: [ " + file + "] to [" + finalOutputPath + "]");
+                }
                 if (!fs.rename(file, finalOutputPath)) {
                     if (!fs.delete(finalOutputPath, true)) {
                         throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath);
                     }
                     if (!fs.rename(file, finalOutputPath)) {
-                        throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + dest);
+                        throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + finalOutputPath);
                     }
                 }
             }
         } else if(fs.getFileStatus(file).isDir()) {
-            FileStatus[] paths = fs.listStatus(file);
-            Path finalOutputPath = getFinalPath(file, src, dest);
-            if (!dryRun){
-                fs.mkdirs(finalOutputPath);
-            }
-            if (paths != null) {
-                for (FileStatus path : paths) {
-                    moveTaskOutputs(fs, path.getPath(), src, dest,dryRun);
-                }
+            FileStatus[] children = fs.listStatus(file);
+            if (children != null && children.length > 0) {
+                FileStatus firstChild = children[0];
+                if(firstChild.isDir()) {
+                    // If the first child is a directory, then the rest would be directories too according to the HCatalog dir structure;
+                    // recurse in that case
+                    for (FileStatus child : children) {
+                        moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun);
+                    }
+                } else {
+
+
+                    if (!dryRun) {
+                        if (dynamicPartitioningUsed) {
+                            // Optimization: if the first child is a file, we have reached the leaf directory; move the parent directory itself
+                            // instead of moving each file under the directory. See HCATALOG-538
+
+                            final Path parentDir = finalOutputPath.getParent();
+                            // Create the directory
+                            fs.mkdirs(parentDir);
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Moving directory: " + file + " to " + parentDir);
+                            }
+                            if (!fs.rename(file, parentDir)) {
+                                final String msg = "Failed to move file: " + file + " to " + parentDir;
+                                LOG.error(msg);
+                                throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
+                            }
+                        } else {
+                            // In case of no partition we have to move each file
+                            for (FileStatus child : children) {
+                                moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun);
+                            }
+                        }
+                    } else {
+                        if(fs.exists(finalOutputPath)) {
+                            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath
+                                + ", duplicate publish not possible.");
+                        }
+                    }
+                }
             }
+        } else {
+            // Should never happen
+            final String msg = "Unknown file type being asked to be moved, erroring out";
+            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
         }
     }
@@ -566,12 +590,12 @@
                     Path src = new Path(ptnRootLocation);
                     // check here for each dir we're copying out, to see if it
                     // already exists, error out if so
-                    moveTaskOutputs(fs, src, src, tblPath,true);
-                    moveTaskOutputs(fs, src, src, tblPath,false);
+                    moveTaskOutputs(fs, src, src, tblPath, true);
+                    moveTaskOutputs(fs, src, src, tblPath, false);
                     fs.delete(src, true);
                     try {
                         updateTableSchema(client, table, jobInfo.getOutputSchema());
-                        LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
+                        LOG.info("HAR is being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos);
                         client.add_partitions(partitionsToAdd);
                         partitionsAdded = partitionsToAdd;
                     } catch (Exception e){
@@ -587,17 +611,16 @@
             }else{
                 // no harProcessor, regular operation
-                // No duplicate partition publish case to worry about because we'll
-                // get a AlreadyExistsException here if so, and appropriately rollback
                 updateTableSchema(client, table, jobInfo.getOutputSchema());
-                LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
-                client.add_partitions(partitionsToAdd);
+                LOG.info("HAR is not being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos);
                 partitionsAdded = partitionsToAdd;
                 if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
                     Path src = new Path(ptnRootLocation);
-                    moveTaskOutputs(fs, src, src, tblPath,false);
+                    moveTaskOutputs(fs, src, src, tblPath, true);
+                    moveTaskOutputs(fs, src, src, tblPath, false);
                     fs.delete(src, true);
                 }
+                client.add_partitions(partitionsToAdd);
             }
         } catch (Exception e) {
             if( partitionsAdded.size() > 0 ) {
@@ -653,8 +676,7 @@
         Path src;
         OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
         if (dynamicPartitioningUsed) {
-            src = new Path(getPartitionRootLocation(jobInfo.getLocation()
-                .toString(), jobInfo.getTableInfo().getTable()
+            src = new Path(getPartitionRootLocation(jobInfo.getLocation(), jobInfo.getTableInfo().getTable()
                 .getPartitionKeysSize()));
         } else {
             src = new Path(jobInfo.getLocation());
         }
@@ -686,7 +708,7 @@
                 client.cancelDelegationToken(tokenStrForm);
             }
         } catch (MetaException e) {
-            LOG.warn("MetaException while cancelling delegation token.",e );
+            LOG.warn("MetaException while cancelling delegation token.", e);
         } catch (TException e) {
             LOG.warn("TException while cancelling delegation token.", e);
         } finally {
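
For reference only (not part of the patch): a minimal, self-contained sketch of the two detection strategies the shims above use, assuming a Hadoop 2.x classpath where FileSystem.resolvePath(Path) is available. The class name, helper method names, and the sample path are made up for illustration.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class IsFileInHdfsSketch {

    // Hadoop 1.x style check (as in HCatHadoopShims20S): the FileSystem's own URI scheme is
    // enough, since any path served by that FileSystem lives on that filesystem.
    static boolean isInHdfsByFsUri(FileSystem fs) {
        return "hdfs".equals(fs.getUri().getScheme());
    }

    // Hadoop 0.23/2.x style check (as in HCatHadoopShims23): with viewfs the mount table must be
    // consulted, so resolve the path first and inspect the scheme of the resolved URI.
    static boolean isInHdfsByResolvedPath(FileSystem fs, Path path) throws IOException {
        return "hdfs".equals(fs.resolvePath(path).toUri().getScheme());
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Hypothetical sample path; pass a real path as the first argument to test against your cluster.
        Path path = new Path(args.length > 0 ? args[0] : "/tmp");
        FileSystem fs = path.getFileSystem(conf);
        System.out.println("scheme check:   " + isInHdfsByFsUri(fs));
        System.out.println("resolved check: " + isInHdfsByResolvedPath(fs, path));
    }
}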