From a110aa0abec88f568a932e2d4532f87abca3db8d Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Tue, 3 Jun 2014 15:47:49 -0700 Subject: [PATCH] HBASE-10641 Configurable Bucket Sizes in bucketCache --- .../apache/hadoop/hbase/io/hfile/CacheConfig.java | 16 +++- .../hbase/io/hfile/bucket/BucketAllocator.java | 93 ++++++++++++---------- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 22 ++--- .../hbase/io/hfile/bucket/TestBucketCache.java | 87 +++++++++++++------- 4 files changed, 138 insertions(+), 80 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index db09127..85ccd8b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -88,6 +88,12 @@ public class CacheConfig { public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads"; public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = "hbase.bucketcache.writer.queuelength"; + + /** + * A comma-delimited array of values for use as bucket sizes. + */ + public static final String BUCKET_CACHE_BUCKETS_KEY = "hbase.bucketcache.bucket.sizes"; + /** * Defaults for Bucket cache */ @@ -415,6 +421,14 @@ public class CacheConfig { float combinedPercentage = conf.getFloat( BUCKET_CACHE_COMBINED_PERCENTAGE_KEY, DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE); + String[] configuredBucketSizes = conf.getStrings(BUCKET_CACHE_BUCKETS_KEY); + int[] bucketSizes = null; + if (configuredBucketSizes != null) { + bucketSizes = new int[configuredBucketSizes.length]; + for (int i = 0; i < configuredBucketSizes.length; i++) { + bucketSizes[i] = Integer.parseInt(configuredBucketSizes[i]); + } + } if (combinedWithLru) { lruCacheSize = (long) ((1 - combinedPercentage) * bucketCacheSize); bucketCacheSize = (long) (combinedPercentage * bucketCacheSize); @@ -424,7 +438,7 @@ public class CacheConfig { "hbase.bucketcache.ioengine.errors.tolerated.duration", BucketCache.DEFAULT_ERROR_TOLERATION_DURATION); bucketCache = new BucketCache(bucketCacheIOEngineName, - bucketCacheSize, blockSize, writerThreads, writerQueueLen, persistentPath, + bucketCacheSize, blockSize, bucketSizes, writerThreads, writerQueueLen, persistentPath, ioErrorsTolerationDuration); } catch (IOException ioex) { LOG.error("Can't instantiate bucket cache", ioex); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java index c29edbb..39ef523 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -55,11 +57,11 @@ public final class BucketAllocator { sizeIndex = -1; } - void reconfigure(int sizeIndex) { + void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) { + Preconditions.checkElementIndex(sizeIndex, bucketSizes.length); this.sizeIndex = sizeIndex; - assert sizeIndex < BUCKET_SIZES.length; - itemAllocationSize = BUCKET_SIZES[sizeIndex]; - itemCount = (int) (((long) BUCKET_CAPACITY) / (long) itemAllocationSize); + itemAllocationSize = bucketSizes[sizeIndex]; + itemCount = (int) (bucketCapacity / (long) itemAllocationSize); freeCount = itemCount; usedCount = 0; freeList = new int[itemCount]; @@ -176,7 +178,7 @@ public final class BucketAllocator { public void instantiateBucket(Bucket b) { assert b.isUninstantiated() || b.isCompletelyFree(); - b.reconfigure(sizeIndex); + b.reconfigure(sizeIndex, bucketSizes, bucketCapacity); bucketList.add(b); freeBuckets.add(b); completelyFreeBuckets.add(b); @@ -246,15 +248,22 @@ public final class BucketAllocator { free += b.freeCount(); used += b.usedCount(); } - return new IndexStatistics(free, used, BUCKET_SIZES[sizeIndex]); + return new IndexStatistics(free, used, bucketSizes[sizeIndex]); + } + + @Override + public String toString() { + return Objects.toStringHelper(this.getClass()) + .add("sizeIndex", sizeIndex) + .add("bucketSize", bucketSizes[sizeIndex]) + .toString(); } } // Default block size is 64K, so we choose more sizes near 64K, you'd better // reset it according to your cluster's block size distribution - // TODO Make these sizes configurable // TODO Support the view of block size distribution statistics - private static final int BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024, + private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024, 56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024, 192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024, @@ -263,42 +272,49 @@ public final class BucketAllocator { /** * Round up the given block size to bucket size, and get the corresponding * BucketSizeInfo - * @param blockSize - * @return BucketSizeInfo */ public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) { - for (int i = 0; i < BUCKET_SIZES.length; ++i) - if (blockSize <= BUCKET_SIZES[i]) + for (int i = 0; i < bucketSizes.length; ++i) + if (blockSize <= bucketSizes[i]) return bucketSizeInfos[i]; return null; } - static final int BIG_ITEM_SIZE = (512 * 1024) + 1024; // 513K plus overhead static public final int FEWEST_ITEMS_IN_BUCKET = 4; - // The capacity size for each bucket - static final long BUCKET_CAPACITY = FEWEST_ITEMS_IN_BUCKET * BIG_ITEM_SIZE; + private final int[] bucketSizes; + private final int bigItemSize; + // The capacity size for each bucket + private final long bucketCapacity; private Bucket[] buckets; private BucketSizeInfo[] bucketSizeInfos; private final long totalSize; private long usedSize = 0; - BucketAllocator(long availableSpace) throws BucketAllocatorException { - buckets = new Bucket[(int) (availableSpace / (long) BUCKET_CAPACITY)]; - if (buckets.length < BUCKET_SIZES.length) + BucketAllocator(long availableSpace, int[] bucketSizes) + throws BucketAllocatorException { + this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes; + int largestBucket = this.bucketSizes[0]; + for (int i : this.bucketSizes) { + largestBucket = Math.max(largestBucket, i); + } + this.bigItemSize = largestBucket; + this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize; + buckets = new Bucket[(int) (availableSpace / bucketCapacity)]; + if (buckets.length < this.bucketSizes.length) throw new BucketAllocatorException( "Bucket allocator size too small - must have room for at least " - + BUCKET_SIZES.length + " buckets"); - bucketSizeInfos = new BucketSizeInfo[BUCKET_SIZES.length]; - for (int i = 0; i < BUCKET_SIZES.length; ++i) { + + this.bucketSizes.length + " buckets"); + bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length]; + for (int i = 0; i < this.bucketSizes.length; ++i) { bucketSizeInfos[i] = new BucketSizeInfo(i); } for (int i = 0; i < buckets.length; ++i) { - buckets[i] = new Bucket(BUCKET_CAPACITY * i); - bucketSizeInfos[i < BUCKET_SIZES.length ? i : BUCKET_SIZES.length - 1] + buckets[i] = new Bucket(bucketCapacity * i); + bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1] .instantiateBucket(buckets[i]); } - this.totalSize = ((long) buckets.length) * BUCKET_CAPACITY; + this.totalSize = ((long) buckets.length) * bucketCapacity; } /** @@ -309,9 +325,9 @@ public final class BucketAllocator { * @param realCacheSize cached data size statistics for bucket cache * @throws BucketAllocatorException */ - BucketAllocator(long availableSpace, Map map, + BucketAllocator(long availableSpace, int[] bucketSizes, Map map, AtomicLong realCacheSize) throws BucketAllocatorException { - this(availableSpace); + this(availableSpace, bucketSizes); // each bucket has an offset, sizeindex. probably the buckets are too big // in our default state. so what we do is reconfigure them according to what @@ -322,8 +338,8 @@ public final class BucketAllocator { long foundOffset = entry.getValue().offset(); int foundLen = entry.getValue().getLength(); int bucketSizeIndex = -1; - for (int i = 0; i < BUCKET_SIZES.length; ++i) { - if (foundLen <= BUCKET_SIZES[i]) { + for (int i = 0; i < bucketSizes.length; ++i) { + if (foundLen <= bucketSizes[i]) { bucketSizeIndex = i; break; } @@ -332,13 +348,13 @@ public final class BucketAllocator { throw new BucketAllocatorException( "Can't match bucket size for the block with size " + foundLen); } - int bucketNo = (int) (foundOffset / (long) BUCKET_CAPACITY); + int bucketNo = (int) (foundOffset / bucketCapacity); if (bucketNo < 0 || bucketNo >= buckets.length) throw new BucketAllocatorException("Can't find bucket " + bucketNo + ", total buckets=" + buckets.length + "; did you shrink the cache?"); Bucket b = buckets[bucketNo]; - if (reconfigured[bucketNo] == true) { + if (reconfigured[bucketNo]) { if (b.sizeIndex() != bucketSizeIndex) throw new BucketAllocatorException( "Inconsistent allocation in bucket map;"); @@ -378,8 +394,7 @@ public final class BucketAllocator { } public long getFreeSize() { - long freeSize = this.totalSize - getUsedSize(); - return freeSize; + return this.totalSize - getUsedSize(); } public long getTotalSize() { @@ -404,7 +419,7 @@ public final class BucketAllocator { // Ask caller to free up space and try again! if (offset < 0) throw new CacheFullException(blockSize, bsi.sizeIndex()); - usedSize += BUCKET_SIZES[bsi.sizeIndex()]; + usedSize += bucketSizes[bsi.sizeIndex()]; return offset; } @@ -422,7 +437,7 @@ public final class BucketAllocator { * @return size freed */ public synchronized int freeBlock(long offset) { - int bucketNo = (int) (offset / (long) BUCKET_CAPACITY); + int bucketNo = (int) (offset / bucketCapacity); assert bucketNo >= 0 && bucketNo < buckets.length; Bucket targetBucket = buckets[bucketNo]; bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset); @@ -431,23 +446,19 @@ public final class BucketAllocator { } public int sizeIndexOfAllocation(long offset) { - int bucketNo = (int) (offset / (long) BUCKET_CAPACITY); + int bucketNo = (int) (offset / bucketCapacity); assert bucketNo >= 0 && bucketNo < buckets.length; Bucket targetBucket = buckets[bucketNo]; return targetBucket.sizeIndex(); } public int sizeOfAllocation(long offset) { - int bucketNo = (int) (offset / (long) BUCKET_CAPACITY); + int bucketNo = (int) (offset / bucketCapacity); assert bucketNo >= 0 && bucketNo < buckets.length; Bucket targetBucket = buckets[bucketNo]; return targetBucket.itemAllocationSize(); } - static public int getMaximumAllocationIndex() { - return BUCKET_SIZES.length; - } - static class IndexStatistics { private long freeCount, usedCount, itemSize, totalCount; @@ -533,7 +544,7 @@ public final class BucketAllocator { } public IndexStatistics[] getIndexStatistics() { - IndexStatistics[] stats = new IndexStatistics[BUCKET_SIZES.length]; + IndexStatistics[] stats = new IndexStatistics[bucketSizes.length]; for (int i = 0; i < stats.length; ++i) stats[i] = bucketSizeInfos[i].statistics(); return stats; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 971470c..b1394d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -155,6 +155,7 @@ public class BucketCache implements BlockCache, HeapSize { private long cacheCapacity; /** Approximate block size */ private final long blockSize; + private final int[] bucketSizes; /** Duration of IO errors tolerated before we disable cache, 1 min as default */ private final int ioErrorsTolerationDuration; @@ -196,16 +197,16 @@ public class BucketCache implements BlockCache, HeapSize { // Allocate or free space for the block private BucketAllocator bucketAllocator; - - public BucketCache(String ioEngineName, long capacity, int blockSize, int writerThreadNum, - int writerQLen, String persistencePath) throws FileNotFoundException, + + public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, + int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException, IOException { - this(ioEngineName, capacity, blockSize, writerThreadNum, writerQLen, persistencePath, - DEFAULT_ERROR_TOLERATION_DURATION); + this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, + persistencePath, DEFAULT_ERROR_TOLERATION_DURATION); } - public BucketCache(String ioEngineName, long capacity, int blockSize, int writerThreadNum, - int writerQLen, String persistencePath, int ioErrorsTolerationDuration) + public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, + int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration) throws FileNotFoundException, IOException { this.ioEngine = getIOEngineFromName(ioEngineName, capacity); this.writerThreads = new WriterThread[writerThreadNum]; @@ -219,9 +220,10 @@ public class BucketCache implements BlockCache, HeapSize { this.cacheCapacity = capacity; this.persistencePath = persistencePath; this.blockSize = blockSize; + this.bucketSizes = bucketSizes; this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; - bucketAllocator = new BucketAllocator(capacity); + bucketAllocator = new BucketAllocator(capacity, bucketSizes); for (int i = 0; i < writerThreads.length; ++i) { writerQueues.add(new ArrayBlockingQueue(writerQLen)); this.cacheWaitSignals[i] = new Object(); @@ -815,8 +817,8 @@ public class BucketCache implements BlockCache, HeapSize { + ", expected:" + backingMap.getClass().getName()); UniqueIndexMap deserMap = (UniqueIndexMap) ois .readObject(); - BucketAllocator allocator = new BucketAllocator(cacheCapacity, - backingMap, this.realCacheSize); + BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes, + backingMap, realCacheSize); backingMap = (ConcurrentHashMap) ois .readObject(); bucketAllocator = allocator; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 0ae716c..403f3d4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -18,14 +18,15 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; @@ -36,6 +37,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * Basic test of BucketCache.Puts and gets. @@ -43,9 +46,29 @@ import org.junit.experimental.categories.Category; * Tests will ensure that blocks' data correctness under several threads * concurrency */ +@RunWith(Parameterized.class) @Category(SmallTests.class) public class TestBucketCache { - static final Log LOG = LogFactory.getLog(TestBucketCache.class); + + private static final Random RAND = new Random(); + + @Parameterized.Parameters(name="{index}: blockSize={0}, bucketSizes={1}") + public static Iterable data() { + return Arrays.asList(new Object[][] { + { 8192, null }, // TODO: why is 8k the default blocksize for these tests? + { 16 * 1024, new int[] { + 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, + 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, + 128 * 1024 + 1024 } } + }); + } + + @Parameterized.Parameter(0) + public int constructedBlockSize; + + @Parameterized.Parameter(1) + public int[] constructedBlockSizes; + BucketCache cache; final int CACHE_SIZE = 1000000; final int NUM_BLOCKS = 100; @@ -61,11 +84,11 @@ public class TestBucketCache { private class MockedBucketCache extends BucketCache { - public MockedBucketCache(String ioEngineName, long capacity, - int writerThreads, - int writerQLen, String persistencePath) throws FileNotFoundException, - IOException { - super(ioEngineName, capacity, 8192, writerThreads, writerQLen, persistencePath); + public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, + int writerThreads, int writerQLen, String persistencePath) + throws FileNotFoundException, IOException { + super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen, + persistencePath); super.wait_when_cache = true; } @@ -89,8 +112,8 @@ public class TestBucketCache { @Before public void setup() throws FileNotFoundException, IOException { - cache = new MockedBucketCache(ioEngineName, capacitySize, writeThreads, - writerQLen, persistencePath); + cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, persistencePath); } @After @@ -98,41 +121,49 @@ public class TestBucketCache { cache.shutdown(); } + /** + * Return a random element from {@code a}. + */ + private static T randFrom(List a) { + return a.get(RAND.nextInt(a.size())); + } + @Test public void testBucketAllocator() throws BucketAllocatorException { BucketAllocator mAllocator = cache.getAllocator(); /* * Test the allocator first */ - int[] blockSizes = new int[2]; - blockSizes[0] = 4 * 1024; - blockSizes[1] = 8 * 1024; + final List BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024); + boolean full = false; - int i = 0; ArrayList allocations = new ArrayList(); - // Fill the allocated extents - while (!full) { + // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until + // the cache is completely filled. + List tmp = new ArrayList(BLOCKSIZES); + for (int i = 0; !full; i++) { + Integer blockSize = null; try { - allocations.add(new Long(mAllocator.allocateBlock(blockSizes[i - % blockSizes.length]))); - ++i; + blockSize = randFrom(tmp); + allocations.add(mAllocator.allocateBlock(blockSize)); } catch (CacheFullException cfe) { - full = true; + tmp.remove(blockSize); + if (tmp.isEmpty()) full = true; } } - for (i = 0; i < blockSizes.length; i++) { - BucketSizeInfo bucketSizeInfo = mAllocator - .roundUpToBucketSizeInfo(blockSizes[0]); + for (Integer blockSize : BLOCKSIZES) { + BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize); IndexStatistics indexStatistics = bucketSizeInfo.statistics(); - assertTrue(indexStatistics.freeCount() == 0); + assertEquals( + "unexpected freeCount for " + bucketSizeInfo, + 0, indexStatistics.freeCount()); } for (long offset : allocations) { - assertTrue(mAllocator.sizeOfAllocation(offset) == mAllocator - .freeBlock(offset)); + assertEquals(mAllocator.sizeOfAllocation(offset), mAllocator.freeBlock(offset)); } - assertTrue(mAllocator.getUsedSize() == 0); + assertEquals(0, mAllocator.getUsedSize()); } @Test -- 1.9.0