From f27f2514036cd775c29f949a98dc8cc3dbb7e066 Mon Sep 17 00:00:00 2001
From: Ashutosh Chauhan
Date: Tue, 2 Feb 2016 18:03:44 -0800
Subject: [PATCH] HIVE-12988 : Improve dynamic partition loading IV

---
 .../org/apache/hadoop/hive/ql/metadata/Hive.java | 176 ++++++++++++++-------
 1 file changed, 117 insertions(+), 59 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 75d2519..b59b7da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -33,6 +33,7 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -42,9 +43,13 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.ImmutableMap;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -119,6 +124,7 @@
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatus;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -128,6 +134,9 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * This class has functions that implement meta data/DDL operations using calls
@@ -1483,7 +1492,7 @@ public Partition loadPartition(Path loadPath, Table tbl,
             isSrcLocal);
       } else {
         if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null) {
-          newFiles = new ArrayList<>();
+          newFiles = Collections.synchronizedList(new ArrayList<Path>());
         }
 
         FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
@@ -1730,7 +1739,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
   public void loadTable(Path loadPath, String tableName, boolean replace, boolean isSrcLocal,
       boolean isSkewedStoreAsSubdir, boolean isAcid) throws HiveException {
-    List<Path> newFiles = new ArrayList<Path>();
+    List<Path> newFiles = Collections.synchronizedList(new ArrayList<Path>());
     Table tbl = getTable(tableName);
     HiveConf sessionConf = SessionState.getSessionConf();
     if (replace) {
@@ -2560,31 +2569,32 @@ public PrincipalPrivilegeSet get_privilege_set(HiveObjectType objectType,
     }
   }
 
-  // for each file or directory in 'srcs', make mapping for every file in src to safe name in dest
-  private static List<List<Path[]>> checkPaths(HiveConf conf, FileSystem fs,
-      FileStatus[] srcs, FileSystem srcFs, Path destf, boolean replace)
-      throws HiveException {
+  private static void copyFiles(final HiveConf conf, final FileSystem fs,
+      FileStatus[] srcs, FileSystem srcFs, final Path destf, final boolean isSrcLocal, final List<Path> newFiles)
+          throws HiveException {
 
-    List<List<Path[]>> result = new ArrayList<List<Path[]>>();
     try {
-      FileStatus destStatus = !replace ? FileUtils.getFileStatusOrNull(fs, destf) : null;
-      if (destStatus != null && !destStatus.isDir()) {
+      FileStatus destStatus = FileUtils.getFileStatusOrNull(fs, destf);
+      if (destStatus != null && !destStatus.isDirectory()) {
         throw new HiveException("checkPaths: destination " + destf
             + " should be a directory");
       }
+      HadoopShims.HdfsFileStatus fullDestStatus = ShimLoader.getHadoopShims().getFullFileStatus(conf, fs, destf);
+      boolean inheritPerms = HiveConf.getBoolVar(conf,
+          HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+      ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+
       for (FileStatus src : srcs) {
         FileStatus[] items;
-        if (src.isDir()) {
+        if (src.isDirectory()) {
           items = srcFs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
-          Arrays.sort(items);
         } else {
           items = new FileStatus[] {src};
         }
 
-        List<Path[]> srcToDest = new ArrayList<Path[]>();
         for (FileStatus item : items) {
-          Path itemSource = item.getPath();
+          final Path itemSource = item.getPath();
           if (Utilities.isTempPath(item)) {
             // This check is redundant because temp files are removed by
@@ -2594,41 +2604,41 @@ public PrincipalPrivilegeSet get_privilege_set(HiveObjectType objectType,
             continue;
           }
 
-          Path itemDest = new Path(destf, itemSource.getName());
-          if (!replace) {
-            // Strip off the file type, if any so we don't make:
-            // 000000_0.gz -> 000000_0.gz_copy_1
-            String name = itemSource.getName();
-            String filetype;
-            int index = name.lastIndexOf('.');
-            if (index >= 0) {
-              filetype = name.substring(index);
-              name = name.substring(0, index);
-            } else {
-              filetype = "";
-            }
-            // It's possible that the file we're copying may have the same
-            // relative name as an existing file in the "destf" directory.
-            // So let's make a quick check to see if we can rename any
-            // potential offenders so as to allow them to move into the
-            // "destf" directory. The scheme is dead simple: simply tack
-            // on "_copy_N" where N starts at 1 and works its way up until
-            // we find a free space.
-
-            // removed source file staging.. it's more confusing when failed.
-            for (int counter = 1; fs.exists(itemDest) || destExists(result, itemDest); counter++) {
-              itemDest = new Path(destf, name + ("_copy_" + counter) + filetype);
-            }
+          // Strip off the file type, if any so we don't make:
+          // 000000_0.gz -> 000000_0.gz_copy_1
+          final String name;
+          final String filetype;
+          String itemName = itemSource.getName();
+          int index = itemName.lastIndexOf('.');
+          if (index >= 0) {
+            filetype = itemName.substring(index);
+            name = itemName.substring(0, index);
+          } else {
+            name = itemName;
+            filetype = "";
           }
-          srcToDest.add(new Path[]{itemSource, itemDest});
+          final ListenableFuture<Path> future = pool.submit(new Callable<Path>() {
+            @Override
+            public Path call() throws Exception {
+              Path itemDest = new Path(destf, itemSource.getName());
+              for (int counter = 1; fs.exists(itemDest); counter++) {
+                itemDest = new Path(destf, name + ("_copy_" + counter) + filetype);
+              }
+              if (null != newFiles) {
+                newFiles.add(itemDest);
+              }
+              return itemDest;
+            }
+          });
+          future.addListener(new MvFile(conf, itemSource, destf, isSrcLocal, srcFs, fs, fullDestStatus, inheritPerms),
+              MoreExecutors.sameThreadExecutor());
         }
-        result.add(srcToDest);
       }
-    } catch (IOException e) {
+      pool.shutdown();
+      pool.awaitTermination(3, TimeUnit.HOURS);
+    } catch (Exception e) {
       throw new HiveException("checkPaths: filesystem error in check phase. "
           + e.getMessage(), e);
     }
-    return result;
   }
 
   private static boolean destExists(List<List<Path[]>> result, Path proposed) {
@@ -2685,6 +2695,69 @@ private static String getQualifiedPathWithoutSchemeAndAuthority(Path srcf, FileS
     return ShimLoader.getHadoopShims().getPathWithoutSchemeAndAuthority(path).toString();
   }
 
+  static class MvFile implements Runnable {
+
+    public MvFile(HiveConf conf, Path srcf, Path destf, boolean isSrcLocal,
+        FileSystem srcFs, FileSystem destFs, HdfsFileStatus destStatus,
+        boolean inheritPerms) {
+      super();
+      this.conf = conf;
+      this.srcf = srcf;
+      this.destf = destf;
+      this.isSrcLocal = isSrcLocal;
+      this.destFs = destFs;
+      this.srcFs = srcFs;
+      this.destStatus = destStatus;
+      this.inheritPerms = inheritPerms;
+    }
+
+    HiveConf conf;
+    Path srcf;
+    Path destf;
+    boolean isSrcLocal;
+    FileSystem destFs;
+    FileSystem srcFs;
+    HadoopShims.HdfsFileStatus destStatus;
+    boolean inheritPerms;
+
+    @Override
+    public void run() {
+      boolean success = false;
+      try {
+        if (isSrcLocal) {
+          // For local src file, copy to hdfs
+          destFs.copyFromLocalFile(srcf, destf);
+          success = true;
+        } else {
+          if (needToCopy(srcf, destf, srcFs, destFs)) {
+            //copy if across file system or encryption zones.
+            LOG.info("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different.");
+            success = FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf,
+                true,  // delete source
+                false, // overwrite destination
+                conf);
+          } else {
+            success = destFs.rename(srcf, destf);
+          }
+        }
+
+        LOG.info("Renaming src: " + srcf.toString()
+            + ", dest: " + destf.toString() + ", Status:" + success);
+      } catch (Exception ioe) {
+        LOG.error("Unable to move source " + srcf + " to destination " + destf, ioe);
+      }
+
+      if (success && inheritPerms && !isSrcLocal) {
+        try {
+          ShimLoader.getHadoopShims().setFullFileStatus(conf, destStatus, destFs, destf);
+        } catch (IOException e) {
+          LOG.warn("Error setting permission of file " + destf + ": "+ e.getMessage(), e);
+        }
+      }
+      //return success;
+    }
+  }
+
   //it is assumed that parent directory of the destf should already exist when this
   //method is called. when the replace value is true, this method works a little different
   //from mv command if the destf is a directory, it replaces the destf instead of moving under
@@ -2867,22 +2940,7 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf,
     if (isAcid) {
       moveAcidFiles(srcFs, srcs, destf, newFiles);
     } else {
-      // check that source and target paths exist
-      List<List<Path[]>> result = checkPaths(conf, fs, srcs, srcFs, destf, false);
-      // move it, move it
-      try {
-        for (List<Path[]> sdpairs : result) {
-          for (Path[] sdpair : sdpairs) {
-            if (!moveFile(conf, sdpair[0], sdpair[1], false, isSrcLocal)) {
-              throw new IOException("Cannot move " + sdpair[0] + " to "
-                  + sdpair[1]);
-            }
-            if (newFiles != null) newFiles.add(sdpair[1]);
-          }
-        }
-      } catch (IOException e) {
-        throw new HiveException("copyFiles: error while moving files!!! " + e.getMessage(), e);
-      }
+      copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, newFiles);
     }
   }
-- 
1.7.12.4 (Apple Git-37)