From 86717b60147190f70e0d8940b6d99fe1cfb60f64 Mon Sep 17 00:00:00 2001
From: Ashutosh Chauhan
Date: Fri, 8 Jul 2016 17:33:53 -0700
Subject: [PATCH] HIVE-14128 : Parallelize jobClose phases

---
 .../hadoop/hive/ql/exec/TableScanOperator.java    | 45 ++++++++++++++
 .../org/apache/hadoop/hive/ql/exec/Utilities.java | 70 ++++++++++++++++++----
 2 files changed, 104 insertions(+), 11 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index 6afe957..0933ef7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -21,14 +21,20 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -48,6 +54,8 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.JobConf;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Table Scan Operator If the data is coming from the map-reduce framework, just
  * forward it. This will be needed as part of local work when data is not being
@@ -269,6 +277,43 @@ public void closeOp(boolean abort) throws HiveException {
     }
   }
 
+  @Override
+  public void jobCloseOp(final Configuration conf, final boolean success)
+      throws HiveException {
+    List<Future<Void>> futures = new LinkedList<>();
+    final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+        Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("JobClose-Thread-%d").build()) : null;
+    for (final Operator<? extends OperatorDesc> op : childOperators) {
+      if (null == pool) {
+        op.jobClose(conf, success);
+      } else {
+        // If the query is a multi-file insert, there will be multiple FileSink operators.
+        // By submitting to the pool we call jobClose on all those operators in parallel.
+        futures.add(pool.submit(new Callable<Void>() {
+
+          @Override
+          public Void call() throws Exception {
+            op.jobClose(conf, success);
+            return null;
+          }
+        }));
+      }
+    }
+    if (null != pool) {
+      pool.shutdown();
+      for (Future<Void> future : futures) {
+        try {
+          future.get();
+        } catch (Exception e) {
+          LOG.debug(e.getMessage());
+          pool.shutdownNow();
+          throw new HiveException(e.getCause());
+        }
+      }
+    }
+  }
+
   /**
    * The operator name for this operator type. This is used to construct the
    * rule for an operator
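Note: the jobCloseOp hunk above is an instance of a general fan-out/join pattern: submit one task per child to a bounded daemon pool, then join on every Future so the first failure propagates and cancels the rest. The standalone sketch below shows that pattern in isolation; Child and closeAll are illustrative names, not Hive APIs, and the threads <= 0 guard mirrors the HIVE_MOVE_FILES_THREAD_COUNT check.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelCloseSketch {

  /** Illustrative stand-in for Hive's Operator.jobClose. */
  interface Child {
    void close() throws Exception;
  }

  static void closeAll(List<Child> children, int threads) throws Exception {
    if (threads <= 0) {
      // Pool disabled: fall back to closing the children serially.
      for (Child c : children) {
        c.close();
      }
      return;
    }
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Void>> futures = new ArrayList<>();
    for (final Child c : children) {
      futures.add(pool.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          c.close();
          return null;
        }
      }));
    }
    pool.shutdown(); // stop accepting tasks; already-submitted ones keep running
    try {
      for (Future<Void> f : futures) {
        f.get(); // blocks; rethrows the first task failure as ExecutionException
      }
    } catch (Exception e) {
      pool.shutdownNow(); // cancel the remaining closes once one has failed
      throw e;
    }
  }
}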
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 12a929a..831f8a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -21,6 +21,8 @@
 import java.util.ArrayList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.beans.DefaultPersistenceDelegate;
 import java.beans.Encoder;
 import java.beans.Expression;
@@ -64,8 +66,11 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -95,6 +100,7 @@
 import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -1129,7 +1135,7 @@ public static void rename(FileSystem fs, Path src, Path dst) throws IOException,
    * the target directory
    * @throws IOException
    */
-  public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst) throws IOException,
+  public static void renameOrMoveFiles(final FileSystem fs, Path src, final Path dst) throws IOException,
       HiveException {
     if (!fs.exists(dst)) {
       if (!fs.rename(src, dst)) {
@@ -1376,27 +1382,68 @@ private static String replaceTaskIdFromFilename(String filename, String oldTaskI
     }
   }
 
-  public static void mvFileToFinalPath(Path specPath, Configuration hconf,
-      boolean success, Logger log, DynamicPartitionCtx dpCtx, FileSinkDesc conf,
+  public static void mvFileToFinalPath(final Path specPath, final Configuration hconf,
+      boolean success, final Logger log, final DynamicPartitionCtx dpCtx, FileSinkDesc conf,
       Reporter reporter) throws IOException,
-      HiveException {
+          HiveException {
 
-    FileSystem fs = specPath.getFileSystem(hconf);
+    final FileSystem fs = specPath.getFileSystem(hconf);
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
     if (success) {
       if (fs.exists(tmpPath)) {
         // remove any tmp file or double-committed output files
-        ArrayList<String> emptyBuckets =
+        ObjectPair<ArrayList<String>, FileStatus[]> result =
             Utilities.removeTempOrDuplicateFiles(fs, tmpPath, dpCtx, conf, hconf);
+        ArrayList<String> emptyBuckets = result.getFirst();
         // create empty buckets if necessary
         if (emptyBuckets.size() > 0) {
           createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
         }
 
         // move to the file destination
-        log.info("Moving tmp dir: " + tmpPath + " to: " + specPath);
-        Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
+        List<Future<Void>> futures = new LinkedList<>();
+        final ExecutorService pool =
+            result.getSecond() != null && hconf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+                Executors.newFixedThreadPool(hconf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+                    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("JobClose-FS-Thread-%d").build()) : null;
+
+        if (null != pool) {
+          fs.mkdirs(specPath);
+          // Instead of a single rename, we make multiple calls on the FS. This is unintuitive,
+          // since we issue more calls than necessary, but on some FSes (mostly cloud) a rename
+          // of a dir is copy files + delete. The thread pool renames the underlying dirs in parallel.
+          for (final FileStatus status : result.getSecond()) {
+
+            futures.add(pool.submit(new Callable<Void>() {
+
+              @Override
+              public Void call() throws Exception {
+                LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(dpCtx.getPartSpec());
+                Warehouse.makeSpecFromName(fullPartSpec, status.getPath());
+                Path dest = new Path(specPath, Warehouse.makePartPath(fullPartSpec));
+                log.info("Moving tmp dir: " + status.getPath() + " to: " + dest);
+                Utilities.renameOrMoveFiles(fs, status.getPath(), dest);
+                return null;
+              }
+            }));
+
+          }
+          for (Future<Void> future : futures) {
+            try {
+              future.get();
+            } catch (Exception e) {
+              LOG.debug(e.getMessage());
+              pool.shutdownNow();
+              throw new HiveException(e.getCause());
+            }
+          }
+        } else {
+          log.info("Moving tmp dir: " + tmpPath + " to: " + specPath);
+          Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
+        }
       }
     } else {
       fs.delete(tmpPath, true);
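The comment in the hunk above carries the key reasoning: on object stores a directory rename degenerates into copy + delete per file, so a single top-level rename serializes all of that work, while per-partition renames can overlap. Below is a minimal sketch of the same strategy against java.nio.file instead of Hadoop's FileSystem; moveChildrenInParallel and its parameters are illustrative, not part of the patch.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelMoveSketch {

  static void moveChildrenInParallel(Path tmpDir, Path destDir, int threads)
      throws IOException, InterruptedException, ExecutionException {
    Files.createDirectories(destDir); // mirrors fs.mkdirs(specPath) above
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Void>> futures = new ArrayList<>();
    try (DirectoryStream<Path> children = Files.newDirectoryStream(tmpDir)) {
      for (Path child : children) {
        final Path src = child;
        final Path dst = destDir.resolve(child.getFileName());
        // One move per immediate child, so slow copy+delete "renames" overlap.
        futures.add(pool.submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            Files.move(src, dst);
            return null;
          }
        }));
      }
    }
    pool.shutdown();
    try {
      for (Future<Void> f : futures) {
        f.get(); // surface the first failed move
      }
    } catch (ExecutionException e) {
      pool.shutdownNow(); // stop the remaining moves
      throw e;
    }
  }
}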
@@ -1465,16 +1512,17 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
    *
    * @return a list of path names corresponding to should-be-created empty buckets.
    */
-  public static ArrayList<String> removeTempOrDuplicateFiles(FileSystem fs, Path path,
+  public static ObjectPair<ArrayList<String>, FileStatus[]> removeTempOrDuplicateFiles(FileSystem fs, Path path,
       DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
 
     if (path == null) {
       return null;
     }
 
     ArrayList<String> result = new ArrayList<String>();
+    FileStatus parts[] = null;
     HashMap<String, FileStatus> taskIDToFile = null;
     if (dpCtx != null) {
-      FileStatus parts[] = HiveStatsUtils.getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);
+      parts = HiveStatsUtils.getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);
 
       for (int i = 0; i < parts.length; ++i) {
         assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
@@ -1527,7 +1575,7 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
       }
     }
 
-    return result;
+    return new ObjectPair<>(result, parts);
   }
 
   public static HashMap<String, FileStatus> removeTempOrDuplicateFiles(FileStatus[] items,
-- 
1.7.12.4 (Apple Git-37)
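One detail both parallelized paths share: catching the failure from future.get() and rethrowing e.getCause(). An ExecutorService wraps any exception a task throws in an ExecutionException, so the interesting failure is the cause, not the exception caught at get(). A short self-contained demonstration (the exception and its message are made up for illustration):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class UnwrapSketch {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    Future<Void> f = pool.submit(new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        throw new IllegalStateException("simulated rename failure");
      }
    });
    try {
      f.get();
    } catch (ExecutionException e) {
      // Prints "ExecutionException", then the original task failure's message.
      System.out.println(e.getClass().getSimpleName());
      System.out.println(e.getCause().getMessage());
    } finally {
      pool.shutdownNow();
    }
  }
}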