diff --git a/src/java/org/apache/hadoop/hbase/client/HTable.java b/src/java/org/apache/hadoop/hbase/client/HTable.java index d000145..ea183b2 100644 --- a/src/java/org/apache/hadoop/hbase/client/HTable.java +++ b/src/java/org/apache/hadoop/hbase/client/HTable.java @@ -1479,6 +1479,23 @@ public class HTable { public ArrayList getWriteBuffer() { return writeBuffer; } + + /** + * Atomically increments the current value of a column and returns it. + * The timestamp is not well defined after using this. + * + */ + public long incrementColumnValue(final byte [] row, final byte [] column, + final int amount) throws IOException { + return connection.getRegionServerWithRetries( + new ServerCallable(connection, tableName, row) { + public Long call() throws IOException { + return server.incrementColumnValue( + location.getRegionInfo().getRegionName(), row, column, amount); + } + } + ); + } /** * Implements the scanner interface for the HBase client. diff --git a/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java b/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java index 5202a9d..904d859 100644 --- a/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java +++ b/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java @@ -68,7 +68,8 @@ public interface HBaseRPCProtocolVersion extends VersionedProtocol { *
  • Version 16: Removed HMasterRegionInterface.getRootRegionLocation and * HMasterInterface.findRootRegion. We use ZooKeeper to store root region * location instead.
  • + *
  • Version 17: Added incrementColumnValue.
  • * */ - public static final long versionID = 16L; + public static final long versionID = 17L; } diff --git a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java index 7b62205..f26dee8 100644 --- a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java +++ b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java @@ -292,4 +292,18 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion { */ public void unlockRow(final byte [] regionName, final long lockId) throws IOException; + + /** + * Atomically increments a column value. If the column value isn't long-like, this could + * throw an exception. + * + * @param regionName + * @param row + * @param column + * @param amount + * @return new incremented column value + * @throws IOException + */ + public long incrementColumnValue(byte [] regionName, byte [] row, + byte [] column, long amount) throws IOException; } diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java index dcb73a7..348f9f2 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -43,6 +43,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ColumnNameParseException; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -2585,4 +2586,81 @@ public class HRegion implements HConstants { } } } -} \ No newline at end of file + + + public long incrementColumnValue(byte[] row, byte[] column, long amount) throws IOException { + checkRow(row); + checkColumn(column); + + Integer lid = obtainRowLock(row); + splitsAndClosesLock.readLock().lock(); + try { + HStoreKey 
hsk = new HStoreKey(row, column);
+    long ts = System.currentTimeMillis();
+    byte [] value = null;
+    long newval; // the new value.
+
+    Store store = getStore(column);
+
+    List c;
+    // Try the memcache first.
+    store.lock.readLock().lock();
+    try {
+      c = store.memcache.get(hsk, 1);
+    } finally {
+      store.lock.readLock().unlock();
+    }
+    if (c.size() == 1) {
+      // Use the memcache timestamp value.
+      LOG.info("Overwriting the memcache value for " + Bytes.toString(row) + "/" + Bytes.toString(column));
+      ts = c.get(0).getTimestamp();
+      value = c.get(0).getValue();
+    } else if (c.size() > 1) {
+      throw new DoNotRetryIOException("more than 1 value returned in incrementColumnValue from memcache");
+    }
+
+    if (value == null) {
+      // Check the store (including disk) for the previous value.
+      Cell[] cell = store.get(hsk, 1);
+      if (cell != null && cell.length == 1) {
+        LOG.info("Using HFile previous value for " + Bytes.toString(row) + "/" + Bytes.toString(column));
+        value = cell[0].getValue();
+      } else if (cell != null && cell.length > 1) {
+        // BUGFIX: was "c.size() > 1" -- that is the memcache list, which is
+        // already known to hold at most one entry here, so the check could
+        // never fire and a multi-valued store result silently fell through
+        // to the "create new counter" path, resetting the counter to amount.
+        throw new DoNotRetryIOException("more than 1 value returned in incrementColumnValue from Store");
+      }
+    }
+
+    if (value == null) {
+      // Doesn't exist yet; start the counter at amount.
+      LOG.info("Creating new counter value for " + Bytes.toString(row) + "/"+ Bytes.toString(column));
+      newval = amount;
+    } else {
+      newval = incrementBytes(value, amount);
+    }
+
+    // Write back under the row lock we already hold (timestamp ts reuses the
+    // memcache cell's timestamp when one existed, so we overwrite in place).
+    BatchUpdate b = new BatchUpdate(row, ts);
+    b.put(column, Bytes.toBytes(newval));
+    batchUpdate(b, lid, true);
+    return newval;
+    } finally {
+      splitsAndClosesLock.readLock().unlock();
+      releaseRowLock(lid);
+    }
+  }
+
+  /**
+   * Interprets value as a big-endian long (short values are padded with
+   * leading zero bytes) and adds amount to it.
+   * @throws DoNotRetryIOException if value is longer than 8 bytes
+   */
+  private long incrementBytes(byte[] value, long amount) throws IOException {
+    // Hopefully this doesn't happen too often.
+    if (value.length < Bytes.SIZEOF_LONG) {
+      // Pad a short value out to 8 bytes. A freshly allocated Java array is
+      // already zero-filled, so the explicit clearing loop was redundant and
+      // has been removed.
+      byte [] newvalue = new byte[Bytes.SIZEOF_LONG];
+      System.arraycopy(value, 0, newvalue, newvalue.length - value.length, value.length);
+      value = newvalue;
+    } else if (value.length > Bytes.SIZEOF_LONG) {
+      throw new DoNotRetryIOException("Increment Bytes - value too big: " + value.length);
+    }
+    long v = Bytes.toLong(value);
+    v += amount;
+    return v;
+  }
+}
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 5f0d319..dca5ada 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2287,4 +2287,35 @@ public class HRegionServer implements HConstants, HRegionInterface, HBaseRPCErro
       .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
     doMain(args, regionServerClass);
   }
