diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index 1d072ad287..42d2ac03cf 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -26,10 +26,12 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.EmptyStackException; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Stack; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; @@ -135,6 +137,7 @@ // Time after which metastore cache is updated from metastore DB by the background update thread private static long cacheRefreshPeriodMS = DEFAULT_CACHE_REFRESH_PERIOD; private static AtomicBoolean isCachePrewarmed = new AtomicBoolean(false); + private static TablesPendingPrewarm tblsPendingPrewarm = new TablesPendingPrewarm(); private RawStore rawStore = null; private Configuration conf; private PartitionExpressionProxy expressionProxy = null; @@ -230,78 +233,86 @@ static void prewarm(RawStore rawStore) { // Continue with next database continue; } + tblsPendingPrewarm.addTableNamesForPrewarming(tblNames); + int totalTablesToCache = tblNames.size(); int numberOfTablesCachedSoFar = 0; - for (String tblName : tblNames) { - tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(dbName, tblName)) { - continue; - } - Table table; + while (tblsPendingPrewarm.hasMoreTablesToPrewarm()) { try { - table = rawStore.getTable(dbName, tblName); - } catch (MetaException e) { - // It is possible the table is deleted during fetching tables of the database, - // in that case, continue with the next table - continue; - } - List 
colNames = MetaStoreUtils.getColumnNamesForTable(table); - try { - ColumnStatistics tableColStats = null; - List partitions = null; - List partitionColStats = null; - AggrStats aggrStatsAllPartitions = null; - AggrStats aggrStatsAllButDefaultPartition = null; - if (table.isSetPartitionKeys()) { - Deadline.startTimer("getPartitions"); - partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); - Deadline.stopTimer(); - List partNames = new ArrayList<>(partitions.size()); - for (Partition p : partitions) { - partNames.add(Warehouse.makePartName(table.getPartitionKeys(), p.getValues())); - } - if (!partNames.isEmpty()) { - // Get partition column stats for this table - Deadline.startTimer("getPartitionColumnStatistics"); - partitionColStats = - rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames); - Deadline.stopTimer(); - // Get aggregate stats for all partitions of a table and for all but default - // partition - Deadline.startTimer("getAggrPartitionColumnStatistics"); - aggrStatsAllPartitions = - rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + String tblName = + StringUtils.normalizeIdentifier(tblsPendingPrewarm.getNextTableNameToPrewarm()); + if (!shouldCacheTable(dbName, tblName)) { + continue; + } + Table table; + try { + table = rawStore.getTable(dbName, tblName); + } catch (MetaException e) { + // It is possible the table is deleted during fetching tables of the database, + // in that case, continue with the next table + continue; + } + List colNames = MetaStoreUtils.getColumnNamesForTable(table); + try { + ColumnStatistics tableColStats = null; + List partitions = null; + List partitionColStats = null; + AggrStats aggrStatsAllPartitions = null; + AggrStats aggrStatsAllButDefaultPartition = null; + if (table.isSetPartitionKeys()) { + Deadline.startTimer("getPartitions"); + partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); Deadline.stopTimer(); - // Remove default partition from 
partition names and get aggregate - // stats again - List partKeys = table.getPartitionKeys(); - String defaultPartitionValue = - MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); - List partCols = new ArrayList<>(); - List partVals = new ArrayList<>(); - for (FieldSchema fs : partKeys) { - partCols.add(fs.getName()); - partVals.add(defaultPartitionValue); + List partNames = new ArrayList<>(partitions.size()); + for (Partition p : partitions) { + partNames.add(Warehouse.makePartName(table.getPartitionKeys(), p.getValues())); + } + if (!partNames.isEmpty()) { + // Get partition column stats for this table + Deadline.startTimer("getPartitionColumnStatistics"); + partitionColStats = + rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + // Get aggregate stats for all partitions of a table and for all but default + // partition + Deadline.startTimer("getAggrPartitionColumnStatistics"); + aggrStatsAllPartitions = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + // Remove default partition from partition names and get aggregate + // stats again + List partKeys = table.getPartitionKeys(); + String defaultPartitionValue = + MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); + List partCols = new ArrayList<>(); + List partVals = new ArrayList<>(); + for (FieldSchema fs : partKeys) { + partCols.add(fs.getName()); + partVals.add(defaultPartitionValue); + } + String defaultPartitionName = FileUtils.makePartName(partCols, partVals); + partNames.remove(defaultPartitionName); + Deadline.startTimer("getAggrPartitionColumnStatistics"); + aggrStatsAllButDefaultPartition = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + Deadline.stopTimer(); } - String defaultPartitionName = FileUtils.makePartName(partCols, partVals); - partNames.remove(defaultPartitionName); - Deadline.startTimer("getAggrPartitionColumnStatistics"); - 
aggrStatsAllButDefaultPartition = - rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + } else { + Deadline.startTimer("getTableColumnStatistics"); + tableColStats = rawStore.getTableColumnStatistics(dbName, tblName, colNames); Deadline.stopTimer(); } - } else { - Deadline.startTimer("getTableColumnStatistics"); - tableColStats = rawStore.getTableColumnStatistics(dbName, tblName, colNames); - Deadline.stopTimer(); + sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats, + aggrStatsAllPartitions, aggrStatsAllButDefaultPartition); + } catch (MetaException | NoSuchObjectException e) { + // Continue with next table + continue; } - sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats, - aggrStatsAllPartitions, aggrStatsAllButDefaultPartition); - } catch (MetaException | NoSuchObjectException e) { - // Continue with next table + LOG.debug("Processed database: {}'s table: {}. Cached {} / {} tables so far.", dbName, + tblName, ++numberOfTablesCachedSoFar, totalTablesToCache); + } catch (EmptyStackException e) { + // We've prewarmed this database, continue with the next one continue; } - LOG.debug("Processed database: {}'s table: {}. Cached {} / {} tables so far.", dbName, - tblName, ++numberOfTablesCachedSoFar, tblNames.size()); } LOG.debug("Processed database: {}. 
Cached {} / {} databases so far.", dbName, ++numberOfDatabasesCachedSoFar, dbNames.size());
@@ -314,6 +325,48 @@ static void prewarm(RawStore rawStore) {
     sharedCache.completeTableCachePrewarm();
   }
 
+  /**
+   * Names of the tables that prewarm has queued for the database it is currently loading and
+   * has not fetched yet. Kept as a stack so that a table requested through getTable can be
+   * moved to the top and fetched next. Every method synchronizes on this instance.
+   */
+  static class TablesPendingPrewarm {
+    private final Stack<String> tableNames = new Stack<>();
+
+    /** Replaces the pending names with tblNames; a null list leaves the stack empty. */
+    private synchronized void addTableNamesForPrewarming(List<String> tblNames) {
+      tableNames.clear();
+      if (tblNames != null) {
+        tableNames.addAll(tblNames);
+      }
+    }
+
+    /** Returns true while at least one table name is still pending. */
+    private synchronized boolean hasMoreTablesToPrewarm() {
+      return !tableNames.empty();
+    }
+
+    /**
+     * Removes and returns the name on top of the stack.
+     *
+     * @throws EmptyStackException if no table name is pending
+     */
+    private synchronized String getNextTableNameToPrewarm() {
+      return tableNames.pop();
+    }
+
+    /**
+     * Moves tblName to the top of the stack if it is pending; otherwise does nothing. Only the
+     * table name is compared (with equals, against the names as stored); no database is tracked.
+     */
+    private synchronized void prioritizeTableForPrewarm(String tblName) {
+      // If the table is in the pending prewarm list, move it to the top
+      if (tableNames.remove(tblName)) {
+        tableNames.push(tblName);
+      }
+    }
+  }
+
   @VisibleForTesting
   static void setCachePrewarmedState(boolean state) {
     isCachePrewarmed.set(state);
@@ -738,6 +791,10 @@ public Table getTable(String dbName, String tblName) throws MetaException {
     Table tbl = sharedCache.getTableFromCache(dbName, tblName);
     if (tbl == null) {
       // This table is not yet loaded in cache
+      // If the prewarm thread is working on this table's database,
+      // let's move this table to the top of the tblsPendingPrewarm stack,
+      // so that it gets loaded to the cache faster and is available for subsequent requests
+      tblsPendingPrewarm.prioritizeTableForPrewarm(tblName);
       return rawStore.getTable(dbName, tblName);
     }
     if (tbl != null) {