diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java
index 6a85936..44106f5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java
@@ -55,7 +55,7 @@
   // Run the cleaner thread until cache is cleanUntil% occupied
   private final float cleanUntil;
   // Nodes go stale after this
-  private final long timeToLive;
+  private final long timeToLiveMs;
   // Max time when waiting for write locks on node list
   private final long maxWriterWaitTime;
   // Max time when waiting for read locks on node list
@@ -73,12 +73,12 @@
   // To track cleaner metrics
   int numRemovedTTL = 0, numRemovedLRU = 0;

-  private AggregateStatsCache(int maxCacheNodes, int maxPartsPerCacheNode, long timeToLive,
+  private AggregateStatsCache(int maxCacheNodes, int maxPartsPerCacheNode, long timeToLiveMs,
       float falsePositiveProbability, float maxVariance, long maxWriterWaitTime,
       long maxReaderWaitTime, float maxFull, float cleanUntil) {
     this.maxCacheNodes = maxCacheNodes;
     this.maxPartsPerCacheNode = maxPartsPerCacheNode;
-    this.timeToLive = timeToLive;
+    this.timeToLiveMs = timeToLiveMs;
     this.falsePositiveProbability = falsePositiveProbability;
     this.maxVariance = maxVariance;
     this.maxWriterWaitTime = maxWriterWaitTime;
@@ -97,9 +97,9 @@ public static synchronized AggregateStatsCache getInstance(Configuration conf) {
     int maxPartitionsPerCacheNode = HiveConf
         .getIntVar(conf, HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_PARTITIONS);
-    long timeToLive =
+    long timeToLiveMs =
         HiveConf.getTimeVar(conf, HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_TTL,
-            TimeUnit.SECONDS);
+            TimeUnit.SECONDS)*1000;
     // False positives probability we are ready to tolerate for the underlying bloom filter
     float falsePositiveProbability =
         HiveConf.getFloatVar(conf, HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_FPP);
@@ -120,7 +120,7 @@
     float cleanUntil =
         HiveConf.getFloatVar(conf, HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_CLEAN_UNTIL);
     self =
-        new AggregateStatsCache(maxCacheNodes, maxPartitionsPerCacheNode, timeToLive,
+        new AggregateStatsCache(maxCacheNodes, maxPartitionsPerCacheNode, timeToLiveMs,
             falsePositiveProbability, maxVariance, maxWriterWaitTime, maxReaderWaitTime, maxFull,
             cleanUntil);
   }
@@ -213,7 +213,7 @@ public AggrColStats get(String dbName, String tblName, String colName, List<String> partNames,
   }

   private AggrColStats findBestMatch(List<String> partNames, List<AggrColStats> candidates) {
-    // Hits, misses, shouldSkip for a node
+    // Hits, misses tracked for a candidate node
     MatchStats matchStats;
     // MatchStats for each candidate
     Map<AggrColStats, MatchStats> candidateMatchStats = new HashMap<AggrColStats, MatchStats>();
@@ -227,26 +227,23 @@ private AggrColStats findBestMatch(List<String> partNames, List<AggrColStats> candidates) {
     // Note: we're not creating a copy of the list for saving memory
     for (AggrColStats candidate : candidates) {
       // Variance check
-      if ((float) Math.abs((candidate.getNumPartsCached() - numPartsRequested)
-          / numPartsRequested) > maxVariance) {
+      if ((float) Math.abs((candidate.getNumPartsCached() - numPartsRequested) / numPartsRequested)
+          > maxVariance) {
         continue;
       }
       // TTL check
       if (isExpired(candidate)) {
         continue;
-      }
-      else {
+      } else {
         candidateMatchStats.put(candidate, new MatchStats(0, 0));
       }
     }
     // We'll count misses as we iterate
     int maxMisses = (int) maxVariance * numPartsRequested;
     for (String partName : partNames) {
-      for (AggrColStats candidate : candidates) {
-        matchStats = candidateMatchStats.get(candidate);
-        if (matchStats == null) {
-          continue;
-        }
+      for (Map.Entry<AggrColStats, MatchStats> entry : candidateMatchStats.entrySet()) {
+        AggrColStats candidate = entry.getKey();
+        matchStats = entry.getValue();
         if (candidate.getBloomFilter().test(partName.getBytes())) {
           ++matchStats.hits;
         } else {
@@ -464,7 +461,7 @@ private void evictOneNode() {
   }

   private boolean isExpired(AggrColStats aggrColStats) {
-    return System.currentTimeMillis() - aggrColStats.lastAccessTime > timeToLive;
+    return (System.currentTimeMillis() - aggrColStats.lastAccessTime) > timeToLiveMs;
   }

   /**
@@ -502,7 +499,7 @@ public int hashCode() {
     }

     @Override
     public String toString() {
-      return "Database: " + dbName + ", Table: " + tblName + ", Column: " + colName;
+      return "database:" + dbName + ", table:" + tblName + ", column:" + colName;
     }
   }
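The timeToLive to timeToLiveMs rename and the *1000 in getInstance() pin down a unit mismatch: getTimeVar(..., TimeUnit.SECONDS) returns seconds, while isExpired() compares the field against a System.currentTimeMillis() delta, which is milliseconds, so a node previously looked stale roughly a thousand times too early. A minimal standalone sketch of the before/after comparison; the class and variable names here are illustrative, not Hive code:

import java.util.concurrent.TimeUnit;

// Illustrative sketch only: the unit mismatch behind the timeToLiveMs rename.
public class TtlUnitsDemo {
  public static void main(String[] args) {
    long ttlSeconds = 600; // e.g. METASTORE_AGGREGATE_STATS_CACHE_TTL read in TimeUnit.SECONDS
    long ttlMs = TimeUnit.SECONDS.toMillis(ttlSeconds); // what the patch's *1000 produces

    // A cache node last touched 5 seconds ago.
    long lastAccessTime = System.currentTimeMillis() - 5_000;
    long ageMs = System.currentTimeMillis() - lastAccessTime;

    boolean oldCheck = ageMs > ttlSeconds; // true: 5000 > 600, node expired after ~0.6 s
    boolean newCheck = ageMs > ttlMs;      // false: 5000 < 600000, node kept for the full 10 min
    System.out.println("old isExpired: " + oldCheck + ", fixed isExpired: " + newCheck);
  }
}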
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index 5ef3b9a..8bee978 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -1106,24 +1106,23 @@ public AggrStats aggrColStatsForPartitions(String dbName, String tableName,
     if (isAggregateStatsCacheEnabled) {
       AggrColStats colStatsAggrCached;
       List<ColumnStatisticsObj> colStatsAggrFromDB;
-      int maxPartitionsPerCacheNode = aggrStatsCache.getMaxPartsPerCacheNode();
-      float falsePositiveProbability = aggrStatsCache.getFalsePositiveProbability();
+      int maxPartsPerCacheNode = aggrStatsCache.getMaxPartsPerCacheNode();
+      float fpp = aggrStatsCache.getFalsePositiveProbability();
       int partitionsRequested = partNames.size();
-      if (partitionsRequested > maxPartitionsPerCacheNode) {
+      if (partitionsRequested > maxPartsPerCacheNode) {
         colStatsList =
             columnStatisticsObjForPartitions(dbName, tableName, partNames, colNames, partsFound,
                 useDensityFunctionForNDVEstimation);
       } else {
         colStatsList = new ArrayList<ColumnStatisticsObj>();
+        // Bloom filter for the new node that we will eventually add to the cache
+        BloomFilter bloomFilter = createPartsBloomFilter(maxPartsPerCacheNode, fpp, partNames);
         for (String colName : colNames) {
           // Check the cache first
           colStatsAggrCached = aggrStatsCache.get(dbName, tableName, colName, partNames);
           if (colStatsAggrCached != null) {
             colStatsList.add(colStatsAggrCached.getColStats());
           } else {
-            // Bloom filter for the new node that we will eventually add to the cache
-            BloomFilter bloomFilter =
-                new BloomFilter(maxPartitionsPerCacheNode, falsePositiveProbability);
             List<String> colNamesForDB = new ArrayList<String>();
             colNamesForDB.add(colName);
             // Read aggregated stats for one column
@@ -1148,6 +1147,15 @@ public AggrStats aggrColStatsForPartitions(String dbName, String tableName,
     return new AggrStats(colStatsList, partsFound);
   }

+  private BloomFilter createPartsBloomFilter(int maxPartsPerCacheNode, float fpp,
+      List<String> partNames) {
+    BloomFilter bloomFilter = new BloomFilter(maxPartsPerCacheNode, fpp);
+    for (String partName : partNames) {
+      bloomFilter.add(partName.getBytes());
+    }
+    return bloomFilter;
+  }
+
   private long partsFoundForPartitions(String dbName, String tableName,
       List<String> partNames, List<String> colNames) throws MetaException {
     long partsFound = 0;
@@ -1174,8 +1182,8 @@ private long partsFoundForPartitions(String dbName, String tableName,
   }

   private List<ColumnStatisticsObj> columnStatisticsObjForPartitions(String dbName,
-      String tableName, List<String> partNames, List<String> colNames, long partsFound, boolean useDensityFunctionForNDVEstimation)
-      throws MetaException {
+      String tableName, List<String> partNames, List<String> colNames, long partsFound,
+      boolean useDensityFunctionForNDVEstimation) throws MetaException {
     // TODO: all the extrapolation logic should be moved out of this class,
     // only mechanical data retrieval should remain here.
     String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
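Hoisting the BloomFilter out of the per-column loop builds it once per request and shares it across every column that misses the cache; the new createPartsBloomFilter() also pre-populates it with all requested partition names at construction time. The sketch below mirrors that pattern; ToyBloomFilter and its single-hash scheme are stand-ins for Hive's real BloomFilter(size, fpp), kept only so the example is self-contained:

import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

// Stand-in with the add()/test() surface the patch relies on; Hive's real
// BloomFilter uses multiple hash functions sized from (expectedEntries, fpp).
class ToyBloomFilter {
  private static final int SIZE = 1 << 16;
  private final BitSet bits = new BitSet(SIZE);

  void add(byte[] key) { bits.set(Math.floorMod(Arrays.hashCode(key), SIZE)); }
  boolean test(byte[] key) { return bits.get(Math.floorMod(Arrays.hashCode(key), SIZE)); }
}

public class HoistedBloomFilterDemo {
  // Mirrors createPartsBloomFilter(): one filter, pre-loaded with all partition names.
  static ToyBloomFilter createPartsBloomFilter(List<String> partNames) {
    ToyBloomFilter bloomFilter = new ToyBloomFilter();
    for (String partName : partNames) {
      bloomFilter.add(partName.getBytes());
    }
    return bloomFilter;
  }

  public static void main(String[] args) {
    List<String> partNames = Arrays.asList("ds=2015-05-01", "ds=2015-05-02");
    List<String> colNames = Arrays.asList("id", "name", "price");

    // Built once, outside the column loop, as in the patched aggrColStatsForPartitions().
    ToyBloomFilter bloomFilter = createPartsBloomFilter(partNames);
    for (String colName : colNames) {
      // Every column's new cache node can share the same filter instance.
      System.out.println(colName + " -> ds=2015-05-01 possibly cached: "
          + bloomFilter.test("ds=2015-05-01".getBytes()));
    }
  }
}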
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
index ca1eae6..6c9efba 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
@@ -93,7 +93,6 @@ public void run() {
       // TCP Server
       server = new TThreadPoolServer(sargs);
       server.setServerEventHandler(serverEventHandler);
-      server.serve();
       String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
           + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
       LOG.info(msg);
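Dropping the early server.serve() call matters because TThreadPoolServer.serve() blocks the calling thread until the server is stopped, so the "Starting ..." message built right after it could never be logged while the service was running. A minimal sketch of the ordering issue; serve() here is a placeholder loop standing in for Thrift's accept/dispatch loop, not Thrift code:

// Illustrative sketch: a blocking serve() must come after the startup log.
public class ServeOrderingDemo {
  // Placeholder for TThreadPoolServer.serve(): returns only when the server stops.
  static void serve() throws InterruptedException {
    while (true) {
      Thread.sleep(1_000); // stand-in for the accept/dispatch loop
    }
  }

  public static void main(String[] args) throws InterruptedException {
    String msg = "Starting ThriftBinaryCLIService on port 10000 with 5...500 worker threads";
    System.out.println(msg); // runs before the blocking call, so it is actually seen
    serve();                 // blocks here; statements after this line would never run
  }
}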