Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1403613)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -81,7 +81,6 @@
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
@@ -116,7 +115,7 @@
 import org.apache.hadoop.hbase.ipc.RpcCallContext;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.metrics.OperationMetrics;
 import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
@@ -291,6 +290,27 @@
     }
     return minimumReadPoint;
   }
+  /**
+   * @return The largest mvcc readPoint across all the scanners in this
+   * region. Writes newer than this readPoint are guaranteed not to be seen
+   * by any current scanner.
+   */
+  public long getLargestReadPoint() {
+    long maximumReadPoint;
+    // We need to ensure that while we are calculating the largestReadPoint
+    // no new RegionScanners can grab a readPoint that we are unaware of.
+    // We achieve this by synchronizing on the scannerReadPoints object.
+    synchronized (scannerReadPoints) {
+      maximumReadPoint = mvcc.memstoreReadPoint();
+
+      for (Long readPoint : this.scannerReadPoints.values()) {
+        if (readPoint > maximumReadPoint) {
+          maximumReadPoint = readPoint;
+        }
+      }
+    }
+    return maximumReadPoint;
+  }

   /*
    * Data structure of write state flags used coordinating flushes,
    * compactions and closes.
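
The existing getSmallestReadPoint() and the new getLargestReadPoint() reduce over the same scannerReadPoints map in opposite directions. A minimal, self-contained sketch of that bookkeeping (ReadPointTracker and its method names are hypothetical stand-ins, not the actual HRegion/MultiVersionConsistencyControl types):

import java.util.HashMap;
import java.util.Map;

final class ReadPointTracker {
  private final Map<Long, Long> scannerReadPoints = new HashMap<>();
  private long memstoreReadPoint;  // advanced as MVCC writes complete

  synchronized long registerScanner(long scannerId) {
    // A new scanner snapshots the current memstore read point.
    scannerReadPoints.put(scannerId, memstoreReadPoint);
    return memstoreReadPoint;
  }

  synchronized void unregisterScanner(long scannerId) {
    scannerReadPoints.remove(scannerId);
  }

  // KVs with memstoreTS <= smallest are visible to *every* open scanner,
  // so older versions they shadow can be physically dropped (see the
  // upsert() changes below).
  synchronized long getSmallestReadPoint() {
    long min = memstoreReadPoint;
    for (long rp : scannerReadPoints.values()) {
      min = Math.min(min, rp);
    }
    return min;
  }

  // KVs with memstoreTS > largest are visible to *no* open scanner yet.
  synchronized long getLargestReadPoint() {
    long max = memstoreReadPoint;
    for (long rp : scannerReadPoints.values()) {
      max = Math.max(max, rp);
    }
    return max;
  }
}

The synchronized methods here play the role of the patch's synchronized (scannerReadPoints) block: no scanner can register a new read point while either bound is being computed.
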
@@ -4575,6 +4595,12 @@
     try {
       Integer lid = getLock(lockid, row, true);
       this.updatesLock.readLock().lock();
+      // wait for all prior MVCC transactions to finish
+      // (so that we are guaranteed to see the latest state)
+      WriteEntry w = mvcc.beginMemstoreInsert();
+      mvcc.completeMemstoreInsert(w);
+      // now start my own transaction
+      w = mvcc.beginMemstoreInsert();
       try {
         long now = EnvironmentEdgeManager.currentTimeMillis();
         // Process each family
@@ -4637,6 +4663,7 @@
                 newKV.getBuffer(), newKV.getQualifierOffset(),
                 kv.getQualifierLength());
+            newKV.setMemstoreTS(w.getWriteNumber());
             kvs.add(newKV);

             // Append update to WAL
@@ -4666,12 +4693,13 @@
         //Actually write to Memstore now
         for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
           Store store = entry.getKey();
-          size += store.upsert(entry.getValue());
+          size += store.upsert(entry.getValue(), getSmallestReadPoint());
           allKVs.addAll(entry.getValue());
         }
         size = this.addAndGetGlobalMemstoreSize(size);
         flush = isFlushSize(size);
       } finally {
+        mvcc.completeMemstoreInsert(w);
         this.updatesLock.readLock().unlock();
         releaseRowLock(lid);
       }
@@ -4728,6 +4756,12 @@
     try {
       Integer lid = getLock(lockid, row, true);
       this.updatesLock.readLock().lock();
+      // wait for all prior MVCC transactions to finish
+      // (so that we are guaranteed to see the latest state)
+      WriteEntry w = mvcc.beginMemstoreInsert();
+      mvcc.completeMemstoreInsert(w);
+      // now start my own transaction
+      w = mvcc.beginMemstoreInsert();
       try {
         long now = EnvironmentEdgeManager.currentTimeMillis();
         // Process each family
@@ -4766,6 +4800,7 @@
           // Append new incremented KeyValue to list
           KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(),
               now, Bytes.toBytes(amount));
+          newKV.setMemstoreTS(w.getWriteNumber());
           kvs.add(newKV);

           // Prepare WAL updates
@@ -4794,12 +4829,13 @@
         //Actually write to Memstore now
         for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
           Store store = entry.getKey();
-          size += store.upsert(entry.getValue());
+          size += store.upsert(entry.getValue(), getSmallestReadPoint());
           allKVs.addAll(entry.getValue());
         }
         size = this.addAndGetGlobalMemstoreSize(size);
         flush = isFlushSize(size);
       } finally {
+        mvcc.completeMemstoreInsert(w);
         this.updatesLock.readLock().unlock();
         releaseRowLock(lid);
       }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(revision 1403613)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(working copy)
@@ -2054,11 +2054,10 @@
   }

   @Override
-  public long upsert(Iterable<KeyValue> kvs) throws IOException {
+  public long upsert(Iterable<KeyValue> kvs, long readpoint) throws IOException {
     this.lock.readLock().lock();
     try {
-      // TODO: Make this operation atomic w/ MVCC
-      return this.memstore.upsert(kvs);
+      return this.memstore.upsert(kvs, readpoint);
     } finally {
       this.lock.readLock().unlock();
     }
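
The double beginMemstoreInsert() above is the core of the change: the first begin/complete pair acts as a barrier that waits out all earlier MVCC transactions before the current value is read, and the second entry stamps the new KVs so that concurrent scanners skip them until completeMemstoreInsert(w) runs in the finally block. A minimal, self-contained model of that barrier semantics (SimpleMvcc is a hypothetical sketch, not the real MultiVersionConsistencyControl):

import java.util.ArrayDeque;
import java.util.Queue;

final class SimpleMvcc {
  static final class WriteEntry {
    final long writeNumber;
    boolean completed;
    WriteEntry(long writeNumber) { this.writeNumber = writeNumber; }
    long getWriteNumber() { return writeNumber; }
  }

  private long nextWriteNumber = 1;
  private long readPoint = 0;
  private final Queue<WriteEntry> pending = new ArrayDeque<>();

  synchronized WriteEntry beginMemstoreInsert() {
    WriteEntry e = new WriteEntry(nextWriteNumber++);
    pending.add(e);
    return e;
  }

  // Blocks until this entry *and all earlier ones* are published. Called
  // immediately after beginMemstoreInsert(), it is therefore a
  // wait-for-prior-writes barrier, which is how the patch uses it.
  synchronized void completeMemstoreInsert(WriteEntry e) throws InterruptedException {
    e.completed = true;
    advance();
    while (readPoint < e.writeNumber) {
      wait();
      advance();
    }
  }

  // The read point only moves past a contiguous prefix of completed writes.
  private void advance() {
    while (!pending.isEmpty() && pending.peek().completed) {
      readPoint = pending.poll().writeNumber;
    }
    notifyAll();
  }

  synchronized long memstoreReadPoint() { return readPoint; }
}

In this model, completeMemstoreInsert(beginMemstoreInsert()) returns only once the read point has caught up with every earlier write, which is why the subsequent read in increment()/append() cannot miss a committed update.
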
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java	(revision 1403613)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java	(working copy)
@@ -435,6 +435,8 @@
   }

   /**
+   * This is now only used by tests.
+   *
    * Given the specs of a column, update it, first by inserting a new record,
    * then removing the old one.  Since there is only 1 KeyValue involved, the
    * memstoreTS will be set to 0, thus ensuring that they instantly appear to
    * anyone. The underlying
@@ -497,7 +499,7 @@
       // create or update (upsert) a new KeyValue with
       // 'now' and a 0 memstoreTS == immediately visible
       return upsert(Arrays.asList(
-          new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)))
+          new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue))), 1L
       );
     } finally {
       this.lock.readLock().unlock();
@@ -521,13 +523,12 @@
    * @param kvs
    * @return change in memstore size
    */
-  public long upsert(Iterable<KeyValue> kvs) {
+  public long upsert(Iterable<KeyValue> kvs, long readpoint) {
     this.lock.readLock().lock();
     try {
       long size = 0;
       for (KeyValue kv : kvs) {
-        kv.setMemstoreTS(0);
-        size += upsert(kv);
+        size += upsert(kv, readpoint);
       }
       return size;
     } finally {
@@ -549,7 +550,7 @@
    * @param kv
    * @return change in size of MemStore
    */
-  private long upsert(KeyValue kv) {
+  private long upsert(KeyValue kv, long readpoint) {
     // Add the KeyValue to the MemStore
     // Use the internalAdd method here since we (a) already have a lock
     // and (b) cannot safely use the MSLAB here without potentially
@@ -580,12 +581,11 @@

           // if the qualifier matches and it's a put, remove it
           if (kv.matchingQualifier(cur)) {
-
-            // to be extra safe we only remove Puts that have a memstoreTS==0
-            if (kv.getType() == KeyValue.Type.Put.getCode() &&
-                kv.getMemstoreTS() == 0) {
+            // only remove Puts for which we can strictly guarantee
+            // that every scanner sees the newer version (readpoint - 1)
+            if (cur.getType() == KeyValue.Type.Put.getCode() &&
+                cur.getMemstoreTS() < readpoint - 1) {
               // false means there was a change, so give us the size.
-              addedSize -= heapSizeChange(kv, true);
+              addedSize -= heapSizeChange(cur, true);
               it.remove();
             }
           } else {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(revision 1403613)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(working copy)
@@ -75,6 +75,8 @@
       throws IOException;

   /**
+   * Now only used in tests.
+   *
    * Updates the value for the given row/family/qualifier. This function will
    * always be seen as atomic by other readers because it only puts a single
    * KV to memstore. Thus no read/write control necessary.
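
The key behavioral change in MemStore.upsert() is that an older Put is now physically removed only when its memstoreTS sits safely below the smallest read point, rather than when it was stamped 0. A self-contained sketch of that rule (plain longs stand in for KeyValue; UpsertVisibility and canRemoveOlderPut are hypothetical names, not part of the patch):

final class UpsertVisibility {
  // A scanner with read point r sees KVs whose memstoreTS <= r. If the
  // shadowed Put's memstoreTS is strictly below readpoint - 1, every open
  // scanner can already see a newer committed version of the cell; the
  // patch's extra "- 1" keeps the check conservative.
  static boolean canRemoveOlderPut(long curMemstoreTS, long smallestReadPoint) {
    return curMemstoreTS < smallestReadPoint - 1;
  }

  public static void main(String[] args) {
    // With a smallest read point of 10, a Put stamped 8 is dead ...
    System.out.println(canRemoveOlderPut(8, 10));   // true
    // ... but one stamped 9 must stay, within the conservative margin.
    System.out.println(canRemoveOlderPut(9, 10));   // false
  }
}

Note also that the size accounting and the type check now inspect cur (the KV being considered for removal) instead of kv (the KV just inserted), which fixes the bookkeeping in the old code.
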
@@ -100,7 +102,7 @@
    * @return memstore size delta
    * @throws IOException
    */
-  public long upsert(Iterable<KeyValue> kvs) throws IOException;
+  public long upsert(Iterable<KeyValue> kvs, long readpoint) throws IOException;

   /**
    * Adds a value to the memstore
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java	(revision 1403613)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java	(working copy)
@@ -108,7 +108,7 @@
   public void testIncrementMultiThreads() throws IOException {
     LOG.info("Starting test testIncrementMultiThreads");
-    initHRegion(tableName, getName(), fam1);
+    initHRegion(tableName, getName(), fam1, fam2);

     // create 100 threads, each will increment by its own quantity
     int numThreads = 100;
@@ -135,6 +135,8 @@
       }
     }
     assertICV(row, fam1, qual1, expectedTotal);
+    assertICV(row, fam1, qual2, expectedTotal * 2);
+    assertICV(row, fam2, qual3, expectedTotal * 3);
     LOG.info("testIncrementMultiThreads successfully verified that total is " +
         expectedTotal);
   }
@@ -184,18 +186,14 @@
   public static class Incrementer extends Thread {

     private final HRegion region;
-    private final int threadNumber;
     private final int numIncrements;
     private final int amount;

-    private int count;

     public Incrementer(HRegion region, int threadNumber, int amount,
         int numIncrements) {
       this.region = region;
-      this.threadNumber = threadNumber;
       this.numIncrements = numIncrements;
-      this.count = 0;
       this.amount = amount;
       setDaemon(true);
     }
@@ -206,12 +204,18 @@
       try {
         Increment inc = new Increment(row);
         inc.addColumn(fam1, qual1, amount);
-        Result result = region.increment(inc, null, true);
-        // LOG.info("thread:" + threadNumber + " iter:" + i);
+        inc.addColumn(fam1, qual2, amount * 2);
+        inc.addColumn(fam2, qual3, amount * 3);
+        region.increment(inc, null, true);
+
+        // verify: make sure we only see completed increments
+        Get g = new Get(row);
+        Result result = region.get(g, null);
+        assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 2,
+            Bytes.toLong(result.getValue(fam1, qual2)));
+        assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 3,
+            Bytes.toLong(result.getValue(fam2, qual3)));
       } catch (IOException e) {
         e.printStackTrace();
       }
-      count++;
     }
   }
 }
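
The strengthened test encodes a cross-family atomicity invariant: since each Increment applies amount, 2*amount, and 3*amount to the three columns in one call, a concurrent reader must never observe a half-applied increment. A self-contained sketch of the check (IncrementInvariant is a hypothetical helper; plain longs stand in for the Bytes.toLong(result.getValue(...)) lookups):

final class IncrementInvariant {
  static void check(long q1, long q2, long q3) {
    // fam1/qual1, fam1/qual2 and fam2/qual3 are incremented by amount,
    // 2*amount and 3*amount in a single Increment, so any consistent
    // snapshot must satisfy q2 == 2*q1 and q3 == 3*q1.
    if (q2 != 2 * q1) throw new AssertionError("torn read within fam1");
    if (q3 != 3 * q1) throw new AssertionError("torn read across families");
  }

  public static void main(String[] args) {
    check(5, 10, 15);   // a consistent snapshot passes
    // check(5, 10, 12) would throw: the reader saw fam2 before the
    // increment's MVCC write completed.
  }
}

Without the MVCC changes in HRegion and MemStore, the in-loop Get in the updated Incrementer can observe exactly such torn states, which is what the old test missed.
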