diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 3da0c0b..7b10aa0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -99,6 +99,7 @@ public class DefaultMemStore implements MemStore { volatile MemStoreLAB allocator; volatile MemStoreLAB snapshotAllocator; volatile long snapshotId; + volatile long snapshotOpSeqId; /** * Default constructor. Used for tests. @@ -145,6 +146,16 @@ public class DefaultMemStore implements MemStore { */ @Override public MemStoreSnapshot snapshot() { + return snapshot(HConstants.NO_SEQNUM); + } + + /** + * Creates a snapshot of the current memstore. + * Snapshot must be cleared by call to {@link #clearSnapshot(long)} + * @param flushOpSeqId The sequence id of the flush operation. + */ + @Override + public MemStoreSnapshot snapshot(long flushOpSeqId) { // If snapshot currently has entries, then flusher failed or didn't call // cleanup. Log a warning. if (!this.snapshot.isEmpty()) { @@ -152,6 +163,7 @@ public class DefaultMemStore implements MemStore { "Doing nothing. Another ongoing flush or did we fail last attempt?"); } else { this.snapshotId = EnvironmentEdgeManager.currentTime(); + this.snapshotOpSeqId = flushOpSeqId; this.snapshotSize = keySize(); if (!this.cellSet.isEmpty()) { this.snapshot = this.cellSet; @@ -170,10 +182,9 @@ public class DefaultMemStore implements MemStore { timeOfOldestEdit = Long.MAX_VALUE; } } - return new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize, - this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator)); + return new MemStoreSnapshot(this.snapshotId, this.snapshotOpSeqId, snapshot.size(), this.snapshotSize, + this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator)); } - /** * The passed snapshot was successfully persisted; it can be let go. * @param id Id of the snapshot to clean out. @@ -195,6 +206,7 @@ public class DefaultMemStore implements MemStore { } this.snapshotSize = 0; this.snapshotId = -1; + this.snapshotOpSeqId = HConstants.NO_SEQNUM; if (this.snapshotAllocator != null) { tmpAllocator = this.snapshotAllocator; this.snapshotAllocator = null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 788b5bc..4628c2e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -890,11 +890,12 @@ public class HStore implements Store { * Snapshot this stores memstore. Call before running * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask)} * so it has some work to do. + * @param flushOpSeqId */ - void snapshot() { + void snapshot(long flushOpSeqId) { this.lock.writeLock().lock(); try { - this.memstore.snapshot(); + this.memstore.snapshot(flushOpSeqId); } finally { this.lock.writeLock().unlock(); } @@ -2254,7 +2255,7 @@ public class HStore implements Store { */ @Override public void prepare() { - this.snapshot = memstore.snapshot(); + this.snapshot = memstore.snapshot(cacheFlushSeqNum); this.cacheFlushCount = snapshot.getCellsCount(); this.cacheFlushSize = snapshot.getSize(); committedFiles = new ArrayList(1); @@ -2262,7 +2263,12 @@ public class HStore implements Store { @Override public void flushCache(MonitoredTask status) throws IOException { - tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status); + tempFiles = HStore.this.flushCache(getSnapshotSeqNum(), snapshot, status); + } + + private long getSnapshotSeqNum() { + return snapshot.getSnapshotOpSeqId() != HConstants.NO_SEQNUM ? + Math.min(snapshot.getSnapshotOpSeqId(), cacheFlushSeqNum) : cacheFlushSeqNum; } @Override @@ -2273,7 +2279,7 @@ public class HStore implements Store { List storeFiles = new ArrayList(this.tempFiles.size()); for (Path storeFilePath : tempFiles) { try { - storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status)); + storeFiles.add(HStore.this.commitFile(storeFilePath, getSnapshotSeqNum(), status)); } catch (IOException ex) { LOG.error("Failed to commit store file " + storeFilePath, ex); // Try to delete the files we have committed before. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 364b9c9..85e0bd1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -42,6 +42,14 @@ public interface MemStore extends HeapSize { MemStoreSnapshot snapshot(); /** + * Creates a snapshot of the current memstore. Snapshot must be cleared by call to + * {@link #clearSnapshot(long)}. + * @param flushOpSeqId The sequence id of the flush operation. + * @return {@link MemStoreSnapshot} + */ + MemStoreSnapshot snapshot(long flushOpSeqId); + + /** * Clears the current snapshot of the Memstore. * @param id * @throws UnexpectedStateException diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java index 619cff5..68be1f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java @@ -32,10 +32,12 @@ public class MemStoreSnapshot { private final long size; private final TimeRangeTracker timeRangeTracker; private final KeyValueScanner scanner; + private long snapshotOpSeqId; - public MemStoreSnapshot(long id, int cellsCount, long size, TimeRangeTracker timeRangeTracker, + public MemStoreSnapshot(long id, long snapshotOpSeqId, int cellsCount, long size, TimeRangeTracker timeRangeTracker, KeyValueScanner scanner) { this.id = id; + this.snapshotOpSeqId = snapshotOpSeqId; this.cellsCount = cellsCount; this.size = size; this.timeRangeTracker = timeRangeTracker; @@ -50,6 +52,13 @@ public class MemStoreSnapshot { } /** + * @return the sequence id of the flush operation, when this snapshot was created. + */ + public long getSnapshotOpSeqId() { + return snapshotOpSeqId; + } + + /** * @return Number of Cells in this snapshot. */ public int getCellsCount() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index d610523..50de0cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -402,6 +402,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // Otherwise, we might have to return KVs that have technically expired. long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE; + + long maxSequenceId = store.getMaxSequenceId(); // include only those scan files which pass all filters for (KeyValueScanner kvs : allScanners) { @@ -410,6 +412,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner continue; } + // exclude memstore scanners, + // if all cells that have sequence id greater than or equal to read point in memstore were already flushed. + if (!isFile && readPt <= maxSequenceId) { + continue; + } + if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) { scanners.add(kvs); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 43e4a09..9249498 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -3422,7 +3422,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { scan.getFamilyMap().get(store.getFamily().getName()), // originally MultiVersionConsistencyControl.resetThreadReadPoint() was called to set // readpoint 0. - 0); + store.getHRegion().getMVCC().memstoreReadPoint()); List result = new ArrayList(); scanner.next(result); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 9780e2a..d80bc30 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.Progressable; import org.junit.After; import org.junit.Assert; @@ -187,6 +188,7 @@ public class TestStore { info, htd, null); store = new HStore(region, hcd, conf); + return store; } @@ -220,7 +222,7 @@ public class TestStore { long size = store.memstore.getFlushableSize(); Assert.assertEquals(0, size); LOG.info("Adding some data"); - long kvSize = store.add(new KeyValue(row, family, qf1, 1, (byte[])null)).getFirst(); + long kvSize = addToStore(new KeyValue(row, family, qf1, 1, (byte[])null)).getFirst(); size = store.memstore.getFlushableSize(); Assert.assertEquals(kvSize, size); // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. @@ -233,7 +235,7 @@ public class TestStore { } size = store.memstore.getFlushableSize(); Assert.assertEquals(kvSize, size); - store.add(new KeyValue(row, family, qf2, 2, (byte[])null)); + addToStore(new KeyValue(row, family, qf2, 2, (byte[])null)); // Even though we add a new kv, we expect the flushable size to be 'same' since we have // not yet cleared the snapshot -- the above flush failed. Assert.assertEquals(kvSize, size); @@ -314,9 +316,9 @@ public class TestStore { for (int i = 1; i <= storeFileNum; i++) { LOG.info("Adding some data for the store file #" + i); timeStamp = EnvironmentEdgeManager.currentTime(); - this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null)); - this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null)); - this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null)); + addToStore(new KeyValue(row, family, qf1, timeStamp, (byte[]) null)); + addToStore(new KeyValue(row, family, qf2, timeStamp, (byte[]) null)); + addToStore(new KeyValue(row, family, qf3, timeStamp, (byte[]) null)); flush(i); edge.incrementTime(sleepTime); } @@ -368,9 +370,9 @@ public class TestStore { int storeFileNum = 4; for (int i = 1; i <= storeFileNum; i++) { LOG.info("Adding some data for the store file #"+i); - this.store.add(new KeyValue(row, family, qf1, i, (byte[])null)); - this.store.add(new KeyValue(row, family, qf2, i, (byte[])null)); - this.store.add(new KeyValue(row, family, qf3, i, (byte[])null)); + addToStore(new KeyValue(row, family, qf1, i, (byte[])null)); + addToStore(new KeyValue(row, family, qf2, i, (byte[])null)); + addToStore(new KeyValue(row, family, qf3, i, (byte[])null)); flush(i); } // after flush; check the lowest time stamp @@ -421,8 +423,8 @@ public class TestStore { public void testEmptyStoreFile() throws IOException { init(this.name.getMethodName()); // Write a store file. - this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf1, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf2, 1, (byte[])null)); flush(1); // Now put in place an empty store file. Its a little tricky. Have to // do manually with hacked in sequence id. @@ -459,13 +461,13 @@ public class TestStore { init(this.name.getMethodName()); //Put data in memstore - this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null)); - + addToStore(new KeyValue(row, family, qf1, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf2, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf3, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf4, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf5, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf6, 1, (byte[])null)); + //Get result = HBaseTestingUtility.getFromStoreFile(store, get.getRow(), qualifiers); @@ -483,20 +485,20 @@ public class TestStore { init(this.name.getMethodName()); //Put data in memstore - this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf1, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf2, 1, (byte[])null)); //flush flush(1); //Add more data - this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf3, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf4, 1, (byte[])null)); //flush flush(2); //Add more data - this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf5, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf6, 1, (byte[])null)); //flush flush(3); @@ -522,20 +524,20 @@ public class TestStore { init(this.name.getMethodName()); //Put data in memstore - this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf1, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf2, 1, (byte[])null)); //flush flush(1); //Add more data - this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf3, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf4, 1, (byte[])null)); //flush flush(2); //Add more data - this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf5, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf6, 1, (byte[])null)); //Get result = HBaseTestingUtility.getFromStoreFile(store, @@ -548,8 +550,16 @@ public class TestStore { assertCheck(); } + private Pair addToStore(Cell cell) { + Pair ret = store.add(cell); + store.getHRegion().getMVCC().initialize(id++); + + return ret; + } + private void flush(int storeFilessize) throws IOException{ - this.store.snapshot(); + this.store.snapshot(id); + flushStore(store, id++); Assert.assertEquals(storeFilessize, this.store.getStorefiles().size()); Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size()); @@ -575,20 +585,20 @@ public class TestStore { long oldValue = 1L; long newValue = 3L; - this.store.add(new KeyValue(row, family, qf1, - System.currentTimeMillis(), - Bytes.toBytes(oldValue))); + + addToStore(new KeyValue(row, family, qf1, + System.currentTimeMillis(), + Bytes.toBytes(oldValue))); // snapshot the store. - this.store.snapshot(); + this.store.snapshot(id); // add other things: - this.store.add(new KeyValue(row, family, qf2, - System.currentTimeMillis(), - Bytes.toBytes(oldValue))); + addToStore(new KeyValue(row, family, qf2, + System.currentTimeMillis(), + Bytes.toBytes(oldValue))); - // update during the snapshot. - long ret = this.store.updateColumnValue(row, family, qf1, newValue); + long ret = updateColumnValue(row, family, qf1, newValue); // memstore should have grown by some amount. Assert.assertTrue(ret > 0); @@ -634,19 +644,19 @@ public class TestStore { long size = 0; - size += this.store.add(new KeyValue(Bytes.toBytes("200909091000"), family, qf1, + size += addToStore(new KeyValue(Bytes.toBytes("200909091000"), family, qf1, System.currentTimeMillis(), Bytes.toBytes(newValue))).getFirst(); - size += this.store.add(new KeyValue(Bytes.toBytes("200909091200"), family, qf1, + size += addToStore(new KeyValue(Bytes.toBytes("200909091200"), family, qf1, System.currentTimeMillis(), Bytes.toBytes(newValue))).getFirst(); - size += this.store.add(new KeyValue(Bytes.toBytes("200909091300"), family, qf1, + size += addToStore(new KeyValue(Bytes.toBytes("200909091300"), family, qf1, System.currentTimeMillis(), Bytes.toBytes(newValue))).getFirst(); - size += this.store.add(new KeyValue(Bytes.toBytes("200909091400"), family, qf1, + size += addToStore(new KeyValue(Bytes.toBytes("200909091400"), family, qf1, System.currentTimeMillis(), Bytes.toBytes(newValue))).getFirst(); - size += this.store.add(new KeyValue(Bytes.toBytes("200909091500"), family, qf1, + size += addToStore(new KeyValue(Bytes.toBytes("200909091500"), family, qf1, System.currentTimeMillis(), Bytes.toBytes(newValue))).getFirst(); @@ -654,8 +664,8 @@ public class TestStore { for ( int i = 0 ; i < 10000 ; ++i) { newValue++; - long ret = this.store.updateColumnValue(row, family, qf1, newValue); - long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue); + long ret = updateColumnValue(row, family, qf1, newValue); + long ret2 = updateColumnValue(row2, family, qf1, newValue); if (ret != 0) System.out.println("ret: " + ret); if (ret2 != 0) System.out.println("ret2: " + ret2); @@ -687,15 +697,16 @@ public class TestStore { long oldValue = 1L; long newValue = 3L; - this.store.add(new KeyValue(row, family, qf1, - EnvironmentEdgeManager.currentTime(), - Bytes.toBytes(oldValue))); + + addToStore(new KeyValue(row, family, qf1, + EnvironmentEdgeManager.currentTime(), + Bytes.toBytes(oldValue))); // snapshot the store. - this.store.snapshot(); + this.store.snapshot(id); // update during the snapshot, the exact same TS as the Put (lololol) - long ret = this.store.updateColumnValue(row, family, qf1, newValue); + long ret = updateColumnValue(row, family, qf1, newValue); // memstore should have grown by some amount. Assert.assertTrue(ret > 0); @@ -707,15 +718,14 @@ public class TestStore { // now increment again: newValue += 1; - this.store.updateColumnValue(row, family, qf1, newValue); + updateColumnValue(row, family, qf1, newValue); // at this point we have a TS=1 in snapshot, and a TS=2 in kvset, so increment again: newValue += 1; - this.store.updateColumnValue(row, family, qf1, newValue); + updateColumnValue(row, family, qf1, newValue); // the second TS should be TS=2 or higher., even though 'time=1' right now. - // how many key/values for this row are there? Get get = new Get(row); get.addColumn(family, qf1); @@ -734,7 +744,7 @@ public class TestStore { mee.setValue(2); // time goes up slightly newValue += 1; - this.store.updateColumnValue(row, family, qf1, newValue); + updateColumnValue(row, family, qf1, newValue); results = HBaseTestingUtility.getFromStoreFile(store, get); Assert.assertEquals(2, results.size()); @@ -747,6 +757,12 @@ public class TestStore { Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1)))); } + private long updateColumnValue(byte[] row, byte[] family, byte[] qf, long newValue) throws IOException { + long ret = this.store.updateColumnValue(row, family, qf, newValue); + store.getHRegion().getMVCC().initialize(id++); + return ret; + } + @Test public void testHandleErrorsInFlush() throws Exception { LOG.info("Setting up a faulty file system that cannot write"); @@ -768,9 +784,9 @@ public class TestStore { init(name.getMethodName(), conf); LOG.info("Adding some data"); - store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); - store.add(new KeyValue(row, family, qf2, 1, (byte[])null)); - store.add(new KeyValue(row, family, qf3, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf1, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf2, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf3, 1, (byte[])null)); LOG.info("Before flush, we should have no files"); @@ -901,15 +917,15 @@ public class TestStore { List kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family); for (Cell kv : kvList1) { - this.store.add(KeyValueUtil.ensureKeyValue(kv)); + addToStore(KeyValueUtil.ensureKeyValue(kv)); } - this.store.snapshot(); + this.store.snapshot(id); flushStore(store, id++); List kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family); for(Cell kv : kvList2) { - this.store.add(KeyValueUtil.ensureKeyValue(kv)); + addToStore(KeyValueUtil.ensureKeyValue(kv)); } List result; @@ -1039,7 +1055,7 @@ public class TestStore { assertEquals(0, this.store.getStorefilesCount()); // add some data, flush - this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf1, 1, (byte[])null)); flush(1); assertEquals(1, this.store.getStorefilesCount()); @@ -1086,7 +1102,7 @@ public class TestStore { assertEquals(0, this.store.getStorefilesCount()); // add some data, flush - this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); + addToStore(new KeyValue(row, family, qf1, 1, (byte[])null)); flush(1); // add one more file addStoreFile();