diff --git common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
index 7c9d72f..8ace205 100644
--- common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
+++ common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
@@ -75,6 +75,7 @@
       sb.append(Path.SEPARATOR).append("*");
     }
     Path pathPattern = new Path(path, sb.toString());
+    LOG.debug("Getting globStatus for " + pathPattern);
     return fs.globStatus(pathPattern, FileUtils.HIDDEN_FILES_PATH_FILTER);
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index 6afe957..0933ef7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -21,14 +21,20 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -48,6 +54,8 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.JobConf;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Table Scan Operator If the data is coming from the map-reduce framework, just
  * forward it. This will be needed as part of local work when data is not being
@@ -269,6 +277,43 @@ public void closeOp(boolean abort) throws HiveException {
     }
   }
 
+  @Override
+  public void jobCloseOp(final Configuration conf, final boolean success)
+      throws HiveException {
+    List<Future<Void>> futures = new LinkedList<>();
+    final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+        Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("JobClose-Thread-%d").build()) : null;
+    for (final Operator<? extends OperatorDesc> op : childOperators) {
+      if (null == pool) {
+        op.jobClose(conf, success);
+      } else {
+        // If the query is a multi-file insert there will be multiple FS ops.
+        // By doing this we are calling jobClose on all those operators in parallel.
+        futures.add(pool.submit(new Callable<Void>() {
+
+          @Override
+          public Void call() throws Exception {
+            op.jobClose(conf, success);
+            return null;
+          }
+        }));
+      }
+    }
+    if (null != pool) {
+      pool.shutdown();
+      for (Future<Void> future : futures) {
+        try {
+          future.get();
+        } catch (Exception e) {
+          LOG.debug(e.getMessage());
+          pool.shutdownNow();
+          throw new HiveException(e.getCause());
+        }
+      }
+    }
+  }
+
   /**
    * The operator name for this operator type. This is used to construct the
    * rule for an operator
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index fd25978..c177f5e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -20,6 +20,8 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.beans.DefaultPersistenceDelegate;
 import java.beans.Encoder;
 import java.beans.Expression;
@@ -35,7 +37,6 @@
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.net.URLDecoder;
@@ -50,7 +51,6 @@
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -63,8 +63,11 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -93,12 +96,14 @@
 import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Context;
@@ -1132,7 +1137,7 @@ public static void rename(FileSystem fs, Path src, Path dst) throws IOException,
    * the target directory
    * @throws IOException
    */
-  public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst) throws IOException,
+  public static void renameOrMoveFiles(final FileSystem fs, Path src, final Path dst) throws IOException,
       HiveException {
     if (!fs.exists(dst)) {
       if (!fs.rename(src, dst)) {
@@ -1379,12 +1384,12 @@ private static String replaceTaskIdFromFilename(String filename, String oldTaskI
     }
   }
 
-  public static void mvFileToFinalPath(Path specPath, Configuration hconf,
-      boolean success, Logger log, DynamicPartitionCtx dpCtx, FileSinkDesc conf,
+  public static void mvFileToFinalPath(final Path specPath, final Configuration hconf,
+      boolean success, final Logger log, final DynamicPartitionCtx dpCtx, FileSinkDesc conf,
       Reporter reporter) throws IOException,
       HiveException {
 
-    FileSystem fs = specPath.getFileSystem(hconf);
+    final FileSystem fs = specPath.getFileSystem(hconf);
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
     if (success) {
@@ -1392,15 +1397,72 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf,
         tmpPath, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
       if(statuses != null && statuses.length > 0) {
         // remove any tmp file or double-committed output files
-        List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, statuses, dpCtx, conf, hconf);
+        ObjectPair<List<Path>, FileStatus[]> result =
+            Utilities.removeTempOrDuplicateFiles(fs, statuses, dpCtx, conf, hconf);
+        List<Path> emptyBuckets = result.getFirst();
         // create empty buckets if necessary
         if (emptyBuckets.size() > 0) {
           createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
         }
 
         // move to the file destination
-        log.info("Moving tmp dir: " + tmpPath + " to: " + specPath);
-        Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
+        List<Future<Void>> futures = new LinkedList<>();
+        final ExecutorService pool =
+            hconf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 15) > 0 ?
+                Executors.newFixedThreadPool(hconf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 15),
+                    new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("JobClose-FS-Thread-%d")
+                        .build()) : null;
+
+        if (null != pool) {
+          fs.mkdirs(specPath);
+          // Instead of a single rename, we are making multiple calls on the FS. This is non-intuitive
+          // since we are making more calls than necessary, but on some FS (mostly cloud) a rename
+          // of a dir is copy files + delete. By using a thread pool we rename the underlying dirs in parallel.
+          FileStatus[] stats = result.getSecond();
+          if (stats != null) {
+            for (final FileStatus status : stats) {
+              futures.add(pool.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                  LinkedHashMap<String, String> fullPartSpec =
+                      new LinkedHashMap<String, String>(dpCtx.getPartSpec());
+                  Warehouse.makeSpecFromName(fullPartSpec, status.getPath());
+                  Path dest = new Path(specPath, Warehouse.makePartPath(fullPartSpec));
+                  log.info("Moving src: " + status.getPath() + " to: " + dest);
+                  Utilities.renameOrMoveFiles(fs, status.getPath(), dest);
+                  return null;
+                }
+              }));
+            }
+          } else {
+            // Base directory might have multiple files. If so, move them in parallel.
+            // Again, this is non-intuitive, but on cloud FS this would be beneficial.
+            for (final FileStatus stat : fs.listStatus(tmpPath)) {
+              futures.add(pool.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                  log.info("Moving src file: " + stat.getPath() + " to: " + specPath);
+                  Utilities.renameOrMoveFiles(fs, stat.getPath(), specPath);
+                  return null;
+                }
+              }));
+            }
+          }
+          for (Future<Void> future : futures) {
+            try {
+              future.get();
+            } catch (Exception e) {
+              LOG.debug(e.getMessage());
+              pool.shutdownNow();
+              throw new HiveException(e.getCause());
+            }
+          }
+        } else {
+          log.info("Moving tmp dir: " + tmpPath + " to: " + specPath);
+          Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
+        }
       }
     } else {
       fs.delete(tmpPath, true);
@@ -1470,7 +1532,9 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
     }
     FileStatus[] stats = HiveStatsUtils.getFileStatusRecurse(path,
         ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
-    return removeTempOrDuplicateFiles(fs, stats, dpCtx, conf, hconf);
+    ObjectPair<List<Path>, FileStatus[]> result =
+        removeTempOrDuplicateFiles(fs, stats, dpCtx, conf, hconf);
+    return result.getFirst();
   }
 
   /**
@@ -1478,17 +1542,17 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
    *
    * @return a list of path names corresponding to should-be-created empty buckets.
    */
-  public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
+  public static ObjectPair<List<Path>, FileStatus[]> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
       DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
     if (fileStats == null) {
       return null;
     }
 
     List<Path> result = new ArrayList<Path>();
+    FileStatus parts[] = null;
     HashMap<String, FileStatus> taskIDToFile = null;
     if (dpCtx != null) {
-      FileStatus parts[] = fileStats;
-
+      parts = fileStats;
       for (int i = 0; i < parts.length; ++i) {
         assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
             + " is not a directory";
@@ -1506,8 +1570,6 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
         taskIDToFile = removeTempOrDuplicateFiles(items, fs);
         // if the table is bucketed and enforce bucketing, we should check and generate all buckets
         if (dpCtx.getNumBuckets() > 0 && taskIDToFile != null
            && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) {
-          // refresh the file list
-          items = fs.listStatus(parts[i].getPath());
           // get the missing buckets and generate empty buckets
           String taskID1 = taskIDToFile.keySet().iterator().next();
           Path bucketPath = taskIDToFile.values().iterator().next().getPath();
@@ -1525,7 +1587,7 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
     } else {
       FileStatus[] items = fileStats;
       if (items.length == 0) {
-        return result;
+        return new ObjectPair<>(result, parts);
       }
       taskIDToFile = removeTempOrDuplicateFiles(items, fs);
       if(taskIDToFile != null && taskIDToFile.size() > 0 && conf != null && conf.getTable() != null
@@ -1544,8 +1606,7 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
         }
       }
     }
-
-    return result;
+    return new ObjectPair<>(result, parts);
   }
 
   public static HashMap<String, FileStatus> removeTempOrDuplicateFiles(FileStatus[] items,
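The two new blocks in this patch (TableScanOperator.jobCloseOp and the mvFileToFinalPath change in Utilities) follow the same concurrency pattern: create a daemon fixed-size thread pool only when HIVE_MOVE_FILES_THREAD_COUNT is positive, submit one Callable per child operator or per directory/file to move, wait on every Future, and cancel the pool on the first failure. The standalone sketch below restates that pattern outside Hive; it is illustrative only and not part of the patch, and the class and method names (ParallelMoveSketch, runAll) are invented for the example.

// Illustrative sketch, not part of the patch: the gate-on-config / submit / fail-fast
// pattern used by jobCloseOp and mvFileToFinalPath, with the Hive-specific pieces
// (FileSystem, renameOrMoveFiles, HiveConf) replaced by plain Callables.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelMoveSketch {

  /** Runs the tasks serially when threads <= 0 (the pool == null branch), otherwise in parallel. */
  static void runAll(List<Callable<Void>> tasks, int threads) throws Exception {
    if (threads <= 0) {
      for (Callable<Void> task : tasks) {
        task.call();
      }
      return;
    }
    ExecutorService pool = Executors.newFixedThreadPool(threads, r -> {
      Thread t = new Thread(r, "move-files-sketch");
      t.setDaemon(true);                 // same intent as ThreadFactoryBuilder().setDaemon(true)
      return t;
    });
    List<Future<Void>> futures = new ArrayList<>();
    for (Callable<Void> task : tasks) {
      futures.add(pool.submit(task));    // one future per move, like one per FS op in the patch
    }
    pool.shutdown();
    for (Future<Void> future : futures) {
      try {
        future.get();                    // surfaces the first failure
      } catch (Exception e) {
        pool.shutdownNow();              // stop the remaining moves, as the patch does
        throw new Exception("parallel move failed", e);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    List<Callable<Void>> tasks = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      final int id = i;
      tasks.add(() -> {
        System.out.println("moving part-" + id);   // stand-in for renameOrMoveFiles(fs, src, dest)
        return null;
      });
    }
    runAll(tasks, 2);
  }
}

Keeping the serial branch reachable (thread count set to 0) preserves the old single-rename behaviour, which remains the cheaper option on HDFS, where a directory rename is a pure metadata operation; the per-directory parallel renames mainly pay off on cloud object stores where rename is copy plus delete.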