+
+  /** {@inheritDoc} */
+  public long incrementColumnValue(byte[] regionName, byte[] row,
+      byte[] column, long amount) throws IOException {
+    checkOpen();
+
+    // Validate arguments up front; wrap the NPE so the client sees an
+    // IOException (the RPC layer's expected failure type) with the cause.
+    NullPointerException npe = null;
+    if (regionName == null) {
+      npe = new NullPointerException("regionName is null");
+    } else if (row == null) {
+      npe = new NullPointerException("row is null");
+    } else if (column == null) {
+      npe = new NullPointerException("column is null");
+    }
+    if (npe != null) {
+      IOException io = new IOException(
+        "Invalid arguments to incrementColumnValue");
+      io.initCause(npe);
+      throw io;
+    }
+    requestCount.incrementAndGet();
+    try {
+      HRegion region = getRegion(regionName);
+      return region.incrementColumnValue(row, column, amount);
+    } catch (IOException e) {
+      // Any IO failure may indicate a dead filesystem; check before rethrowing.
+      checkFileSystem();
+      throw e;
+    }
+  }
 }
diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestAtomicIncrement.java b/src/test/org/apache/hadoop/hbase/regionserver/TestAtomicIncrement.java
new file mode 100644
index 0000000..a9f8fb9
--- 
/dev/null +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestAtomicIncrement.java @@ -0,0 +1,99 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseClusterTestCase; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.util.Bytes; + +public class TestAtomicIncrement extends HBaseClusterTestCase { + static final Log LOG = LogFactory.getLog(TestAtomicIncrement.class); + + private static final byte [] CONTENTS = Bytes.toBytes("contents:"); + + public void testIncrement() throws IOException { + try { + HTable table = null; + + // Setup + + HTableDescriptor desc = new HTableDescriptor(getName()); + desc.addFamily( + new HColumnDescriptor(CONTENTS, // Column name + 1, // Max versions + HColumnDescriptor.DEFAULT_COMPRESSION, // no compression + HColumnDescriptor.DEFAULT_IN_MEMORY, // not in memory + HColumnDescriptor.DEFAULT_BLOCKCACHE, + HColumnDescriptor.DEFAULT_LENGTH, + HColumnDescriptor.DEFAULT_TTL, + false + ) + ); + + // Create the table + + HBaseAdmin admin = new HBaseAdmin(conf); + admin.createTable(desc); + + try { + // Give cache flusher and log roller a chance to run + // Otherwise we'll never hit the bloom filter, just the memcache + Thread.sleep(conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000) * 10); + + } catch (InterruptedException e) { + // ignore + } + // Open table + + table = new HTable(conf, desc.getName()); + + byte [] row = Bytes.toBytes("foo"); + byte [] column = "contents:1".getBytes(HConstants.UTF8_ENCODING); + // increment by 1: + assertEquals(1L, table.incrementColumnValue(row, column, 1)); + + // set a weird 
value, then increment: + row = Bytes.toBytes("foo2"); + byte [] value = {0,0,1}; + BatchUpdate bu = new BatchUpdate(row); + bu.put(column, value); + table.commit(bu); + + assertEquals(2L, table.incrementColumnValue(row, column, 1)); + + assertEquals(-2L, table.incrementColumnValue(row, column, -4)); + + row = Bytes.toBytes("foo3"); + byte[] value2 = {1,2,3,4,5,6,7,8,9}; + bu = new BatchUpdate(row); + bu.put(column, value2); + table.commit(bu); + + try { + table.incrementColumnValue(row, column, 1); + fail(); + } catch (IOException e) { + System.out.println("Expected exception: " + e); + // expected exception. + } + + + } catch (Exception e) { + e.printStackTrace(); + if (e instanceof IOException) { + IOException i = (IOException) e; + throw i; + } + fail(); + } + + } + +}