diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
index 32eb0b2..4b97781 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
@@ -107,6 +107,13 @@ public enum BlockType {
   private final byte[] magic;
   private final BlockCategory metricCat;
 
+  private static BlockType[] typeArray = new BlockType[Byte.MAX_VALUE];
+  static {
+    for (BlockType blockType : values()) {
+      typeArray[blockType.ordinal()] = blockType;
+    }
+  }
+
   private BlockType(String magicStr, BlockCategory metricCat) {
     magic = Bytes.toBytes(magicStr);
     this.metricCat = metricCat;
@@ -219,4 +226,7 @@ public enum BlockType {
     return this == DATA || this == ENCODED_DATA;
   }
 
+  public static BlockType getByOrdinal(byte ordinal) {
+    return typeArray[ordinal];
+  }
 }
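Note on the hunk above: Enum.values() clones its backing array on every call, so the patch builds the lookup table once and indexes it by ordinal. A minimal standalone sketch of the pattern ("Color" is a stand-in enum, not an HBase type):

```java
// Cache values() once at class-init time; values() clones the array per call.
public enum Color {
  RED, GREEN, BLUE;

  private static final Color[] LOOKUP = values(); // cloned exactly once

  public static Color getByOrdinal(byte ordinal) {
    return LOOKUP[ordinal];
  }

  public static void main(String[] args) {
    byte stored = (byte) Color.GREEN.ordinal();     // persist the enum as one byte
    System.out.println(Color.getByOrdinal(stored)); // prints GREEN: round-trips cleanly
  }
}
```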
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
index 64405de..40585f9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
@@ -30,7 +30,7 @@ public class BlockCacheKey implements HeapSize, java.io.Serializable {
   private static final long serialVersionUID = -5199992013113130534L;
   private final String hfileName;
   private final long offset;
-  private final BlockType blockType;
+  private final byte blockType;
   private final boolean isPrimaryReplicaBlock;
 
   /**
@@ -46,7 +46,8 @@ public class BlockCacheKey implements HeapSize, java.io.Serializable {
     this.isPrimaryReplicaBlock = isPrimaryReplica;
     this.hfileName = hfileName;
     this.offset = offset;
-    this.blockType = blockType;
+    // A value of -1 means no specific BlockType has been passed. Used by tests.
+    this.blockType = (blockType == null) ? -1 : (byte) blockType.ordinal();
   }
 
   @Override
@@ -105,6 +106,7 @@ public class BlockCacheKey implements HeapSize, java.io.Serializable {
   }
 
   public BlockType getBlockType() {
-    return blockType;
+    // A value of -1 means no specific BlockType has been passed. Used by tests.
+    return (this.blockType == -1) ? null : BlockType.getByOrdinal(this.blockType);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockPriority.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockPriority.java
index 9d4ac87..cf85244 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockPriority.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockPriority.java
@@ -29,5 +29,18 @@ public enum BlockPriority {
   /**
    * Block from in-memory store
    */
-  MEMORY
+  MEMORY;
+
+  public static BlockPriority getByOrdinal(byte ordinal) {
+    switch (ordinal) {
+      case 0:
+        return BlockPriority.SINGLE;
+      case 1:
+        return BlockPriority.MULTI;
+      case 2:
+        return BlockPriority.MEMORY;
+      default:
+        throw new IllegalArgumentException("Invalid ordinal value: " + ordinal);
+    }
+  }
 }
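The two hunks above shrink an enum-typed field to a single byte: BlockCacheKey encodes a missing BlockType as the sentinel -1, and BlockPriority gains the reverse ordinal-to-enum mapping. A self-contained sketch of that encode/decode round trip, under stand-in names ("Kind" plays the role of BlockType):

```java
public class OrdinalCodec {
  enum Kind {
    DATA, META, INDEX;
    private static final Kind[] LOOKUP = values();
    static Kind getByOrdinal(byte ordinal) { return LOOKUP[ordinal]; }
  }

  // null maps to -1; everything else to its ordinal, which fits in a byte.
  // The field drops from an object reference (4-8 bytes) to 1 byte.
  static byte encode(Kind kind) {
    return (kind == null) ? -1 : (byte) kind.ordinal();
  }

  static Kind decode(byte b) {
    return (b == -1) ? null : Kind.getByOrdinal(b);
  }

  public static void main(String[] args) {
    assert decode(encode(null)) == null;           // sentinel round-trips
    assert decode(encode(Kind.META)) == Kind.META; // ordinals round-trip
  }
}
```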
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 489c805..16e0bae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -35,18 +35,15 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -72,6 +69,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
+import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
 import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
 import org.apache.hadoop.util.StringUtils;
 
@@ -192,24 +190,6 @@ public class BucketCache implements BlockCache, HeapSize {
   @VisibleForTesting
   final IdReadWriteLock offsetLock = new IdReadWriteLock(ReferenceType.SOFT);
 
-  private final NavigableSet<BlockCacheKey> blocksByHFile =
-      new ConcurrentSkipListSet<>(new Comparator<BlockCacheKey>() {
-        @Override
-        public int compare(BlockCacheKey a, BlockCacheKey b) {
-          int nameComparison = a.getHfileName().compareTo(b.getHfileName());
-          if (nameComparison != 0) {
-            return nameComparison;
-          }
-
-          if (a.getOffset() == b.getOffset()) {
-            return 0;
-          } else if (a.getOffset() < b.getOffset()) {
-            return -1;
-          }
-          return 1;
-        }
-      });
-
   /** Statistics thread schedule pool (for heavy debugging, could remove) */
   private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
       new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());
@@ -401,7 +381,6 @@ public class BucketCache implements BlockCache, HeapSize {
     } else {
       this.blockNumber.incrementAndGet();
       this.heapSize.addAndGet(cachedItem.heapSize());
-      blocksByHFile.add(cacheKey);
     }
   }
 
@@ -451,7 +430,7 @@ public class BucketCache implements BlockCache, HeapSize {
         cacheStats.ioHit(timeTaken);
       }
       if (cachedBlock.getMemoryType() == MemoryType.SHARED) {
-        bucketEntry.refCount.incrementAndGet();
+        bucketEntry.incrementRefCount();
       }
       bucketEntry.access(accessCount.incrementAndGet());
       if (this.ioErrorStartTime > 0) {
@@ -476,7 +455,6 @@ public class BucketCache implements BlockCache, HeapSize {
   void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
     bucketAllocator.freeBlock(bucketEntry.offset());
     realCacheSize.addAndGet(-1 * bucketEntry.getLength());
-    blocksByHFile.remove(cacheKey);
     if (decrementBlockNumber) {
       this.blockNumber.decrementAndGet();
     }
@@ -544,8 +522,8 @@ public class BucketCache implements BlockCache, HeapSize {
     ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
     try {
       lock.writeLock().lock();
-      int refCount = bucketEntry.refCount.get();
-      if(refCount == 0) {
+      int refCount = bucketEntry.getRefCount();
+      if (refCount == 0) {
         if (backingMap.remove(cacheKey, bucketEntry)) {
           blockEvicted(cacheKey, bucketEntry, removedBlock == null);
         } else {
@@ -564,7 +542,7 @@ public class BucketCache implements BlockCache, HeapSize {
               + " readers. Can not be freed now. Hence will mark this"
               + " for evicting at a later point");
         }
-        bucketEntry.markedForEvict = true;
+        bucketEntry.markForEvict();
       }
     }
   } finally {
@@ -672,7 +650,7 @@ public class BucketCache implements BlockCache, HeapSize {
     // this set is small around O(Handler Count) unless something else is wrong
     Set<Integer> inUseBuckets = new HashSet<Integer>();
     for (BucketEntry entry : backingMap.values()) {
-      if (entry.refCount.get() != 0) {
+      if (entry.getRefCount() != 0) {
         inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset()));
       }
     }
@@ -1167,17 +1145,12 @@
    */
   @Override
   public int evictBlocksByHfileName(String hfileName) {
-    Set<BlockCacheKey> keySet = blocksByHFile.subSet(
-        new BlockCacheKey(hfileName, Long.MIN_VALUE), true,
-        new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
-
     int numEvicted = 0;
-    for (BlockCacheKey key : keySet) {
-      if (evictBlock(key)) {
-        ++numEvicted;
+    for (BlockCacheKey key : backingMap.keySet()) {
+      if (key.getHfileName().equals(hfileName)) {
+        if (evictBlock(key)) ++numEvicted;
       }
     }
-
     return numEvicted;
   }
 
@@ -1206,10 +1179,7 @@
     private byte offset1;
     byte deserialiserIndex;
     private volatile long accessCounter;
-    private BlockPriority priority;
-    // Set this when we were not able to forcefully evict the block
-    private volatile boolean markedForEvict;
-    private AtomicInteger refCount = new AtomicInteger(0);
+    private byte priority;
 
     /**
      * Time this block was cached. Presumes we are created just before we are added to the cache.
@@ -1221,9 +1191,9 @@
       this.length = length;
       this.accessCounter = accessCounter;
       if (inMemory) {
-        this.priority = BlockPriority.MEMORY;
+        this.priority = (byte) BlockPriority.MEMORY.ordinal();
       } else {
-        this.priority = BlockPriority.SINGLE;
+        this.priority = (byte) BlockPriority.SINGLE.ordinal();
      }
    }
@@ -1262,18 +1232,38 @@
      */
     public void access(long accessCounter) {
       this.accessCounter = accessCounter;
-      if (this.priority == BlockPriority.SINGLE) {
-        this.priority = BlockPriority.MULTI;
+      if (getPriority() == BlockPriority.SINGLE) {
+        this.priority = (byte) BlockPriority.MULTI.ordinal();
       }
     }
 
     public BlockPriority getPriority() {
-      return this.priority;
+      return BlockPriority.getByOrdinal(this.priority);
     }
 
     public long getCachedTime() {
       return cachedTime;
     }
+
+    protected int getRefCount() {
+      return 0;
+    }
+
+    protected int incrementRefCount() {
+      return 0;
+    }
+
+    protected int decrementRefCount() {
+      return 0;
+    }
+
+    protected boolean isMarkedForEvict() {
+      return false;
+    }
+
+    protected void markForEvict() {
+      // noop
+    }
   }
 
   /**
@@ -1374,7 +1364,11 @@
       // This cacheable thing can't be serialized
       if (len == 0) return null;
       long offset = bucketAllocator.allocateBlock(len);
-      BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
+      BucketEntry bucketEntry = ioEngine.usesSharedMemory()
+          ? UnsafeAvailChecker.isAvailable()
+              ? new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory)
+              : new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory)
+          : new BucketEntry(offset, len, accessCounter, inMemory);
       bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
       try {
         if (data instanceof HFileBlock) {
@@ -1516,8 +1510,8 @@
     if (block.getMemoryType() == MemoryType.SHARED) {
       BucketEntry bucketEntry = backingMap.get(cacheKey);
       if (bucketEntry != null) {
-        int refCount = bucketEntry.refCount.decrementAndGet();
-        if (bucketEntry.markedForEvict && refCount == 0) {
+        int refCount = bucketEntry.decrementRefCount();
+        if (refCount == 0 && bucketEntry.isMarkedForEvict()) {
           forceEvict(cacheKey);
         }
       }
@@ -1528,7 +1522,7 @@
   public int getRefCount(BlockCacheKey cacheKey) {
     BucketEntry bucketEntry = backingMap.get(cacheKey);
     if (bucketEntry != null) {
-      return bucketEntry.refCount.get();
+      return bucketEntry.getRefCount();
     }
     return 0;
   }
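Two things happen in the BucketCache diff above: evictBlocksByHfileName trades the dropped blocksByHFile index (and its per-block heap cost) for a linear scan of backingMap keys, and every direct refCount/markedForEvict access moves behind BucketEntry methods that are no-ops in the base class, so only shared-memory entries pay for that state. A reduced sketch of the no-op-base pattern, with stand-in class names (not the HBase types):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Base entries carry no ref-count fields at all; callers can still invoke
// the hooks unconditionally because the base versions are cheap no-ops.
class Entry {
  protected int getRefCount() { return 0; }       // exclusive memory: never pinned
  protected int incrementRefCount() { return 0; } // no-op on the base class
}

// Only the shared-memory variant allocates real state for the hooks.
class SharedEntry extends Entry {
  private final AtomicInteger refCount = new AtomicInteger();

  @Override protected int getRefCount() { return refCount.get(); }
  @Override protected int incrementRefCount() { return refCount.incrementAndGet(); }
}
```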
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
index 63de32c..f0ea939 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
@@ -111,6 +111,11 @@ public class ByteBufferIOEngine implements IOEngine {
   }
 
   @Override
+  public boolean usesSharedMemory() {
+    return true;
+  }
+
+  @Override
   public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
       throws IOException {
     ByteBuff dstBuffer = bufferArray.asSubByteBuff(offset, length);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
index ab673f4..f81e302c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
@@ -38,6 +38,15 @@ public interface IOEngine {
   boolean isPersistent();
 
   /**
+   * An IOEngine that uses shared memory returns Cacheable blocks that refer to the same memory
+   * area the engine itself uses for caching, rather than to a private copy.
+   * @return true when the IOEngine uses shared memory.
+   */
+  default boolean usesSharedMemory() {
+    return false;
+  }
+
+  /**
    * Transfers data from IOEngine to a Cacheable object.
    * @param length How many bytes to be read from the offset
    * @param offset The offset in the IO engine where the first byte to be read
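The default method is what lets the IOEngine interface grow without touching the file-backed implementations. A stand-in sketch (not the HBase types) of why that stays source-compatible:

```java
// Existing engines compile unchanged and inherit the safe default;
// only the shared-memory engine opts in with a one-line override.
interface Engine {
  default boolean usesSharedMemory() {
    return false; // safe default: assume the engine hands out private copies
  }
}

class FileEngine implements Engine {
  // inherits usesSharedMemory() == false, no code change required
}

class ByteBufferEngine implements Engine {
  @Override
  public boolean usesSharedMemory() {
    return true; // reads return views onto the cache's own buffers
  }
}
```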
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/SharedMemoryBucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/SharedMemoryBucketEntry.java
new file mode 100644
index 0000000..8c94f47
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/SharedMemoryBucketEntry.java
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
+
+@InterfaceAudience.Private
+public class SharedMemoryBucketEntry extends BucketEntry {
+  private static final long serialVersionUID = 1L;
+
+  // Set this when we were not able to forcefully evict the block
+  private volatile boolean markedForEvict;
+  private final AtomicInteger refCount = new AtomicInteger(0);
+
+  public SharedMemoryBucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
+    super(offset, length, accessCounter, inMemory);
+  }
+
+  @Override
+  protected int getRefCount() {
+    return this.refCount.get();
+  }
+
+  @Override
+  protected int incrementRefCount() {
+    return this.refCount.incrementAndGet();
+  }
+
+  @Override
+  protected int decrementRefCount() {
+    return this.refCount.decrementAndGet();
+  }
+
+  @Override
+  protected boolean isMarkedForEvict() {
+    return this.markedForEvict;
+  }
+
+  @Override
+  protected void markForEvict() {
+    this.markedForEvict = true;
+  }
+}
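The fields added here implement a pin/unpin protocol: readers of shared memory increment the count, eviction of a pinned block is merely recorded, and the last reader to return the block frees it. A self-contained sketch of that protocol ("PinnedEntry" is a stand-in name), mirroring the getBlock/returnBlock/evictBlock interplay in BucketCache:

```java
import java.util.concurrent.atomic.AtomicInteger;

class PinnedEntry {
  private final AtomicInteger refCount = new AtomicInteger();
  private volatile boolean markedForEvict;

  int pin() { return refCount.incrementAndGet(); } // a reader starts using shared memory
  void markForEvict() { markedForEvict = true; }   // eviction requested while still pinned

  /** Returns true when the caller, as the last reader, should free the block now. */
  boolean unpin() {
    return refCount.decrementAndGet() == 0 && markedForEvict;
  }
}
```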
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UnsafeSharedMemoryBucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UnsafeSharedMemoryBucketEntry.java
new file mode 100644
index 0000000..414a830
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UnsafeSharedMemoryBucketEntry.java
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
+import org.apache.hadoop.hbase.util.UnsafeAccess;
+
+import sun.misc.Unsafe;
+
+@InterfaceAudience.Private
+public class UnsafeSharedMemoryBucketEntry extends BucketEntry {
+  private static final long serialVersionUID = 1L;
+
+  // We do what AtomicInteger does for its atomic incrementAndGet/decrementAndGet, but on a
+  // plain volatile int field, to keep the per-object heap overhead of this type as low as
+  // possible. Direct Unsafe usage saves 16 bytes per object: 4 bytes for the int itself
+  // versus the 20 bytes an AtomicInteger costs (a 16 byte object plus a 4 byte reference).
+  // Unsafe.getUnsafe() would throw a SecurityException for application classes, so we go
+  // through UnsafeAccess instead.
+  private static final Unsafe unsafe = UnsafeAccess.theUnsafe;
+  private static final long refCountOffset;
+
+  static {
+    try {
+      refCountOffset = unsafe
+          .objectFieldOffset(UnsafeSharedMemoryBucketEntry.class.getDeclaredField("refCount"));
+    } catch (Exception ex) {
+      throw new Error(ex);
+    }
+  }
+
+  // Set this when we were not able to forcefully evict the block
+  private volatile boolean markedForEvict;
+  private volatile int refCount = 0;
+
+  public UnsafeSharedMemoryBucketEntry(long offset, int length, long accessCounter,
+      boolean inMemory) {
+    super(offset, length, accessCounter, inMemory);
+  }
+
+  @Override
+  protected int getRefCount() {
+    return this.refCount;
+  }
+
+  @Override
+  protected int incrementRefCount() {
+    return unsafe.getAndAddInt(this, refCountOffset, 1) + 1;
+  }
+
+  @Override
+  protected int decrementRefCount() {
+    return unsafe.getAndAddInt(this, refCountOffset, -1) - 1;
+  }
+
+  @Override
+  protected boolean isMarkedForEvict() {
+    return this.markedForEvict;
+  }
+
+  @Override
+  protected void markForEvict() {
+    this.markedForEvict = true;
+  }
+}
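For comparison, the same 4-bytes-per-entry layout is achievable without sun.misc.Unsafe: java.util.concurrent.atomic.AtomicIntegerFieldUpdater drives atomic updates through one shared static updater over a plain volatile field. A sketch under a stand-in class name, shown as an alternative rather than a suggested change to the patch:

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class CompactEntry {
  // One shared updater serves all instances, so the per-entry cost stays
  // at the 4 bytes of the volatile int itself.
  private static final AtomicIntegerFieldUpdater<CompactEntry> REF_COUNT_UPDATER =
      AtomicIntegerFieldUpdater.newUpdater(CompactEntry.class, "refCount");

  private volatile int refCount; // must be a volatile, non-static int for the updater

  int incrementRefCount() { return REF_COUNT_UPDATER.incrementAndGet(this); }
  int decrementRefCount() { return REF_COUNT_UPDATER.decrementAndGet(this); }
}
```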
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
index e46e43b..6b646ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
@@ -82,7 +82,13 @@ public class HFileCorruptionChecker {
       boolean quarantine) throws IOException {
     this.conf = conf;
     this.fs = FileSystem.get(conf);
-    this.cacheConf = new CacheConfig(conf);
+    this.cacheConf = new CacheConfig(conf) {
+      @Override
+      public boolean isBlockCacheEnabled() {
+        // Do not make use of the Block Cache while doing the HFile corruption checks.
+        return false;
+      }
+    };
     this.executor = executor;
     this.inQuarantineMode = quarantine;
   }
@@ -120,7 +126,7 @@ public class HFileCorruptionChecker {
     } finally {
       hfilesChecked.addAndGet(1);
       if (r != null) {
-        r.close(true);
+        r.close(false);
       }
     }
   }
@@ -252,7 +258,7 @@ public class HFileCorruptionChecker {
     } finally {
       mobFilesChecked.addAndGet(1);
       if (r != null) {
-        r.close(true);
+        r.close(false);
       }
     }
   }
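With the block cache disabled through the anonymous CacheConfig subclass, these readers never populate the cache, which is why close(false) can safely skip the evict-on-close pass. A stand-in sketch of the override pattern ("Config" and "Checker" are illustrative names, not the HBase classes):

```java
// Switch off one behavior of a shared config object for a single consumer,
// without adding a new constructor flag to the config class itself.
class Config {
  public boolean isBlockCacheEnabled() {
    return true;
  }
}

class Checker {
  private final Config cacheConf = new Config() {
    @Override
    public boolean isBlockCacheEnabled() {
      return false; // never populate the cache during corruption checks
    }
  };
}
```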