diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java index 7890eb0..f75af46 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java @@ -22,6 +22,10 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.classification.InterfaceAudience; @@ -34,30 +38,121 @@ import org.apache.hadoop.classification.InterfaceAudience; * try { * // User code. * } finally { - * idLock.releaseLockEntry(lockEntry); + * lockEntry.release(); * } */ @InterfaceAudience.Private public class IdLock { + private static class ParticipationCount { + /** + * Once the count turns down to 0, + * the participation count is regarded as disabled. + */ + final AtomicLong countRef = new AtomicLong(1); + + boolean incrementAndIsEnabled() { + while (true) { + long count = countRef.get(); + if (count == 0) { + return false; + } + + long nextCount = count + 1; + + if (nextCount == 0) { + throw new Error("Maximum participation count exceeded"); + } + + if (countRef.compareAndSet(count, nextCount)) { + return true; + } + } + } + + boolean decrementAndIsEnabled() { + while (true) { + long count = countRef.get(); + if (count == 0) { + return false; + } + + long nextCount = count - 1; + + if (countRef.compareAndSet(count, nextCount)) { + return nextCount != 0; + } + } + } + } + + private static class ExclusiveControl { + final ParticipationCount participationCount = new ParticipationCount(); + final Lock lock = new ReentrantLock(); +} /** An entry returned to the client as a lock object */ - public static class Entry { + public class Entry { private final long id; - private int numWaiters; - private boolean isLocked = true; + private final ExclusiveControl control; + private final AtomicBoolean released = new AtomicBoolean(); - private Entry(long id) { + private Entry(long id, ExclusiveControl control) { this.id = id; + this.control = control; + } + + /** + * Must be called in a finally block to decrease the internal counter and + * remove the monitor object for the id if the caller is the last client. + */ + public void release() { + if (released.compareAndSet(false, true)) { + control.lock.unlock(); + returnControl(id, control); + } } + @Override public String toString() { - return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked=" - + isLocked; + return "Entry[id=" + id + ", released=" + released + "]"; } } - private ConcurrentMap map = - new ConcurrentHashMap(); + private final ConcurrentMap map = + new ConcurrentHashMap(); + + private ExclusiveControl prepareControl(long id) { + // Calling get() before putIfAbsent() is a mere trick for performance. + ExclusiveControl control = map.get(id); + if (control != null) { + if (control.participationCount.incrementAndIsEnabled()) { + return control; + } + + map.remove(id, control); + } + + final ExclusiveControl newControl = new ExclusiveControl(); + + while (true) { + control = map.putIfAbsent(id, newControl); + if (control == null) { + return newControl; + } + + if (control.participationCount.incrementAndIsEnabled()) { + return control; + } + + map.remove(id, control); + } + } + + private void returnControl(long id, ExclusiveControl control) { + if (! control.participationCount.decrementAndIsEnabled()) { + map.remove(id, control); + } + } /** * Blocks until the lock corresponding to the given id is acquired. @@ -68,32 +163,18 @@ public class IdLock { * @throws IOException if interrupted */ public Entry getLockEntry(long id) throws IOException { - Entry entry = new Entry(id); - Entry existing; - while ((existing = map.putIfAbsent(entry.id, entry)) != null) { - synchronized (existing) { - if (existing.isLocked) { - ++existing.numWaiters; // Add ourselves to waiters. - while (existing.isLocked) { - try { - existing.wait(); - } catch (InterruptedException e) { - --existing.numWaiters; // Remove ourselves from waiters. - throw new InterruptedIOException( - "Interrupted waiting to acquire sparse lock"); - } - } - - --existing.numWaiters; // Remove ourselves from waiters. - existing.isLocked = true; - return existing; - } - // If the entry is not locked, it might already be deleted from the - // map, so we cannot return it. We need to get our entry into the map - // or get someone else's locked entry. - } + ExclusiveControl control = prepareControl(id); + + try { + control.lock.lockInterruptibly(); + + } catch (InterruptedException e) { + returnControl(id, control); + throw new InterruptedIOException( + "Interrupted waiting to acquire sparse lock"); } - return entry; + + return new Entry(id, control); } /** @@ -102,21 +183,19 @@ public class IdLock { * client. * * @param entry the return value of {@link #getLockEntry(long)} + * @deprecated Replaced by {@code entry.release()}. */ + @Deprecated public void releaseLockEntry(Entry entry) { - synchronized (entry) { - entry.isLocked = false; - if (entry.numWaiters > 0) { - entry.notify(); - } else { - map.remove(entry.id); - } - } + entry.release(); } - /** For testing */ - void assertMapEmpty() { - assert map.size() == 0; + /** + * For testing; According to the implementation of {@link ConcurrentHashMap#isEmpty()}, + * this method is expected to return true + * only if the map keeps empty while invoking this method. + */ + boolean isEmpty() { + return map.isEmpty(); } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java index bbf4bba..4631f41 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java @@ -82,7 +82,7 @@ public class TestIdLock { idOwner.remove(id); } finally { - idLock.releaseLockEntry(lockEntry); + lockEntry.release(); } } return true; @@ -102,7 +102,7 @@ public class TestIdLock { Future result = ecs.take(); assertTrue(result.get()); } - idLock.assertMapEmpty(); + assertTrue(idLock.isEmpty()); } finally { exec.shutdown(); }