From 2ba4f94e5de5b194991b3e433bd7ce719165c987 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Thu, 5 Mar 2015 14:47:59 -0800 Subject: [PATCH] HBASE-13170 Allow block cache to be external Summary: Add MemcachedBlockCache to allow the L2 block cache to be backed by memcached. This should allow the block cache to persist on restarts, region movement, and crashes. Test Plan: Tested locally with PE and running memcached. Differential Revision: https://reviews.facebook.net/D34635 --- hbase-server/pom.xml | 4 + .../hbase/tmpl/regionserver/BlockCacheTmpl.jamon | 76 +++--- .../apache/hadoop/hbase/io/hfile/CacheConfig.java | 59 ++++- .../hadoop/hbase/io/hfile/CombinedBlockCache.java | 47 ++-- .../apache/hadoop/hbase/io/hfile/HFileBlock.java | 14 +- .../io/hfile/InclusiveCombinedBlockCache.java | 58 +++++ .../hadoop/hbase/io/hfile/LruBlockCache.java | 32 ++- .../hadoop/hbase/io/hfile/MemcachedBlockCache.java | 258 +++++++++++++++++++++ ...ressureAwareCompactionThroughputController.java | 18 +- .../hadoop/hbase/io/hfile/TestCacheConfig.java | 4 +- pom.xml | 6 + 11 files changed, 487 insertions(+), 89 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index 67a1e20..1da4c06 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -490,6 +490,10 @@ io.netty netty-all + + net.spy + spymemcached + org.apache.htrace diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon index e4ff70f..0419196 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon @@ -206,60 +206,68 @@ org.apache.hadoop.util.StringUtils; -<%def bc_stats> +<%def hits_tmpl> <%args> - CacheConfig cacheConfig; + BlockCache bc; -<%if cacheConfig == null || cacheConfig.getBlockCache() == null %> -

CacheConfig is null

-<%else> - - - - - - - - - - - - - - - - - - - - - - <& evictions_tmpl; bc = cacheConfig.getBlockCache(); &> - + - + - + - + - + + + + +<%def bc_stats> +<%args> + CacheConfig cacheConfig; + +<%if cacheConfig == null || cacheConfig.getBlockCache() == null %> +

CacheConfig is null

+<%else> +
AttributeValueDescription
Size<% StringUtils.humanReadableInt(cacheConfig.getBlockCache().getCurrentSize()) %>Current size of block cache in use (bytes)
Free<% StringUtils.humanReadableInt(cacheConfig.getBlockCache().getFreeSize()) %>The total free memory currently available to store more cache entries (bytes)
Count<% String.format("%,d", cacheConfig.getBlockCache().getBlockCount()) %>Number of blocks in block cache
Hits<% String.format("%,d", cacheConfig.getBlockCache().getStats().getHitCount()) %><% String.format("%,d", bc.getStats().getHitCount()) %> Number requests that were cache hits
Hits Caching<% String.format("%,d", cacheConfig.getBlockCache().getStats().getHitCachingCount()) %><% String.format("%,d", bc.getStats().getHitCachingCount()) %> Cache hit block requests but only requests set to cache block if a miss
Misses<% String.format("%,d", cacheConfig.getBlockCache().getStats().getMissCount()) %><% String.format("%,d", bc.getStats().getMissCount()) %> Block requests that were cache misses but set to cache missed blocks
Misses Caching<% String.format("%,d", cacheConfig.getBlockCache().getStats().getMissCount()) %><% String.format("%,d", bc.getStats().getMissCount()) %> Block requests that were cache misses but only requests set to use block cache
Hit Ratio<% String.format("%,.2f", cacheConfig.getBlockCache().getStats().getHitRatio() * 100) %><% "%" %><% String.format("%,.2f", bc.getStats().getHitRatio() * 100) %><% "%" %> Hit Count divided by total requests count
+ + + + + + + + + + + + + + + + + + + + + <& evictions_tmpl; bc = cacheConfig.getBlockCache(); &> + <& hits_tmpl; bc = cacheConfig.getBlockCache(); &>
AttributeValueDescription
Size<% StringUtils.humanReadableInt(cacheConfig.getBlockCache().getCurrentSize()) %>Current size of block cache in use (bytes)
Free<% StringUtils.humanReadableInt(cacheConfig.getBlockCache().getFreeSize()) %>The total free memory currently available to store more cache entries (bytes)
Count<% String.format("%,d", cacheConfig.getBlockCache().getBlockCount()) %>Number of blocks in block cache

