diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 9e528b5..aa5d914 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -24,6 +24,11 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +61,9 @@
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * StatsTask implementation. StatsTask mainly deals with "collectable" stats. These are
  * stats that require data scanning and are collected during query execution (unless the user
@@ -146,7 +154,7 @@ private int aggregateStats(Hive db) {
     EnvironmentContext environmentContext = null;
     try {
       // Stats setup:
-      Warehouse wh = new Warehouse(conf);
+      final Warehouse wh = new Warehouse(conf);
       if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) {
         try {
           scc = getContext();
@@ -216,6 +224,57 @@ private int aggregateStats(Hive db) {
         // Need to get the old stats of the partition
         // and update the table stats based on the old and new stats.
         List<Partition> updates = new ArrayList<Partition>();
+
+        // Get the file status up-front for all partitions; beneficial on blob storage systems.
+        final Map<String, FileStatus[]> fileStatusMap = new ConcurrentHashMap<String, FileStatus[]>();
+        int poolSize = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 1);
+        // In case the thread count is set to 0, use a single thread.
+        poolSize = Math.max(poolSize, 1);
+        final ExecutorService pool = Executors.newFixedThreadPool(poolSize,
+            new ThreadFactoryBuilder().setDaemon(true)
+                .setNameFormat("stats-updater-thread-%d")
+                .build());
+        final List<Future<Void>> futures = Lists.newLinkedList();
+        LOG.debug("Getting file stats of all partitions. threadpool size:" + poolSize);
+        try {
+          for (final Partition partn : partitions) {
+            final String partitionName = partn.getName();
+            final org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
+            Map<String, String> parameters = tPart.getParameters();
+
+            if (!existStats(parameters) && atomic) {
+              continue;
+            }
+            futures.add(pool.submit(new Callable<Void>() {
+              @Override
+              public Void call() throws Exception {
+                FileStatus[] partfileStatus = wh.getFileStatusesForSD(tPart.getSd());
+                fileStatusMap.put(partitionName, partfileStatus);
+                return null;
+              }
+            }));
+          }
+          pool.shutdown();
+          for (Future<Void> future : futures) {
+            future.get();
+          }
+        } catch (InterruptedException e) {
+          LOG.debug("Cancelling " + futures.size() + " file stats lookup tasks");
+          // Cancel the remaining lookups.
+          for (Future<Void> future : futures) {
+            future.cancel(true);
+          }
+          // Fail the query if the stats are supposed to be reliable.
+          if (work.isStatsReliable()) {
+            ret = 1;
+          }
+        } finally {
+          if (pool != null) {
+            pool.shutdownNow();
+          }
+          LOG.debug("Finished getting file stats of all partitions");
+        }
+
         for (Partition partn : partitions) {
           //
           // get the old partition stats
@@ -230,7 +289,8 @@ private int aggregateStats(Hive db) {
                   .getDestinationCreateTable().isEmpty())) {
             StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
           }
-          if (!existStats(parameters) && atomic) {
+          // A partition is present in fileStatusMap only if it passed the existStats/atomic check above.
+          if (!fileStatusMap.containsKey(partn.getName())) {
             continue;
           }
 
@@ -241,7 +301,7 @@ private int aggregateStats(Hive db) {
             StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
           }
 
-          updateQuickStats(wh, parameters, tPart.getSd());
+          updateQuickStats(parameters, fileStatusMap.get(partn.getName()));
           if (StatsSetupConst.areBasicStatsUptoDate(parameters)) {
             if (statsAggregator != null) {
               String prefix = getAggregationPrefix(table, partn);
@@ -371,6 +431,11 @@ private void updateQuickStats(Warehouse wh, Map<String, String> parameters,
      * calculate fast statistics
      */
     FileStatus[] partfileStatus = wh.getFileStatusesForSD(desc);
+    updateQuickStats(parameters, partfileStatus);
+  }
+
+  private void updateQuickStats(Map<String, String> parameters,
+      FileStatus[] partfileStatus) throws MetaException {
     MetaStoreUtils.populateQuickStats(partfileStatus, parameters);
   }
 
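
For readers who want to try the concurrency pattern outside of Hive, below is a minimal, self-contained sketch (not part of the patch) of what the new hunk in aggregateStats does: fan out one Callable per partition on a bounded pool, collect the results in a ConcurrentHashMap keyed by partition name, and cancel the outstanding lookups if the driver thread is interrupted. The PrefetchSketch class, the lookupFileStatuses() helper, and the hard-coded partition names and pool size are illustrative placeholders, not Hive APIs.

// PrefetchSketch.java -- illustrative only; assumes plain JDK, no Hive or Guava dependencies.
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PrefetchSketch {

  // Placeholder for the per-partition remote call (wh.getFileStatusesForSD in the patch).
  static String[] lookupFileStatuses(String partitionName) {
    return new String[] { partitionName + "/file_0", partitionName + "/file_1" };
  }

  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("ds=2016-01-01", "ds=2016-01-02", "ds=2016-01-03");
    final Map<String, String[]> fileStatusMap = new ConcurrentHashMap<String, String[]>();

    // The patch reads the pool size from HIVE_MOVE_FILES_THREAD_COUNT and clamps it to >= 1.
    int poolSize = Math.max(1, 4);
    final ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    final List<Future<Void>> futures = new LinkedList<Future<Void>>();
    try {
      // One lookup task per partition; results land in the concurrent map.
      for (final String partitionName : partitions) {
        futures.add(pool.submit(new Callable<Void>() {
          @Override
          public Void call() {
            fileStatusMap.put(partitionName, lookupFileStatuses(partitionName));
            return null;
          }
        }));
      }
      pool.shutdown();
      for (Future<Void> future : futures) {
        future.get(); // surface per-partition failures to the caller
      }
    } catch (InterruptedException e) {
      // Stop the remaining lookups once we are asked to give up, as the patch does.
      for (Future<Void> future : futures) {
        future.cancel(true);
      }
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      throw new RuntimeException(e.getCause());
    } finally {
      pool.shutdownNow();
    }
    System.out.println("Prefetched file statuses for " + fileStatusMap.size() + " partitions");
  }
}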