Index: src/test/org/apache/hadoop/hbase/client/TestHTable.java
===================================================================
--- src/test/org/apache/hadoop/hbase/client/TestHTable.java	(revision 729584)
+++ src/test/org/apache/hadoop/hbase/client/TestHTable.java	(working copy)
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.HBaseClusterTestCase;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -255,4 +256,47 @@
     }
   }
 
+
+  /**
+   * Verify that checkAndSave applies a BatchUpdate only when the row's
+   * current cell values all match the expected values, and that a
+   * successful save changes the row so a second identical attempt fails.
+   */
+  public void testCheckAndSave() {
+    HTable table = null;
+    try {
+      HColumnDescriptor column2 =
+        new HColumnDescriptor(Bytes.toBytes("info2:"));
+      HBaseAdmin admin = new HBaseAdmin(conf);
+      HTableDescriptor testTableADesc = new HTableDescriptor(tableAname);
+      testTableADesc.addFamily(column);
+      testTableADesc.addFamily(column2);
+      admin.createTable(testTableADesc);
+
+      table = new HTable(conf, tableAname);
+      BatchUpdate batchUpdate = new BatchUpdate(row);
+      BatchUpdate batchUpdate2 = new BatchUpdate(row);
+
+      HashMap<byte[], byte[]> expectedValues = new HashMap<byte[], byte[]>();
+
+      for (int i = 0; i < 5; i++) {
+        batchUpdate.put(COLUMN_FAMILY_STR + i, Bytes.toBytes(i));
+        // Write the same columns with different values so a successful
+        // checkAndSave invalidates expectedValues for the next attempt.
+        batchUpdate2.put(COLUMN_FAMILY_STR + i, Bytes.toBytes(i + 1));
+        expectedValues.put(Bytes.toBytes(COLUMN_FAMILY_STR + i),
+          Bytes.toBytes(i));
+      }
+
+      table.commit(batchUpdate);
+
+      // First attempt matches and is applied; the second must then fail
+      // because the row no longer holds the expected values.
+      assertTrue(table.checkAndSave(batchUpdate2, expectedValues, null));
+      assertFalse(table.checkAndSave(batchUpdate2, expectedValues, null));
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail("Should not have any exception " + e.getClass());
+    }
+  }
 }
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 729584)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -1622,6 +1622,38 @@
     return -1;
   }
 
