diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 64e8397c56..5b0022787b 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -28,7 +28,6 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import com.google.common.collect.Lists;
-
 import java.io.IOException;
 import java.lang.ref.SoftReference;
 import java.security.PrivilegedExceptionAction;
@@ -38,9 +37,12 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -53,6 +55,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -61,6 +64,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -70,6 +74,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -161,9 +166,14 @@ public class TestStore {
     init(methodName, conf, htd, hcd);
   }
 
-  @SuppressWarnings("deprecation")
   private Store init(String methodName, Configuration conf, HTableDescriptor htd,
       HColumnDescriptor hcd) throws IOException {
+    return init(methodName, conf, htd, hcd, null);
+  }
+
+  @SuppressWarnings("deprecation")
+  private Store init(String methodName, Configuration conf, HTableDescriptor htd,
+      HColumnDescriptor hcd, MyScannerHook hook) throws IOException {
     //Setting up a Store
     Path basedir = new Path(DIR+methodName);
     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
@@ -185,8 +195,11 @@ public class TestStore {
     final WALFactory wals = new WALFactory(walConf, null, methodName);
     HRegion region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes(),
             info.getTable().getNamespace()), fs, conf, info, htd, null);
-
-    store = new HStore(region, hcd, conf);
+    if (hook == null) {
+      store = new HStore(region, hcd, conf);
+    } else {
+      store = new MyStore(region, hcd, conf, hook);
+    }
     return store;
   }
 
@@ -939,4 +952,106 @@ public class TestStore {
     //ensure that replaceStoreFiles is not called if files are not refreshed
     verify(spiedStore, times(0)).replaceStoreFiles(null, null);
   }
+
+  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException {
+    Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
+    CellUtil.setSequenceId(c, sequenceId);
+    return c;
+  }
+
+  @Test
+  public void testScanWithDoubleFlush() throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    // Initialize region
+    MyStore myStore = initMyStore(name.getMethodName(), conf, (final MyStore store1) -> {
+      final long tmpId = id++;
+      ExecutorService s = Executors.newSingleThreadExecutor();
+      s.submit(() -> {
+        try {
+          // Flush the store before the StoreScanner updates its scanners from the store.
+          // The current data will be flushed into files, and the memstore will
+          // be cleared.
+          // -- phase (4/4)
+          flushStore(store1, tmpId);
+        } catch (IOException ex) {
+          throw new RuntimeException(ex);
+        }
+      });
+      s.shutdown();
+      try {
+        // wait for the flush; the flush thread will be blocked in HStore#notifyChangedReadersObservers.
+        s.awaitTermination(500, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException ex) {
+      }
+    });
+    byte[] oldValue = Bytes.toBytes("oldValue");
+    byte[] currentValue = Bytes.toBytes("currentValue");
+    MemstoreSize memStoreSize = new MemstoreSize();
+    long ts = EnvironmentEdgeManager.currentTime();
+    long seqId = 100;
+    // older data which shouldn't be "seen" by the client
+    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSize);
+    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSize);
+    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSize);
+    long snapshotId = id++;
+    // push older data into snapshot -- phase (1/4)
+    StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId);
+    storeFlushCtx.prepare();
+
+    // insert current data into active -- phase (2/4)
+    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSize);
+    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSize);
+    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSize);
+    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    quals.add(qf1);
+    quals.add(qf2);
+    quals.add(qf3);
+    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(
+        new Scan(new Get(row)), quals, seqId + 1)) {
+      // complete the flush -- phase (3/4)
+      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+
+      List<Cell> results = new ArrayList<>();
+      scanner.next(results);
+      assertEquals(3, results.size());
+      for (Cell c : results) {
+        byte[] actualValue = CellUtil.cloneValue(c);
+        assertTrue("expected:" + Bytes.toStringBinary(currentValue)
+            + ", actual:" + Bytes.toStringBinary(actualValue),
+            Bytes.equals(actualValue, currentValue));
+      }
+    }
+  }
+
+  private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setMaxVersions(5);
+    return (MyStore) init(methodName, conf, htd, hcd, hook);
+  }
+
+  private static class MyStore extends HStore {
+    private final MyScannerHook hook;
+    MyStore(final HRegion region, final HColumnDescriptor family,
+        final Configuration confParam, MyScannerHook hook) throws IOException {
+      super(region, family, confParam);
+      this.hook = hook;
+    }
+
+    @Override
+    public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
+        boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
+        boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
+        boolean includeMemstoreScanner) throws IOException {
+      if (hook != null) {
+        hook.hook(this);
+      }
+      return super.getScanners(files, cacheBlocks, usePread,
+          isCompaction, matcher, startRow, true, stopRow, false, readPt, includeMemstoreScanner);
+    }
+  }
+  private interface MyScannerHook {
+    void hook(MyStore store) throws IOException;
+  }
 }
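
A note on the timing trick used by the hook above: the lambda passed to initMyStore submits the flush to a single-thread executor and then waits only a bounded 500 ms, so the hook returns while the flush is still parked in HStore#notifyChangedReadersObservers and the StoreScanner proceeds to re-read its scanner list against a half-flushed store. The following is a minimal, self-contained sketch of that pattern in isolation; the HookTimingSketch class name and the CountDownLatch standing in for flushStore(store1, tmpId) are illustrative assumptions, not part of this patch.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HookTimingSketch {
  public static void main(String[] args) throws InterruptedException {
    // Stand-in for the point where the real flush blocks (notifyChangedReadersObservers).
    CountDownLatch flushParked = new CountDownLatch(1);

    ExecutorService s = Executors.newSingleThreadExecutor();
    s.submit(() -> {
      try {
        // Stand-in for flushStore(store1, tmpId): park here until released.
        flushParked.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    s.shutdown();

    // Bounded wait, as in the hook: give the flush a head start without deadlocking the caller.
    boolean finishedInTime = s.awaitTermination(500, TimeUnit.MILLISECONDS);
    System.out.println("flush finished before the hook returned: " + finishedInTime); // expected: false

    // Release the fake flush so the executor can drain and the JVM can exit.
    flushParked.countDown();
    s.awaitTermination(1, TimeUnit.SECONDS);
  }
}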