Index: shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java
===================================================================
--- shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java (revision 1406285)
+++ shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java (working copy)
@@ -23,6 +23,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -140,4 +142,10 @@
         return "";
     }
+
+    @Override
+    public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+        // In hadoop 1.x.x the file system URI is sufficient to determine whether the file is in HDFS
+        return "hdfs".equals(fs.getUri().getScheme());
+    }
 }
Index: shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java
===================================================================
--- shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java (revision 1406285)
+++ shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java (working copy)
@@ -32,7 +32,10 @@
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.net.NetUtils;
@@ -116,4 +119,11 @@
         return "";
     }
+
+    @Override
+    public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+        // In case of viewfs we need to look up where the actual file is to know the filesystem in use.
+        // resolvePath is a sure shot way of knowing which file system the file is on.
+        return "hdfs".equals(fs.resolvePath(path).toUri().getScheme());
+    }
 }
Index: src/java/org/apache/hcatalog/shims/HCatHadoopShims.java
===================================================================
--- src/java/org/apache/hcatalog/shims/HCatHadoopShims.java (revision 1406285)
+++ src/java/org/apache/hcatalog/shims/HCatHadoopShims.java (working copy)
@@ -21,6 +21,8 @@
 import java.net.InetSocketAddress;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
@@ -88,4 +90,14 @@
     public InetSocketAddress getResourceManagerAddress(Configuration conf);

     public String getPropertyName(PropertyName name);
+
+    /**
+     * Checks if a file is in the HDFS filesystem.
+     *
+     * @param fs the file system the path belongs to
+     * @param path the path to check
+     * @return true if the file is in HDFS, false if the file is in another file system.
+     */
+    public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException;
+
 }
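Note (not part of the patch): the two shim implementations above differ only in how they discover the file's scheme. A minimal standalone sketch of that difference, assuming a Hadoop 0.23+ client and a made-up viewfs path:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class IsFileInHdfsSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Illustrative path only; in the committer this comes from the job's output location.
        Path path = new Path("viewfs://cluster/user/hcat/warehouse/mytable");
        FileSystem fs = path.getFileSystem(conf);

        // Hadoop 1.x strategy (HCatHadoopShims20S): the FileSystem's own URI scheme is enough,
        // because there is no client-side mount table sitting in front of HDFS.
        boolean bySchemeOnly = "hdfs".equals(fs.getUri().getScheme());

        // Hadoop 0.23 strategy (HCatHadoopShims23): with viewfs the FileSystem scheme is "viewfs",
        // so the path has to be resolved through the mount table to find the backing scheme.
        boolean byResolvedPath = "hdfs".equals(fs.resolvePath(path).toUri().getScheme());

        System.out.println("scheme-only: " + bySchemeOnly + ", resolved: " + byResolvedPath);
    }
}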
Index: src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (revision 1406285)
+++ src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (working copy)
@@ -47,7 +47,6 @@
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
@@ -161,8 +160,7 @@
             Path src;
             OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
             if (dynamicPartitioningUsed) {
-                src = new Path(getPartitionRootLocation(jobInfo.getLocation()
-                    .toString(), jobInfo.getTableInfo().getTable()
+                src = new Path(getPartitionRootLocation(jobInfo.getLocation(), jobInfo.getTableInfo().getTable()
                     .getPartitionKeysSize()));
             } else {
                 src = new Path(jobInfo.getLocation());
@@ -205,18 +203,16 @@
             OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
             if (getOutputDirMarking(jobContext.getConfiguration())) {
                 Path outputPath = new Path(jobInfo.getLocation());
-                if (outputPath != null) {
-                    FileSystem fileSys = outputPath.getFileSystem(jobContext
-                        .getConfiguration());
-                    // create a file in the folder to mark it
-                    if (fileSys.exists(outputPath)) {
-                        Path filePath = new Path(outputPath,
-                            SUCCEEDED_FILE_NAME);
-                        if (!fileSys.exists(filePath)) { // may have been
-                            // created by
-                            // baseCommitter.commitJob()
-                            fileSys.create(filePath).close();
-                        }
+                FileSystem fileSys = outputPath.getFileSystem(jobContext
+                    .getConfiguration());
+                // create a file in the folder to mark it
+                if (fileSys.exists(outputPath)) {
+                    Path filePath = new Path(outputPath,
+                        SUCCEEDED_FILE_NAME);
+                    if (!fileSys.exists(filePath)) { // may have been
+                        // created by
+                        // baseCommitter.commitJob()
+                        fileSys.create(filePath).close();
                     }
                 }
             }
@@ -303,7 +299,10 @@
         }

         // Apply the group and permissions to the leaf partition and files.
-        applyGroupAndPerms(fs, partPath, perms, grpName, true);
+        // Need not bother in case of HDFS as permissions are taken care of by setting the umask
+        if (!HCatHadoopShims.Instance.get().isFileInHDFS(fs, partPath)) {
+            applyGroupAndPerms(fs, partPath, perms, grpName, true);
+        }

         // Set the location in the StorageDescriptor
         if (dynamicPartitioningUsed) {
@@ -325,22 +324,12 @@
                                     String group, boolean recursive)
         throws IOException {
         fs.setPermission(dir, permission);
-        try {
-            fs.setOwner(dir, null, group);
-        } catch (AccessControlException ace) {
-            LOG.warn("Error changing group of " + dir, ace);
-        }
         if (recursive) {
             for (FileStatus fileStatus : fs.listStatus(dir)) {
                 if (fileStatus.isDir()) {
-                    applyGroupAndPerms(fs, fileStatus.getPath(), permission, group, recursive);
+                    applyGroupAndPerms(fs, fileStatus.getPath(), permission, group, true);
                 } else {
                     fs.setPermission(fileStatus.getPath(), permission);
-                    try {
-                        fs.setOwner(dir, null, group);
-                    } catch (AccessControlException ace) {
-                        LOG.warn("Error changing group of " + dir, ace);
-                    }
                 }
             }
         }
@@ -387,6 +376,7 @@
     private void updateTableSchema(HiveMetaStoreClient client, Table table,
                                    HCatSchema partitionSchema) throws IOException, InvalidOperationException, MetaException, TException {
+
         List newColumns = HCatUtil.validatePartitionSchema(table, partitionSchema);

         if (newColumns.size() != 0) {
@@ -403,46 +393,88 @@
      * Move all of the files from the temp directory to the final location
      * @param fs the output file system
      * @param file the file to move
-     * @param src the source directory
-     * @param dest the target directory
+     * @param srcDir the source directory
+     * @param destDir the target directory
      * @param dryRun - a flag that simply tests if this move would succeed or not based
      *                 on whether other files exist where we're trying to copy
      * @throws java.io.IOException
      */
     private void moveTaskOutputs(FileSystem fs, Path file,
-                                 Path src,
-                                 Path dest, boolean dryRun) throws IOException {
+                                 Path srcDir,
+                                 Path destDir, final boolean dryRun) throws IOException {
+
+        final Path finalOutputPath = getFinalPath(file, srcDir, destDir);
         if (fs.isFile(file)) {
-            Path finalOutputPath = getFinalPath(file, src, dest);
-
-            if (dryRun) {
-//                LOG.info("Testing if moving ["+file+"] to ["+finalOutputPath+"] would cause a problem");
+            if (dryRun){
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Testing if moving file: [" + file + "] to ["
+                        + finalOutputPath + "] would cause a problem");
+                }
                 if (fs.exists(finalOutputPath)) {
-                    throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath + ", duplicate publish possible.");
+                    throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath
+                        + ", duplicate publish not possible.");
                 }
             } else {
-//                LOG.info("Moving ["+file+"] to ["+finalOutputPath+"]");
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Moving file: [" + file + "] to [" + finalOutputPath + "]");
+                }
                 if (!fs.rename(file, finalOutputPath)) {
                     if (!fs.delete(finalOutputPath, true)) {
                         throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath);
                     }
                     if (!fs.rename(file, finalOutputPath)) {
-                        throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + dest);
+                        throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + finalOutputPath);
                     }
                 }
             }
-        } else if (fs.getFileStatus(file).isDir()) {
-            FileStatus[] paths = fs.listStatus(file);
-            Path finalOutputPath = getFinalPath(file, src, dest);
-            if (!dryRun) {
-                fs.mkdirs(finalOutputPath);
-            }
-            if (paths != null) {
-                for (FileStatus path : paths) {
-                    moveTaskOutputs(fs, path.getPath(), src, dest, dryRun);
+        } else if(fs.getFileStatus(file).isDir()) {
+            FileStatus[] children = fs.listStatus(file);
+            if (children != null && children.length > 0) {
+                FileStatus firstChild = children[0];
+                if(firstChild.isDir()) {
+                    // If the first child is a directory, then the rest will be directories too, per the HCatalog dir structure;
+                    // recurse in that case
+                    for (FileStatus child : children) {
+                        moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun);
+                    }
+                } else {
+
+
+                    if (!dryRun) {
+                        if (dynamicPartitioningUsed) {
+                            // Optimization: if the first child is a file, we have reached the leaf directory; move the parent directory itself
+                            // instead of moving each file under the directory. See HCATALOG-538
+
+                            final Path parentDir = finalOutputPath.getParent();
+                            // Create the directory
+                            fs.mkdirs(parentDir);
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Moving directory: " + file + " to " + parentDir);
+                            }
+                            if (!fs.rename(file, parentDir)) {
+                                final String msg = "Failed to move file: " + file + " to " + parentDir;
+                                LOG.error(msg);
+                                throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
+                            }
+                        } else {
+                            // When dynamic partitioning is not used we have to move each file
+                            for (FileStatus child : children) {
+                                moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun);
+                            }
+                        }
+                    } else {
+                        if(fs.exists(finalOutputPath)) {
+                            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath
+                                + ", duplicate publish not possible.");
+                        }
+                    }
                 }
             }
+        } else {
+            // Should never happen
+            final String msg = "Unknown file type being asked to be moved, erroring out";
+            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
         }
     }
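Note (not part of the patch): the HCATALOG-538 optimization above renames a whole leaf partition directory with a single FileSystem.rename() call instead of renaming every file it contains. A simplified sketch of that idea, with assumed local paths for illustration:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LeafDirMoveSketch {

    // If srcDir's first child is a file, srcDir is treated as a leaf partition directory
    // and moved with one rename; otherwise this sketch does nothing.
    static boolean moveLeafDir(FileSystem fs, Path srcDir, Path destDir) throws IOException {
        if (!fs.exists(srcDir)) {
            return false;
        }
        FileStatus[] children = fs.listStatus(srcDir);
        if (children == null || children.length == 0 || children[0].isDir()) {
            return false;
        }
        // Make sure the destination's parent exists, then move the whole directory at once.
        fs.mkdirs(destDir.getParent());
        if (!fs.rename(srcDir, destDir)) {
            throw new IOException("Failed to move " + srcDir + " to " + destDir);
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        // Hypothetical scratch and warehouse locations.
        Path scratch = new Path("/tmp/hcat_scratch/ds=2012-11-01");
        Path warehouse = new Path("/tmp/warehouse/mytable/ds=2012-11-01");
        System.out.println("moved: " + moveLeafDir(fs, scratch, warehouse));
    }
}

The patch achieves the same effect by creating the parent of finalOutputPath and renaming the leaf directory into it; the sketch renames to an explicit destination path instead, to keep the rename semantics obvious.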
@@ -603,12 +635,12 @@
                     Path src = new Path(ptnRootLocation);
                     // check here for each dir we're copying out, to see if it
                     // already exists, error out if so
-                    moveTaskOutputs(fs, src, src, tblPath,true);
-                    moveTaskOutputs(fs, src, src, tblPath,false);
+                    moveTaskOutputs(fs, src, src, tblPath, true);
+                    moveTaskOutputs(fs, src, src, tblPath, false);
                     fs.delete(src, true);
                 try {
                     updateTableSchema(client, table, jobInfo.getOutputSchema());
-                    LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
+                    LOG.info("HAR is being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos);
                     client.add_partitions(partitionsToAdd);
                     partitionsAdded = partitionsToAdd;
                 } catch (Exception e){
@@ -624,17 +656,16 @@
             }else{
                 // no harProcessor, regular operation
-                // No duplicate partition publish case to worry about because we'll
-                // get a AlreadyExistsException here if so, and appropriately rollback
                 updateTableSchema(client, table, jobInfo.getOutputSchema());
-                LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
-                client.add_partitions(partitionsToAdd);
+                LOG.info("HAR is not being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos);
                 partitionsAdded = partitionsToAdd;
                 if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
                     Path src = new Path(ptnRootLocation);
-                    moveTaskOutputs(fs, src, src, tblPath,false);
+                    moveTaskOutputs(fs, src, src, tblPath, true);
+                    moveTaskOutputs(fs, src, src, tblPath, false);
                     fs.delete(src, true);
                 }
+                client.add_partitions(partitionsToAdd);
             }
         } catch (Exception e) {
             if (partitionsAdded.size() > 0) {
@@ -680,7 +711,7 @@
                     client.cancelDelegationToken(tokenStrForm);
                 }
             } catch (MetaException e) {
-                LOG.warn("MetaException while cancelling delegation token.",e );
+                LOG.warn("MetaException while cancelling delegation token.", e);
             } catch (TException e) {
                 LOG.warn("TException while cancelling delegation token.", e);
             } finally {
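Note (not part of the patch): in the non-HAR dynamic-partitioning path above, moveTaskOutputs is now run twice, first as a dry run and then for real, so a duplicate publish is detected before anything has been moved, and add_partitions is called only after the files are in place. A small standalone sketch of that check-then-move pattern, with assumed helper names and local paths:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TwoPassMoveSketch {

    // Pass 1 (dryRun = true) only verifies that no destination already exists;
    // pass 2 (dryRun = false) performs the actual renames.
    static void moveOrCheck(FileSystem fs, Path srcDir, Path destDir, boolean dryRun) throws IOException {
        for (FileStatus child : fs.listStatus(srcDir)) {
            Path target = new Path(destDir, child.getPath().getName());
            if (dryRun) {
                if (fs.exists(target)) {
                    throw new IOException("Data already exists in " + target + ", duplicate publish not possible.");
                }
            } else {
                fs.mkdirs(destDir);
                if (!fs.rename(child.getPath(), target)) {
                    throw new IOException("Failed to move " + child.getPath() + " to " + target);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        // Hypothetical scratch and table locations.
        Path scratch = new Path("/tmp/hcat_scratch");
        Path table = new Path("/tmp/warehouse/mytable");
        moveOrCheck(fs, scratch, table, true);   // fail fast if anything would collide
        moveOrCheck(fs, scratch, table, false);  // then move for real
    }
}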