diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 829d5b7..3fd4310 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -377,12 +377,25 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "Used to avoid all of the proxies and object copies in the metastore. Note, if this is " + "set, you MUST use a local metastore (hive.metastore.uris must be empty) otherwise " + "undefined and most likely undesired behavior will result"), - METASTORE_HBASE_CACHE_SIZE("hive.metastore.hbase.cache.size", 100000, "Maximum number of " + - "objects we will place in the hbase metastore cache. The objects will be divided up by " + + METASTORE_HBASE_CATALOG_CACHE_SIZE("hive.metastore.hbase.catalog.cache.size", 50000, "Maximum number of " + + "objects we will place in the hbase metastore catalog cache. The objects will be divided up by " + "types that we need to cache."), - METASTORE_HBASE_CACHE_TIME_TO_LIVE("hive.metastore.hbase.cache.ttl", "600s", - new TimeValidator(TimeUnit.SECONDS), + METASTORE_HBASE_AGGREGATE_STATS_CACHE_SIZE("hive.metastore.hbase.aggregate.stats.cache.size", 10000, + "Maximum number of aggregate stats nodes that we will place in the hbase metastore aggregate stats cache."), + METASTORE_HBASE_AGGREGATE_STATS_CACHE_MAX_PARTITIONS("hive.metastore.hbase.aggregate.stats.max.partitions", 10000, + "Maximum number of partitions that are aggregated per cache node."), + METASTORE_HBASE_AGGREGATE_STATS_CACHE_FALSE_POSITIVE_PROBABILITY("hive.metastore.hbase.aggregate.stats.false.positive.probability", + (float) 0.01, "Maximum false positive probability for the Bloom Filter used in each aggregate stats cache node (default 1%)."), + METASTORE_HBASE_AGGREGATE_STATS_CACHE_MAX_VARIANCE("hive.metastore.hbase.aggregate.stats.max.variance", (float) 0.1, + "Maximum tolerable variance in number of partitions between a cached node and our request (default 10%)."), + METASTORE_HBASE_CACHE_TIME_TO_LIVE("hive.metastore.hbase.cache.ttl", "600s", new TimeValidator(TimeUnit.SECONDS), "Number of seconds for stats items to live in the cache"), + METASTORE_HBASE_CACHE_MAX_WRITER_WAIT("hive.metastore.hbase.cache.max.writer.wait", "5000ms", new TimeValidator(TimeUnit.MILLISECONDS), + "Number of milliseconds a writer will wait before giving up."), + METASTORE_HBASE_CACHE_MAX_FULL("hive.metastore.hbase.cache.max.full", (float) 0.9, + "Maximum cache full % after which the cache cleaner thread kicks in."), + METASTORE_HBASE_CACHE_CLEAN_UNTIL("hive.metastore.hbase.cache.clean.until", (float) 0.8, + "The cleaner thread cleans until cache reaches this % full size."), METASTORE_HBASE_CONNECTION_CLASS("hive.metastore.hbase.connection.class", "org.apache.hadoop.hive.metastore.hbase.VanillaHBaseConnection", "Class used to connection to HBase"), diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 5d3e9c2..b76005f 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -4272,8 +4272,7 @@ public boolean delete_partition_column_statistics(String dbName, String tableNam @Override public boolean delete_table_column_statistics(String dbName, String tableName, String colName) throws NoSuchObjectException, MetaException, InvalidObjectException, 
TException, - InvalidInputException - { + InvalidInputException { dbName = dbName.toLowerCase(); tableName = tableName.toLowerCase(); @@ -4290,7 +4289,33 @@ public boolean delete_table_column_statistics(String dbName, String tableName, S endFunction("delete_column_statistics_by_table: ", ret != false, null, tableName); } return ret; - } + } + + @Override + public AggrStats get_aggr_stats_for(PartitionsStatsRequest request) + throws NoSuchObjectException, MetaException, TException { + startFunction("get_aggr_stats_for: db=" + request.getDbName() + " table=" + request.getTblName()); + AggrStats aggrStats = null; + try { + aggrStats = new AggrStats(getMS().get_aggr_stats_for(request.getDbName(), + request.getTblName(), request.getPartNames(), request.getColNames())); + return aggrStats; + } finally { + endFunction("get_partitions_statistics_req: ", aggrStats == null, null, request.getTblName()); + } + + } + + @Override + public boolean set_aggr_stats_for(SetPartitionsStatsRequest request) + throws NoSuchObjectException, InvalidObjectException, MetaException, InvalidInputException, + TException { + boolean ret = true; + for (ColumnStatistics colStats : request.getColStats()) { + ret = ret && update_partition_column_statistics(colStats); + } + return ret; + } @Override public List get_partitions_by_filter(final String dbName, @@ -5419,32 +5444,6 @@ public GetRoleGrantsForPrincipalResponse get_role_grants_for_principal( } @Override - public AggrStats get_aggr_stats_for(PartitionsStatsRequest request) - throws NoSuchObjectException, MetaException, TException { - startFunction("get_aggr_stats_for: db=" + request.getDbName() + " table=" + request.getTblName()); - AggrStats aggrStats = null; - try { - aggrStats = new AggrStats(getMS().get_aggr_stats_for(request.getDbName(), - request.getTblName(), request.getPartNames(), request.getColNames())); - return aggrStats; - } finally { - endFunction("get_partitions_statistics_req: ", aggrStats == null, null, request.getTblName()); - } - - } - - @Override - public boolean set_aggr_stats_for(SetPartitionsStatsRequest request) - throws NoSuchObjectException, InvalidObjectException, MetaException, InvalidInputException, - TException { - boolean ret = true; - for (ColumnStatistics colStats : request.getColStats()) { - ret = ret && update_partition_column_statistics(colStats); - } - return ret; - } - - @Override public NotificationEventResponse get_next_notification(NotificationEventRequest rqst) throws TException { RawStore ms = getMS(); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggregateStatsCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggregateStatsCache.java new file mode 100644 index 0000000..a5cf697 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggregateStatsCache.java @@ -0,0 +1,514 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.hbase.utils.BloomFilter; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +class AggregateStatsCache { + + private static final Log LOG = LogFactory.getLog(AggregateStatsCache.class.getName()); + private static AggregateStatsCache self = null; + + // Backing store for this cache + private final Map cacheStore; + // Cache size + private final int maxCacheNodes; + // Current nodes in the cache + private int currentNodes = 0; + // Run the cleaner thread when the cache is 90% full (should this be configurable?) + private final float maxFull; + private final float cleanUntil; + // Nodes go stale after this + private final long timeToLive; + private final long maxWriterWaitTime; + private final int maxPartsPerCacheNode; + // For bloom filter + private final float falsePositiveProbability; + private final float maxVariance; + private boolean isCleaning = false; + + private AggregateStatsCache(int maxCacheNodes, int maxPartsPerCacheNode, long timeToLive, + float falsePositiveProbability, float maxVariance, long maxWriterWaitTime, float maxFull, + float cleanUntil) { + this.maxCacheNodes = maxCacheNodes; + this.maxPartsPerCacheNode = maxPartsPerCacheNode; + this.timeToLive = timeToLive; + this.falsePositiveProbability = falsePositiveProbability; + this.maxVariance = maxVariance; + this.maxWriterWaitTime = maxWriterWaitTime; + this.maxFull = maxFull; + this.cleanUntil = cleanUntil; + this.cacheStore = new ConcurrentHashMap(this.maxCacheNodes); + } + + static synchronized AggregateStatsCache getInstance(Configuration conf) { + if (self == null) { + int maxCacheNodes = + HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORE_HBASE_AGGREGATE_STATS_CACHE_SIZE); + // The number of partitions aggregated per cache node + // If the number of partitions requested is > this value, we'll fetch directly from Metastore + int maxPartitionsPerCacheNode = + HiveConf.getIntVar(conf, + HiveConf.ConfVars.METASTORE_HBASE_AGGREGATE_STATS_CACHE_MAX_PARTITIONS); + long timeToLive = + HiveConf.getTimeVar(conf, HiveConf.ConfVars.METASTORE_HBASE_CACHE_TIME_TO_LIVE, + TimeUnit.SECONDS); + // False positives probability we are ready to tolerate for the underlying bloom filter + float falsePositiveProbability = + HiveConf.getFloatVar(conf, + HiveConf.ConfVars.METASTORE_HBASE_AGGREGATE_STATS_CACHE_FALSE_POSITIVE_PROBABILITY); + // Maximum tolerable variance in number of partitions between cached node and our request + float maxVariance = + 
HiveConf.getFloatVar(conf, + HiveConf.ConfVars.METASTORE_HBASE_AGGREGATE_STATS_CACHE_MAX_VARIANCE); + long maxWriterWaitTime = + HiveConf.getTimeVar(conf, HiveConf.ConfVars.METASTORE_HBASE_CACHE_MAX_WRITER_WAIT, + TimeUnit.MILLISECONDS); + float maxFull = HiveConf.getFloatVar(conf, HiveConf.ConfVars.METASTORE_HBASE_CACHE_MAX_FULL); + float cleanUntil = + HiveConf.getFloatVar(conf, HiveConf.ConfVars.METASTORE_HBASE_CACHE_CLEAN_UNTIL); + self = + new AggregateStatsCache(maxCacheNodes, maxPartitionsPerCacheNode, timeToLive, + falsePositiveProbability, maxVariance, maxWriterWaitTime, maxFull, cleanUntil); + } + return self; + } + + int getMaxCacheNodes() { + return maxCacheNodes; + } + + int getCurrentNodes() { + return currentNodes; + } + + int getMaxPartsPerCacheNode() { + return maxPartsPerCacheNode; + } + + float getFalsePositiveProbability() { + return falsePositiveProbability; + } + + /** + * Return aggregate stats for a column from the cache or null + * @param dbName + * @param tblName + * @param colName + * @param partNames + * @return + */ + AggrColStats get(String dbName, String tblName, String colName, List partNames) { + if (partNames.size() > maxPartsPerCacheNode) { + return null; + } + // Cache key + Key key = new Key(dbName, tblName, colName); + AggrColStatsList candidateList = cacheStore.get(key); + // No key, or no nodes in candidate list + if ((candidateList == null) || (candidateList.nodes.size() == 0)) { + return null; + } + // Find the value object + // Update the timestamp of the key,value if value matches the criteria + // Return the value + AggrColStats match = null; + candidateList.readLock.lock(); + try { + match = findBestMatch(partNames, candidateList.nodes); + } finally { + candidateList.readLock.unlock(); + } + return match; + } + + /** + * Find the best match using the configurable error tolerance and time to live value + * + * @param partNames + * @param candidates + * @return best matched node or null + */ + private AggrColStats findBestMatch(List partNames, List candidates) { + // Hits, misses, shouldSkip for a node + MatchStats matchStats; + // The final match we intend to return + AggrColStats bestMatch = null; + // To compare among potentially multiple matches + int bestMatchHits = 0; + int numPartsRequested = partNames.size(); + // MatchStats for each candidate + Map candidateMatchStats = new HashMap(); + // 1st pass at marking invalid candidates + // Checks based on variance and TTL + // Note: we're not creating a copy of the list for saving memory + for (AggrColStats candidate : candidates) { + // Variance check + if (Math.abs((float) (candidate.getNumPartsCached() - numPartsRequested) + / numPartsRequested) > maxVariance) { + candidateMatchStats.put(candidate, new MatchStats(0, 0, true)); + continue; + } + // TTL check + if (isExpired(candidate)) { + candidateMatchStats.put(candidate, new MatchStats(0, 0, true)); + continue; + } + candidateMatchStats.put(candidate, new MatchStats(0, 0, false)); + } + // We'll count misses as we iterate + int maxMisses = (int) (maxVariance * numPartsRequested); + for (String partName : partNames) { + for (AggrColStats candidate : candidates) { + matchStats = candidateMatchStats.get(candidate); + if (matchStats.shouldSkip) { + continue; + } + if (candidate.getBloomFilter().contains(partName.getBytes())) { + ++matchStats.hits; + } else { + ++matchStats.misses; + } + // 2nd pass at marking invalid candidates + // If misses so far exceed max tolerable misses + if (matchStats.misses > maxMisses) { + matchStats.shouldSkip = true; + continue; + } + if
(matchStats.hits > bestMatchHits) { + bestMatch = candidate; + } + } + } + if (bestMatch != null) { + // Update the last access time for this node + bestMatch.updateLastAccessTime(); + } + return bestMatch; + } + + /** + * + * @param dbName + * @param tblName + * @param colName + * @param numPartsCached + * @param colStats + * @param bloomFilter + */ + void add(String dbName, String tblName, String colName, int numPartsCached, + ColumnStatisticsObj colStats, BloomFilter bloomFilter) { + // Cache key + Key key = new Key(dbName, tblName, colName); + // New node to add to cache + AggrColStats node = new AggrColStats(numPartsCached, bloomFilter, colStats); + AggrColStatsList nodeList; + if (currentNodes / maxCacheNodes < maxFull) { + if (cacheStore.containsKey(key)) { + nodeList = cacheStore.get(key); + } + else { + nodeList = new AggrColStatsList(); + nodeList.nodes = new ArrayList(); + key.updateLastAccessTime(); + cacheStore.put(key, nodeList); + } + } + else { + // Run cleaner thread + clean(); + nodeList = cacheStore.get(key); + } + boolean isLocked = false; + try { + isLocked = nodeList.writeLock.tryLock(maxWriterWaitTime, TimeUnit.MILLISECONDS); + if (isLocked) { + nodeList.nodes.add(node); + ++currentNodes; + } + } catch (InterruptedException e) { + LOG.debug(e); + return; + } + finally { + if (isLocked) { + nodeList.writeLock.unlock(); + } + } + } + + /** + * Cleans the expired nodes or removes LRU nodes of the cache until cache is cleanUntil% full. + */ + private void clean() { + // This spawns a separate thread to walk through the cache and removes expired nodes. + // Only one cleaner thread should be running at any point. + synchronized (this) { + if (isCleaning) { + return; + } + isCleaning = true; + } + Thread cleaner = new Thread() { + @Override + public void run() { + Iterator> mapIterator = cacheStore.entrySet().iterator(); + while (mapIterator.hasNext()) { + Map.Entry pair = + (Map.Entry) mapIterator.next(); + AggrColStats node; + AggrColStatsList candidateList = (AggrColStatsList) pair.getValue(); + List nodes = candidateList.nodes; + if (nodes.size() == 0) { + mapIterator.remove(); + continue; + } + candidateList.writeLock.lock(); + boolean isLocked = false; + try { + isLocked = candidateList.writeLock.tryLock(maxWriterWaitTime, TimeUnit.MILLISECONDS); + if (isLocked) { + for (Iterator listIterator = nodes.iterator(); listIterator.hasNext();) { + node = listIterator.next(); + if (isExpired(node)) { + listIterator.remove(); + --currentNodes; + } + } + } + } catch (InterruptedException e) { + LOG.debug(e); + continue; + } finally { + if (isLocked) { + candidateList.writeLock.unlock(); + } + } + // We want to make sure this runs at a low priority in the background + Thread.yield(); + } + // If the expired nodes did not result in cache being cleanUntil% in size, + // start removing LRU nodes + while (currentNodes / maxCacheNodes > cleanUntil) { + evictOneNode(); + } + } + }; + cleaner.setPriority(Thread.MIN_PRIORITY); + cleaner.setDaemon(true); + cleaner.start(); + } + + /** + * Evict an LRU node or expired node whichever we find first + */ + private void evictOneNode() { + // Get the LRU key + Key lruKey = null; + for (Key key : cacheStore.keySet()) { + if (lruKey == null) { + lruKey = key; + continue; + } + if ((key.lastAccessTime < lruKey.lastAccessTime) && !(cacheStore.get(key).nodes.isEmpty())) { + lruKey = key; + } + } + // Now delete a node for this key's list + AggrColStatsList candidateList = cacheStore.get(lruKey); + candidateList.writeLock.lock(); + boolean isLocked 
= false; + try { + isLocked = candidateList.writeLock.tryLock(maxWriterWaitTime, TimeUnit.MILLISECONDS); + if (isLocked) { + AggrColStats candidate; + AggrColStats lruNode = null; + int currentIndex = 0; + int deleteIndex = 0; + for (Iterator iterator = candidateList.nodes.iterator(); iterator.hasNext();) { + candidate = iterator.next(); + // Since we have to create space for 1, if we find an expired node we will remove it & + // return + if (isExpired(candidate)) { + iterator.remove(); + --currentNodes; + return; + } + // Sorry, too many ifs but this form looks optimal + // Update the LRU node from what we've seen so far + if (lruNode == null) { + lruNode = candidate; + ++currentIndex; + continue; + } + if (lruNode != null) { + if (candidate.lastAccessTime < lruNode.lastAccessTime) { + lruNode = candidate; + deleteIndex = currentIndex; + } + } + } + candidateList.nodes.remove(deleteIndex); + --currentNodes; + } + } catch (InterruptedException e) { + LOG.debug(e); + return; + } finally { + if (isLocked) { + candidateList.writeLock.unlock(); + } + } + } + + void printMetrics() { + + } + + private boolean isExpired(AggrColStats aggrColStats) { + return System.currentTimeMillis() - aggrColStats.lastAccessTime > timeToLive; + } + + /** + * Key object for the stats cache hashtable + */ + private static class Key { + private final String dbName; + private final String tblName; + private final String colName; + private long lastAccessTime = 0; + + Key(String db, String table, String col) { + // Don't construct an illegal cache key + if ((db == null) || (table == null) || (col == null)) { + throw new IllegalArgumentException("dbName, tblName, colName can't be null"); + } + dbName = db; + tblName = table; + colName = col; + } + + @Override + public boolean equals(Object other) { + if ((other == null) || !(other instanceof Key)) { + return false; + } + Key that = (Key) other; + return dbName.equals(that.dbName) && tblName.equals(that.tblName) + && colName.equals(that.colName); + } + + @Override + public int hashCode() { + return dbName.hashCode() * 31 + tblName.hashCode() * 31 + colName.hashCode(); + } + + void updateLastAccessTime() { + this.lastAccessTime = System.currentTimeMillis(); + } + } + + static class AggrColStatsList { + // TODO: figure out a better data structure for node list(?) 
+ private List nodes = new ArrayList(); + private ReadWriteLock lock = new ReentrantReadWriteLock(); + private Lock readLock = lock.readLock(); + private Lock writeLock = lock.writeLock(); + + List getNodes() { + return nodes; + } + } + + static class AggrColStats { + private volatile long lastAccessTime; + private final int numPartsCached; + private final BloomFilter bloomFilter; + private final ColumnStatisticsObj colStats; + + AggrColStats(int numPartsCached, BloomFilter bloomFilter, + ColumnStatisticsObj colStats) { + this.numPartsCached = numPartsCached; + this.bloomFilter = bloomFilter; + this.colStats = colStats; + this.lastAccessTime = System.currentTimeMillis(); + } + + int getNumPartsCached() { + return numPartsCached; + } + + ColumnStatisticsObj getColStats() { + updateLastAccessTime(); + return colStats; + } + + BloomFilter getBloomFilter() { + return bloomFilter; + } + + void updateLastAccessTime() { + this.lastAccessTime = System.currentTimeMillis(); + } + } + + /** + * TODO: capture some metrics for the cache + */ + class Metrics { + + } + + /** + * Intermediate object, used to collect hits & misses for each cache node that is evaluate for an + * incoming request + */ + private static class MatchStats { + private int hits = 0; + private int misses = 0; + private boolean shouldSkip = false; + + MatchStats(int hits, int misses, boolean shouldSkip) { + this.hits = hits; + this.misses = misses; + this.shouldSkip = shouldSkip; + } + } + + /** + * TODO: implement memory management for the cache + */ + static class MemoryManager { + + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java index c83d7c8..523913e 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -51,6 +52,10 @@ import org.apache.hadoop.hive.metastore.api.Role; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.hbase.aggregator.ColumnStatsAggregator; +import org.apache.hadoop.hive.metastore.hbase.aggregator.ColumnStatsAggregatorFactory; +import org.apache.hadoop.hive.metastore.hbase.AggregateStatsCache.AggrColStats; +import org.apache.hadoop.hive.metastore.hbase.utils.BloomFilter; import java.io.IOException; import java.security.MessageDigest; @@ -114,17 +119,17 @@ protected HBaseReadWrite initialValue() { private ObjectCache, Table> tableCache; private ObjectCache sdCache; private PartitionCache partCache; - private StatsCache statsCache; - private final Counter tableHits; - private final Counter tableMisses; - private final Counter tableOverflows; - private final Counter partHits; - private final Counter partMisses; - private final Counter partOverflows; - private final Counter sdHits; - private final Counter sdMisses; - private final Counter sdOverflows; - private final List counters; + private AggregateStatsCache aggrStatsCache; + private Counter tableHits; + 
private Counter tableMisses; + private Counter tableOverflows; + private Counter partHits; + private Counter partMisses; + private Counter partOverflows; + private Counter sdHits; + private Counter sdMisses; + private Counter sdOverflows; + private List counters; // roleCache doesn't use ObjectCache because I don't want to limit the size. I am assuming // that the number of roles will always be small (< 100) so caching the whole thing should not // be painful. @@ -182,8 +187,8 @@ private HBaseReadWrite(Configuration configuration) { } catch (NoSuchAlgorithmException e) { throw new RuntimeException(e); } - int totalObjectsToCache = - HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORE_HBASE_CACHE_SIZE); + int totalCatalogObjectsToCache = + HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORE_HBASE_CATALOG_CACHE_SIZE); tableHits = new Counter("table cache hits"); tableMisses = new Counter("table cache misses"); @@ -205,24 +210,21 @@ private HBaseReadWrite(Configuration configuration) { counters.add(sdMisses); counters.add(sdOverflows); - // Divide 50/50 between catalog and stats, then give 1% of catalog space to storage - // descriptors (storage descriptors are shared, so 99% should be the same for a - // given table). - int sdsCacheSize = totalObjectsToCache / 100; + // Give 1% of catalog cache space to storage descriptors + // (storage descriptors are shared, so 99% should be the same for a given table) + int sdsCacheSize = totalCatalogObjectsToCache / 100; if (conf.getBoolean(NO_CACHE_CONF, false)) { tableCache = new BogusObjectCache, Table>(); sdCache = new BogusObjectCache(); partCache = new BogusPartitionCache(); - statsCache = StatsCache.getBogusStatsCache(); } else { tableCache = new ObjectCache, Table>(TABLES_TO_CACHE, tableHits, tableMisses, tableOverflows); sdCache = new ObjectCache(sdsCacheSize, sdHits, sdMisses, sdOverflows); - partCache = new PartitionCache(totalObjectsToCache / 2, partHits, partMisses, partOverflows); - statsCache = StatsCache.getInstance(conf); + partCache = new PartitionCache(totalCatalogObjectsToCache, partHits, partMisses, partOverflows); + aggrStatsCache = AggregateStatsCache.getInstance(conf); } - roleCache = new HashMap(); entireRoleTableInCache = false; } @@ -1416,195 +1418,242 @@ public int hashCode() { /** * Update statistics for one or more columns for a table or a partition. + * * @param dbName database the table is in * @param tableName table to update statistics for * @param partName name of the partition, can be null if these are table level statistics. - * @param partVals partition values that define partition to update statistics for. If this is - * null, then these will be assumed to be table level statistics. - * @param stats Stats object with stats for one or more columns. + * @param partVals partition values that define partition to update statistics for. 
If this is + * null, then these will be assumed to be table level statistics + * @param stats Stats object with stats for one or more columns * @throws IOException */ void updateStatistics(String dbName, String tableName, String partName, List partVals, - ColumnStatistics stats) throws IOException { + ColumnStatistics stats) throws IOException { byte[] key = getStatisticsKey(dbName, tableName, partVals); String hbaseTable = getStatisticsTable(partVals); - byte[][] colnames = new byte[stats.getStatsObjSize()][]; - byte[][] serializeds = new byte[stats.getStatsObjSize()][]; - for (int i = 0; i < stats.getStatsObjSize(); i++) { - ColumnStatisticsObj obj = stats.getStatsObj().get(i); - serializeds[i] = HBaseUtils.serializeStatsForOneColumn(stats, obj); - String colname = obj.getColName(); - colnames[i] = HBaseUtils.buildKey(colname); - statsCache.put(dbName, tableName, partName, colname, obj, - stats.getStatsDesc().getLastAnalyzed()); - } - store(hbaseTable, key, STATS_CF, colnames, serializeds); + byte[][] serialized = new byte[stats.getStatsObjSize()][]; + store(hbaseTable, key, STATS_CF, colnames, serialized); } /** - * Get Statistics for a table + * Get statistics for a table + * * @param dbName name of database table is in * @param tableName name of table * @param colNames list of column names to get statistics for * @return column statistics for indicated table * @throws IOException */ - ColumnStatistics getTableStatistics(String dbName, String tableName, List colNames) + ColumnStatistics getTableStatistics(String dbName, String tblName, List colNames) throws IOException { - byte[] key = HBaseUtils.buildKey(dbName, tableName); - ColumnStatistics stats = new ColumnStatistics(); - ColumnStatisticsDesc desc = new ColumnStatisticsDesc(); - desc.setIsTblLevel(true); - desc.setDbName(dbName); - desc.setTableName(tableName); - stats.setStatsDesc(desc); - - // First we have to go through and see what's in the cache and fetch what we can from there. 
- // Then we'll fetch the rest from HBase - List stillLookingFor = new ArrayList(); - for (int i = 0; i < colNames.size(); i++) { - StatsCache.StatsInfo info = - statsCache.getTableStatistics(dbName, tableName, colNames.get(i)); - if (info == null) { - stillLookingFor.add(colNames.get(i)); - } else { - info.stats.setColName(colNames.get(i)); - stats.addToStatsObj(info.stats); - stats.getStatsDesc().setLastAnalyzed(Math.max(stats.getStatsDesc().getLastAnalyzed(), - info.lastAnalyzed)); - } - } - if (stillLookingFor.size() == 0) return stats; - - byte[][] colKeys = new byte[stillLookingFor.size()][]; + byte[] tabKey = HBaseUtils.buildKey(dbName, tblName); + ColumnStatistics tableStats = new ColumnStatistics(); + ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(); + statsDesc.setIsTblLevel(true); + statsDesc.setDbName(dbName); + statsDesc.setTableName(tblName); + tableStats.setStatsDesc(statsDesc); + byte[][] colKeys = new byte[colNames.size()][]; for (int i = 0; i < colKeys.length; i++) { - colKeys[i] = HBaseUtils.buildKey(stillLookingFor.get(i)); + colKeys[i] = HBaseUtils.buildKey(colNames.get(i)); } - Result res = read(TABLE_TABLE, key, STATS_CF, colKeys); + Result result = read(TABLE_TABLE, tabKey, STATS_CF, colKeys); for (int i = 0; i < colKeys.length; i++) { - byte[] serialized = res.getValue(STATS_CF, colKeys[i]); - if (serialized == null) { + byte[] serializedColStats = result.getValue(STATS_CF, colKeys[i]); + if (serializedColStats == null) { // There were no stats for this column, so skip it continue; } - ColumnStatisticsObj obj = HBaseUtils.deserializeStatsForOneColumn(stats, serialized); - statsCache.put(dbName, tableName, null, stillLookingFor.get(i), obj, - stats.getStatsDesc().getLastAnalyzed()); - obj.setColName(stillLookingFor.get(i)); - stats.addToStatsObj(obj); + ColumnStatisticsObj obj = + HBaseUtils.deserializeStatsForOneColumn(tableStats, serializedColStats); + obj.setColName(colNames.get(i)); + tableStats.addToStatsObj(obj); } - return stats; + return tableStats; } /** * Get statistics for a set of partitions + * * @param dbName name of database table is in * @param tableName table partitions are in * @param partNames names of the partitions, used only to set values inside the return stats - * objects. - * @param partVals partition values for each partition, needed because this class doesn't know - * how to translate from partName to partVals - * @param colNames column names to fetch stats for. These columns will be fetched for all - * requested partitions. - * @return list of ColumnStats, one for each partition. The values will be in the same order - * as the partNames list that was passed in. + * objects + * @param partVals partition values for each partition, needed because this class doesn't know how + * to translate from partName to partVals + * @param colNames column names to fetch stats for. These columns will be fetched for all + * requested partitions + * @return list of ColumnStats, one for each partition. The values will be in the same order as + * the partNames list that was passed in * @throws IOException */ - List getPartitionStatistics(String dbName, String tableName, - List partNames, - List> partVals, - List colNames) throws IOException { - // Go through the cache first, see what we can fetch from there. 
This is complicated because - // we may have different columns for different partitions + List getPartitionStatistics(String dbName, String tblName, + List partNames, List> partVals, List colNames) + throws IOException { List statsList = new ArrayList(partNames.size()); - List stillLookingFor = new ArrayList(); - for (int pOff = 0; pOff < partVals.size(); pOff++) { - // Add an entry for this partition in the list - ColumnStatistics stats = new ColumnStatistics(); - ColumnStatisticsDesc desc = new ColumnStatisticsDesc(); - desc.setIsTblLevel(false); - desc.setDbName(dbName); - desc.setTableName(tableName); - desc.setPartName(partNames.get(pOff)); - stats.setStatsDesc(desc); - statsList.add(stats); - PartStatsInfo missing = null; - - for (int cOff = 0; cOff < colNames.size(); cOff++) { - StatsCache.StatsInfo info = statsCache.getPartitionStatistics(dbName, tableName, - partNames.get(pOff), colNames.get(cOff)); - if (info == null) { - if (missing == null) { - // We haven't started an entry for this one yet - missing = new PartStatsInfo(stats, partVals.get(pOff), partNames.get(pOff)); - stillLookingFor.add(missing); - } - missing.colNames.add(colNames.get(cOff)); - } else { - info.stats.setColName(colNames.get(cOff)); - stats.addToStatsObj(info.stats); - stats.getStatsDesc().setLastAnalyzed(Math.max(stats.getStatsDesc().getLastAnalyzed(), - info.lastAnalyzed)); - } - } - } - if (stillLookingFor.size() == 0) return statsList; - - // Build the list of gets. It may be different for each partition now depending on what we - // found in the cache. + ColumnStatistics partitionStats; + ColumnStatisticsDesc statsDesc; + byte[][] colKeys = new byte[colNames.size()][]; List gets = new ArrayList(); - for (PartStatsInfo pi : stillLookingFor) { - byte[][] colKeys = new byte[pi.colNames.size()][]; + // Initialize the list and build the Gets + for (int pOff = 0; pOff < partNames.size(); pOff++) { + // Add an entry for this partition in the stats list + partitionStats = new ColumnStatistics(); + statsDesc = new ColumnStatisticsDesc(); + statsDesc.setIsTblLevel(false); + statsDesc.setDbName(dbName); + statsDesc.setTableName(tblName); + statsDesc.setPartName(partNames.get(pOff)); + partitionStats.setStatsDesc(statsDesc); + statsList.add(partitionStats); + // Build the list of Gets for (int i = 0; i < colKeys.length; i++) { - colKeys[i] = HBaseUtils.buildKey(pi.colNames.get(i)); + colKeys[i] = HBaseUtils.buildKey(colNames.get(i)); } - pi.colKeys = colKeys; - - byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, pi.partVals); - Get g = new Get(key); - for (byte[] colName : colKeys) g.addColumn(STATS_CF, colName); - gets.add(g); + byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName, partVals.get(pOff)); + Get get = new Get(partKey); + for (byte[] colName : colKeys) { + get.addColumn(STATS_CF, colName); + } + gets.add(get); } + HTableInterface htab = conn.getHBaseTable(PART_TABLE); + // Get results from HBase Result[] results = htab.get(gets); - + // Deserialize the stats objects and add to stats list for (int pOff = 0; pOff < results.length; pOff++) { - PartStatsInfo pi = stillLookingFor.get(pOff); - for (int cOff = 0; cOff < pi.colNames.size(); cOff++) { - byte[] serialized = results[pOff].getValue(STATS_CF, pi.colKeys[cOff]); - if (serialized == null) { + for (int cOff = 0; cOff < colNames.size(); cOff++) { + byte[] serializedColStats = results[pOff].getValue(STATS_CF, colKeys[cOff]); + if (serializedColStats == null) { // There were no stats for this column, so skip it continue; } - 
ColumnStatisticsObj obj = HBaseUtils.deserializeStatsForOneColumn(pi.stats, serialized); - statsCache.put(dbName, tableName, pi.partName, pi.colNames.get(cOff), obj, - pi.stats.getStatsDesc().getLastAnalyzed()); - obj.setColName(pi.colNames.get(cOff)); - pi.stats.addToStatsObj(obj); + partitionStats = statsList.get(pOff); + ColumnStatisticsObj colStats = + HBaseUtils.deserializeStatsForOneColumn(partitionStats, serializedColStats); + colStats.setColName(colNames.get(cOff)); + partitionStats.addToStatsObj(colStats); } } return statsList; } - private static class PartStatsInfo { - ColumnStatistics stats; - String partName; - List colNames; - List partVals; - byte[][] colKeys; + /** + * Get aggregate stats for a column from the DB and populate the bloom filter if it's not null + * @param dbName + * @param tblName + * @param partNames + * @param partVals + * @param colNames + * @return + * @throws IOException + */ + AggrStats getAggrStats(String dbName, String tblName, List partNames, + List> partVals, List colNames) throws IOException { + // One ColumnStatisticsObj per column + List colStatsList = new ArrayList(); + AggrColStats colStatsAggrCached; + ColumnStatisticsObj colStatsAggr; + int maxPartitionsPerCacheNode = aggrStatsCache.getMaxPartsPerCacheNode(); + float falsePositiveProbability = aggrStatsCache.getFalsePositiveProbability(); + int partitionsRequested = partNames.size(); + // TODO: Steal extrapolation logic from current MetaStoreDirectSql code + // Right now doing nothing and keeping partitionsFound == partitionsRequested + int partitionsFound = partitionsRequested; + for (String colName : colNames) { + if (partitionsRequested > maxPartitionsPerCacheNode) { + // Read from HBase but don't add to cache since it doesn't qualify the criteria + colStatsAggr = getAggrColStatsFromDB(dbName, tblName, colName, partNames, partVals, null); + colStatsList.add(colStatsAggr); + } else { + // Check the cache first + colStatsAggrCached = aggrStatsCache.get(dbName, tblName, colName, partNames); + if (colStatsAggrCached != null) { + colStatsList.add(colStatsAggrCached.getColStats()); + } else { + // Bloom filter for the new node that we will eventually add to the cache + BloomFilter bloomFilter = + new BloomFilter(maxPartitionsPerCacheNode, falsePositiveProbability); + colStatsAggr = + getAggrColStatsFromDB(dbName, tblName, colName, partNames, partVals, bloomFilter); + colStatsList.add(colStatsAggr); + // Update the cache to add this new aggregate node + aggrStatsCache.add(dbName, tblName, colName, partitionsFound, colStatsAggr, bloomFilter); + } + } + } + return new AggrStats(colStatsList, partitionsFound); + } - PartStatsInfo(ColumnStatistics s, List pv, String pn) { - stats = s; partVals = pv; partName = pn; - colNames = new ArrayList(); - colKeys = null; + /** + * + * @param dbName + * @param tblName + * @param partNames + * @param partVals + * @param colName + * @param bloomFilter + * @return + */ + private ColumnStatisticsObj getAggrColStatsFromDB(String dbName, String tblName, String colName, + List partNames, List> partVals, BloomFilter bloomFilter) + throws IOException { + ColumnStatisticsObj colStatsAggr = new ColumnStatisticsObj(); + boolean colStatsAggrInited = false; + ColumnStatsAggregator colStatsAggregator = null; + List gets = new ArrayList(); + byte[] colKey = HBaseUtils.buildKey(colName); + // Build a list of Gets, one per partition + for (int pOff = 0; pOff < partNames.size(); pOff++) { + byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName, partVals.get(pOff)); + Get 
get = new Get(partKey); + get.addColumn(STATS_CF, colKey); + gets.add(get); + } + HTableInterface htab = conn.getHBaseTable(PART_TABLE); + // Get results from HBase + Result[] results = htab.get(gets); + // Iterate through the results + // The results size and order is the same as the number and order of the Gets + // If the column is not present in a partition, the Result object will be empty + for (int pOff = 0; pOff < partNames.size(); pOff++) { + if (results[pOff].isEmpty()) { + // There were no stats for this column, so skip it + continue; + } + byte[] serializedColStats = results[pOff].getValue(STATS_CF, colKey); + if (serializedColStats == null) { + // There were no stats for this column, so skip it + continue; + } + ColumnStatisticsObj colStats = + HBaseUtils.deserializeStatsForOneColumn(null, serializedColStats); + if (!colStatsAggrInited) { + // This is the 1st column stats object we got + colStatsAggr.setColName(colName); + colStatsAggr.setColType(colStats.getColType()); + colStatsAggr.setStatsData(colStats.getStatsData()); + colStatsAggregator = + ColumnStatsAggregatorFactory.getColumnStatsAggregator(colStats.getStatsData() + .getSetField()); + colStatsAggrInited = true; + } else { + // Perform aggregation with whatever we've already aggregated + colStatsAggregator.aggregate(colStatsAggr, colStats); + } + // Add partition to the bloom filter if it's requested + if (bloomFilter != null) { + bloomFilter.addToFilter(partNames.get(pOff).getBytes()); + } } + return colStatsAggr; } private byte[] getStatisticsKey(String dbName, String tableName, List partVals) { - return partVals == null ? - HBaseUtils.buildKey(dbName, tableName) : - HBaseUtils.buildPartitionKey(dbName, tableName, partVals); + return partVals == null ? HBaseUtils.buildKey(dbName, tableName) : HBaseUtils + .buildPartitionKey(dbName, tableName, partVals); } private String getStatisticsTable(List partVals) { diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index 4b4cfeb..2390dd8 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -62,6 +62,7 @@ import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.PlanResult; import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.ScanPlan; import org.apache.hadoop.hive.metastore.parser.ExpressionTree; +import org.apache.hadoop.hive.metastore.hbase.AggregateStatsCache.AggrColStats; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.thrift.TException; @@ -1228,8 +1229,8 @@ public Partition getPartitionWithAuth(String dbName, String tblName, List partVals) throws - NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { + public boolean updatePartitionColumnStatistics(ColumnStatistics statsObj, List partVals) + throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { try { getHBase().updateStatistics(statsObj.getStatsDesc().getDbName(), - statsObj.getStatsDesc().getTableName(), statsObj.getStatsDesc().getPartName(), - partVals, statsObj); + statsObj.getStatsDesc().getTableName(), statsObj.getStatsDesc().getPartName(), partVals, + statsObj); return true; } catch (IOException e) { LOG.error("Unable to update column statistics", e); @@ -1257,8 +1257,7 @@ public boolean updatePartitionColumnStatistics(ColumnStatistics statsObj, @Override 
public ColumnStatistics getTableColumnStatistics(String dbName, String tableName, - List colName) throws MetaException, - NoSuchObjectException { + List colName) throws MetaException, NoSuchObjectException { try { return getHBase().getTableStatistics(dbName, tableName, colName); } catch (IOException e) { @@ -1269,11 +1268,11 @@ public ColumnStatistics getTableColumnStatistics(String dbName, String tableName @Override public List getPartitionColumnStatistics(String dbName, String tblName, - List partNames, - List colNames) throws - MetaException, NoSuchObjectException { + List partNames, List colNames) throws MetaException, NoSuchObjectException { List> partVals = new ArrayList>(partNames.size()); - for (String partName : partNames) partVals.add(partNameToVals(partName)); + for (String partName : partNames) { + partVals.add(partNameToVals(partName)); + } try { return getHBase().getPartitionStatistics(dbName, tblName, partNames, partVals, colNames); } catch (IOException e) { @@ -1284,8 +1283,8 @@ public ColumnStatistics getTableColumnStatistics(String dbName, String tableName @Override public boolean deletePartitionColumnStatistics(String dbName, String tableName, String partName, - List partVals, String colName) throws - NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { + List partVals, String colName) throws NoSuchObjectException, MetaException, + InvalidObjectException, InvalidInputException { // NOP, stats will be deleted along with the partition when it is dropped. return true; } @@ -1297,6 +1296,26 @@ public boolean deleteTableColumnStatistics(String dbName, String tableName, Stri return true; } + /** + * Return aggregated statistics for each column in the colNames list aggregated over partitions in + * the partNames list + * + */ + @Override + public AggrStats get_aggr_stats_for(String dbName, String tblName, List partNames, + List colNames) throws MetaException, NoSuchObjectException { + List> partVals = new ArrayList>(partNames.size()); + for (String partName : partNames) { + partVals.add(partNameToVals(partName)); + } + try { + return getHBase().getAggrStats(dbName, tblName, partNames, partVals, colNames); + } catch (IOException e) { + LOG.error("Unable to fetch aggregate column statistics", e); + throw new MetaException("Failed fetching aggregate column statistics, " + e.getMessage()); + } + } + @Override public long cleanupEvents() { throw new UnsupportedOperationException(); @@ -1629,13 +1648,6 @@ public Function getFunction(String dbName, String funcName) throws MetaException } @Override - public AggrStats get_aggr_stats_for(String dbName, String tblName, List partNames, - List colNames) throws MetaException, - NoSuchObjectException { - throw new UnsupportedOperationException(); - } - - @Override public NotificationEventResponse getNextNotification(NotificationEventRequest rqst) { throw new UnsupportedOperationException(); } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java index 2e1ae27..7341fbb 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import 
org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Decimal; @@ -49,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TFieldIdEnum; import java.io.IOException; import java.nio.charset.Charset; @@ -900,15 +902,15 @@ static StorageDescriptorParts deserializeTable(String dbName, String tableName, return sdParts; } - static byte[] serializeStatsForOneColumn(ColumnStatistics stats, ColumnStatisticsObj obj) + static byte[] serializeStatsForOneColumn(ColumnStatistics partitionColumnStats, ColumnStatisticsObj colStats) throws IOException { HbaseMetastoreProto.ColumnStats.Builder builder = HbaseMetastoreProto.ColumnStats.newBuilder(); - builder.setLastAnalyzed(stats.getStatsDesc().getLastAnalyzed()); - if (obj.getColType() == null) { + builder.setLastAnalyzed(partitionColumnStats.getStatsDesc().getLastAnalyzed()); + if (colStats.getColType() == null) { throw new RuntimeException("Column type must be set"); } - builder.setColumnType(obj.getColType()); - ColumnStatisticsData colData = obj.getStatsData(); + builder.setColumnType(colStats.getColType()); + ColumnStatisticsData colData = colStats.getStatsData(); switch (colData.getSetField()) { case BOOLEAN_STATS: BooleanColumnStatsData boolData = colData.getBooleanStats(); @@ -988,14 +990,16 @@ static StorageDescriptorParts deserializeTable(String dbName, String tableName, return builder.build().toByteArray(); } - static ColumnStatisticsObj deserializeStatsForOneColumn(ColumnStatistics stats, - byte[] bytes) throws IOException { + static ColumnStatisticsObj deserializeStatsForOneColumn(ColumnStatistics partitionColumnStats, + byte[] bytes) throws IOException { HbaseMetastoreProto.ColumnStats proto = HbaseMetastoreProto.ColumnStats.parseFrom(bytes); - ColumnStatisticsObj obj = new ColumnStatisticsObj(); + ColumnStatisticsObj colStats = new ColumnStatisticsObj(); long lastAnalyzed = proto.getLastAnalyzed(); - stats.getStatsDesc().setLastAnalyzed( - Math.max(lastAnalyzed, stats.getStatsDesc().getLastAnalyzed())); - obj.setColType(proto.getColumnType()); + if (partitionColumnStats != null) { + partitionColumnStats.getStatsDesc().setLastAnalyzed( + Math.max(lastAnalyzed, partitionColumnStats.getStatsDesc().getLastAnalyzed())); + } + colStats.setColType(proto.getColumnType()); ColumnStatisticsData colData = new ColumnStatisticsData(); if (proto.hasBoolStats()) { @@ -1059,9 +1063,8 @@ static ColumnStatisticsObj deserializeStatsForOneColumn(ColumnStatistics stats, } else { throw new RuntimeException("Woh, bad. Unknown stats type!"); } - - obj.setStatsData(colData); - return obj; + colStats.setStatsData(colData); + return colStats; } /** @@ -1078,5 +1081,4 @@ static ColumnStatisticsObj deserializeStatsForOneColumn(ColumnStatistics stats, keyEnd[keyEnd.length - 1]++; return keyEnd; } - } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java deleted file mode 100644 index 6066578..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java +++ /dev/null @@ -1,327 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hive.metastore.hbase; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - * Caching for stats. This implements an LRU cache. It does not remove entries explicitly as - * that is generally expensive to find all entries for a table or partition. Instead it lets them - * time out. When the cache is full a sweep is done in the background to remove expired entries. - * This cache is shared across all threads, and so operations are protected by reader or writer - * locks as appropriate. - */ -class StatsCache { - static final private Log LOG = LogFactory.getLog(StatsCache.class.getName()); - - private static StatsCache self = null; - - private final long timeToLive; - private final int maxSize; - private Map cache; - private ReadWriteLock lock; - private boolean cleaning; - private Counter tableMisses; - private Counter partMisses; - private Counter tableHits; - private Counter partHits; - private Counter cleans; - private List counters; - - static synchronized StatsCache getInstance(Configuration conf) { - if (self == null) { - int totalObjectsToCache = - HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORE_HBASE_CACHE_SIZE); - long timeToLive = HiveConf.getTimeVar(conf, - HiveConf.ConfVars.METASTORE_HBASE_CACHE_TIME_TO_LIVE, TimeUnit.SECONDS); - self = new StatsCache(totalObjectsToCache / 2, timeToLive); - } - return self; - } - - /** - * @param max maximum number of objects to store in the cache. When max is reached, eviction - * policy is MRU. - * @param timeToLive time (in seconds) that an entry is valid. After this time the record will - * discarded lazily - */ - private StatsCache(int max, long timeToLive) { - maxSize = max; - this.timeToLive = timeToLive * 1000; - cache = new HashMap(); - lock = new ReentrantReadWriteLock(); - cleaning = false; - counters = new ArrayList(); - tableMisses = new Counter("Stats cache table misses"); - counters.add(tableMisses); - tableHits = new Counter("Stats cache table hits"); - counters.add(tableHits); - partMisses = new Counter("Stats cache partition misses"); - counters.add(partMisses); - partHits = new Counter("Stats cache partition hits"); - counters.add(partHits); - cleans = new Counter("Stats cache cleans"); - counters.add(cleans); - } - - /** - * Add an object to the cache. 
- * @param dbName name of database table is in - * @param tableName name of table - * @param partName name of partition, can be null if these are table level statistics - * @param colName name of the column these statistics are for - * @param stats stats - * @param lastAnalyzed last time these stats were analyzed - */ - void put(String dbName, String tableName, String partName, String colName, - ColumnStatisticsObj stats, long lastAnalyzed) { - // TODO - we may want to not put an entry in if we're full. - if (cache.size() >= maxSize) clean(); - Key key = new Key(dbName, tableName, partName, colName); - StatsInfo info = new StatsInfo(stats, lastAnalyzed); - lock.writeLock().lock(); - try { - cache.put(key, info); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Get table level statistics - * @param dbName name of database table is in - * @param tableName name of table - * @param colName of column to get stats for - * @return stats object for this column, or null if none cached - */ - StatsInfo getTableStatistics(String dbName, String tableName, String colName) { - return getStatistics(new Key(dbName, tableName, colName), tableHits, tableMisses); - } - - /** - * Get partition level statistics - * @param dbName name of database table is in - * @param tableName name of table - * @param partName name of this partition - * @param colName of column to get stats for - * @return stats object for this column, or null if none cached - */ - StatsInfo getPartitionStatistics(String dbName, String tableName, - String partName, String colName) { - return getStatistics(new Key(dbName, tableName, partName, colName), partHits, partMisses); - } - - String[] dumpMetrics() { - String[] strs = new String[counters.size()]; - for (int i = 0; i < strs.length; i++) { - strs[i] = counters.get(i).dump(); - } - return strs; - } - - private StatsInfo getStatistics(Key key, Counter hits, Counter misses) { - StatsInfo s = null; - lock.readLock().lock(); - try { - s = cache.get(key); - } finally { - lock.readLock().unlock(); - } - if (s == null) { - misses.incr(); - return null; - } else if (tooLate(s)) { - remove(key); - misses.incr(); - return null; - } else { - s.lastTouched = System.currentTimeMillis(); - hits.incr(); - return s; - } - } - - private void remove(Key key) { - lock.writeLock().lock(); - try { - // It's possible that multiple callers will call remove for a given key, so don't complain - // if the indicated key is already gone. - cache.remove(key); - } finally { - lock.writeLock().unlock(); - } - } - - private void clean() { - // TODO - we may want to add intelligence to check if we cleaned anything after a cleaning - // pass. If we're still at or near capacity we may want to reduce ttl and make another run. - // This spawns a separate thread to walk through the cache and clean. - synchronized (this) { - if (cleaning) return; - cleaning = true; - cleans.incr(); - } - - Thread cleaner = new Thread() { - @Override - public void run() { - try { - // Get the read lock and then make a copy of the map. This is so we can work through it - // without having concurrent modification exceptions. Then walk through and remove things - // one at a time. 
- List> entries = null; - lock.readLock().lock(); - try { - entries = new ArrayList>(cache.entrySet()); - } finally { - lock.readLock().unlock(); - } - for (Map.Entry entry : entries) { - if (tooLate(entry.getValue())) { - remove(entry.getKey()); - } - // We want to make sure this runs at a low priority in the background - Thread.yield(); - } - } catch (Throwable t) { - // Don't let anything past this thread that could end up killing the metastore - LOG.error("Caught exception in stats cleaner thread", t); - } finally { - cleaning = false; - } - } - }; - cleaner.setPriority(Thread.MIN_PRIORITY); - cleaner.setDaemon(true); - cleaner.start(); - } - - private boolean tooLate(StatsInfo stats) { - return System.currentTimeMillis() - stats.lastTouched > timeToLive; - } - - private static class Key { - private final String dbName, tableName, partName, colName; - - Key(String db, String table, String col) { - this(db, table, null, col); - } - - Key(String db, String table, String part, String col) { - dbName = db; tableName = table; partName = part; colName = col; - } - - @Override - public boolean equals(Object other) { - if (other == null || !(other instanceof Key)) return false; - Key that = (Key)other; - if (partName == null) { - return that.partName == null && dbName.equals(that.dbName) && - tableName.equals(that.tableName) && colName.equals(that.colName); - } else { - return dbName.equals(that.dbName) && tableName.equals(that.tableName) && - partName.equals(that.partName) && colName.equals(that.colName); - } - } - - @Override - public int hashCode() { - int hashCode = dbName.hashCode() * 31 + tableName.hashCode(); - if (partName != null) hashCode = hashCode * 31 + partName.hashCode(); - return hashCode * 31 + colName.hashCode(); - } - } - - static class StatsInfo { - final ColumnStatisticsObj stats; - final long lastAnalyzed; - long lastTouched; - - StatsInfo(ColumnStatisticsObj obj, long la) { - stats = obj; - lastAnalyzed = la; - lastTouched = System.currentTimeMillis(); - } - } - - - /** - * This returns a stats cache that will store nothing and return nothing, useful - * for unit testing when you don't want the cache in your way. 
- * @return - */ - @VisibleForTesting - static StatsCache getBogusStatsCache() { - return new StatsCache(0, 0) { - @Override - void put(String dbName, String tableName, String partName, String colName, - ColumnStatisticsObj stats, long lastAnalyzed) { - } - - @Override - StatsInfo getTableStatistics(String dbName, String tableName, String colName) { - return null; - } - - @Override - StatsInfo getPartitionStatistics(String dbName, String tableName, - String partName, String colName) { - return null; - } - }; - } - - /** - * Go through and make all the entries in the cache old so they will time out when requested - */ - @VisibleForTesting - void makeWayOld() { - for (StatsInfo stats : cache.values()) { - stats.lastTouched = 1; - } - } - - @VisibleForTesting - int cacheSize() { - return cache.size(); - } - - @VisibleForTesting - boolean cleaning() { - return cleaning; - } - - @VisibleForTesting - void clear() { - cache.clear(); - } -} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/BinaryColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/BinaryColumnStatsAggregator.java new file mode 100644 index 0000000..fe83302 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/BinaryColumnStatsAggregator.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.hbase.aggregator; + +import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; + +public class BinaryColumnStatsAggregator implements ColumnStatsAggregator{ + + @Override + public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) { + BinaryColumnStatsData aggregateData = aggregateColStats.getStatsData().getBinaryStats(); + BinaryColumnStatsData newData = newColStats.getStatsData().getBinaryStats(); + aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen())); + aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen())); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/BooleanColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/BooleanColumnStatsAggregator.java new file mode 100644 index 0000000..1183ac7 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/BooleanColumnStatsAggregator.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
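// Illustrative sketch, not part of this patch: the aggregate() contract above
// merges the second argument's statistics into the first, in place. This
// hypothetical driver uses only Thrift setters that already appear elsewhere
// in the patch; the package placement, class name, and values are assumptions
// made for the example.
package org.apache.hadoop.hive.metastore.hbase.aggregator;

import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;

public class BinaryColumnStatsAggregatorSketch {
  public static void main(String[] args) {
    ColumnStatisticsObj agg = binaryStats(100, 40.0, 3);   // stats from partition 1
    ColumnStatisticsObj next = binaryStats(250, 55.5, 7);  // stats from partition 2
    new BinaryColumnStatsAggregator().aggregate(agg, next);
    // agg now holds maxColLen=250, avgColLen=55.5 (max of the two), numNulls=10
    System.out.println(agg.getStatsData().getBinaryStats());
  }

  private static ColumnStatisticsObj binaryStats(long maxLen, double avgLen, long nulls) {
    BinaryColumnStatsData d = new BinaryColumnStatsData();
    d.setMaxColLen(maxLen);
    d.setAvgColLen(avgLen);
    d.setNumNulls(nulls);
    ColumnStatisticsData data = new ColumnStatisticsData();
    data.setBinaryStats(d);
    ColumnStatisticsObj obj = new ColumnStatisticsObj();
    obj.setColName("col1");
    obj.setColType("binary");
    obj.setStatsData(data);
    return obj;
  }
}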
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.hbase.aggregator; + +import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; + +public class BooleanColumnStatsAggregator implements ColumnStatsAggregator { + + @Override + public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) { + BooleanColumnStatsData aggregateData = aggregateColStats.getStatsData().getBooleanStats(); + BooleanColumnStatsData newData = newColStats.getStatsData().getBooleanStats(); + aggregateData.setNumTrues(aggregateData.getNumTrues() + newData.getNumTrues()); + aggregateData.setNumFalses(aggregateData.getNumFalses() + newData.getNumFalses()); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/ColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/ColumnStatsAggregator.java new file mode 100644 index 0000000..6b4d9af --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/ColumnStatsAggregator.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.hbase.aggregator; + +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; + +public interface ColumnStatsAggregator { + public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats); +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/ColumnStatsAggregatorFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/ColumnStatsAggregatorFactory.java new file mode 100644 index 0000000..682ba19 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/ColumnStatsAggregatorFactory.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.hbase.aggregator; + +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields; + +public class ColumnStatsAggregatorFactory { + + private ColumnStatsAggregatorFactory() { + } + + public static ColumnStatsAggregator getColumnStatsAggregator(_Fields type) { + switch (type) { + case BOOLEAN_STATS: + return new BooleanColumnStatsAggregator(); + case LONG_STATS: + return new LongColumnStatsAggregator(); + case DOUBLE_STATS: + return new DoubleColumnStatsAggregator(); + case STRING_STATS: + return new StringColumnStatsAggregator(); + case BINARY_STATS: + return new BinaryColumnStatsAggregator(); + case DECIMAL_STATS: + return new DecimalColumnStatsAggregator(); + default: + throw new RuntimeException("Woh, bad. Unknown stats type!"); + } + } + +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/DecimalColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/DecimalColumnStatsAggregator.java new file mode 100644 index 0000000..c290edc --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/DecimalColumnStatsAggregator.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.hbase.aggregator; + +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.Decimal; +import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; + +public class DecimalColumnStatsAggregator implements ColumnStatsAggregator { + + @Override + public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) { + DecimalColumnStatsData aggregateData = aggregateColStats.getStatsData().getDecimalStats(); + DecimalColumnStatsData newData = newColStats.getStatsData().getDecimalStats(); + Decimal lowValue = + (aggregateData.getLowValue().compareTo(newData.getLowValue()) > 0) ? 
aggregateData + .getLowValue() : newData.getLowValue(); + aggregateData.setLowValue(lowValue); + Decimal highValue = + (aggregateData.getHighValue().compareTo(newData.getHighValue()) > 0) ? aggregateData + .getHighValue() : newData.getHighValue(); + aggregateData.setHighValue(highValue); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/DoubleColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/DoubleColumnStatsAggregator.java new file mode 100644 index 0000000..df46f19 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/DoubleColumnStatsAggregator.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.hbase.aggregator; + +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; + +public class DoubleColumnStatsAggregator implements ColumnStatsAggregator { + + @Override + public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) { + DoubleColumnStatsData aggregateData = aggregateColStats.getStatsData().getDoubleStats(); + DoubleColumnStatsData newData = newColStats.getStatsData().getDoubleStats(); + aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue())); + aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue())); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/LongColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/LongColumnStatsAggregator.java new file mode 100644 index 0000000..714c473 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/LongColumnStatsAggregator.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
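// Illustrative sketch, not part of this patch: one way the factory above could
// be used to fold a column's per-partition statistics into a single aggregate.
// The first object seeds the result and the type-specific aggregator merges
// each remaining one into it; getSetField() on the ColumnStatisticsData union
// is used the same way in the tests later in this patch. The method name and
// the use of the Thrift-generated deepCopy() are assumptions for the example.
package org.apache.hadoop.hive.metastore.hbase.aggregator;

import java.util.List;

import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;

public class ColumnStatsFoldSketch {
  /** Folds per-partition stats for one column into a single ColumnStatisticsObj. */
  public static ColumnStatisticsObj aggregateAll(List<ColumnStatisticsObj> perPartition) {
    // Copy the first entry so the caller's objects are left untouched.
    ColumnStatisticsObj result = perPartition.get(0).deepCopy();
    ColumnStatsAggregator aggregator = ColumnStatsAggregatorFactory
        .getColumnStatsAggregator(result.getStatsData().getSetField());
    for (int i = 1; i < perPartition.size(); i++) {
      aggregator.aggregate(result, perPartition.get(i));
    }
    return result;
  }
}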
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.hbase.aggregator; + +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; + +public class LongColumnStatsAggregator implements ColumnStatsAggregator { + + @Override + public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) { + LongColumnStatsData aggregateData = aggregateColStats.getStatsData().getLongStats(); + LongColumnStatsData newData = newColStats.getStatsData().getLongStats(); + aggregateData.setLowValue(Math.min(aggregateData.getLowValue(), newData.getLowValue())); + aggregateData.setHighValue(Math.max(aggregateData.getHighValue(), newData.getHighValue())); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/StringColumnStatsAggregator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/StringColumnStatsAggregator.java new file mode 100644 index 0000000..0a9143b --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/aggregator/StringColumnStatsAggregator.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.hbase.aggregator; + +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; + +public class StringColumnStatsAggregator implements ColumnStatsAggregator { + + @Override + public void aggregate(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) { + StringColumnStatsData aggregateData = aggregateColStats.getStatsData().getStringStats(); + StringColumnStatsData newData = newColStats.getStatsData().getStringStats(); + aggregateData.setMaxColLen(Math.max(aggregateData.getMaxColLen(), newData.getMaxColLen())); + aggregateData.setAvgColLen(Math.max(aggregateData.getAvgColLen(), newData.getAvgColLen())); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/utils/BitVector.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/utils/BitVector.java new file mode 100644 index 0000000..20af350 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/utils/BitVector.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
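// Clarifying note, not part of this patch: the string and binary aggregators
// above combine avgColLen and numDVs with max() rather than recomputing them.
// Per-partition row counts are not available at this point, so a weighted
// average (or the NDV of the union) cannot be derived; the largest
// per-partition value is presumably used as a conservative stand-in. For
// example, merging two partitions with (avgColLen, numDVs) of (12.5, 40) and
// (20.0, 35) yields (20.0, 40), even though the true NDV of the union could be
// anywhere in [40, 75], so the aggregate is a deliberate lower bound.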
+ */ + +package org.apache.hadoop.hive.metastore.hbase.utils; + +import java.util.Arrays; + +/** + * Barebones fixed length bit vector using a byte array + */ +public class BitVector { + // We'll use this as the bit vector container + private byte data[]; + public static int ELEMENT_SIZE = Byte.SIZE; + + public BitVector(int size) { + data = new byte[size/ELEMENT_SIZE]; + } + + /** + * Total bits -> num elements * size of each element + * + */ + public long getSize() { + return data.length * ELEMENT_SIZE; + } + + /** + * Set the bit at the given index to 1 + * + * @param bitIndex + */ + public void setBit(int bitIndex) { + validateBitIndex(bitIndex); + int dataIndex = bitIndex / ELEMENT_SIZE; + int elementIndex = ELEMENT_SIZE - bitIndex % ELEMENT_SIZE - 1; + // Set the elementIndex'th bit of data[dataIndex]'th element + data[dataIndex] = (byte) (data[dataIndex] | (1 << elementIndex)); + } + + /** + * Set the bit at the given index to 0 + * + * @param bitIndex + */ + public void unSetBit(int bitIndex) { + validateBitIndex(bitIndex); + int dataIndex = bitIndex / ELEMENT_SIZE; + int elementIndex = ELEMENT_SIZE - bitIndex % ELEMENT_SIZE - 1; + // Unset the elementIndex'th bit of data[dataIndex]'th element + data[dataIndex] = (byte) (data[dataIndex] & ~(1 << elementIndex)); + } + + /** + * Check if a bit at the given index is 1 + * @param bitIndex + */ + public boolean isBitSet(int bitIndex) { + validateBitIndex(bitIndex); + int dataIndex = bitIndex / ELEMENT_SIZE; + int elementIndex = ELEMENT_SIZE - bitIndex % ELEMENT_SIZE - 1; + if ((data[dataIndex] & (1 << elementIndex)) > 0) { + return true; + } + return false; + } + + /** + * Set all bits to 0 + * + */ + public void clearAll() { + Arrays.fill(data, (byte) 0x00); + } + + /** + * Set all bits to 1 + * + */ + public void setAll() { + Arrays.fill(data, (byte) 0xFF); + } + + /** + * Prints the bit vector as a string of bit values (e.g. 01010111) + */ + @Override + public String toString() { + StringBuilder str = new StringBuilder(); + for(byte b : data) { + str.append(Integer.toBinaryString((b & 0xFF) + 0x100).substring(1)); + } + return str.toString(); + } + + /** + * Check if queried bitIndex is in valid range + * @param bitIndex + */ + private void validateBitIndex(int bitIndex) { + if ((bitIndex >= getSize()) || (bitIndex < 0)) { + throw new IllegalArgumentException("Bit index out of range: " + bitIndex); + } + } + +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/utils/BloomFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/utils/BloomFilter.java new file mode 100644 index 0000000..5606596 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/utils/BloomFilter.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
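// Illustrative sketch, not part of this patch: the BitVector above stores bits
// most-significant-bit first within each byte (dataIndex = bitIndex / 8,
// elementIndex = 7 - bitIndex % 8), so setBit(2) flips bit 5 of data[0]
// (mask 0x20) and setBit(29) flips bit 2 of data[3] (mask 0x04) -- the same
// layout TestBitVector checks later in this patch. The class and variable
// names below are made up for the example.
import org.apache.hadoop.hive.metastore.hbase.utils.BitVector;

public class BitVectorSketch {
  public static void main(String[] args) {
    BitVector v = new BitVector(32);     // 32 bits backed by a 4-byte array
    v.setBit(2);
    v.setBit(29);
    System.out.println(v);               // 00100000000000000000000000000100
    System.out.println(v.isBitSet(2));   // true
    v.unSetBit(2);
    System.out.println(v.isBitSet(2));   // false
  }
}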
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.hbase.utils; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.Hash; + +/** + * Barebones bloom filter implementation + */ +public class BloomFilter { + private static final Log LOG = LogFactory.getLog(BloomFilter.class.getName()); + // The cardinality of the set for which we are generating the bloom filter + // Default size is 10000 + private int setSize = 10000; + // The probability of false positives we are ready to tolerate + // Default is 1% + private double falsePositiveProbability = 0.01; + // Number of bits used for the filter + // Formula: -n*ln(p) / (ln(2)^2) + private int numBits; + // Number of hashing functions + // Formula: m/n * ln(2) + private int numHashFunctions; + private final Hash hash; + private BitVector bitVector; + + public BloomFilter(int setSize, double falsePositiveProbability) { + this.setSize = setSize; + this.falsePositiveProbability = falsePositiveProbability; + this.numBits = calculateFilterSize(this.setSize, this.falsePositiveProbability); + this.numHashFunctions = calculateHashFunctions(this.setSize, this.numBits); + // Create a bit vector of size numBits + this.bitVector = new BitVector(numBits); + // Use MurmurHash3 + hash = Hash.getInstance(Hash.MURMUR_HASH3); + } + + /** + * Calculate the number of bits in the filter + * Also align size to BitVector.HOLDER_SIZE + * @param setSize + * @param falsePositiveProbability + * @return numBits + */ + private int calculateFilterSize(int setSize, double falsePositiveProbability) { + int numBits = (int) (-setSize * Math.log(falsePositiveProbability) / (Math.log(2) * Math.log(2))); + numBits = numBits + (BitVector.ELEMENT_SIZE - (numBits % BitVector.ELEMENT_SIZE)); + LOG.info("Bloom Filter size: " + numBits); + return numBits; + } + + /** + * Calculate the number of hash functions needed by the BloomFilter + * @param setSize + * @param numBits + * @return numHashFunctions + */ + private int calculateHashFunctions(int setSize, int numBits) { + int numHashFunctions = Math.max(1, (int) Math.round((double) numBits / setSize * Math.log(2))); + LOG.info("Number of hashing functions: " + numHashFunctions); + return numHashFunctions; + } + + /** + * @return the underlying BitVector object + */ + public BitVector getBitVector() { + return bitVector; + } + + public int getFilterSize() { + return numBits; + } + + + public int getNumHashFunctions() { + return numHashFunctions; + } + + /** + * Add an item to the filter + * + * @param item to add + */ + public void addToFilter(byte[] item) { + int bitIndex; + // Hash the item numHashFunctions times + for (int i = 0; i < numHashFunctions; i++) { + bitIndex = getBitIndex(item, i); + // Set the bit at this index + bitVector.setBit(bitIndex); + } + } + + /** + * Check whether the item is present in the filter + * @param candidate + * @return hasItem (true if the bloom filter contains the item) + */ + public boolean contains(byte[] item) { + int bitIndex; + boolean hasItem = true; + // Hash the item numHashFunctions times + for (int i = 0; i < numHashFunctions; i++) { + bitIndex = getBitIndex(item, i); + hasItem = hasItem && bitVector.isBitSet(bitIndex); + if (!hasItem) { + return hasItem; + } + } + return hasItem; + } + + /** + * Hash the item using the i as the seed and return its potential position in the bit vector + * Also we negate a negative hash 
value + * + * @param item + * @param i (for the i-th hash function) + * @return position of item in unerlying bit vector + */ + private int getBitIndex(byte[] item, int i) { + return Math.abs(hash.hash(item, i) % (numBits)); + } +} diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestAggregateStatsCache.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestAggregateStatsCache.java new file mode 100644 index 0000000..3ae6d1e --- /dev/null +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestAggregateStatsCache.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.hbase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; +import org.apache.hadoop.hive.metastore.hbase.AggregateStatsCache.AggrColStats; +import org.apache.hadoop.hive.metastore.hbase.utils.BloomFilter; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestAggregateStatsCache { + static int MAX_CACHE_NODES = 10; + static int MAX_PARTITIONS_PER_CACHE_NODE = 10; + static String TIME_TO_LIVE = "20s"; + static float FALSE_POSITIVE_PROBABILITY = (float) 0.01; + static float MAX_VARIANCE = (float) 0.1; + static AggregateStatsCache cache; + static String dbName = "db"; + static String tablePrefix = "tab"; + static String partitionPrefix = "part"; + static String columnPrefix = "col"; + static int numTables = 2; + static int numPartitions = 20; + static int numColumns = 5; + static List tables = new ArrayList(); + static List tabParts = new ArrayList(); + static List tabCols = new ArrayList(); + + @BeforeClass + public static void beforeTest() { + // All data intitializations + initializeTables(); + initializePartitions(); + initializeColumns(); + } + + private static void initializeTables() { + for (int i = 1; i <= numTables; i++) { + // tab1, tab2 + tables.add(tablePrefix + i); + } + } + + private static void initializePartitions() { + for (int i = 1; i <= numPartitions; i++) { + // part1 ... part20 + tabParts.add(partitionPrefix + i); + } + } + + private static void initializeColumns() { + for (int i = 1; i <= numColumns; i++) { + // part1 ... 
part20 + tabCols.add(columnPrefix + i); + } + } + + @AfterClass + public static void afterTest() { + } + + @Before + public void setUp() { + HiveConf hiveConf = new HiveConf(); + hiveConf.setIntVar(HiveConf.ConfVars.METASTORE_HBASE_AGGREGATE_STATS_CACHE_SIZE, + MAX_CACHE_NODES); + hiveConf.setIntVar(HiveConf.ConfVars.METASTORE_HBASE_AGGREGATE_STATS_CACHE_MAX_PARTITIONS, + MAX_PARTITIONS_PER_CACHE_NODE); + hiveConf.setFloatVar( + HiveConf.ConfVars.METASTORE_HBASE_AGGREGATE_STATS_CACHE_FALSE_POSITIVE_PROBABILITY, + FALSE_POSITIVE_PROBABILITY); + hiveConf.setFloatVar(HiveConf.ConfVars.METASTORE_HBASE_AGGREGATE_STATS_CACHE_MAX_VARIANCE, + MAX_VARIANCE); + hiveConf.setVar(HiveConf.ConfVars.METASTORE_HBASE_CACHE_TIME_TO_LIVE, TIME_TO_LIVE); + cache = AggregateStatsCache.getInstance(hiveConf); + + } + + @After + public void tearDown() { + } + + @Test + public void testBasicAddAndGet() { + // Add a dummy aggregate stats object for parts 1-9 of tab1 for col1 + int minPart = 1; + int maxPart = 9; + String tblName = tables.get(0); + String colName = tabCols.get(0); + ColumnStatisticsObj aggrColStats = getDummyLongColStat(colName); + // Prepare the bloom filter + BloomFilter bloomFilter = + new BloomFilter(MAX_PARTITIONS_PER_CACHE_NODE, FALSE_POSITIVE_PROBABILITY); + List partNames = new ArrayList(); + for (int i = minPart; i <= maxPart; i++) { + String partName = tabParts.get(i); + partNames.add(partName); + bloomFilter.addToFilter(partName.getBytes()); + } + // Now add to cache + cache.add(dbName, tblName, colName, maxPart-minPart+1, aggrColStats, bloomFilter); + // Now get from cache + AggrColStats aggrStatsCached = cache.get(dbName, tblName, colName, partNames); + Assert.assertNotNull(aggrStatsCached); + ColumnStatisticsObj aggrColStatsCached = aggrStatsCached.getColStats(); + Assert.assertEquals(aggrColStats, aggrColStatsCached); + // Now get a non-existant entry + aggrStatsCached = cache.get("dbNotThere", tblName, colName, partNames); + Assert.assertNull(aggrStatsCached); + } + + @Test + public void testAddGetWithVariance() { + // Add a dummy aggregate stats object for parts 1-9 of tab1 for col1 + int minPart = 1; + int maxPart = 9; + String tblName = tables.get(0); + String colName = tabCols.get(0); + ColumnStatisticsObj aggrColStats = getDummyLongColStat(colName); + // Prepare the bloom filter + BloomFilter bloomFilter = + new BloomFilter(MAX_PARTITIONS_PER_CACHE_NODE, FALSE_POSITIVE_PROBABILITY); + // The paritions we'll eventually request from the cache + List partNames = new ArrayList(); + for (int i = minPart; i <= maxPart; i++) { + String partName = tabParts.get(i); + // Only add 50% partitions to partnames so that we can see if the request fails + if (i < maxPart / 2) { + partNames.add(partName); + } + bloomFilter.addToFilter(partName.getBytes()); + } + // Now add to cache + cache.add(dbName, tblName, colName, maxPart-minPart+1, aggrColStats, bloomFilter); + // Now get from cache + AggrColStats aggrStatsCached = cache.get(dbName, tblName, colName, partNames); + Assert.assertNull(aggrStatsCached); + } + + @Test + public void testMultiThreaded() { + } + + private ColumnStatisticsObj getDummyLongColStat(String colName) { + ColumnStatisticsObj aggrColStats = new ColumnStatisticsObj(); + aggrColStats.setColName(colName); + aggrColStats.setColType("long"); + LongColumnStatsData longStatsData = new LongColumnStatsData(); + // Set some random values + int highVal = 100; + int lowVal = 10; + int numDVs = 50; + int numNulls = 5; + longStatsData.setHighValue(highVal); + 
longStatsData.setLowValue(lowVal); + longStatsData.setNumDVs(numDVs); + longStatsData.setNumNulls(numNulls); + ColumnStatisticsData aggrColStatsData = new ColumnStatisticsData(); + aggrColStatsData.setLongStats(longStatsData); + aggrColStats.setStatsData(aggrColStatsData); + return aggrColStats; + } +} diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestStatsCache.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestStatsCache.java deleted file mode 100644 index fb6e573..0000000 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestStatsCache.java +++ /dev/null @@ -1,683 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hive.metastore.hbase; - - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; -import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; -import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -public class TestStatsCache { - private static final Log LOG = LogFactory.getLog(TestStatsCache.class.getName()); - - @Mock HTableInterface htable; - static Put[] puts = new Put[2]; - HBaseReadWrite hrw; - - @Before - public void before() throws IOException { - MockitoAnnotations.initMocks(this); - - // For reasons I don't understand we have to do the mockito setup here in before, so we allow - // each method to place one put in puts[], and then we return that. 
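// Illustrative sketch, not part of this patch: the real matching logic lives
// in AggregateStatsCache elsewhere in this change. This is one plausible
// reading of why testAddGetWithVariance above expects a miss -- a cached node
// keyed by (database, table, column) is only reusable if the partition counts
// agree within the configured max variance AND every requested partition may
// be present according to the node's Bloom filter. The class, method, and
// parameter names here are assumptions for the example.
package org.apache.hadoop.hive.metastore.hbase;

import java.util.List;

import org.apache.hadoop.hive.metastore.hbase.utils.BloomFilter;

public class AggregateMatchSketch {
  static boolean nodeMatches(int cachedNumParts, BloomFilter cachedFilter,
      List<String> requestedPartNames, float maxVariance) {
    // 9 cached partitions vs. 3 requested with maxVariance = 0.1 fails this
    // check, which is the miss the variance test expects.
    if (Math.abs(cachedNumParts - requestedPartNames.size())
        > maxVariance * requestedPartNames.size()) {
      return false;
    }
    // Bloom filters give no false negatives, so any definite "not present"
    // means the cached aggregate cannot cover the request.
    for (String partName : requestedPartNames) {
      if (!cachedFilter.contains(partName.getBytes())) {
        return false;
      }
    }
    return true;
  }
}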
- Mockito.when(htable.get(Mockito.any(Get.class))).thenAnswer(new Answer() { - @Override - public Result answer(InvocationOnMock invocation) throws Throwable { - List cells = new ArrayList(); - if (puts[0] == null) return new Result(); - for (Cell cell : puts[0].getFamilyCellMap().firstEntry().getValue()) { - cells.add(cell); - } - return Result.create(cells); - } - }); - - Mockito.when(htable.get(Mockito.anyList())).thenAnswer(new Answer() { - @Override - public Result[] answer(InvocationOnMock invocation) throws Throwable { - Result[] results = new Result[2]; - for (int i = 0; i < 2; i++) { - List cells = new ArrayList(); - if (puts[i] == null) { - results[i] = new Result(); - } else { - for (Cell cell : puts[i].getFamilyCellMap().firstEntry().getValue()) { - cells.add(cell); - } - results[i] = Result.create(cells); - } - } - return results; - } - }); - - HBaseConnection hconn = Mockito.mock(HBaseConnection.class); - Mockito.when(hconn.getHBaseTable(Mockito.anyString())).thenReturn(htable); - HiveConf conf = new HiveConf(); - conf.setIntVar(HiveConf.ConfVars.METASTORE_HBASE_CACHE_SIZE, 30); - conf.setVar(HiveConf.ConfVars.METASTORE_HBASE_CONNECTION_CLASS, HBaseReadWrite.TEST_CONN); - HBaseReadWrite.setTestConnection(hconn); - hrw = HBaseReadWrite.getInstance(conf); - StatsCache.getInstance(conf).clear(); - puts[0] = puts[1] = null; - } - - @Test - public void tableAllHit() throws IOException { - String dbName = "default"; - String tableName = "mytable"; - long now = System.currentTimeMillis(); - - ColumnStatistics cs = new ColumnStatistics(); - ColumnStatisticsDesc desc = new ColumnStatisticsDesc(true, dbName, tableName); - desc.setLastAnalyzed(now); - cs.setStatsDesc(desc); - ColumnStatisticsObj obj = new ColumnStatisticsObj(); - obj.setColName("col1"); - obj.setColType("boolean"); - ColumnStatisticsData data = new ColumnStatisticsData(); - data.setBooleanStats(new BooleanColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - - obj = new ColumnStatisticsObj(); - obj.setColName("col2"); - obj.setColType("long"); - data = new ColumnStatisticsData(); - data.setLongStats(new LongColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - - hrw.updateStatistics(dbName, tableName, null, null, cs); - - cs = new ColumnStatistics(); - desc = new ColumnStatisticsDesc(true, dbName, tableName); - desc.setLastAnalyzed(now); - cs.setStatsDesc(desc); - obj = new ColumnStatisticsObj(); - obj.setColName("col3"); - obj.setColType("double"); - data = new ColumnStatisticsData(); - data.setDoubleStats(new DoubleColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - - hrw.updateStatistics(dbName, tableName, null, null, cs); - - // Now, ask for all 3 of these. We should hit all on the cache. We'll know if we don't - // because we've mocked hbase and it will return null on the get. - cs = hrw.getTableStatistics(dbName, tableName, Arrays.asList("col1", "col2", "col3")); - - Assert.assertEquals(now, cs.getStatsDesc().getLastAnalyzed()); - Assert.assertEquals(dbName, cs.getStatsDesc().getDbName()); - Assert.assertEquals(tableName, cs.getStatsDesc().getTableName()); - Assert.assertTrue(cs.getStatsDesc().isIsTblLevel()); - - // There's no need to check every last field in each obj, as the objects aren't de/serialized - // in the cache. Just make sure we found the objects we expected. 
- Assert.assertEquals(3, cs.getStatsObjSize()); - for (ColumnStatisticsObj csobj : cs.getStatsObj()) { - if (csobj.getColName().equals("col1")) { - Assert.assertEquals(ColumnStatisticsData._Fields.BOOLEAN_STATS, - csobj.getStatsData().getSetField()); - } else if (csobj.getColName().equals("col2")) { - Assert.assertEquals(ColumnStatisticsData._Fields.LONG_STATS, - csobj.getStatsData().getSetField()); - } else if (csobj.getColName().equals("col3")) { - Assert.assertEquals(ColumnStatisticsData._Fields.DOUBLE_STATS, - csobj.getStatsData().getSetField()); - } else { - Assert.fail("Unknown column"); - } - } - } - - @Test - public void tableAllMiss() throws IOException { - String dbName = "default"; - String tableName = "misstable"; - long now = System.currentTimeMillis(); - - // Build a column stats object to return from mockito hbase - ColumnStatistics cs = new ColumnStatistics(); - ColumnStatisticsDesc desc = new ColumnStatisticsDesc(true, dbName, tableName); - desc.setLastAnalyzed(now); - cs.setStatsDesc(desc); - ColumnStatisticsObj obj = new ColumnStatisticsObj(); - obj.setColName("col1"); - obj.setColType("boolean"); - ColumnStatisticsData data = new ColumnStatisticsData(); - data.setBooleanStats(new BooleanColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - byte[] serialized = HBaseUtils.serializeStatsForOneColumn(cs, obj); - - // The easiest way to get this into hbase format is to shove it into a put and then pull out - // the result for mockito to return. - Put put = new Put(HBaseUtils.buildKey(dbName, tableName)); - put.add(HBaseReadWrite.STATS_CF, "col1".getBytes(HBaseUtils.ENCODING), serialized); - - obj = new ColumnStatisticsObj(); - obj.setColName("col2"); - obj.setColType("long"); - data = new ColumnStatisticsData(); - data.setLongStats(new LongColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - serialized = HBaseUtils.serializeStatsForOneColumn(cs, obj); - put.add(HBaseReadWrite.STATS_CF, "col2".getBytes(HBaseUtils.ENCODING), serialized); - puts[0] = put; - - // Now, ask for all 3 of these. We should miss all on the cache. - cs = hrw.getTableStatistics(dbName, tableName, Arrays.asList("col1", "col2", "col3")); - - Assert.assertEquals(now, cs.getStatsDesc().getLastAnalyzed()); - Assert.assertEquals(dbName, cs.getStatsDesc().getDbName()); - Assert.assertEquals(tableName, cs.getStatsDesc().getTableName()); - Assert.assertTrue(cs.getStatsDesc().isIsTblLevel()); - - // There's no need to check every last field in each obj, as the objects aren't de/serialized - // in the cache. Just make sure we found the objects we expected. 
- Assert.assertEquals(2, cs.getStatsObjSize()); - for (ColumnStatisticsObj csobj : cs.getStatsObj()) { - if (csobj.getColName().equals("col1")) { - Assert.assertEquals(ColumnStatisticsData._Fields.BOOLEAN_STATS, - csobj.getStatsData().getSetField()); - } else if (csobj.getColName().equals("col2")) { - Assert.assertEquals(ColumnStatisticsData._Fields.LONG_STATS, - csobj.getStatsData().getSetField()); - } else { - Assert.fail("Unknown column"); - } - } - } - - @Test - public void tableSomeHit() throws IOException { - String dbName = "default"; - String tableName = "sometable"; - long now = System.currentTimeMillis(); - - ColumnStatistics cs = new ColumnStatistics(); - ColumnStatisticsDesc desc = new ColumnStatisticsDesc(true, dbName, tableName); - desc.setLastAnalyzed(now); - cs.setStatsDesc(desc); - ColumnStatisticsObj obj = new ColumnStatisticsObj(); - obj.setColName("col1"); - obj.setColType("boolean"); - ColumnStatisticsData data = new ColumnStatisticsData(); - data.setBooleanStats(new BooleanColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - - obj = new ColumnStatisticsObj(); - obj.setColName("col2"); - obj.setColType("long"); - data = new ColumnStatisticsData(); - data.setLongStats(new LongColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - - hrw.updateStatistics(dbName, tableName, null, null, cs); - - cs = new ColumnStatistics(); - desc = new ColumnStatisticsDesc(true, dbName, tableName); - desc.setLastAnalyzed(now); - cs.setStatsDesc(desc); - obj = new ColumnStatisticsObj(); - obj.setColName("col3"); - obj.setColType("double"); - data = new ColumnStatisticsData(); - data.setDoubleStats(new DoubleColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - - Put put = new Put(HBaseUtils.buildKey(dbName, tableName)); - byte[] serialized = HBaseUtils.serializeStatsForOneColumn(cs, obj); - put.add(HBaseReadWrite.STATS_CF, "col3".getBytes(HBaseUtils.ENCODING), serialized); - puts[0] = put; - - // Now, ask for all 3 of these. We should hit the first two on the cache and the third from - // the get - cs = hrw.getTableStatistics(dbName, tableName, Arrays.asList("col1", "col2", "col3")); - - Assert.assertEquals(now, cs.getStatsDesc().getLastAnalyzed()); - Assert.assertEquals(dbName, cs.getStatsDesc().getDbName()); - Assert.assertEquals(tableName, cs.getStatsDesc().getTableName()); - Assert.assertTrue(cs.getStatsDesc().isIsTblLevel()); - - // There's no need to check every last field in each obj, as the objects aren't de/serialized - // in the cache. Just make sure we found the objects we expected. 
- Assert.assertEquals(3, cs.getStatsObjSize()); - for (ColumnStatisticsObj csobj : cs.getStatsObj()) { - if (csobj.getColName().equals("col1")) { - Assert.assertEquals(ColumnStatisticsData._Fields.BOOLEAN_STATS, - csobj.getStatsData().getSetField()); - } else if (csobj.getColName().equals("col2")) { - Assert.assertEquals(ColumnStatisticsData._Fields.LONG_STATS, - csobj.getStatsData().getSetField()); - } else if (csobj.getColName().equals("col3")) { - Assert.assertEquals(ColumnStatisticsData._Fields.DOUBLE_STATS, - csobj.getStatsData().getSetField()); - } else { - Assert.fail("Unknown column"); - } - } - } - - @Test - public void tableTimeout() throws Exception { - String dbName = "default"; - String tableName = "timeouttable"; - long now = System.currentTimeMillis(); - - ColumnStatistics cs = new ColumnStatistics(); - ColumnStatisticsDesc desc = new ColumnStatisticsDesc(true, dbName, tableName); - desc.setLastAnalyzed(now); - cs.setStatsDesc(desc); - ColumnStatisticsObj obj = new ColumnStatisticsObj(); - obj.setColName("col1"); - obj.setColType("boolean"); - ColumnStatisticsData data = new ColumnStatisticsData(); - data.setBooleanStats(new BooleanColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - - obj = new ColumnStatisticsObj(); - obj.setColName("col2"); - obj.setColType("long"); - data = new ColumnStatisticsData(); - data.setLongStats(new LongColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - - hrw.updateStatistics(dbName, tableName, null, null, cs); - - StatsCache.getInstance(null).makeWayOld(); - - // Now, ask for all 3 of these. We should hit all on the cache. We'll know if we don't - // because we've mocked hbase and it will return null on the get. - cs = hrw.getTableStatistics(dbName, tableName, Arrays.asList("col1", "col2", "col3")); - - Assert.assertEquals(0, cs.getStatsObjSize()); - } - - @Test - public void partAllHit() throws IOException { - String dbName = "default"; - String tableName = "partallhit"; - List partVals1 = Arrays.asList("today"); - List partVals2 = Arrays.asList("yesterday"); - List partNames = Arrays.asList("ds=today", "ds=yeserday"); - long now = System.currentTimeMillis(); - - ColumnStatistics cs = new ColumnStatistics(); - ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); - desc.setLastAnalyzed(now); - desc.setPartName(partNames.get(0)); - cs.setStatsDesc(desc); - ColumnStatisticsObj obj = new ColumnStatisticsObj(); - obj.setColName("col1"); - obj.setColType("boolean"); - ColumnStatisticsData data = new ColumnStatisticsData(); - data.setBooleanStats(new BooleanColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - - hrw.updateStatistics(dbName, tableName, partNames.get(0), partVals1, cs); - - cs = new ColumnStatistics(); - desc = new ColumnStatisticsDesc(false, dbName, tableName); - desc.setLastAnalyzed(now); - desc.setPartName(partNames.get(0)); - cs.setStatsDesc(desc); - obj = new ColumnStatisticsObj(); - obj.setColName("col2"); - obj.setColType("long"); - data = new ColumnStatisticsData(); - data.setLongStats(new LongColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - - hrw.updateStatistics(dbName, tableName, partNames.get(0), partVals1, cs); - - cs = new ColumnStatistics(); - desc = new ColumnStatisticsDesc(false, dbName, tableName); - desc.setLastAnalyzed(now); - desc.setPartName(partNames.get(1)); - cs.setStatsDesc(desc); - cs.addToStatsObj(obj); - - obj = new ColumnStatisticsObj(); - obj.setColName("col1"); - obj.setColType("boolean"); - 
data = new ColumnStatisticsData(); - data.setBooleanStats(new BooleanColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - - hrw.updateStatistics(dbName, tableName, partNames.get(1), partVals2, cs); - - // Now, ask for all 3 of these. We should hit all on the cache. We'll know if we don't - // because we've mocked hbase and it will return null on the get. - List results = hrw.getPartitionStatistics(dbName, tableName, - partNames, Arrays.asList(partVals1, partVals2), Arrays.asList("col1", "col2")); - - Assert.assertEquals(2, results.size()); - for (int i = 0; i < results.size(); i++) { - Assert.assertEquals(now, results.get(i).getStatsDesc().getLastAnalyzed()); - Assert.assertEquals(dbName, results.get(i).getStatsDesc().getDbName()); - Assert.assertEquals(tableName, results.get(i).getStatsDesc().getTableName()); - Assert.assertFalse(cs.getStatsDesc().isIsTblLevel()); - - Assert.assertEquals(partNames.get(i), results.get(i).getStatsDesc().getPartName()); - Assert.assertEquals(2, results.get(i).getStatsObjSize()); - for (ColumnStatisticsObj csobj : cs.getStatsObj()) { - if (csobj.getColName().equals("col1")) { - Assert.assertEquals(ColumnStatisticsData._Fields.BOOLEAN_STATS, - csobj.getStatsData().getSetField()); - } else if (csobj.getColName().equals("col2")) { - Assert.assertEquals(ColumnStatisticsData._Fields.LONG_STATS, - csobj.getStatsData().getSetField()); - } else { - Assert.fail("Unknown column"); - } - } - } - } - - @Test - public void partAllMiss() throws IOException { - String dbName = "default"; - String tableName = "misspart"; - List> partVals = Arrays.asList(Arrays.asList("today"), Arrays.asList("yesterday")); - List partNames = Arrays.asList("ds=today", "ds=yeserday"); - long now = System.currentTimeMillis(); - - // Build a column stats object to return from mockito hbase - ColumnStatistics cs = new ColumnStatistics(); - ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); - desc.setLastAnalyzed(now); - desc.setPartName(partNames.get(0)); - cs.setStatsDesc(desc); - ColumnStatisticsObj obj = new ColumnStatisticsObj(); - obj.setColName("col1"); - obj.setColType("boolean"); - ColumnStatisticsData data = new ColumnStatisticsData(); - data.setBooleanStats(new BooleanColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - byte[] serialized = HBaseUtils.serializeStatsForOneColumn(cs, obj); - - // The easiest way to get this into hbase format is to shove it into a put and then pull out - // the result for mockito to return. 
- Put put = new Put(HBaseUtils.buildKey(dbName, tableName, partNames.get(0))); - put.add(HBaseReadWrite.STATS_CF, "col1".getBytes(HBaseUtils.ENCODING), serialized); - puts[0] = put; - - cs = new ColumnStatistics(); - desc = new ColumnStatisticsDesc(false, dbName, tableName); - desc.setLastAnalyzed(now); - desc.setPartName(partNames.get(1)); - cs.setStatsDesc(desc); - obj = new ColumnStatisticsObj(); - obj.setColName("col2"); - obj.setColType("long"); - data = new ColumnStatisticsData(); - data.setLongStats(new LongColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - serialized = HBaseUtils.serializeStatsForOneColumn(cs, obj); - put = new Put(HBaseUtils.buildKey(dbName, tableName, partNames.get(1))); - put.add(HBaseReadWrite.STATS_CF, "col2".getBytes(HBaseUtils.ENCODING), serialized); - puts[1] = put; - - List results = hrw.getPartitionStatistics(dbName, tableName, partNames, - partVals, Arrays.asList("col1", "col2", "col3")); - - Assert.assertEquals(2, results.size()); - for (int i = 0; i < results.size(); i++) { - Assert.assertEquals(now, results.get(i).getStatsDesc().getLastAnalyzed()); - Assert.assertEquals(dbName, results.get(i).getStatsDesc().getDbName()); - Assert.assertEquals(tableName, results.get(i).getStatsDesc().getTableName()); - Assert.assertEquals(partNames.get(i), results.get(i).getStatsDesc().getPartName()); - Assert.assertFalse(cs.getStatsDesc().isIsTblLevel()); - - Assert.assertEquals(1, results.get(i).getStatsObjSize()); - for (ColumnStatisticsObj csobj : cs.getStatsObj()) { - if (csobj.getColName().equals("col1")) { - Assert.assertEquals(ColumnStatisticsData._Fields.BOOLEAN_STATS, - csobj.getStatsData().getSetField()); - } else if (csobj.getColName().equals("col2")) { - Assert.assertEquals(ColumnStatisticsData._Fields.LONG_STATS, - csobj.getStatsData().getSetField()); - } else { - Assert.fail("Unknown column"); - } - } - } - } - - @Test - public void partSomeHit() throws IOException { - String dbName = "default"; - String tableName = "partialpart"; - List> partVals = Arrays.asList(Arrays.asList("today"), Arrays.asList("yesterday")); - List partNames = Arrays.asList("ds=today", "ds=yeserday"); - long now = System.currentTimeMillis(); - - // Build a column stats object to return from mockito hbase - ColumnStatistics cs = new ColumnStatistics(); - ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); - desc.setLastAnalyzed(now); - desc.setPartName(partNames.get(0)); - cs.setStatsDesc(desc); - ColumnStatisticsObj obj = new ColumnStatisticsObj(); - obj.setColName("col1"); - obj.setColType("boolean"); - ColumnStatisticsData data = new ColumnStatisticsData(); - data.setBooleanStats(new BooleanColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - byte[] serialized = HBaseUtils.serializeStatsForOneColumn(cs, obj); - - // The easiest way to get this into hbase format is to shove it into a put and then pull out - // the result for mockito to return. 
- Put put = new Put(HBaseUtils.buildKey(dbName, tableName, partNames.get(0))); - put.add(HBaseReadWrite.STATS_CF, "col1".getBytes(HBaseUtils.ENCODING), serialized); - puts[0] = put; - - - // col2 partition 1 goes into the cache - cs = new ColumnStatistics(); - desc = new ColumnStatisticsDesc(false, dbName, tableName); - desc.setLastAnalyzed(now); - desc.setPartName(partNames.get(0)); - cs.setStatsDesc(desc); - obj = new ColumnStatisticsObj(); - obj.setColName("col2"); - obj.setColType("long"); - data = new ColumnStatisticsData(); - data.setLongStats(new LongColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - - hrw.updateStatistics(dbName, tableName, partNames.get(0), partVals.get(0), cs); - - cs = new ColumnStatistics(); - desc = new ColumnStatisticsDesc(false, dbName, tableName); - desc.setLastAnalyzed(now); - desc.setPartName(partNames.get(1)); - cs.setStatsDesc(desc); - obj = new ColumnStatisticsObj(); - obj.setColName("col2"); - obj.setColType("long"); - data = new ColumnStatisticsData(); - data.setLongStats(new LongColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - serialized = HBaseUtils.serializeStatsForOneColumn(cs, obj); - put = new Put(HBaseUtils.buildKey(dbName, tableName, partNames.get(1))); - put.add(HBaseReadWrite.STATS_CF, "col2".getBytes(HBaseUtils.ENCODING), serialized); - puts[1] = put; - - // Now, ask for all 3 of these. We should miss all on the cache. - List results = hrw.getPartitionStatistics(dbName, tableName, partNames, - partVals, Arrays.asList("col1", "col2", "col3")); - - Assert.assertEquals(2, results.size()); - Assert.assertEquals(2, results.get(0).getStatsObjSize()); - Assert.assertEquals(1, results.get(1).getStatsObjSize()); - - for (int i = 0; i < results.size(); i++) { - Assert.assertEquals(now, results.get(i).getStatsDesc().getLastAnalyzed()); - Assert.assertEquals(dbName, results.get(i).getStatsDesc().getDbName()); - Assert.assertEquals(tableName, results.get(i).getStatsDesc().getTableName()); - Assert.assertEquals(partNames.get(i), results.get(i).getStatsDesc().getPartName()); - Assert.assertFalse(cs.getStatsDesc().isIsTblLevel()); - - for (ColumnStatisticsObj csobj : cs.getStatsObj()) { - if (csobj.getColName().equals("col1")) { - Assert.assertEquals(ColumnStatisticsData._Fields.BOOLEAN_STATS, - csobj.getStatsData().getSetField()); - } else if (csobj.getColName().equals("col2")) { - Assert.assertEquals(ColumnStatisticsData._Fields.LONG_STATS, - csobj.getStatsData().getSetField()); - } else { - Assert.fail("Unknown column"); - } - } - } - } - - @Test - public void partTimeout() throws Exception { - String dbName = "default"; - String tableName = "timeoutpart"; - String partName = "ds=today"; - List partVals = Arrays.asList("today"); - long now = System.currentTimeMillis(); - - ColumnStatistics cs = new ColumnStatistics(); - ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); - desc.setLastAnalyzed(now); - desc.setPartName(partName); - cs.setStatsDesc(desc); - ColumnStatisticsObj obj = new ColumnStatisticsObj(); - obj.setColName("col1"); - obj.setColType("boolean"); - ColumnStatisticsData data = new ColumnStatisticsData(); - data.setBooleanStats(new BooleanColumnStatsData()); - obj.setStatsData(data); - cs.addToStatsObj(obj); - - hrw.updateStatistics(dbName, tableName, partName, partVals, cs); - - StatsCache.getInstance(null).makeWayOld(); - - // Because our mocked 'get' always returns two results, I have to pass two part names, even - // though both will return nothing. 
-    List<ColumnStatistics> results = hrw.getPartitionStatistics(dbName, tableName,
-        Arrays.asList(partName, "fred"), Arrays.asList(partVals, Arrays.asList("fred")),
-        Arrays.asList("col1", "col2", "col3"));
-
-    Assert.assertEquals(2, results.size());
-    Assert.assertEquals(0, results.get(0).getStatsObjSize());
-    Assert.assertEquals(0, results.get(1).getStatsObjSize());
-  }
-
-  @Test
-  public void cleaning() throws Exception {
-    String dbName = "default";
-    String tableName = "cleaning";
-    long now = System.currentTimeMillis();
-
-    ColumnStatistics cs = new ColumnStatistics();
-    ColumnStatisticsDesc desc = new ColumnStatisticsDesc(true, dbName, tableName);
-    desc.setLastAnalyzed(now);
-    cs.setStatsDesc(desc);
-    for (int i = 0; i < 15; i++) {
-      ColumnStatisticsObj obj = new ColumnStatisticsObj();
-      obj.setColName("col" + i);
-      obj.setColType("boolean");
-      ColumnStatisticsData data = new ColumnStatisticsData();
-      data.setBooleanStats(new BooleanColumnStatsData());
-      obj.setStatsData(data);
-      cs.addToStatsObj(obj);
-    }
-
-    hrw.updateStatistics(dbName, tableName, null, null, cs);
-
-    Assert.assertEquals(15, StatsCache.getInstance(null).cacheSize());
-
-    StatsCache.getInstance(null).makeWayOld();
-
-    // Put one more in. This should throw it over the edge and cause it to clean.
-    cs = new ColumnStatistics();
-    desc = new ColumnStatisticsDesc(true, dbName, tableName);
-    desc.setLastAnalyzed(now);
-    cs.setStatsDesc(desc);
-    ColumnStatisticsObj obj = new ColumnStatisticsObj();
-    obj.setColName("col16");
-    obj.setColType("boolean");
-    ColumnStatisticsData data = new ColumnStatisticsData();
-    data.setBooleanStats(new BooleanColumnStatsData());
-    obj.setStatsData(data);
-    cs.addToStatsObj(obj);
-
-    hrw.updateStatistics(dbName, tableName, null, null, cs);
-
-    while (StatsCache.getInstance(null).cleaning()) {
-      Thread.sleep(250);
-    }
-
-    Assert.assertEquals(1, StatsCache.getInstance(null).cacheSize());
-  }
-  // TODO test cleaning
-
-}
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/utils/TestBitVector.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/utils/TestBitVector.java
new file mode 100644
index 0000000..559fbad
--- /dev/null
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/utils/TestBitVector.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.hbase.utils;
+
+import org.apache.hadoop.hive.metastore.hbase.utils.BitVector;
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBitVector {
+
+  static int BIT_VECTOR_SIZE = 32;
+  BitVector bitVector;
+
+  @BeforeClass
+  public static void beforeTest() {
+  }
+
+  @AfterClass
+  public static void afterTest() {
+  }
+
+  @Before
+  public void setUp() {
+    // Create a new BitVector
+    bitVector = new BitVector(BIT_VECTOR_SIZE);
+  }
+
+
+  @After
+  public void tearDown() {
+  }
+
+  @Test
+  public void testSetAll() {
+    // Set bits
+    bitVector.setAll();
+    Assert.assertEquals("11111111111111111111111111111111", bitVector.toString());
+  }
+
+  @Test
+  public void testClearAll() {
+    // Clear all bits
+    bitVector.clearAll();
+    Assert.assertEquals("00000000000000000000000000000000", bitVector.toString());
+  }
+
+  @Test
+  public void testSetUnsetBit() {
+    // Set 3rd bit
+    bitVector.setBit(2);
+    Assert.assertEquals("00100000000000000000000000000000", bitVector.toString());
+    // Now check if 3rd bit is set
+    Assert.assertTrue(bitVector.isBitSet(2));
+    // Now set 30th bit
+    bitVector.setBit(29);
+    Assert.assertEquals("00100000000000000000000000000100", bitVector.toString());
+    // Now check if 30th bit is set
+    Assert.assertTrue(bitVector.isBitSet(29));
+
+    // Now unset 3rd bit
+    bitVector.unSetBit(2);
+    Assert.assertEquals("00000000000000000000000000000100", bitVector.toString());
+    // Now check if 3rd bit is unset
+    Assert.assertFalse(bitVector.isBitSet(2));
+    // Now unset 30th bit
+    bitVector.unSetBit(29);
+    Assert.assertEquals("00000000000000000000000000000000", bitVector.toString());
+    // Now check if 30th bit is unset
+    Assert.assertFalse(bitVector.isBitSet(29));
+  }
+}
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/utils/TestBloomFilter.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/utils/TestBloomFilter.java
new file mode 100644
index 0000000..5175efd
--- /dev/null
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/utils/TestBloomFilter.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.hbase.utils;
+
+import org.apache.hadoop.hive.metastore.hbase.utils.BloomFilter;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBloomFilter {
+  static final int SET_SIZE = 50;
+  static final double FALSE_POSITIVE_PROBABILITY = 0.01;
+  // Pre-calculated for the above set size and fpp
+  static final int FILTER_SIZE = 480;
+  static final int NUM_HASH_FUNCTIONS = 7;
+  BloomFilter bloomFilter;
+  // Items that we'll add to the filter
+  String[] items = {"Part1=Val1", "Part2=Val2", "Part3=Val3", "Part4=Val4", "Part5=Val5"};
+
+  @BeforeClass
+  public static void beforeTest() {
+  }
+
+  @AfterClass
+  public static void afterTest() {
+  }
+
+  @Before
+  public void setUp() {
+    bloomFilter = new BloomFilter(SET_SIZE, FALSE_POSITIVE_PROBABILITY);
+  }
+
+  @After
+  public void tearDown() {
+  }
+
+  @Test
+  public void testFilterAndHashSize() {
+    Assert.assertEquals(bloomFilter.getFilterSize(), FILTER_SIZE);
+    Assert.assertEquals(bloomFilter.getNumHashFunctions(), NUM_HASH_FUNCTIONS);
+  }
+
+  @Test
+  public void testFilterFunctions() {
+    // Add all items to the bloom filter
+    // (since bloom filter returns false positives, no point testing for negative cases)
+    for (String item: items) {
+      bloomFilter.addToFilter(item.getBytes());
+    }
+    // Test for presence
+    for (String item: items) {
+      Assert.assertTrue(bloomFilter.contains(item.getBytes()));
+    }
+    // Clear all bits
+    bloomFilter.getBitVector().clearAll();
+    // Test for presence now - should fail
+    for (String item: items) {
+      Assert.assertFalse(bloomFilter.contains(item.getBytes()));
+    }
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index c147d45..7fdef60 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,7 +100,7 @@
     3.4
     1.7.5
     0.8.0.RELEASE
-    1.0.0-incubating-SNAPSHOT
+    1.0.0-incubating
     3.2.6
     3.2.10
     3.2.9
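
The FILTER_SIZE and NUM_HASH_FUNCTIONS values asserted in TestBloomFilter are consistent with the standard optimal Bloom filter sizing formulas, m = ceil(-n * ln(p) / (ln 2)^2) and k = round((m / n) * ln 2). The minimal sketch below reproduces 480 bits and 7 hash functions for SET_SIZE = 50 and a 0.01 false positive probability; it assumes the patch's BloomFilter class (not shown in this diff) derives its parameters the same way, and the BloomFilterSizing class name is purely illustrative.

// Illustrative sketch only; assumes the usual optimal-parameter formulas for a Bloom filter.
public class BloomFilterSizing {

  // m = ceil(-n * ln(p) / (ln 2)^2): number of bits in the filter
  static int optimalNumBits(int expectedEntries, double falsePositiveProbability) {
    return (int) Math.ceil(-expectedEntries * Math.log(falsePositiveProbability)
        / (Math.log(2) * Math.log(2)));
  }

  // k = round((m / n) * ln 2): number of hash functions
  static int optimalNumHashFunctions(int expectedEntries, int numBits) {
    return (int) Math.round(((double) numBits / expectedEntries) * Math.log(2));
  }

  public static void main(String[] args) {
    int bits = optimalNumBits(50, 0.01);            // 480, matching FILTER_SIZE
    int hashes = optimalNumHashFunctions(50, bits); // 7, matching NUM_HASH_FUNCTIONS
    System.out.println("bits = " + bits + ", hash functions = " + hashes);
  }
}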