diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7cc15e2..70816bd 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -571,7 +571,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Used to avoid all of the proxies and object copies in the metastore. Note, if this is " + "set, you MUST use a local metastore (hive.metastore.uris must be empty) otherwise " + "undefined and most likely undesired behavior will result"), - METASTORE_FS_HANDLER_THREADS_COUNT("hive.metastore.fshandler.threads", 20, + METASTORE_FS_HANDLER_THREADS_COUNT("hive.metastore.fshandler.threads", 15, "Number of threads to be allocated for metastore handler for fs operations."), METASTORE_HBASE_CATALOG_CACHE_SIZE("hive.metastore.hbase.catalog.cache.size", 50000, "Maximum number of " + "objects we will place in the hbase metastore catalog cache. The objects will be divided up by " + diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java index 2a9dc11..0ff2447 100644 --- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.stats; +import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; import java.util.ArrayList; @@ -27,7 +28,14 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -588,18 +596,42 @@ public static long getFileSizeForTable(HiveConf conf, Table table) { * - partition list * @return sizes of partitions */ - public static List getFileSizeForPartitions(HiveConf conf, List parts) { - List sizes = Lists.newArrayList(); - for (Partition part : parts) { - Path path = part.getDataLocation(); - long size = 0; - try { - FileSystem fs = path.getFileSystem(conf); - size = fs.getContentSummary(path).getLength(); - } catch (Exception e) { - size = 0; + public static List getFileSizeForPartitions(final HiveConf conf, List parts) { + LOG.info("Number of partitions : " + parts.size()); + ArrayList> futures = new ArrayList<>(); + + int threads = Math.max(1, conf.getIntVar(ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT)); + final ExecutorService pool = Executors.newFixedThreadPool(threads, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Get-Partitions-Size-%d") + .build()); + + final ArrayList sizes = new ArrayList<>(parts.size()); + for (final Partition part : parts) { + final Path path = part.getDataLocation(); + futures.add(pool.submit(new Callable() { + @Override + public Long call() throws Exception { + try { + LOG.debug("Partition path : " + path); + FileSystem fs = path.getFileSystem(conf); + return fs.getContentSummary(path).getLength(); + } catch (IOException e) { + return 0L; + } + } + })); + } + + try { + for(int i = 0; i < futures.size(); i++) { + sizes.add(i, futures.get(i).get()); } - sizes.add(size); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Exception in processing files ", e); + } finally { + pool.shutdownNow(); } return sizes; }