diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 06f93e1..0064147 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4426,11 +4426,7 @@ public class HRegion implements HeapSize { // , Writable{
// TODO: There's a lot of boiler plate code identical
// to increment... See how to better unify that.
/**
- *
* Perform one or more append operations on a row.
- *
- * Appends performed are done under row lock but reads do not take locks out
- * so this can be seen partially complete by gets and scans.
*
* @param append
* @param lockid
@@ -4440,7 +4436,6 @@ public class HRegion implements HeapSize { // , Writable{
*/
public Result append(Append append, Integer lockid, boolean writeToWAL)
throws IOException {
- // TODO: Use MVCC to make this set of appends atomic to reads
byte[] row = append.getRow();
checkRow(row, "append");
boolean flush = false;
@@ -4454,9 +4449,15 @@ public class HRegion implements HeapSize { // , Writable{
// Lock row
startRegionOperation();
this.writeRequestsCount.increment();
+ MultiVersionConsistencyControl.WriteEntry w = null;
try {
Integer lid = getLock(lockid, row, true);
this.updatesLock.readLock().lock();
+ // wait for all prior MVCC transactions to finish - while we hold the row lock
+ // (so that we are guaranteed to see the latest state)
+ mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+ // now start my own transaction
+ w = mvcc.beginMemstoreInsert();
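+ // 'w' is completed in the finally block below; only then do the KVs
+ // tagged with its write number become visible, so readers observe the
+ // whole append atomically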
try {
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
@@ -4518,7 +4519,7 @@ public class HRegion implements HeapSize { // , Writable{
System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
newKV.getBuffer(), newKV.getQualifierOffset(),
kv.getQualifierLength());
-
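+ // tag the new KV with our write number so concurrent scanners cannot
+ // see it until the MVCC transaction completes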
+ newKV.setMemstoreTS(w.getWriteNumber());
kvs.add(newKV);
// Append update to WAL
@@ -4547,7 +4548,15 @@ public class HRegion implements HeapSize { // , Writable{
// Actually write to Memstore now
for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
Store store = entry.getKey();
- size += store.upsert(entry.getValue());
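+ // getSmallestReadPoint() is the read point of the oldest concurrent
+ // scanner; upsert may only reclaim duplicate versions below it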
+ if (store.getFamily().getMaxVersions() == 1) {
+ // upsert if VERSIONS for this CF == 1
+ size += store.upsert(entry.getValue(), getSmallestReadPoint());
+ } else {
+ // otherwise keep older versions around
+ for (KeyValue kv : entry.getValue()) {
+ size += store.add(kv);
+ }
+ }
allKVs.addAll(entry.getValue());
}
size = this.addAndGetGlobalMemstoreSize(size);
@@ -4560,13 +4569,15 @@ public class HRegion implements HeapSize { // , Writable{
syncOrDefer(txid); // sync the transaction log outside the rowlock
}
} finally {
+ if (w != null) {
+ mvcc.completeMemstoreInsert(w);
+ }
closeRegionOperation();
}
-
long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before);
-
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -4576,11 +4587,8 @@ public class HRegion implements HeapSize { // , Writable{
}
/**
- *
* Perform one or more increment operations on a row.
- *
- * Increments performed are done under row lock but reads do not take locks
- * out so this can be seen partially complete by gets and scans.
+ *
* @param increment
* @param lockid
* @param writeToWAL
@@ -4590,7 +4598,6 @@ public class HRegion implements HeapSize { // , Writable{
public Result increment(Increment increment, Integer lockid,
boolean writeToWAL)
throws IOException {
- // TODO: Use MVCC to make this set of increments atomic to reads
byte [] row = increment.getRow();
checkRow(row, "increment");
TimeRange tr = increment.getTimeRange();
@@ -4605,9 +4612,15 @@ public class HRegion implements HeapSize { // , Writable{
// Lock row
startRegionOperation();
this.writeRequestsCount.increment();
+ MultiVersionConsistencyControl.WriteEntry w = null;
try {
Integer lid = getLock(lockid, row, true);
this.updatesLock.readLock().lock();
+ // wait for all prior MVCC transactions to finish - while we hold the row lock
+ // (so that we are guaranteed to see the latest state)
+ mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+ // now start my own transaction
+ w = mvcc.beginMemstoreInsert();
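+ // as in append(): 'w' is completed in the finally block, making all
+ // incremented KVs visible to readers in one atomic step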
try {
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
@@ -4646,6 +4659,7 @@ public class HRegion implements HeapSize { // , Writable{
// Append new incremented KeyValue to list
KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(),
now, Bytes.toBytes(amount));
+ newKV.setMemstoreTS(w.getWriteNumber());
kvs.add(newKV);
// Append update to WAL
@@ -4674,7 +4688,15 @@ public class HRegion implements HeapSize { // , Writable{
//Actually write to Memstore now
for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
Store store = entry.getKey();
- size += store.upsert(entry.getValue());
+ if (store.getFamily().getMaxVersions() == 1) {
+ // upsert if VERSIONS for this CF == 1
+ size += store.upsert(entry.getValue(), getSmallestReadPoint());
+ } else {
+ // otherwise keep older versions around
+ for (KeyValue kv : entry.getValue()) {
+ size += store.add(kv);
+ }
+ }
allKVs.addAll(entry.getValue());
}
size = this.addAndGetGlobalMemstoreSize(size);
@@ -4687,11 +4709,14 @@ public class HRegion implements HeapSize { // , Writable{
syncOrDefer(txid); // sync the transaction log outside the rowlock
}
} finally {
+ if (w != null) {
+ mvcc.completeMemstoreInsert(w);
+ }
closeRegionOperation();
long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before);
}
-
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -4712,96 +4737,21 @@ public class HRegion implements HeapSize { // , Writable{
public long incrementColumnValue(byte [] row, byte [] family,
byte [] qualifier, long amount, boolean writeToWAL)
throws IOException {
- // to be used for metrics
- long before = EnvironmentEdgeManager.currentTimeMillis();
-
- checkRow(row, "increment");
- boolean flush = false;
- boolean wrongLength = false;
- long txid = 0;
- // Lock row
- long result = amount;
- startRegionOperation();
- this.writeRequestsCount.increment();
- try {
- Integer lid = obtainRowLock(row);
- this.updatesLock.readLock().lock();
- try {
- Store store = stores.get(family);
-
- // Get the old value:
- Get get = new Get(row);
- get.addColumn(family, qualifier);
-
- // we don't want to invoke coprocessor in this case; ICV is wrapped
- // in HRegionServer, so we leave getLastIncrement alone
- List<KeyValue> results = get(get, false);
-
- if (!results.isEmpty()) {
- KeyValue kv = results.get(0);
- if(kv.getValueLength() == Bytes.SIZEOF_LONG){
- byte [] buffer = kv.getBuffer();
- int valueOffset = kv.getValueOffset();
- result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
- }
- else{
- wrongLength = true;
- }
- }
- if(!wrongLength){
- // build the KeyValue now:
- KeyValue newKv = new KeyValue(row, family,
- qualifier, EnvironmentEdgeManager.currentTimeMillis(),
- Bytes.toBytes(result));
-
- // now log it:
- if (writeToWAL) {
- long now = EnvironmentEdgeManager.currentTimeMillis();
- WALEdit walEdit = new WALEdit();
- walEdit.add(newKv);
- // Using default cluster id, as this can only happen in the
- // orginating cluster. A slave cluster receives the final value (not
- // the delta) as a Put.
- txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
- walEdit, HConstants.DEFAULT_CLUSTER_ID, now,
- this.htableDescriptor);
- }
-
- // Now request the ICV to the store, this will set the timestamp
- // appropriately depending on if there is a value in memcache or not.
- // returns the change in the size of the memstore from operation
- long size = store.updateColumnValue(row, family, qualifier, result);
-
- size = this.addAndGetGlobalMemstoreSize(size);
- flush = isFlushSize(size);
- }
- } finally {
- this.updatesLock.readLock().unlock();
- releaseRowLock(lid);
- }
- if (writeToWAL) {
- syncOrDefer(txid); // sync the transaction log outside the rowlock
- }
- } finally {
- closeRegionOperation();
- }
-
- // do after lock
- long after = EnvironmentEdgeManager.currentTimeMillis();
- this.opMetrics.updateIncrementColumnValueMetrics(family, after - before);
-
- if (flush) {
- // Request a cache flush. Do it outside update lock.
- requestFlush();
- }
- if(wrongLength){
- throw new DoNotRetryIOException(
- "Attempted to increment field that isn't 64 bits wide");
+ Increment increment = new Increment(row);
+ increment.addColumn(family, qualifier, amount);
+ Result result = this.increment(increment, null, writeToWAL);
+ List<KeyValue> kvs = result.getColumn(family, qualifier);
+ if (kvs.size() == 0) {
+ throw new RuntimeException("Result from Increment is empty.");
+ }
+ KeyValue kv = kvs.get(0);
+ if(kv.getValueLength() == Bytes.SIZEOF_LONG) {
+ return Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), Bytes.SIZEOF_LONG);
+ } else {
+ throw new RuntimeException("Result from Increment has incorrect size.");
}
- return result;
}
-
//
// New HBASE-880 Helpers
//
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 9ee6720..c7622ff 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -496,8 +496,8 @@ public class MemStore implements HeapSize {
// create or update (upsert) a new KeyValue with
// 'now' and a 0 memstoreTS == immediately visible
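+ // a readPoint of 1 suffices here: these KVs carry memstoreTS 0, so older
+ // duplicates (also TS 0) already fall below the read point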
return upsert(Arrays.asList(
- new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)))
- );
+ new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue))),
+ 1L);
} finally {
this.lock.readLock().unlock();
}
@@ -518,15 +518,15 @@ public class MemStore implements HeapSize {
* atomically. Scans will only see each KeyValue update as atomic.
*
* @param kvs
+ * @param readPoint readpoint below which we can safely remove duplicate KVs
* @return change in memstore size
*/
- public long upsert(List<KeyValue> kvs) {
+ public long upsert(List<KeyValue> kvs, long readPoint) {
this.lock.readLock().lock();
try {
long size = 0;
for (KeyValue kv : kvs) {
- kv.setMemstoreTS(0);
- size += upsert(kv);
+ size += upsert(kv, readPoint);
}
return size;
} finally {
@@ -546,9 +546,10 @@ public class MemStore implements HeapSize {
* Callers must hold the read lock.
*
* @param kv
+ * @param readPoint readpoint below which we can safely remove duplicate KVs
* @return change in size of MemStore
*/
- private long upsert(KeyValue kv) {
+ private long upsert(KeyValue kv, long readPoint) {
// Add the KeyValue to the MemStore
// Use the internalAdd method here since we (a) already have a lock
// and (b) cannot safely use the MSLAB here without potentially
@@ -565,6 +566,8 @@ public class MemStore implements HeapSize {
kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
Iterator<KeyValue> it = ss.iterator();
+ // versions visible to oldest scanner
+ int versionsVisible = 0;
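+ // the tailSet walks this column's versions newest-to-oldest; once one
+ // version readable at 'readPoint' has been kept, all older duplicates
+ // are unreachable by any scanner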
while ( it.hasNext() ) {
KeyValue cur = it.next();
@@ -572,23 +575,22 @@ public class MemStore implements HeapSize {
// ignore the one just put in
continue;
}
- // if this isn't the row we are interested in, then bail
- if (!kv.matchingRow(cur)) {
- break;
- }
-
- // if the qualifier matches and it's a put, remove it
- if (kv.matchingQualifier(cur)) {
-
- // to be extra safe we only remove Puts that have a memstoreTS==0
- if (kv.getType() == KeyValue.Type.Put.getCode() &&
- kv.getMemstoreTS() == 0) {
- // false means there was a change, so give us the size.
- addedSize -= heapSizeChange(kv, true);
- it.remove();
+ // check that this is the row and column we are interested in, otherwise bail
+ if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) {
+ // only remove Puts that concurrent scanners cannot possibly see
+ if (cur.getType() == KeyValue.Type.Put.getCode() && cur.getMemstoreTS() <= readPoint) {
+ if (versionsVisible >= 1) {
+ // if we get here we have seen at least one version visible to the oldest scanner,
+ // which means we can prove that no scanner will see this version
+ // account for the heap size of the duplicate we are about to remove
+ addedSize -= heapSizeChange(cur, true);
+ it.remove();
+ } else {
+ versionsVisible++;
+ }
}
} else {
- // past the column, done
+ // past the row or column, done
break;
}
}
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/Store.java src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index f9e1103..e8eee31 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -2245,12 +2245,11 @@ public class Store extends SchemaConfigured implements HeapSize {
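+ * @param readPoint readpoint below which we can safely remove duplicate KVs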
* @return memstore size delta
* @throws IOException
*/
- public long upsert(List<KeyValue> kvs)
+ public long upsert(List<KeyValue> kvs, long readPoint)
throws IOException {
this.lock.readLock().lock();
try {
- // TODO: Make this operation atomic w/ MVCC
- return this.memstore.upsert(kvs);
+ return this.memstore.upsert(kvs, readPoint);
} finally {
this.lock.readLock().unlock();
}
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index 6cbb2bc..69f7814 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@@ -128,8 +129,9 @@ public class TestAtomicOperation extends HBaseTestCase {
assertEquals(value+amount, result);
Store store = region.getStore(fam1);
- // ICV removes any extra values floating around in there.
- assertEquals(1, store.memstore.kvset.size());
+ // The upsert leaves the old value in place: at the time of the upsert it is
+ // still the newest version visible to concurrent scanners, so it cannot be removed.
+ assertEquals(2, store.memstore.kvset.size());
assertTrue(store.memstore.snapshot.isEmpty());
assertICV(row, fam1, qual1, value+amount);
@@ -141,7 +143,8 @@ public class TestAtomicOperation extends HBaseTestCase {
public void testIncrementMultiThreads() throws IOException {
LOG.info("Starting test testIncrementMultiThreads");
- initHRegion(tableName, getName(), fam1);
+ // Run with two column families: one with VERSIONS=1 and one with VERSIONS=3.
+ initHRegion(tableName, getName(), new int[] {1,3}, fam1, fam2);
// create 100 threads, each will increment by its own quantity
int numThreads = 100;
@@ -151,7 +154,7 @@ public class TestAtomicOperation extends HBaseTestCase {
// create all threads
for (int i = 0; i < numThreads; i++) {
- all[i] = new Incrementer(region, i, i, incrementsPerThread);
+ all[i] = new Incrementer(region, i, incrementsPerThread);
expectedTotal += (i * incrementsPerThread);
}
@@ -168,11 +171,12 @@ public class TestAtomicOperation extends HBaseTestCase {
}
}
assertICV(row, fam1, qual1, expectedTotal);
+ assertICV(row, fam1, qual2, expectedTotal * 2);
+ assertICV(row, fam2, qual3, expectedTotal * 3);
LOG.info("testIncrementMultiThreads successfully verified that total is " +
expectedTotal);
}
-
private void assertICV(byte [] row,
byte [] familiy,
byte[] qualifier,
@@ -189,17 +193,19 @@ public class TestAtomicOperation extends HBaseTestCase {
}
private void initHRegion (byte [] tableName, String callingMethod,
- byte[] ... families)
- throws IOException {
- initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families);
+ byte[] ... families) throws IOException {
+ initHRegion(tableName, callingMethod, null, families);
}
private void initHRegion (byte [] tableName, String callingMethod,
- Configuration conf, byte [] ... families)
+ int[] maxVersions, byte [] ... families)
throws IOException{
HTableDescriptor htd = new HTableDescriptor(tableName);
+ int i = 0;
for(byte [] family : families) {
- htd.addFamily(new HColumnDescriptor(family));
+ HColumnDescriptor hcd = new HColumnDescriptor(family);
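+ // default to VERSIONS=1 when no explicit maxVersions are supplied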
+ hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
+ htd.addFamily(hcd);
}
HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
Path path = new Path(DIR + callingMethod);
@@ -208,7 +214,7 @@ public class TestAtomicOperation extends HBaseTestCase {
throw new IOException("Failed delete of " + path);
}
}
- region = HRegion.createHRegion(info, path, conf, htd);
+ region = HRegion.createHRegion(info, path, HBaseConfiguration.create(), htd);
}
/**
@@ -217,18 +223,12 @@ public class TestAtomicOperation extends HBaseTestCase {
public static class Incrementer extends Thread {
private final HRegion region;
- private final int threadNumber;
private final int numIncrements;
private final int amount;
- private int count;
-
- public Incrementer(HRegion region,
- int threadNumber, int amount, int numIncrements) {
+ public Incrementer(HRegion region, int amount, int numIncrements) {
this.region = region;
- this.threadNumber = threadNumber;
this.numIncrements = numIncrements;
- this.count = 0;
this.amount = amount;
setDaemon(true);
}
@@ -237,16 +237,81 @@ public class TestAtomicOperation extends HBaseTestCase {
public void run() {
for (int i=0; i<numIncrements; i++) {