Index: src/java/org/apache/hadoop/hbase/HMerge.java =================================================================== --- src/java/org/apache/hadoop/hbase/HMerge.java (revision 684125) +++ src/java/org/apache/hadoop/hbase/HMerge.java (working copy) @@ -373,7 +373,7 @@ b.delete(COL_STARTCODE); b.delete(COL_SPLITA); b.delete(COL_SPLITB); - root.batchUpdate(b); + root.batchUpdate(b,null); if(LOG.isDebugEnabled()) { LOG.debug("updated columns in row: " + regionsToDelete[r]); @@ -383,7 +383,7 @@ newInfo.setOffline(true); BatchUpdate b = new BatchUpdate(newRegion.getRegionName()); b.put(COL_REGIONINFO, Writables.getBytes(newInfo)); - root.batchUpdate(b); + root.batchUpdate(b,null); if(LOG.isDebugEnabled()) { LOG.debug("updated columns in row: " + newRegion.getRegionName()); } Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 684125) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.RegionHistorian; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.UnknownRowLockException; import org.apache.hadoop.hbase.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.BatchOperation; @@ -1048,7 +1049,7 @@ /** {@inheritDoc} */ public RowResult getRow(final byte [] regionName, final byte [] row, - final byte [][] columns, final long ts) + final byte [][] columns, final long ts, final long lockId) throws IOException { checkOpen(); requestCount.incrementAndGet(); @@ -1061,7 +1062,8 @@ } HRegion region = getRegion(regionName); - Map map = region.getFull(row, columnSet, ts); + Map map = region.getFull(row, columnSet, ts, + getLockFromId(lockId)); HbaseMapWritable result = new HbaseMapWritable(); result.putAll(map); @@ -1126,7 +1128,7 @@ } /** {@inheritDoc} */ - public void batchUpdate(final byte [] regionName, BatchUpdate b) + public void batchUpdate(final byte [] regionName, BatchUpdate b, long lockId) throws IOException { checkOpen(); this.requestCount.incrementAndGet(); @@ -1134,7 +1136,7 @@ validateValuesLength(b, region); try { cacheFlusher.reclaimMemcacheMemory(); - region.batchUpdate(b); + region.batchUpdate(b, getLockFromId(lockId)); } catch (OutOfMemoryError error) { abort(); LOG.fatal("Ran out of memory", error); @@ -1275,28 +1277,158 @@ /** {@inheritDoc} */ public void deleteAll(final byte [] regionName, final byte [] row, - final byte [] column, final long timestamp) + final byte [] column, final long timestamp, final long lockId) throws IOException { HRegion region = getRegion(regionName); - region.deleteAll(row, column, timestamp); + region.deleteAll(row, column, timestamp, getLockFromId(lockId)); } /** {@inheritDoc} */ public void deleteAll(final byte [] regionName, final byte [] row, - final long timestamp) + final long timestamp, final long lockId) throws IOException { HRegion region = getRegion(regionName); - region.deleteAll(row, timestamp); + region.deleteAll(row, timestamp, getLockFromId(lockId)); } /** {@inheritDoc} */ public void deleteFamily(byte [] regionName, byte [] row, byte [] family, - long timestamp) throws IOException{ - getRegion(regionName).deleteFamily(row, family, timestamp); + long timestamp, final long lockId) + throws IOException{ + getRegion(regionName).deleteFamily(row, family, timestamp, + getLockFromId(lockId)); } + /** {@inheritDoc} */ + public long lockRow(byte [] regionName, byte [] row) + throws IOException { + checkOpen(); + NullPointerException npe = null; + if(regionName == null) { + npe = new NullPointerException("regionName is null"); + } else if(row == null) { + npe = new NullPointerException("row to lock is null"); + } + if(npe != null) { + IOException io = new IOException("Invalid arguments to lockRow"); + io.initCause(npe); + throw io; + } + requestCount.incrementAndGet(); + try { + HRegion region = getRegion(regionName); + Integer r = region.obtainRowLock(row); + long lockId = addRowLock(r,region); + LOG.debug("Row lock " + lockId + " explicitly acquired by client"); + return lockId; + } catch (IOException e) { + LOG.error("Error obtaining row lock (fsOk: " + this.fsOk + ")", + RemoteExceptionHandler.checkIOException(e)); + checkFileSystem(); + throw e; + } + } + protected long addRowLock(Integer r, HRegion region) throws LeaseStillHeldException { + long lockId = -1L; + lockId = rand.nextLong(); + String lockName = String.valueOf(lockId); + synchronized(rowlocks) { + rowlocks.put(lockName, r); + } + this.leases. + createLease(lockName, new RowLockListener(lockName, region)); + return lockId; + } + /** + * Method to get the Integer lock identifier used internally + * from the long lock identifier used by the client. + * @param lockId long row lock identifier from client + * @return intId Integer row lock used internally in HRegion + * @throws IOException Thrown if this is not a valid client lock id. + */ + private Integer getLockFromId(long lockId) + throws IOException { + if(lockId == -1L) { + return null; + } + String lockName = String.valueOf(lockId); + Integer rl = null; + synchronized(rowlocks) { + rl = rowlocks.get(lockName); + } + if(rl == null) { + throw new IOException("Invalid row lock"); + } + return rl; + } + + /** {@inheritDoc} */ + public void unlockRow(byte [] regionName, long lockId) + throws IOException { + checkOpen(); + NullPointerException npe = null; + if(regionName == null) { + npe = new NullPointerException("regionName is null"); + } else if(lockId == -1L) { + npe = new NullPointerException("lockId is null"); + } + if(npe != null) { + IOException io = new IOException("Invalid arguments to unlockRow"); + io.initCause(npe); + throw io; + } + requestCount.incrementAndGet(); + try { + HRegion region = getRegion(regionName); + String lockName = String.valueOf(lockId); + Integer r = null; + synchronized(rowlocks) { + r = rowlocks.remove(lockName); + } + if(r == null) { + throw new UnknownRowLockException(lockName); + } + region.releaseRowLock(r); + this.leases.cancelLease(lockName); + LOG.debug("Row lock " + lockId + " has been explicitly released by client"); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + Map rowlocks = + Collections.synchronizedMap(new HashMap()); + + /** + * Instantiated as a row lock lease. + * If the lease times out, the row lock is released + */ + private class RowLockListener implements LeaseListener { + private final String lockName; + private final HRegion region; + + RowLockListener(final String lockName, final HRegion region) { + this.lockName = lockName; + this.region = region; + } + + /** {@inheritDoc} */ + public void leaseExpired() { + LOG.info("Row Lock " + this.lockName + " lease expired"); + Integer r = null; + synchronized(rowlocks) { + r = rowlocks.remove(this.lockName); + } + if(r != null) { + region.releaseRowLock(r); + } + } + } + + /** * @return Info on this server. */ public HServerInfo getServerInfo() { Index: src/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 684125) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1,4 +1,4 @@ -/** + /** * Copyright 2007 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one @@ -1170,7 +1170,7 @@ * @throws IOException */ public Map getFull(final byte [] row, - final Set columns, final long ts) + final Set columns, final long ts, final Integer lockid) throws IOException { // Check columns passed if (columns != null) { @@ -1179,7 +1179,15 @@ } } HStoreKey key = new HStoreKey(row, ts); - Integer lid = obtainRowLock(row); + Integer lid = null; + if(lockid == null) { + lid = obtainRowLock(row); + } else { + if(!isRowLocked(lockid)) { + throw new IOException("Invalid row lock"); + } + lid = lockid; + } HashSet storeSet = new HashSet(); try { TreeMap result = @@ -1215,7 +1223,7 @@ return result; } finally { - releaseRowLock(lid); + if(lockid == null) releaseRowLock(lid); } } @@ -1347,7 +1355,7 @@ * @param b * @throws IOException */ - public void batchUpdate(BatchUpdate b) + public void batchUpdate(BatchUpdate b, Integer lockid) throws IOException { checkReadOnly(); @@ -1363,7 +1371,16 @@ // See HRegionServer#RegionListener for how the expire on HRegionServer // invokes a HRegion#abort. byte [] row = b.getRow(); - Integer lid = obtainRowLock(row); + // If we did not pass an existing row lock, obtain a new one + Integer lid = null; + if(lockid == null) { + lid = obtainRowLock(row); + } else { + if(!isRowLocked(lockid)) { + throw new IOException("Invalid row lock"); + } + lid = lockid; + } long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP) ? System.currentTimeMillis() : b.getTimestamp(); try { @@ -1408,7 +1425,7 @@ this.targetColumns.remove(Long.valueOf(lid)); throw e; } finally { - releaseRowLock(lid); + if(lockid == null) releaseRowLock(lid); } } @@ -1451,17 +1468,27 @@ * @param row * @param column * @param ts Delete all entries that have this timestamp or older + * @param lockid Row lock * @throws IOException */ - public void deleteAll(final byte [] row, final byte [] column, final long ts) + public void deleteAll(final byte [] row, final byte [] column, final long ts, + final Integer lockid) throws IOException { checkColumn(column); checkReadOnly(); - Integer lid = obtainRowLock(row); + Integer lid = null; + if(lockid == null) { + lid = obtainRowLock(row); + } else { + if(!isRowLocked(lockid)) { + throw new IOException("Invalid row lock"); + } + lid = lockid; + } try { deleteMultiple(row, column, ts, ALL_VERSIONS); } finally { - releaseRowLock(lid); + if(lockid == null) releaseRowLock(lid); } } @@ -1469,12 +1496,22 @@ * Delete all cells of the same age as the passed timestamp or older. * @param row * @param ts Delete all entries that have this timestamp or older + * @param lockid Row lock * @throws IOException */ - public void deleteAll(final byte [] row, final long ts) + public void deleteAll(final byte [] row, final long ts, + final Integer lockid) throws IOException { checkReadOnly(); - Integer lid = obtainRowLock(row); + Integer lid = null; + if(lockid == null) { + lid = obtainRowLock(row); + } else { + if(!isRowLocked(lockid)) { + throw new IOException("Invalid row lock"); + } + lid = lockid; + } try { for (HStore store : stores.values()){ List keys = store.getKeys(new HStoreKey(row, ts), @@ -1486,7 +1523,7 @@ update(edits); } } finally { - releaseRowLock(lid); + if(lockid == null) releaseRowLock(lid); } } @@ -1497,12 +1534,22 @@ * @param row The row to operate on * @param family The column family to match * @param timestamp Timestamp to match + * @param lockid Row lock * @throws IOException */ - public void deleteFamily(byte [] row, byte [] family, long timestamp) + public void deleteFamily(byte [] row, byte [] family, long timestamp, + final Integer lockid) throws IOException{ checkReadOnly(); - Integer lid = obtainRowLock(row); + Integer lid = null; + if(lockid == null) { + lid = obtainRowLock(row); + } else { + if(!isRowLocked(lockid)) { + throw new IOException("Invalid row lock"); + } + lid = lockid; + } try { // find the HStore for the column family HStore store = getStore(family); @@ -1515,7 +1562,7 @@ } update(edits); } finally { - releaseRowLock(lid); + if(lockid == null) releaseRowLock(lid); } } @@ -1545,7 +1592,7 @@ update(edits); } } - + /** * @throws IOException Throws exception if region is in read-only mode. */ @@ -1770,6 +1817,21 @@ } } + /** + * See if row is currently locked. + * @param lockid + * @return boolean + */ + boolean isRowLocked(final Integer lockid) { + synchronized (locksToRows) { + if(locksToRows.containsKey(lockid)) { + return true; + } else { + return false; + } + } + } + private void waitOnRowLocks() { synchronized (locksToRows) { while (this.locksToRows.size() > 0) { @@ -2126,7 +2188,8 @@ public static void removeRegionFromMETA(final HRegionInterface srvr, final byte [] metaRegionName, final byte [] regionName) throws IOException { - srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP); + srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP, + (long)-1L); } /** @@ -2147,7 +2210,7 @@ b.delete(COL_STARTCODE); // If carrying splits, they'll be in place when we show up on new // server. - srvr.batchUpdate(metaRegionName, b); + srvr.batchUpdate(metaRegionName, b, (long)-1L); } /** Index: src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java =================================================================== --- src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java (revision 684125) +++ src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java (working copy) @@ -83,7 +83,7 @@ BatchUpdate b = new BatchUpdate(regionInfo.getRegionName()); b.put(COL_SERVER, Bytes.toBytes(serverAddress.toString())); b.put(COL_STARTCODE, startCode); - server.batchUpdate(metaRegionName, b); + server.batchUpdate(metaRegionName, b, -1L); if (!this.historian.isOnline()) { // This is safest place to do the onlining of the historian in // the master. When we get to here, we know there is a .META. Index: src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java =================================================================== --- src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java (revision 684125) +++ src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java (working copy) @@ -52,7 +52,7 @@ throws IOException { BatchUpdate b = new BatchUpdate(i.getRegionName()); b.put(COL_REGIONINFO, Writables.getBytes(i)); - server.batchUpdate(regionName, b); + server.batchUpdate(regionName, b, -1L); LOG.debug("updated HTableDescriptor for region " + i.getRegionNameAsString()); } Index: src/java/org/apache/hadoop/hbase/master/ColumnOperation.java =================================================================== --- src/java/org/apache/hadoop/hbase/master/ColumnOperation.java (revision 684125) +++ src/java/org/apache/hadoop/hbase/master/ColumnOperation.java (working copy) @@ -52,7 +52,7 @@ throws IOException { BatchUpdate b = new BatchUpdate(i.getRegionName()); b.put(COL_REGIONINFO, Writables.getBytes(i)); - server.batchUpdate(regionName, b); + server.batchUpdate(regionName, b, -1L); if (LOG.isDebugEnabled()) { LOG.debug("updated columns in row: " + i.getRegionNameAsString()); } Index: src/java/org/apache/hadoop/hbase/master/RegionManager.java =================================================================== --- src/java/org/apache/hadoop/hbase/master/RegionManager.java (revision 684125) +++ src/java/org/apache/hadoop/hbase/master/RegionManager.java (working copy) @@ -560,7 +560,7 @@ byte [] regionName = region.getRegionName(); BatchUpdate b = new BatchUpdate(regionName); b.put(COL_REGIONINFO, Writables.getBytes(info)); - server.batchUpdate(metaRegionName, b); + server.batchUpdate(metaRegionName, b, -1L); // 4. Close the new region to flush it to disk. Close its log file too. region.close(); Index: src/java/org/apache/hadoop/hbase/master/BaseScanner.java =================================================================== --- src/java/org/apache/hadoop/hbase/master/BaseScanner.java (revision 684125) +++ src/java/org/apache/hadoop/hbase/master/BaseScanner.java (working copy) @@ -332,7 +332,7 @@ BatchUpdate b = new BatchUpdate(parent); b.delete(splitColumn); - srvr.batchUpdate(metaRegionName, b); + srvr.batchUpdate(metaRegionName, b, -1L); return result; } Index: src/java/org/apache/hadoop/hbase/master/ChangeTableState.java =================================================================== --- src/java/org/apache/hadoop/hbase/master/ChangeTableState.java (revision 684125) +++ src/java/org/apache/hadoop/hbase/master/ChangeTableState.java (working copy) @@ -91,7 +91,7 @@ updateRegionInfo(b, i); b.delete(COL_SERVER); b.delete(COL_STARTCODE); - server.batchUpdate(m.getRegionName(), b); + server.batchUpdate(m.getRegionName(), b, -1L); if (LOG.isDebugEnabled()) { LOG.debug("updated columns in row: " + i.getRegionNameAsString()); } Index: src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java =================================================================== --- src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (revision 684125) +++ src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (working copy) @@ -111,11 +111,12 @@ * * @param regionName region name * @param row row key + * @param lockId lock id * @return map of values * @throws IOException */ public RowResult getRow(final byte [] regionName, final byte [] row, - final byte[][] columns, final long ts) + final byte[][] columns, final long ts, final long lockId) throws IOException; /** @@ -123,9 +124,11 @@ * * @param regionName name of the region to update * @param b BatchUpdate + * @param lockId lock id * @throws IOException */ - public void batchUpdate(final byte [] regionName, final BatchUpdate b) + public void batchUpdate(final byte [] regionName, final BatchUpdate b, + final long lockId) throws IOException; /** @@ -136,10 +139,11 @@ * @param row row key * @param column column key * @param timestamp Delete all entries that have this timestamp or older + * @param lockId lock id * @throws IOException */ public void deleteAll(byte [] regionName, byte [] row, byte [] column, - long timestamp) + long timestamp, long lockId) throws IOException; /** @@ -149,9 +153,11 @@ * @param regionName region name * @param row row key * @param timestamp Delete all entries that have this timestamp or older + * @param lockId lock id * @throws IOException */ - public void deleteAll(byte [] regionName, byte [] row, long timestamp) + public void deleteAll(byte [] regionName, byte [] row, long timestamp, + long lockId) throws IOException; /** @@ -162,9 +168,10 @@ * @param row The row to operate on * @param family The column family to match * @param timestamp Timestamp to match + * @param lockId lock id */ public void deleteFamily(byte [] regionName, byte [] row, byte [] family, - long timestamp) + long timestamp, long lockId) throws IOException; @@ -207,4 +214,24 @@ * @throws IOException */ public void close(long scannerId) throws IOException; + + /** + * Opens a remote row lock. + * + * @param regionName name of region + * @param row row to lock + * @return lockId lock identifier + * @throws IOException + */ + public long lockRow(final byte [] regionName, final byte [] row) + throws IOException; + + /** + * Releases a remote row lock. + * + * @param lockId the lock id returned by lockRow + * @throws IOException + */ + public void unlockRow(final byte [] regionName, final long lockId) + throws IOException; } \ No newline at end of file Index: src/java/org/apache/hadoop/hbase/util/Merge.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/Merge.java (revision 684125) +++ src/java/org/apache/hadoop/hbase/util/Merge.java (working copy) @@ -308,7 +308,7 @@ if (LOG.isDebugEnabled()) { LOG.debug("Removing region: " + regioninfo + " from " + meta); } - meta.deleteAll(regioninfo.getRegionName(), System.currentTimeMillis()); + meta.deleteAll(regioninfo.getRegionName(), System.currentTimeMillis(), null); } /* Index: src/java/org/apache/hadoop/hbase/util/MetaUtils.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/MetaUtils.java (revision 684125) +++ src/java/org/apache/hadoop/hbase/util/MetaUtils.java (working copy) @@ -407,7 +407,7 @@ } BatchUpdate b = new BatchUpdate(hri.getRegionName()); b.put(HConstants.COL_REGIONINFO, Writables.getBytes(hri)); - r.batchUpdate(b); + r.batchUpdate(b, null); if (LOG.isDebugEnabled()) { HRegionInfo h = Writables.getHRegionInfoOrNull( r.get(hri.getRegionName(), HConstants.COL_REGIONINFO).getValue()); Index: src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java (revision 684125) +++ src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java (working copy) @@ -2050,7 +2050,8 @@ public static void removeRegionFromMETA(final HRegionInterface srvr, final byte [] metaRegionName, final byte [] regionName) throws IOException { - srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP); + srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP, + -1L); } /** @@ -2071,7 +2072,7 @@ b.delete(COL_STARTCODE); // If carrying splits, they'll be in place when we show up on new // server. - srvr.batchUpdate(metaRegionName, b); + srvr.batchUpdate(metaRegionName, b, -1L); } /** Index: src/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/HTable.java (revision 684125) +++ src/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -640,11 +640,32 @@ public RowResult getRow(final byte [] row, final byte [][] columns, final long ts) throws IOException { + return getRow(row,columns,ts,null); + } + + /** + * Get selected columns for the specified row at a specified timestamp + * using existing row lock. + * + * @param row row key + * @param columns Array of column names and families you want to retrieve. + * @param ts timestamp + * @param rl row lock + * @return RowResult is empty if row does not exist. + * @throws IOException + */ + public RowResult getRow(final byte [] row, final byte [][] columns, + final long ts, final RowLock rl) + throws IOException { return connection.getRegionServerWithRetries( new ServerCallable(connection, tableName, row) { public RowResult call() throws IOException { + long lockId = -1L; + if(rl != null) { + lockId = rl.getLockId(); + } return server.getRow(location.getRegionInfo().getRegionName(), row, - columns, ts); + columns, ts, lockId); } } ); @@ -1104,15 +1125,35 @@ */ public void deleteAll(final byte [] row, final byte [] column, final long ts) throws IOException { + deleteAll(row,column,ts,null); + } + + /** + * Delete all cells that match the passed row and column and whose + * timestamp is equal-to or older than the passed timestamp, using an + * existing row lock. + * @param row Row to update + * @param column name of column whose value is to be deleted + * @param ts Delete all cells of the same timestamp or older. + * @param rl Existing row lock + * @throws IOException + */ + public void deleteAll(final byte [] row, final byte [] column, final long ts, + final RowLock rl) + throws IOException { connection.getRegionServerWithRetries( new ServerCallable(connection, tableName, row) { public Boolean call() throws IOException { + long lockId = -1L; + if(rl != null) { + lockId = rl.getLockId(); + } if (column != null) { this.server.deleteAll(location.getRegionInfo().getRegionName(), - row, column, ts); + row, column, ts, lockId); } else { this.server.deleteAll(location.getRegionInfo().getRegionName(), - row, ts); + row, ts, lockId); } return null; } @@ -1161,11 +1202,31 @@ public void deleteFamily(final byte [] row, final byte [] family, final long timestamp) throws IOException { + deleteFamily(row,family,timestamp,null); + } + + /** + * Delete all cells for a row with matching column family with timestamps + * less than or equal to timestamp, using existing row lock. + * + * @param row The row to operate on + * @param family The column family to match + * @param timestamp Timestamp to match + * @param rl Existing row lock + * @throws IOException + */ + public void deleteFamily(final byte [] row, final byte [] family, + final long timestamp, final RowLock rl) + throws IOException { connection.getRegionServerWithRetries( new ServerCallable(connection, tableName, row) { public Boolean call() throws IOException { + long lockId = -1L; + if(rl != null) { + lockId = rl.getLockId(); + } server.deleteFamily(location.getRegionInfo().getRegionName(), row, - family, timestamp); + family, timestamp, lockId); return null; } } @@ -1179,11 +1240,27 @@ */ public synchronized void commit(final BatchUpdate batchUpdate) throws IOException { + commit(batchUpdate,null); + } + + /** + * Commit a BatchUpdate to the table using existing row lock. + * @param batchUpdate + * @param rl Existing row lock + * @throws IOException + */ + public synchronized void commit(final BatchUpdate batchUpdate, + final RowLock rl) + throws IOException { connection.getRegionServerWithRetries( new ServerCallable(connection, tableName, batchUpdate.getRow()) { public Boolean call() throws IOException { + long lockId = -1L; + if(rl != null) { + lockId = rl.getLockId(); + } server.batchUpdate(location.getRegionInfo().getRegionName(), - batchUpdate); + batchUpdate, lockId); return null; } } @@ -1198,10 +1275,48 @@ public synchronized void commit(final List batchUpdates) throws IOException { for (BatchUpdate batchUpdate : batchUpdates) - commit(batchUpdate); + commit(batchUpdate,null); } /** + * Obtain a row lock + * @param row The row to lock + * @return rowLock RowLock containing row and lock id + * @throws IOException + */ + public RowLock lockRow(final byte [] row) + throws IOException { + return connection.getRegionServerWithRetries( + new ServerCallable(connection, tableName, row) { + public RowLock call() throws IOException { + long lockId = + server.lockRow(location.getRegionInfo().getRegionName(), row); + RowLock rowLock = new RowLock(row,lockId); + return rowLock; + } + } + ); + } + + /** + * Release a row lock + * @param rl The row lock to release + * @throws IOException + */ + public void unlockRow(final RowLock rl) + throws IOException { + connection.getRegionServerWithRetries( + new ServerCallable(connection, tableName, rl.getRow()) { + public Boolean call() throws IOException { + server.unlockRow(location.getRegionInfo().getRegionName(), + rl.getLockId()); + return null; + } + } + ); + } + + /** * Implements the scanner interface for the HBase client. * If there are multiple regions in a table, this scanner will iterate * through them all.