Index: src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java (revision 0) @@ -0,0 +1,42 @@ +package org.apache.hadoop.hbase.util; + +import java.util.Arrays; + +/** + * This class encapsulates a byte array and overrides hashCode and equals so + * that it's identity is based on the data rather than the array instance. + */ +public class HashedBytes { + + private final byte[] bytes; + private final int hashCode; + + public HashedBytes(byte[] bytes) { + this.bytes = bytes; + hashCode = Bytes.hashCode(bytes); + } + + public byte[] getBytes() { + return bytes; + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null || getClass() != obj.getClass()) + return false; + HashedBytes other = (HashedBytes) obj; + return Arrays.equals(bytes, other.bytes); + } + + @Override + public String toString() { + return Bytes.toStringBinary(bytes); + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1104561) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -38,11 +38,12 @@ import java.util.NavigableMap; import java.util.NavigableSet; import java.util.Random; -import java.util.Set; import java.util.TreeMap; -import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -89,6 +90,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.ClassSize; @@ -159,11 +161,11 @@ // Members ////////////////////////////////////////////////////////////////////////////// - private final Set lockedRows = - new TreeSet(Bytes.BYTES_COMPARATOR); - private final Map lockIds = - new HashMap(); - private int lockIdGenerator = 1; + private final ConcurrentHashMap lockedRows = + new ConcurrentHashMap(); + private final ConcurrentHashMap lockIds = + new ConcurrentHashMap(); + private final AtomicInteger lockIdGenerator = new AtomicInteger(1); static private Random rand = new Random(); protected final Map stores = @@ -2365,42 +2367,42 @@ * null if unavailable. */ private Integer internalObtainRowLock(final byte[] row, boolean waitForLock) - throws IOException { + throws IOException { checkRow(row); startRegionOperation(); try { - synchronized (lockedRows) { - while (lockedRows.contains(row)) { + HashedBytes rowKey = new HashedBytes(row); + CountDownLatch rowLatch = new CountDownLatch(1); + + // loop until we acquire the row lock (unless !waitForLock) + while (true) { + CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch); + if (existingLatch == null) { + break; + } else { + // row already locked if (!waitForLock) { return null; } try { - lockedRows.wait(); + existingLatch.await(); } catch (InterruptedException ie) { // Empty } } - // generate a new lockid. Attempt to insert the new [lockid, row]. - // if this lockid already exists in the map then revert and retry - // We could have first done a lockIds.get, and if it does not exist only - // then do a lockIds.put, but the hope is that the lockIds.put will - // mostly return null the first time itself because there won't be - // too many lockId collisions. - byte [] prev = null; - Integer lockId = null; - do { - lockId = new Integer(lockIdGenerator++); - prev = lockIds.put(lockId, row); - if (prev != null) { - lockIds.put(lockId, prev); // revert old value - lockIdGenerator = rand.nextInt(); // generate new start point - } - } while (prev != null); - - lockedRows.add(row); - lockedRows.notifyAll(); - return lockId; } + + // loop until we generate an unused lock id + while (true) { + Integer lockId = lockIdGenerator.incrementAndGet(); + HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey); + if (existingRowKey == null) { + return lockId; + } else { + // lockId already in use, jump generator to a new spot + lockIdGenerator.set(rand.nextInt()); + } + } } finally { closeRegionOperation(); } @@ -2411,22 +2413,28 @@ * @param lockid * @return Row that goes with lockid */ - byte [] getRowFromLock(final Integer lockid) { - synchronized (lockedRows) { - return lockIds.get(lockid); - } + byte[] getRowFromLock(final Integer lockid) { + HashedBytes rowKey = lockIds.get(lockid); + return rowKey == null ? null : rowKey.getBytes(); } - + /** * Release the row lock! * @param lockid The lock ID to release. */ - public void releaseRowLock(final Integer lockid) { - synchronized (lockedRows) { - byte[] row = lockIds.remove(lockid); - lockedRows.remove(row); - lockedRows.notifyAll(); + public void releaseRowLock(final Integer lockId) { + HashedBytes rowKey = lockIds.remove(lockId); + if (rowKey == null) { + LOG.warn("Release unknown lockId: " + lockId); + return; } + CountDownLatch rowLatch = lockedRows.remove(rowKey); + if (rowLatch == null) { + LOG.error("Releases row not locked, lockId: " + lockId + " row: " + + rowKey); + return; + } + rowLatch.countDown(); } /** @@ -2434,13 +2442,8 @@ * @param lockid * @return boolean */ - boolean isRowLocked(final Integer lockid) { - synchronized (lockedRows) { - if (lockIds.get(lockid) != null) { - return true; - } - return false; - } + boolean isRowLocked(final Integer lockId) { + return lockIds.containsKey(lockId); } /**