+
+  /**
+   * Atomically applies the given BatchUpdate to a region if and only if
+   * the row's current cell values match expectedValues.
+   *
+   * @param regionName name of the region the row lives in
+   * @param b the batch update to apply when the check succeeds
+   * @param expectedValues map of column name to the value the cell must
+   * currently hold for the update to be applied
+   * @return true if the row matched and the update was applied
+   * @throws IOException e
+   */
+  public Boolean checkAndSave(final byte[] regionName, final BatchUpdate b,
+      final Map<byte[], byte[]> expectedValues)
+  throws IOException {
+    if (b.getRow() == null) {
+      throw new IllegalArgumentException("update has null row");
+    }
+    checkOpen();
+    this.requestCount.incrementAndGet();
+    HRegion region = getRegion(regionName);
+    validateValuesLength(b, region);
+    try {
+      // Make room in the memcache before taking on another update.
+      cacheFlusher.reclaimMemcacheMemory();
+      return region.checkAndSave(b, expectedValues,
+        getLockFromId(b.getRowLock()), false);
+    } catch (Throwable t) {
+      throw convertThrowableToIOE(cleanup(t));
+    }
+  }
+
   /**
    * Utility method to verify values length
    * @param batchUpdate The update to verify
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 729584)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -1275,6 +1275,97 @@
     }
   }
 
+
+  /**
+   * Atomically checks that the row's current cell values match
+   * expectedValues and, only if every one matches, applies the given
+   * BatchUpdate.  The row is left untouched when any value differs.
+   *
+   * This is essentially batchUpdate with the expected-value check added
+   * up front, so read it alongside that method.
+   *
+   * @param b the update to apply when the check succeeds
+   * @param expectedValues map of column name to expected current value;
+   * a column that is absent from the row fails the check
+   * @param lockid row lock supplied by the client, or null to take and
+   * release an internal row lock
+   * @param writeToWAL whether the edits go to the write-ahead log
+   * @return true if the row matched and the update was applied
+   * @throws IOException e
+   */
+  public Boolean checkAndSave(BatchUpdate b,
+    Map<byte[], byte[]> expectedValues, Integer lockid, boolean writeToWAL)
+  throws IOException {
+    checkReadOnly();
+    checkResources();
+    splitsAndClosesLock.readLock().lock();
+    try {
+      byte[] row = b.getRow();
+      Integer lid = getLock(lockid, row);
+      try {
+        Set<byte[]> keySet = expectedValues.keySet();
+        Map<byte[], Cell> actualValues =
+          this.getFull(row, keySet, HConstants.LATEST_TIMESTAMP, 1, lid);
+        for (byte[] key : keySet) {
+          // A missing column or a differing value fails the whole check;
+          // the finally blocks below release both locks on this path.
+          Cell actual = actualValues.get(key);
+          if (actual == null ||
+              !Bytes.equals(actual.getValue(), expectedValues.get(key))) {
+            return Boolean.FALSE;
+          }
+        }
+        // Check passed: apply the update.  From here on this is a copy
+        // of batchUpdate's commit logic.
+        long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP) ?
+          System.currentTimeMillis() : b.getTimestamp();
+        List<byte[]> deletes = null;
+        for (BatchOperation op : b) {
+          HStoreKey key = new HStoreKey(row, op.getColumn(), commitTime,
+            this.regionInfo);
+          byte[] val = null;
+          if (op.isPut()) {
+            val = op.getValue();
+            if (HLogEdit.isDeleted(val)) {
+              throw new IOException("Cannot insert value: " + val);
+            }
+          } else if (b.getTimestamp() == LATEST_TIMESTAMP) {
+            // Save off LATEST_TIMESTAMP deletes to run after the update.
+            if (deletes == null) {
+              deletes = new ArrayList<byte[]>();
+            }
+            deletes.add(op.getColumn());
+          } else {
+            val = HLogEdit.deleteBytes.get();
+          }
+          if (val != null) {
+            localput(lid, key, val);
+          }
+        }
+        TreeMap<HStoreKey, byte[]> edits = this.targetColumns.remove(lid);
+        if (edits != null && edits.size() > 0) {
+          update(edits, writeToWAL);
+        }
+        if (deletes != null && deletes.size() > 0) {
+          // Run the deferred LATEST_TIMESTAMP deletes.
+          for (byte[] column : deletes) {
+            deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
+          }
+        }
+      } catch (IOException e) {
+        // Drop any edits staged for this row lock before propagating.
+        this.targetColumns.remove(lid);
+        throw e;
+      } finally {
+        if (lockid == null) {
+          releaseRowLock(lid);
+        }
+      }
+    } finally {
+      splitsAndClosesLock.readLock().unlock();
+    }
+    return Boolean.TRUE;
+  }
+
   /*
    * Check if resources to support an update.
    *
Index: src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java	(revision 729584)
+++ src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java	(working copy)
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.BatchUpdate;
@@ -118,6 +119,22 @@
   throws IOException;
 
+
+  /**
+   * Atomically applies a batch of updates to one row via a single RPC,
+   * but only if the columns named in expectedValues currently hold the
+   * given values.
+   *
+   * @param regionName name of the region to update
+   * @param b the batch update to apply when the check succeeds
+   * @param expectedValues map of column name to expected cell value
+   * @return true if the row matched and the update was applied
+   * @throws IOException e
+   */
+  public Boolean checkAndSave(final byte[] regionName, final BatchUpdate b,
+      final Map<byte[], byte[]> expectedValues)
+  throws IOException;
+
   /**
    * Delete all cells that match the passed row and column and whose timestamp
    * is equal-to or older than the passed timestamp.
   *
Index: src/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/HTable.java	(revision 729584)
+++ src/java/org/apache/hadoop/hbase/client/HTable.java	(working copy)
@@ -1258,6 +1258,36 @@
   }
 
+
+  /**
+   * Atomically checks whether the row's current values match
+   * expectedValues and, if every one does, commits the given batch
+   * update to the row.
+   *
+   * @param batchUpdate batch update to apply when the row matches
+   * @param expectedValues map of column name to the value the cell must
+   * currently hold for the update to be applied
+   * @param rl row lock to use, or null for none
+   * @return true if the row matched and the update was applied
+   * @throws IOException e
+   */
+  public synchronized Boolean checkAndSave(final BatchUpdate batchUpdate,
+    final Map<byte[], byte[]> expectedValues, final RowLock rl)
+  throws IOException {
+    checkRowAndColumns(batchUpdate);
+    if (rl != null) {
+      batchUpdate.setRowLock(rl.getLockId());
+    }
+    return connection.getRegionServerWithRetries(
+      new ServerCallable<Boolean>(connection, tableName, batchUpdate.getRow()) {
+        public Boolean call() throws IOException {
+          return server.checkAndSave(
+            location.getRegionInfo().getRegionName(), batchUpdate,
+            expectedValues);
+        }
+      }
+    );
+  }
+
   /**
    * Commit to the table the buffer of BatchUpdate.
    * Called automaticaly in the commit methods when autoFlush is true.
    * @throws IOException