diff -uNbrx .svn -x '*~' -x '*.orig' -x '*.rej' -x DEADJOE -x '*.old' -x '*.class' -x '*.xml' src/java/org/apache/hadoop/hbase/client/HTable.java jonsrc/java/org/apache/hadoop/hbase/client/HTable.java --- src/java/org/apache/hadoop/hbase/client/HTable.java 2008-08-07 17:12:07.000000000 -0700 +++ jonsrc/java/org/apache/hadoop/hbase/client/HTable.java 2008-08-07 15:44:12.000000000 -0700 @@ -978,7 +978,7 @@ * @throws IOException */ public void deleteAll(final byte [] row) throws IOException { - deleteAll(row, null); + deleteAll(row, (byte[])null); } /** @@ -1111,15 +1111,75 @@ */ public void deleteAll(final byte [] row, final byte [] column, final long ts) throws IOException { + deleteAll(row,column,ts,(RowLock)null); + } + + /** + * Completely delete the row's cells using existing row lock. + * + * @param row Key of the row you want to completely delete. + * @param rl Previously acquired row lock. + * @throws IOException + */ + public void deleteAll(final byte [] row, final RowLock rl) + throws IOException { + deleteAll(row, null, rl); + } + + /** + * Delete all cells that match the passed row and column, using an existing row lock. + * + * @param row Key of the row you want to completely delete. + * @param column column to be deleted + * @param rl Previously acquired row lock. + * @throws IOException + */ + public void deleteAll(final byte [] row, final byte [] column, + final RowLock rl) + throws IOException { + deleteAll(row, column, HConstants.LATEST_TIMESTAMP, rl); + } + + /** + * Completely delete the row's cells of the same timestamp or older, using an existing row lock. + * + * @param row Key of the row you want to completely delete. + * @param ts Delete all cells of the same timestamp or older. + * @param rl Previously acquired row lock. + * @throws IOException + */ + public void deleteAll(final byte [] row, final long ts, + final RowLock rl) + throws IOException { + deleteAll(row, null, ts, rl); + } + + /** + * Delete all cells that match the passed row and column and whose + * timestamp is equal-to or older than the passed timestamp, using + * existing row lock. + * @param row Row to update + * @param column name of column whose value is to be deleted + * @param ts Delete all cells of the same timestamp or older. + * @param rl Previously acquired row lock + * @throws IOException + */ + public void deleteAll(final byte [] row, final byte [] column, final long ts, + final RowLock rl) + throws IOException { connection.getRegionServerWithRetries( new ServerCallable(connection, tableName, row) { public Boolean call() throws IOException { + long lockId = -1L; + if(rl != null) { + lockId = rl.getLockId(); + } if (column != null) { this.server.deleteAll(location.getRegionInfo().getRegionName(), - row, column, ts); + row, column, ts, lockId); } else { this.server.deleteAll(location.getRegionInfo().getRegionName(), - row, ts); + row, ts, lockId); } return null; } @@ -1168,11 +1228,31 @@ public void deleteFamily(final byte [] row, final byte [] family, final long timestamp) throws IOException { + deleteFamily(row,family,timestamp,(RowLock)null); + } + + /** + * Delete all cells for a row with matching column family with timestamps + * less than or equal to timestamp using existing row lock. + * + * @param row The row to operate on + * @param family The column family to match + * @param timestamp Timestamp to match + * @param rl Previously acquired row lock. 
+ * @throws IOException + */ + public void deleteFamily(final byte [] row, final byte [] family, + final long timestamp, final RowLock rl) + throws IOException { connection.getRegionServerWithRetries( new ServerCallable(connection, tableName, row) { public Boolean call() throws IOException { + long lockId = -1L; + if(rl != null) { + lockId = rl.getLockId(); + } server.deleteFamily(location.getRegionInfo().getRegionName(), row, - family, timestamp); + family, timestamp, lockId); return null; } } @@ -1186,17 +1266,34 @@ */ public synchronized void commit(final BatchUpdate batchUpdate) throws IOException { + commit(batchUpdate,(RowLock)null); + } + + /** + * Commit a BatchUpdate to the table using existing row lock. + * @param batchUpdate + * @param rl Previously acquired row lock. + * @throws IOException + */ + public synchronized void commit(final BatchUpdate batchUpdate, final RowLock rl) + throws IOException { connection.getRegionServerWithRetries( new ServerCallable(connection, tableName, batchUpdate.getRow()) { public Boolean call() throws IOException { + long lockId = -1L; + if(rl != null) { + lockId = rl.getLockId(); + } server.batchUpdate(location.getRegionInfo().getRegionName(), - batchUpdate); + batchUpdate, lockId); return null; } } ); } + + /** * Commit a RowsBatchUpdate to the table. * @param batchUpdates @@ -1208,6 +1305,51 @@ commit(batchUpdate); } + + // + // New Row Locking Functions + // + + /** + * Obtain a row lock + * @param row The row to lock + * @return rowLock RowLock containing row and lock id + * @throws IOException + */ + public RowLock lockRow(final byte [] row) + throws IOException { + return connection.getRegionServerWithRetries( + new ServerCallable(connection, tableName, row) { + public RowLock call() throws IOException { + long lockId = + server.lockRow(location.getRegionInfo().getRegionName(), row); + RowLock rowLock = new RowLock(row,lockId); + return rowLock; + } + } + ); + } + + /** + * Release a row lock + * @param rl The row lock to release + * @throws IOException + */ + public void unlockRow(final RowLock rl) + throws IOException { + connection.getRegionServerWithRetries( + new ServerCallable(connection, tableName, rl.getRow()) { + public Boolean call() throws IOException { + server.unlockRow(location.getRegionInfo().getRegionName(), + rl.getLockId()); + return null; + } + } + ); + } + + + /** * Implements the scanner interface for the HBase client. 
* If there are multiple regions in a table, this scanner will iterate diff -uNbrx .svn -x '*~' -x '*.orig' -x '*.rej' -x DEADJOE -x '*.old' -x '*.class' -x '*.xml' src/java/org/apache/hadoop/hbase/HMerge.java jonsrc/java/org/apache/hadoop/hbase/HMerge.java --- src/java/org/apache/hadoop/hbase/HMerge.java 2008-08-07 17:12:07.000000000 -0700 +++ jonsrc/java/org/apache/hadoop/hbase/HMerge.java 2008-08-07 08:48:35.000000000 -0700 @@ -373,7 +373,7 @@ b.delete(COL_STARTCODE); b.delete(COL_SPLITA); b.delete(COL_SPLITB); - root.batchUpdate(b); + root.batchUpdate(b,null); if(LOG.isDebugEnabled()) { LOG.debug("updated columns in row: " + regionsToDelete[r]); @@ -383,7 +383,7 @@ newInfo.setOffline(true); BatchUpdate b = new BatchUpdate(newRegion.getRegionName()); b.put(COL_REGIONINFO, Writables.getBytes(newInfo)); - root.batchUpdate(b); + root.batchUpdate(b,null); if(LOG.isDebugEnabled()) { LOG.debug("updated columns in row: " + newRegion.getRegionName()); } diff -uNbrx .svn -x '*~' -x '*.orig' -x '*.rej' -x DEADJOE -x '*.old' -x '*.class' -x '*.xml' src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java jonsrc/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java --- src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java 2008-08-07 17:12:05.000000000 -0700 +++ jonsrc/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java 2008-08-07 08:44:58.000000000 -0700 @@ -147,9 +147,11 @@ * * @param regionName name of the region to update * @param b BatchUpdate + * @param lockId Existing row lock id (if available) * @throws IOException */ - public void batchUpdate(final byte [] regionName, final BatchUpdate b) + public void batchUpdate(final byte [] regionName, final BatchUpdate b, + final long lockId) throws IOException; /** @@ -160,10 +162,11 @@ * @param regionName region name * @param row row key * @param column column key * @param timestamp Delete all entries that have this timestamp or older + * @param lockId Existing row lock id (if available) * @throws IOException */ public void deleteAll(byte [] regionName, byte [] row, byte [] column, - long timestamp) + long timestamp, long lockId) throws IOException; /** @@ -173,9 +176,11 @@ * @param regionName region name * @param row row key * @param timestamp Delete all entries that have this timestamp or older + * @param lockId Existing row lock id (if available) * @throws IOException */ - public void deleteAll(byte [] regionName, byte [] row, long timestamp) + public void deleteAll(byte [] regionName, byte [] row, long timestamp, + long lockId) throws IOException; /** @@ -186,9 +191,11 @@ * @param regionName region name * @param row The row to operate on * @param family The column family to match * @param timestamp Timestamp to match + * @param lockId Existing row lock id (if available) + * @throws IOException */ public void deleteFamily(byte [] regionName, byte [] row, byte [] family, - long timestamp) + long timestamp, long lockId) throws IOException; @@ -231,4 +238,29 @@ * @throws IOException */ public void close(long scannerId) throws IOException; + + // + // remote row lock interface + // + + /** + * Opens a remote row lock. + * + * @param regionName name of region + * @param row row to lock + * + * @return lockId lock identifier used in other calls + * @throws IOException + */ + public long lockRow(final byte [] regionName, final byte [] row) + throws IOException; + + /** + * Releases a remote row lock. 
+ * + * @param lockId the lock id returned by lockRow + * @throws IOException + */ + public void unlockRow(final byte [] regionName, final long lockId) + throws IOException; } \ No newline at end of file diff -uNbrx .svn -x '*~' -x '*.orig' -x '*.rej' -x DEADJOE -x '*.old' -x '*.class' -x '*.xml' src/java/org/apache/hadoop/hbase/master/BaseScanner.java jonsrc/java/org/apache/hadoop/hbase/master/BaseScanner.java --- src/java/org/apache/hadoop/hbase/master/BaseScanner.java 2008-08-07 17:12:05.000000000 -0700 +++ jonsrc/java/org/apache/hadoop/hbase/master/BaseScanner.java 2008-08-07 08:52:00.000000000 -0700 @@ -332,7 +332,7 @@ BatchUpdate b = new BatchUpdate(parent); b.delete(splitColumn); - srvr.batchUpdate(metaRegionName, b); + srvr.batchUpdate(metaRegionName, b, -1L); return result; } diff -uNbrx .svn -x '*~' -x '*.orig' -x '*.rej' -x DEADJOE -x '*.old' -x '*.class' -x '*.xml' src/java/org/apache/hadoop/hbase/master/ChangeTableState.java jonsrc/java/org/apache/hadoop/hbase/master/ChangeTableState.java --- src/java/org/apache/hadoop/hbase/master/ChangeTableState.java 2008-08-07 17:12:05.000000000 -0700 +++ jonsrc/java/org/apache/hadoop/hbase/master/ChangeTableState.java 2008-08-07 08:52:17.000000000 -0700 @@ -91,7 +91,7 @@ updateRegionInfo(b, i); b.delete(COL_SERVER); b.delete(COL_STARTCODE); - server.batchUpdate(m.getRegionName(), b); + server.batchUpdate(m.getRegionName(), b, -1L); if (LOG.isDebugEnabled()) { LOG.debug("updated columns in row: " + i.getRegionNameAsString()); } diff -uNbrx .svn -x '*~' -x '*.orig' -x '*.rej' -x DEADJOE -x '*.old' -x '*.class' -x '*.xml' src/java/org/apache/hadoop/hbase/master/ColumnOperation.java jonsrc/java/org/apache/hadoop/hbase/master/ColumnOperation.java --- src/java/org/apache/hadoop/hbase/master/ColumnOperation.java 2008-08-07 17:12:05.000000000 -0700 +++ jonsrc/java/org/apache/hadoop/hbase/master/ColumnOperation.java 2008-08-07 08:52:07.000000000 -0700 @@ -52,7 +52,7 @@ throws IOException { BatchUpdate b = new BatchUpdate(i.getRegionName()); b.put(COL_REGIONINFO, Writables.getBytes(i)); - server.batchUpdate(regionName, b); + server.batchUpdate(regionName, b, -1L); if (LOG.isDebugEnabled()) { LOG.debug("updated columns in row: " + i.getRegionNameAsString()); } diff -uNbrx .svn -x '*~' -x '*.orig' -x '*.rej' -x DEADJOE -x '*.old' -x '*.class' -x '*.xml' src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java jonsrc/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java --- src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java 2008-08-07 17:12:05.000000000 -0700 +++ jonsrc/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java 2008-08-07 08:52:26.000000000 -0700 @@ -52,7 +52,7 @@ throws IOException { BatchUpdate b = new BatchUpdate(i.getRegionName()); b.put(COL_REGIONINFO, Writables.getBytes(i)); - server.batchUpdate(regionName, b); + server.batchUpdate(regionName, b, -1L); LOG.debug("updated HTableDescriptor for region " + i.getRegionNameAsString()); } diff -uNbrx .svn -x '*~' -x '*.orig' -x '*.rej' -x DEADJOE -x '*.old' -x '*.class' -x '*.xml' src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java jonsrc/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java --- src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java 2008-08-07 17:12:05.000000000 -0700 +++ jonsrc/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java 2008-08-07 08:52:36.000000000 -0700 @@ -83,7 +83,7 @@ BatchUpdate b = new BatchUpdate(regionInfo.getRegionName()); b.put(COL_SERVER, Bytes.toBytes(serverAddress.toString())); b.put(COL_STARTCODE, 
startCode); - server.batchUpdate(metaRegionName, b); + server.batchUpdate(metaRegionName, b, -1L); if (!this.historian.isOnline()) { // This is safest place to do the onlining of the historian in // the master. When we get to here, we know there is a .META. diff -uNbrx .svn -x '*~' -x '*.orig' -x '*.rej' -x DEADJOE -x '*.old' -x '*.class' -x '*.xml' src/java/org/apache/hadoop/hbase/master/RegionManager.java jonsrc/java/org/apache/hadoop/hbase/master/RegionManager.java --- src/java/org/apache/hadoop/hbase/master/RegionManager.java 2008-08-07 17:12:05.000000000 -0700 +++ jonsrc/java/org/apache/hadoop/hbase/master/RegionManager.java 2008-08-07 08:51:47.000000000 -0700 @@ -560,7 +560,7 @@ byte [] regionName = region.getRegionName(); BatchUpdate b = new BatchUpdate(regionName); b.put(COL_REGIONINFO, Writables.getBytes(info)); - server.batchUpdate(metaRegionName, b); + server.batchUpdate(metaRegionName, b, -1L); // 4. Close the new region to flush it to disk. Close its log file too. region.close(); diff -uNbrx .svn -x '*~' -x '*.orig' -x '*.rej' -x DEADJOE -x '*.old' -x '*.class' -x '*.xml' src/java/org/apache/hadoop/hbase/regionserver/HRegion.java jonsrc/java/org/apache/hadoop/hbase/regionserver/HRegion.java --- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java 2008-08-07 17:12:03.000000000 -0700 +++ jonsrc/java/org/apache/hadoop/hbase/regionserver/HRegion.java 2008-08-07 15:46:09.000000000 -0700 @@ -1,4 +1,4 @@ -/** + /** * Copyright 2007 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one @@ -1347,7 +1347,7 @@ * @param b * @throws IOException */ - public void batchUpdate(BatchUpdate b) + public void batchUpdate(BatchUpdate b, Integer lockid) throws IOException { checkReadOnly(); @@ -1363,7 +1363,17 @@ // See HRegionServer#RegionListener for how the expire on HRegionServer // invokes a HRegion#abort. byte [] row = b.getRow(); - Integer lid = obtainRowLock(row); + // If we did not pass an existing row lock, obtain a new one + Integer lid = null; + if(lockid == null) { + lid = obtainRowLock(row); + } else { + if(!isRowLocked(lockid)) { + throw new IOException("Invalid row lock"); + } + LOG.debug("batchUpdate is successfully using an existing row lock!"); + lid = lockid; + } long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP) ? System.currentTimeMillis() : b.getTimestamp(); try { @@ -1408,7 +1418,7 @@ this.targetColumns.remove(Long.valueOf(lid)); throw e; } finally { - releaseRowLock(lid); + if(lockid == null) releaseRowLock(lid); } } @@ -1451,17 +1461,27 @@ * @param row * @param column * @param ts Delete all entries that have this timestamp or older + * @param lockid Row lock * @throws IOException */ - public void deleteAll(final byte [] row, final byte [] column, final long ts) + public void deleteAll(final byte [] row, final byte [] column, final long ts, + final Integer lockid) throws IOException { checkColumn(column); checkReadOnly(); - Integer lid = obtainRowLock(row); + Integer lid = null; + if(lockid == null) { + lid = obtainRowLock(row); + } else { + if(!isRowLocked(lockid)) { + throw new IOException("Invalid row lock"); + } + lid = lockid; + } try { deleteMultiple(row, column, ts, ALL_VERSIONS); } finally { - releaseRowLock(lid); + if(lockid == null) releaseRowLock(lid); } } @@ -1469,12 +1489,22 @@ * Delete all cells of the same age as the passed timestamp or older. 
* @param row * @param ts Delete all entries that have this timestamp or older + * @param lockid Row lock * @throws IOException */ - public void deleteAll(final byte [] row, final long ts) + public void deleteAll(final byte [] row, final long ts, + final Integer lockid) throws IOException { checkReadOnly(); - Integer lid = obtainRowLock(row); + Integer lid = null; + if(lockid == null) { + lid = obtainRowLock(row); + } else { + if(!isRowLocked(lockid)) { + throw new IOException("Invalid row lock"); + } + lid = lockid; + } try { for (HStore store : stores.values()){ List keys = store.getKeys(new HStoreKey(row, ts), @@ -1486,7 +1516,7 @@ update(edits); } } finally { - releaseRowLock(lid); + if(lockid == null) releaseRowLock(lid); } } @@ -1497,12 +1527,22 @@ * @param row The row to operate on * @param family The column family to match * @param timestamp Timestamp to match + * @param lockid Row lock * @throws IOException */ - public void deleteFamily(byte [] row, byte [] family, long timestamp) + public void deleteFamily(byte [] row, byte [] family, long timestamp, + final Integer lockid) throws IOException{ checkReadOnly(); - Integer lid = obtainRowLock(row); + Integer lid = null; + if(lockid == null) { + lid = obtainRowLock(row); + } else { + if(!isRowLocked(lockid)) { + throw new IOException("Invalid row lock"); + } + lid = lockid; + } try { // find the HStore for the column family HStore store = getStore(family); @@ -1515,7 +1555,7 @@ } update(edits); } finally { - releaseRowLock(lid); + if(lockid == null) releaseRowLock(lid); } } @@ -1770,6 +1810,21 @@ } } + /** + * See if row is currently locked. + * @param lockid + * @return boolean + */ + boolean isRowLocked(final Integer lockid) { + synchronized (locksToRows) { + if(locksToRows.containsKey(lockid)) { + return true; + } else { + return false; + } + } + } + private void waitOnRowLocks() { synchronized (locksToRows) { while (this.locksToRows.size() > 0) { @@ -2126,7 +2181,8 @@ public static void removeRegionFromMETA(final HRegionInterface srvr, final byte [] metaRegionName, final byte [] regionName) throws IOException { - srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP); + srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP, + (long)-1L); } /** @@ -2147,7 +2203,7 @@ b.delete(COL_STARTCODE); // If carrying splits, they'll be in place when we show up on new // server. 
- srvr.batchUpdate(metaRegionName, b); + srvr.batchUpdate(metaRegionName, b, (long)-1L); } /** diff -uNbrx .svn -x '*~' -x '*.orig' -x '*.rej' -x DEADJOE -x '*.old' -x '*.class' -x '*.xml' src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java jonsrc/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java --- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java 2008-08-07 17:12:03.000000000 -0700 +++ jonsrc/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java 2008-08-07 15:45:33.000000000 -0700 @@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.RegionHistorian; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.UnknownRowLockException; import org.apache.hadoop.hbase.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.BatchOperation; @@ -1140,7 +1141,8 @@ } /** {@inheritDoc} */ - public void batchUpdate(final byte [] regionName, BatchUpdate b) + public void batchUpdate(final byte [] regionName, BatchUpdate b, + final long lockId) throws IOException { checkOpen(); this.requestCount.incrementAndGet(); @@ -1148,7 +1150,7 @@ validateValuesLength(b, region); try { cacheFlusher.reclaimMemcacheMemory(); - region.batchUpdate(b); + region.batchUpdate(b, getLockFromId(lockId)); } catch (OutOfMemoryError error) { abort(); LOG.fatal("Ran out of memory", error); @@ -1289,26 +1291,161 @@ /** {@inheritDoc} */ public void deleteAll(final byte [] regionName, final byte [] row, - final byte [] column, final long timestamp) + final byte [] column, final long timestamp, final long lockId) throws IOException { HRegion region = getRegion(regionName); - region.deleteAll(row, column, timestamp); + region.deleteAll(row, column, timestamp, getLockFromId(lockId)); } /** {@inheritDoc} */ public void deleteAll(final byte [] regionName, final byte [] row, - final long timestamp) + final long timestamp, final long lockId) throws IOException { HRegion region = getRegion(regionName); - region.deleteAll(row, timestamp); + region.deleteAll(row, timestamp, getLockFromId(lockId)); } /** {@inheritDoc} */ public void deleteFamily(byte [] regionName, byte [] row, byte [] family, - long timestamp) throws IOException{ - getRegion(regionName).deleteFamily(row, family, timestamp); + long timestamp, final long lockId) throws IOException{ + getRegion(regionName).deleteFamily(row, family, timestamp, + getLockFromId(lockId)); } + // + // remote row lock interface + // + + /** {@inheritDoc} */ + public long lockRow(byte [] regionName, byte [] row) + throws IOException { + checkOpen(); + NullPointerException npe = null; + if(regionName == null) { + npe = new NullPointerException("regionName is null"); + } else if(row == null) { + npe = new NullPointerException("row to lock is null"); + } + if(npe != null) { + IOException io = new IOException("Invalid arguments to lockRow"); + io.initCause(npe); + throw io; + } + requestCount.incrementAndGet(); + try { + HRegion region = getRegion(regionName); + Integer r = region.obtainRowLock(row); + long lockId = addRowLock(r,region); + LOG.debug("Row lock " + lockId + " explicitly acquired by client"); + return lockId; + } catch (IOException e) { + LOG.error("Error obtaining row lock (fsOk: " + this.fsOk + ")", + RemoteExceptionHandler.checkIOException(e)); + checkFileSystem(); + throw e; + } + } + + protected long addRowLock(Integer r, HRegion region) throws LeaseStillHeldException { + long lockId = -1L; + 
lockId = rand.nextLong(); + String lockName = String.valueOf(lockId); + synchronized(rowlocks) { + rowlocks.put(lockName, r); + } + this.leases. + createLease(lockName, new RowLockListener(lockName, region)); + return lockId; + } + + /** + * Method to get the Integer lock identifier used internally + * from the long lock identifier used by the client. + * @param lockId long row lock identifier from client + * @return intId Integer row lock used internally in HRegion + * @throws IOException Thrown if this is not a valid client lock id. + */ + private Integer getLockFromId(long lockId) + throws IOException { + LOG.debug("getLockFromId(" + lockId + ")"); + if(lockId == -1L) { + return null; + } + String lockName = String.valueOf(lockId); + Integer rl = null; + synchronized(rowlocks) { + rl = rowlocks.get(lockName); + } + if(rl == null) { + throw new IOException("Invalid row lock"); + } + LOG.debug("getLockFromId -> Valid lock! " + rl); + return rl; + } + + /** {@inheritDoc} */ + public void unlockRow(byte [] regionName, long lockId) + throws IOException { + checkOpen(); + NullPointerException npe = null; + if(regionName == null) { + npe = new NullPointerException("regionName is null"); + } else if(lockId == -1L) { + npe = new NullPointerException("lockId is null"); + } + if(npe != null) { + IOException io = new IOException("Invalid arguments to unlockRow"); + io.initCause(npe); + throw io; + } + requestCount.incrementAndGet(); + try { + HRegion region = getRegion(regionName); + String lockName = String.valueOf(lockId); + Integer r = null; + synchronized(rowlocks) { + r = rowlocks.remove(lockName); + } + if(r == null) { + throw new UnknownRowLockException(lockName); + } + region.releaseRowLock(r); + this.leases.cancelLease(lockName); + LOG.debug("Row lock " + lockId + " has been explicitly released by client"); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + Map<String, Integer> rowlocks = + Collections.synchronizedMap(new HashMap<String, Integer>()); + + /** + * Instantiated as a row lock lease. + * If the lease times out, the row lock is released + */ + private class RowLockListener implements LeaseListener { + private final String lockName; + private final HRegion region; + + RowLockListener(final String lockName, final HRegion region) { + this.lockName = lockName; + this.region = region; + } + + /** {@inheritDoc} */ + public void leaseExpired() { + LOG.info("Row Lock " + this.lockName + " lease expired"); + Integer r = null; + synchronized(rowlocks) { + r = rowlocks.remove(this.lockName); + } + if(r != null) { + region.releaseRowLock(r); + } + } + } /** * @return Info on this server. 
diff -uNbrx .svn -x '*~' -x '*.orig' -x '*.rej' -x DEADJOE -x '*.old' -x '*.class' -x '*.xml' src/java/org/apache/hadoop/hbase/util/Merge.java jonsrc/java/org/apache/hadoop/hbase/util/Merge.java --- src/java/org/apache/hadoop/hbase/util/Merge.java 2008-08-07 17:12:06.000000000 -0700 +++ jonsrc/java/org/apache/hadoop/hbase/util/Merge.java 2008-08-07 08:50:38.000000000 -0700 @@ -308,7 +308,7 @@ if (LOG.isDebugEnabled()) { LOG.debug("Removing region: " + regioninfo + " from " + meta); } - meta.deleteAll(regioninfo.getRegionName(), System.currentTimeMillis()); + meta.deleteAll(regioninfo.getRegionName(), System.currentTimeMillis(), null); } /* diff -uNbrx .svn -x '*~' -x '*.orig' -x '*.rej' -x DEADJOE -x '*.old' -x '*.class' -x '*.xml' src/java/org/apache/hadoop/hbase/util/MetaUtils.java jonsrc/java/org/apache/hadoop/hbase/util/MetaUtils.java --- src/java/org/apache/hadoop/hbase/util/MetaUtils.java 2008-08-07 17:12:06.000000000 -0700 +++ jonsrc/java/org/apache/hadoop/hbase/util/MetaUtils.java 2008-08-07 08:51:03.000000000 -0700 @@ -407,7 +407,7 @@ } BatchUpdate b = new BatchUpdate(hri.getRegionName()); b.put(HConstants.COL_REGIONINFO, Writables.getBytes(hri)); - r.batchUpdate(b); + r.batchUpdate(b, null); if (LOG.isDebugEnabled()) { HRegionInfo h = Writables.getHRegionInfoOrNull( r.get(hri.getRegionName(), HConstants.COL_REGIONINFO).getValue()); diff -uNbrx .svn -x '*~' -x '*.orig' -x '*.rej' -x DEADJOE -x '*.old' -x '*.class' -x '*.xml' src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java jonsrc/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java --- src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java 2008-08-07 17:12:06.000000000 -0700 +++ jonsrc/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java 2008-08-07 08:54:25.000000000 -0700 @@ -2050,7 +2050,8 @@ public static void removeRegionFromMETA(final HRegionInterface srvr, final byte [] metaRegionName, final byte [] regionName) throws IOException { - srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP); + srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP, + -1L); } /** @@ -2071,7 +2072,7 @@ b.delete(COL_STARTCODE); // If carrying splits, they'll be in place when we show up on new // server. - srvr.batchUpdate(metaRegionName, b); + srvr.batchUpdate(metaRegionName, b, -1L); } /**
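Usage note: a minimal sketch of how a client might drive the row-lock API this patch adds to HTable -- lockRow(), the commit()/deleteAll() overloads that accept a RowLock, and unlockRow(). It is illustration only: the table name "mytable", the "info:" columns, the row key, the RowLockSketch class, the assumed package location of RowLock, and the HTable(HBaseConfiguration, String) constructor are assumptions of the sketch, not something this diff establishes.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RowLock;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.util.Bytes;

public class RowLockSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical table and columns; assumes "mytable" already exists with family "info:".
    HTable table = new HTable(new HBaseConfiguration(), "mytable");
    byte [] row = Bytes.toBytes("row-to-update");

    // Explicitly lock the row; the region server tracks the lock under a lease.
    RowLock lock = table.lockRow(row);
    try {
      // Calls that accept a RowLock reuse the held lock instead of
      // obtaining and releasing an internal row lock per operation.
      BatchUpdate update = new BatchUpdate(row);
      update.put(Bytes.toBytes("info:status"), Bytes.toBytes("updated"));
      table.commit(update, lock);

      // Delete another column of the same row under the same lock.
      table.deleteAll(row, Bytes.toBytes("info:old"), lock);
    } finally {
      // Release explicitly; otherwise the row stays locked until the lease expires.
      table.unlockRow(lock);
    }
  }
}

If a client dies without calling unlockRow, the RowLockListener registered in HRegionServer releases the row lock when its lease expires, so the row does not stay locked indefinitely.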