diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java index 7890eb0..2853f31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java @@ -22,101 +22,220 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.classification.InterfaceAudience; /** - * Allows multiple concurrent clients to lock on a numeric id with a minimal - * memory overhead. The intended usage is as follows: + * Allows multiple concurrent clients to lock on a numeric or non-null object id, + * with low contention except for waiting for a lock. + * The intended usage is as follows: * - *
- * IdLock.Entry lockEntry = idLock.getLockEntry(id);
- * try {
- *   // User code.
- * } finally {
- *   idLock.releaseLockEntry(lockEntry);
- * }
+ *
     IdLock.Entry lockEntry = idLock.getLockEntry(id);
+ *     try {
+ *         // User code.
+ *     } finally {
+ *         lockEntry.release();
+ *     }
+ * 
*/ @InterfaceAudience.Private public class IdLock { + /** + * The count starts at 1, and once the count turns down to 0, + * this instance is regarded as disabled and never becomes enabled. + * This irreversibility is expected to use to keep an unused object from reuse, + * and remove the unused object from a collection without blocking threads. + */ + private static class ParticipationCount { + final AtomicLong countRef = new AtomicLong(1); + + boolean incrementAndIsEnabled() { + while (true) { + long count = countRef.get(); + if (count == 0) { + return false; + } + + long nextCount = count + 1; + + if (nextCount == 0) { + throw new Error("Maximum participation count exceeded"); + } + + if (countRef.compareAndSet(count, nextCount)) { + return true; + } + } + } + + boolean decrementAndIsEnabled() { + while (true) { + long count = countRef.get(); + if (count == 0) { + return false; + } + + long nextCount = count - 1; + + if (countRef.compareAndSet(count, nextCount)) { + return nextCount != 0; + } + } + } + } + + /** + * Increment the participation count before participating in the lock, + * and decrement the count after leaving the lock. + * In short, the {@code participationCount} is a sort of reservation count for the {@code lock}, + * and if once the {@code participationCount} is disabled + * the {@code lock} is ensured not to be used anymore. + */ + private static class ExclusiveControl { + final ParticipationCount participationCount = new ParticipationCount(); + final ReentrantLock lock = new ReentrantLock(); +} /** An entry returned to the client as a lock object */ - public static class Entry { - private final long id; - private int numWaiters; - private boolean isLocked = true; + public class Entry { + private final Object id; + private final ExclusiveControl control; + private final AtomicBoolean released = new AtomicBoolean(); - private Entry(long id) { + private Entry(Object id, ExclusiveControl control) { this.id = id; + this.control = control; } + /** + * Releases the corresponding lock. + * It is required to call with the same thread which you get this instance with. + * In practice it is appropriate to call this method in a finally block. + */ + public void release() { + if (control.lock.isHeldByCurrentThread() && released.compareAndSet(false, true)) { + control.lock.unlock(); + returnControl(id, control); + } + } + + @Override public String toString() { - return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked=" - + isLocked; + return "Entry[id=" + id + ", released=" + released + "]"; + } + } + + private final ConcurrentMap controlMap = + new ConcurrentHashMap(); + + private ExclusiveControl prepareControl(Object id) { + // Calling ConcurrentHashMap.get() before putIfAbsent() is an idiom for performance; + // get() generally doesn't block in contrast to putIfAbsent(). + + ExclusiveControl control = controlMap.get(id); + if (control != null) { + if (control.participationCount.incrementAndIsEnabled()) { + return control; + } + + controlMap.remove(id, control); + } + + final ExclusiveControl newControl = new ExclusiveControl(); + + while (true) { + control = controlMap.putIfAbsent(id, newControl); + if (control == null) { + return newControl; + } + + if (control.participationCount.incrementAndIsEnabled()) { + return control; + } + + controlMap.remove(id, control); } } - private ConcurrentMap map = - new ConcurrentHashMap(); + private void returnControl(Object id, ExclusiveControl control) { + if (! control.participationCount.decrementAndIsEnabled()) { + controlMap.remove(id, control); + } + } /** - * Blocks until the lock corresponding to the given id is acquired. + * Blocks until the lock corresponding to the the given {@code id} is acquired, + * and returns an entry to release the lock by {@link Entry#release()}; + * This is equivalent to the other overloaded method {@link #getLockEntry(Object)} + * with the wrapper object of the given {@code id}, + * which object is an instance of {@code Long}. + *

+ * Pay attention to which locks are same or not if you also call {@code getLockEntry(Object)}. + * Also calling with a {@code byte}, {@code char}, {@code short} or {@code int} parameter + * is usually bound to this method rather than {@code getLockEntry(Object)} with auto-boxing, + * and ids whose values are equal are regarded as equal, + * regardless of types of their actual parameters. * * @param id an arbitrary number to lock on - * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release - * the lock + * @return an "entry" to release the lock by {@link Entry#release()} * @throws IOException if interrupted + * @see #getLockEntry(Object) */ public Entry getLockEntry(long id) throws IOException { - Entry entry = new Entry(id); - Entry existing; - while ((existing = map.putIfAbsent(entry.id, entry)) != null) { - synchronized (existing) { - if (existing.isLocked) { - ++existing.numWaiters; // Add ourselves to waiters. - while (existing.isLocked) { - try { - existing.wait(); - } catch (InterruptedException e) { - --existing.numWaiters; // Remove ourselves from waiters. - throw new InterruptedIOException( - "Interrupted waiting to acquire sparse lock"); - } - } - - --existing.numWaiters; // Remove ourselves from waiters. - existing.isLocked = true; - return existing; - } - // If the entry is not locked, it might already be deleted from the - // map, so we cannot return it. We need to get our entry into the map - // or get someone else's locked entry. - } - } - return entry; + return getLockEntry(Long.valueOf(id)); } /** - * Must be called in a finally block to decrease the internal counter and - * remove the monitor object for the given id if the caller is the last - * client. + * Blocks until the lock corresponding to the the given {@code id} is acquired, + * and returns an entry to release the lock by {@link Entry#release()}. * - * @param entry the return value of {@link #getLockEntry(long)} + * @param id an arbitrary non-null object to lock on + * @return an "entry" to release the lock by {@link Entry#release()} + * @throws IOException if interrupted + * @throws NullPointerException if {@code id} is null + * @see #getLockEntry(long) */ - public void releaseLockEntry(Entry entry) { - synchronized (entry) { - entry.isLocked = false; - if (entry.numWaiters > 0) { - entry.notify(); - } else { - map.remove(entry.id); - } + public Entry getLockEntry(Object id) throws IOException { + if(id == null) { + throw new NullPointerException(); + } + + ExclusiveControl control = prepareControl(id); + + try { + control.lock.lockInterruptibly(); + + } catch (InterruptedException e) { + returnControl(id, control); + throw new InterruptedIOException( + "Interrupted waiting to acquire sparse lock"); } + + return new Entry(id, control); } - /** For testing */ - void assertMapEmpty() { - assert map.size() == 0; + /** + * Releases the corresponding lock. + * It is required to call with the same thread which you get the {@code entry} with. + * In practice it is appropriate to call this method in a finally block. + * + * @param entry the return value of {@link #getLockEntry(long)} or {@link #getLockEntry(Object)} + * @deprecated Replaced by {@code entry.release()}. + */ + @Deprecated + public void releaseLockEntry(Entry entry) { + entry.release(); } + /** + * For testing; According to the implementation of {@link ConcurrentHashMap#isEmpty()}, + * this method is expected to return true + * only if the map keeps empty while invoking this method. + */ + boolean isEmpty() { + return controlMap.isEmpty(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java index bbf4bba..fb72019 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java @@ -19,95 +19,308 @@ package org.apache.hadoop.hbase.util; -import java.util.Map; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import static org.junit.Assert.*; +import java.util.concurrent.FutureTask; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hbase.MediumTests; import org.junit.Test; import org.junit.experimental.categories.Category; @Category(MediumTests.class) -// Medium as it creates 100 threads; seems better to run it isolated +// Medium as it creates many threads; seems better to run it isolated public class TestIdLock { + @Test(timeout=10000) + public void testBlockForEqualIds() throws Exception { + final IdLock idLock = new IdLock(); - private static final Log LOG = LogFactory.getLog(TestIdLock.class); + IdLock.Entry entry = idLock.getLockEntry(0); + + final AtomicBoolean unlocked = new AtomicBoolean(); + + class LockAndReleaseTask implements Callable { + @Override + public Void call() throws Exception { + IdLock.Entry entry = idLock.getLockEntry(0); + try { + unlocked.set(true); + } finally { + entry.release(); + } + return null; + } + } - private static final int NUM_IDS = 16; - private static final int NUM_THREADS = 128; - private static final int NUM_SECONDS = 15; + FutureTask futureTask = new FutureTask(new LockAndReleaseTask()); + new Thread(futureTask).start(); - private IdLock idLock = new IdLock(); + Thread.sleep(100); + assertFalse(unlocked.get()); - private Map idOwner = new ConcurrentHashMap(); + entry.release(); + Thread.sleep(100); + assertTrue(unlocked.get()); - private class IdLockTestThread implements Callable { + futureTask.get(); + assertTrue(idLock.isEmpty()); + } - private String clientId; + @Test(timeout=10000) + public void testNotBlockForDifferentIds() throws Exception { + final IdLock idLock = new IdLock(); - public IdLockTestThread(String clientId) { - this.clientId = clientId; - } + IdLock.Entry entry = idLock.getLockEntry(0); - @Override - public Boolean call() throws Exception { - Thread.currentThread().setName(clientId); - Random rand = new Random(); - long endTime = System.currentTimeMillis() + NUM_SECONDS * 1000; - while (System.currentTimeMillis() < endTime) { - long id = rand.nextInt(NUM_IDS); + final AtomicBoolean unlocked = new AtomicBoolean(); - IdLock.Entry lockEntry = idLock.getLockEntry(id); + class LockAndReleaseTask implements Callable { + @Override + public Void call() throws Exception { + IdLock.Entry entry = idLock.getLockEntry(1); try { - int sleepMs = 1 + rand.nextInt(4); - String owner = idOwner.get(id); - if (owner != null) { - LOG.error("Id " + id + " already taken by " + owner + ", " - + clientId + " failed"); - return false; - } + unlocked.set(true); + } finally { + entry.release(); + } + return null; + } + } + + FutureTask futureTask = new FutureTask(new LockAndReleaseTask()); + new Thread(futureTask).start(); + + Thread.sleep(100); + assertTrue(unlocked.get()); + + entry.release(); + Thread.sleep(100); + + futureTask.get(); + assertTrue(idLock.isEmpty()); + } + + @Test(timeout=10000) + public void testUnblockOneByOne() throws Exception { + final int threadCount = 10; - idOwner.put(id, clientId); - Thread.sleep(sleepMs); - idOwner.remove(id); + final IdLock idLock = new IdLock(); + IdLock.Entry entry = idLock.getLockEntry(0); + + final AtomicBoolean unlocked = new AtomicBoolean(); + + class LockAndReleaseTask implements Callable { + @Override + public Void call() throws Exception { + IdLock.Entry entry = idLock.getLockEntry(0); + try { + assertFalse(unlocked.get()); + unlocked.set(true); + Thread.sleep(100); + unlocked.set(false); } finally { - idLock.releaseLockEntry(lockEntry); + entry.release(); } + return null; } - return true; } + List> futureTasks = new ArrayList>(); + for (int i = 0; i < threadCount; i++) { + futureTasks.add(new FutureTask(new LockAndReleaseTask())); + } + + for (FutureTask task : futureTasks) { + new Thread(task).start(); + } + + entry.release(); + + for (FutureTask task : futureTasks) { + task.get(); + } + assertTrue(idLock.isEmpty()); } - @Test - public void testMultipleClients() throws Exception { - ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS); - try { - ExecutorCompletionService ecs = - new ExecutorCompletionService(exec); - for (int i = 0; i < NUM_THREADS; ++i) - ecs.submit(new IdLockTestThread("client_" + i)); - for (int i = 0; i < NUM_THREADS; ++i) { - Future result = ecs.take(); - assertTrue(result.get()); + @Test(timeout=10000) + public void testCompetitionBetweenReleaseAndLock() throws Exception { + final int loopCount = 10000; + + final IdLock idLock = new IdLock(); + final CyclicBarrier readyBarrier = new CyclicBarrier(2); + final CyclicBarrier nextBarrier = new CyclicBarrier(2); + + class ReleaseTask implements Callable { + @Override + public Void call() throws Exception { + for (int i = 0; i < loopCount; i++) { + IdLock.Entry entry = idLock.getLockEntry(0); + readyBarrier.await(); + entry.release(); + nextBarrier.await(); + } + return null; } - idLock.assertMapEmpty(); - } finally { - exec.shutdown(); } + + class BlockTask implements Callable { + @Override + public Void call() throws Exception { + for (int i = 0; i < loopCount; i++) { + readyBarrier.await(); + IdLock.Entry entry = idLock.getLockEntry(0); + entry.release(); + nextBarrier.await(); + } + return null; + } + } + + FutureTask releaseFutureTask = new FutureTask(new ReleaseTask()); + new Thread(releaseFutureTask).start(); + + FutureTask blockFutureTask = new FutureTask(new BlockTask()); + new Thread(blockFutureTask).start(); + + releaseFutureTask.get(); + blockFutureTask.get(); + assertTrue(idLock.isEmpty()); } + @Test(timeout=10000) + public void testCompetitionBetweenLocks() throws Exception { + final int loopCount = 10000; + final int threadCount = 10; -} + final IdLock idLock = new IdLock(); + final CyclicBarrier readyBarrier = new CyclicBarrier(threadCount); + final CyclicBarrier nextBarrier = new CyclicBarrier(threadCount); + class BlockTask implements Callable { + @Override + public Void call() throws Exception { + for (int i = 0; i < loopCount; i++) { + readyBarrier.await(); + IdLock.Entry entry = idLock.getLockEntry(0); + entry.release(); + nextBarrier.await(); + } + return null; + } + } + + List> blockFutureTasks = new ArrayList>(); + for (int i = 0; i < threadCount; i++) { + blockFutureTasks.add(new FutureTask(new BlockTask())); + } + + for (FutureTask task : blockFutureTasks) { + new Thread(task).start(); + } + + for (FutureTask task : blockFutureTasks) { + task.get(); + } + assertTrue(idLock.isEmpty()); + } + + private static class IdLockIntensiveTestTaskProvider { + final IdLock idLock; + final int idCount; + final long executionIntervalMillis; + + final ConcurrentMap idOwner = new ConcurrentHashMap(); + final Random rand = new Random(); + + IdLockIntensiveTestTaskProvider(IdLock idLock, int idCount, long executionIntervalMillis) { + this.idLock = idLock; + this.idCount = idCount; + this.executionIntervalMillis = executionIntervalMillis; + } + + int nextTaskId; + + Callable nextTask() { + return new IdLockIntensiveTestTask("task_" + nextTaskId++); + } + + class IdLockIntensiveTestTask implements Callable { + final String taskName; + + IdLockIntensiveTestTask(String taskName) { + this.taskName = taskName; + } + + @Override + public String call() throws Exception { + long endTime = System.currentTimeMillis() + executionIntervalMillis; + while (System.currentTimeMillis() < endTime) { + long id = rand.nextInt(idCount); + + IdLock.Entry lockEntry = idLock.getLockEntry(id); + try { + String existingTaskName = idOwner.putIfAbsent(id, taskName); + if (existingTaskName != null) { + return "Id " + id + " already taken by " + existingTaskName + ", " + + taskName + " failed"; + } + + int sleepMs = 1 + rand.nextInt(4); + Thread.sleep(sleepMs); + idOwner.remove(id, taskName); + + } finally { + lockEntry.release(); + } + } + return null; + } + } + } + + @Test(timeout=60000) + public void testIntensively() throws Exception { + final int idCount = 16; + final int executionIntervalMillis = 15 * 1000; + final int threadCount = 128; + + IdLock idLock = new IdLock(); + + IdLockIntensiveTestTaskProvider taskProvider = + new IdLockIntensiveTestTaskProvider(idLock, idCount, executionIntervalMillis); + + ExecutorService service = Executors.newFixedThreadPool(threadCount); + try { + ExecutorCompletionService completionService = + new ExecutorCompletionService(service); + + for (int i = 0; i < threadCount; i++) { + completionService.submit(taskProvider.nextTask()); + } + + for (int i = 0; i < threadCount; i++) { + Future result = completionService.take(); + assertNull(result.get()); + } + + } finally { + service.shutdown(); + } + + assertTrue(idLock.isEmpty()); + } +}