diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index de66d4e5c9..0f3e301224 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -139,7 +139,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner private ReentrantLock flushLock = new ReentrantLock(); private final long readPt; - + private boolean topChanged = false; // used by the injection framework to test race between StoreScanner construction and compaction enum StoreScannerCompactionRace { BEFORE_SEEK, @@ -531,7 +531,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap. checkScanOrder(prevCell, cell, comparator); prevCell = cell; - + topChanged = false; ScanQueryMatcher.MatchCode qcode = matcher.match(cell); qcode = optimize(qcode, cell); switch(qcode) { @@ -620,36 +620,41 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); case SEEK_NEXT_ROW: - // This is just a relatively simple end of scan fix, to short-cut end - // us if there is an endKey in the scan. - if (!matcher.moreRowsMayExistAfter(cell)) { - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } - // Setting the matcher.row = null, will mean that after the subsequent seekToNextRow() - // the heap.peek() will any way be in the next row. So the SQM.match(cell) need do - // another compareRow to say the current row is DONE - matcher.row = null; - seekToNextRow(cell); - break; - case SEEK_NEXT_COL: - seekAsDirection(matcher.getKeyForNextColumn(cell)); + case SEEK_NEXT_USING_HINT: + if (qcode == ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW) { + // This is just a relatively simple end of scan fix, to short-cut end + // us if there is an endKey in the scan. + if (!matcher.moreRowsMayExistAfter(cell)) { + return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + } + // Setting the matcher.row = null, will mean that after the subsequent seekToNextRow() + // the heap.peek() will any way be in the next row. So the SQM.match(cell) need do + // another compareRow to say the current row is DONE + matcher.row = null; + seekToNextRow(cell); + } else if (qcode == ScanQueryMatcher.MatchCode.SEEK_NEXT_COL) { + seekAsDirection(matcher.getKeyForNextColumn(cell)); + } else { + // TODO convert resee to Cell? + Cell nextKV = matcher.getNextKeyHint(cell); + if (nextKV != null) { + seekAsDirection(nextKV); + } else { + heap.next(); + break; + } + } + NextState ns = needToReturn(outResult); + if (ns != null) { + return scannerContext.setScannerState(ns).hasMoreValues(); + } break; case SKIP: this.heap.next(); break; - case SEEK_NEXT_USING_HINT: - // TODO convert resee to Cell? - Cell nextKV = matcher.getNextKeyHint(cell); - if (nextKV != null) { - seekAsDirection(nextKV); - } else { - heap.next(); - } - break; - default: throw new RuntimeException("UNEXPECTED"); } @@ -665,6 +670,24 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } /** + * If the top cell won't be flushed into disk, the new top cell may be + * changed after #reopenAfterFlush. Because the older top cell only exist + * in the memstore scanner but the memstore scanner is replaced by hfile + * scanner after #reopenAfterFlush. If the row of top cell is changed, + * we should return the current cells. Otherwise, we may return + * the cells across different rows. + * @param outResult the cells which are visible for user scan + * @return null is the top cell doesn't change. Otherwise, the NextState + * to return + */ + private NextState needToReturn(List outResult) { + if (!outResult.isEmpty() && topChanged) { + return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES; + } + return null; + } + + /** * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109). * This method works together with ColumnTrackers and Filters. ColumnTrackers may issue SEEK * hints, such as seek to next column, next row, or seek to an arbitrary seek key. @@ -817,14 +840,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner resetScannerStack(this.lastTop); if (this.heap.peek() == null || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) { - LOG.debug("Storescanner.peek() is changed where before = " + LOG.info("Storescanner.peek() is changed where before = " + this.lastTop.toString() + ",and after = " + this.heap.peek()); this.lastTop = null; + topChanged = true; return true; } this.lastTop = null; // gone! } // else dont need to reseek + topChanged = false; return false; } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index ed4ee579f7..030bcad89b 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -19,20 +19,16 @@ package org.apache.hadoop.hbase.regionserver; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import com.google.common.collect.Lists; import java.io.IOException; import java.lang.ref.SoftReference; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -100,6 +96,10 @@ import org.junit.rules.TestName; import org.mockito.Mockito; import com.google.common.collect.Lists; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Test class for the Store @@ -179,7 +179,7 @@ public class TestStore { @SuppressWarnings("deprecation") private Store init(String methodName, Configuration conf, HTableDescriptor htd, - HColumnDescriptor hcd, MyScannerHook hook) throws IOException { + HColumnDescriptor hcd, MyStoreHook hook) throws IOException { //Setting up a Store Path basedir = new Path(DIR+methodName); Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); @@ -1132,6 +1132,10 @@ public class TestStore { } private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException { + return createCell(row, qualifier, ts, sequenceId, value); + } + + private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException { Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value); CellUtil.setSequenceId(c, sequenceId); return c; @@ -1141,9 +1145,9 @@ public class TestStore { public void testScanWithDoubleFlush() throws IOException { Configuration conf = HBaseConfiguration.create(); // Initialize region - MyStore myStore = initMyStore(name.getMethodName(), conf, new MyScannerHook() { + MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() { @Override - public void hook(final MyStore store) throws IOException { + public void getScanners(final MyStore store) throws IOException { final long tmpId = id++; ExecutorService s = Executors.newSingleThreadExecutor(); s.submit(new Runnable() { @@ -1209,6 +1213,148 @@ public class TestStore { } @Test + public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException { + final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); + testFlushBeforeCompletingScan(new MyListHook() { + @Override + public void hook(int currentSize) { + if (currentSize == 2) { + try { + flushStore(store, id++); + timeToGoNextRow.set(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + }, new FilterBase() { + @Override + public Filter.ReturnCode filterKeyValue(Cell v) throws IOException { + return Filter.ReturnCode.INCLUDE; + } + }); + } + + @Test + public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException { + final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); + testFlushBeforeCompletingScan(new MyListHook() { + @Override + public void hook(int currentSize) { + if (currentSize == 2) { + try { + flushStore(store, id++); + timeToGoNextRow.set(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + }, new FilterBase() { + @Override + public Filter.ReturnCode filterKeyValue(Cell v) throws IOException { + if (timeToGoNextRow.get()) { + timeToGoNextRow.set(false); + return Filter.ReturnCode.NEXT_ROW; + } else { + return Filter.ReturnCode.INCLUDE; + } + } + }); + } + + @Test + public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, InterruptedException { + final AtomicBoolean timeToGetHint = new AtomicBoolean(false); + testFlushBeforeCompletingScan(new MyListHook() { + @Override + public void hook(int currentSize) { + if (currentSize == 2) { + try { + flushStore(store, id++); + timeToGetHint.set(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + }, new FilterBase() { + @Override + public Filter.ReturnCode filterKeyValue(Cell v) throws IOException { + if (timeToGetHint.get()) { + timeToGetHint.set(false); + return Filter.ReturnCode.SEEK_NEXT_USING_HINT; + } else { + return Filter.ReturnCode.INCLUDE; + } + } + @Override + public Cell getNextCellHint(Cell currentCell) throws IOException { + return currentCell; + } + }); + } + + private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter) + throws IOException, InterruptedException { + Configuration conf = HBaseConfiguration.create(); + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setMaxVersions(1); + byte[] r0 = Bytes.toBytes("row0"); + byte[] r1 = Bytes.toBytes("row1"); + byte[] r2 = Bytes.toBytes("row2"); + byte[] value0 = Bytes.toBytes("value0"); + byte[] value1 = Bytes.toBytes("value1"); + byte[] value2 = Bytes.toBytes("value2"); + long ts = EnvironmentEdgeManager.currentTime(); + final long seqId = 100; + init(name.getMethodName(), conf, new HTableDescriptor(TableName.valueOf(table)), hcd, new MyStoreHook() { + @Override + public long getSmallestReadPoint(HStore store) { + return seqId + 3; + } + }); + // The cells having the value0 won't be flushed to disk because the value of max version is 1 + store.add(createCell(r0, qf1, ts, seqId, value0)); + store.add(createCell(r0, qf2, ts, seqId, value0)); + store.add(createCell(r0, qf3, ts, seqId, value0)); + store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1)); + store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1)); + store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1)); + store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2)); + store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2)); + store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2)); + store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1)); + store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1)); + store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1)); + List myList = new MyList<>(hook); + Scan scan = new Scan(r1) + .setFilter(filter); + try (InternalScanner scanner = (InternalScanner) store.getScanner( + scan, null, seqId + 3)){ + // r1 + scanner.next(myList); + assertEquals(3, myList.size()); + for (Cell c : myList) { + byte[] actualValue = CellUtil.cloneValue(c); + assertTrue("expected:" + Bytes.toStringBinary(value1) + + ", actual:" + Bytes.toStringBinary(actualValue) + , Bytes.equals(actualValue, value1)); + } + List normalList = new ArrayList<>(3); + // r2 + scanner.next(normalList); + assertEquals(3, normalList.size()); + for (Cell c : normalList) { + byte[] actualValue = CellUtil.cloneValue(c); + assertTrue("expected:" + Bytes.toStringBinary(value2) + + ", actual:" + Bytes.toStringBinary(actualValue) + , Bytes.equals(actualValue, value2)); + } + } + } + + @Test public void testReclaimChunkWhenScaning() throws IOException { Configuration conf = HBaseConfiguration.create(); conf.setFloat(CHUNK_POOL_MAXSIZE_KEY, 1); @@ -1269,7 +1415,7 @@ public class TestStore { } } - private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException { + private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) throws IOException { HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); HColumnDescriptor hcd = new HColumnDescriptor(family); hcd.setMaxVersions(5); @@ -1278,10 +1424,10 @@ public class TestStore { private static class MyStore extends HStore { - private final MyScannerHook hook; + private final MyStoreHook hook; MyStore(final HRegion region, final HColumnDescriptor family, - final Configuration confParam, MyScannerHook hook) throws IOException { + final Configuration confParam, MyStoreHook hook) throws IOException { super(region, family, confParam); this.hook = hook; } @@ -1290,15 +1436,23 @@ public class TestStore { public List getScanners(List files, boolean cacheBlocks, boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException { - hook.hook(this); + hook.getScanners(this); return super.getScanners(files, cacheBlocks, isGet, usePread, isCompaction, matcher, startRow, stopRow, readPt, includeMemstoreScanner); } - } - private interface MyScannerHook { + @Override + public long getSmallestReadPoint() { + return hook.getSmallestReadPoint(this); + } + } - void hook(MyStore store) throws IOException; + private abstract class MyStoreHook { + void getScanners(MyStore store) throws IOException { + } + long getSmallestReadPoint(HStore store) { + return store.getHRegion().getSmallestReadPoint(); + } } interface MyListHook {