Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1354813)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -3827,8 +3827,8 @@
     boolean flush = false;
     WALEdit walEdits = null;
     List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.numColumns());
-    List<KeyValue> kvs = new ArrayList<KeyValue>(increment.numColumns());
-    long now = EnvironmentEdgeManager.currentTimeMillis();
+    Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
+    long before = EnvironmentEdgeManager.currentTimeMillis();
     long size = 0;
 
     // Lock row
@@ -3838,11 +3838,13 @@
       Integer lid = getLock(lockid, row, true);
       this.updatesLock.readLock().lock();
       try {
+        long now = EnvironmentEdgeManager.currentTimeMillis();
         // Process each family
         for (Map.Entry<byte[], NavigableMap<byte[], Long>> family :
           increment.getFamilyMap().entrySet()) {
 
           Store store = stores.get(family.getKey());
+          List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
 
           // Get previous values for all columns in this family
           Get get = new Get(row);
@@ -3877,10 +3879,8 @@
             }
           }
 
-          // Write the KVs for this family into the store
-          size += store.upsert(kvs);
-          allKVs.addAll(kvs);
-          kvs.clear();
+          // Store the KVs in a temporary memstore before writing the HLog
+          tempMemstore.put(store, kvs);
         }
 
         // Actually write to WAL now
@@ -3889,10 +3889,16 @@
           // cluster. A slave cluster receives the final value (not the delta)
           // as a Put.
           this.log.append(regionInfo, this.htableDescriptor.getName(),
-              walEdits, HConstants.DEFAULT_CLUSTER_ID, now,
+              walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
               this.htableDescriptor);
         }
 
+        // Actually write to the memstore now
+        for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
+          Store store = entry.getKey();
+          size += store.upsert(entry.getValue());
+          allKVs.addAll(entry.getValue());
+        }
         size = this.addAndGetGlobalMemstoreSize(size);
         flush = isFlushSize(size);
       } finally {
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1354813)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy)
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -3419,7 +3420,99 @@
       this.region = null;
     }
   }
+
+  /**
+   * Runnable that repeatedly increments one cell of a single row;
+   * used by testParallelIncrementWithMemStoreFlush below.
+   */
+  private static class Incrementer implements Runnable {
+    private HRegion region;
+    private final static byte[] incRow = Bytes.toBytes("incRow");
+    private final static byte[] family = Bytes.toBytes("family");
+    private final static byte[] qualifier = Bytes.toBytes("qualifier");
+    private final static long ONE = 1L;
+    private int incCounter;
+
+    public Incrementer(HRegion region, int incCounter) {
+      this.region = region;
+      this.incCounter = incCounter;
+    }
+
+    @Override
+    public void run() {
+      int count = 0;
+      while (count < incCounter) {
+        Increment inc = new Increment(incRow);
+        inc.addColumn(family, qualifier, ONE);
+        count++;
+        try {
+          region.increment(inc, null, true);
+        } catch (IOException e) {
+          e.printStackTrace();
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Test case to check the increment function with memstore flushing
+   * @throws Exception
+   */
+  @Test
+  public void testParallelIncrementWithMemStoreFlush() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    String method = "testParallelIncrementWithMemStoreFlush";
+    byte[] tableName = Bytes.toBytes(method);
+    byte[] family = Incrementer.family;
+    this.region = initHRegion(tableName, method, conf, family);
+    final HRegion region = this.region;
+    final AtomicBoolean incrementDone = new AtomicBoolean(false);
+    Runnable flusher = new Runnable() {
+      @Override
+      public void run() {
+        while (!incrementDone.get()) {
+          try {
+            region.flushcache();
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+      }
+    };
+
+    // after all incrementers finish, the row value should be 20 * 100 = 2000
+    int threadNum = 20;
+    int incCounter = 100;
+    long expected = threadNum * incCounter;
+    Thread[] incrementers = new Thread[threadNum];
+    Thread flushThread = new Thread(flusher);
+    for (int i = 0; i < threadNum; i++) {
+      incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
+      incrementers[i].start();
+    }
+    flushThread.start();
+    for (int i = 0; i < threadNum; i++) {
+      incrementers[i].join();
+    }
+
+    incrementDone.set(true);
+    flushThread.join();
+
+    Get get = new Get(Incrementer.incRow);
+    get.addColumn(Incrementer.family, Incrementer.qualifier);
+    get.setMaxVersions(1);
+    Result res = this.region.get(get, null);
+    List<KeyValue> kvs = res.getColumn(Incrementer.family,
+        Incrementer.qualifier);
+
+    // we should see only the latest version
+    assertEquals(1, kvs.size());
+    KeyValue kv = kvs.get(0);
+    assertEquals(expected, Bytes.toLong(kv.getBuffer(), kv.getValueOffset()));
+    this.region = null;
+  }
+
   private void putData(int startRow, int numRows, byte [] qf, byte [] ...families)
   throws IOException {