commit 98c747153fd522bc1c1978a7813494ff11790fba Author: Yu Li Date: Tue Sep 22 21:51:06 2015 +0800 HBASE-14463 Severe performance downgrade when parallel reading a single key from BucketCache diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 5eb6f8fb6ff9c72cb99e83a6cc25d0403075033d..e68864b5b5ae795baf8a6fb3a15e2758acbf5407 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -68,7 +68,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.ConcurrentIndex; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; -import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.hbase.util.IdReadWriteLock; import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; @@ -180,14 +180,11 @@ public class BucketCache implements BlockCache, HeapSize { private volatile long ioErrorStartTime = -1; /** - * A "sparse lock" implementation allowing to lock on a particular block - * identified by offset. The purpose of this is to avoid freeing the block - * which is being read. - * - * TODO:We could extend the IdLock to IdReadWriteLock for better. + * A ReentrantReadWriteLock to lock on a particular block identified by offset. + * The purpose of this is to avoid freeing the block which is being read. 
*/ @VisibleForTesting - final IdLock offsetLock = new IdLock(); + final IdReadWriteLock offsetLock = new IdReadWriteLock(); private final ConcurrentIndex blocksByHFile = new ConcurrentIndex(new Comparator() { @@ -412,9 +409,9 @@ public class BucketCache implements BlockCache, HeapSize { BucketEntry bucketEntry = backingMap.get(key); if (bucketEntry != null) { long start = System.nanoTime(); - IdLock.Entry lockEntry = null; + IdReadWriteLock.Entry lockEntry = null; try { - lockEntry = offsetLock.getLockEntry(bucketEntry.offset()); + lockEntry = offsetLock.getLockEntry(bucketEntry.offset(), true); // We can not read here even if backingMap does contain the given key because its offset // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check // existence here. @@ -443,7 +440,7 @@ public class BucketCache implements BlockCache, HeapSize { checkIOErrorIsTolerated(); } finally { if (lockEntry != null) { - offsetLock.releaseLockEntry(lockEntry); + offsetLock.releaseLockEntry(lockEntry, true); } } } @@ -484,9 +481,9 @@ public class BucketCache implements BlockCache, HeapSize { return false; } } - IdLock.Entry lockEntry = null; + IdReadWriteLock.Entry lockEntry = null; try { - lockEntry = offsetLock.getLockEntry(bucketEntry.offset()); + lockEntry = offsetLock.getLockEntry(bucketEntry.offset(), false); if (backingMap.remove(cacheKey, bucketEntry)) { blockEvicted(cacheKey, bucketEntry, removedBlock == null); } else { @@ -497,7 +494,7 @@ public class BucketCache implements BlockCache, HeapSize { return false; } finally { if (lockEntry != null) { - offsetLock.releaseLockEntry(lockEntry); + offsetLock.releaseLockEntry(lockEntry, false); } } cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); @@ -527,9 +524,9 @@ public class BucketCache implements BlockCache, HeapSize { return false; } } - IdLock.Entry lockEntry = null; + IdReadWriteLock.Entry lockEntry = null; try { - lockEntry = offsetLock.getLockEntry(bucketEntry.offset()); + 
lockEntry = offsetLock.getLockEntry(bucketEntry.offset(), false); int refCount = bucketEntry.refCount.get(); if(refCount == 0) { if (backingMap.remove(cacheKey, bucketEntry)) { @@ -558,7 +555,7 @@ public class BucketCache implements BlockCache, HeapSize { return false; } finally { if (lockEntry != null) { - offsetLock.releaseLockEntry(lockEntry); + offsetLock.releaseLockEntry(lockEntry, false); } } cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); @@ -909,9 +906,9 @@ public class BucketCache implements BlockCache, HeapSize { heapSize.addAndGet(-1 * entries.get(i).getData().heapSize()); } else if (bucketEntries[i] != null){ // Block should have already been evicted. Remove it and free space. - IdLock.Entry lockEntry = null; + IdReadWriteLock.Entry lockEntry = null; try { - lockEntry = offsetLock.getLockEntry(bucketEntries[i].offset()); + lockEntry = offsetLock.getLockEntry(bucketEntries[i].offset(), false); if (backingMap.remove(key, bucketEntries[i])) { blockEvicted(key, bucketEntries[i], false); } @@ -919,7 +916,7 @@ public class BucketCache implements BlockCache, HeapSize { LOG.warn("failed to free space for " + key, e); } finally { if (lockEntry != null) { - offsetLock.releaseLockEntry(lockEntry); + offsetLock.releaseLockEntry(lockEntry, false); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java new file mode 100644 index 0000000000000000000000000000000000000000..f7511c12ea2486791bab9144995d65703c0144b8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Allows multiple concurrent clients to lock on a numeric id with ReentrantReadWriteLock. The + * intended usage for read lock is as follows: + * + *
+ * IdReadWriteLock.Entry lockEntry = idReadWriteLock.getLockEntry(id, true);
+ * try {
+ *   // User code.
+ * } finally {
+ *   idReadWriteLock.releaseLockEntry(lockEntry, true);
+ * }
+ * 
+ * + * For write lock, change true to false + */ +@InterfaceAudience.Private +public class IdReadWriteLock { + Log LOG = LogFactory.getLog(IdReadWriteLock.class); + // The number of locks we want to easily support. It's not a maximum. + private static final int NB_CONCURRENT_LOCKS = 1000; + // The pool to get entry from, entries are mapped by weak reference to make it able to be + // garbage-collected asap + private final WeakObjectPool entryPool = new WeakObjectPool( + new WeakObjectPool.ObjectFactory() { + @Override + public Entry createObject(Long id) { + return new Entry(id); + } + }, NB_CONCURRENT_LOCKS); + + /** An entry returned to the client with a ReentrantReadWriteLock object */ + public static final class Entry { + private final long id; + private final ReentrantReadWriteLock readWriteLock; + + private Entry(long id) { + this.id = id; + this.readWriteLock = new ReentrantReadWriteLock(); + } + + public String toString() { + return "id=" + id + ", numWaiters=" + readWriteLock.getQueueLength(); + } + + public void lock(boolean read) { + if (read) { + this.readWriteLock.readLock().lock(); + } else { + this.readWriteLock.writeLock().lock(); + } + } + + public void unlock(boolean read) { + if (read) { + this.readWriteLock.readLock().unlock(); + } else { + this.readWriteLock.writeLock().unlock(); + } + } + } + + /** + * Blocks until the lock corresponding to the given id is acquired. + * + * @param id an arbitrary number to lock on + * @param read true for getting the read lock, false for the write lock + * @return an "entry" to pass to {@link #releaseLockEntry(Entry, boolean)} to release + * the lock + * @throws IOException if interrupted + */ + public Entry getLockEntry(long id, boolean read) throws IOException { + entryPool.purge(); + Entry entry = entryPool.get(id); + entry.lock(read); + return entry; + } + + /** + * Must be called in a finally block to release the lock and remove the + * monitor object for the given id if the caller is the last client. 
+ * + * @param entry the return value of {@link #getLockEntry(long, boolean)} + * @param read true for releasing the read lock, false for the write lock + */ + public void releaseLockEntry(Entry entry, boolean read) { + entry.unlock(read); + } + + /** For testing */ + @VisibleForTesting + int purgeAndGetEntryPoolSize() { + System.gc(); + Threads.sleep(200); + entryPool.purge(); + return entryPool.size(); + } + + @VisibleForTesting + public void waitForWaiters(long id, int numWaiters) throws InterruptedException { + for (Entry entry;;) { + entry = entryPool.get(id); + if (entry != null) { + synchronized (entry) { + if (entry.readWriteLock.getQueueLength() >= numWaiters) { + return; + } + } + } + Thread.sleep(50); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java index 97d591727f9a8d4f9b8363e6e88148e5fb5878cb..8570dba35c1ce8fa007669d984b30d3bc76f012a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java @@ -188,7 +188,11 @@ public class CacheTestUtils { public void doAnAction() throws Exception { ByteArrayCacheable returned = (ByteArrayCacheable) toBeTested .getBlock(key, false, false, true); - assertArrayEquals(buf, returned.buf); + if (returned != null) { + assertArrayEquals(buf, returned.buf); + } else { + Thread.sleep(10); + } totalQueries.incrementAndGet(); } }; @@ -197,6 +201,19 @@ public class CacheTestUtils { ctx.addThread(t); } + // add a thread to periodically evict and re-cache the block + final long blockEvictPeriod = 50; + TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { + @Override + public void doAnAction() throws Exception { + toBeTested.evictBlock(key); + toBeTested.cacheBlock(key, bac); + Thread.sleep(blockEvictPeriod); + } + }; + t.setDaemon(true); + ctx.addThread(t); + 
ctx.startThreads(); while (totalQueries.get() < numQueries && ctx.shouldRun()) { Thread.sleep(10); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 70d21dd115e73f7fa72e587435bc5ecb224d5878..a7d4af57cd81974ca5d09aff67f35020c7b438ab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo; import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.hbase.util.IdReadWriteLock; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -174,7 +174,7 @@ public class TestBucketCache { @Test public void testCacheMultiThreadedSingleKey() throws Exception { - CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES); + CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, 2 * NUM_THREADS, 2 * NUM_QUERIES); } @Test @@ -199,7 +199,7 @@ public class TestBucketCache { cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable( new byte[10])); long lockId = cache.backingMap.get(cacheKey).offset(); - IdLock.Entry lockEntry = cache.offsetLock.getLockEntry(lockId); + IdReadWriteLock.Entry lockEntry = cache.offsetLock.getLockEntry(lockId, false); Thread evictThread = new Thread("evict-block") { @Override @@ -213,7 +213,7 @@ public class TestBucketCache { cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true); cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable( new byte[10])); - 
cache.offsetLock.releaseLockEntry(lockEntry); + cache.offsetLock.releaseLockEntry(lockEntry, false); evictThread.join(); assertEquals(1L, cache.getBlockCount()); assertTrue(cache.getCurrentSize() > 0L); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java new file mode 100644 index 0000000000000000000000000000000000000000..5cb890f6c3b3cd0799ac68606fb2d1e3e1d2d682 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + +import java.util.Map; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({MiscTests.class, MediumTests.class}) +// Medium as it creates 100 threads; seems better to run it isolated +public class TestIdReadWriteLock { + + private static final Log LOG = LogFactory.getLog(TestIdReadWriteLock.class); + + private static final int NUM_IDS = 16; + private static final int NUM_THREADS = 128; + private static final int NUM_SECONDS = 15; + + private IdReadWriteLock idLock = new IdReadWriteLock(); + + private Map idOwner = new ConcurrentHashMap(); + + private class IdLockTestThread implements Callable { + + private String clientId; + + public IdLockTestThread(String clientId) { + this.clientId = clientId; + } + + @Override + public Boolean call() throws Exception { + Thread.currentThread().setName(clientId); + Random rand = new Random(); + long endTime = System.currentTimeMillis() + NUM_SECONDS * 1000; + while (System.currentTimeMillis() < endTime) { + long id = rand.nextInt(NUM_IDS); + boolean readLock = rand.nextBoolean(); + + IdReadWriteLock.Entry lockEntry = idLock.getLockEntry(id, readLock); + try { + int sleepMs = 1 + rand.nextInt(4); + String owner = idOwner.get(id); + if (owner != null && LOG.isDebugEnabled()) { + LOG.debug((readLock ? 
"Read" : "Write") + "lock of Id " + id + " already taken by " + + owner + ", we are " + clientId); + } + + idOwner.put(id, clientId); + Thread.sleep(sleepMs); + idOwner.remove(id); + + } finally { + idLock.releaseLockEntry(lockEntry, readLock); + if (LOG.isDebugEnabled()) { + LOG.debug("Release " + (readLock ? "Read" : "Write") + " lock of Id" + id + ", we are " + + clientId); + } + } + } + return true; + } + + } + + @Test(timeout = 60000) + public void testMultipleClients() throws Exception { + ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS); + try { + ExecutorCompletionService ecs = + new ExecutorCompletionService(exec); + for (int i = 0; i < NUM_THREADS; ++i) + ecs.submit(new IdLockTestThread("client_" + i)); + for (int i = 0; i < NUM_THREADS; ++i) { + Future result = ecs.take(); + assertTrue(result.get()); + } + // make sure the entry pool will be cleared after GC and purge call + int entryPoolSize = idLock.purgeAndGetEntryPoolSize(); + LOG.debug("Size of entry pool after gc and purge: " + entryPoolSize); + assertEquals(0, entryPoolSize); + } finally { + exec.shutdown(); + exec.awaitTermination(5000, TimeUnit.MILLISECONDS); + } + } + + +} +