diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java
new file mode 100644
index 0000000..7757c6c
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@code WeakReference} based shared object pool.
+ * The objects are kept in weak references and
+ * associated with keys which are identified by the {@code equals} method.
+ * The objects are created by {@link ObjectFactory} on demand.
+ * The object creation is expected to be lightweight,
+ * and the objects may be excessively created and discarded.
+ * Thread safe.
+ */
+@InterfaceAudience.Private
+public class WeakObjectPool<K, V> {
+  /**
+   * An {@code ObjectFactory} object is used to create
+   * new shared objects on demand.
+   */
+  public interface ObjectFactory<K, V> {
+    /**
+     * Creates a new shared object associated with the given {@code key},
+     * identified by the {@code equals} method.
+     * This method may be simultaneously called by multiple threads
+     * with the same key, and the excessive objects are just discarded.
+     */
+    V createObject(K key);
+  }
+
+  private final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<V>();
+
+  private class ObjectReference extends WeakReference<V> {
+    final K key;
+
+    ObjectReference(K key, V obj) {
+      super(obj, staleRefQueue);
+      this.key = key;
+    }
+  }
+
+  private final ObjectFactory<K, V> objectFactory;
+
+  /** Does not permit null keys. */
+  private final ConcurrentMap<K, ObjectReference> referenceCache;
+
+  /**
+   * The default initial capacity,
+   * used when not otherwise specified in a constructor.
+   */
+  public static final int DEFAULT_INITIAL_CAPACITY = 16;
+
+  /**
+   * The default concurrency level,
+   * used when not otherwise specified in a constructor.
+   */
+  public static final int DEFAULT_CONCURRENCY_LEVEL = 16;
+
+  /**
+   * Creates a new pool with the default initial capacity (16)
+   * and the default concurrency level (16).
+   *
+   * @param objectFactory the factory to supply new objects on demand
+   *
+   * @throws NullPointerException if {@code objectFactory} is null
+   */
+  public WeakObjectPool(ObjectFactory<K, V> objectFactory) {
+    this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL);
+  }
+
+  /**
+   * Creates a new pool with the given initial capacity
+   * and the default concurrency level (16).
+   *
+   * @param objectFactory the factory to supply new objects on demand
+   * @param initialCapacity the initial capacity to keep objects in the pool
+   *
+   * @throws NullPointerException if {@code objectFactory} is null
+   * @throws IllegalArgumentException if {@code initialCapacity} is negative
+   */
+  public WeakObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) {
+    this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL);
+  }
+
+  /**
+   * Creates a new pool with the given initial capacity
+   * and the given concurrency level.
+   *
+   * @param objectFactory the factory to supply new objects on demand
+   * @param initialCapacity the initial capacity to keep objects in the pool
+   * @param concurrencyLevel the estimated count of concurrently accessing threads
+   *
+   * @throws NullPointerException if {@code objectFactory} is null
+   * @throws IllegalArgumentException if {@code initialCapacity} is negative or
+   *     {@code concurrencyLevel} is non-positive
+   */
+  public WeakObjectPool(
+      ObjectFactory<K, V> objectFactory,
+      int initialCapacity,
+      int concurrencyLevel) {
+
+    if (objectFactory == null) {
+      throw new NullPointerException();
+    }
+    this.objectFactory = objectFactory;
+
+    this.referenceCache = new ConcurrentHashMap<K, ObjectReference>(
+        initialCapacity, 0.75f, concurrencyLevel);
+    // 0.75f is the default load factor threshold of ConcurrentHashMap.
+  }
+
+  /**
+   * Removes stale references of shared objects from the pool.
+   * References newly becoming stale may still remain.
+   * The implementation of this method is expected to be lightweight
+   * when there is no stale reference.
+   */
+  public void purge() {
+    // This method is lightweight while there is no stale reference
+    // with the Oracle (Sun) implementation of {@code ReferenceQueue},
+    // because {@code ReferenceQueue.poll} just checks a volatile instance
+    // variable in {@code ReferenceQueue}.
+
+    while (true) {
+      @SuppressWarnings("unchecked")
+      ObjectReference ref = (ObjectReference)staleRefQueue.poll();
+      if (ref == null) {
+        break;
+      }
+      referenceCache.remove(ref.key, ref);
+    }
+  }
+
+  /**
+   * Returns a shared object associated with the given {@code key},
+   * which is identified by the {@code equals} method.
+   * @throws NullPointerException if {@code key} is null
+   */
+  public V get(K key) {
+    ObjectReference ref = referenceCache.get(key);
+    if (ref != null) {
+      V obj = ref.get();
+      if (obj != null) {
+        return obj;
+      }
+      referenceCache.remove(key, ref);
+    }
+
+    V newObj = objectFactory.createObject(key);
+    ObjectReference newRef = new ObjectReference(key, newObj);
+    while (true) {
+      ObjectReference existingRef = referenceCache.putIfAbsent(key, newRef);
+      if (existingRef == null) {
+        return newObj;
+      }
+
+      V existingObject = existingRef.get();
+      if (existingObject != null) {
+        return existingObject;
+      }
+      referenceCache.remove(key, existingRef);
+    }
+  }
+
+  /**
+   * Returns an estimated count of objects kept in the pool.
+   * This also counts stale references,
+   * and you might want to call {@link #purge()} beforehand.
+   */
+  public int size() {
+    return referenceCache.size();
+  }
+}
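A minimal usage sketch of the pool above (illustrative only, not part of the patch; the String/Object types are arbitrary). Keys that are equal by equals() share one pooled instance until the GC reclaims it, after which the factory runs again:

    WeakObjectPool<String, Object> pool = new WeakObjectPool<String, Object>(
        new WeakObjectPool.ObjectFactory<String, Object>() {
          @Override
          public Object createObject(String key) {
            return new Object();  // cheap; racing threads may each create one, extras are discarded
          }
        });
    Object a1 = pool.get("row-a");
    Object a2 = pool.get(new String("row-a"));  // equal key: same shared instance
    assert a1 == a2;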
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java
new file mode 100644
index 0000000..8aefca3
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestWeakObjectPool {
+  WeakObjectPool<String, Object> pool;
+
+  @Before
+  public void setUp() {
+    pool = new WeakObjectPool<String, Object>(
+        new WeakObjectPool.ObjectFactory<String, Object>() {
+          @Override
+          public Object createObject(String key) {
+            return new Object();
+          }
+        });
+  }
+
+  @Test
+  public void testKeys() {
+    Object obj1 = pool.get("a");
+    Object obj2 = pool.get(new String("a"));
+
+    Assert.assertSame(obj1, obj2);
+
+    Object obj3 = pool.get("b");
+
+    Assert.assertNotSame(obj1, obj3);
+  }
+
+  @Test
+  public void testWeakReference() throws Exception {
+    Object obj1 = pool.get("a");
+    int hash1 = System.identityHashCode(obj1);
+
+    System.gc();
+    System.gc();
+    System.gc();
+
+    Thread.sleep(10);
+    // Sleep a while because references newly becoming stale
+    // may still remain when calling the {@code purge} method.
+    pool.purge();
+    Assert.assertEquals(1, pool.size());
+
+    Object obj2 = pool.get("a");
+    Assert.assertSame(obj1, obj2);
+
+    obj1 = null;
+    obj2 = null;
+
+    System.gc();
+    System.gc();
+    System.gc();
+
+    Thread.sleep(10);
+    pool.purge();
+    Assert.assertEquals(0, pool.size());
+
+    Object obj3 = pool.get("a");
+    Assert.assertNotEquals(hash1, System.identityHashCode(obj3));
+  }
+
+  @Test(timeout=1000)
+  public void testCongestion() throws Exception {
+    final int THREAD_COUNT = 100;
+
+    final AtomicBoolean assertionFailed = new AtomicBoolean();
+    final AtomicReference<Object> expectedObjRef = new AtomicReference<Object>();
+    final CountDownLatch prepareLatch = new CountDownLatch(THREAD_COUNT);
+    final CountDownLatch startLatch = new CountDownLatch(1);
+    final CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);
+
+    for (int i=0; i<THREAD_COUNT; i++) {
+      new Thread() {
+        @Override
+        public void run() {
+          prepareLatch.countDown();
+          try {
+            startLatch.await();
+
+            Object obj = pool.get("a");
+            // All threads racing on the same key must observe the same object.
+            if (!expectedObjRef.compareAndSet(null, obj)
+                && expectedObjRef.get() != obj) {
+              assertionFailed.set(true);
+            }
+          } catch (Exception e) {
+            assertionFailed.set(true);
+          } finally {
+            endLatch.countDown();
+          }
+        }
+      }.start();
+    }
+
+    prepareLatch.await();
+    startLatch.countDown();
+    endLatch.await();
+
+    Assert.assertFalse(assertionFailed.get());
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ ... @@ public class BucketCache implements BlockCache, HeapSize {
-  private final IdLock offsetLock = new IdLock();
+  private final IdReadWriteLock offsetLock = new IdReadWriteLock();
   private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
       new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
@@ -412,9 +410,9 @@ public class BucketCache implements BlockCache, HeapSize {
     BucketEntry bucketEntry = backingMap.get(key);
     if(bucketEntry!=null) {
       long start = System.nanoTime();
-      IdLock.Entry lockEntry = null;
+      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
       try {
-        lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
+        lock.readLock().lock();
         // We can not read here even if backingMap does contain the given key because its offset
         // may have changed. If we lock BlockCacheKey instead of offset, then we can only check
         // existence here.
@@ -443,9 +441,7 @@ public class BucketCache implements BlockCache, HeapSize {
         LOG.error("Failed reading block " + key + " from bucket cache", ioex);
         checkIOErrorIsTolerated();
       } finally {
-        if (lockEntry != null) {
-          offsetLock.releaseLockEntry(lockEntry);
-        }
+        lock.readLock().unlock();
       }
     }
     if (!repeat && updateCacheMetrics) {
@@ -483,21 +479,16 @@ public class BucketCache implements BlockCache, HeapSize {
         return false;
       }
     }
-    IdLock.Entry lockEntry = null;
+    ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
     try {
-      lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
+      lock.writeLock().lock();
       if (backingMap.remove(cacheKey, bucketEntry)) {
        blockEvicted(cacheKey, bucketEntry, removedBlock == null);
      } else {
        return false;
      }
-    } catch (IOException ie) {
-      LOG.warn("Failed evicting block " + cacheKey);
-      return false;
     } finally {
-      if (lockEntry != null) {
-        offsetLock.releaseLockEntry(lockEntry);
-      }
+      lock.writeLock().unlock();
     }
     cacheStats.evicted();
     return true;
@@ -841,18 +832,14 @@ public class BucketCache implements BlockCache, HeapSize {
           heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
         } else if (bucketEntries[i] != null){
           // Block should have already been evicted. Remove it and free space.
-          IdLock.Entry lockEntry = null;
+          ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
           try {
-            lockEntry = offsetLock.getLockEntry(bucketEntries[i].offset());
+            lock.writeLock().lock();
             if (backingMap.remove(key, bucketEntries[i])) {
              blockEvicted(key, bucketEntries[i], false);
            }
-          } catch (IOException e) {
-            LOG.warn("failed to free space for " + key, e);
           } finally {
-            if (lockEntry != null) {
-              offsetLock.releaseLockEntry(lockEntry);
-            }
+            lock.writeLock().unlock();
           }
         }
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
new file mode 100644
index 0000000..7dc6fbf
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Allows multiple concurrent clients to lock on a numeric id with ReentrantReadWriteLock. The
+ * intended usage for read lock is as follows:
+ *
+ * <pre>
+ * ReentrantReadWriteLock lock = idReadWriteLock.getLock(id);
+ * try {
+ *   lock.readLock().lock();
+ *   // User code.
+ * } finally {
+ *   lock.readLock().unlock();
+ * }
+ * </pre>
+ *
+ * For write lock, use lock.writeLock()
+ */
+@InterfaceAudience.Private
+public class IdReadWriteLock {
+  // The number of locks we want to easily support. It's not a maximum.
+  private static final int NB_CONCURRENT_LOCKS = 1000;
+  // The pool to get entries from; entries are mapped by weak reference so that
+  // they can be garbage-collected as soon as possible
+  private final WeakObjectPool<Long, ReentrantReadWriteLock> lockPool =
+      new WeakObjectPool<Long, ReentrantReadWriteLock>(
+          new WeakObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
+            @Override
+            public ReentrantReadWriteLock createObject(Long id) {
+              return new ReentrantReadWriteLock();
+            }
+          }, NB_CONCURRENT_LOCKS);
+
+  /**
+   * Get the ReentrantReadWriteLock corresponding to the given id
+   * @param id an arbitrary number to identify the lock
+   */
+  public ReentrantReadWriteLock getLock(long id) {
+    lockPool.purge();
+    ReentrantReadWriteLock readWriteLock = lockPool.get(id);
+    return readWriteLock;
+  }
+
+  /** For testing */
+  @VisibleForTesting
+  int purgeAndGetEntryPoolSize() {
+    System.gc();
+    Threads.sleep(200);
+    lockPool.purge();
+    return lockPool.size();
+  }
+
+  @VisibleForTesting
+  public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
+    for (ReentrantReadWriteLock readWriteLock;;) {
+      readWriteLock = lockPool.get(id);
+      if (readWriteLock != null) {
+        synchronized (readWriteLock) {
+          if (readWriteLock.getQueueLength() >= numWaiters) {
+            return;
+          }
+        }
+      }
+      Thread.sleep(50);
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index b0a2ba2..5ca5f6c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -185,7 +185,11 @@ public class CacheTestUtils {
         public void doAnAction() throws Exception {
           ByteArrayCacheable returned = (ByteArrayCacheable) toBeTested
               .getBlock(key, false, false, true);
-          assertArrayEquals(buf, returned.buf);
+          if (returned != null) {
+            assertArrayEquals(buf, returned.buf);
+          } else {
+            Thread.sleep(10);
+          }
           totalQueries.incrementAndGet();
         }
       };
@@ -194,6 +198,19 @@ public class CacheTestUtils {
       ctx.addThread(t);
     }
 
+    // add a thread to periodically evict and re-cache the block
+    final long blockEvictPeriod = 50;
+    TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
+      @Override
+      public void doAnAction() throws Exception {
+        toBeTested.evictBlock(key);
+        toBeTested.cacheBlock(key, bac);
+        Thread.sleep(blockEvictPeriod);
+      }
+    };
+    t.setDaemon(true);
+    ctx.addThread(t);
+
     ctx.startThreads();
     while (totalQueries.get() < numQueries && ctx.shouldRun()) {
       Thread.sleep(10);
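Why the null check and the evict/re-cache thread above: under the old exclusive IdLock, a reader holding the offset lock could not lose the block mid-read. With IdReadWriteLock, eviction takes the write lock and can win the race against a reader that has looked the key up but not yet locked the offset, so getBlock() may legitimately return null and the test must tolerate it. A condensed sketch of the read path this implies (illustrative, not the literal BucketCache code):

    BucketEntry bucketEntry = backingMap.get(key);           // optimistic lookup
    if (bucketEntry != null) {
      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
      lock.readLock().lock();                                // readers share; evictors exclude
      try {
        // Re-check under the lock: an evictor may have removed the entry,
        // and the offset may even have been reused for another block.
        if (!bucketEntry.equals(backingMap.get(key))) {
          return null;                                       // caller sees a cache miss
        }
        // ... read and deserialize the block at bucketEntry.offset() ...
      } finally {
        lock.readLock().unlock();
      }
    }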
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index afcbd28..de17bc8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
@@ -34,7 +35,6 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.IdLock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -173,7 +173,7 @@ public class TestBucketCache {
 
   @Test
   public void testCacheMultiThreadedSingleKey() throws Exception {
-    CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES);
+    CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, 2 * NUM_THREADS, 2 * NUM_QUERIES);
   }
 
   @Test
@@ -198,7 +198,8 @@ public class TestBucketCache {
     cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
         new byte[10]));
     long lockId = cache.backingMap.get(cacheKey).offset();
-    IdLock.Entry lockEntry = cache.offsetLock.getLockEntry(lockId);
+    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
+    lock.writeLock().lock();
     Thread evictThread = new Thread("evict-block") {
 
       @Override
@@ -212,7 +213,7 @@ public class TestBucketCache {
     cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
     cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
         new byte[10]));
-    cache.offsetLock.releaseLockEntry(lockEntry);
+    lock.writeLock().unlock();
     evictThread.join();
     assertEquals(1L, cache.getBlockCount());
     assertTrue(cache.getCurrentSize() > 0L);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
new file mode 100644
index 0000000..d57941e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + +import java.util.Map; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +// Medium as it creates 100 threads; seems better to run it isolated +public class TestIdReadWriteLock { + + private static final Log LOG = LogFactory.getLog(TestIdReadWriteLock.class); + + private static final int NUM_IDS = 16; + private static final int NUM_THREADS = 128; + private static final int NUM_SECONDS = 15; + + private IdReadWriteLock idLock = new IdReadWriteLock(); + + private Map idOwner = new ConcurrentHashMap(); + + private class IdLockTestThread implements Callable { + + private String clientId; + + public IdLockTestThread(String clientId) { + this.clientId = clientId; + } + + @Override + public Boolean call() throws Exception { + Thread.currentThread().setName(clientId); + Random rand = new Random(); + long endTime = System.currentTimeMillis() + NUM_SECONDS * 1000; + while (System.currentTimeMillis() < endTime) { + long id = rand.nextInt(NUM_IDS); + boolean readLock = rand.nextBoolean(); + + ReentrantReadWriteLock readWriteLock = idLock.getLock(id); + Lock lock = readLock ? readWriteLock.readLock() : readWriteLock.writeLock(); + try { + lock.lock(); + int sleepMs = 1 + rand.nextInt(4); + String owner = idOwner.get(id); + if (owner != null && LOG.isDebugEnabled()) { + LOG.debug((readLock ? "Read" : "Write") + "lock of Id " + id + " already taken by " + + owner + ", we are " + clientId); + } + + idOwner.put(id, clientId); + Thread.sleep(sleepMs); + idOwner.remove(id); + + } finally { + lock.unlock(); + if (LOG.isDebugEnabled()) { + LOG.debug("Release " + (readLock ? "Read" : "Write") + " lock of Id" + id + ", we are " + + clientId); + } + } + } + return true; + } + + } + + @Test(timeout = 60000) + public void testMultipleClients() throws Exception { + ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS); + try { + ExecutorCompletionService ecs = + new ExecutorCompletionService(exec); + for (int i = 0; i < NUM_THREADS; ++i) + ecs.submit(new IdLockTestThread("client_" + i)); + for (int i = 0; i < NUM_THREADS; ++i) { + Future result = ecs.take(); + assertTrue(result.get()); + } + // make sure the entry pool will be cleared after GC and purge call + int entryPoolSize = idLock.purgeAndGetEntryPoolSize(); + LOG.debug("Size of entry pool after gc and purge: " + entryPoolSize); + assertEquals(0, entryPoolSize); + } finally { + exec.shutdown(); + exec.awaitTermination(5000, TimeUnit.MILLISECONDS); + } + } + + +} +