diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java index a436901..03ce242 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java @@ -80,6 +80,7 @@ public class HBaseConfiguration extends Configuration { conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); int bcul = (int)(blockCacheUpperLimit * CONVERT_TO_PERCENTAGE); + // TODO: should also consider an on-heap CombinedBlockCache configuration. if (CONVERT_TO_PERCENTAGE - (gml + bcul) < (int)(CONVERT_TO_PERCENTAGE * HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index 461e009..fb92351 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -26,7 +26,8 @@ import org.apache.hadoop.conf.Configuration; /** * Block cache interface. Anything that implements the {@link Cacheable} - * interface can be put in the cache. + * interface can be put in the cache. Implementations MUST provide a public + * static create(Configuration) method for instantiation via {@link CacheConfig}. 
*/ @InterfaceAudience.Private public interface BlockCache { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 2276543..0ce51f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -17,9 +17,8 @@ */ package org.apache.hadoop.hbase.io.hfile; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryUsage; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -28,10 +27,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; -import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; -import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.util.DirectMemoryUtils; -import org.apache.hadoop.util.StringUtils; /** * Stores all of the cache objects and configuration for a single HFile. @@ -75,26 +70,9 @@ public class CacheConfig { "hbase.rs.evictblocksonclose"; /** - * Configuration keys for Bucket cache + * The BlockCache implementation or strategy to use. 
*/ - public static final String BUCKET_CACHE_IOENGINE_KEY = "hbase.bucketcache.ioengine"; - public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size"; - public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = - "hbase.bucketcache.persistent.path"; - public static final String BUCKET_CACHE_COMBINED_KEY = - "hbase.bucketcache.combinedcache.enabled"; - public static final String BUCKET_CACHE_COMBINED_PERCENTAGE_KEY = - "hbase.bucketcache.percentage.in.combinedcache"; - public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads"; - public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = - "hbase.bucketcache.writer.queuelength"; - /** - * Defaults for Bucket cache - */ - public static final boolean DEFAULT_BUCKET_CACHE_COMBINED = true; - public static final int DEFAULT_BUCKET_CACHE_WRITER_THREADS = 3; - public static final int DEFAULT_BUCKET_CACHE_WRITER_QUEUE = 64; - public static final float DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE = 0.9f; + public static final String BLOCK_CACHE_IMPL_KEY = "hbase.block.cache.impl"; // Defaults @@ -105,6 +83,8 @@ public class CacheConfig { public static final boolean DEFAULT_CACHE_BLOOMS_ON_WRITE = false; public static final boolean DEFAULT_EVICT_ON_CLOSE = false; public static final boolean DEFAULT_COMPRESSED_CACHE = false; + public static final String DEFAULT_BLOCK_CACHE_IMPL = + LruBlockCache.class.getCanonicalName(); /** Local reference to the block cache, null if completely disabled */ private final BlockCache blockCache; @@ -353,7 +333,7 @@ public class CacheConfig { float cachePercentage = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); - if (cachePercentage == 0L) { + if (cachePercentage <= 0L) { blockCacheDisabled = true; return null; } @@ -362,60 +342,18 @@ public class CacheConfig { " must be between 0.0 and 1.0, and not > 1.0"); } - // Calculate the amount of heap to give the heap. 
- MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); - long lruCacheSize = (long) (mu.getMax() * cachePercentage); - int blockSize = conf.getInt("hbase.offheapcache.minblocksize", HConstants.DEFAULT_BLOCKSIZE); - long offHeapCacheSize = - (long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0) * - DirectMemoryUtils.getDirectMemorySize()); - if (offHeapCacheSize <= 0) { - String bucketCacheIOEngineName = conf.get(BUCKET_CACHE_IOENGINE_KEY, null); - float bucketCachePercentage = conf.getFloat(BUCKET_CACHE_SIZE_KEY, 0F); - // A percentage of max heap size or a absolute value with unit megabytes - long bucketCacheSize = (long) (bucketCachePercentage < 1 ? mu.getMax() - * bucketCachePercentage : bucketCachePercentage * 1024 * 1024); - - boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, - DEFAULT_BUCKET_CACHE_COMBINED); - BucketCache bucketCache = null; - if (bucketCacheIOEngineName != null && bucketCacheSize > 0) { - int writerThreads = conf.getInt(BUCKET_CACHE_WRITER_THREADS_KEY, - DEFAULT_BUCKET_CACHE_WRITER_THREADS); - int writerQueueLen = conf.getInt(BUCKET_CACHE_WRITER_QUEUE_KEY, - DEFAULT_BUCKET_CACHE_WRITER_QUEUE); - String persistentPath = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY); - float combinedPercentage = conf.getFloat( - BUCKET_CACHE_COMBINED_PERCENTAGE_KEY, - DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE); - if (combinedWithLru) { - lruCacheSize = (long) ((1 - combinedPercentage) * bucketCacheSize); - bucketCacheSize = (long) (combinedPercentage * bucketCacheSize); - } - try { - int ioErrorsTolerationDuration = conf.getInt( - "hbase.bucketcache.ioengine.errors.tolerated.duration", - BucketCache.DEFAULT_ERROR_TOLERATION_DURATION); - bucketCache = new BucketCache(bucketCacheIOEngineName, - bucketCacheSize, blockSize, writerThreads, writerQueueLen, persistentPath, - ioErrorsTolerationDuration); - } catch (IOException ioex) { - LOG.error("Can't instantiate bucket cache", ioex); - throw new 
RuntimeException(ioex); - } - } - LOG.info("Allocating LruBlockCache with maximum size " + - StringUtils.humanReadableInt(lruCacheSize)); - LruBlockCache lruCache = new LruBlockCache(lruCacheSize, blockSize); - lruCache.setVictimCache(bucketCache); - if (bucketCache != null && combinedWithLru) { - globalBlockCache = new CombinedBlockCache(lruCache, bucketCache); - } else { - globalBlockCache = lruCache; - } - } else { - globalBlockCache = new DoubleBlockCache( - lruCacheSize, offHeapCacheSize, blockSize, blockSize, conf); + try { + Class implClass = Class.forName(conf.get(BLOCK_CACHE_IMPL_KEY, DEFAULT_BLOCK_CACHE_IMPL)); + Method create = implClass.getDeclaredMethod("create", Configuration.class); + globalBlockCache = (BlockCache) create.invoke(null, conf); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); } return globalBlockCache; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index 2aae578..7cf2dad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -21,17 +21,19 @@ package org.apache.hadoop.hbase.io.hfile; import java.io.IOException; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; /** * CombinedBlockCache 
is an abstraction layer that combines - * {@link LruBlockCache} and {@link BucketCache}. The smaller lruCache is used + * {@link LruBlockCache} and {@link BucketCache}. The smaller lruBlockCache is used * to cache bloom blocks and index blocks , the larger bucketCache is used to - * cache data blocks. getBlock reads first from the smaller lruCache before + * cache data blocks. getBlock reads first from the smaller lruBlockCache before * looking for the block in the bucketCache. Metrics are the combined size and * hits and misses of both caches. * @@ -39,27 +41,59 @@ import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; @InterfaceAudience.Private public class CombinedBlockCache implements BlockCache, HeapSize { - private final LruBlockCache lruCache; + public static final String BUCKET_CACHE_COMBINED_PERCENTAGE_KEY = + "hbase.bucketcache.percentage.in.combinedcache"; + + /* Assume direct or file mode as default operation. */ + public static final float DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE = 1.0f; + + private final LruBlockCache lruBlockCache; private final BucketCache bucketCache; private final CombinedCacheStats combinedCacheStats; - public CombinedBlockCache(LruBlockCache lruCache, BucketCache bucketCache) { - this.lruCache = lruCache; + /** + * Construct a BlockCache instance from Configuration. + */ + public static CombinedBlockCache create(Configuration conf) { + long configuredBucketCacheSize = BucketCache.getBucketCacheSize(conf); + float combinedPercentage = conf.getFloat( + BUCKET_CACHE_COMBINED_PERCENTAGE_KEY, + DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE); + /* + * hbase.bucketcache.percentage.in.combinedcache = 1.0 means LruBlockCache + * instance would be 0 bytes. Instead, fallback to default heap calculation + * for LruBlockCache creation. This is the default action as on-heap + * BucketCache is probably not the common use case. 
+ */ + long lruCacheSize = (long) ((1 - combinedPercentage) * configuredBucketCacheSize); + long bucketCacheSize = (long) (combinedPercentage * configuredBucketCacheSize); + // make a copy so that nothing is modified externally + conf = HBaseConfiguration.create(conf); + if (lruCacheSize != 0) { + conf.setLong(LruBlockCache.LRU_CACHE_SIZE_KEY, lruCacheSize); + } + conf.setLong(BucketCache.BUCKET_CACHE_SIZE_KEY, bucketCacheSize); + return new CombinedBlockCache(LruBlockCache.create(conf), BucketCache.create(conf)); + } + + @VisibleForTesting + protected CombinedBlockCache(LruBlockCache lruBlockCache, BucketCache bucketCache) { + this.lruBlockCache = lruBlockCache; this.bucketCache = bucketCache; - this.combinedCacheStats = new CombinedCacheStats(lruCache.getStats(), + this.combinedCacheStats = new CombinedCacheStats(lruBlockCache.getStats(), bucketCache.getStats()); } @Override public long heapSize() { - return lruCache.heapSize() + bucketCache.heapSize(); + return lruBlockCache.heapSize() + bucketCache.heapSize(); } @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { boolean isMetaBlock = buf.getBlockType().getCategory() != BlockCategory.DATA; if (isMetaBlock) { - lruCache.cacheBlock(cacheKey, buf, inMemory); + lruBlockCache.cacheBlock(cacheKey, buf, inMemory); } else { bucketCache.cacheBlock(cacheKey, buf, inMemory); } @@ -73,20 +107,20 @@ public class CombinedBlockCache implements BlockCache, HeapSize { @Override public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) { - if (lruCache.containsBlock(cacheKey)) { - return lruCache.getBlock(cacheKey, caching, repeat); + if (lruBlockCache.containsBlock(cacheKey)) { + return lruBlockCache.getBlock(cacheKey, caching, repeat); } return bucketCache.getBlock(cacheKey, caching, repeat); } @Override public boolean evictBlock(BlockCacheKey cacheKey) { - return lruCache.evictBlock(cacheKey) || bucketCache.evictBlock(cacheKey); + return 
lruBlockCache.evictBlock(cacheKey) || bucketCache.evictBlock(cacheKey); } @Override public int evictBlocksByHfileName(String hfileName) { - return lruCache.evictBlocksByHfileName(hfileName) + return lruBlockCache.evictBlocksByHfileName(hfileName) + bucketCache.evictBlocksByHfileName(hfileName); } @@ -97,33 +131,33 @@ public class CombinedBlockCache implements BlockCache, HeapSize { @Override public void shutdown() { - lruCache.shutdown(); + lruBlockCache.shutdown(); bucketCache.shutdown(); } @Override public long size() { - return lruCache.size() + bucketCache.size(); + return lruBlockCache.size() + bucketCache.size(); } @Override public long getFreeSize() { - return lruCache.getFreeSize() + bucketCache.getFreeSize(); + return lruBlockCache.getFreeSize() + bucketCache.getFreeSize(); } @Override public long getCurrentSize() { - return lruCache.getCurrentSize() + bucketCache.getCurrentSize(); + return lruBlockCache.getCurrentSize() + bucketCache.getCurrentSize(); } @Override public long getEvictedCount() { - return lruCache.getEvictedCount() + bucketCache.getEvictedCount(); + return lruBlockCache.getEvictedCount() + bucketCache.getEvictedCount(); } @Override public long getBlockCount() { - return lruCache.getBlockCount() + bucketCache.getBlockCount(); + return lruBlockCache.getBlockCount() + bucketCache.getBlockCount(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java index 0e32b4b..36bcd5c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java @@ -21,13 +21,13 @@ package org.apache.hadoop.hbase.io.hfile; import java.io.IOException; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.slab.SlabCache; -import org.apache.hadoop.util.StringUtils; /** * DoubleBlockCache is an abstraction layer that combines two caches, the @@ -47,34 +47,16 @@ public class DoubleBlockCache implements ResizableBlockCache, HeapSize { private final CacheStats stats; /** - * Default constructor. Specify maximum size and expected average block size - * (approximation is fine). - *

- * All other factors will be calculated based on defaults specified in this - * class. - * - * @param onHeapSize maximum size of the onHeapCache, in bytes. - * @param offHeapSize maximum size of the offHeapCache, in bytes. - * @param onHeapBlockSize average block size of the on heap cache. - * @param offHeapBlockSize average block size for the off heap cache - * @param conf configuration file. currently used only by the off heap cache. + * Construct a BlockCache instance from Configuration. */ - public DoubleBlockCache(long onHeapSize, long offHeapSize, - long onHeapBlockSize, long offHeapBlockSize, Configuration conf) { - - LOG.info("Creating on-heap cache of size " - + StringUtils.humanReadableInt(onHeapSize) - + "bytes with an average block size of " - + StringUtils.humanReadableInt(onHeapBlockSize) + " bytes."); - onHeapCache = new LruBlockCache(onHeapSize, onHeapBlockSize, conf); - - LOG.info("Creating off-heap cache of size " - + StringUtils.humanReadableInt(offHeapSize) - + "bytes with an average block size of " - + StringUtils.humanReadableInt(offHeapBlockSize) + " bytes."); - offHeapCache = new SlabCache(offHeapSize, offHeapBlockSize); - - offHeapCache.addSlabByConf(conf); + public static DoubleBlockCache create(Configuration conf) { + return new DoubleBlockCache(LruBlockCache.create(conf), SlabCache.create(conf)); + } + + @VisibleForTesting + protected DoubleBlockCache(LruBlockCache lruBlockCache, SlabCache slabCache) { + this.onHeapCache = lruBlockCache; + this.offHeapCache = slabCache; this.stats = new CacheStats(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 54fed36..00eb0cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.io.hfile; import 
java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryUsage; import java.lang.ref.WeakReference; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -37,12 +39,15 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.CachedBlock.BlockPriority; @@ -99,6 +104,15 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { static final Log LOG = LogFactory.getLog(LruBlockCache.class); + /** + * Override the cache size calculation with an explicit value. Internal + * configuration parameter used by multilevel BlockCache strategies. Users + * should continue to use "hfile.block.cache.size" for compatibility with + * {@link HBaseConfiguration#checkForClusterFreeMemoryLimit(Configuration)} + * and o.a.h.h.regionserver.HeapMemoryManager. 
+ */ + static final String LRU_CACHE_SIZE_KEY = "hbase.lru.blockcache.size"; + static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor"; static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor"; static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage"; @@ -195,6 +209,24 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { private BucketCache victimHandler = null; /** + * Construct a BlockCache instance from Configuration. + */ + public static LruBlockCache create(Configuration conf) { + // respect the internal conf point. + long lruCacheSize = conf.getLong(LRU_CACHE_SIZE_KEY, -1); + if (lruCacheSize == -1) { + // Calculate the amount of heap to give the BlockCache. + float cachePercentage = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, + HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); + MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); + lruCacheSize = (long) (mu.getMax() * cachePercentage); + } + LOG.info("Allocating LruBlockCache with maximum size " + + StringUtils.humanReadableInt(lruCacheSize)); + return new LruBlockCache(lruCacheSize, HConstants.DEFAULT_BLOCKSIZE); + } + + /** * Default constructor. Specify maximum size and expected average block * size (approximation is fine). * @@ -203,14 +235,16 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { * @param maxSize maximum size of cache, in bytes * @param blockSize approximate size of each block, in bytes */ - public LruBlockCache(long maxSize, long blockSize) { + @VisibleForTesting + protected LruBlockCache(long maxSize, long blockSize) { this(maxSize, blockSize, true); } /** * Constructor used for testing. Allows disabling of the eviction thread. 
*/ - public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) { + @VisibleForTesting + protected LruBlockCache(long maxSize, long blockSize, boolean evictionThread) { this(maxSize, blockSize, evictionThread, (int)Math.ceil(1.2*maxSize/blockSize), DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, @@ -222,7 +256,8 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { ); } - public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) { + @VisibleForTesting + protected LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) { this(maxSize, blockSize, evictionThread, (int)Math.ceil(1.2*maxSize/blockSize), DEFAULT_LOAD_FACTOR, @@ -236,7 +271,8 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { ); } - public LruBlockCache(long maxSize, long blockSize, Configuration conf) { + @VisibleForTesting + protected LruBlockCache(long maxSize, long blockSize, Configuration conf) { this(maxSize, blockSize, true, conf); } @@ -254,7 +290,8 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { * @param multiFactor percentage of total size for multiple-access blocks * @param memoryFactor percentage of total size for in-memory blocks */ - public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, + @VisibleForTesting + protected LruBlockCache(long maxSize, long blockSize, boolean evictionThread, int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel, float minFactor, float acceptableFactor, float singleFactor, float multiFactor, float memoryFactor, boolean forceInMemory) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 1527c12..9127ab5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -28,6 +28,8 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryUsage; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Comparator; @@ -45,10 +47,12 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary; @@ -59,7 +63,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; import org.apache.hadoop.hbase.io.hfile.HFileBlock; -import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.ConcurrentIndex; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; @@ -91,7 +94,23 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; public class BucketCache implements BlockCache, HeapSize { static final Log LOG = LogFactory.getLog(BucketCache.class); - /** Priority buckets */ + /* Configuration keys for Bucket cache */ + + public static final String BUCKET_CACHE_IOENGINE_KEY = "hbase.bucketcache.ioengine"; + public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size"; + public static final String 
BUCKET_CACHE_PERSISTENT_PATH_KEY = + "hbase.bucketcache.persistent.path"; + public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads"; + public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = + "hbase.bucketcache.writer.queuelength"; + + /* Defaults for Bucket cache */ + + public static final int DEFAULT_BUCKET_CACHE_WRITER_THREADS = 3; + public static final int DEFAULT_BUCKET_CACHE_WRITER_QUEUE = 64; + + /* Priority buckets */ + private static final float DEFAULT_SINGLE_FACTOR = 0.25f; private static final float DEFAULT_MULTI_FACTOR = 0.50f; private static final float DEFAULT_MEMORY_FACTOR = 0.25f; @@ -125,8 +144,6 @@ public class BucketCache implements BlockCache, HeapSize { new ArrayList>(); WriterThread writerThreads[]; - - /** Volatile boolean to track if free space is in process or not */ private volatile boolean freeInProgress = false; private Lock freeSpaceLock = new ReentrantLock(); @@ -196,17 +213,70 @@ public class BucketCache implements BlockCache, HeapSize { // Allocate or free space for the block private BucketAllocator bucketAllocator; - - public BucketCache(String ioEngineName, long capacity, int blockSize, int writerThreadNum, + + /** + * Calculate the size of the BucketCache according to configuration. + * + * Visible for CombinedBlockCache. + */ + public static long getBucketCacheSize(Configuration conf) { + float bucketCachePercentage = conf.getFloat(BUCKET_CACHE_SIZE_KEY, 0F); + // A percentage of max heap size or an absolute value with unit megabytes + MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); + return (long) (bucketCachePercentage < 1 ? mu.getMax() + * bucketCachePercentage : bucketCachePercentage * 1024 * 1024); + } + + /** + * Construct a BlockCache instance from Configuration. 
+ */ + public static BucketCache create(Configuration conf) { + String bucketCacheIOEngineName = conf.get(BUCKET_CACHE_IOENGINE_KEY, null); + if (bucketCacheIOEngineName == null) { + throw new IllegalArgumentException( + "BucketCache enabled but no engine provided in " + BUCKET_CACHE_IOENGINE_KEY); + } + long bucketCacheSize = getBucketCacheSize(conf); + if (bucketCacheSize <= 0) { + throw new IllegalArgumentException( + "Invalid configuration value specified for parameter " + BUCKET_CACHE_SIZE_KEY); + } + // TODO: is this the correct config to check? + int blockSize = conf.getInt("hbase.offheapcache.minblocksize", HConstants.DEFAULT_BLOCKSIZE); + + int writerThreads = conf.getInt(BUCKET_CACHE_WRITER_THREADS_KEY, + DEFAULT_BUCKET_CACHE_WRITER_THREADS); + int writerQueueLen = conf.getInt(BUCKET_CACHE_WRITER_QUEUE_KEY, + DEFAULT_BUCKET_CACHE_WRITER_QUEUE); + String persistentPath = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY); + + BucketCache bucketCache = null; + try { + int ioErrorsTolerationDuration = conf.getInt( + "hbase.bucketcache.ioengine.errors.tolerated.duration", + BucketCache.DEFAULT_ERROR_TOLERATION_DURATION); + bucketCache = new BucketCache(bucketCacheIOEngineName, + bucketCacheSize, blockSize, writerThreads, writerQueueLen, persistentPath, + ioErrorsTolerationDuration); + } catch (IOException ioex) { + LOG.error("Can't instantiate bucket cache", ioex); + throw new RuntimeException(ioex); + } + return bucketCache; + } + + @VisibleForTesting + protected BucketCache(String ioEngineName, long capacity, int blockSize, int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException, IOException { this(ioEngineName, capacity, blockSize, writerThreadNum, writerQLen, persistencePath, DEFAULT_ERROR_TOLERATION_DURATION); } - - public BucketCache(String ioEngineName, long capacity, int blockSize, int writerThreadNum, + + @VisibleForTesting + protected BucketCache(String ioEngineName, long capacity, int blockSize, int writerThreadNum, int 
writerQLen, String persistencePath, int ioErrorsTolerationDuration) - throws FileNotFoundException, IOException { + throws IOException { this.ioEngine = getIOEngineFromName(ioEngineName, capacity); this.writerThreads = new WriterThread[writerThreadNum]; this.cacheWaitSignals = new Object[writerThreadNum]; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java index cecd683..abcb530 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java @@ -21,8 +21,11 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ByteBufferArray; +import org.apache.hadoop.hbase.util.DirectMemoryUtils; /** * IO engine that stores data on the memory using an array of ByteBuffers @@ -30,18 +33,30 @@ import org.apache.hadoop.hbase.util.ByteBufferArray; */ @InterfaceAudience.Private public class ByteBufferIOEngine implements IOEngine { + static final Log LOG = LogFactory.getLog(ByteBufferIOEngine.class); private ByteBufferArray bufferArray; /** * Construct the ByteBufferIOEngine with the given capacity * @param capacity - * @param direct true if allocate direct buffer + * @param isDirect true if allocate isDirect buffer * @throws IOException */ - public ByteBufferIOEngine(long capacity, boolean direct) + public ByteBufferIOEngine(long capacity, boolean isDirect) throws IOException { - bufferArray = new ByteBufferArray(capacity, direct); + long directSize = DirectMemoryUtils.getDirectMemorySize(); + if (isDirect && capacity > directSize) { + throw new 
IllegalArgumentException("Requested direct memory cache size (" + + capacity + " bytes) is larger than available direct memory (" + + directSize + " bytes)."); + } + if (isDirect && (capacity + 64 * 1024 * 1024) > directSize) { + LOG.warn("Requested direct memory cache size (" + capacity + " bytes) " + + "does not leave room for JVM direct memory needs. Consider leaving " + + "64m for JVM."); + } + bufferArray = new ByteBufferArray(capacity, isDirect); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java index 9b4621e..bb53160 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java @@ -29,10 +29,12 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary; @@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.CacheStats; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.DirectMemoryUtils; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.util.StringUtils; @@ -71,14 +74,31 @@ public class SlabCache implements SlabItemActionWatcher, BlockCache, HeapSize { private static final long CACHE_FIXED_OVERHEAD = ClassSize.estimateBase( 
SlabCache.class, false); + public static SlabCache create(Configuration conf) { + int blockSize = conf.getInt("hbase.offheapcache.minblocksize", HConstants.DEFAULT_BLOCKSIZE); + long offHeapCacheSize = + (long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0) * + DirectMemoryUtils.getDirectMemorySize()); + + LOG.info("Creating SlabCache of size " + + StringUtils.humanReadableInt(offHeapCacheSize) + + "bytes with an average block size of " + + StringUtils.humanReadableInt(blockSize) + " bytes."); + + SlabCache inst = new SlabCache(offHeapCacheSize, blockSize); + // TODO: RAII would be nice. + inst.addSlabByConf(conf); + return inst; + } + /** * Default constructor, creates an empty SlabCache. * * @param size Total size allocated to the SlabCache. (Bytes) * @param avgBlockSize Average size of a block being cached. - **/ - - public SlabCache(long size, long avgBlockSize) { + */ + @VisibleForTesting + protected SlabCache(long size, long avgBlockSize) { this.avgBlockSize = avgBlockSize; this.size = size; this.stats = new CacheStats(); @@ -89,7 +109,6 @@ public class SlabCache implements SlabItemActionWatcher, BlockCache, HeapSize { sizer = new TreeMap(); this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD_SECS, STAT_THREAD_PERIOD_SECS, TimeUnit.SECONDS); - } /**