Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 946565) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy) @@ -981,6 +981,104 @@ LOG.info("locks completed."); } + public void testTryLocks() throws IOException{ + byte [] tableName = Bytes.toBytes("testtable"); + byte [][] families = {fam1, fam2, fam3}; + final byte [] qual1 = Bytes.toBytes("qualifier1"); + + HBaseConfiguration hc = initSplit(); + //Setting up region + String method = this.getName(); + initHRegion(tableName, method, hc, families); + + final int threadCount = 10; + final int lockCount = 10; + + //Initialize counters + for(int i = 0; i < lockCount; i++) { + Put put = new Put(Bytes.toBytes(Integer.toString(i))); + put.add(fam1, qual1, Bytes.toBytes(0)); + region.put(put); + } + + Listthreads = new ArrayList(threadCount); + for (int i = 0; i < threadCount; i++) { + threads.add(new Thread(Integer.toString(i)) { + @Override + public void run() { + int tid = Integer.valueOf(getName()); + Integer [] lockids = new Integer[lockCount]; + // Get locks. + for (int i = 0; i < lockCount; i++) { + try { + byte [] rowid = Bytes.toBytes(Integer.toString(i)); + if (((tid + i) & 1) != 0) { + lockids[i] = region.obtainRowLock(rowid); + } else { + do { + lockids[i] = region.obtainRowLock(rowid, false); + if (lockids[i] == null) + try { + Thread.sleep(1); + } catch (InterruptedException ie) { } + } while (lockids[i] == null); + } + + //Perform an increment-by-one to test that locks are actually + //serializing accesses. + Get g = new Get(rowid); + Result r = region.get(g, lockids[i]); + Put put = new Put(rowid); + int incVal = Bytes.toInt(r.getValue(fam1, qual1)) + 1; + put.add(fam1, qual1, Bytes.toBytes(incVal)); + region.put(put, lockids[i]); + + assertEquals(rowid, region.getRowFromLock(lockids[i])); + LOG.debug(getName() + " locked " + Bytes.toString(rowid)); + } catch (IOException e) { + e.printStackTrace(); + } + } + LOG.debug(getName() + " set " + + Integer.toString(lockCount) + " locks"); + + // Abort outstanding locks. + for (int i = lockCount - 1; i >= 0; i--) { + region.releaseRowLock(lockids[i]); + LOG.debug(getName() + " unlocked " + i); + } + LOG.debug(getName() + " released " + + Integer.toString(lockCount) + " locks"); + } + }); + } + + // Startup all our threads. + for (Thread t : threads) { + t.start(); + } + + // Now wait around till all are done. + for (Thread t: threads) { + while (t.isAlive()) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + // Go around again. + } + } + } + + /* Verify all counters have expected values */ + for (int i = 0; i < lockCount; i++) { + byte [] rowid = Bytes.toBytes(Integer.toString(i)); + Get get = new Get(rowid); + Result r = region.get(get, null); + assertEquals(Bytes.toInt(r.getValue(fam1, qual1)), threadCount); + } + LOG.info("try locks completed."); + } + ////////////////////////////////////////////////////////////////////////////// // Merge test ////////////////////////////////////////////////////////////////////////////// Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 946565) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -1915,7 +1915,7 @@ return -1; } - public long lockRow(byte [] regionName, byte [] row) + private long lockRow(byte [] regionName, byte [] row, boolean isBlocking) throws IOException { checkOpen(); NullPointerException npe = null; @@ -1932,7 +1932,9 @@ requestCount.incrementAndGet(); try { HRegion region = getRegion(regionName); - Integer r = region.obtainRowLock(row); + Integer r = region.obtainRowLock(row, isBlocking); + if (r == null) + return -1; long lockId = addRowLock(r,region); LOG.debug("Row lock " + lockId + " explicitly acquired by client"); return lockId; @@ -1942,6 +1944,16 @@ } } + public long lockRow(byte [] regionName, byte [] row) + throws IOException { + return lockRow(regionName, row, true); + } + + public long tryLockRow(byte [] regionName, byte [] row) + throws IOException { + return lockRow(regionName, row, false); + } + protected long addRowLock(Integer r, HRegion region) throws LeaseStillHeldException { long lockId = -1L; lockId = rand.nextLong(); Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 946565) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1775,10 +1775,12 @@ * which maybe we'll do in the future. * * @param row Name of row to lock. + * @param isBlocking true if lock acquisition may block * @throws IOException - * @return The id of the held lock. + * @return The id of the held lock (or null if nonblocking lock acquire fails) */ - public Integer obtainRowLock(final byte [] row) throws IOException { + public Integer obtainRowLock(final byte [] row, final boolean isBlocking) + throws IOException { checkRow(row); splitsAndClosesLock.readLock().lock(); try { @@ -1787,6 +1789,8 @@ } synchronized (lockedRows) { while (lockedRows.contains(row)) { + if (!isBlocking) + return null; try { lockedRows.wait(); } catch (InterruptedException ie) { @@ -1819,6 +1823,10 @@ } } + public Integer obtainRowLock(final byte [] row) throws IOException { + return obtainRowLock(row, true); + } + /** * Used by unit tests. * @param lockid Index: src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (revision 946565) +++ src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (working copy) @@ -572,6 +572,10 @@ public RowLock lockRow(byte[] row) throws IOException { throw new IOException("lockRow not implemented"); } + + public RowLock tryLockRow(byte[] row) throws IOException { + throw new IOException("lockRow not implemented"); + } public void unlockRow(RowLock rl) throws IOException { throw new IOException("unlockRow not implemented"); Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (revision 946565) +++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (working copy) @@ -214,7 +214,7 @@ public void close(long scannerId) throws IOException; /** - * Opens a remote row lock. + * Opens a remote row lock, blocking. * * @param regionName name of region * @param row row to lock @@ -224,7 +224,20 @@ public long lockRow(final byte [] regionName, final byte [] row) throws IOException; + /** + * Opens a remote row lock, non-blocking. + * + * @param regionName name of region + * @param row row to lock + * @return lockId lock identifier or -1 if lock could not be acquired + * @throws IOException e + */ + public long tryLockRow(final byte [] regionName, final byte [] row) + throws IOException; + + + /** * Releases a remote row lock. * * @param regionName region name Index: src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (revision 946565) +++ src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (working copy) @@ -259,7 +259,7 @@ void close() throws IOException; /** - * Obtains a lock on a row. + * Obtains a lock on a row, blocks if row is currently locked. * * @param row The row to lock. * @return A {@link RowLock} containing the row and lock id. @@ -270,6 +270,19 @@ RowLock lockRow(byte[] row) throws IOException; /** + * Try to obtain a lock on a row (does not block) + * + * @param row The row to lock. + * @return A {@link RowLock} containing the row and lock id. If the + * row is currently locked, returns null immediately and does not block. + * @throws IOException if a remote or network exception occurs. + * @see RowLock + * @see #lockRow + * @see #unlockRow + */ + RowLock tryLockRow(byte[] row) throws IOException; + + /** * Releases a row lock. * * @param rl The row lock to release. Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 946565) +++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -581,6 +581,20 @@ ); } + public RowLock tryLockRow(final byte [] row) + throws IOException { + return connection.getRegionServerWithRetries( + new ServerCallable(connection, tableName, row) { + public RowLock call() throws IOException { + long lockId = + server.tryLockRow(location.getRegionInfo().getRegionName(), row); + if (lockId == -1) return null; + return new RowLock(row,lockId); + } + } + ); + } + public void unlockRow(final RowLock rl) throws IOException { connection.getRegionServerWithRetries(