diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java index 5398582..a7222fa 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java @@ -18,49 +18,47 @@ package org.apache.hadoop.hbase.util; - -import java.util.HashMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; /** * A utility class to manage a set of locks. Each lock is identified by a String which serves - * as a key. Typical usage is:

- * class Example{ - * private final static KeyLocker<String> locker = new Locker<String>(); - *

- *

- * public void foo(String s){ - * Lock lock = locker.acquireLock(s); - * try { - * // whatever - * }finally{ - * lock.unlock(); - * } - * } + * as a key. Typical usage is:

+ * class Example {
+ *   private final static KeyLocker<String> locker = new Locker<String>();
+ *   public void foo(String s){
+ *     Lock lock = locker.acquireLock(s);
+ *     try {
+ *       // whatever
+ *     }finally{
+ *       lock.unlock();
+ *     }
+ *   }
  * }
- * 

+ *
*/ @InterfaceAudience.Private public class KeyLocker> { - private static final Log LOG = LogFactory.getLog(KeyLocker.class); - // The number of lock we want to easily support. It's not a maximum. private static final int NB_CONCURRENT_LOCKS = 1000; - // We need an atomic counter to manage the number of users using the lock and free it when - // it's equal to zero. - private final Map, AtomicInteger>> locks = - new HashMap, AtomicInteger>>(NB_CONCURRENT_LOCKS); + private final WeakObjectPool lockPool = + new WeakObjectPool( + new WeakObjectPool.ObjectFactory() { + @Override + public ReentrantLock createObject(K key) { + return new ReentrantLock(); + } + }, + NB_CONCURRENT_LOCKS); /** * Return a lock for the given key. The lock is already locked. @@ -70,67 +68,31 @@ public class KeyLocker> { public ReentrantLock acquireLock(K key) { if (key == null) throw new IllegalArgumentException("key must not be null"); - Pair, AtomicInteger> lock; - synchronized (this) { - lock = locks.get(key); - if (lock == null) { - lock = new Pair, AtomicInteger>( - new KeyLock(this, key), new AtomicInteger(1)); - locks.put(key, lock); - } else { - lock.getSecond().incrementAndGet(); - } - } - lock.getFirst().lock(); - return lock.getFirst(); + lockPool.purge(); + ReentrantLock lock = lockPool.get(key); + + lock.lock(); + return lock; } /** * Acquire locks for a set of keys. The keys will be * sorted internally to avoid possible deadlock. */ - public Map acquireLocks(final Set keys) { - Map locks = new HashMap(keys.size()); - SortedSet sortedKeys = new TreeSet(keys); - for (K key : sortedKeys) { - locks.put(key, acquireLock(key)); - } - return locks; - } + public Map acquireLocks(Set keys) { + List keyList = new ArrayList(keys); + Collections.sort(keyList); - /** - * Free the lock for the given key. - */ - private synchronized void releaseLock(K key) { - Pair, AtomicInteger> lock = locks.get(key); - if (lock != null) { - if (lock.getSecond().decrementAndGet() == 0) { - locks.remove(key); - } - } else { - String message = "Can't release the lock for " + key+", this key is not in the key list." + - " known keys are: "+ locks.keySet(); - LOG.error(message); - throw new RuntimeException(message); - } - } - - static class KeyLock> extends ReentrantLock { - private static final long serialVersionUID = -12432857283423584L; - - private final transient KeyLocker locker; - private final K lockId; - - private KeyLock(KeyLocker locker, K lockId) { - super(); - this.locker = locker; - this.lockId = lockId; + lockPool.purge(); + Map locks = new LinkedHashMap(keyList.size()); + for (K key : keyList) { + ReentrantLock lock = lockPool.get(key); + locks.put(key, lock); } - @Override - public void unlock() { - super.unlock(); - locker.releaseLock(lockId); + for (Lock lock : locks.values()) { + lock.lock(); } + return locks; } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java new file mode 100644 index 0000000..9039d26 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * {@code WeakReference} based shared object pool. + * The objects are kept in weak references and + * associated with keys which are identified by the {@code equals} method. + * The objects are created by {@link ObjectFactory} on demand. + * The object creation is expected to be lightweight, + * and the objects may be excessively created and discarded. + * Thread safe. + */ +@InterfaceAudience.Private +public class WeakObjectPool { + public interface ObjectFactory { + V createObject(K key); + } + + private final ReferenceQueue staleRefQueue = new ReferenceQueue(); + + private class ObjectReference extends WeakReference { + final K key; + + ObjectReference(K key, V obj) { + super(obj, staleRefQueue); + this.key = key; + } + } + + private final ObjectFactory objectFactory; + + /** Does not permit null keys. */ + private final ConcurrentMap referenceCache; + + /** + * The default initial capacity, + * used when not otherwise specified in a constructor. + */ + public static final int DEFAULT_INITIAL_CAPACITY = 16; + + /** + * The default concurrency level, + * used when not otherwise specified in a constructor. + */ + public static final int DEFAULT_CONCURRENCY_LEVEL = 16; + + /** + * Creates a new pool with the default initial capacity (16) + * and the default concurrency level (16). + * + * @param objectFactory the factory to supply new objects on demand + * + * @throws NullPointerException if {@code objectFactory} is null + */ + public WeakObjectPool(ObjectFactory objectFactory) { + this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL); + } + + /** + * Creates a new pool with the given initial capacity + * and the default concurrency level (16). + * + * @param objectFactory the factory to supply new objects on demand + * @param initialCapacity the initial capacity to keep objects in the pool + * + * @throws NullPointerException if {@code objectFactory} is null + * @throws IllegalArgumentException if {@code initialCapacity} is negative + */ + public WeakObjectPool(ObjectFactory objectFactory, int initialCapacity) { + this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL); + } + + /** + * Creates a new pool with the given initial capacity + * and the given concurrency level. + * + * @param objectFactory the factory to supply new objects on demand + * @param initialCapacity the initial capacity to keep objects in the pool + * @param concurrencyLevel the estimated count of concurrently accessing threads + * + * @throws NullPointerException if {@code objectFactory} is null + * @throws IllegalArgumentException if {@code initialCapacity} is negative or + * {@code concurrencyLevel} is non-positive + */ + public WeakObjectPool( + ObjectFactory objectFactory, + int initialCapacity, + int concurrencyLevel) { + + if (objectFactory == null) { + throw new NullPointerException(); + } + this.objectFactory = objectFactory; + + this.referenceCache = new ConcurrentHashMap( + initialCapacity, 0.75f, concurrencyLevel); + // 0.75f is the default load factor threshold of ConcurrentHashMap. + } + + /** + * Removes stale references of shared objects from the pool. + * References newly becoming stale may still remain. + * The implementation of this method is expected to be lightweight + * when there is no stale reference. + */ + public void purge() { + // This method is lightweight while there is no stale reference + // with the Oracle (Sun) implementation of {@code ReferenceQueue}, + // because {@code ReferenceQueue.poll} just checks a volatile instance + // variable in {@code ReferenceQueue}. + + while (true) { + @SuppressWarnings("unchecked") + ObjectReference ref = (ObjectReference)staleRefQueue.poll(); + if (ref == null) { + break; + } + referenceCache.remove(ref.key, ref); + } + } + + /** + * Returns a shared object associated with the given {@code key}, + * which is identified by the {@code equals} method. + * @throws NullPointerException if {@code key} is null + */ + public V get(K key) { + ObjectReference ref = referenceCache.get(key); + if (ref != null) { + V obj = ref.get(); + if (obj != null) { + return obj; + } + referenceCache.remove(key, ref); + } + + V newObj = objectFactory.createObject(key); + ObjectReference newRef = new ObjectReference(key, newObj); + while (true) { + ObjectReference existingRef = referenceCache.putIfAbsent(key, newRef); + if (existingRef == null) { + return newObj; + } + + V existingObject = existingRef.get(); + if (existingObject != null) { + return existingObject; + } + referenceCache.remove(key, existingRef); + } + } + + /** + * Returns an estimated count of objects kept in the pool. + * This also counts stale references, + * and you might want to call {@link #purge()} beforehand. + */ + public int size() { + return referenceCache.size(); + } +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestKeyLocker.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestKeyLocker.java index 9bb8a04..40b918c 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestKeyLocker.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestKeyLocker.java @@ -30,7 +30,7 @@ import org.junit.experimental.categories.Category; public class TestKeyLocker { @Test public void testLocker(){ - KeyLocker locker = new KeyLocker(); + KeyLocker locker = new KeyLocker(); ReentrantLock lock1 = locker.acquireLock("l1"); Assert.assertTrue(lock1.isHeldByCurrentThread()); @@ -51,9 +51,19 @@ public class TestKeyLocker { lock2.unlock(); Assert.assertFalse(lock20.isHeldByCurrentThread()); - // The lock object was freed once useless, so we're recreating a new one + // The lock object will be garbage-collected + // if you free its reference for a long time, + // and you will get a new one at the next time. + int lock2Hash = System.identityHashCode(lock2); + lock2 = null; + lock20 = null; + + System.gc(); + System.gc(); + System.gc(); + ReentrantLock lock200 = locker.acquireLock("l2"); - Assert.assertTrue(lock2 != lock200); + Assert.assertNotEquals(lock2Hash, System.identityHashCode(lock200)); lock200.unlock(); Assert.assertFalse(lock200.isHeldByCurrentThread()); diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java new file mode 100644 index 0000000..bf1b4eb --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({MiscTests.class, SmallTests.class}) +public class TestWeakObjectPool { + WeakObjectPool pool; + + @Before + public void setUp() { + pool = new WeakObjectPool( + new WeakObjectPool.ObjectFactory() { + @Override + public Object createObject(String key) { + return new Object(); + } + }); + } + + @Test + public void testKeys() { + Object obj1 = pool.get("a"); + Object obj2 = pool.get(new String("a")); + + Assert.assertSame(obj1, obj2); + + Object obj3 = pool.get("b"); + + Assert.assertNotSame(obj1, obj3); + } + + @Test + public void testWeakReference() throws Exception { + Object obj1 = pool.get("a"); + int hash1 = System.identityHashCode(obj1); + + System.gc(); + System.gc(); + System.gc(); + + Thread.sleep(10); + // Sleep a while because references newly becoming stale + // may still remain when calling the {@code purge} method. + pool.purge(); + Assert.assertEquals(1, pool.size()); + + Object obj2 = pool.get("a"); + Assert.assertSame(obj1, obj2); + + obj1 = null; + obj2 = null; + + System.gc(); + System.gc(); + System.gc(); + + Thread.sleep(10); + pool.purge(); + Assert.assertEquals(0, pool.size()); + + Object obj3 = pool.get("a"); + Assert.assertNotEquals(hash1, System.identityHashCode(obj3)); + } + + @Test(timeout=1000) + public void testCongestion() throws Exception { + final int THREAD_COUNT = 100; + + final AtomicBoolean assertionFailed = new AtomicBoolean(); + final AtomicReference expectedObjRef = new AtomicReference(); + final CountDownLatch prepareLatch = new CountDownLatch(THREAD_COUNT); + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT); + + for (int i=0; i