commit a690304613801697ce381e84d2277c03cabfd1e4
Author: Ashutosh Chauhan
Date:   Tue Jun 28 21:58:36 2016 -0700

    Parallelize job close for both tables and partitions. Movetask parallel

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index d98ea84..105de94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -22,8 +22,12 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
@@ -31,6 +35,7 @@
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
@@ -48,6 +53,8 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.JobConf;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Table Scan Operator If the data is coming from the map-reduce framework, just
  * forward it. This will be needed as part of local work when data is not being
@@ -253,6 +260,43 @@ public void closeOp(boolean abort) throws HiveException {
     }
   }
 
+  @Override
+  public void jobCloseOp(final Configuration conf, final boolean success)
+      throws HiveException {
+    List<Future<?>> futures = new LinkedList<>();
+    final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+        Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("JobClose-Thread-%d").build()) : null;
+    for (final Operator<? extends OperatorDesc> op : childOperators) {
+      if (null == pool) {
+        op.jobClose(conf, success);
+      } else {
+        // If the query is a multi-file insert there will be multiple FS operators.
+        // By doing this we are calling jobClose on all of them in parallel.
+        futures.add(pool.submit(new Callable<Void>() {
+
+          @Override
+          public Void call() throws Exception {
+            op.jobClose(conf, success);
+            return null;
+          }
+        }));
+      }
+    }
+    if (null != pool) {
+      pool.shutdown();
+      for (Future<?> future : futures) {
+        try {
+          future.get();
+        } catch (Exception e) {
+          LOG.debug(e.getMessage());
+          pool.shutdownNow();
+          throw new HiveException(e.getCause());
+        }
+      }
+    }
+  }
+
   /**
    * The operator name for this operator type. This is used to construct the
    * rule for an operator
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index fca00dc..18709ff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -69,8 +69,11 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -102,6 +105,7 @@
 import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -209,6 +213,7 @@
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.FieldSerializer;
 import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Utilities.
@@ -1766,7 +1771,7 @@ public static void rename(FileSystem fs, Path src, Path dst) throws IOException,
    * the target directory
    * @throws IOException
    */
-  public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst) throws IOException,
+  public static void renameOrMoveFiles(final FileSystem fs, Path src, final Path dst) throws IOException,
       HiveException {
     if (!fs.exists(dst)) {
       if (!fs.rename(src, dst)) {
@@ -1989,27 +1994,68 @@ private static String replaceTaskIdFromFilename(String filename, String oldTaskI
     }
   }
 
-  public static void mvFileToFinalPath(Path specPath, Configuration hconf,
-      boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf,
+  public static void mvFileToFinalPath(final Path specPath, final Configuration hconf,
+      boolean success, final Log log, final DynamicPartitionCtx dpCtx, FileSinkDesc conf,
       Reporter reporter) throws IOException,
       HiveException {
-    FileSystem fs = specPath.getFileSystem(hconf);
+    final FileSystem fs = specPath.getFileSystem(hconf);
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
     if (success) {
       if (fs.exists(tmpPath)) {
         // remove any tmp file or double-committed output files
-        ArrayList<String> emptyBuckets =
+        ObjectPair<ArrayList<String>, FileStatus[]> result =
             Utilities.removeTempOrDuplicateFiles(fs, tmpPath, dpCtx, conf, hconf);
         // create empty buckets if necessary
+        ArrayList<String> emptyBuckets = result.getFirst();
         if (emptyBuckets.size() > 0) {
           createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
         }
 
+        List<Future<?>> futures = new LinkedList<>();
+        final ExecutorService pool =
+            result.getSecond() != null && hconf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+            Executors.newFixedThreadPool(hconf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("JobClose-FS-Thread-%d").build()) : null;
+
         // move to the file destination
-        log.info("Moving tmp dir: " + tmpPath + " to: " + specPath);
-        Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
+
+        if (null != pool) {
+          fs.mkdirs(specPath);
+          // Instead of a single rename, we are making multiple calls on the FS. This is
+          // unintuitive, since we are making more calls than necessary, but on some FSs
+          // (mostly cloud) a rename on a dir is copy files + delete. By using a thread pool
+          // we are renaming the underlying dirs in parallel.
+
+          for (final FileStatus status : result.getSecond()) {
+
+            futures.add(pool.submit(new Callable<Void>() {
+
+              @Override
+              public Void call() throws Exception {
+                LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<>(dpCtx.getPartSpec());
+                Warehouse.makeSpecFromName(fullPartSpec, status.getPath());
+                Path dest = new Path(specPath, Warehouse.makePartPath(fullPartSpec));
+                log.info("Moving tmp dir: " + status.getPath() + " to: " + dest);
+                Utilities.renameOrMoveFiles(fs, status.getPath(), dest);
+                return null;
+              }
+            }));
+
+          }
+          for (Future<?> future : futures) {
+            try {
+              future.get();
+            } catch (Exception e) {
+              LOG.debug(e.getMessage());
+              pool.shutdownNow();
+              throw new HiveException(e.getCause());
+            }
+          }
+        } else {
+          log.info("Moving tmp dir: " + tmpPath + " to: " + specPath);
+          Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
+        }
       }
     } else {
       fs.delete(tmpPath, true);
@@ -2078,16 +2124,17 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
    *
    * @return a list of path names corresponding to should-be-created empty buckets.
    */
-  public static ArrayList<String> removeTempOrDuplicateFiles(FileSystem fs, Path path,
+  public static ObjectPair<ArrayList<String>, FileStatus[]> removeTempOrDuplicateFiles(FileSystem fs, Path path,
       DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
 
     if (path == null) {
       return null;
     }
 
     ArrayList<String> result = new ArrayList<String>();
+    FileStatus parts[] = null;
     HashMap<String, FileStatus> taskIDToFile = null;
     if (dpCtx != null) {
-      FileStatus parts[] = HiveStatsUtils.getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);
+      parts = HiveStatsUtils.getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);
 
       for (int i = 0; i < parts.length; ++i) {
         assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
@@ -2141,7 +2188,7 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
       }
     }
 
-    return result;
+    return new ObjectPair<>(result, parts);
   }
 
   public static HashMap<String, FileStatus> removeTempOrDuplicateFiles(FileStatus[] items,
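Both hunks in this patch share one idiom: build a bounded pool of daemon threads (sized by HIVE_MOVE_FILES_THREAD_COUNT, default 25, with 0 disabling parallelism), submit one Callable per unit of work, then await every Future and fail fast with shutdownNow() on the first error. The following is a minimal, self-contained sketch of that pattern outside Hive; the names ParallelMover and runAll are invented for illustration, and only Guava's ThreadFactoryBuilder is assumed from the patch's dependencies.

    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    public class ParallelMover {

      /** Runs all tasks on a bounded daemon pool; rethrows the first failure. */
      static void runAll(List<Callable<Void>> tasks, int threads) throws Exception {
        if (threads <= 0) {
          // threads <= 0 disables parallelism, like the patch's "null pool" branch.
          for (Callable<Void> task : tasks) {
            task.call();
          }
          return;
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads,
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build());
        List<Future<Void>> futures = new LinkedList<>();
        for (Callable<Void> task : tasks) {
          futures.add(pool.submit(task));
        }
        pool.shutdown();               // accept no new tasks; queued ones still run
        for (Future<Void> future : futures) {
          try {
            future.get();              // block until this task completes
          } catch (Exception e) {
            pool.shutdownNow();        // fail fast: interrupt everything still running
            throw e;
          }
        }
      }

      public static void main(String[] args) throws Exception {
        List<Callable<Void>> tasks = new LinkedList<>();
        for (int i = 0; i < 4; i++) {
          final int id = i;
          tasks.add(() -> {            // stand-in for one per-partition rename
            System.out.println(Thread.currentThread().getName() + " moved part " + id);
            return null;
          });
        }
        runAll(tasks, 25);
      }
    }

One difference between the two hunks is worth noting: jobCloseOp in TableScanOperator.java calls pool.shutdown() before awaiting the futures, while the mvFileToFinalPath hunk only calls shutdownNow() on failure. Because the threads are daemons, neither pool keeps the JVM alive, but the explicit shutdown() (used in the sketch above) lets idle workers exit promptly on success.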