diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java index b9d0983..ad21251 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -39,25 +40,59 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; */ @InterfaceAudience.Private public class IdLock { - /** An entry returned to the client as a lock object */ - public static class Entry { + public static class Entry extends AbstractQueuedSynchronizer { + private boolean deleted = false; private final long id; - private int numWaiters; - private boolean isLocked = true; private Entry(long id) { this.id = id; + setState(0); + } + + @Override + protected boolean tryAcquire(int acquires) { + assert acquires == 1; + if (compareAndSetState(0, 1)) { + setExclusiveOwnerThread(Thread.currentThread()); + return true; + } + return false; + } + + @Override + protected final boolean tryRelease(int releases) { + assert releases == 1; + if (getState() == 0) throw new IllegalMonitorStateException(); + setExclusiveOwnerThread(null); + setState(0); + return true; + } + + protected boolean isLocked() { + return getState() == 1; } + @Override + public boolean equals(Object obj) { + if (obj instanceof Entry) { + return id == ((Entry)obj).id; + } + return false; + } + + @Override + public int hashCode() { + return (int)(id ^ (id >>> 32)); + } + + @Override public String toString() { - return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked=" - + isLocked; + return "id=" + id + ", numWaiter=" + getQueueLength() + ", isLocked=" + isLocked(); } } - private ConcurrentMap map = - new ConcurrentHashMap(); + private final ConcurrentMap map = new ConcurrentHashMap(); /** * Blocks until the lock corresponding to the given id is acquired. @@ -68,32 +103,30 @@ public class IdLock { * @throws IOException if interrupted */ public Entry getLockEntry(long id) throws IOException { - Entry entry = new Entry(id); - Entry existing; - while ((existing = map.putIfAbsent(entry.id, entry)) != null) { - synchronized (existing) { - if (existing.isLocked) { - ++existing.numWaiters; // Add ourselves to waiters. - while (existing.isLocked) { - try { - existing.wait(); - } catch (InterruptedException e) { - --existing.numWaiters; // Remove ourselves from waiters. - throw new InterruptedIOException( + while (true) { + Entry existing; + Entry entry = new Entry(id); + if ((existing = map.putIfAbsent(entry, entry)) == null) { + existing = entry; + } + + try { + existing.acquireInterruptibly(1); + } catch (InterruptedException e) { + throw new InterruptedIOException( "Interrupted waiting to acquire sparse lock"); - } - } - - --existing.numWaiters; // Remove ourselves from waiters. - existing.isLocked = true; - return existing; - } - // If the entry is not locked, it might already be deleted from the - // map, so we cannot return it. We need to get our entry into the map - // or get someone else's locked entry. } + + // TODO: We should replace the map with a custom one to avoid + // the double "take a lock for the slot" and "wait for the semaphore". + // in that case we will not need a delete flag, + // and we can remove this while true loop. + if (!existing.deleted) { + return existing; + } + + existing.release(1); } - return entry; } /** @@ -104,14 +137,11 @@ public class IdLock { * @param entry the return value of {@link #getLockEntry(long)} */ public void releaseLockEntry(Entry entry) { - synchronized (entry) { - entry.isLocked = false; - if (entry.numWaiters > 0) { - entry.notify(); - } else { - map.remove(entry.id); - } + if (!entry.hasQueuedThreads()) { + entry.deleted = true; + map.remove(entry); } + entry.release(1); } /** For testing */