Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1402882)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -81,7 +81,6 @@
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
@@ -116,7 +115,7 @@
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.metrics.OperationMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
@@ -4546,9 +4545,6 @@
/**
*
* Perform one or more append operations on a row.
- *
- * Appends performed are done under row lock but reads do not take locks out
- * so this can be seen partially complete by gets and scans.
*
* @param append
* @param lockid
@@ -4558,15 +4554,14 @@
*/
public Result append(Append append, Integer lockid, boolean writeToWAL)
throws IOException {
- // TODO: Use MVCC to make this set of appends atomic to reads
byte[] row = append.getRow();
checkRow(row, "append");
boolean flush = false;
WALEdit walEdits = null;
List<KeyValue> allKVs = new ArrayList<KeyValue>(append.size());
- Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
+ Map<byte[], List<KeyValue>> familyMap =
+ new TreeMap<byte[], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
long before = EnvironmentEdgeManager.currentTimeMillis();
- long size = 0;
long txid = 0;
// Lock row
@@ -4575,13 +4570,18 @@
try {
Integer lid = getLock(lockid, row, true);
this.updatesLock.readLock().lock();
+ // wait for all prior MVCC transactions to finish
+ // (so that we are guaranteed to see the latest state)
+ WriteEntry w = mvcc.beginMemstoreInsert();
+ mvcc.completeMemstoreInsert(w);
+ // now start my own transaction
+ w = mvcc.beginMemstoreInsert();
try {
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
for (Map.Entry<byte[], List<KeyValue>> family : append.getFamilyMap()
.entrySet()) {
- Store store = stores.get(family.getKey());
List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
// Get previous values for all columns in this family
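[Annotation: the beginMemstoreInsert()/completeMemstoreInsert(w) pair added above acts as a barrier that waits out every in-flight MVCC transaction before the method reads current cell values, and the second beginMemstoreInsert() opens the transaction under which the new KVs are written. Below is a minimal, self-contained model of that behavior; the names mirror MultiVersionConsistencyControl, but the internals are an illustrative sketch, not HBase's actual implementation.]

import java.util.LinkedList;
import java.util.Queue;

/**
 * Toy model of the wait-then-begin pattern used by append/increment:
 *   WriteEntry w = mvcc.beginMemstoreInsert();
 *   mvcc.completeMemstoreInsert(w);  // barrier: see all prior writes
 *   w = mvcc.beginMemstoreInsert();  // our own transaction
 *   try { ... read latest, write new KVs ... }
 *   finally { mvcc.completeMemstoreInsert(w); }  // expose atomically
 */
class ToyMvcc {
  static class WriteEntry {
    final long writeNumber;
    boolean completed;
    WriteEntry(long n) { writeNumber = n; }
  }

  private long nextWriteNumber = 1;
  private long readPoint = 0; // newest write visible to readers
  private final Queue<WriteEntry> pending = new LinkedList<WriteEntry>();

  synchronized WriteEntry beginMemstoreInsert() {
    WriteEntry e = new WriteEntry(nextWriteNumber++);
    pending.add(e);
    return e;
  }

  /**
   * Marks e complete, advances the read point over the completed prefix of
   * the queue, and blocks until e itself is readable. Calling this right
   * after beginMemstoreInsert() therefore waits out all prior transactions.
   */
  synchronized void completeMemstoreInsert(WriteEntry e)
      throws InterruptedException {
    e.completed = true;
    while (!pending.isEmpty() && pending.peek().completed) {
      readPoint = pending.poll().writeNumber;
    }
    notifyAll();
    while (readPoint < e.writeNumber) {
      wait(); // an earlier writer is still in flight
    }
  }
}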
@@ -4648,8 +4648,9 @@
}
}
- //store the kvs to the temporary memstore before writing HLog
- tempMemstore.put(store, kvs);
+ // collect the new KVs to apply for this family
+ familyMap.put(family.getKey(), kvs);
+ allKVs.addAll(kvs);
}
// Actually write to WAL now
@@ -4664,14 +4665,9 @@
}
//Actually write to Memstore now
- for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
- Store store = entry.getKey();
- size += store.upsert(entry.getValue());
- allKVs.addAll(entry.getValue());
- }
- size = this.addAndGetGlobalMemstoreSize(size);
- flush = isFlushSize(size);
+ flush = isFlushSize(this.addAndGetGlobalMemstoreSize(applyFamilyMapToMemstore(familyMap, w)));
} finally {
+ mvcc.completeMemstoreInsert(w);
this.updatesLock.readLock().unlock();
releaseRowLock(lid);
}
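[Annotation: applyFamilyMapToMemstore is the existing HRegion helper that the put path already uses; the patch reuses it so appended KVs are stamped with the caller's MVCC write number instead of upserted with memstoreTS == 0. A hedged sketch of its shape, written against HRegion's stores field; the authoritative body is in HRegion.java and may differ in detail:]

private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap,
    MultiVersionConsistencyControl.WriteEntry writeEntry) {
  long size = 0;
  for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
    Store store = stores.get(e.getKey());
    for (KeyValue kv : e.getValue()) {
      // Stamped KVs stay invisible to scanners until the caller's
      // completeMemstoreInsert(w) advances the global read point.
      kv.setMemstoreTS(writeEntry.getWriteNumber());
      size += store.add(kv); // plain add; no upsert needed anymore
    }
  }
  return size;
}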
@@ -4685,22 +4681,18 @@
long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before);
-
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
}
-
return append.isReturnResults() ? new Result(allKVs) : null;
}
/**
*
* Perform one or more increment operations on a row.
- *
- * Increments performed are done under row lock but reads do not take locks
- * out so this can be seen partially complete by gets and scans.
+ *
* @param increment
* @param lockid
* @param writeToWAL
@@ -4710,16 +4702,15 @@
public Result increment(Increment increment, Integer lockid,
boolean writeToWAL)
throws IOException {
- // TODO: Use MVCC to make this set of increments atomic to reads
byte [] row = increment.getRow();
checkRow(row, "increment");
TimeRange tr = increment.getTimeRange();
boolean flush = false;
WALEdit walEdits = null;
List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.numColumns());
- Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
+ Map<byte[], List<KeyValue>> familyMap =
+ new TreeMap<byte[], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
long before = EnvironmentEdgeManager.currentTimeMillis();
- long size = 0;
long txid = 0;
// Lock row
@@ -4728,13 +4719,18 @@
try {
Integer lid = getLock(lockid, row, true);
this.updatesLock.readLock().lock();
+ // wait for all prior MVCC transactions to finish
+ // (so that we are guaranteed to see the latest state)
+ WriteEntry w = mvcc.beginMemstoreInsert();
+ mvcc.completeMemstoreInsert(w);
+ // now start my own transaction
+ w = mvcc.beginMemstoreInsert();
try {
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
for (Map.Entry<byte[], NavigableMap<byte[], Long>> family :
increment.getFamilyMap().entrySet()) {
- Store store = stores.get(family.getKey());
List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
// Get previous values for all columns in this family
@@ -4777,8 +4773,9 @@
}
}
- //store the kvs to the temporary memstore before writing HLog
- tempMemstore.put(store, kvs);
+ // collect the new KVs to apply for this family
+ familyMap.put(family.getKey(), kvs);
+ allKVs.addAll(kvs);
}
// Actually write to WAL now
@@ -4792,14 +4789,9 @@
}
//Actually write to Memstore now
- for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
- Store store = entry.getKey();
- size += store.upsert(entry.getValue());
- allKVs.addAll(entry.getValue());
- }
- size = this.addAndGetGlobalMemstoreSize(size);
- flush = isFlushSize(size);
+ flush = isFlushSize(addAndGetGlobalMemstoreSize(applyFamilyMapToMemstore(familyMap, w)));
} finally {
+ mvcc.completeMemstoreInsert(w);
this.updatesLock.readLock().unlock();
releaseRowLock(lid);
}
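[Annotation: the user-visible effect of the two HRegion changes above, illustrated with the client API; the row, family, and qualifier names here are made up, and 'table' is an assumed already-open HTable. Previously a concurrent Get or Scan could observe one cell of a multi-cell increment without the other.]

import java.io.IOException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.util.Bytes;

class IncrementAtomicityExample {
  static void bumpCounters(HTable table) throws IOException {
    Increment inc = new Increment(Bytes.toBytes("row1"));
    inc.addColumn(Bytes.toBytes("f"), Bytes.toBytes("hits"), 1L);
    inc.addColumn(Bytes.toBytes("f"), Bytes.toBytes("bytes"), 512L);
    // With this patch, concurrent Gets/Scans see both cells updated or
    // neither; previously each KV was upserted with memstoreTS == 0 and
    // became visible one at a time.
    table.increment(inc);
  }
}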
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (revision 1402882)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (working copy)
@@ -2033,37 +2033,6 @@
return this.region.regionInfo;
}
- @Override
- public long updateColumnValue(byte [] row, byte [] f,
- byte [] qualifier, long newValue)
- throws IOException {
-
- this.lock.readLock().lock();
- try {
- long now = EnvironmentEdgeManager.currentTimeMillis();
-
- return this.memstore.updateColumnValue(row,
- f,
- qualifier,
- newValue,
- now);
-
- } finally {
- this.lock.readLock().unlock();
- }
- }
-
- @Override
- public long upsert(Iterable<KeyValue> kvs) throws IOException {
- this.lock.readLock().lock();
- try {
- // TODO: Make this operation atomic w/ MVCC
- return this.memstore.upsert(kvs);
- } finally {
- this.lock.readLock().unlock();
- }
- }
-
public StoreFlusher getStoreFlusher(long cacheFlushId) {
return new StoreFlusherImpl(cacheFlushId);
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1402882)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy)
@@ -22,7 +22,6 @@
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.rmi.UnexpectedException;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -434,168 +433,6 @@
}
}
- /**
- * Given the specs of a column, update it, first by inserting a new record,
- * then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS
- * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
- * store will ensure that the insert/delete each are atomic. A scanner/reader will either
- * get the new value, or the old value and all readers will eventually only see the new
- * value after the old was removed.
- *
- * @param row
- * @param family
- * @param qualifier
- * @param newValue
- * @param now
- * @return Timestamp
- */
- public long updateColumnValue(byte[] row,
- byte[] family,
- byte[] qualifier,
- long newValue,
- long now) {
- this.lock.readLock().lock();
- try {
- KeyValue firstKv = KeyValue.createFirstOnRow(
- row, family, qualifier);
- // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
- SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
- if (!snSs.isEmpty()) {
- KeyValue snKv = snSs.first();
- // is there a matching KV in the snapshot?
- if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
- if (snKv.getTimestamp() == now) {
- // poop,
- now += 1;
- }
- }
- }
-
- // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
- // But the timestamp should also be max(now, mostRecentTsInMemstore)
-
- // so we cant add the new KV w/o knowing what's there already, but we also
- // want to take this chance to delete some kvs. So two loops (sad)
-
- SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
- Iterator<KeyValue> it = ss.iterator();
- while ( it.hasNext() ) {
- KeyValue kv = it.next();
-
- // if this isnt the row we are interested in, then bail:
- if (!kv.matchingColumn(family,qualifier) || !kv.matchingRow(firstKv) ) {
- break; // rows dont match, bail.
- }
-
- // if the qualifier matches and it's a put, just RM it out of the kvset.
- if (kv.getType() == KeyValue.Type.Put.getCode() &&
- kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
- now = kv.getTimestamp();
- }
- }
-
- // create or update (upsert) a new KeyValue with
- // 'now' and a 0 memstoreTS == immediately visible
- return upsert(Arrays.asList(
- new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)))
- );
- } finally {
- this.lock.readLock().unlock();
- }
- }
-
- /**
- * Update or insert the specified KeyValues.
- *
- * For each KeyValue, insert into MemStore. This will atomically upsert the
- * value for that row/family/qualifier. If a KeyValue did already exist,
- * it will then be removed.
- *
- * Currently the memstoreTS is kept at 0 so as each insert happens, it will
- * be immediately visible. May want to change this so it is atomic across
- * all KeyValues.
- *
- * This is called under row lock, so Get operations will still see updates
- * atomically. Scans will only see each KeyValue update as atomic.
- *
- * @param kvs
- * @return change in memstore size
- */
- public long upsert(Iterable<KeyValue> kvs) {
- this.lock.readLock().lock();
- try {
- long size = 0;
- for (KeyValue kv : kvs) {
- kv.setMemstoreTS(0);
- size += upsert(kv);
- }
- return size;
- } finally {
- this.lock.readLock().unlock();
- }
- }
-
- /**
- * Inserts the specified KeyValue into MemStore and deletes any existing
- * versions of the same row/family/qualifier as the specified KeyValue.
- *
- * First, the specified KeyValue is inserted into the Memstore.
- *
- * If there are any existing KeyValues in this MemStore with the same row,
- * family, and qualifier, they are removed.
- *
- * Callers must hold the read lock.
- *
- * @param kv
- * @return change in size of MemStore
- */
- private long upsert(KeyValue kv) {
- // Add the KeyValue to the MemStore
- // Use the internalAdd method here since we (a) already have a lock
- // and (b) cannot safely use the MSLAB here without potentially
- // hitting OOME - see TestMemStore.testUpsertMSLAB for a
- // test that triggers the pathological case if we don't avoid MSLAB
- // here.
- long addedSize = internalAdd(kv);
-
- // Get the KeyValues for the row/family/qualifier regardless of timestamp.
- // For this case we want to clean up any other puts
- KeyValue firstKv = KeyValue.createFirstOnRow(
- kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
- kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
- kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
- SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
- Iterator<KeyValue> it = ss.iterator();
- while ( it.hasNext() ) {
- KeyValue cur = it.next();
-
- if (kv == cur) {
- // ignore the one just put in
- continue;
- }
- // if this isn't the row we are interested in, then bail
- if (!kv.matchingRow(cur)) {
- break;
- }
-
- // if the qualifier matches and it's a put, remove it
- if (kv.matchingQualifier(cur)) {
-
- // to be extra safe we only remove Puts that have a memstoreTS==0
- if (kv.getType() == KeyValue.Type.Put.getCode() &&
- kv.getMemstoreTS() == 0) {
- // false means there was a change, so give us the size.
- addedSize -= heapSizeChange(kv, true);
- it.remove();
- }
- } else {
- // past the column, done
- break;
- }
- }
- return addedSize;
- }
-
/*
* Immutable data structure to hold member found in set and the set it was
* found in. Include set because it is carrying context.
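[Annotation: why removing updateColumnValue/upsert matters. Both wrote KVs with memstoreTS == 0, and a zero stamp passes every scanner's read-point check immediately, KeyValue by KeyValue. A sketch of the read-side predicate, under the assumption that it mirrors the MVCC read-point filtering in the scanner code:]

// A zero memstoreTS passes for every reader immediately, which is why the
// old upsert path could be observed half-applied across multiple KeyValues.
// See the scanner classes for the authoritative check.
static boolean isVisible(long kvMemstoreTS, long scannerReadPoint) {
  return kvMemstoreTS <= scannerReadPoint; // 0 <= any read point
}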
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1402882)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy)
@@ -75,34 +75,6 @@
throws IOException;
/**
- * Updates the value for the given row/family/qualifier. This function will always be seen as
- * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
- * control necessary.
- * @param row row to update
- * @param f family to update
- * @param qualifier qualifier to update
- * @param newValue the new value to set into memstore
- * @return memstore size delta
- * @throws IOException
- */
- public long updateColumnValue(byte[] row, byte[] f, byte[] qualifier, long newValue)
- throws IOException;
-
- /**
- * Adds or replaces the specified KeyValues.
- *
- * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
- * MemStore, it will be replaced. Otherwise, it will just be inserted to MemStore.
- *
- * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
- * across all of them.
- * @param kvs
- * @return memstore size delta
- * @throws IOException
- */
- public long upsert(Iterable<KeyValue> kvs) throws IOException;
-
- /**
* Adds a value to the memstore
* @param kv
* @return memstore size delta
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (revision 1402882)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (working copy)
@@ -19,8 +19,6 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
import java.rmi.UnexpectedException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -793,53 +791,7 @@
//scan.setTimeRange(28, 42);
//assertTrue(!memstore.shouldSeek(scan));
}
-
- ////////////////////////////////////
- //Test for upsert with MSLAB
- ////////////////////////////////////
- /**
- * Test a pathological pattern that shows why we can't currently
- * use the MSLAB for upsert workloads. This test inserts data
- * in the following pattern:
- *
- * - row0001 through row1000 (fills up one 2M Chunk)
- * - row0002 through row1001 (fills up another 2M chunk, leaves one reference
- * to the first chunk
- * - row0003 through row1002 (another chunk, another dangling reference)
- *
- * This causes OOME pretty quickly if we use MSLAB for upsert
- * since each 2M chunk is held onto by a single reference.
- */
- public void testUpsertMSLAB() throws Exception {
- Configuration conf = HBaseConfiguration.create();
- conf.setBoolean(MemStore.USEMSLAB_KEY, true);
- memstore = new MemStore(conf, KeyValue.COMPARATOR);
-
- int ROW_SIZE = 2048;
- byte[] qualifier = new byte[ROW_SIZE - 4];
-
- MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
- for (int i = 0; i < 3; i++) { System.gc(); }
- long usageBefore = bean.getHeapMemoryUsage().getUsed();
-
- long size = 0;
- long ts=0;
-
- for (int newValue = 0; newValue < 1000; newValue++) {
- for (int row = newValue; row < newValue + 1000; row++) {
- byte[] rowBytes = Bytes.toBytes(row);
- size += memstore.updateColumnValue(rowBytes, FAMILY, qualifier, newValue, ++ts);
- }
- }
- System.out.println("Wrote " + ts + " vals");
- for (int i = 0; i < 3; i++) { System.gc(); }
- long usageAfter = bean.getHeapMemoryUsage().getUsed();
- System.out.println("Memory used: " + (usageAfter - usageBefore)
- + " (heapsize: " + memstore.heapSize() +
- " size: " + size + ")");
- }
-
//////////////////////////////////////////////////////////////////////////////
// Helpers
//////////////////////////////////////////////////////////////////////////////
@@ -904,15 +856,6 @@
}
}
- private KeyValue getDeleteKV(byte [] row) {
- return new KeyValue(row, Bytes.toBytes("test_col"), null,
- HConstants.LATEST_TIMESTAMP, KeyValue.Type.Delete, null);
- }
-
- private KeyValue getKV(byte [] row, byte [] value) {
- return new KeyValue(row, Bytes.toBytes("test_col"), null,
- HConstants.LATEST_TIMESTAMP, value);
- }
private static void addRows(int count, final MemStore mem) {
long nanos = System.nanoTime();
@@ -932,7 +875,6 @@
}
}
-
static void doScan(MemStore ms, int iteration) throws IOException {
long nanos = System.nanoTime();
KeyValueScanner s = ms.getScanners().get(0);
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (revision 1402882)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (working copy)
@@ -45,7 +45,6 @@
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
@@ -60,7 +59,6 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
-import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.util.Progressable;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
@@ -400,189 +398,12 @@
}
}
- //////////////////////////////////////////////////////////////////////////////
- // IncrementColumnValue tests
- //////////////////////////////////////////////////////////////////////////////
- /*
- * test the internal details of how ICV works, especially during a flush scenario.
- */
- public void testIncrementColumnValue_ICVDuringFlush()
- throws IOException, InterruptedException {
- init(this.getName());
-
- long oldValue = 1L;
- long newValue = 3L;
- this.store.add(new KeyValue(row, family, qf1,
- System.currentTimeMillis(),
- Bytes.toBytes(oldValue)));
-
- // snapshot the store.
- this.store.snapshot();
-
- // add other things:
- this.store.add(new KeyValue(row, family, qf2,
- System.currentTimeMillis(),
- Bytes.toBytes(oldValue)));
-
- // update during the snapshot.
- long ret = this.store.updateColumnValue(row, family, qf1, newValue);
-
- // memstore should have grown by some amount.
- assertTrue(ret > 0);
-
- // then flush.
- flushStore(store, id++);
- assertEquals(1, this.store.getStorefiles().size());
- // from the one we inserted up there, and a new one
- assertEquals(2, this.store.memstore.kvset.size());
-
- // how many key/values for this row are there?
- Get get = new Get(row);
- get.addColumn(family, qf1);
- get.setMaxVersions(); // all versions.
- List<KeyValue> results = new ArrayList<KeyValue>();
-
- results = HBaseTestingUtility.getFromStoreFile(store, get);
- assertEquals(2, results.size());
-
- long ts1 = results.get(0).getTimestamp();
- long ts2 = results.get(1).getTimestamp();
-
- assertTrue(ts1 > ts2);
-
- assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
- assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
- }
-
@Override
protected void tearDown() throws Exception {
super.tearDown();
EnvironmentEdgeManagerTestHelper.reset();
}
- public void testICV_negMemstoreSize() throws IOException {
- init(this.getName());
-
- long time = 100;
- ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
- ee.setValue(time);
- EnvironmentEdgeManagerTestHelper.injectEdge(ee);
- long newValue = 3L;
- long size = 0;
-
-
- size += this.store.add(new KeyValue(Bytes.toBytes("200909091000"), family, qf1,
- System.currentTimeMillis(),
- Bytes.toBytes(newValue)));
- size += this.store.add(new KeyValue(Bytes.toBytes("200909091200"), family, qf1,
- System.currentTimeMillis(),
- Bytes.toBytes(newValue)));
- size += this.store.add(new KeyValue(Bytes.toBytes("200909091300"), family, qf1,
- System.currentTimeMillis(),
- Bytes.toBytes(newValue)));
- size += this.store.add(new KeyValue(Bytes.toBytes("200909091400"), family, qf1,
- System.currentTimeMillis(),
- Bytes.toBytes(newValue)));
- size += this.store.add(new KeyValue(Bytes.toBytes("200909091500"), family, qf1,
- System.currentTimeMillis(),
- Bytes.toBytes(newValue)));
-
-
- for ( int i = 0 ; i < 10000 ; ++i) {
- newValue++;
-
- long ret = this.store.updateColumnValue(row, family, qf1, newValue);
- long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue);
-
- if (ret != 0) System.out.println("ret: " + ret);
- if (ret2 != 0) System.out.println("ret2: " + ret2);
-
- assertTrue("ret: " + ret, ret >= 0);
- size += ret;
- assertTrue("ret2: " + ret2, ret2 >= 0);
- size += ret2;
-
-
- if (i % 1000 == 0)
- ee.setValue(++time);
- }
-
- long computedSize=0;
- for (KeyValue kv : this.store.memstore.kvset) {
- long kvsize = this.store.memstore.heapSizeChange(kv, true);
- //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize());
- computedSize += kvsize;
- }
- assertEquals(computedSize, size);
- }
-
- public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception {
- ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
- EnvironmentEdgeManagerTestHelper.injectEdge(mee);
- init(this.getName());
-
- long oldValue = 1L;
- long newValue = 3L;
- this.store.add(new KeyValue(row, family, qf1,
- EnvironmentEdgeManager.currentTimeMillis(),
- Bytes.toBytes(oldValue)));
-
- // snapshot the store.
- this.store.snapshot();
-
- // update during the snapshot, the exact same TS as the Put (lololol)
- long ret = this.store.updateColumnValue(row, family, qf1, newValue);
-
- // memstore should have grown by some amount.
- assertTrue(ret > 0);
-
- // then flush.
- flushStore(store, id++);
- assertEquals(1, this.store.getStorefiles().size());
- assertEquals(1, this.store.memstore.kvset.size());
-
- // now increment again:
- newValue += 1;
- this.store.updateColumnValue(row, family, qf1, newValue);
-
- // at this point we have a TS=1 in snapshot, and a TS=2 in kvset, so increment again:
- newValue += 1;
- this.store.updateColumnValue(row, family, qf1, newValue);
-
- // the second TS should be TS=2 or higher., even though 'time=1' right now.
-
-
- // how many key/values for this row are there?
- Get get = new Get(row);
- get.addColumn(family, qf1);
- get.setMaxVersions(); // all versions.
- List<KeyValue> results = new ArrayList<KeyValue>();
-
- results = HBaseTestingUtility.getFromStoreFile(store, get);
- assertEquals(2, results.size());
-
- long ts1 = results.get(0).getTimestamp();
- long ts2 = results.get(1).getTimestamp();
-
- assertTrue(ts1 > ts2);
- assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
- assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
-
- mee.setValue(2); // time goes up slightly
- newValue += 1;
- this.store.updateColumnValue(row, family, qf1, newValue);
-
- results = HBaseTestingUtility.getFromStoreFile(store, get);
- assertEquals(2, results.size());
-
- ts1 = results.get(0).getTimestamp();
- ts2 = results.get(1).getTimestamp();
-
- assertTrue(ts1 > ts2);
- assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
- assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
- }
-
public void testHandleErrorsInFlush() throws Exception {
LOG.info("Setting up a faulty file system that cannot write");