Index: src/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/HTable.java (revision 787338)
+++ src/java/org/apache/hadoop/hbase/client/HTable.java (working copy)
@@ -472,8 +472,8 @@
}
/**
- * Atomically increments a column value. If the column value isn't long-like,
- * this could throw an exception.
+ * Atomically increments a column value. If the column value already exists
+ * and is not a big-endian long, this could throw an exception.
*
* @param row
* @param family
@@ -485,6 +485,26 @@
public long incrementColumnValue(final byte [] row, final byte [] family,
final byte [] qualifier, final long amount)
throws IOException {
+ return incrementColumnValue(row, family, qualifier, amount, true);
+ }
+
+ /**
+ * Atomically increments a column value. If the column value already exists
+ * and is not a big-endian long, this could throw an exception.
+ *
+ * Setting writeToWAL to false means that in a fail scenario, you will lose
+ * any increments that have not been flushed.
+ * @param row
+ * @param family
+ * @param qualifier
+ * @param amount
+ * @param writeToWAL true if increment should be applied to WAL, false if not
+ * @return The new value.
+ * @throws IOException
+ */
+ public long incrementColumnValue(final byte [] row, final byte [] family,
+ final byte [] qualifier, final long amount, final boolean writeToWAL)
+ throws IOException {
NullPointerException npe = null;
if (row == null) {
npe = new NullPointerException("row is null");
@@ -499,11 +519,9 @@
return connection.getRegionServerWithRetries(
new ServerCallable(connection, tableName, row) {
public Long call() throws IOException {
- Get get = new Get(row);
- get.addColumn(family, qualifier);
return server.incrementColumnValue(
location.getRegionInfo().getRegionName(), row, family,
- qualifier, amount);
+ qualifier, amount, writeToWAL);
}
}
);
Index: src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (revision 787338)
+++ src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (working copy)
@@ -147,11 +147,12 @@
* @param family
* @param qualifier
* @param amount
+ * @param writeToWAL whether to write the increment to the WAL
* @return new incremented column value
* @throws IOException
*/
public long incrementColumnValue(byte [] regionName, byte [] row,
- byte [] family, byte [] qualifier, long amount)
+ byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
throws IOException;
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 787338)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -2277,7 +2277,7 @@
* @throws IOException
*/
public long incrementColumnValue(byte [] row, byte [] family,
- byte [] qualifier, long amount)
+ byte [] qualifier, long amount, boolean writeToWAL)
throws IOException {
checkRow(row);
@@ -2287,10 +2287,19 @@
long result = 0L;
try {
Store store = stores.get(family);
-
+
Store.ValueAndSize vas =
store.incrementColumnValue(row, family, qualifier, amount);
+ if (writeToWAL) {
+ long now = System.currentTimeMillis();
+ List edits = new ArrayList(1);
+ edits.add(vas.kv);
+ this.log.append(regionInfo.getRegionName(),
+ regionInfo.getTableDesc().getName(), edits,
+ (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
+ }
+
result = vas.value;
long size = this.memcacheSize.addAndGet(vas.sizeAdded);
flush = isFlushSize(size);
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 787338)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -2392,7 +2392,7 @@
/** {@inheritDoc} */
public long incrementColumnValue(byte [] regionName, byte [] row,
- byte [] family, byte [] qualifier, long amount)
+ byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
throws IOException {
checkOpen();
@@ -2403,7 +2403,8 @@
requestCount.incrementAndGet();
try {
HRegion region = getRegion(regionName);
- return region.incrementColumnValue(row, family, qualifier, amount);
+ return region.incrementColumnValue(row, family, qualifier, amount,
+ writeToWAL);
} catch (IOException e) {
checkFileSystem();
throw e;
Index: src/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 787338)
+++ src/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy)
@@ -1498,9 +1498,11 @@
public static class ValueAndSize {
public long value;
public long sizeAdded;
- public ValueAndSize(long value, long sizeAdded) {
+ public KeyValue kv;
+ public ValueAndSize(long value, long sizeAdded, KeyValue kv) {
this.value = value;
this.sizeAdded = sizeAdded;
+ this.kv = kv;
}
}
@@ -1536,7 +1538,7 @@
value = Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG) + amount;
Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(value), 0,
Bytes.SIZEOF_LONG);
- return new ValueAndSize(value, 0);
+ return new ValueAndSize(value, 0, kv);
}
// Check if we even have storefiles
if(this.storefiles.isEmpty()) {
@@ -1567,6 +1569,6 @@
System.currentTimeMillis(),
Bytes.toBytes(newValue));
add(newKv);
- return new ValueAndSize(newValue, newKv.heapSize());
+ return new ValueAndSize(newValue, newKv.heapSize(), newKv);
}
}