diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 06f93e1..0064147 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4426,11 +4426,7 @@ public class HRegion implements HeapSize { // , Writable{
   // TODO: There's a lot of boiler plate code identical
   // to increment... See how to better unify that.
   /**
-   *
    * Perform one or more append operations on a row.
-   * <p>
-   * Appends performed are done under row lock but reads do not take locks out
-   * so this can be seen partially complete by gets and scans.
    *
    * @param append
    * @param lockid
@@ -4440,7 +4436,6 @@ public class HRegion implements HeapSize { // , Writable{
    */
   public Result append(Append append, Integer lockid, boolean writeToWAL)
       throws IOException {
-    // TODO: Use MVCC to make this set of appends atomic to reads
     byte[] row = append.getRow();
     checkRow(row, "append");
     boolean flush = false;
@@ -4454,9 +4449,15 @@ public class HRegion implements HeapSize { // , Writable{
     // Lock row
     startRegionOperation();
     this.writeRequestsCount.increment();
+    MultiVersionConsistencyControl.WriteEntry w = null;
     try {
       Integer lid = getLock(lockid, row, true);
       this.updatesLock.readLock().lock();
+      // wait for all prior MVCC transactions to finish - while we hold the row lock
+      // (so that we are guaranteed to see the latest state)
+      mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+      // now start my own transaction
+      w = mvcc.beginMemstoreInsert();
       try {
         long now = EnvironmentEdgeManager.currentTimeMillis();
         // Process each family
@@ -4518,7 +4519,7 @@ public class HRegion implements HeapSize { // , Writable{
             System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
                 newKV.getBuffer(), newKV.getQualifierOffset(),
                 kv.getQualifierLength());
-
+            newKV.setMemstoreTS(w.getWriteNumber());
             kvs.add(newKV);

             // Append update to WAL
@@ -4547,7 +4548,15 @@ public class HRegion implements HeapSize { // , Writable{
       // Actually write to Memstore now
       for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
         Store store = entry.getKey();
-        size += store.upsert(entry.getValue());
+        if (store.getFamily().getMaxVersions() == 1) {
+          // upsert if VERSIONS for this CF == 1
+          size += store.upsert(entry.getValue(), getSmallestReadPoint());
+        } else {
+          // otherwise keep older versions around
+          for (KeyValue kv : entry.getValue()) {
+            size += store.add(kv);
+          }
+        }
         allKVs.addAll(entry.getValue());
       }
       size = this.addAndGetGlobalMemstoreSize(size);
@@ -4560,13 +4569,15 @@ public class HRegion implements HeapSize { // , Writable{
         syncOrDefer(txid); // sync the transaction log outside the rowlock
       }
     } finally {
+      if (w != null) {
+        mvcc.completeMemstoreInsert(w);
+      }
       closeRegionOperation();
     }
-
     long after = EnvironmentEdgeManager.currentTimeMillis();
     this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before);
-
+
     if (flush) {
       // Request a cache flush. Do it outside update lock.
       requestFlush();
@@ -4576,11 +4587,8 @@ public class HRegion implements HeapSize { // , Writable{
   }

   /**
-   *
    * Perform one or more increment operations on a row.
-   * <p>
-   * Increments performed are done under row lock but reads do not take locks
-   * out so this can be seen partially complete by gets and scans.
+   *
    * @param increment
    * @param lockid
    * @param writeToWAL
@@ -4590,7 +4598,6 @@ public class HRegion implements HeapSize { // , Writable{
    */
   public Result increment(Increment increment, Integer lockid,
       boolean writeToWAL)
   throws IOException {
-    // TODO: Use MVCC to make this set of increments atomic to reads
     byte [] row = increment.getRow();
     checkRow(row, "increment");
     TimeRange tr = increment.getTimeRange();
@@ -4605,9 +4612,15 @@ public class HRegion implements HeapSize { // , Writable{
     // Lock row
     startRegionOperation();
     this.writeRequestsCount.increment();
+    MultiVersionConsistencyControl.WriteEntry w = null;
     try {
       Integer lid = getLock(lockid, row, true);
       this.updatesLock.readLock().lock();
+      // wait for all prior MVCC transactions to finish - while we hold the row lock
+      // (so that we are guaranteed to see the latest state)
+      mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+      // now start my own transaction
+      w = mvcc.beginMemstoreInsert();
       try {
         long now = EnvironmentEdgeManager.currentTimeMillis();
         // Process each family
@@ -4646,6 +4659,7 @@ public class HRegion implements HeapSize { // , Writable{
           // Append new incremented KeyValue to list
           KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(),
               now, Bytes.toBytes(amount));
+          newKV.setMemstoreTS(w.getWriteNumber());
           kvs.add(newKV);

           // Append update to WAL
@@ -4674,7 +4688,15 @@ public class HRegion implements HeapSize { // , Writable{
       //Actually write to Memstore now
       for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
         Store store = entry.getKey();
-        size += store.upsert(entry.getValue());
+        if (store.getFamily().getMaxVersions() == 1) {
+          // upsert if VERSIONS for this CF == 1
+          size += store.upsert(entry.getValue(), getSmallestReadPoint());
+        } else {
+          // otherwise keep older versions around
+          for (KeyValue kv : entry.getValue()) {
+            size += store.add(kv);
+          }
+        }
         allKVs.addAll(entry.getValue());
       }
       size = this.addAndGetGlobalMemstoreSize(size);
@@ -4687,11 +4709,14 @@ public class HRegion implements HeapSize { // , Writable{
         syncOrDefer(txid); // sync the transaction log outside the rowlock
       }
     } finally {
+      if (w != null) {
+        mvcc.completeMemstoreInsert(w);
+      }
       closeRegionOperation();
       long after = EnvironmentEdgeManager.currentTimeMillis();
       this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before);
     }
-
+
     if (flush) {
       // Request a cache flush. Do it outside update lock.
       requestFlush();
@@ -4712,96 +4737,21 @@ public class HRegion implements HeapSize { // , Writable{
   public long incrementColumnValue(byte [] row, byte [] family,
       byte [] qualifier, long amount, boolean writeToWAL)
   throws IOException {
-    // to be used for metrics
-    long before = EnvironmentEdgeManager.currentTimeMillis();
-
-    checkRow(row, "increment");
-    boolean flush = false;
-    boolean wrongLength = false;
-    long txid = 0;
-    // Lock row
-    long result = amount;
-    startRegionOperation();
-    this.writeRequestsCount.increment();
-    try {
-      Integer lid = obtainRowLock(row);
-      this.updatesLock.readLock().lock();
-      try {
-        Store store = stores.get(family);
-
-        // Get the old value:
-        Get get = new Get(row);
-        get.addColumn(family, qualifier);
-
-        // we don't want to invoke coprocessor in this case; ICV is wrapped
-        // in HRegionServer, so we leave getLastIncrement alone
-        List<KeyValue> results = get(get, false);
-
-        if (!results.isEmpty()) {
-          KeyValue kv = results.get(0);
-          if(kv.getValueLength() == Bytes.SIZEOF_LONG){
-            byte [] buffer = kv.getBuffer();
-            int valueOffset = kv.getValueOffset();
-            result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
-          }
-          else{
-            wrongLength = true;
-          }
-        }
-        if(!wrongLength){
-          // build the KeyValue now:
-          KeyValue newKv = new KeyValue(row, family,
-              qualifier, EnvironmentEdgeManager.currentTimeMillis(),
-              Bytes.toBytes(result));
-
-          // now log it:
-          if (writeToWAL) {
-            long now = EnvironmentEdgeManager.currentTimeMillis();
-            WALEdit walEdit = new WALEdit();
-            walEdit.add(newKv);
-            // Using default cluster id, as this can only happen in the
-            // orginating cluster. A slave cluster receives the final value (not
-            // the delta) as a Put.
-            txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
-                walEdit, HConstants.DEFAULT_CLUSTER_ID, now,
-                this.htableDescriptor);
-          }
-
-          // Now request the ICV to the store, this will set the timestamp
-          // appropriately depending on if there is a value in memcache or not.
-          // returns the change in the size of the memstore from operation
-          long size = store.updateColumnValue(row, family, qualifier, result);
-
-          size = this.addAndGetGlobalMemstoreSize(size);
-          flush = isFlushSize(size);
-        }
-      } finally {
-        this.updatesLock.readLock().unlock();
-        releaseRowLock(lid);
-      }
-      if (writeToWAL) {
-        syncOrDefer(txid); // sync the transaction log outside the rowlock
-      }
-    } finally {
-      closeRegionOperation();
-    }
-
-    // do after lock
-    long after = EnvironmentEdgeManager.currentTimeMillis();
-    this.opMetrics.updateIncrementColumnValueMetrics(family, after - before);
-
-    if (flush) {
-      // Request a cache flush. Do it outside update lock.
-      requestFlush();
-    }
-    if(wrongLength){
-      throw new DoNotRetryIOException(
-          "Attempted to increment field that isn't 64 bits wide");
+    Increment increment = new Increment(row);
+    increment.addColumn(family, qualifier, amount);
+    Result result = this.increment(increment, null, writeToWAL);
+    List<KeyValue> kvs = result.getColumn(family, qualifier);
+    if (kvs.size() == 0) {
+      throw new RuntimeException("Result from Increment is empty.");
+    }
+    KeyValue kv = kvs.get(0);
+    if(kv.getValueLength() == Bytes.SIZEOF_LONG) {
+      return Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), Bytes.SIZEOF_LONG);
+    } else {
+      throw new RuntimeException("Result from Increment has incorrect size.");
     }
-    return result;
   }
-
   //
   // New HBASE-880 Helpers
   //
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 9ee6720..c7622ff 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -496,8 +496,8 @@ public class MemStore implements HeapSize {
       // create or update (upsert) a new KeyValue with
       // 'now' and a 0 memstoreTS == immediately visible
       return upsert(Arrays.asList(
-          new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)))
-      );
+          new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue))),
+          1L);
     } finally {
       this.lock.readLock().unlock();
     }
