From 7d4c5faee0a926cf69814d4a423dd9308bb51b11 Mon Sep 17 00:00:00 2001
From: dvdreddy
Date: Tue, 13 Sep 2016 17:00:45 -0700
Subject: [PATCH] HBASE-16630 Handle Fragmentation in bucket cache

Currently, whenever a compaction or bulk load causes blocks to be evicted
from their buckets, the buckets become fragmented and are not available for
use by other bucket sizes. To fix this, whenever an allocation cannot be made
for a bucket size, we run a defragmentation (compaction) pass over the bucket
sizes whose occupancy ratio is poor.

Most of the logic lives in the 'deFragment' function in BucketCache. Even
though 'compaction' would be a more natural name, it is avoided because it
would be confused with HBase's existing compactions.

Change-Id: I037ac50a4f46c7e964beeee3bbd038a633885473
---
 .../hbase/io/hfile/bucket/BucketAllocator.java     | 33 ++++++--
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  | 97 ++++++++++++++++++++--
 .../hbase/io/hfile/bucket/ByteBufferIOEngine.java  |  7 ++
 .../hadoop/hbase/io/hfile/bucket/FileIOEngine.java | 10 ++-
 .../hbase/io/hfile/bucket/FileMmapEngine.java      | 11 ++-
 .../hadoop/hbase/io/hfile/bucket/IOEngine.java     | 10 +++
 6 files changed, 152 insertions(+), 16 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
index 67a4f1f..5adfb2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
@@ -197,6 +197,10 @@ public final class BucketAllocator {
       return sizeIndex;
     }
 
+    public int itemSize() {
+      return bucketSizes[sizeIndex];
+    }
+
     /**
      * Find a bucket to allocate a block
      * @return the offset in the IOEngine
@@ -255,12 +259,18 @@ public final class BucketAllocator {
 
     public synchronized IndexStatistics statistics() {
       long free = 0, used = 0;
+      float nonZeroUsedCount = 0.0f, nonZeroFreeCount = 0.0f;
       for (Object obj : bucketList.keySet()) {
         Bucket b = (Bucket) obj;
         free += b.freeCount();
         used += b.usedCount();
+        if (b.usedCount() > 0) {
+          nonZeroFreeCount += b.freeCount();
+          nonZeroUsedCount += b.usedCount();
+        }
       }
-      return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
+      return new IndexStatistics(free, used, bucketSizes[sizeIndex],
+          nonZeroUsedCount / (nonZeroFreeCount + nonZeroUsedCount));
     }
 
     @Override
@@ -497,6 +507,10 @@ public final class BucketAllocator {
   static class IndexStatistics {
     private long freeCount, usedCount, itemSize, totalCount;
+    // Tracks the overall occupancy ratio across buckets that have at least one
+    // used block. It is used to measure fragmentation for each of the
+    // BucketSizes and to trigger a defragmentation.
+    private float nonZeroOccupancyRatio;
 
     public long freeCount() {
       return freeCount;
     }
@@ -526,19 +540,26 @@ public final class BucketAllocator {
       return itemSize;
     }
 
-    public IndexStatistics(long free, long used, long itemSize) {
-      setTo(free, used, itemSize);
+    public float nonZeroOccupancyRatio() {
+      return nonZeroOccupancyRatio;
+    }
+
+    public IndexStatistics(long free, long used, long itemSize,
+        float nonZeroOccupancyRatio) {
+      setTo(free, used, itemSize, nonZeroOccupancyRatio);
     }
 
     public IndexStatistics() {
-      setTo(-1, -1, 0);
+      setTo(-1, -1, 0, 0.0f);
     }
 
-    public void setTo(long free, long used, long itemSize) {
+    public void setTo(long free, long used, long itemSize,
+        float nonZeroOccupancyRatio) {
       this.itemSize = itemSize;
       this.freeCount = free;
       this.usedCount = used;
       this.totalCount = free + used;
+      this.nonZeroOccupancyRatio = nonZeroOccupancyRatio;
     }
   }
 
@@ -565,7 +586,7 @@ public final class BucketAllocator {
       totalfree += stat.freeBytes();
       totalused += stat.usedBytes();
     }
-    grandTotal.setTo(totalfree, totalused, 1);
+    grandTotal.setTo(totalfree, totalused, 1, -1.0f);
     return stats;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index ec7a71f..ad61a02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -31,6 +31,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -68,6 +69,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -107,6 +109,8 @@ public class BucketCache implements BlockCache, HeapSize {
   private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
   private static final float DEFAULT_MIN_FACTOR = 0.85f;
 
+  private static final float DEFAULT_DEFRAGMENTATION_THRESHOLD = 0.70f;
+
   /** Statistics thread */
   private static final int statThreadPeriod = 5 * 60;
 
@@ -630,6 +634,86 @@ public class BucketCache implements BlockCache, HeapSize {
   }
 
   /**
+   * Defragments sparsely occupied buckets by relocating their blocks so whole buckets can be freed.
+   * @param why the reason why this function is being called
+   */
+  private synchronized void deFragment(final String why) {
+    LOG.info("Running a defragmentation on the allocator because " + why);
+    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
+    Set<Long> bucketSizesForFragmentation = new HashSet<Long>();
+    for (BucketAllocator.IndexStatistics stat : stats) {
+      if (stat.nonZeroOccupancyRatio() < DEFAULT_DEFRAGMENTATION_THRESHOLD) {
+        bucketSizesForFragmentation.add(stat.itemSize());
+      }
+    }
+
+    long relocatedCount = 0;
+
+    for (BlockCacheKey key : backingMap.keySet()) {
+      BucketEntry bucketEntry = backingMap.get(key);
+      // Skip if this entry's bucket size is not one of the fragmented ones
+      if (!bucketSizesForFragmentation.contains((long) bucketAllocator
+          .roundUpToBucketSizeInfo(bucketEntry.getLength()).itemSize())) {
+        continue;
+      }
+      // Skip the relocation if somebody is using the block
+      if (bucketEntry.refCount.get() > 0 || bucketEntry.markedForEvict) {
+        continue;
+      }
+      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
+      // Track the new location so it can be freed if the relocation fails
+      long newOffset = -1;
+      boolean blockCached = false;
+      try {
+        lock.writeLock().lock();
+        // Re-check the refCount in case somebody started using the block in between
+        if (bucketEntry.refCount.get() > 0 || bucketEntry.markedForEvict) {
+          continue;
+        }
+
+        // We can not read here even if backingMap does contain the given key because its offset
+        // may be changed. If we lock BlockCacheKey instead of offset, then we can only check
+        // existence here.
+        if (bucketEntry.equals(backingMap.get(key))) {
+          int len = bucketEntry.getLength();
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Relocating offset=" + bucketEntry.offset() + ", len=" + len);
+          }
+          relocatedCount++;
+          // Read the block from the old offset
+          ByteBuffer cachedBlock = ioEngine.read(bucketEntry.offset(), len);
+          // Some IOEngines currently require that the ByteBuffer used for writing
+          // has a backing array
+          if (!cachedBlock.hasArray()) {
+            byte[] buff = ByteBufferUtils.toBytes(cachedBlock, cachedBlock.position());
+            // The copied length must be the same as the block length
+            assert buff.length == len;
+            cachedBlock = ByteBuffer.wrap(buff);
+          }
+          // Get a new offset and write the block to it
+          newOffset = bucketAllocator.allocateBlock(len);
+          ioEngine.write(cachedBlock, newOffset);
+          backingMap.put(key, new BucketEntry(newOffset, len, bucketEntry.accessCounter,
+              bucketEntry.getPriority()));
+          blockCached = true;
+          // Free the old offset
+          bucketAllocator.freeBlock(bucketEntry.offset());
+        }
+      } catch (IOException ioex) {
+        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
+        checkIOErrorIsTolerated();
+        // Free the relocated block from the bucketAllocator in case of an IOException
+        if (blockCached) {
+          bucketAllocator.freeBlock(newOffset);
+        }
+      } finally {
+        lock.writeLock().unlock();
+      }
+    }
+    LOG.info("Defragmentation completed; number of blocks relocated: " + relocatedCount);
+  }
+
+  /**
    * Free the space if the used size reaches acceptableSize() or one size block
    * couldn't be allocated. When freeing the space, we use the LRU algorithm and
    * ensure there must be some blocks evicted
@@ -864,6 +948,8 @@
       } catch (CacheFullException cfe) {
         // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
         if (!freeInProgress) {
+          // Run both defragmentation and freeing of space
+          deFragment("Full!");
           freeSpace("Full!");
         } else {
           Thread.sleep(50);
@@ -1162,16 +1248,17 @@
     private final long cachedTime = System.nanoTime();
 
     BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
+      this(offset, length, accessCounter, inMemory ? BlockPriority.MEMORY : BlockPriority.SINGLE);
+    }
+
+    BucketEntry(long offset, int length, long accessCounter, BlockPriority priority) {
       setOffset(offset);
       this.length = length;
       this.accessCounter = accessCounter;
-      if (inMemory) {
-        this.priority = BlockPriority.MEMORY;
-      } else {
-        this.priority = BlockPriority.SINGLE;
-      }
+      this.priority = priority;
     }
 
+
     long offset() { // Java has no unsigned numbers
       long o = ((long) offsetBase) & 0xFFFFFFFF;
       o += (((long) (offset1)) & 0xFF) << 32;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
index 63de32c..0145bb1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
@@ -123,6 +123,13 @@ public class ByteBufferIOEngine implements IOEngine {
     return deserializer.deserialize(dstBuffer, true, MemoryType.SHARED);
   }
 
+  @Override
+  public ByteBuffer read(long offset, int length) {
+    ByteBuff buff = bufferArray.asSubByteBuff(offset, length);
+    return ByteBuffer.wrap(buff.toBytes(0, buff.limit()));
+  }
+
+
   /**
    * Transfers data from the given byte buffer to the buffer array
    * @param srcBuffer the given byte buffer from which bytes are to be read
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index aaf5cf9..07f5cd8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -93,8 +93,7 @@ public class FileIOEngine implements IOEngine {
   @Override
   public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
       throws IOException {
-    ByteBuffer dstBuffer = ByteBuffer.allocate(length);
-    fileChannel.read(dstBuffer, offset);
+    ByteBuffer dstBuffer = read(offset, length);
     // The buffer created out of the fileChannel is formed by copying the data from the file
     // Hence in this case there is no shared memory that we point to. Even if the BucketCache evicts
     // this buffer from the file the data is already copied and there is no need to ensure that
@@ -106,6 +105,13 @@
     return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE);
   }
 
+  @Override
+  public ByteBuffer read(long offset, int length) throws IOException {
+    ByteBuffer dstBuffer = ByteBuffer.allocate(length);
+    fileChannel.read(dstBuffer, offset);
+    return dstBuffer;
+  }
+
   /**
    * Transfers data from the given byte buffer to file
    * @param srcBuffer the given byte buffer from which bytes are to be read
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java
index 7a2afe8..35088a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java
@@ -109,10 +109,15 @@ public class FileMmapEngine implements IOEngine {
   @Override
   public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
       throws IOException {
+    return deserializer.deserialize(new SingleByteBuff(read(offset, length)),
+        true, MemoryType.EXCLUSIVE);
+  }
+
+  @Override
+  public ByteBuffer read(long offset, int length) throws IOException {
     byte[] dst = new byte[length];
     bufferArray.getMultiple(offset, length, dst);
-    return deserializer.deserialize(new SingleByteBuff(ByteBuffer.wrap(dst)), true,
-        MemoryType.EXCLUSIVE);
+    return ByteBuffer.wrap(dst);
   }
 
   /**
@@ -163,4 +168,4 @@
       LOG.error("Can't shutdown cleanly", ex);
     }
   }
-}
\ No newline at end of file
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
index ab673f4..0a5d094 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
@@ -50,6 +50,16 @@ public interface IOEngine {
       throws IOException;
 
   /**
+   * Returns the raw ByteBuffer when reading from the IOEngine
+   * @param offset the offset to read from
+   * @param length the length of the data to read
+   * @return ByteBuffer of the data read
+   * @throws IOException
+   * @throws RuntimeException when the length of the ByteBuffer read is less than 'length'
+   */
+  ByteBuffer read(long offset, int length) throws IOException;
+
+  /**
    * Transfers data from the given byte buffer to IOEngine
    * @param srcBuffer the given byte buffer from which bytes are to be read
    * @param offset The offset in the IO engine where the first byte to be
-- 
2.2.1
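
Note appended after the patch (not part of it): a minimal, self-contained sketch of the trigger logic the commit message describes. It mirrors the nonZeroOccupancyRatio computation and the 0.70 DEFAULT_DEFRAGMENTATION_THRESHOLD from the hunks above; the class and field names here (FragmentationCheck, Bucket, used, free) are hypothetical and exist only for illustration.

import java.util.ArrayList;
import java.util.List;

public class FragmentationCheck {
  // Same default as DEFAULT_DEFRAGMENTATION_THRESHOLD in the patch.
  static final float DEFRAGMENTATION_THRESHOLD = 0.70f;

  /** Hypothetical stand-in for a bucket: how many items are used vs. free. */
  static final class Bucket {
    final long used;
    final long free;
    Bucket(long used, long free) { this.used = used; this.free = free; }
  }

  /** Occupancy over buckets that hold at least one block; NaN if none do. */
  static float nonZeroOccupancyRatio(List<Bucket> buckets) {
    float usedCount = 0.0f, freeCount = 0.0f;
    for (Bucket b : buckets) {
      if (b.used > 0) {          // only partially or fully occupied buckets count
        usedCount += b.used;
        freeCount += b.free;
      }
    }
    return usedCount / (usedCount + freeCount);
  }

  public static void main(String[] args) {
    // Ten buckets of one size: two are 25% full, eight are completely empty.
    List<Bucket> buckets = new ArrayList<>();
    buckets.add(new Bucket(4, 12));
    buckets.add(new Bucket(4, 12));
    for (int i = 0; i < 8; i++) {
      buckets.add(new Bucket(0, 16));
    }
    float ratio = nonZeroOccupancyRatio(buckets);
    // 8 / 32 = 0.25 < 0.70, so this bucket size would be selected for defragmentation.
    System.out.println("nonZeroOccupancyRatio=" + ratio
        + " defragment=" + (ratio < DEFRAGMENTATION_THRESHOLD));
  }
}

Running it prints nonZeroOccupancyRatio=0.25 defragment=true, i.e. a bucket size whose non-empty buckets are only 25% occupied would be selected for defragmentation, while completely empty buckets are ignored so they do not mask fragmentation in partially used ones.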