From a80386ea928f8087ad7f3b5b50b7aaf7b6d17004 Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Sun, 27 Mar 2016 04:24:42 -0700 Subject: [PATCH] HBASE-15560 W-TinyLFU based BlockCache --- .../java/org/apache/hadoop/hbase/HConstants.java | 5 + hbase-common/src/main/resources/hbase-default.xml | 5 + hbase-server/pom.xml | 4 + .../apache/hadoop/hbase/io/hfile/CacheConfig.java | 86 +++-- .../hadoop/hbase/io/hfile/CombinedBlockCache.java | 44 +-- .../hbase/io/hfile/FirstLevelBlockCache.java | 36 ++ .../io/hfile/InclusiveCombinedBlockCache.java | 6 +- .../hadoop/hbase/io/hfile/LruBlockCache.java | 39 ++- .../hadoop/hbase/io/hfile/TinyLfuBlockCache.java | 384 +++++++++++++++++++++ .../hadoop/hbase/io/hfile/TestCacheConfig.java | 8 +- .../hadoop/hbase/io/hfile/TestCacheOnWrite.java | 8 +- .../io/hfile/TestLazyDataBlockDecompression.java | 28 +- .../hadoop/hbase/io/hfile/TestLruBlockCache.java | 26 +- .../hbase/io/hfile/TestTinyLfuBlockCache.java | 310 +++++++++++++++++ pom.xml | 6 + 15 files changed, 895 insertions(+), 100 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FirstLevelBlockCache.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestTinyLfuBlockCache.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 4a8f55c..265046d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -933,6 +933,11 @@ public final class HConstants { public static final float HFILE_BLOCK_CACHE_SIZE_DEFAULT = 0.4f; + public static final String HFILE_BLOCK_CACHE_POLICY_KEY = + "hfile.block.cache.policy"; + + public static final String HFILE_BLOCK_CACHE_POLICY_DEFAULT = "LRU"; + /* * Minimum percentage of free heap necessary for a successful cluster startup. */ diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 5b0700b..513a010 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -814,6 +814,11 @@ possible configurations would overwhelm and obscure the important. The default thread pool size if parallel-seeking feature enabled. + hfile.block.cache.policy + LRU + The eviction policy for the L1 block cache (LRU or TinyLFU). 
+ + hfile.block.cache.size 0.4 Percentage of maximum heap (-Xmx setting) to allocate to block cache diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index a431006..d036357 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -435,6 +435,10 @@ true + com.github.ben-manes.caffeine + caffeine + + io.dropwizard.metrics metrics-core diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 321f72c..977d1d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -22,13 +22,15 @@ import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; import java.io.IOException; import java.lang.management.ManagementFactory; +import java.util.Optional; +import java.util.concurrent.ForkJoinPool; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -96,7 +98,7 @@ public class CacheConfig { * is an in-memory map that needs to be persisted across restarts. Where to store this * in-memory state is what you supply here: e.g. /tmp/bucketcache.map. */ - public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = + public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = "hbase.bucketcache.persistent.path"; /** @@ -104,11 +106,11 @@ public class CacheConfig { * as indices and blooms are kept in the lru blockcache and the data blocks in the * bucket cache). */ - public static final String BUCKET_CACHE_COMBINED_KEY = + public static final String BUCKET_CACHE_COMBINED_KEY = "hbase.bucketcache.combinedcache.enabled"; public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads"; - public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = + public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = "hbase.bucketcache.writer.queuelength"; /** @@ -449,7 +451,9 @@ public class CacheConfig { * @return true if this {@link BlockCategory} should be compressed in blockcache, false otherwise */ public boolean shouldCacheCompressed(BlockCategory category) { - if (!isBlockCacheEnabled()) return false; + if (!isBlockCacheEnabled()) { + return false; + } switch (category) { case DATA: return this.cacheDataOnRead && this.cacheDataCompressed; @@ -531,13 +535,13 @@ public class CacheConfig { // Clear this if in tests you'd make more than one block cache instance. @VisibleForTesting static BlockCache GLOBAL_BLOCK_CACHE_INSTANCE; - private static LruBlockCache GLOBAL_L1_CACHE_INSTANCE; + private static FirstLevelBlockCache GLOBAL_L1_CACHE_INSTANCE; /** Boolean whether we have disabled the block cache entirely. 
*/ @VisibleForTesting static boolean blockCacheDisabled = false; - static long getLruCacheSize(final Configuration conf, final long xmx) { + static long getFirstLevelCacheSize(final Configuration conf, final long xmx) { float cachePercentage = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); if (cachePercentage <= 0.0001f) { @@ -555,26 +559,39 @@ public class CacheConfig { /** * @param c Configuration to use. - * @return An L1 instance. Currently an instance of LruBlockCache. + * @return An L1 instance */ - public static LruBlockCache getL1(final Configuration c) { - return getL1(c, ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()); + public static FirstLevelBlockCache getL1(final Configuration c) { + long xmx = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); + long l1CacheSize = getFirstLevelCacheSize(c, xmx); + return getL1(l1CacheSize, c, Optional.empty()); } /** * @param c Configuration to use. * @param xmx Max heap memory - * @return An L1 instance. Currently an instance of LruBlockCache. + * @return An L1 instance. */ - private synchronized static LruBlockCache getL1(final Configuration c, final long xmx) { + private synchronized static FirstLevelBlockCache getL1(long cacheSize, + Configuration c, Optional victimCache) { if (GLOBAL_L1_CACHE_INSTANCE != null) return GLOBAL_L1_CACHE_INSTANCE; if (blockCacheDisabled) return null; - long lruCacheSize = getLruCacheSize(c, xmx); - if (lruCacheSize < 0) return null; + if (cacheSize < 0) return null; + + String policy = c.get(HConstants.HFILE_BLOCK_CACHE_POLICY_KEY, + HConstants.HFILE_BLOCK_CACHE_POLICY_DEFAULT); int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE); - LOG.info("Allocating LruBlockCache size=" + - StringUtils.byteDesc(lruCacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); - GLOBAL_L1_CACHE_INSTANCE = new LruBlockCache(lruCacheSize, blockSize, true, c); + LOG.info("Allocating BlockCache size=" + + StringUtils.byteDesc(cacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); + + if (policy.equalsIgnoreCase("LRU")) { + GLOBAL_L1_CACHE_INSTANCE = new LruBlockCache(cacheSize, blockSize, true, c, victimCache); + } else if (policy.equalsIgnoreCase("TinyLFU")) { + GLOBAL_L1_CACHE_INSTANCE = new TinyLfuBlockCache( + cacheSize, blockSize, ForkJoinPool.commonPool(), c, victimCache); + } else { + throw new IllegalArgumentException("Unknown policy: " + policy); + } return GLOBAL_L1_CACHE_INSTANCE; } @@ -629,7 +646,9 @@ public class CacheConfig { private static BlockCache getBucketCache(Configuration c, long xmx) { // Check for L2. ioengine name must be non-null. String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null); - if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) return null; + if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) { + return null; + } int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE); float bucketCachePercentage = c.getFloat(BUCKET_CACHE_SIZE_KEY, 0F); @@ -679,33 +698,40 @@ public class CacheConfig { * @return The block cache or null. 
*/ public static synchronized BlockCache instantiateBlockCache(Configuration conf) { - if (GLOBAL_BLOCK_CACHE_INSTANCE != null) return GLOBAL_BLOCK_CACHE_INSTANCE; - if (blockCacheDisabled) return null; + if (GLOBAL_BLOCK_CACHE_INSTANCE != null) { + return GLOBAL_BLOCK_CACHE_INSTANCE; + } + if (blockCacheDisabled) { + return null; + } + // blockCacheDisabled is set as a side-effect of getFirstLevelCacheSize() + // so check it again after the call long xmx = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); - LruBlockCache l1 = getL1(conf, xmx); - // blockCacheDisabled is set as a side-effect of getL1(), so check it again after the call. - if (blockCacheDisabled) return null; + long l1CacheSize = getFirstLevelCacheSize(conf, xmx); + if (blockCacheDisabled) { + return null; + } BlockCache l2 = getL2(conf, xmx); + FirstLevelBlockCache l1 = getL1(l1CacheSize, conf, Optional.ofNullable(l2)); if (l2 == null) { GLOBAL_BLOCK_CACHE_INSTANCE = l1; } else { boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT); - boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, + boolean combinedWithL1 = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, DEFAULT_BUCKET_CACHE_COMBINED); if (useExternal) { GLOBAL_BLOCK_CACHE_INSTANCE = new InclusiveCombinedBlockCache(l1, l2); } else { - if (combinedWithLru) { + if (combinedWithL1) { GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2); } else { - // L1 and L2 are not 'combined'. They are connected via the LruBlockCache victimhandler - // mechanism. It is a little ugly but works according to the following: when the - // background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get - // a block from the L1 cache, if not in L1, we will search L2. + // L1 and L2 are not 'combined'. They are connected via the FirstLevelBlockCache + // victimhandler mechanism. It is a little ugly but works according to the following: + // when the background eviction thread runs, blocks evicted from L1 will go to L2 AND when + // we get a block from the L1 cache, if not in L1, we will search L2. GLOBAL_BLOCK_CACHE_INSTANCE = l1; } } - l1.setVictimCache(l2); } return GLOBAL_BLOCK_CACHE_INSTANCE; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index 4ceda39..9ecdc443 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -34,20 +34,20 @@ import com.google.common.annotations.VisibleForTesting; * to cache bloom blocks and index blocks. The larger l2Cache is used to * cache data blocks. {@link #getBlock(BlockCacheKey, boolean, boolean, boolean)} reads * first from the smaller lruCache before looking for the block in the l2Cache. Blocks evicted - * from lruCache are put into the bucket cache. + * from lruCache are put into the bucket cache. * Metrics are the combined size and hits and misses of both caches. 
- * + * */ @InterfaceAudience.Private public class CombinedBlockCache implements ResizableBlockCache, HeapSize { - protected final LruBlockCache lruCache; + protected final FirstLevelBlockCache l1Cache; protected final BlockCache l2Cache; protected final CombinedCacheStats combinedCacheStats; - public CombinedBlockCache(LruBlockCache lruCache, BlockCache l2Cache) { - this.lruCache = lruCache; + public CombinedBlockCache(FirstLevelBlockCache l1Cache, BlockCache l2Cache) { + this.l1Cache = l1Cache; this.l2Cache = l2Cache; - this.combinedCacheStats = new CombinedCacheStats(lruCache.getStats(), + this.combinedCacheStats = new CombinedCacheStats(l1Cache.getStats(), l2Cache.getStats()); } @@ -57,7 +57,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { if (l2Cache instanceof HeapSize) { l2size = ((HeapSize) l2Cache).heapSize(); } - return lruCache.heapSize() + l2size; + return l1Cache.heapSize() + l2size; } @Override @@ -65,7 +65,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { final boolean cacheDataInL1) { boolean metaBlock = buf.getBlockType().getCategory() != BlockCategory.DATA; if (metaBlock || cacheDataInL1) { - lruCache.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1); + l1Cache.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1); } else { l2Cache.cacheBlock(cacheKey, buf, inMemory, false); } @@ -81,19 +81,19 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { boolean repeat, boolean updateCacheMetrics) { // TODO: is there a hole here, or just awkwardness since in the lruCache getBlock // we end up calling l2Cache.getBlock. - return lruCache.containsBlock(cacheKey)? - lruCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics): + return l1Cache.containsBlock(cacheKey)? 
+ l1Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics): l2Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics); } @Override public boolean evictBlock(BlockCacheKey cacheKey) { - return lruCache.evictBlock(cacheKey) || l2Cache.evictBlock(cacheKey); + return l1Cache.evictBlock(cacheKey) || l2Cache.evictBlock(cacheKey); } @Override public int evictBlocksByHfileName(String hfileName) { - return lruCache.evictBlocksByHfileName(hfileName) + return l1Cache.evictBlocksByHfileName(hfileName) + l2Cache.evictBlocksByHfileName(hfileName); } @@ -104,28 +104,28 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { @Override public void shutdown() { - lruCache.shutdown(); + l1Cache.shutdown(); l2Cache.shutdown(); } @Override public long size() { - return lruCache.size() + l2Cache.size(); + return l1Cache.size() + l2Cache.size(); } @Override public long getFreeSize() { - return lruCache.getFreeSize() + l2Cache.getFreeSize(); + return l1Cache.getFreeSize() + l2Cache.getFreeSize(); } @Override public long getCurrentSize() { - return lruCache.getCurrentSize() + l2Cache.getCurrentSize(); + return l1Cache.getCurrentSize() + l2Cache.getCurrentSize(); } @Override public long getBlockCount() { - return lruCache.getBlockCount() + l2Cache.getBlockCount(); + return l1Cache.getBlockCount() + l2Cache.getBlockCount(); } public static class CombinedCacheStats extends CacheStats { @@ -310,7 +310,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { lruCacheStats.rollMetricsPeriod(); bucketCacheStats.rollMetricsPeriod(); } - + @Override public long getFailedInserts() { return lruCacheStats.getFailedInserts() + bucketCacheStats.getFailedInserts(); @@ -321,13 +321,13 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { return lruCacheStats.getSumHitCountsPastNPeriods() + bucketCacheStats.getSumHitCountsPastNPeriods(); } - + @Override public long getSumRequestCountsPastNPeriods() { return lruCacheStats.getSumRequestCountsPastNPeriods() + bucketCacheStats.getSumRequestCountsPastNPeriods(); } - + @Override public long getSumHitCachingCountsPastNPeriods() { return lruCacheStats.getSumHitCachingCountsPastNPeriods() @@ -348,12 +348,12 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { @Override public BlockCache[] getBlockCaches() { - return new BlockCache [] {this.lruCache, this.l2Cache}; + return new BlockCache [] {this.l1Cache, this.l2Cache}; } @Override public void setMaxSize(long size) { - this.lruCache.setMaxSize(size); + this.l1Cache.setMaxSize(size); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FirstLevelBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FirstLevelBlockCache.java new file mode 100644 index 0000000..7a4070a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FirstLevelBlockCache.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.HeapSize; + +/** + * In-memory BlockCache that may be backed by secondary layer(s). + */ +@InterfaceAudience.Private +public interface FirstLevelBlockCache extends ResizableBlockCache, HeapSize { + + /** + * Whether the cache contains the block with specified cacheKey + * + * @param cacheKey + * @return true if it contains the block + */ + boolean containsBlock(BlockCacheKey cacheKey); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java index 667e7b4..160714b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class InclusiveCombinedBlockCache extends CombinedBlockCache implements BlockCache { - public InclusiveCombinedBlockCache(LruBlockCache l1, BlockCache l2) { + public InclusiveCombinedBlockCache(FirstLevelBlockCache l1, BlockCache l2) { super(l1,l2); } @@ -34,7 +34,7 @@ public class InclusiveCombinedBlockCache extends CombinedBlockCache implements B // On all external cache set ups the lru should have the l2 cache set as the victimHandler // Because of that all requests that miss inside of the lru block cache will be // tried in the l2 block cache. - return lruCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics); + return l1Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics); } /** @@ -50,7 +50,7 @@ public class InclusiveCombinedBlockCache extends CombinedBlockCache implements B final boolean cacheDataInL1) { // This is the inclusive part of the combined block cache. // Every block is placed into both block caches. - lruCache.cacheBlock(cacheKey, buf, inMemory, true); + l1Cache.cacheBlock(cacheKey, buf, inMemory, true); // This assumes that insertion into the L2 block cache is either async or very fast. 
l2Cache.cacheBlock(cacheKey, buf, inMemory, true); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 99b67ba..e6f166e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -24,6 +24,7 @@ import java.util.EnumMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.PriorityQueue; import java.util.SortedSet; import java.util.TreeSet; @@ -57,15 +58,15 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving * constant-time {@link #cacheBlock} and {@link #getBlock} operations.

* - * Contains three levels of block priority to allow for scan-resistance and in-memory families - * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An in-memory column + * Contains three levels of block priority to allow for scan-resistance and in-memory families + * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An in-memory column * family is a column family that should be served from memory if possible): * single-access, multiple-accesses, and in-memory priority. * A block is added with an in-memory priority flag if * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, otherwise a block becomes a * single access priority the first time it is read into this block cache. If a block is * accessed again while in cache, it is marked as a multiple access priority block. This - * delineation of blocks is used to prevent scans from thrashing the cache adding a + * delineation of blocks is used to prevent scans from thrashing the cache adding a * least-frequently-used element to the eviction algorithm.

* * Each priority is given its own chunk of the total cache to ensure @@ -95,7 +96,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; */ @InterfaceAudience.Private @JsonIgnoreProperties({"encodingCountsForTest"}) -public class LruBlockCache implements ResizableBlockCache, HeapSize { +public class LruBlockCache implements FirstLevelBlockCache { private static final Log LOG = LogFactory.getLog(LruBlockCache.class); @@ -238,11 +239,13 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { DEFAULT_MEMORY_FACTOR, DEFAULT_HARD_CAPACITY_LIMIT_FACTOR, false, - DEFAULT_MAX_BLOCK_SIZE + DEFAULT_MAX_BLOCK_SIZE, + Optional.empty() ); } - public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) { + public LruBlockCache(long maxSize, long blockSize, + boolean evictionThread, Configuration conf, Optional victimCache) { this(maxSize, blockSize, evictionThread, (int)Math.ceil(1.2*maxSize/blockSize), DEFAULT_LOAD_FACTOR, @@ -254,12 +257,13 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR), conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, DEFAULT_HARD_CAPACITY_LIMIT_FACTOR), conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE), - conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE) - ); + conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE), + victimCache); } - public LruBlockCache(long maxSize, long blockSize, Configuration conf) { - this(maxSize, blockSize, true, conf); + public LruBlockCache(long maxSize, long blockSize, + Configuration conf, Optional victimCache) { + this(maxSize, blockSize, true, conf, victimCache); } /** @@ -280,7 +284,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel, float minFactor, float acceptableFactor, float singleFactor, float multiFactor, float memoryFactor, float hardLimitFactor, - boolean forceInMemory, long maxBlockSize) { + boolean forceInMemory, long maxBlockSize, Optional victimCache) { this.maxBlockSize = maxBlockSize; if(singleFactor + multiFactor + memoryFactor != 1 || singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) { @@ -296,6 +300,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { this.maxSize = maxSize; this.blockSize = blockSize; this.forceInMemory = forceInMemory; + this.victimHandler = victimCache.orElse(null); map = new ConcurrentHashMap(mapInitialSize, mapLoadFactor, mapConcurrencyLevel); this.minFactor = minFactor; @@ -434,6 +439,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { * @param cacheKey block's cache key * @param buf block buffer */ + @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { cacheBlock(cacheKey, buf, false, false); } @@ -495,6 +501,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { * @param cacheKey * @return true if contains the block */ + @Override public boolean containsBlock(BlockCacheKey cacheKey) { return map.containsKey(cacheKey); } @@ -782,6 +789,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { return totalSize; } + @Override public int compareTo(BlockBucket that) { return Long.compare(this.overflow(), that.overflow()); } @@ -943,6 +951,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { *

Includes: total accesses, hits, misses, evicted blocks, and runs * of the eviction processes. */ + @Override public CacheStats getStats() { return this.stats; } @@ -1071,6 +1080,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor); } + @Override public void shutdown() { if (victimHandler != null) victimHandler.shutdown(); @@ -1119,7 +1129,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { Map counts = new EnumMap(BlockType.class); for (LruCachedBlock cb : map.values()) { - BlockType blockType = ((Cacheable)cb.getBuffer()).getBlockType(); + BlockType blockType = cb.getBuffer().getBlockType(); Integer count = counts.get(blockType); counts.put(blockType, (count == null ? 0 : count) + 1); } @@ -1139,11 +1149,6 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { return counts; } - public void setVictimCache(BlockCache handler) { - assert victimHandler == null; - victimHandler = handler; - } - @VisibleForTesting Map getMapForTests() { return map; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java new file mode 100644 index 0000000..9f78d57 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java @@ -0,0 +1,384 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.Optional; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.util.StringUtils; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Policy.Eviction; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * A block cache that is memory-aware using {@link HeapSize}, memory bounded using the W-TinyLFU + * eviction algorithm, and concurrent. 
This implementation delegates to a Caffeine cache to provide + * O(1) read and write operations. + *

+ * <ul>
+ *   <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li>
+ *   <li>Caffeine: https://github.com/ben-manes/caffeine</li>
+ *   <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li>
+ * </ul>
+ */ +@InterfaceAudience.Private +public final class TinyLfuBlockCache implements FirstLevelBlockCache { + private static final Log LOG = LogFactory.getLog(TinyLfuBlockCache.class); + + private static final String MAX_BLOCK_SIZE = "hbase.tinylfu.max.block.size"; + private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L; + private static final int STAT_THREAD_PERIOD_SECONDS = 5 * 60; + + private final Eviction policy; + private final ScheduledExecutorService statsThreadPool; + private final BlockCache victimCache; + private final long maxBlockSize; + private final CacheStats stats; + + + @VisibleForTesting + final Cache cache; + + /** + * Creates a block cache. + * + * @param maximumSizeInBytes maximum size of this cache, in bytes + * @param avgBlockSize expected average size of blocks, in bytes + * @param executor the cache's executor + * @param conf additional configuration + * @param victimCache the second level cache + */ + public TinyLfuBlockCache(long maximumSizeInBytes, long avgBlockSize, + Executor executor, Configuration conf, Optional victimCache) { + this(maximumSizeInBytes, avgBlockSize, + conf.getLong(MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE), executor, victimCache); + } + + /** + * Creates a block cache. + * + * @param maximumSizeInBytes maximum size of this cache, in bytes + * @param avgBlockSize expected average size of blocks, in bytes + * @param maxBlockSize maximum size of a block, in bytes + * @param executor the cache's executor + * @param victimCache the second level cache + */ + public TinyLfuBlockCache(long maximumSizeInBytes, long avgBlockSize, + long maxBlockSize, Executor executor, Optional victimCache) { + this.cache = Caffeine.newBuilder() + .executor(executor) + .maximumWeight(maximumSizeInBytes) + .removalListener(new EvictionListener()) + .weigher((BlockCacheKey key, Cacheable value) -> + (int) Math.min(value.heapSize(), Integer.MAX_VALUE)) + .initialCapacity((int) Math.ceil((1.2 * maximumSizeInBytes) / avgBlockSize)) + .build(); + this.maxBlockSize = maxBlockSize; + this.victimCache = victimCache.orElse(null); + this.policy = cache.policy().eviction().get(); + this.stats = new CacheStats(getClass().getSimpleName()); + + statsThreadPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true).build()); + statsThreadPool.scheduleAtFixedRate(this::logStats, + STAT_THREAD_PERIOD_SECONDS, STAT_THREAD_PERIOD_SECONDS, TimeUnit.SECONDS); + } + + @Override + public long size() { + return policy.getMaximum(); + } + + @Override + public long getFreeSize() { + return size() - getCurrentSize(); + } + + @Override + public long getCurrentSize() { + return policy.weightedSize().getAsLong(); + } + + @Override + public long getBlockCount() { + return cache.estimatedSize(); + } + + @Override + public long heapSize() { + return getCurrentSize(); + } + + @Override + public void setMaxSize(long size) { + policy.setMaximum(size); + } + + @Override + public boolean containsBlock(BlockCacheKey cacheKey) { + return cache.asMap().containsKey(cacheKey); + } + + @Override + public Cacheable getBlock(BlockCacheKey cacheKey, + boolean caching, boolean repeat, boolean updateCacheMetrics) { + Cacheable value = cache.getIfPresent(cacheKey); + if (value == null) { + if (repeat) { + return null; + } + if (updateCacheMetrics) { + stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); + } + if (victimCache != null) { + value = victimCache.getBlock(cacheKey, caching, repeat, 
updateCacheMetrics); + if ((value != null) && caching) { + cacheBlock(cacheKey, value); + } + } + } else if (updateCacheMetrics) { + stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); + } + return value; + } + + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable value, + boolean inMemory, boolean cacheDataInL1) { + cacheBlock(cacheKey, value); + } + + @Override + public void cacheBlock(BlockCacheKey key, Cacheable value) { + if ((value.heapSize() > maxBlockSize)) { + // If there are a lot of blocks that are too big this can make the logs too noisy (2% logged) + if (stats.failInsert() % 50 == 0) { + LOG.warn(String.format( + "Trying to cache too large a block %s @ %,d is %,d which is larger than %,d", + key.getHfileName(), key.getOffset(), value.heapSize(), DEFAULT_MAX_BLOCK_SIZE)); + } + } else { + cache.put(key, value); + } + } + + @Override + public boolean evictBlock(BlockCacheKey cacheKey) { + Cacheable value = cache.asMap().remove(cacheKey); + return (value != null); + } + + @Override + public int evictBlocksByHfileName(String hfileName) { + int evicted = 0; + for (BlockCacheKey key : cache.asMap().keySet()) { + if (key.getHfileName().equals(hfileName) && evictBlock(key)) { + evicted++; + } + } + if (victimCache != null) { + evicted += victimCache.evictBlocksByHfileName(hfileName); + } + return evicted; + } + + @Override + public CacheStats getStats() { + return stats; + } + + @Override + public void shutdown() { + if (victimCache != null) { + victimCache.shutdown(); + } + statsThreadPool.shutdown(); + } + + @Override + public BlockCache[] getBlockCaches() { + return null; + } + + @Override + public Iterator iterator() { + long now = System.nanoTime(); + return cache.asMap().entrySet().stream() + .map(entry -> (CachedBlock) new CachedBlockView(entry.getKey(), entry.getValue(), now)) + .iterator(); + } + + @Override + public void returnBlock(BlockCacheKey cacheKey, Cacheable block) { + // There is no SHARED type here in L1. But the block might have been served from the L2 victim + // cache (when the Combined mode = false). So just try return this block to the victim cache. + // Note : In case of CombinedBlockCache we will have this victim cache configured for L1 + // cache. But CombinedBlockCache will only call returnBlock on L2 cache. + if (victimCache != null) { + victimCache.returnBlock(cacheKey, block); + } + } + + private void logStats() { + LOG.info( + "totalSize=" + StringUtils.byteDesc(heapSize()) + ", " + + "freeSize=" + StringUtils.byteDesc(getFreeSize()) + ", " + + "max=" + StringUtils.byteDesc(size()) + ", " + + "blockCount=" + getBlockCount() + ", " + + "accesses=" + stats.getRequestCount() + ", " + + "hits=" + stats.getHitCount() + ", " + + "hitRatio=" + (stats.getHitCount() == 0 ? + "0," : StringUtils.formatPercent(stats.getHitRatio(), 2) + ", ") + + "cachingAccesses=" + stats.getRequestCachingCount() + ", " + + "cachingHits=" + stats.getHitCachingCount() + ", " + + "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ? 
+ "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) + + "evictions=" + stats.getEvictionCount() + ", " + + "evicted=" + stats.getEvictedCount()); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("blockCount", getBlockCount()) + .add("currentSize", getCurrentSize()) + .add("freeSize", getFreeSize()) + .add("maxSize", size()) + .add("heapSize", heapSize()) + .add("victimCache", (victimCache != null)) + .toString(); + } + + /** A removal listener to asynchronously record evictions and populate the victim cache. */ + private final class EvictionListener implements RemovalListener { + + @Override + public void onRemoval(BlockCacheKey key, Cacheable value, RemovalCause cause) { + if (!cause.wasEvicted()) { + // FIXME: Currently does not capture the insertion time + stats.evicted(/* cachedTime */ 0L, key.isPrimary()); + + // An explicit eviction (invalidation) is not added to the victim cache as the data may + // no longer be valid for subsequent queries. + return; + } + + stats.evict(); + stats.evicted(Long.MAX_VALUE, true); + if (victimCache == null) { + return; + } else if (victimCache instanceof BucketCache) { + BucketCache victimBucketCache = (BucketCache) victimCache; + victimBucketCache.cacheBlockWithWait(key, value, /* inMemory */ true, /* wait */ true); + } else { + victimCache.cacheBlock(key, value); + } + } + } + + private static final class CachedBlockView implements CachedBlock { + private static final Comparator COMPARATOR = Comparator + .comparing(CachedBlock::getFilename) + .thenComparing(CachedBlock::getOffset) + .thenComparing(CachedBlock::getCachedTime); + + private final BlockCacheKey key; + private final Cacheable value; + private final long now; + + public CachedBlockView(BlockCacheKey key, Cacheable value, long now) { + this.now = now; + this.key = key; + this.value = value; + } + + @Override + public BlockPriority getBlockPriority() { + // This does not appear to be used in any meaningful way and is irrelevant to this cache + return BlockPriority.MEMORY; + } + + @Override + public BlockType getBlockType() { + return value.getBlockType(); + } + + @Override + public long getOffset() { + return key.getOffset(); + } + + @Override + public long getSize() { + return value.heapSize(); + } + + @Override + public long getCachedTime() { + // This does not appear to be used in any meaningful way, so not captured + return 0L; + } + + @Override + public String getFilename() { + return key.getHfileName(); + } + + @Override + public int compareTo(CachedBlock other) { + return COMPARATOR.compare(this, other); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (!(obj instanceof CachedBlock)) { + return false; + } + CachedBlock other = (CachedBlock) obj; + return compareTo(other) == 0; + } + + @Override + public int hashCode() { + return key.hashCode(); + } + + @Override + public String toString() { + return BlockCacheUtil.toString(this, now); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java index d9d5261..bf44d33 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java @@ -37,12 +37,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import 
org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.testclassification.IOTests; -import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Threads; import org.junit.After; import org.junit.Before; @@ -328,7 +328,7 @@ public class TestCacheConfig { BlockCache [] bcs = cbc.getBlockCaches(); assertTrue(bcs[0] instanceof LruBlockCache); LruBlockCache lbc = (LruBlockCache)bcs[0]; - assertEquals(CacheConfig.getLruCacheSize(this.conf, + assertEquals(CacheConfig.getFirstLevelCacheSize(this.conf, ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()), lbc.getMaxSize()); assertTrue(bcs[1] instanceof BucketCache); BucketCache bc = (BucketCache)bcs[1]; @@ -347,7 +347,7 @@ public class TestCacheConfig { // from L1 happens, it does not fail because L2 can't take the eviction because block too big. this.conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.001f); MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); - long lruExpectedSize = CacheConfig.getLruCacheSize(this.conf, mu.getMax()); + long lruExpectedSize = CacheConfig.getFirstLevelCacheSize(this.conf, mu.getMax()); final int bcSize = 100; long bcExpectedSize = 100 * 1024 * 1024; // MB. assertTrue(lruExpectedSize < bcExpectedSize); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java index 8f9c4f7..8cc5e5e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import org.apache.commons.logging.Log; @@ -161,7 +162,8 @@ public class TestCacheOnWrite { //set LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME to 2.0f due to HBASE-16287 TEST_UTIL.getConfiguration().setFloat(LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, 2.0f); // memory - BlockCache lru = new LruBlockCache(128 * 1024 * 1024, 64 * 1024, TEST_UTIL.getConfiguration()); + BlockCache lru = new LruBlockCache(128 * 1024 * 1024, 64 * 1024, + TEST_UTIL.getConfiguration(), Optional.empty()); blockcaches.add(lru); // bucket cache @@ -411,7 +413,7 @@ public class TestCacheOnWrite { final String cf = "myCF"; final byte[] cfBytes = Bytes.toBytes(cf); final int maxVersions = 3; - Region region = TEST_UTIL.createTestRegion(table, + Region region = TEST_UTIL.createTestRegion(table, new HColumnDescriptor(cf) .setCompressionType(compress) .setBloomFilterType(BLOOM_TYPE) @@ -422,7 +424,7 @@ public class TestCacheOnWrite { long ts = EnvironmentEdgeManager.currentTime(); for (int iFile = 0; iFile < 5; ++iFile) { for (int iRow = 0; iRow < 500; ++iRow) { - String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + + String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + iRow; Put p = new Put(Bytes.toBytes(rowStr)); ++rowIdx; diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java index cf3c6ed..d5e77b3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java @@ -17,7 +17,18 @@ */ package org.apache.hadoop.hbase.io.hfile; -import com.google.common.collect.Iterables; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -39,14 +50,7 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Random; - -import static org.junit.Assert.*; +import com.google.common.collect.Iterables; /** * A kind of integration test at the intersection of {@link HFileBlock}, {@link CacheConfig}, @@ -151,7 +155,8 @@ public class TestLazyDataBlockDecompression { lazyCompressDisabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite); lazyCompressDisabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, false); CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = - new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled); + new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, + false, lazyCompressDisabled, Optional.empty()); CacheConfig cc = new CacheConfig(lazyCompressDisabled); assertFalse(cc.shouldCacheDataCompressed()); assertTrue(cc.getBlockCache() instanceof LruBlockCache); @@ -186,7 +191,8 @@ public class TestLazyDataBlockDecompression { lazyCompressEnabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite); lazyCompressEnabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true); CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = - new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressEnabled); + new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, + false, lazyCompressEnabled, Optional.empty()); cc = new CacheConfig(lazyCompressEnabled); assertTrue("test improperly configured.", cc.shouldCacheDataCompressed()); assertTrue(cc.getBlockCache() instanceof LruBlockCache); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java index b4dfc0c..e36fcad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java @@ -24,16 +24,17 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.nio.ByteBuffer; +import java.util.Optional; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import 
org.apache.hadoop.hbase.testclassification.IOTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.LruBlockCache.EvictionThread; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.ClassSize; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -179,7 +180,7 @@ public class TestLruBlockCache { assertEquals( "Cache should ignore cache requests for blocks already in cache", expectedBlockCount, cache.getBlockCount()); - + // Verify correctly calculated cache heap size assertEquals(expectedCacheSize, cache.heapSize()); @@ -313,7 +314,8 @@ public class TestLruBlockCache { 0.34f, // memory 1.2f, // limit false, - 16 * 1024 * 1024); + 16 * 1024 * 1024, + Optional.empty()); CachedItem [] singleBlocks = generateFixedBlocks(5, blockSize, "single"); CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi"); @@ -435,7 +437,8 @@ public class TestLruBlockCache { 0.5f, // memory 1.2f, // limit true, - 16 * 1024 * 1024); + 16 * 1024 * 1024, + Optional.empty()); CachedItem [] singleBlocks = generateFixedBlocks(10, blockSize, "single"); CachedItem [] multiBlocks = generateFixedBlocks(10, blockSize, "multi"); @@ -542,7 +545,8 @@ public class TestLruBlockCache { 0.34f, // memory 1.2f, // limit false, - 16 * 1024 * 1024); + 16 * 1024 * 1024, + Optional.empty()); CachedItem [] singleBlocks = generateFixedBlocks(20, blockSize, "single"); CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi"); @@ -606,7 +610,8 @@ public class TestLruBlockCache { 0.34f, // memory 1.2f, // limit false, - 1024); + 1024, + Optional.empty()); CachedItem [] tooLong = generateFixedBlocks(10, 1024+5, "long"); CachedItem [] small = generateFixedBlocks(15, 600, "small"); @@ -646,7 +651,8 @@ public class TestLruBlockCache { 0.34f, // memory 1.2f, // limit false, - 16 * 1024 * 1024); + 16 * 1024 * 1024, + Optional.empty()); CachedItem [] singleBlocks = generateFixedBlocks(10, blockSize, "single"); CachedItem [] multiBlocks = generateFixedBlocks(10, blockSize, "multi"); @@ -811,7 +817,7 @@ public class TestLruBlockCache { ClassSize.CONCURRENT_HASHMAP + (numEntries * ClassSize.CONCURRENT_HASHMAP_ENTRY) + (LruBlockCache.DEFAULT_CONCURRENCY_LEVEL * ClassSize.CONCURRENT_HASHMAP_SEGMENT); - long negateBlockSize = (long)(totalOverhead/numEntries); + long negateBlockSize = totalOverhead/numEntries; negateBlockSize += LruCachedBlock.PER_BLOCK_OVERHEAD; return ClassSize.align((long)Math.floor((roughBlockSize - negateBlockSize)*0.99f)); } @@ -869,7 +875,7 @@ public class TestLruBlockCache { @Override public void serialize(ByteBuffer destination) { } - + @Override public BlockType getBlockType() { return BlockType.DATA; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestTinyLfuBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestTinyLfuBlockCache.java new file mode 100644 index 0000000..2c496f9 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestTinyLfuBlockCache.java @@ -0,0 +1,310 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.Random; + +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.ClassSize; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Tests the concurrent TinyLfuBlockCache. + */ +@Category({IOTests.class, SmallTests.class}) +public class TestTinyLfuBlockCache { + + @Test + public void testCacheSimple() throws Exception { + + long maxSize = 1000000; + long blockSize = calculateBlockSizeDefault(maxSize, 101); + + TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, + blockSize, blockSize, Runnable::run, Optional.empty()); + + CachedItem [] blocks = generateRandomBlocks(100, blockSize); + + long expectedCacheSize = cache.heapSize(); + + // Confirm empty + for (CachedItem block : blocks) { + assertTrue(cache.getBlock(block.cacheKey, true, false, true) == null); + } + + // Add blocks + for (CachedItem block : blocks) { + cache.cacheBlock(block.cacheKey, block); + expectedCacheSize += block.heapSize(); + } + + // Verify correctly calculated cache heap size + assertEquals(expectedCacheSize, cache.heapSize()); + + // Check if all blocks are properly cached and retrieved + for (CachedItem block : blocks) { + HeapSize buf = cache.getBlock(block.cacheKey, true, false, true); + assertTrue(buf != null); + assertEquals(buf.heapSize(), block.heapSize()); + } + + // Re-add same blocks and ensure nothing has changed + long expectedBlockCount = cache.getBlockCount(); + for (CachedItem block : blocks) { + cache.cacheBlock(block.cacheKey, block); + } + assertEquals( + "Cache should ignore cache requests for blocks already in cache", + expectedBlockCount, cache.getBlockCount()); + + // Verify correctly calculated cache heap size + assertEquals(expectedCacheSize, cache.heapSize()); + + // Check if all blocks are properly cached and retrieved + for (CachedItem block : blocks) { + HeapSize buf = cache.getBlock(block.cacheKey, true, false, true); + assertTrue(buf != null); + assertEquals(buf.heapSize(), block.heapSize()); + } + + // Expect no evictions + assertEquals(0, cache.getStats().getEvictionCount()); + } + + @Test + public void testCacheEvictionSimple() throws Exception { + + long maxSize = 100000; + long blockSize = calculateBlockSizeDefault(maxSize, 10); + + TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, + blockSize, blockSize, Runnable::run, Optional.empty()); + + CachedItem [] blocks = generateFixedBlocks(11, blockSize, "block"); + + // Add all the blocks + for (CachedItem block : blocks) { + cache.cacheBlock(block.cacheKey, block); + 
} + + // A single eviction run should have occurred + assertEquals(1, cache.getStats().getEvictionCount()); + + // The cache did not grow beyond max + assertTrue(cache.heapSize() < maxSize); + + // All blocks except one should be in the cache + assertEquals(10, cache.getBlockCount()); + } + + @Test + public void testScanResistance() throws Exception { + + long maxSize = 100000; + long blockSize = calculateBlockSize(maxSize, 10); + + TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, + blockSize, blockSize, Runnable::run, Optional.empty()); + + CachedItem [] singleBlocks = generateFixedBlocks(20, blockSize, "single"); + CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi"); + + // Add 5 blocks from each + for(int i=0; i<5; i++) { + cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]); + cache.cacheBlock(multiBlocks[i].cacheKey, multiBlocks[i]); + } + + // Add frequency + for (int i = 0; i < 5; i++) { + for (int j = 0; j < 10; j++) { + CachedItem block = multiBlocks[i]; + cache.getBlock(block.cacheKey, true, false, true); + } + } + + // Let's keep "scanning" by adding single blocks. From here on we only + // expect evictions from the single bucket. + + for(int i=5;i<18;i++) { + cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]); + } + + for (CachedItem block : multiBlocks) { + assertTrue(cache.cache.asMap().containsKey(block.cacheKey)); + } + + assertEquals(10, cache.getBlockCount()); + assertEquals(13, cache.getStats().getEvictionCount()); + + } + + @Test + public void testMaxBlockSize() throws Exception { + long maxSize = 100000; + long blockSize = calculateBlockSize(maxSize, 10); + + TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, + blockSize, blockSize, Runnable::run, Optional.empty()); + CachedItem [] tooLong = generateFixedBlocks(10, 2 * blockSize, "long"); + CachedItem [] small = generateFixedBlocks(15, blockSize / 2, "small"); + + for (CachedItem i:tooLong) { + cache.cacheBlock(i.cacheKey, i); + } + for (CachedItem i:small) { + cache.cacheBlock(i.cacheKey, i); + } + assertEquals(15,cache.getBlockCount()); + for (CachedItem i:small) { + assertNotNull(cache.getBlock(i.cacheKey, true, false, false)); + } + for (CachedItem i:tooLong) { + assertNull(cache.getBlock(i.cacheKey, true, false, false)); + } + + assertEquals(10, cache.getStats().getFailedInserts()); + } + + @Test + public void testResizeBlockCache() throws Exception { + + long maxSize = 100000; + long blockSize = calculateBlockSize(maxSize, 10); + + TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, + blockSize, blockSize, Runnable::run, Optional.empty()); + + CachedItem [] blocks = generateFixedBlocks(10, blockSize, "block"); + + for(CachedItem block : blocks) { + cache.cacheBlock(block.cacheKey, block); + } + + // Do not expect any evictions yet + assertEquals(10, cache.getBlockCount()); + assertEquals(0, cache.getStats().getEvictionCount()); + + // Resize to half capacity plus an extra block (otherwise we evict an extra) + cache.setMaxSize(maxSize / 2); + + // And we expect 1/2 of the blocks to be evicted + assertEquals(5, cache.getBlockCount()); + assertEquals(5, cache.getStats().getEvictedCount()); + } + + private CachedItem [] generateFixedBlocks(int numBlocks, int size, String pfx) { + CachedItem [] blocks = new CachedItem[numBlocks]; + for(int i=0;i getDeserializer() { + return null; + } + + @Override + public void serialize(ByteBuffer destination) { + } + + @Override + public BlockType getBlockType() { + return BlockType.DATA; + } + + @Override + public MemoryType 
getMemoryType() { + return MemoryType.EXCLUSIVE; + } + + } + +} + diff --git a/pom.xml b/pom.xml index c148b19..72638d4 100644 --- a/pom.xml +++ b/pom.xml @@ -1191,6 +1191,7 @@ 4.5.2 4.4.4 3.1.2 + 2.3.3 12.0.1 1.9.13 5.5.23 @@ -1465,6 +1466,11 @@ ${slf4j.version} + com.github.ben-manes.caffeine + caffeine + ${caffeine.version} + + io.dropwizard.metrics metrics-core ${metrics-core.version} -- 2.9.0
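
Usage note (not part of the patch): a minimal sketch of how the new policy is selected, assuming only the classes and the hfile.block.cache.policy key added above. Operators would normally set hfile.block.cache.policy to TinyLFU in hbase-site.xml; the same switch can be exercised programmatically as below. The class name TinyLfuPolicyExample is hypothetical, and the final assertion only holds when no L2 (bucket cache) is configured, since instantiateBlockCache then returns the L1 instance directly.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.io.hfile.BlockCache;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.TinyLfuBlockCache;

    public class TinyLfuPolicyExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Select the Caffeine-backed W-TinyLFU policy; the shipped default stays "LRU".
        conf.set(HConstants.HFILE_BLOCK_CACHE_POLICY_KEY, "TinyLFU");
        BlockCache cache = CacheConfig.instantiateBlockCache(conf);
        // With no bucket cache (L2) configured, the L1 cache is returned as-is.
        System.out.println("L1 block cache implementation: " + cache.getClass().getSimpleName());
        assert cache instanceof TinyLfuBlockCache;
      }
    }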