diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 9e528b5..51bafc4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -24,7 +24,13 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -143,10 +149,10 @@ private int aggregateStats(Hive db) {
     StatsAggregator statsAggregator = null;
     int ret = 0;
     StatsCollectionContext scc = null;
-    EnvironmentContext environmentContext = null;
+    final EnvironmentContext environmentContext = new EnvironmentContext();
     try {
       // Stats setup:
-      Warehouse wh = new Warehouse(conf);
+      final Warehouse wh = new Warehouse(conf);
       if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) {
         try {
           scc = getContext();
@@ -160,9 +166,8 @@ private int aggregateStats(Hive db) {
       }
 
       List<Partition> partitions = getPartitionsList(db);
-      boolean atomic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC);
-
-      String tableFullName = table.getDbName() + "." + table.getTableName();
+      final boolean atomic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC);
+      final String tableFullName = table.getDbName() + "." + table.getTableName();
 
       if (partitions == null) {
         org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable();
@@ -200,7 +205,6 @@ private int aggregateStats(Hive db) {
         }
         // write table stats to metastore
         if (!getWork().getNoStatsAggregator()) {
-          environmentContext = new EnvironmentContext();
           environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED,
               StatsSetupConst.TASK);
         }
@@ -212,61 +216,87 @@ private int aggregateStats(Hive db) {
         }
         LOG.info("Table " + tableFullName + " stats: [" + toString(parameters) + ']');
       } else {
+        int poolSize = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 1);
+        final ExecutorService pool = Executors.newFixedThreadPool(poolSize,
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("stats-updater-thread-%d")
+                .build());
+        final List<Future<Void>> futures = Lists.newLinkedList();
+        // final alias of statsAggregator so the Callables below can capture it
+        final StatsAggregator statsAgg = statsAggregator;
+
         // Partitioned table:
         // Need to get the old stats of the partition
         // and update the table stats based on the old and new stats.
-        List<Partition> updates = new ArrayList<Partition>();
-        for (Partition partn : partitions) {
-          //
-          // get the old partition stats
-          //
-          org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
-          Map<String, String> parameters = tPart.getParameters();
-          if (work.getTableSpecs() == null && AcidUtils.isAcidTable(table)) {
-            StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
-          } else if (work.getTableSpecs() != null
-              || (work.getLoadTableDesc() != null && work.getLoadTableDesc().getReplace())
-              || (work.getLoadFileDesc() != null && !work.getLoadFileDesc()
-                  .getDestinationCreateTable().isEmpty())) {
-            StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
-          }
-          if (!existStats(parameters) && atomic) {
-            continue;
-          }
-
-          // The collectable stats for the aggregator needs to be cleared.
-          // For eg. if a file is being loaded, the old number of rows are not valid
-          if (work.isClearAggregatorStats()) {
-            // we choose to keep the invalid stats and only change the setting.
-            StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
-          }
-
-          updateQuickStats(wh, parameters, tPart.getSd());
-          if (StatsSetupConst.areBasicStatsUptoDate(parameters)) {
-            if (statsAggregator != null) {
-              String prefix = getAggregationPrefix(table, partn);
-              updateStats(statsAggregator, parameters, prefix, atomic);
-            }
-            if (!getWork().getNoStatsAggregator()) {
-              environmentContext = new EnvironmentContext();
-              environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED,
-                  StatsSetupConst.TASK);
-            }
-          }
-          updates.add(new Partition(table, tPart));
-
-          if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
-            console.printInfo("Partition " + tableFullName + partn.getSpec() +
-                " stats: [" + toString(parameters) + ']');
-          }
-          LOG.info("Partition " + tableFullName + partn.getSpec() +
-              " stats: [" + toString(parameters) + ']');
-        }
+        final List<Partition> updates = new ArrayList<Partition>();
+        try {
+          for (final Partition partn : partitions) {
+            futures.add(pool.submit(new Callable<Void>() {
+              @Override
+              public Void call() throws Exception {
+                //
+                // get the old partition stats
+                //
+                org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
+                Map<String, String> parameters = tPart.getParameters();
+                if (work.getTableSpecs() == null && AcidUtils.isAcidTable(table)) {
+                  StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
+                } else if (work.getTableSpecs() != null
+                    || (work.getLoadTableDesc() != null && work.getLoadTableDesc().getReplace())
+                    || (work.getLoadFileDesc() != null && !work.getLoadFileDesc()
+                        .getDestinationCreateTable().isEmpty())) {
+                  StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
+                }
+                if (!existStats(parameters) && atomic) {
+                  return null;
+                }
+
+                // The collectable stats for the aggregator needs to be cleared.
+                // For eg. if a file is being loaded, the old number of rows are not valid
+                if (work.isClearAggregatorStats()) {
+                  // we choose to keep the invalid stats and only change the setting.
+                  StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
+                }
+
+                updateQuickStats(wh, parameters, tPart.getSd());
+                if (StatsSetupConst.areBasicStatsUptoDate(parameters)) {
+                  if (statsAgg != null) {
+                    String prefix = getAggregationPrefix(table, partn);
+                    updateStats(statsAgg, parameters, prefix, atomic);
+                  }
+                  if (!getWork().getNoStatsAggregator()) {
+                    environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED,
+                        StatsSetupConst.TASK);
+                  }
+                }
+                // 'updates' is shared by all stats-updater threads and ArrayList
+                // is not thread-safe, so the add must be synchronized
+                synchronized (updates) {
+                  updates.add(new Partition(table, tPart));
+                }
+
+                if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
+                  console.printInfo("Partition " + tableFullName + partn.getSpec() +
+                      " stats: [" + StatsTask.toString(parameters) + ']');
+                }
+                LOG.info("Partition " + tableFullName + partn.getSpec() +
+                    " stats: [" + StatsTask.toString(parameters) + ']');
+                return null;
+              }
+            }));
+          }
+          pool.shutdown();
+
+          for (Future<Void> future : futures) {
+            future.get();
+          }
+        } catch (InterruptedException e) {
+          LOG.debug("Cancelling " + futures.size() + " partition stats update tasks");
+          // cancel the remaining futures
+          for (Future<Void> future : futures) {
+            future.cancel(true);
+          }
+        } finally {
+          pool.shutdownNow();
+        }
         if (!updates.isEmpty()) {
           db.alterPartitions(tableFullName, updates, environmentContext);
         }
       }
     } catch (Exception e) {
       console.printInfo("[Warning] could not update stats.",
           "Failed with exception " + e.getMessage() + "\n"
@@ -374,7 +404,7 @@ private void updateQuickStats(Warehouse wh, Map<String, String> parameters,
     MetaStoreUtils.populateQuickStats(partfileStatus, parameters);
   }
 
-  private String toString(Map<String, String> parameters) {
+  static String toString(Map<String, String> parameters) {
     StringBuilder builder = new StringBuilder();
     for (String statType : StatsSetupConst.supportedStats) {
       String value = parameters.get(statType);
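
Note on the threading pattern this patch introduces: each partition's stats
update now runs as a Callable<Void> on a fixed-size daemon thread pool (sized
by HIVE_MOVE_FILES_THREAD_COUNT, defaulting to 1), the main thread joins on the
collected Futures after pool.shutdown(), and an interrupt cancels whatever has
not completed. The toString(Map) helper becomes static and is called as
StatsTask.toString(...) because, inside the anonymous Callable, an unqualified
toString(parameters) would resolve against the inherited Object.toString() and
fail to compile.

Below is a minimal standalone sketch of that submit/join/cancel shape, not
Hive code: the class name and the String stand-ins for partitions are
hypothetical, and it assumes only Guava on the classpath (which the patch
itself already requires for Lists and ThreadFactoryBuilder).

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class ParallelStatsSketch {
  public static void main(String[] args) {
    // Stand-ins for the partitions StatsTask iterates over.
    final List<String> partitions = Lists.newArrayList("ds=2016-01-01", "ds=2016-01-02");

    // Daemon threads so a stuck pool cannot keep the JVM alive; the name
    // format mirrors the patch's "stats-updater-thread-%d".
    final ExecutorService pool = Executors.newFixedThreadPool(2,
        new ThreadFactoryBuilder()
            .setDaemon(true)
            .setNameFormat("stats-updater-thread-%d")
            .build());

    // Worker threads append results concurrently, so the collection must be
    // thread-safe (a plain ArrayList is not).
    final List<String> updates = new CopyOnWriteArrayList<String>();
    final List<Future<Void>> futures = Lists.newLinkedList();

    try {
      for (final String partn : partitions) {
        futures.add(pool.submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            // Per-partition work goes here (file listing, stats merge, ...).
            updates.add(partn);
            return null;
          }
        }));
      }
      pool.shutdown(); // accept no new tasks; submitted ones keep running

      // Join on every task; a worker failure surfaces as ExecutionException.
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (Exception e) {
      // On interruption or failure, cancel whatever has not finished yet.
      for (Future<Void> future : futures) {
        future.cancel(true);
      }
    } finally {
      pool.shutdownNow();
    }
    System.out.println("updated " + updates.size() + " partitions");
  }
}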