@@ -518,15 +518,15 @@
    * atomically. Scans will only see each KeyValue update as atomic.
    *
    * @param kvs
+   * @param readPoint readpoint below which we can safely remove duplicate KVs
    * @return change in memstore size
    */
-  public long upsert(List<KeyValue> kvs) {
+  public long upsert(List<KeyValue> kvs, long readPoint) {
     this.lock.readLock().lock();
     try {
       long size = 0;
       for (KeyValue kv : kvs) {
-        kv.setMemstoreTS(0);
-        size += upsert(kv);
+        size += upsert(kv, readPoint);
       }
       return size;
     } finally {
@@ -546,9 +546,10 @@
    * Callers must hold the read lock.
    *
    * @param kv
+   * @param readPoint readpoint below which we can safely remove duplicate KVs
    * @return change in size of MemStore
    */
-  private long upsert(KeyValue kv) {
+  private long upsert(KeyValue kv, long readPoint) {
     // Add the KeyValue to the MemStore
     // Use the internalAdd method here since we (a) already have a lock
    // and (b) cannot safely use the MSLAB here without potentially
@@ -565,6 +566,8 @@
         kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
     SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
     Iterator<KeyValue> it = ss.iterator();
+    // versions visible to oldest scanner
+    int versionsVisible = 0;
     while ( it.hasNext() ) {
       KeyValue cur = it.next();

@@ -572,23 +575,22 @@
         // ignore the one just put in
         continue;
       }
-      // if this isn't the row we are interested in, then bail
-      if (!kv.matchingRow(cur)) {
-        break;
-      }
-
-      // if the qualifier matches and it's a put, remove it
-      if (kv.matchingQualifier(cur)) {
-
-        // to be extra safe we only remove Puts that have a memstoreTS==0
-        if (kv.getType() == KeyValue.Type.Put.getCode() &&
-            kv.getMemstoreTS() == 0) {
-          // false means there was a change, so give us the size.
-          addedSize -= heapSizeChange(kv, true);
-          it.remove();
+      // check that this is the row and column we are interested in, otherwise bail
+      if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) {
+        // only remove Puts that concurrent scanners cannot possibly see
+        if (cur.getType() == KeyValue.Type.Put.getCode() && cur.getMemstoreTS() <= readPoint) {
+          if (versionsVisible > 1) {
+            // if we get here we have seen at least one version visible to the oldest scanner,
+            // which means we can prove that no scanner will see this version
+            // false means there was a change, so give us the size.
+            addedSize -= heapSizeChange(cur, true);
+            it.remove();
+          } else {
+            versionsVisible++;
+          }
         }
       } else {
-        // past the column, done
+        // past the row or column, done
         break;
       }
     }
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/Store.java src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index f9e1103..e8eee31 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -2245,12 +2245,11 @@ public class Store extends SchemaConfigured implements HeapSize {
    * @return memstore size delta
    * @throws IOException
    */
-  public long upsert(List<KeyValue> kvs)
+  public long upsert(List<KeyValue> kvs, long readPoint)
       throws IOException {
     this.lock.readLock().lock();
     try {
-      // TODO: Make this operation atomic w/ MVCC
-      return this.memstore.upsert(kvs);
+      return this.memstore.upsert(kvs, readPoint);
     } finally {
       this.lock.readLock().unlock();
     }
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index 6cbb2bc..69f7814 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -128,8 +129,9 @@ public class TestAtomicOperation extends HBaseTestCase {
     assertEquals(value+amount, result);

     Store store = region.getStore(fam1);
-    // ICV removes any extra values floating around in there.
-    assertEquals(1, store.memstore.kvset.size());
+    // Leaves the old value in place during upsert since the get during the increment
+    // uses it.
+    assertEquals(2, store.memstore.kvset.size());
     assertTrue(store.memstore.snapshot.isEmpty());

     assertICV(row, fam1, qual1, value+amount);
@@ -141,7 +143,8 @@ public class TestAtomicOperation extends HBaseTestCase {
   public void testIncrementMultiThreads() throws IOException {
     LOG.info("Starting test testIncrementMultiThreads");

-    initHRegion(tableName, getName(), fam1);
+    // Run with mixed column families (1 and 3).
+    initHRegion(tableName, getName(), new int[] {1,3}, fam1, fam2);

     // create 100 threads, each will increment by its own quantity
     int numThreads = 100;
@@ -151,7 +154,7 @@

     // create all threads
     for (int i = 0; i < numThreads; i++) {
-      all[i] = new Incrementer(region, i, i, incrementsPerThread);
+      all[i] = new Incrementer(region, i, incrementsPerThread);
       expectedTotal += (i * incrementsPerThread);
     }
@@ -168,11 +171,12 @@

       }
     }
     assertICV(row, fam1, qual1, expectedTotal);
+    assertICV(row, fam1, qual2, expectedTotal * 2);
+    assertICV(row, fam2, qual3, expectedTotal * 3);

     LOG.info("testIncrementMultiThreads successfully verified that total is " +
         expectedTotal);
   }

-
   private void assertICV(byte [] row, byte [] familiy, byte[] qualifier,
@@ -189,17 +193,19 @@
   }

   private void initHRegion (byte [] tableName, String callingMethod,
-    byte[] ... families)
-  throws IOException {
-    initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families);
+    byte[] ... families) throws IOException {
+    initHRegion(tableName, callingMethod, null, families);
   }

   private void initHRegion (byte [] tableName, String callingMethod,
-      Configuration conf, byte [] ... families)
+      int[] maxVersions, byte [] ... families)
     throws IOException{
     HTableDescriptor htd = new HTableDescriptor(tableName);
+    int i = 0;
     for(byte [] family : families) {
-      htd.addFamily(new HColumnDescriptor(family));
+      HColumnDescriptor hcd = new HColumnDescriptor(family);
+      hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
+      htd.addFamily(hcd);
     }
     HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
     Path path = new Path(DIR + callingMethod);
@@ -208,7 +214,7 @@
         throw new IOException("Failed delete of " + path);
       }
     }
-    region = HRegion.createHRegion(info, path, conf, htd);
+    region = HRegion.createHRegion(info, path, HBaseConfiguration.create(), htd);
   }

   /**
@@ -217,18 +223,12 @@
   public static class Incrementer extends Thread {

     private final HRegion region;
-    private final int threadNumber;
     private final int numIncrements;
     private final int amount;
-    private int count;
-
-    public Incrementer(HRegion region,
-        int threadNumber, int amount, int numIncrements) {
+    public Incrementer(HRegion region, int amount, int numIncrements) {
       this.region = region;
-      this.threadNumber = threadNumber;
       this.numIncrements = numIncrements;
-      this.count = 0;
       this.amount = amount;
       setDaemon(true);
     }

@@ -237,16 +237,81 @@ public void run() {
       for (int i=0; i<numIncrements; i++) {