diff --git a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java index d759367..ba321f7 100644 --- a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java +++ b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java @@ -179,8 +179,7 @@ public class MemcachedBlockCache implements BlockCache { * This method does nothing so that memcached can handle all evictions. */ @Override - public int evictBlocksByHfileName(String hfileName) { - return 0; + public void evictBlocksByHfileName(String hfileName) { } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index 20ec8ee..2fef9b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -68,10 +68,8 @@ public interface BlockCache extends Iterable { /** * Evicts all blocks for the given HFile. - * - * @return the number of blocks evicted */ - int evictBlocksByHfileName(String hfileName); + void evictBlocksByHfileName(String hfileName); /** * Get the statistics for this block cache. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index 201a41b..9cfd0af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -92,9 +92,9 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { } @Override - public int evictBlocksByHfileName(String hfileName) { - return lruCache.evictBlocksByHfileName(hfileName) - + l2Cache.evictBlocksByHfileName(hfileName); + public void evictBlocksByHfileName(String hfileName) { + lruCache.evictBlocksByHfileName(hfileName); + l2Cache.evictBlocksByHfileName(hfileName); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index f2416bc..94c6b9a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -1572,10 +1572,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { public void close(boolean evictOnClose) throws IOException { PrefetchExecutor.cancel(path); if (evictOnClose && cacheConf.isBlockCacheEnabled()) { - int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name); + cacheConf.getBlockCache().evictBlocksByHfileName(name); if (LOG.isTraceEnabled()) { - LOG.trace("On close, file=" + name + " evicted=" + numEvicted - + " block(s)"); + LOG.trace("On close, file=" + name + " evicted block(s)"); } } fsBlockReader.closeStreams(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 0fde0a7..97c283f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -532,22 +532,17 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { * *

* This is used for evict-on-close to remove all blocks of a specific HFile. - * - * @return the number of blocks evicted */ @Override - public int evictBlocksByHfileName(String hfileName) { - int numEvicted = 0; + public void evictBlocksByHfileName(String hfileName) { for (BlockCacheKey key : map.keySet()) { if (key.getHfileName().equals(hfileName)) { - if (evictBlock(key)) - ++numEvicted; + evictBlock(key); } } if (victimHandler != null) { - numEvicted += victimHandler.evictBlocksByHfileName(hfileName); + victimHandler.evictBlocksByHfileName(hfileName); } - return numEvicted; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 6f95b6f..f77a4a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -35,15 +35,14 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.NavigableSet; import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -77,6 +76,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.IdReadWriteLock; import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType; +import org.apache.hadoop.hbase.util.UnsafeAvailChecker; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; @@ -205,24 +205,6 @@ public class BucketCache implements BlockCache, HeapSize { @VisibleForTesting final IdReadWriteLock offsetLock = new IdReadWriteLock(ReferenceType.SOFT); - private final NavigableSet blocksByHFile = - new ConcurrentSkipListSet<>(new Comparator() { - @Override - public int compare(BlockCacheKey a, BlockCacheKey b) { - int nameComparison = a.getHfileName().compareTo(b.getHfileName()); - if (nameComparison != 0) { - return nameComparison; - } - - if (a.getOffset() == b.getOffset()) { - return 0; - } else if (a.getOffset() < b.getOffset()) { - return -1; - } - return 1; - } - }); - /** Statistics thread schedule pool (for heavy debugging, could remove) */ private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build()); @@ -248,6 +230,9 @@ public class BucketCache implements BlockCache, HeapSize { /** In-memory bucket size */ private float memoryFactor; + private final EvictionThread evictionThread; + private final BlockingQueue evictFileQueue; + public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException, IOException { @@ -324,6 +309,9 @@ public class BucketCache implements BlockCache, HeapSize { ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" + persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName()); + this.evictFileQueue = new LinkedBlockingQueue<>(); + this.evictionThread = new EvictionThread(); + this.evictionThread.start(); } private void sanityCheckConfigs() { @@ -472,7 +460,6 @@ public class BucketCache implements BlockCache, HeapSize { } else { this.blockNumber.increment(); this.heapSize.add(cachedItem.heapSize()); - blocksByHFile.add(cacheKey); } } @@ -522,7 +509,7 @@ public class BucketCache implements BlockCache, HeapSize { cacheStats.ioHit(timeTaken); } if (cachedBlock.getMemoryType() == MemoryType.SHARED) { - bucketEntry.refCount.incrementAndGet(); + bucketEntry.incrementRefCount(); } bucketEntry.access(accessCount.incrementAndGet()); if (this.ioErrorStartTime > 0) { @@ -547,7 +534,6 @@ public class BucketCache implements BlockCache, HeapSize { void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) { bucketAllocator.freeBlock(bucketEntry.offset()); realCacheSize.add(-1 * bucketEntry.getLength()); - blocksByHFile.remove(cacheKey); if (decrementBlockNumber) { this.blockNumber.decrement(); } @@ -615,8 +601,8 @@ public class BucketCache implements BlockCache, HeapSize { ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); try { lock.writeLock().lock(); - int refCount = bucketEntry.refCount.get(); - if(refCount == 0) { + int refCount = bucketEntry.getRefCount(); + if (refCount == 0) { if (backingMap.remove(cacheKey, bucketEntry)) { blockEvicted(cacheKey, bucketEntry, removedBlock == null); } else { @@ -635,7 +621,7 @@ public class BucketCache implements BlockCache, HeapSize { + " readers. Can not be freed now. Hence will mark this" + " for evicting at a later point"); } - bucketEntry.markedForEvict = true; + bucketEntry.markForEvict(); } } } finally { @@ -733,7 +719,7 @@ public class BucketCache implements BlockCache, HeapSize { // this set is small around O(Handler Count) unless something else is wrong Set inUseBuckets = new HashSet(); for (BucketEntry entry : backingMap.values()) { - if (entry.refCount.get() != 0) { + if (entry.getRefCount() != 0) { inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset())); } } @@ -1183,6 +1169,7 @@ public class BucketCache implements BlockCache, HeapSize { LOG.warn("Failed to persist data on exit", e); } } + this.evictionThread.shutdown(); } @Override @@ -1233,23 +1220,55 @@ public class BucketCache implements BlockCache, HeapSize { * Evicts all blocks for a specific HFile. *

* This is used for evict-on-close to remove all blocks of a specific HFile. - * - * @return the number of blocks evicted */ @Override - public int evictBlocksByHfileName(String hfileName) { - Set keySet = blocksByHFile.subSet( - new BlockCacheKey(hfileName, Long.MIN_VALUE), true, - new BlockCacheKey(hfileName, Long.MAX_VALUE), true); - - int numEvicted = 0; - for (BlockCacheKey key : keySet) { - if (evictBlock(key)) { - ++numEvicted; + public void evictBlocksByHfileName(String hfileName) { + try { + this.evictFileQueue.put(hfileName); + } catch (InterruptedException e) { + // evictFileQueue is unbounded so this would never happen. + } + this.evictionThread.notifyEvict(); + } + + private class EvictionThread extends HasThread { + private volatile boolean go = true; + + EvictionThread() { + super(Thread.currentThread().getName() + ".BucketCache.EvictionThread"); + setDaemon(true); + } + + @Override + public void run() { + while (this.go) { + synchronized (this) { + try { + this.wait(1000 * 10); + } catch (InterruptedException e) { + LOG.warn("Interrupted eviction thread ", e); + Thread.currentThread().interrupt(); + } + } + List files = new ArrayList<>(); + evictFileQueue.drainTo(files); + if (files.isEmpty()) continue; + for (BlockCacheKey key : backingMap.keySet()) { + if (files.contains(key.getHfileName())) { + evictBlock(key); + } + } } } - return numEvicted; + synchronized void notifyEvict() { + this.notifyAll(); + } + + synchronized void shutdown() { + this.go = false; + this.notifyAll(); + } } /** @@ -1278,9 +1297,6 @@ public class BucketCache implements BlockCache, HeapSize { byte deserialiserIndex; private volatile long accessCounter; private BlockPriority priority; - // Set this when we were not able to forcefully evict the block - private volatile boolean markedForEvict; - private AtomicInteger refCount = new AtomicInteger(0); /** * Time this block was cached. Presumes we are created just before we are added to the cache. @@ -1345,6 +1361,63 @@ public class BucketCache implements BlockCache, HeapSize { public long getCachedTime() { return cachedTime; } + + protected int getRefCount() { + return 0; + } + + protected int incrementRefCount() { + return 0; + } + + protected int decrementRefCount() { + return 0; + } + + protected boolean isMarkedForEvict() { + return false; + } + + protected void markForEvict() { + // noop; + } + } + + static class SharedMemoryBucketEntry extends BucketEntry { + private static final long serialVersionUID = -2187147283772338481L; + + // Set this when we were not able to forcefully evict the block + private volatile boolean markedForEvict; + private AtomicInteger refCount = new AtomicInteger(0); + + SharedMemoryBucketEntry(long offset, int length, long accessCounter, boolean inMemory) { + super(offset, length, accessCounter, inMemory); + } + + @Override + protected int getRefCount() { + return this.refCount.get(); + } + + @Override + protected int incrementRefCount() { + return this.refCount.incrementAndGet(); + } + + @Override + protected int decrementRefCount() { + return this.refCount.decrementAndGet(); + } + + @Override + protected boolean isMarkedForEvict() { + return this.markedForEvict; + } + + @Override + protected void markForEvict() { + this.markedForEvict = true; + } } /** @@ -1445,7 +1518,11 @@ public class BucketCache implements BlockCache, HeapSize { // This cacheable thing can't be serialized if (len == 0) return null; long offset = bucketAllocator.allocateBlock(len); - BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory); + BucketEntry bucketEntry = ioEngine.usesSharedMemory() + ? UnsafeAvailChecker.isAvailable() + ? new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory) + : new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory) + : new BucketEntry(offset, len, accessCounter, inMemory); bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap); try { if (data instanceof HFileBlock) { @@ -1587,8 +1664,8 @@ public class BucketCache implements BlockCache, HeapSize { if (block.getMemoryType() == MemoryType.SHARED) { BucketEntry bucketEntry = backingMap.get(cacheKey); if (bucketEntry != null) { - int refCount = bucketEntry.refCount.decrementAndGet(); - if (bucketEntry.markedForEvict && refCount == 0) { + int refCount = bucketEntry.decrementRefCount(); + if (refCount == 0 && bucketEntry.isMarkedForEvict()) { forceEvict(cacheKey); } } @@ -1599,7 +1676,7 @@ public class BucketCache implements BlockCache, HeapSize { public int getRefCount(BlockCacheKey cacheKey) { BucketEntry bucketEntry = backingMap.get(cacheKey); if (bucketEntry != null) { - return bucketEntry.refCount.get(); + return bucketEntry.getRefCount(); } return 0; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java index 4fefa97..f1b43ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java @@ -111,6 +111,11 @@ public class ByteBufferIOEngine implements IOEngine { } @Override + public boolean usesSharedMemory() { + return true; + } + + @Override public Cacheable read(long offset, int length, CacheableDeserializer deserializer) throws IOException { ByteBuff dstBuffer = bufferArray.asSubByteBuff(offset, length); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java index 61bde6b..251be6f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java @@ -38,6 +38,15 @@ public interface IOEngine { boolean isPersistent(); /** + * IOEngine uses shared memory means, when reading Cacheable from it, those refers to the same + * memory area as used by the Engine for caching it + * @return true when IOEngine using shared memory. + */ + default boolean usesSharedMemory() { + return false; + } + + /** * Transfers data from IOEngine to a Cacheable object. * @param length How many bytes to be read from the offset * @param offset The offset in the IO engine where the first byte to be read diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UnsafeSharedMemoryBucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UnsafeSharedMemoryBucketEntry.java new file mode 100644 index 0000000..3827894 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UnsafeSharedMemoryBucketEntry.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry; +import org.apache.yetus.audience.InterfaceAudience; + +import sun.misc.Unsafe; + +@InterfaceAudience.Private +public class UnsafeSharedMemoryBucketEntry extends BucketEntry { + private static final long serialVersionUID = 707544024564058801L; + + // We are just doing what AtomicInteger doing for the Atomic incrementAndGet/decrementAndGet. + // We are avoiding the need to have a field of AtomicIneger type and have it as just int type. + // We would like to reduce the head overhead per object of this type as much as possible. + // Doing this direct Unsafe usage save us 16 bytes per Object. + // ie Just using 4 bytes for int type than 20 bytes requirement for an AtomicInteger (16 bytes) + // and 4 bytes reference to it. + private static final Unsafe unsafe = Unsafe.getUnsafe(); + private static final long refCountOffset; + + static { + try { + refCountOffset = unsafe + .objectFieldOffset(UnsafeSharedMemoryBucketEntry.class.getDeclaredField("refCount")); + } catch (Exception ex) { + throw new Error(ex); + } + } + + // Set this when we were not able to forcefully evict the block + private volatile boolean markedForEvict; + private volatile int refCount = 0; + + public UnsafeSharedMemoryBucketEntry(long offset, int length, long accessCounter, + boolean inMemory) { + super(offset, length, accessCounter, inMemory); + } + + @Override + protected int getRefCount() { + return this.refCount; + } + + @Override + protected int incrementRefCount() { + return unsafe.getAndAddInt(this, refCountOffset, 1) + 1; + } + + @Override + protected int decrementRefCount() { + return unsafe.getAndAddInt(this, refCountOffset, -1) - 1; + } + + @Override + protected boolean isMarkedForEvict() { + return this.markedForEvict; + } + + @Override + protected void markForEvict() { + this.markedForEvict = true; + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java index 44bbb38..de91c9c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java @@ -80,9 +80,11 @@ public class HFileCorruptionChecker { public HFileCorruptionChecker(Configuration conf, ExecutorService executor, boolean quarantine) throws IOException { - this.conf = conf; - this.fs = FileSystem.get(conf); - this.cacheConf = new CacheConfig(conf); + this.conf = new Configuration(conf); + // Do not make use of the Block Cache while doing the HFile corruption checks. + this.conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0F); + this.fs = FileSystem.get(this.conf); + this.cacheConf = new CacheConfig(this.conf); this.executor = executor; this.inQuarantineMode = quarantine; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index aae04df..ec2c072 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -718,9 +718,8 @@ public class TestHeapMemoryManager { } @Override - public int evictBlocksByHfileName(String hfileName) { + public void evictBlocksByHfileName(String hfileName) { stats.evicted(0, true); // Just assuming only one block for file here. - return 0; } @Override