If block cache is made up of more than one cache -- i.e. a L1 and a L2 -- then the above are combined counts. Request count is sum of hits and misses.

@@ -349,7 +357,9 @@ are combined counts. Request count is sum of hits and misses.

Size of DATA Blocks -<%if evictions %><& evictions_tmpl; bc = bc; &> +<& evictions_tmpl; bc = bc; &> +<& hits_tmpl; bc = bc; &> + <%if bucketCache %> Hits per Second diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index f212f14..487aed8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; @@ -126,8 +127,12 @@ public class CacheConfig { */ public static final String BLOCKCACHE_BLOCKSIZE_KEY = "hbase.offheapcache.minblocksize"; + private static final String EXTERNAL_BLOCKCACHE_KEY = "hbase.blockcache.use.external"; + private static final boolean EXTERNAL_BLOCKCACHE_DEFAULT = false; + + private static final String EXTERNAL_BLOCKCACHE_CLASS_KEY="hbase.blockcache.external.class"; + // Defaults - public static final boolean DEFAULT_CACHE_DATA_ON_READ = true; public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false; public static final boolean DEFAULT_IN_MEMORY = false; @@ -478,7 +483,34 @@ public class CacheConfig { * @return Returns L2 block cache instance (for now it is BucketCache BlockCache all the time) * or null if not supposed to be a L2. */ - private static BucketCache getL2(final Configuration c, final MemoryUsage mu) { + private static BlockCache getL2(final Configuration c, final MemoryUsage mu) { + final boolean useExternal = c.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT); + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to use " + (useExternal?" External":" Internal") + " l2 cache" ); + } + + // If we want to use an external block cache then create that. + if (useExternal) { + return getExternalBlockcache(c); + } + + // otherwise use the bucket cache. + return getBucketCache(c, mu); + + } + + private static BlockCache getExternalBlockcache(Configuration c) { + try { + Class klass = c.getClass(EXTERNAL_BLOCKCACHE_CLASS_KEY, MemcachedBlockCache.class); + LOG.info("Creating external block cache of type: " + klass); + return (BlockCache) ReflectionUtils.newInstance(klass, c); + } catch (Exception e) { + LOG.warn("Error creating external block cache", e); + } + return null; + } + + private static BlockCache getBucketCache(Configuration c, MemoryUsage mu) { // Check for L2. ioengine name must be non-null. String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null); if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) return null; @@ -533,22 +565,27 @@ public class CacheConfig { LruBlockCache l1 = getL1(conf, mu); // blockCacheDisabled is set as a side-effect of getL1(), so check it again after the call. if (blockCacheDisabled) return null; - BucketCache l2 = getL2(conf, mu); + BlockCache l2 = getL2(conf, mu); if (l2 == null) { GLOBAL_BLOCK_CACHE_INSTANCE = l1; } else { + boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT); boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, DEFAULT_BUCKET_CACHE_COMBINED); - if (combinedWithLru) { - GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2); + if (useExternal) { + GLOBAL_BLOCK_CACHE_INSTANCE = new InclusiveCombinedBlockCache(l1, l2); } else { - // L1 and L2 are not 'combined'. They are connected via the LruBlockCache victimhandler - // mechanism. It is a little ugly but works according to the following: when the - // background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get - // a block from the L1 cache, if not in L1, we will search L2. - l1.setVictimCache(l2); - GLOBAL_BLOCK_CACHE_INSTANCE = l1; + if (combinedWithLru) { + GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2); + } else { + // L1 and L2 are not 'combined'. They are connected via the LruBlockCache victimhandler + // mechanism. It is a little ugly but works according to the following: when the + // background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get + // a block from the L1 cache, if not in L1, we will search L2. + GLOBAL_BLOCK_CACHE_INSTANCE = l1; + } } + l1.setVictimCache(l2); } return GLOBAL_BLOCK_CACHE_INSTANCE; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index 52a5793..d5bb4d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -25,32 +25,37 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; + /** * CombinedBlockCache is an abstraction layer that combines * {@link LruBlockCache} and {@link BucketCache}. The smaller lruCache is used - * to cache bloom blocks and index blocks. The larger bucketCache is used to + * to cache bloom blocks and index blocks. The larger l2Cache is used to * cache data blocks. {@link #getBlock(BlockCacheKey, boolean, boolean, boolean)} reads - * first from the smaller lruCache before looking for the block in the bucketCache. Blocks evicted + * first from the smaller lruCache before looking for the block in the l2Cache. Blocks evicted * from lruCache are put into the bucket cache. * Metrics are the combined size and hits and misses of both caches. * */ @InterfaceAudience.Private public class CombinedBlockCache implements ResizableBlockCache, HeapSize { - private final LruBlockCache lruCache; - private final BucketCache bucketCache; - private final CombinedCacheStats combinedCacheStats; + protected final LruBlockCache lruCache; + protected final BlockCache l2Cache; + protected final CombinedCacheStats combinedCacheStats; - public CombinedBlockCache(LruBlockCache lruCache, BucketCache bucketCache) { + public CombinedBlockCache(LruBlockCache lruCache, BlockCache l2Cache) { this.lruCache = lruCache; - this.bucketCache = bucketCache; + this.l2Cache = l2Cache; this.combinedCacheStats = new CombinedCacheStats(lruCache.getStats(), - bucketCache.getStats()); + l2Cache.getStats()); } @Override public long heapSize() { - return lruCache.heapSize() + bucketCache.heapSize(); + long l2size = 0; + if (l2Cache instanceof HeapSize ) { + l2size = ((HeapSize) l2Cache).heapSize(); + } + return lruCache.heapSize() + l2size; } @Override @@ -60,7 +65,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { if (isMetaBlock || cacheDataInL1) { lruCache.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1); } else { - bucketCache.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1); + l2Cache.cacheBlock(cacheKey, buf, inMemory, false); } } @@ -73,22 +78,24 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, boolean updateCacheMetrics) { // TODO: is there a hole here, or just awkwardness since in the lruCache getBlock - // we end up calling bucketCache.getBlock. + // we end up calling l2Cache.getBlock. if (lruCache.containsBlock(cacheKey)) { return lruCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics); } - return bucketCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics); + Cacheable result = l2Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics); + + return result; } @Override public boolean evictBlock(BlockCacheKey cacheKey) { - return lruCache.evictBlock(cacheKey) || bucketCache.evictBlock(cacheKey); + return lruCache.evictBlock(cacheKey) || l2Cache.evictBlock(cacheKey); } @Override public int evictBlocksByHfileName(String hfileName) { return lruCache.evictBlocksByHfileName(hfileName) - + bucketCache.evictBlocksByHfileName(hfileName); + + l2Cache.evictBlocksByHfileName(hfileName); } @Override @@ -99,27 +106,27 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { @Override public void shutdown() { lruCache.shutdown(); - bucketCache.shutdown(); + l2Cache.shutdown(); } @Override public long size() { - return lruCache.size() + bucketCache.size(); + return lruCache.size() + l2Cache.size(); } @Override public long getFreeSize() { - return lruCache.getFreeSize() + bucketCache.getFreeSize(); + return lruCache.getFreeSize() + l2Cache.getFreeSize(); } @Override public long getCurrentSize() { - return lruCache.getCurrentSize() + bucketCache.getCurrentSize(); + return lruCache.getCurrentSize() + l2Cache.getCurrentSize(); } @Override public long getBlockCount() { - return lruCache.getBlockCount() + bucketCache.getBlockCount(); + return lruCache.getBlockCount() + l2Cache.getBlockCount(); } private static class CombinedCacheStats extends CacheStats { @@ -205,7 +212,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { @Override public BlockCache[] getBlockCaches() { - return new BlockCache [] {this.lruCache, this.bucketCache}; + return new BlockCache [] {this.lruCache, this.l2Cache}; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index b096185..6b86d0b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -116,7 +116,7 @@ public class HFileBlock implements Cacheable { */ static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT; - private static final CacheableDeserializer blockDeserializer = + static final CacheableDeserializer blockDeserializer = new CacheableDeserializer() { public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{ buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind(); @@ -130,13 +130,13 @@ public class HFileBlock implements Cacheable { buf.position(buf.limit()); buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE); boolean usesChecksum = buf.get() == (byte)1; - HFileBlock ourBuffer = new HFileBlock(newByteBuffer, usesChecksum); - ourBuffer.offset = buf.getLong(); - ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt(); - if (ourBuffer.hasNextBlockHeader()) { - ourBuffer.buf.limit(ourBuffer.buf.limit() - ourBuffer.headerSize()); + HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum); + hFileBlock.offset = buf.getLong(); + hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt(); + if (hFileBlock.hasNextBlockHeader()) { + hFileBlock.buf.limit(hFileBlock.buf.limit() - hFileBlock.headerSize()); } - return ourBuffer; + return hFileBlock; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java new file mode 100644 index 0000000..667e7b4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java @@ -0,0 +1,58 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.io.hfile; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class InclusiveCombinedBlockCache extends CombinedBlockCache implements BlockCache { + public InclusiveCombinedBlockCache(LruBlockCache l1, BlockCache l2) { + super(l1,l2); + } + + @Override + public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, + boolean repeat, boolean updateCacheMetrics) { + // On all external cache set ups the lru should have the l2 cache set as the victimHandler + // Because of that all requests that miss inside of the lru block cache will be + // tried in the l2 block cache. + return lruCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics); + } + + /** + * + * @param cacheKey The block's cache key. + * @param buf The block contents wrapped in a ByteBuffer. + * @param inMemory Whether block should be treated as in-memory. This parameter is only useful for + * the L1 lru cache. + * @param cacheDataInL1 This is totally ignored. + */ + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, + final boolean cacheDataInL1) { + // This is the inclusive part of the combined block cache. + // Every block is placed into both block caches. + lruCache.cacheBlock(cacheKey, buf, inMemory, true); + + // This assumes that insertion into the L2 block cache is either async or very fast. + l2Cache.cacheBlock(cacheKey, buf, inMemory, true); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 82df5f7..d3b1aeb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -197,8 +197,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { private boolean forceInMemory; /** Where to send victims (blocks evicted/missing from the cache) */ - // TODO: Fix it so this is not explicit reference to a particular BlockCache implementation. - private BucketCache victimHandler = null; + private BlockCache victimHandler = null; /** * Default constructor. Specify maximum size and expected average block @@ -419,8 +418,17 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { LruCachedBlock cb = map.get(cacheKey); if (cb == null) { if (!repeat && updateCacheMetrics) stats.miss(caching); - if (victimHandler != null) { - return victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics); + // If there is another block cache then try and read there. + // However if this is a retry ( second time in double checked locking ) + // And it's already a miss then the l2 will also be a miss. + if (victimHandler != null && !repeat) { + Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics); + + // Promote this to L1. + if (result != null && caching) { + cacheBlock(cacheKey, result, /* inMemory = */ false, /* cacheData = */ true); + } + return result; } return null; } @@ -489,10 +497,14 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { } stats.evicted(block.getCachedTime()); if (evictedByEvictionProcess && victimHandler != null) { - boolean wait = getCurrentSize() < acceptableSize(); - boolean inMemory = block.getPriority() == BlockPriority.MEMORY; - victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(), - inMemory, wait); + if (victimHandler instanceof BucketCache) { + boolean wait = getCurrentSize() < acceptableSize(); + boolean inMemory = block.getPriority() == BlockPriority.MEMORY; + ((BucketCache)victimHandler).cacheBlockWithWait(block.getCacheKey(), block.getBuffer(), + inMemory, wait); + } else { + victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer()); + } } return block.heapSize(); } @@ -1057,7 +1069,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { return counts; } - public void setVictimCache(BucketCache handler) { + public void setVictimCache(BlockCache handler) { assert victimHandler == null; victimHandler = handler; } @@ -1067,7 +1079,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { return map; } - BucketCache getVictimHandler() { + BlockCache getVictimHandler() { return this.victimHandler; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java new file mode 100644 index 0000000..69ca699 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java @@ -0,0 +1,258 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.io.hfile; + +import net.spy.memcached.CachedData; +import net.spy.memcached.ConnectionFactoryBuilder; +import net.spy.memcached.FailureMode; +import net.spy.memcached.MemcachedClient; +import net.spy.memcached.transcoders.Transcoder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Addressing; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** + * Class to store blocks into memcached. + * This should only be used on a cluster of Memcached daemons that are tuned well and have a + * good network connection to the HBase regionservers. Any other use will likely slow down HBase + * greatly. + */ +@InterfaceAudience.Private +public class MemcachedBlockCache implements BlockCache { + private static final Log LOG = LogFactory.getLog(MemcachedBlockCache.class.getName()); + + // Some memcache versions won't take more than 1024 * 1024. So set the limit below + // that just in case this client is used with those versions. + public static final int MAX_SIZE = 1020 * 1024; + + // Config key for what memcached servers to use. + // They should be specified in a comma sperated list with ports. + // like: + // + // host1:11211,host3:8080,host4:11211 + public static final String MEMCACHED_CONFIG_KEY = "hbase.cache.memcached.servers"; + + private final MemcachedClient client; + private final HFileBlockTranscoder tc = new HFileBlockTranscoder(); + private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache"); + + public MemcachedBlockCache(Configuration c) throws IOException { + LOG.info("Creating MemcachedBlockCache"); + ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder() + .setOpTimeout(TimeUnit.SECONDS.toMillis(1)) + .setOpQueueMaxBlockTime(1500) // Cap the max time before anything times out + .setFailureMode(FailureMode.Redistribute) + .setShouldOptimize(true) // When regions move lots of reads happen together + // So combining them into single requests is nice. + .setDaemon(true) // Don't keep threads around past the end of days. + .setUseNagleAlgorithm(false) // Ain't nobody got time for that + .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024 ); // 4 times larger than the + // default block just in case + + + // Assume only the localhost is serving memecached. + // A la mcrouter or co-locating memcached with split regionservers. + // + // If this config is a pool of memecached servers they will all be used according to the + // default hashing scheme defined by the memcache client. Spy Memecache client in this + // case. + String serverListString = c.get(MEMCACHED_CONFIG_KEY,"localhost:11211"); + String[] servers = serverListString.split(","); + List serverAddresses = new ArrayList(servers.length); + for (String s:servers) { + serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s)); + } + + client = new MemcachedClient(builder.build(), serverAddresses); + } + + @Override + public void cacheBlock(BlockCacheKey cacheKey, + Cacheable buf, + boolean inMemory, + boolean cacheDataInL1) { + cacheBlock(cacheKey, buf); + } + + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { + if (buf instanceof HFileBlock) { + client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("MemcachedBlockCache can not cache Cacheable's of type " + + buf.getClass().toString()); + } + } + } + + @Override + public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, + boolean repeat, boolean updateCacheMetrics) { + // Assume that nothing is the block cache + HFileBlock result = null; + + try (TraceScope traceScope = Trace.startSpan("MemcachedBlockCache.getBlock")) { + result = client.get(cacheKey.toString(), tc); + } catch (Exception e) { + // Catch a pretty broad set of exceptions to limit any changes in the memecache client + // and how it handles failures from leaking into the read path. + LOG.warn("Exception pulling from memcached [ " + cacheKey.toString() + " ]", e); + result = null; + } finally { + // Update stats if this request doesn't have it turned off 100% of the time + if (updateCacheMetrics) { + if (result == null) { + cacheStats.miss(caching); + } else { + cacheStats.hit(caching); + } + } + } + + + return result; + } + + @Override + public boolean evictBlock(BlockCacheKey cacheKey) { + try { + cacheStats.evict(); + return client.delete(cacheKey.toString()).get(); + } catch (InterruptedException e) { + LOG.warn("Error deleting " + cacheKey.toString(), e); + } catch (ExecutionException e) { + LOG.warn("Error deleting " + cacheKey.toString(), e); + } + return false; + } + + /** + * This method does nothing so that memcached can handle all evictions. + */ + @Override + public int evictBlocksByHfileName(String hfileName) { + return 0; + } + + @Override + public CacheStats getStats() { + return cacheStats; + } + + @Override + public void shutdown() { + client.shutdown(); + } + + @Override + public long size() { + return 0; + } + + @Override + public long getFreeSize() { + return 0; + } + + @Override + public long getCurrentSize() { + return 0; + } + + @Override + public long getBlockCount() { + return 0; + } + + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + return false; + } + + @Override + public CachedBlock next() { + return null; + } + + @Override + public void remove() { + + } + }; + } + + @Override + public BlockCache[] getBlockCaches() { + return null; + } + + /** + * Class to encode and decode an HFileBlock to and from memecached's resulting byte arrays. + */ + private static class HFileBlockTranscoder implements Transcoder { + + @Override + public boolean asyncDecode(CachedData d) { + return false; + } + + @Override + public CachedData encode(HFileBlock block) { + ByteBuffer bb = ByteBuffer.allocate(block.getSerializedLength()); + block.serialize(bb); + return new CachedData(0, bb.array(), CachedData.MAX_SIZE); + } + + @Override + public HFileBlock decode(CachedData d) { + try { + ByteBuffer buf = ByteBuffer.wrap(d.getData()); + return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true); + } catch (IOException e) { + LOG.warn("Error deserializing data from memcached"); + } + return null; + } + + @Override + public int getMaxSize() { + return MAX_SIZE; + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java index 11ab568..5b079b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java @@ -198,6 +198,10 @@ public class PressureAwareCompactionThroughputController extends Configured impl @Override public long control(String compactionName, long size) throws InterruptedException { ActiveCompaction compaction = activeCompactions.get(compactionName); + if (compaction == null) { + return 0; + } + compaction.totalSize += size; long deltaSize = compaction.totalSize - compaction.lastControlSize; if (deltaSize < controlPerSize) { @@ -235,12 +239,14 @@ public class PressureAwareCompactionThroughputController extends Configured impl @Override public void finish(String compactionName) { ActiveCompaction compaction = activeCompactions.remove(compactionName); - long elapsedTime = Math.max(1, EnvironmentEdgeManager.currentTime() - compaction.startTime); - LOG.info(compactionName + " average throughput is " - + throughputDesc(compaction.totalSize, elapsedTime) + ", slept " - + compaction.numberOfSleeps + " time(s) and total slept time is " - + compaction.totalSleepTime + " ms. " + activeCompactions.size() - + " active compactions remaining, total limit is " + throughputDesc(maxThroughput)); + if (compaction != null) { + long elapsedTime = Math.max(1, EnvironmentEdgeManager.currentTime() - compaction.startTime); + LOG.info(compactionName + " average throughput is " + + throughputDesc(compaction.totalSize, elapsedTime) + ", slept " + + compaction.numberOfSleeps + " time(s) and total slept time is " + + compaction.totalSleepTime + " ms. " + activeCompactions.size() + + " active compactions remaining, total limit is " + throughputDesc(maxThroughput)); + } } private volatile boolean stopped = false; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java index c5fcc3c..ce78a37 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java @@ -283,9 +283,9 @@ public class TestCacheConfig { // TODO: Assert sizes allocated are right and proportions. LruBlockCache lbc = (LruBlockCache)cc.getBlockCache(); assertEquals(lruExpectedSize, lbc.getMaxSize()); - BucketCache bc = lbc.getVictimHandler(); + BlockCache bc = lbc.getVictimHandler(); // getMaxSize comes back in bytes but we specified size in MB - assertEquals(bcExpectedSize, bc.getMaxSize()); + assertEquals(bcExpectedSize, ((BucketCache) bc).getMaxSize()); // Test the L1+L2 deploy works as we'd expect with blocks evicted from L1 going to L2. long initialL1BlockCount = lbc.getBlockCount(); long initialL2BlockCount = bc.getBlockCount(); diff --git a/pom.xml b/pom.xml index 3f94abc..2ce148e 100644 --- a/pom.xml +++ b/pom.xml @@ -1174,6 +1174,7 @@ 4.0.23.Final 2.1.2 1.0.8 + 2.11.6 2.4 1.6 @@ -1665,6 +1666,11 @@ disruptor ${disruptor.version}
+ + net.spy + spymemcached + ${spy.version} + org.jmock jmock-junit4 -- 2.3.0