commit e6eb4a0bfd82d059a9df5b1c81807a4ff2b3a215 Author: stack Date: Thu Nov 3 16:17:42 2016 -0700 Hacked tinylru diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 16c8849..358354a 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -750,6 +750,11 @@ possible configurations would overwhelm and obscure the important. The default thread pool size if parallel-seeking feature enabled. + hfile.block.cache.policy + LRU + The eviction policy for the L1 block cache (LRU or TinyLFU). + + hfile.block.cache.size 0.4 Percentage of maximum heap (-Xmx setting) to allocate to block cache diff --git a/hbase-resource-bundle/src/main/resources/supplemental-models.xml b/hbase-resource-bundle/src/main/resources/supplemental-models.xml index d6237d0..9828ec6 100644 --- a/hbase-resource-bundle/src/main/resources/supplemental-models.xml +++ b/hbase-resource-bundle/src/main/resources/supplemental-models.xml @@ -152,6 +152,20 @@ under the License. + com.github.ben-manes.caffeine + caffeine + + + + Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + + com.lmax disruptor @@ -1668,7 +1682,7 @@ Mozilla Public License Version 2.0 means any form of the work other than Source Code Form. 1.7. "Larger Work" - means a work that combines Covered Software with other material, in + means a work that combines Covered Software with other material, in a separate file or files, that is not Covered Software. 1.8. "License" diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index c1a5329..60603ce 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -359,6 +359,10 @@ + com.github.ben-manes.caffeine + caffeine + + org.apache.hbase hbase-common diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 8fe81eb..0edbed3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; +import java.util.concurrent.ForkJoinPool; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,6 +46,12 @@ public class CacheConfig { private static final Log LOG = LogFactory.getLog(CacheConfig.class.getName()); /** + * Configuration key to cache block policy (Lru, TinyLfu). + */ + public static final String HFILE_BLOCK_CACHE_POLICY_KEY = "hfile.block.cache.policy"; + public static final String HFILE_BLOCK_CACHE_POLICY_DEFAULT = "LRU"; + + /** * Configuration key to cache data blocks on write. There are separate * switches for bloom blocks and non-root index blocks. */ @@ -91,7 +98,7 @@ public class CacheConfig { * is an in-memory map that needs to be persisted across restarts. Where to store this * in-memory state is what you supply here: e.g. /tmp/bucketcache.map. */ - public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = + public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = "hbase.bucketcache.persistent.path"; /** @@ -99,11 +106,11 @@ public class CacheConfig { * as indices and blooms are kept in the lru blockcache and the data blocks in the * bucket cache). 
*/ - public static final String BUCKET_CACHE_COMBINED_KEY = + public static final String BUCKET_CACHE_COMBINED_KEY = "hbase.bucketcache.combinedcache.enabled"; public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads"; - public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = + public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = "hbase.bucketcache.writer.queuelength"; /** @@ -148,6 +155,7 @@ public class CacheConfig { memcached("org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache"); // TODO(eclark): Consider more. Redis, etc. Class clazz; + @SuppressWarnings("unchecked") ExternalBlockCaches(String clazzName) { try { clazz = (Class) Class.forName(clazzName); @@ -442,7 +450,9 @@ public class CacheConfig { * @return true if this {@link BlockCategory} should be compressed in blockcache, false otherwise */ public boolean shouldCacheCompressed(BlockCategory category) { - if (!isBlockCacheEnabled()) return false; + if (!isBlockCacheEnabled()) { + return false; + } switch (category) { case DATA: return this.cacheDataCompressed; @@ -523,26 +533,51 @@ public class CacheConfig { */ // Clear this if in tests you'd make more than one block cache instance. @VisibleForTesting - static BlockCache GLOBAL_BLOCK_CACHE_INSTANCE; + private static FirstLevelBlockCache GLOBAL_L1_CACHE_INSTANCE; /** Boolean whether we have disabled the block cache entirely. */ @VisibleForTesting static boolean blockCacheDisabled = false; - static long getLruCacheSize(final Configuration conf, final MemoryUsage mu) { +// static long getLruCacheSize(final Configuration conf, final MemoryUsage mu) { + static long getFirstLevelCacheSize(final Configuration conf, final long xmx) { +// private synchronized static FirstLevelBlockCache getL1(long cacheSize, Configuration c) { + /* + if (GLOBAL_L1_CACHE_INSTANCE != null) return GLOBAL_L1_CACHE_INSTANCE; + if (blockCacheDisabled) return null; + if (cacheSize < 0) return null; + + String policy = c.get(HFILE_BLOCK_CACHE_POLICY_KEY, HFILE_BLOCK_CACHE_POLICY_DEFAULT); + int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE); + LOG.info("Allocating BlockCache size=" + + StringUtils.byteDesc(cacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); + + if (policy.equalsIgnoreCase("LRU")) { + GLOBAL_L1_CACHE_INSTANCE = new LruBlockCache(cacheSize, blockSize, true, c); + } else if (policy.equalsIgnoreCase("TinyLFU")) { + GLOBAL_L1_CACHE_INSTANCE = new TinyLfuBlockCache( + cacheSize, blockSize, ForkJoinPool.commonPool(), c); + } else { + throw new IllegalArgumentException("Unknown policy: " + policy); + } + return GLOBAL_L1_CACHE_INSTANCE; + */ float cachePercentage = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, - HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); + HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); if (cachePercentage <= 0.0001f) { - blockCacheDisabled = true; - return -1; + blockCacheDisabled = true; + return -1; + } if (cachePercentage > 1.0) { - throw new IllegalArgumentException(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY + - " must be between 0.0 and 1.0, and not > 1.0"); + throw new IllegalArgumentException(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY + + " must be between 0.0 and 1.0, and not > 1.0"); + } - // Calculate the amount of heap to give the heap. - return (long) (mu.getMax() * cachePercentage); + // Calculate the amount of heap to give the heap. + return (long) (xmx * cachePercentage); + } /** @@ -550,6 +585,11 @@ public class CacheConfig { * @param mu JMX Memory Bean * @return An L1 instance. 
Currently an instance of LruBlockCache. */ + public static FirstLevelBlockCache getL1(final Configuration c) { + long xmx = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); + long l1CacheSize = getFirstLevelCacheSize(c, xmx); + return getL1(l1CacheSize, c); + /* private static LruBlockCache getL1(final Configuration c, final MemoryUsage mu) { long lruCacheSize = getLruCacheSize(c, mu); if (lruCacheSize < 0) return null; @@ -557,6 +597,33 @@ public class CacheConfig { LOG.info("Allocating LruBlockCache size=" + StringUtils.byteDesc(lruCacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); return new LruBlockCache(lruCacheSize, blockSize, true, c); + */ + } + + private synchronized static FirstLevelBlockCache getL1(long cacheSize, Configuration c) { + if (GLOBAL_L1_CACHE_INSTANCE != null) return GLOBAL_L1_CACHE_INSTANCE; + if (blockCacheDisabled) return null; + if (cacheSize < 0) return null; + + String policy = c.get(HFILE_BLOCK_CACHE_POLICY_KEY, HFILE_BLOCK_CACHE_POLICY_DEFAULT); + int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE); + LOG.info("Allocating BlockCache size=" + + StringUtils.byteDesc(cacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); + + if (policy.equalsIgnoreCase("LRU")) { + GLOBAL_L1_CACHE_INSTANCE = new LruBlockCache(cacheSize, blockSize, true, c); + + } else if (policy.equalsIgnoreCase("TinyLFU")) { + GLOBAL_L1_CACHE_INSTANCE = new TinyLfuBlockCache( + cacheSize, blockSize, ForkJoinPool.commonPool(), c + ); + + } else { + throw new IllegalArgumentException("Unknown policy: " + policy); + + } + return GLOBAL_L1_CACHE_INSTANCE; + } /** @@ -582,7 +649,7 @@ public class CacheConfig { } private static BlockCache getExternalBlockcache(Configuration c) { - Class klass = null; + Class klass = null; // Get the class, from the config. s try { @@ -610,7 +677,9 @@ public class CacheConfig { private static BlockCache getBucketCache(Configuration c, MemoryUsage mu) { // Check for L2. ioengine name must be non-null. String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null); - if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) return null; + if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) { + return null; + } int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE); float bucketCachePercentage = c.getFloat(BUCKET_CACHE_SIZE_KEY, 0F); @@ -660,34 +729,44 @@ public class CacheConfig { * @return The block cache or null. */ public static synchronized BlockCache instantiateBlockCache(Configuration conf) { - if (GLOBAL_BLOCK_CACHE_INSTANCE != null) return GLOBAL_BLOCK_CACHE_INSTANCE; - if (blockCacheDisabled) return null; - MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); - LruBlockCache l1 = getL1(conf, mu); - // blockCacheDisabled is set as a side-effect of getL1(), so check it again after the call. 
- if (blockCacheDisabled) return null; - BlockCache l2 = getL2(conf, mu); + if (GLOBAL_L1_CACHE_INSTANCE != null) { + return GLOBAL_L1_CACHE_INSTANCE; + } + if (blockCacheDisabled) { + return null; + } + // blockCacheDisabled is set as a side-effect of getFirstLevelCacheSize() + // so check it again after the call + long xmx = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); + long l1CacheSize = getFirstLevelCacheSize(conf, xmx); + if (blockCacheDisabled) { + return null; + } + BlockCache l2 = null; // getL2(conf, xmx); + + FirstLevelBlockCache l1 = getL1(l1CacheSize, conf); if (l2 == null) { - GLOBAL_BLOCK_CACHE_INSTANCE = l1; + GLOBAL_L1_CACHE_INSTANCE = l1; } else { boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT); - boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, + boolean combinedWithL1 = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, DEFAULT_BUCKET_CACHE_COMBINED); if (useExternal) { - GLOBAL_BLOCK_CACHE_INSTANCE = new InclusiveCombinedBlockCache(l1, l2); + // GLOBAL_L1_CACHE_INSTANCE = new InclusiveCombinedBlockCache(l1, l2); } else { - if (combinedWithLru) { - GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2); + if (combinedWithL1) { + // GLOBAL_L1_CACHE_INSTANCE = new CombinedBlockCache(l1, l2); } else { // L1 and L2 are not 'combined'. They are connected via the LruBlockCache victimhandler // mechanism. It is a little ugly but works according to the following: when the // background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get // a block from the L1 cache, if not in L1, we will search L2. - GLOBAL_BLOCK_CACHE_INSTANCE = l1; + // GLOBAL_BLOCK_CACHE_INSTANCE = l1; } } - l1.setVictimCache(l2); + // l1.setVictimCache(l2); + throw new RuntimeException("SHOULD NOT BE HERE"); } - return GLOBAL_BLOCK_CACHE_INSTANCE; + return GLOBAL_L1_CACHE_INSTANCE; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index 4a1c2c7..0b24f6c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -28,24 +28,24 @@ import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; /** * CombinedBlockCache is an abstraction layer that combines - * {@link LruBlockCache} and {@link BucketCache}. The smaller lruCache is used + * {@link FirstLevelBlockCache} and {@link BucketCache}. The smaller lruCache is used * to cache bloom blocks and index blocks. The larger l2Cache is used to * cache data blocks. {@link #getBlock(BlockCacheKey, boolean, boolean, boolean)} reads - * first from the smaller lruCache before looking for the block in the l2Cache. Blocks evicted - * from lruCache are put into the bucket cache. + * first from the smaller l1Cache before looking for the block in the l2Cache. Blocks evicted + * from l1Cache are put into the bucket cache. * Metrics are the combined size and hits and misses of both caches. 
- * + * */ @InterfaceAudience.Private public class CombinedBlockCache implements ResizableBlockCache, HeapSize { - protected final LruBlockCache lruCache; + protected final FirstLevelBlockCache l1Cache; protected final BlockCache l2Cache; protected final CombinedCacheStats combinedCacheStats; - public CombinedBlockCache(LruBlockCache lruCache, BlockCache l2Cache) { - this.lruCache = lruCache; + public CombinedBlockCache(FirstLevelBlockCache l1Cache, BlockCache l2Cache) { + this.l1Cache = l1Cache; this.l2Cache = l2Cache; - this.combinedCacheStats = new CombinedCacheStats(lruCache.getStats(), + this.combinedCacheStats = new CombinedCacheStats(l1Cache.getStats(), l2Cache.getStats()); } @@ -55,7 +55,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { if (l2Cache instanceof HeapSize) { l2size = ((HeapSize) l2Cache).heapSize(); } - return lruCache.heapSize() + l2size; + return l1Cache.heapSize() + l2size; } @Override @@ -63,7 +63,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { final boolean cacheDataInL1) { boolean metaBlock = buf.getBlockType().getCategory() != BlockCategory.DATA; if (metaBlock || cacheDataInL1) { - lruCache.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1); + l1Cache.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1); } else { l2Cache.cacheBlock(cacheKey, buf, inMemory, false); } @@ -79,19 +79,19 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { boolean repeat, boolean updateCacheMetrics) { // TODO: is there a hole here, or just awkwardness since in the lruCache getBlock // we end up calling l2Cache.getBlock. - return lruCache.containsBlock(cacheKey)? - lruCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics): + return l1Cache.containsBlock(cacheKey)? 
+ l1Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics): l2Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics); } @Override public boolean evictBlock(BlockCacheKey cacheKey) { - return lruCache.evictBlock(cacheKey) || l2Cache.evictBlock(cacheKey); + return l1Cache.evictBlock(cacheKey) || l2Cache.evictBlock(cacheKey); } @Override public int evictBlocksByHfileName(String hfileName) { - return lruCache.evictBlocksByHfileName(hfileName) + return l1Cache.evictBlocksByHfileName(hfileName) + l2Cache.evictBlocksByHfileName(hfileName); } @@ -102,28 +102,28 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { @Override public void shutdown() { - lruCache.shutdown(); + l1Cache.shutdown(); l2Cache.shutdown(); } @Override public long size() { - return lruCache.size() + l2Cache.size(); + return l1Cache.size() + l2Cache.size(); } @Override public long getFreeSize() { - return lruCache.getFreeSize() + l2Cache.getFreeSize(); + return l1Cache.getFreeSize() + l2Cache.getFreeSize(); } @Override public long getCurrentSize() { - return lruCache.getCurrentSize() + l2Cache.getCurrentSize(); + return l1Cache.getCurrentSize() + l2Cache.getCurrentSize(); } @Override public long getBlockCount() { - return lruCache.getBlockCount() + l2Cache.getBlockCount(); + return l1Cache.getBlockCount() + l2Cache.getBlockCount(); } public static class CombinedCacheStats extends CacheStats { @@ -308,7 +308,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { lruCacheStats.rollMetricsPeriod(); bucketCacheStats.rollMetricsPeriod(); } - + @Override public long getFailedInserts() { return lruCacheStats.getFailedInserts() + bucketCacheStats.getFailedInserts(); @@ -319,13 +319,13 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { return lruCacheStats.getSumHitCountsPastNPeriods() + bucketCacheStats.getSumHitCountsPastNPeriods(); } - + @Override public long getSumRequestCountsPastNPeriods() { return lruCacheStats.getSumRequestCountsPastNPeriods() + bucketCacheStats.getSumRequestCountsPastNPeriods(); } - + @Override public long getSumHitCachingCountsPastNPeriods() { return lruCacheStats.getSumHitCachingCountsPastNPeriods() @@ -346,11 +346,11 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { @Override public BlockCache[] getBlockCaches() { - return new BlockCache [] {this.lruCache, this.l2Cache}; + return new BlockCache [] {this.l1Cache, this.l2Cache}; } @Override public void setMaxSize(long size) { - this.lruCache.setMaxSize(size); + this.l1Cache.setMaxSize(size); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FirstLevelBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FirstLevelBlockCache.java new file mode 100644 index 0000000..6becd0e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FirstLevelBlockCache.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.HeapSize; + +/** + * In-memory BlockCache that may be backed by secondary layer(s). + */ +@InterfaceAudience.Private +public interface FirstLevelBlockCache extends ResizableBlockCache, HeapSize { + + /** + * Whether the cache contains the block with specified cacheKey + * + * @param cacheKey + * @return true if it contains the block + */ + boolean containsBlock(BlockCacheKey cacheKey); + + /** + * Specifies the secondary cache. An entry that is evicted from this cache due to a size + * constraint will be inserted into the victim cache. + * + * @param victimCache the second level cache + * @throws IllegalArgumentException if the victim cache had already been set + */ + void setVictimCache(BlockCache victimCache); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java index 667e7b4..160714b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class InclusiveCombinedBlockCache extends CombinedBlockCache implements BlockCache { - public InclusiveCombinedBlockCache(LruBlockCache l1, BlockCache l2) { + public InclusiveCombinedBlockCache(FirstLevelBlockCache l1, BlockCache l2) { super(l1,l2); } @@ -34,7 +34,7 @@ public class InclusiveCombinedBlockCache extends CombinedBlockCache implements B // On all external cache set ups the lru should have the l2 cache set as the victimHandler // Because of that all requests that miss inside of the lru block cache will be // tried in the l2 block cache. - return lruCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics); + return l1Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics); } /** @@ -50,7 +50,7 @@ public class InclusiveCombinedBlockCache extends CombinedBlockCache implements B final boolean cacheDataInL1) { // This is the inclusive part of the combined block cache. // Every block is placed into both block caches. - lruCache.cacheBlock(cacheKey, buf, inMemory, true); + l1Cache.cacheBlock(cacheKey, buf, inMemory, true); // This assumes that insertion into the L2 block cache is either async or very fast. 
l2Cache.cacheBlock(cacheKey, buf, inMemory, true); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index f427e04..63aa06e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.io.hfile; +import static java.util.Objects.requireNonNull; + import java.lang.ref.WeakReference; import java.nio.ByteBuffer; import java.util.EnumMap; @@ -97,7 +99,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; */ @InterfaceAudience.Private @JsonIgnoreProperties({"encodingCountsForTest"}) -public class LruBlockCache implements ResizableBlockCache, HeapSize { +public class LruBlockCache implements FirstLevelBlockCache { private static final Log LOG = LogFactory.getLog(LruBlockCache.class); @@ -240,8 +242,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { DEFAULT_MEMORY_FACTOR, DEFAULT_HARD_CAPACITY_LIMIT_FACTOR, false, - DEFAULT_MAX_BLOCK_SIZE - ); + DEFAULT_MAX_BLOCK_SIZE); } public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) { @@ -256,8 +257,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR), conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, DEFAULT_HARD_CAPACITY_LIMIT_FACTOR), conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE), - conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE) - ); + conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)); } public LruBlockCache(long maxSize, long blockSize, Configuration conf) { @@ -324,6 +324,14 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { } @Override + public void setVictimCache(BlockCache victimCache) { + if (victimHandler != null) { + throw new IllegalArgumentException("The victim cache has already been set"); + } + victimHandler = requireNonNull(victimCache); + } + + @Override public void setMaxSize(long maxSize) { this.maxSize = maxSize; if(this.size.get() > acceptableSize() && !evictionInProgress) { @@ -436,6 +444,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { * @param cacheKey block's cache key * @param buf block buffer */ + @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { cacheBlock(cacheKey, buf, false, false); } @@ -497,6 +506,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { * @param cacheKey * @return true if contains the block */ + @Override public boolean containsBlock(BlockCacheKey cacheKey) { return map.containsKey(cacheKey); } @@ -780,6 +790,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { return totalSize; } + @Override public int compareTo(BlockBucket that) { return Long.compare(this.overflow(), that.overflow()); } @@ -941,6 +952,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { *
Includes: total accesses, hits, misses, evicted blocks, and runs * of the eviction processes. */ + @Override public CacheStats getStats() { return this.stats; } @@ -1069,6 +1081,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor); } + @Override public void shutdown() { if (victimHandler != null) victimHandler.shutdown(); @@ -1117,7 +1130,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { Map counts = new EnumMap(BlockType.class); for (LruCachedBlock cb : map.values()) { - BlockType blockType = ((Cacheable)cb.getBuffer()).getBlockType(); + BlockType blockType = cb.getBuffer().getBlockType(); Integer count = counts.get(blockType); counts.put(blockType, (count == null ? 0 : count) + 1); } @@ -1137,11 +1150,6 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { return counts; } - public void setVictimCache(BlockCache handler) { - assert victimHandler == null; - victimHandler = handler; - } - @VisibleForTesting Map getMapForTests() { return map; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java new file mode 100644 index 0000000..7a0bbb2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java @@ -0,0 +1,404 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import static java.util.Objects.requireNonNull; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.util.StringUtils; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Policy.Eviction; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * A block cache that is memory-aware using {@link HeapSize}, memory bounded using the W-TinyLFU + * eviction algorithm, and concurrent. 
This implementation delegates to a Caffeine cache to provide + * O(1) read and write operations. + * <ul> + *   <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li> + *   <li>Caffeine: https://github.com/ben-manes/caffeine</li> + *   <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li> + * </ul>
+ */ +@InterfaceAudience.Private +public final class TinyLfuBlockCache implements FirstLevelBlockCache { + private static final Log LOG = LogFactory.getLog(TinyLfuBlockCache.class); + + private static final String MAX_BLOCK_SIZE = "hbase.tinylfu.max.block.size"; + private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L; + private static final int STAT_THREAD_PERIOD_SECONDS = 5 * 60; + + private final Eviction policy; + private final ScheduledExecutorService statsThreadPool; + private final long maxBlockSize; + private final CacheStats stats; + + private BlockCache victimCache; + + @VisibleForTesting + final Cache cache; + + /** + * Creates a block cache. + * + * @param maximumSizeInBytes maximum size of this cache, in bytes + * @param avgBlockSize expected average size of blocks, in bytes + * @param executor the cache's executor + * @param conf additional configuration + */ + public TinyLfuBlockCache(long maximumSizeInBytes, long avgBlockSize, + Executor executor, Configuration conf) { + this(maximumSizeInBytes, avgBlockSize, + conf.getLong(MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE), executor); + } + + /** + * Creates a block cache. + * + * @param maximumSizeInBytes maximum size of this cache, in bytes + * @param avgBlockSize expected average size of blocks, in bytes + * @param maxBlockSize maximum size of a block, in bytes + * @param executor the cache's executor + */ + public TinyLfuBlockCache(long maximumSizeInBytes, + long avgBlockSize, long maxBlockSize, Executor executor) { + this.cache = Caffeine.newBuilder() + .executor(executor) + .maximumWeight(maximumSizeInBytes) + .removalListener(new EvictionListener()) + .weigher((BlockCacheKey key, Cacheable value) -> + (int) Math.min(value.heapSize(), Integer.MAX_VALUE)) + .initialCapacity((int) Math.ceil((1.2 * maximumSizeInBytes) / avgBlockSize)) + .build(); + this.maxBlockSize = maxBlockSize; + this.policy = cache.policy().eviction().get(); + this.stats = new CacheStats(getClass().getSimpleName()); + + statsThreadPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true).build()); + statsThreadPool.scheduleAtFixedRate(this::logStats, + STAT_THREAD_PERIOD_SECONDS, STAT_THREAD_PERIOD_SECONDS, TimeUnit.SECONDS); + } + + @Override + public void setVictimCache(BlockCache victimCache) { + if (this.victimCache != null) { + throw new IllegalArgumentException("The victim cache has already been set"); + } + this.victimCache = requireNonNull(victimCache); + } + + @Override + public long size() { + return policy.getMaximum(); + } + + @Override + public long getFreeSize() { + return size() - getCurrentSize(); + } + + @Override + public long getCurrentSize() { + return policy.weightedSize().getAsLong(); + } + + @Override + public long getBlockCount() { + return cache.estimatedSize(); + } + + @Override + public long heapSize() { + return getCurrentSize(); + } + + @Override + public void setMaxSize(long size) { + policy.setMaximum(size); + } + + @Override + public boolean containsBlock(BlockCacheKey cacheKey) { + return cache.asMap().containsKey(cacheKey); + } + + @Override + public Cacheable getBlock(BlockCacheKey cacheKey, + boolean caching, boolean repeat, boolean updateCacheMetrics) { + Cacheable value = cache.getIfPresent(cacheKey); + if (value == null) { + if (repeat) { + return null; + } + if (updateCacheMetrics) { + stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); + } + if (victimCache != null) { + value = 
victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics); + if ((value != null) && caching) { + /* + if ((value instanceof HFileBlock) && ((HFileBlock) value).usesSharedMemory()) { + value = ((HFileBlock) value).deepClone(); + } + */ + cacheBlock(cacheKey, value); + } + } + } else if (updateCacheMetrics) { + stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); + } + return value; + } + + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable value, + boolean inMemory, boolean cacheDataInL1) { + cacheBlock(cacheKey, value); + } + + @Override + public void cacheBlock(BlockCacheKey key, Cacheable value) { + if (value.heapSize() > maxBlockSize) { + // If there are a lot of blocks that are too big this can make the logs too noisy (2% logged) + if (stats.failInsert() % 50 == 0) { + LOG.warn(String.format( + "Trying to cache too large a block %s @ %,d is %,d which is larger than %,d", + key.getHfileName(), key.getOffset(), value.heapSize(), DEFAULT_MAX_BLOCK_SIZE)); + } + } else { + cache.put(key, value); + } + } + + @Override + public boolean evictBlock(BlockCacheKey cacheKey) { + Cacheable value = cache.asMap().remove(cacheKey); + return (value != null); + } + + @Override + public int evictBlocksByHfileName(String hfileName) { + int evicted = 0; + for (BlockCacheKey key : cache.asMap().keySet()) { + if (key.getHfileName().equals(hfileName) && evictBlock(key)) { + evicted++; + } + } + if (victimCache != null) { + evicted += victimCache.evictBlocksByHfileName(hfileName); + } + return evicted; + } + + @Override + public CacheStats getStats() { + return stats; + } + + @Override + public void shutdown() { + if (victimCache != null) { + victimCache.shutdown(); + } + statsThreadPool.shutdown(); + } + + @Override + public BlockCache[] getBlockCaches() { + return null; + } + + @Override + public Iterator iterator() { + long now = System.nanoTime(); + return cache.asMap().entrySet().stream() + .map(entry -> (CachedBlock) new CachedBlockView(entry.getKey(), entry.getValue(), now)) + .iterator(); + } + + // @Override + public void returnBlock(BlockCacheKey cacheKey, Cacheable block) { + // There is no SHARED type here in L1. But the block might have been served from the L2 victim + // cache (when the Combined mode = false). So just try return this block to the victim cache. + // Note : In case of CombinedBlockCache we will have this victim cache configured for L1 + // cache. But CombinedBlockCache will only call returnBlock on L2 cache. + // if (victimCache != null) { + // victimCache.returnBlock(cacheKey, block); + // } + } + + private void logStats() { + LOG.info( + "totalSize=" + StringUtils.byteDesc(heapSize()) + ", " + + "freeSize=" + StringUtils.byteDesc(getFreeSize()) + ", " + + "max=" + StringUtils.byteDesc(size()) + ", " + + "blockCount=" + getBlockCount() + ", " + + "accesses=" + stats.getRequestCount() + ", " + + "hits=" + stats.getHitCount() + ", " + + "hitRatio=" + (stats.getHitCount() == 0 ? + "0," : StringUtils.formatPercent(stats.getHitRatio(), 2) + ", ") + + "cachingAccesses=" + stats.getRequestCachingCount() + ", " + + "cachingHits=" + stats.getHitCachingCount() + ", " + + "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ? 
+ "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) + + "evictions=" + stats.getEvictionCount() + ", " + + "evicted=" + stats.getEvictedCount()); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("blockCount", getBlockCount()) + .add("currentSize", getCurrentSize()) + .add("freeSize", getFreeSize()) + .add("maxSize", size()) + .add("heapSize", heapSize()) + .add("victimCache", (victimCache != null)) + .toString(); + } + + /** A removal listener to asynchronously record evictions and populate the victim cache. */ + private final class EvictionListener implements RemovalListener { + + @Override + public void onRemoval(BlockCacheKey key, Cacheable value, RemovalCause cause) { + if (!cause.wasEvicted()) { + // An explicit eviction (invalidation) is not added to the victim cache as the data may + // no longer be valid for subsequent queries. + return; + } + + recordEviction(); + + if (victimCache == null) { + return; + } else if (victimCache instanceof BucketCache) { + BucketCache victimBucketCache = (BucketCache) victimCache; + victimBucketCache.cacheBlockWithWait(key, value, /* inMemory */ true, /* wait */ true); + } else { + victimCache.cacheBlock(key, value); + } + } + } + + /** + * Records an eviction. The number of eviction operations and evicted blocks are identical, as + * an eviction is triggered immediately when the capacity has been exceeded. An eviction is + * performed asynchronously. See the library's documentation for details on write buffers, + * batching, and maintenance behavior. + */ + private void recordEviction() { + // FIXME: Currently does not capture the insertion time + stats.evicted(Long.MAX_VALUE, true); + stats.evict(); + } + + private static final class CachedBlockView implements CachedBlock { + private static final Comparator COMPARATOR = Comparator + .comparing(CachedBlock::getFilename) + .thenComparing(CachedBlock::getOffset) + .thenComparing(CachedBlock::getCachedTime); + + private final BlockCacheKey key; + private final Cacheable value; + private final long now; + + public CachedBlockView(BlockCacheKey key, Cacheable value, long now) { + this.now = now; + this.key = key; + this.value = value; + } + + @Override + public BlockPriority getBlockPriority() { + // This does not appear to be used in any meaningful way and is irrelevant to this cache + return BlockPriority.MEMORY; + } + + @Override + public BlockType getBlockType() { + return value.getBlockType(); + } + + @Override + public long getOffset() { + return key.getOffset(); + } + + @Override + public long getSize() { + return value.heapSize(); + } + + @Override + public long getCachedTime() { + // This does not appear to be used in any meaningful way, so not captured + return 0L; + } + + @Override + public String getFilename() { + return key.getHfileName(); + } + + @Override + public int compareTo(CachedBlock other) { + return COMPARATOR.compare(this, other); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (!(obj instanceof CachedBlock)) { + return false; + } + CachedBlock other = (CachedBlock) obj; + return compareTo(other) == 0; + } + + @Override + public int hashCode() { + return key.hashCode(); + } + + @Override + public String toString() { + return BlockCacheUtil.toString(this, now); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequestListener.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequestListener.java index 0e6bc4f..22abe40 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequestListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequestListener.java @@ -25,12 +25,5 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; */ @InterfaceAudience.Private public interface FlushRequestListener { - - /** - * Callback which will get called when a flush request is made for a region. - * - * @param type The type of flush. (ie. Whether a normal flush or flush because of global heap preassure) - * @param region The region for which flush is requested - */ - void flushRequested(FlushType type, Region region); + void flushRequested(FlushType type, org.apache.hadoop.hbase.regionserver.Region region); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockCacheReporting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockCacheReporting.java deleted file mode 100644 index 3b9161c..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockCacheReporting.java +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io.hfile; - -import static org.junit.Assert.*; - -import java.io.IOException; -import java.util.Map; -import java.util.NavigableSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.io.hfile.TestCacheConfig.DataCacheEntry; -import org.apache.hadoop.hbase.io.hfile.TestCacheConfig.IndexCacheEntry; -import org.codehaus.jackson.JsonGenerationException; -import org.codehaus.jackson.map.JsonMappingException; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category(SmallTests.class) -public class TestBlockCacheReporting { - private static final Log LOG = LogFactory.getLog(TestBlockCacheReporting.class); - private Configuration conf; - - @Before - public void setUp() throws Exception { - CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null; - this.conf = HBaseConfiguration.create(); - } - - @After - public void tearDown() throws Exception { - // Let go of current block cache. 
- CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null; - } - - private void addDataAndHits(final BlockCache bc, final int count) { - Cacheable dce = new DataCacheEntry(); - Cacheable ice = new IndexCacheEntry(); - for (int i = 0; i < count; i++) { - BlockCacheKey bckd = new BlockCacheKey("f", i); - BlockCacheKey bcki = new BlockCacheKey("f", i + count); - bc.getBlock(bckd, true, false, true); - bc.cacheBlock(bckd, dce); - bc.cacheBlock(bcki, ice); - bc.getBlock(bckd, true, false, true); - bc.getBlock(bcki, true, false, true); - } - assertEquals(2 * count /*Data and Index blocks*/, bc.getStats().getHitCount()); - BlockCacheKey bckd = new BlockCacheKey("f", 0); - BlockCacheKey bcki = new BlockCacheKey("f", 0 + count); - bc.evictBlock(bckd); - bc.evictBlock(bcki); - bc.getStats().getEvictedCount(); - } - - @Test - public void testBucketCache() throws JsonGenerationException, JsonMappingException, IOException { - this.conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); - this.conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 100); - CacheConfig cc = new CacheConfig(this.conf); - assertTrue(cc.getBlockCache() instanceof CombinedBlockCache); - logPerBlock(cc.getBlockCache()); - final int count = 3; - addDataAndHits(cc.getBlockCache(), count); - // The below has no asserts. It is just exercising toString and toJSON code. - LOG.info(cc.getBlockCache().getStats()); - BlockCacheUtil.CachedBlocksByFile cbsbf = logPerBlock(cc.getBlockCache()); - LOG.info(cbsbf); - logPerFile(cbsbf); - bucketCacheReport(cc.getBlockCache()); - LOG.info(BlockCacheUtil.toJSON(cbsbf)); - } - - @Test - public void testLruBlockCache() throws JsonGenerationException, JsonMappingException, IOException { - CacheConfig cc = new CacheConfig(this.conf); - assertTrue(cc.isBlockCacheEnabled()); - assertTrue(CacheConfig.DEFAULT_IN_MEMORY == cc.isInMemory()); - assertTrue(cc.getBlockCache() instanceof LruBlockCache); - logPerBlock(cc.getBlockCache()); - addDataAndHits(cc.getBlockCache(), 3); - // The below has no asserts. It is just exercising toString and toJSON code. 
- BlockCache bc = cc.getBlockCache(); - LOG.info("count=" + bc.getBlockCount() + ", currentSize=" + bc.getCurrentSize() + - ", freeSize=" + bc.getFreeSize() ); - LOG.info(cc.getBlockCache().getStats()); - BlockCacheUtil.CachedBlocksByFile cbsbf = logPerBlock(cc.getBlockCache()); - LOG.info(cbsbf); - logPerFile(cbsbf); - bucketCacheReport(cc.getBlockCache()); - LOG.info(BlockCacheUtil.toJSON(cbsbf)); - } - - private void bucketCacheReport(final BlockCache bc) { - LOG.info(bc.getClass().getSimpleName() + ": " + bc.getStats()); - BlockCache [] bcs = bc.getBlockCaches(); - if (bcs != null) { - for (BlockCache sbc: bc.getBlockCaches()) { - bucketCacheReport(sbc); - } - } - } - - private void logPerFile(final BlockCacheUtil.CachedBlocksByFile cbsbf) - throws JsonGenerationException, JsonMappingException, IOException { - for (Map.Entry> e: - cbsbf.getCachedBlockStatsByFile().entrySet()) { - int count = 0; - long size = 0; - int countData = 0; - long sizeData = 0; - for (CachedBlock cb: e.getValue()) { - count++; - size += cb.getSize(); - BlockType bt = cb.getBlockType(); - if (bt != null && bt.isData()) { - countData++; - sizeData += cb.getSize(); - } - } - LOG.info("filename=" + e.getKey() + ", count=" + count + ", countData=" + countData + - ", size=" + size + ", sizeData=" + sizeData); - LOG.info(BlockCacheUtil.toJSON(e.getKey(), e.getValue())); - } - } - - private BlockCacheUtil.CachedBlocksByFile logPerBlock(final BlockCache bc) - throws JsonGenerationException, JsonMappingException, IOException { - BlockCacheUtil.CachedBlocksByFile cbsbf = new BlockCacheUtil.CachedBlocksByFile(); - for (CachedBlock cb: bc) { - LOG.info(cb.toString()); - LOG.info(BlockCacheUtil.toJSON(bc)); - cbsbf.update(cb); - } - return cbsbf; - } -} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java deleted file mode 100644 index 4671f3a..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java +++ /dev/null @@ -1,346 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.io.hfile; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryUsage; -import java.nio.ByteBuffer; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; -import org.apache.hadoop.hbase.util.Threads; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -/** - * Tests that {@link CacheConfig} does as expected. - */ -// This test is marked as a large test though it runs in a short amount of time -// (seconds). It is large because it depends on being able to reset the global -// blockcache instance which is in a global variable. Experience has it that -// tests clash on the global variable if this test is run as small sized test. -@Category(LargeTests.class) -public class TestCacheConfig { - private static final Log LOG = LogFactory.getLog(TestCacheConfig.class); - private Configuration conf; - - static class Deserializer implements CacheableDeserializer { - private final Cacheable cacheable; - private int deserializedIdentifier = 0; - - Deserializer(final Cacheable c) { - deserializedIdentifier = CacheableDeserializerIdManager.registerDeserializer(this); - this.cacheable = c; - } - - @Override - public int getDeserialiserIdentifier() { - return deserializedIdentifier; - } - - @Override - public Cacheable deserialize(ByteBuffer b, boolean reuse) throws IOException { - LOG.info("Deserialized " + b + ", reuse=" + reuse); - return cacheable; - } - - @Override - public Cacheable deserialize(ByteBuffer b) throws IOException { - LOG.info("Deserialized " + b); - return cacheable; - } - }; - - static class IndexCacheEntry extends DataCacheEntry { - private static IndexCacheEntry SINGLETON = new IndexCacheEntry(); - - public IndexCacheEntry() { - super(SINGLETON); - } - - @Override - public BlockType getBlockType() { - return BlockType.ROOT_INDEX; - } - } - - static class DataCacheEntry implements Cacheable { - private static final int SIZE = 1; - private static DataCacheEntry SINGLETON = new DataCacheEntry(); - final CacheableDeserializer deserializer; - - DataCacheEntry() { - this(SINGLETON); - } - - DataCacheEntry(final Cacheable c) { - this.deserializer = new Deserializer(c); - } - - @Override - public String toString() { - return "size=" + SIZE + ", type=" + getBlockType(); - }; - - @Override - public long heapSize() { - return SIZE; - } - - @Override - public int getSerializedLength() { - return SIZE; - } - - @Override - public void serialize(ByteBuffer destination) { - LOG.info("Serialized " + this + " to " + destination); - } - - @Override - public CacheableDeserializer getDeserializer() { - return this.deserializer; - } - - @Override - public BlockType getBlockType() { - return BlockType.DATA; - } - }; - - static class MetaCacheEntry extends DataCacheEntry { - @Override - public BlockType getBlockType() { - return BlockType.INTERMEDIATE_INDEX; - } - } - - @Before - public void setUp() 
throws Exception { - CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null; - this.conf = HBaseConfiguration.create(); - } - - @After - public void tearDown() throws Exception { - // Let go of current block cache. - CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null; - } - - /** - * @param cc - * @param doubling If true, addition of element ups counter by 2, not 1, because element added - * to onheap and offheap caches. - * @param sizing True if we should run sizing test (doesn't always apply). - */ - void basicBlockCacheOps(final CacheConfig cc, final boolean doubling, - final boolean sizing) { - assertTrue(cc.isBlockCacheEnabled()); - assertTrue(CacheConfig.DEFAULT_IN_MEMORY == cc.isInMemory()); - BlockCache bc = cc.getBlockCache(); - BlockCacheKey bck = new BlockCacheKey("f", 0); - Cacheable c = new DataCacheEntry(); - // Do asserts on block counting. - long initialBlockCount = bc.getBlockCount(); - bc.cacheBlock(bck, c, cc.isInMemory(), cc.isCacheDataInL1()); - assertEquals(doubling? 2: 1, bc.getBlockCount() - initialBlockCount); - bc.evictBlock(bck); - assertEquals(initialBlockCount, bc.getBlockCount()); - // Do size accounting. Do it after the above 'warm-up' because it looks like some - // buffers do lazy allocation so sizes are off on first go around. - if (sizing) { - long originalSize = bc.getCurrentSize(); - bc.cacheBlock(bck, c, cc.isInMemory(), cc.isCacheDataInL1()); - assertTrue(bc.getCurrentSize() > originalSize); - bc.evictBlock(bck); - long size = bc.getCurrentSize(); - assertEquals(originalSize, size); - } - } - - /** - * @param cc - * @param filename - * @return - */ - private long cacheDataBlock(final CacheConfig cc, final String filename) { - BlockCacheKey bck = new BlockCacheKey(filename, 0); - Cacheable c = new DataCacheEntry(); - // Do asserts on block counting. - cc.getBlockCache().cacheBlock(bck, c, cc.isInMemory(), cc.isCacheDataInL1()); - return cc.getBlockCache().getBlockCount(); - } - - @Test - public void testCacheConfigDefaultLRUBlockCache() { - CacheConfig cc = new CacheConfig(this.conf); - assertTrue(cc.isBlockCacheEnabled()); - assertTrue(CacheConfig.DEFAULT_IN_MEMORY == cc.isInMemory()); - basicBlockCacheOps(cc, false, true); - assertTrue(cc.getBlockCache() instanceof LruBlockCache); - } - - /** - * Assert that the caches are deployed with CombinedBlockCache and of the appropriate sizes. - */ - @Test - public void testOffHeapBucketCacheConfig() { - this.conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); - doBucketCacheConfigTest(); - } - - @Test - public void testOnHeapBucketCacheConfig() { - this.conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "heap"); - doBucketCacheConfigTest(); - } - - @Test - public void testFileBucketCacheConfig() throws IOException { - HBaseTestingUtility htu = new HBaseTestingUtility(this.conf); - try { - Path p = new Path(htu.getDataTestDir(), "bc.txt"); - FileSystem fs = FileSystem.get(this.conf); - fs.create(p).close(); - this.conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "file:" + p); - doBucketCacheConfigTest(); - } finally { - htu.cleanupTestDir(); - } - } - - private void doBucketCacheConfigTest() { - final int bcSize = 100; - this.conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, bcSize); - CacheConfig cc = new CacheConfig(this.conf); - basicBlockCacheOps(cc, false, false); - assertTrue(cc.getBlockCache() instanceof CombinedBlockCache); - // TODO: Assert sizes allocated are right and proportions. 
- CombinedBlockCache cbc = (CombinedBlockCache)cc.getBlockCache(); - BlockCache [] bcs = cbc.getBlockCaches(); - assertTrue(bcs[0] instanceof LruBlockCache); - LruBlockCache lbc = (LruBlockCache)bcs[0]; - assertEquals(CacheConfig.getLruCacheSize(this.conf, - ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()), lbc.getMaxSize()); - assertTrue(bcs[1] instanceof BucketCache); - BucketCache bc = (BucketCache)bcs[1]; - // getMaxSize comes back in bytes but we specified size in MB - assertEquals(bcSize, bc.getMaxSize() / (1024 * 1024)); - } - - /** - * Assert that when BUCKET_CACHE_COMBINED_KEY is false, the non-default, that we deploy - * LruBlockCache as L1 with a BucketCache for L2. - */ - @Test (timeout=10000) - public void testBucketCacheConfigL1L2Setup() { - this.conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); - // Make lru size is smaller than bcSize for sure. Need this to be true so when eviction - // from L1 happens, it does not fail because L2 can't take the eviction because block too big. - this.conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.001f); - MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); - long lruExpectedSize = CacheConfig.getLruCacheSize(this.conf, mu); - final int bcSize = 100; - long bcExpectedSize = 100 * 1024 * 1024; // MB. - assertTrue(lruExpectedSize < bcExpectedSize); - this.conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, bcSize); - this.conf.setBoolean(CacheConfig.BUCKET_CACHE_COMBINED_KEY, false); - CacheConfig cc = new CacheConfig(this.conf); - basicBlockCacheOps(cc, false, false); - assertTrue(cc.getBlockCache() instanceof LruBlockCache); - // TODO: Assert sizes allocated are right and proportions. - LruBlockCache lbc = (LruBlockCache)cc.getBlockCache(); - assertEquals(lruExpectedSize, lbc.getMaxSize()); - BlockCache bc = lbc.getVictimHandler(); - // getMaxSize comes back in bytes but we specified size in MB - assertEquals(bcExpectedSize, ((BucketCache) bc).getMaxSize()); - // Test the L1+L2 deploy works as we'd expect with blocks evicted from L1 going to L2. - long initialL1BlockCount = lbc.getBlockCount(); - long initialL2BlockCount = bc.getBlockCount(); - Cacheable c = new DataCacheEntry(); - BlockCacheKey bck = new BlockCacheKey("bck", 0); - lbc.cacheBlock(bck, c, false, false); - assertEquals(initialL1BlockCount + 1, lbc.getBlockCount()); - assertEquals(initialL2BlockCount, bc.getBlockCount()); - // Force evictions by putting in a block too big. - final long justTooBigSize = lbc.acceptableSize() + 1; - lbc.cacheBlock(new BlockCacheKey("bck2", 0), new DataCacheEntry() { - @Override - public long heapSize() { - return justTooBigSize; - } - - @Override - public int getSerializedLength() { - return (int)heapSize(); - } - }); - // The eviction thread in lrublockcache needs to run. - while (initialL1BlockCount != lbc.getBlockCount()) Threads.sleep(10); - assertEquals(initialL1BlockCount, lbc.getBlockCount()); - long count = bc.getBlockCount(); - assertTrue(initialL2BlockCount + 1 <= count); - } - - /** - * Test the cacheDataInL1 flag. When set, data blocks should be cached in the l1 tier, up in - * LruBlockCache when using CombinedBlockCcahe. - */ - @Test - public void testCacheDataInL1() { - this.conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); - this.conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 100); - CacheConfig cc = new CacheConfig(this.conf); - assertTrue(cc.getBlockCache() instanceof CombinedBlockCache); - CombinedBlockCache cbc = (CombinedBlockCache)cc.getBlockCache(); - // Add a data block. 
Should go into L2, into the Bucket Cache, not the LruBlockCache. - cacheDataBlock(cc, "1"); - LruBlockCache lrubc = (LruBlockCache)cbc.getBlockCaches()[0]; - assertDataBlockCount(lrubc, 0); - // Enable our test flag. - cc.setCacheDataInL1(true); - cacheDataBlock(cc, "2"); - assertDataBlockCount(lrubc, 1); - cc.setCacheDataInL1(false); - cacheDataBlock(cc, "3"); - assertDataBlockCount(lrubc, 1); - } - - private void assertDataBlockCount(final LruBlockCache bc, final int expected) { - Map blocks = bc.getBlockTypeCountsForTest(); - assertEquals(expected, blocks == null? 0: - blocks.get(BlockType.DATA) == null? 0: - blocks.get(BlockType.DATA).intValue()); - } -} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java deleted file mode 100644 index c72c039..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java +++ /dev/null @@ -1,457 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.io.hfile; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.EnumMap; -import java.util.List; -import java.util.Random; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.fs.HFileSystem; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; -import org.apache.hadoop.hbase.regionserver.BloomType; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.util.BloomFilterFactory; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ChecksumType; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import com.google.common.collect.Lists; - -/** - * Tests {@link HFile} cache-on-write functionality for the following block - * types: data blocks, non-root index blocks, and Bloom filter blocks. 
- */ -@RunWith(Parameterized.class) -@Category(MediumTests.class) -public class TestCacheOnWrite { - - private static final Log LOG = LogFactory.getLog(TestCacheOnWrite.class); - - private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU(); - private Configuration conf; - private CacheConfig cacheConf; - private FileSystem fs; - private Random rand = new Random(12983177L); - private Path storeFilePath; - private BlockCache blockCache; - private String testDescription; - - private final CacheOnWriteType cowType; - private final Compression.Algorithm compress; - private final boolean cacheCompressedData; - - private static final int DATA_BLOCK_SIZE = 2048; - private static final int NUM_KV = 25000; - private static final int INDEX_BLOCK_SIZE = 512; - private static final int BLOOM_BLOCK_SIZE = 4096; - private static final BloomType BLOOM_TYPE = BloomType.ROWCOL; - private static final int CKBYTES = 512; - - /** The number of valid key types possible in a store file */ - private static final int NUM_VALID_KEY_TYPES = - KeyValue.Type.values().length - 2; - - private static enum CacheOnWriteType { - DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, - BlockType.DATA, BlockType.ENCODED_DATA), - BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, - BlockType.BLOOM_CHUNK), - INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, - BlockType.LEAF_INDEX, BlockType.INTERMEDIATE_INDEX); - - private final String confKey; - private final BlockType blockType1; - private final BlockType blockType2; - - private CacheOnWriteType(String confKey, BlockType blockType) { - this(confKey, blockType, blockType); - } - - private CacheOnWriteType(String confKey, BlockType blockType1, - BlockType blockType2) { - this.blockType1 = blockType1; - this.blockType2 = blockType2; - this.confKey = confKey; - } - - public boolean shouldBeCached(BlockType blockType) { - return blockType == blockType1 || blockType == blockType2; - } - - public void modifyConf(Configuration conf) { - for (CacheOnWriteType cowType : CacheOnWriteType.values()) { - conf.setBoolean(cowType.confKey, cowType == this); - } - } - } - - public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress, - boolean cacheCompressedData, BlockCache blockCache) { - this.cowType = cowType; - this.compress = compress; - this.cacheCompressedData = cacheCompressedData; - this.blockCache = blockCache; - testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress + - ", cacheCompressedData=" + cacheCompressedData + "]"; - LOG.info(testDescription); - } - - private static List getBlockCaches() throws IOException { - Configuration conf = TEST_UTIL.getConfiguration(); - List blockcaches = new ArrayList(); - // default - blockcaches.add(new CacheConfig(conf).getBlockCache()); - - //set LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME to 2.0f due to HBASE-16287 - TEST_UTIL.getConfiguration().setFloat(LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, 2.0f); - // memory - BlockCache lru = new LruBlockCache(128 * 1024 * 1024, 64 * 1024, TEST_UTIL.getConfiguration()); - blockcaches.add(lru); - - // bucket cache - FileSystem.get(conf).mkdirs(TEST_UTIL.getDataTestDir()); - int[] bucketSizes = - { INDEX_BLOCK_SIZE, DATA_BLOCK_SIZE, BLOOM_BLOCK_SIZE, 64 * 1024, 128 * 1024 }; - BlockCache bucketcache = - new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null); - blockcaches.add(bucketcache); - return blockcaches; - } - - @Parameters - public static Collection 
getParameters() throws IOException { - List params = new ArrayList(); - for (BlockCache blockCache : getBlockCaches()) { - for (CacheOnWriteType cowType : CacheOnWriteType.values()) { - for (Compression.Algorithm compress : HBaseTestingUtility.COMPRESSION_ALGORITHMS) { - for (boolean cacheCompressedData : new boolean[] { false, true }) { - params.add(new Object[] { cowType, compress, cacheCompressedData, blockCache }); - } - } - } - } - return params; - } - - private void clearBlockCache(BlockCache blockCache) throws InterruptedException { - if (blockCache instanceof LruBlockCache) { - ((LruBlockCache) blockCache).clearCache(); - } else { - // BucketCache may not return all cached blocks(blocks in write queue), so check it here. - for (int clearCount = 0; blockCache.getBlockCount() > 0; clearCount++) { - if (clearCount > 0) { - LOG.warn("clear block cache " + blockCache + " " + clearCount + " times, " - + blockCache.getBlockCount() + " blocks remaining"); - Thread.sleep(10); - } - for (CachedBlock block : Lists.newArrayList(blockCache)) { - BlockCacheKey key = new BlockCacheKey(block.getFilename(), block.getOffset()); - // CombinedBucketCache may need evict two times. - for (int evictCount = 0; blockCache.evictBlock(key); evictCount++) { - if (evictCount > 1) { - LOG.warn("evict block " + block + " in " + blockCache + " " + evictCount - + " times, maybe a bug here"); - } - } - } - } - } - } - - @Before - public void setUp() throws IOException { - conf = TEST_UTIL.getConfiguration(); - this.conf.set("dfs.datanode.data.dir.perm", "700"); - conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION); - conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE); - conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, - BLOOM_BLOCK_SIZE); - conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData); - cowType.modifyConf(conf); - fs = HFileSystem.get(conf); - CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = blockCache; - cacheConf = - new CacheConfig(blockCache, true, true, cowType.shouldBeCached(BlockType.DATA), - cowType.shouldBeCached(BlockType.LEAF_INDEX), - cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData, - false, false, false); - } - - @After - public void tearDown() throws IOException, InterruptedException { - clearBlockCache(blockCache); - } - - @AfterClass - public static void afterClass() throws IOException { - TEST_UTIL.cleanupTestDir(); - } - - private void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException { - writeStoreFile(useTags); - readStoreFile(useTags); - } - - private void readStoreFile(boolean useTags) throws IOException { - AbstractHFileReader reader; - if (useTags) { - reader = (HFileReaderV3) HFile.createReader(fs, storeFilePath, cacheConf, conf); - } else { - reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf); - } - LOG.info("HFile information: " + reader); - HFileContext meta = new HFileContextBuilder().withCompression(compress) - .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL) - .withBlockSize(DATA_BLOCK_SIZE) - .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()) - .withIncludesTags(useTags).build(); - final boolean cacheBlocks = false; - final boolean pread = false; - HFileScanner scanner = reader.getScanner(cacheBlocks, pread); - assertTrue(testDescription, scanner.seekTo()); - - long offset = 0; - EnumMap blockCountByType = - new EnumMap(BlockType.class); - - DataBlockEncoding encodingInCache = 
NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding(); - while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { - // Flags: don't cache the block, use pread, this is not a compaction. - // Also, pass null for expected block type to avoid checking it. - HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, - encodingInCache); - BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), - offset); - HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true); - boolean isCached = fromCache != null; - boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType()); - assertTrue("shouldBeCached: " + shouldBeCached+ "\n" + - "isCached: " + isCached + "\n" + - "Test description: " + testDescription + "\n" + - "block: " + block + "\n" + - "encodingInCache: " + encodingInCache + "\n" + - "blockCacheKey: " + blockCacheKey, - shouldBeCached == isCached); - if (isCached) { - if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) { - if (compress != Compression.Algorithm.NONE) { - assertFalse(fromCache.isUnpacked()); - } - fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader()); - } else { - assertTrue(fromCache.isUnpacked()); - } - // block we cached at write-time and block read from file should be identical - assertEquals(block.getChecksumType(), fromCache.getChecksumType()); - assertEquals(block.getBlockType(), fromCache.getBlockType()); - assertNotEquals(block.getBlockType(), BlockType.ENCODED_DATA); - assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader()); - assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader()); - assertEquals( - block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader()); - } - offset += block.getOnDiskSizeWithHeader(); - BlockType bt = block.getBlockType(); - Integer count = blockCountByType.get(bt); - blockCountByType.put(bt, (count == null ? 0 : count) + 1); - } - - LOG.info("Block count by type: " + blockCountByType); - String countByType = blockCountByType.toString(); - if (useTags) { - assertEquals("{" + BlockType.DATA - + "=2663, LEAF_INDEX=297, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=32}", countByType); - } else { - assertEquals("{" + BlockType.DATA - + "=2498, LEAF_INDEX=278, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=31}", countByType); - } - - // iterate all the keyvalue from hfile - while (scanner.next()) { - scanner.getKeyValue(); - } - reader.close(); - } - - public static KeyValue.Type generateKeyType(Random rand) { - if (rand.nextBoolean()) { - // Let's make half of KVs puts. - return KeyValue.Type.Put; - } else { - KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; - if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { - throw new RuntimeException("Generated an invalid key type: " + keyType + ". 
" - + "Probably the layout of KeyValue.Type has changed."); - } - return keyType; - } - } - - private void writeStoreFile(boolean useTags) throws IOException { - if(useTags) { - TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); - } else { - TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2); - } - Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), - "test_cache_on_write"); - HFileContext meta = new HFileContextBuilder().withCompression(compress) - .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL) - .withBlockSize(DATA_BLOCK_SIZE) - .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()) - .withIncludesTags(useTags).build(); - StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf, cacheConf, fs) - .withOutputDir(storeFileParentDir).withComparator(KeyValue.COMPARATOR) - .withFileContext(meta) - .withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build(); - byte[] cf = Bytes.toBytes("fam"); - for (int i = 0; i < NUM_KV; ++i) { - byte[] row = TestHFileWriterV2.randomOrderedKey(rand, i); - byte[] qualifier = TestHFileWriterV2.randomRowOrQualifier(rand); - byte[] value = TestHFileWriterV2.randomValue(rand); - KeyValue kv; - if(useTags) { - Tag t = new Tag((byte) 1, "visibility"); - List tagList = new ArrayList(); - tagList.add(t); - Tag[] tags = new Tag[1]; - tags[0] = t; - kv = - new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length, - rand.nextLong(), generateKeyType(rand), value, 0, value.length, tagList); - } else { - kv = - new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length, - rand.nextLong(), generateKeyType(rand), value, 0, value.length); - } - sfw.append(kv); - } - - sfw.close(); - storeFilePath = sfw.getPath(); - } - - private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags) - throws IOException, InterruptedException { - if (useTags) { - TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); - } else { - TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2); - } - // TODO: need to change this test if we add a cache size threshold for - // compactions, or if we implement some other kind of intelligent logic for - // deciding what blocks to cache-on-write on compaction. 
- final String table = "CompactionCacheOnWrite"; - final String cf = "myCF"; - final byte[] cfBytes = Bytes.toBytes(cf); - final int maxVersions = 3; - Region region = TEST_UTIL.createTestRegion(table, - new HColumnDescriptor(cf) - .setCompressionType(compress) - .setBloomFilterType(BLOOM_TYPE) - .setMaxVersions(maxVersions) - .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()) - ); - int rowIdx = 0; - long ts = EnvironmentEdgeManager.currentTime(); - for (int iFile = 0; iFile < 5; ++iFile) { - for (int iRow = 0; iRow < 500; ++iRow) { - String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + - iRow; - Put p = new Put(Bytes.toBytes(rowStr)); - ++rowIdx; - for (int iCol = 0; iCol < 10; ++iCol) { - String qualStr = "col" + iCol; - String valueStr = "value_" + rowStr + "_" + qualStr; - for (int iTS = 0; iTS < 5; ++iTS) { - if (useTags) { - Tag t = new Tag((byte) 1, "visibility"); - Tag[] tags = new Tag[1]; - tags[0] = t; - KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr), - HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags); - p.add(kv); - } else { - p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr)); - } - } - } - p.setDurability(Durability.ASYNC_WAL); - region.put(p); - } - region.flush(true); - } - clearBlockCache(blockCache); - assertEquals(0, blockCache.getBlockCount()); - region.compact(false); - LOG.debug("compactStores() returned"); - - for (CachedBlock block: blockCache) { - assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType()); - assertNotEquals(BlockType.DATA, block.getBlockType()); - } - ((HRegion)region).close(); - } - - @Test - public void testStoreFileCacheOnWrite() throws IOException { - testStoreFileCacheOnWriteInternals(false); - testStoreFileCacheOnWriteInternals(true); - } - - @Test - public void testNotCachingDataBlocksDuringCompaction() throws IOException, InterruptedException { - testNotCachingDataBlocksDuringCompactionInternals(false); - testNotCachingDataBlocksDuringCompactionInternals(true); - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java deleted file mode 100644 index 16583d9..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hadoop.hbase.io.hfile; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; - -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; -import org.apache.hadoop.hbase.regionserver.BloomType; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -/** - * Make sure we always cache important block types, such as index blocks, as - * long as we have a block cache, even though block caching might be disabled - * for the column family. - * - *
TODO: This test writes a lot of data and only tests the most basic of metrics. Cache stats - * need to reveal more about what is being cached whether DATA or INDEX blocks and then we could - * do more verification in this test. - */ -@Category(MediumTests.class) -@RunWith(Parameterized.class) -public class TestForceCacheImportantBlocks { - private final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU(); - - private static final String TABLE = "myTable"; - private static final String CF = "myCF"; - private static final byte[] CF_BYTES = Bytes.toBytes(CF); - private static final int MAX_VERSIONS = 3; - private static final int NUM_HFILES = 5; - - private static final int ROWS_PER_HFILE = 100; - private static final int NUM_ROWS = NUM_HFILES * ROWS_PER_HFILE; - private static final int NUM_COLS_PER_ROW = 50; - private static final int NUM_TIMESTAMPS_PER_COL = 50; - - /** Extremely small block size, so that we can get some index blocks */ - private static final int BLOCK_SIZE = 256; - - private static final Algorithm COMPRESSION_ALGORITHM = - Compression.Algorithm.GZ; - private static final BloomType BLOOM_TYPE = BloomType.ROW; - - @SuppressWarnings("unused") - // Currently unused. - private final int hfileVersion; - private final boolean cfCacheEnabled; - - @Parameters - public static Collection parameters() { - // HFile versions - return Arrays.asList( - new Object[] { 2, true }, - new Object[] { 2, false }, - new Object[] { 3, true }, - new Object[] { 3, false } - ); - } - - public TestForceCacheImportantBlocks(int hfileVersion, boolean cfCacheEnabled) { - this.hfileVersion = hfileVersion; - this.cfCacheEnabled = cfCacheEnabled; - TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, hfileVersion); - } - - @Before - public void setup() { - // Make sure we make a new one each time. - CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null; - HFile.DATABLOCK_READ_COUNT.set(0); - } - - @Test - public void testCacheBlocks() throws IOException { - // Set index block size to be the same as normal block size. - TEST_UTIL.getConfiguration().setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, BLOCK_SIZE); - HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(CF)).setMaxVersions(MAX_VERSIONS). - setCompressionType(COMPRESSION_ALGORITHM). - setBloomFilterType(BLOOM_TYPE); - hcd.setBlocksize(BLOCK_SIZE); - hcd.setBlockCacheEnabled(cfCacheEnabled); - Region region = TEST_UTIL.createTestRegion(TABLE, hcd); - BlockCache cache = region.getStore(hcd.getName()).getCacheConfig().getBlockCache(); - CacheStats stats = cache.getStats(); - writeTestData(region); - assertEquals(0, stats.getHitCount()); - assertEquals(0, HFile.DATABLOCK_READ_COUNT.get()); - // Do a single get, take count of caches. If we are NOT caching DATA blocks, the miss - // count should go up. Otherwise, all should be cached and the miss count should not rise. 
- region.get(new Get(Bytes.toBytes("row" + 0))); - assertTrue(stats.getHitCount() > 0); - assertTrue(HFile.DATABLOCK_READ_COUNT.get() > 0); - long missCount = stats.getMissCount(); - region.get(new Get(Bytes.toBytes("row" + 0))); - if (this.cfCacheEnabled) assertEquals(missCount, stats.getMissCount()); - else assertTrue(stats.getMissCount() > missCount); - } - - private void writeTestData(Region region) throws IOException { - for (int i = 0; i < NUM_ROWS; ++i) { - Put put = new Put(Bytes.toBytes("row" + i)); - for (int j = 0; j < NUM_COLS_PER_ROW; ++j) { - for (long ts = 1; ts < NUM_TIMESTAMPS_PER_COL; ++ts) { - put.add(CF_BYTES, Bytes.toBytes("col" + j), ts, - Bytes.toBytes("value" + i + "_" + j + "_" + ts)); - } - } - region.put(put); - if ((i + 1) % ROWS_PER_HFILE == 0) { - region.flush(true); - } - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java deleted file mode 100644 index b7f7fa1..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java +++ /dev/null @@ -1,231 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io.hfile; - -import com.google.common.collect.Iterables; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Random; - -import static org.junit.Assert.*; - -/** - * A kind of integration test at the intersection of {@link HFileBlock}, {@link CacheConfig}, - * and {@link LruBlockCache}. 
- */ -@Category(SmallTests.class) -@RunWith(Parameterized.class) -public class TestLazyDataBlockDecompression { - private static final Log LOG = LogFactory.getLog(TestLazyDataBlockDecompression.class); - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - private FileSystem fs; - - @Parameterized.Parameter(0) - public boolean cacheOnWrite; - - @Parameterized.Parameters - public static Iterable data() { - return Arrays.asList(new Object[][] { - { false }, - { true } - }); - } - - @Before - public void setUp() throws IOException { - CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null; - fs = FileSystem.get(TEST_UTIL.getConfiguration()); - } - - @After - public void tearDown() { - CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null; - fs = null; - } - - /** - * Write {@code entryCount} random keyvalues to a new HFile at {@code path}. Returns the row - * bytes of the KeyValues written, in the order they were written. - */ - private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path, - HFileContext cxt, int entryCount) throws IOException { - HFileWriterV2 writer = (HFileWriterV2) - new HFileWriterV2.WriterFactoryV2(conf, cc) - .withPath(fs, path) - .withFileContext(cxt) - .create(); - - // write a bunch of random kv's - Random rand = new Random(9713312); // some seed. - final byte[] family = Bytes.toBytes("f"); - final byte[] qualifier = Bytes.toBytes("q"); - - for (int i = 0; i < entryCount; i++) { - byte[] keyBytes = TestHFileWriterV2.randomOrderedKey(rand, i); - byte[] valueBytes = TestHFileWriterV2.randomValue(rand); - // make a real keyvalue so that hfile tool can examine it - writer.append(new KeyValue(keyBytes, family, qualifier, valueBytes)); - } - writer.close(); - } - - /** - * Read all blocks from {@code path} to populate {@code blockCache}. - */ - private static void cacheBlocks(Configuration conf, CacheConfig cacheConfig, FileSystem fs, - Path path, HFileContext cxt) throws IOException { - FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path); - long fileSize = fs.getFileStatus(path).getLen(); - FixedFileTrailer trailer = - FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize); - HFileReaderV2 reader = new HFileReaderV2(path, trailer, fsdis, fileSize, cacheConfig, - fsdis.getHfs(), conf); - reader.loadFileInfo(); - long offset = trailer.getFirstDataBlockOffset(), - max = trailer.getLastDataBlockOffset(); - List blocks = new ArrayList(4); - HFileBlock block; - while (offset <= max) { - block = reader.readBlock(offset, -1, /* cacheBlock */ true, /* pread */ false, - /* isCompaction */ false, /* updateCacheMetrics */ true, null, null); - offset += block.getOnDiskSizeWithHeader(); - blocks.add(block); - } - LOG.info("read " + Iterables.toString(blocks)); - } - - @Test - public void testCompressionIncreasesEffectiveBlockCacheSize() throws Exception { - // enough room for 2 uncompressed block - int maxSize = (int) (HConstants.DEFAULT_BLOCKSIZE * 2.1); - Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), - "testCompressionIncreasesEffectiveBlockcacheSize"); - HFileContext context = new HFileContextBuilder() - .withCompression(Compression.Algorithm.GZ) - .build(); - LOG.info("context=" + context); - - // setup cache with lazy-decompression disabled. 
- Configuration lazyCompressDisabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); - lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite); - lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite); - lazyCompressDisabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite); - lazyCompressDisabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, false); - CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = - new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled); - CacheConfig cc = new CacheConfig(lazyCompressDisabled); - assertFalse(cc.shouldCacheDataCompressed()); - assertTrue(cc.getBlockCache() instanceof LruBlockCache); - LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache(); - LOG.info("disabledBlockCache=" + disabledBlockCache); - assertEquals("test inconsistency detected.", maxSize, disabledBlockCache.getMaxSize()); - assertTrue("eviction thread spawned unintentionally.", - disabledBlockCache.getEvictionThread() == null); - assertEquals("freshly created blockcache contains blocks.", - 0, disabledBlockCache.getBlockCount()); - - // 2000 kv's is ~3.6 full unencoded data blocks. - // Requires a conf and CacheConfig but should not be specific to this instance's cache settings - writeHFile(lazyCompressDisabled, cc, fs, hfilePath, context, 2000); - - // populate the cache - cacheBlocks(lazyCompressDisabled, cc, fs, hfilePath, context); - long disabledBlockCount = disabledBlockCache.getBlockCount(); - assertTrue("blockcache should contain blocks. disabledBlockCount=" + disabledBlockCount, - disabledBlockCount > 0); - long disabledEvictedCount = disabledBlockCache.getStats().getEvictedCount(); - for (Map.Entry e : - disabledBlockCache.getMapForTests().entrySet()) { - HFileBlock block = (HFileBlock) e.getValue().getBuffer(); - assertTrue("found a packed block, block=" + block, block.isUnpacked()); - } - - // count blocks with lazy decompression - Configuration lazyCompressEnabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); - lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite); - lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite); - lazyCompressEnabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite); - lazyCompressEnabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true); - CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = - new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressEnabled); - cc = new CacheConfig(lazyCompressEnabled); - assertTrue("test improperly configured.", cc.shouldCacheDataCompressed()); - assertTrue(cc.getBlockCache() instanceof LruBlockCache); - LruBlockCache enabledBlockCache = (LruBlockCache) cc.getBlockCache(); - LOG.info("enabledBlockCache=" + enabledBlockCache); - assertEquals("test inconsistency detected", maxSize, enabledBlockCache.getMaxSize()); - assertTrue("eviction thread spawned unintentionally.", - enabledBlockCache.getEvictionThread() == null); - assertEquals("freshly created blockcache contains blocks.", - 0, enabledBlockCache.getBlockCount()); - - cacheBlocks(lazyCompressEnabled, cc, fs, hfilePath, context); - long enabledBlockCount = enabledBlockCache.getBlockCount(); - assertTrue("blockcache should contain blocks. 
enabledBlockCount=" + enabledBlockCount, - enabledBlockCount > 0); - long enabledEvictedCount = enabledBlockCache.getStats().getEvictedCount(); - int candidatesFound = 0; - for (Map.Entry e : - enabledBlockCache.getMapForTests().entrySet()) { - candidatesFound++; - HFileBlock block = (HFileBlock) e.getValue().getBuffer(); - if (cc.shouldCacheCompressed(block.getBlockType().getCategory())) { - assertFalse("found an unpacked block, block=" + block + ", block buffer capacity=" + - block.getBufferWithoutHeader().capacity(), block.isUnpacked()); - } - } - assertTrue("did not find any candidates for compressed caching. Invalid test.", - candidatesFound > 0); - - LOG.info("disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + - enabledBlockCount); - assertTrue("enabling compressed data blocks should increase the effective cache size. " + - "disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + - enabledBlockCount, disabledBlockCount < enabledBlockCount); - - LOG.info("disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" + - enabledEvictedCount); - assertTrue("enabling compressed data blocks should reduce the number of evictions. " + - "disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" + - enabledEvictedCount, enabledEvictedCount < disabledEvictedCount); - } -} diff --git a/pom.xml b/pom.xml index b1a06e1..28ac4aa 100644 --- a/pom.xml +++ b/pom.xml @@ -1156,7 +1156,7 @@ yyyy-MM-dd'T'HH:mm ${maven.build.timestamp} - 1.7 + 1.8 3.0.3 ${compileSource} @@ -1186,6 +1186,7 @@ 3.1 4.4.4 2.2.0 + 2.3.3 12.0.1 1.9.13 5.5.23 @@ -1454,6 +1455,11 @@ ${slf4j.version} + com.github.ben-manes.caffeine + caffeine + ${caffeine.version} + + com.yammer.metrics metrics-core ${metrics-core.version}
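Note on the new dependency: the caffeine artifact added to the pom above (version 2.3.3, which is also why the compile target moves from 1.7 to 1.8 — Caffeine 2.x requires Java 8) supplies the Window TinyLFU eviction policy, an alternative to the LruBlockCache exercised by the tests deleted in this patch. The following is a minimal, hypothetical sketch of a size-bounded Caffeine cache, not the patch's actual TinyLfuBlockCache: the class name TinyLfuSketch, the 64 MB budget, and the String/byte[] key and value types (standing in for HBase's BlockCacheKey and Cacheable) are illustrative only.

import java.util.concurrent.ForkJoinPool;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public final class TinyLfuSketch {
  public static void main(String[] args) {
    // Bound the cache by total weight (bytes here) rather than entry count;
    // once a maximum size/weight is set, Caffeine evicts via Window TinyLFU.
    Cache<String, byte[]> cache = Caffeine.newBuilder()
        .maximumWeight(64L * 1024 * 1024)                     // illustrative 64 MB budget
        .weigher((String key, byte[] block) -> block.length)  // weigh entries by payload size
        .executor(ForkJoinPool.commonPool())                  // run maintenance off the caller thread
        .recordStats()
        .build();

    cache.put("hfile-1:0", new byte[64 * 1024]);
    byte[] hit = cache.getIfPresent("hfile-1:0");  // non-null while the block is resident
    cache.invalidate("hfile-1:0");                 // explicit per-key eviction

    System.out.println("wasResident=" + (hit != null)
        + " estimatedSize=" + cache.estimatedSize()
        + " evictions=" + cache.stats().evictionCount());
  }
}

A weight-based bound with a per-block weigher mirrors how a block cache must account for heap usage rather than block count; the asynchronous maintenance executor is shown because Caffeine performs eviction work off the caller's critical path, unlike the eviction thread the deleted LruBlockCache tests wait on.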