.../apache/hadoop/hbase/regionserver/HRegion.java | 59 +++++++++++++++++----- .../hadoop/hbase/regionserver/KeyValueHeap.java | 21 ++++++++ .../hadoop/hbase/regionserver/KeyValueScanner.java | 11 ++++ .../hbase/regionserver/NonLazyKeyValueScanner.java | 10 ++++ .../regionserver/ReversedRegionScannerImpl.java | 9 +++- .../hbase/regionserver/StoreFileScanner.java | 10 ++++ .../hadoop/hbase/regionserver/StoreScanner.java | 39 ++++++++++++-- 7 files changed, 140 insertions(+), 19 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 37d0f08..e856a1d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5272,6 +5272,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** Heap of key-values that are not essential for the provided filters and are thus read * on demand, if on-demand column family loading is enabled.*/ KeyValueHeap joinedHeap = null; + // Indicates if the storeHeap is formed of only one StoreScanner + boolean singleStoreScannerHeap = false; + // Indicates if the joinedHeap is formed of only one StoreScanner. + boolean singleStoreScannerJoinedHeap = false; /** * If the joined heap data gathering is interrupted due to scan limits, this will * contain the row for which we are populating the values.*/ @@ -5365,8 +5369,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi List joinedScanners, HRegion region) throws IOException { this.storeHeap = new KeyValueHeap(scanners, comparator); + // Only if there is a single CF + singleStoreScannerHeap = (this.storeHeap.size() == 1); if (!joinedScanners.isEmpty()) { this.joinedHeap = new KeyValueHeap(joinedScanners, comparator); + // Only if there is a single CF + singleStoreScannerJoinedHeap = (this.joinedHeap.size() == 1); } } @@ -5404,7 +5412,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override - public synchronized boolean next(List outResults, ScannerContext scannerContext) throws IOException { + public synchronized boolean next(List outResults, ScannerContext scannerContext) + throws IOException { if (this.filterClosed) { throw new UnknownScannerException("Scanner was closed (timed out?) " + "after we renewed it. Could be caused by a very slow scanner " + @@ -5485,7 +5494,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throws IOException { assert joinedContinuationRow != null; boolean moreValues = populateResult(results, this.joinedHeap, scannerContext, - joinedContinuationRow); + joinedContinuationRow, singleStoreScannerJoinedHeap); if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { // We are done with this row, reset the continuation. @@ -5503,10 +5512,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call. * @param scannerContext * @param currentRowCell + * @param singleStoreScanner if the heap is made up of single StoreScanner * @return state of last call to {@link KeyValueHeap#next()} */ private boolean populateResult(List results, KeyValueHeap heap, - ScannerContext scannerContext, Cell currentRowCell) throws IOException { + ScannerContext scannerContext, Cell currentRowCell, boolean singleStoreScanner) + throws IOException { Cell nextKv; boolean moreCellsInRow = false; boolean tmpKeepProgress = scannerContext.getKeepProgress(); @@ -5522,7 +5533,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi scannerContext.setKeepProgress(tmpKeepProgress); nextKv = heap.peek(); - moreCellsInRow = moreCellsInRow(nextKv, currentRowCell); + moreCellsInRow = moreCellsInRow(heap, nextKv, currentRowCell, singleStoreScanner); if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext); if (scannerContext.checkBatchLimit(limitScope)) { return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); @@ -5546,11 +5557,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Based on the nextKv in the heap, and the current row, decide whether or not there are more * cells to be read in the heap. If the row of the nextKv in the heap matches the current row * then there are more cells to be read in the row. + * @param heap the current KeyValueHeap * @param nextKv * @param currentRowCell + * @param singleStoreScanner indicates if the heap is made up of one element * @return true When there are more cells in the row to be read */ - private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) { + protected boolean moreCellsInRow(final KeyValueHeap heap, final Cell nextKv, + Cell currentRowCell, boolean singleStoreScanner) { + if (singleStoreScanner) { + return nextKv != null && !heap.movedToNextRow(); + } return nextKv != null && CellUtil.matchingRow(nextKv, currentRowCell); } @@ -5655,7 +5672,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // here we are filtering a row based purely on its row key, preventing us from calling // #populateResult. Thus, perform the necessary increment here to rows scanned metric incrementCountOfRowsScannedMetric(scannerContext); - boolean moreRows = nextRow(scannerContext, current); + boolean alreadyInNext = false; + if(singleStoreScannerHeap && this.storeHeap.movedToNextRow()) { + // We have already moved to the nextRow so we should do next() + alreadyInNext = true; + } + boolean moreRows = + nextRow(scannerContext, current, singleStoreScannerHeap, alreadyInNext); + // Reset only here because the #populateResult is not going + // to be called here if the next Cell is also filtered out by + // filterRowKey + if(singleStoreScannerHeap) { + this.storeHeap.resetMovedtoNextRow(); + } if (!moreRows) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } @@ -5664,7 +5693,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // Ok, we are good, let's try to get some results from the main heap. - populateResult(results, this.storeHeap, scannerContext, current); + populateResult(results, this.storeHeap, scannerContext, current, singleStoreScannerHeap); if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { if (hasFilterRow) { @@ -5706,7 +5735,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) { incrementCountOfRowsFilteredMetric(scannerContext); results.clear(); - boolean moreRows = nextRow(scannerContext, current); + boolean moreRows = nextRow(scannerContext, current, singleStoreScannerHeap, false); if (!moreRows) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } @@ -5750,7 +5779,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // the case when SingleColumnValueExcludeFilter is used. if (results.isEmpty()) { incrementCountOfRowsFilteredMetric(scannerContext); - boolean moreRows = nextRow(scannerContext, current); + boolean moreRows = nextRow(scannerContext, current, singleStoreScannerHeap, false); if (!moreRows) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } @@ -5820,12 +5849,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return filter != null && filter.filterRowKey(current); } - protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException { - assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read."; + protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell, + boolean singleStoreScanner, boolean alreadyInNext) throws IOException { + assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read."; Cell next; - while ((next = this.storeHeap.peek()) != null && - CellUtil.matchingRow(next, curRowCell)) { + while ((next = this.storeHeap.peek()) != null && (alreadyInNext + || moreCellsInRow(this.storeHeap, next, curRowCell, singleStoreScanner))) { this.storeHeap.next(MOCKED_LIST); + if (alreadyInNext) { + alreadyInNext = false; + } } resetFilters(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 806eeb5..e0a93ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -51,6 +51,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner // scans are collected and when the final scanner.close() happens will perform the // actual close. protected Set scannersForDelayedClose = new HashSet(); + protected int count = 0; /** * The current sub-scanner, i.e. the one that contains the next key/value @@ -92,6 +93,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner for (KeyValueScanner scanner : scanners) { if (scanner.peek() != null) { this.heap.add(scanner); + count++; } else { this.scannersForDelayedClose.add(scanner); } @@ -434,4 +436,23 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner } } } + + @Override + public boolean movedToNextRow() { + return this.current.movedToNextRow(); + } + + @Override + public void resetMovedtoNextRow() { + if (this.current != null) { + this.current.resetMovedtoNextRow(); + } + } + + /** + * @return the number of KeyValueScanner in the heap + */ + public int size() { + return count; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java index 9a62b8f..07216da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java @@ -162,4 +162,15 @@ public interface KeyValueScanner extends Shipper { * if known, or null otherwise */ public Cell getNextIndexedKey(); + + /** + * @return true if this scanner has moved over to the nextRow, false + * otherwise + */ + public boolean movedToNextRow(); + + /** + * Reset the flag that indicates the scanner has moved to next row + */ + public void resetMovedtoNextRow(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java index 9a9036b..9514182 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java @@ -76,4 +76,14 @@ public abstract class NonLazyKeyValueScanner implements KeyValueScanner { public void shipped() throws IOException { // do nothing } + + @Override + public boolean movedToNextRow() { + return false; + } + + @Override + public void resetMovedtoNextRow() { + // No impl + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java index ca09cdc..d70eace 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java @@ -65,8 +65,8 @@ class ReversedRegionScannerImpl extends RegionScannerImpl { } @Override - protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) - throws IOException { + protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell, + boolean singleStoreScanner, boolean alreadyInNextRow) throws IOException { assert super.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read."; byte[] row = new byte[curRowCell.getRowLength()]; CellUtil.copyRowTo(curRowCell, row, 0); @@ -79,4 +79,9 @@ class ReversedRegionScannerImpl extends RegionScannerImpl { return true; } + @Override + protected boolean moreCellsInRow(final KeyValueHeap heap, final Cell nextKv, + Cell currentRowCell, final boolean singleStoreScanner) { + return nextKv != null && CellUtil.matchingRow(nextKv, currentRowCell); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 9c04838..1b30c1b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -491,4 +491,14 @@ public class StoreFileScanner implements KeyValueScanner { public void shipped() throws IOException { this.hfs.shipped(); } + + @Override + public boolean movedToNextRow() { + return false; + } + + @Override + public void resetMovedtoNextRow() { + // No impl + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index b81ca4c..0aa3c0f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -95,6 +95,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner */ private long kvsScanned = 0; private Cell prevCell = null; + // Indicates if the next() was completed due to a change in row + boolean nextRow = false; /** We don't ever expect to change this, the constant is just for clarity. */ static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true; @@ -501,7 +503,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public boolean next(List outResult, ScannerContext scannerContext) throws IOException { lock.lock(); - + boolean alreadyInNextRow = false; + if (nextRow) { + // see the previous state + alreadyInNextRow = true; + resetMovedtoNextRow(); + } try { if (scannerContext == null) { throw new IllegalArgumentException("Scanner context cannot be null"); @@ -531,14 +538,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // rows. Else it is possible we are still traversing the same row so we must perform the row // comparison. if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null || - !CellUtil.matchingRow(peeked, matcher.curCell)) { + alreadyInNextRow || !CellUtil.matchingRow(peeked, matcher.curCell)) { this.countPerRow = 0; matcher.setToNewRow(peeked); } // Clear progress away unless invoker has indicated it should be kept. if (!scannerContext.getKeepProgress()) scannerContext.clearProgress(); - + Cell cell; // Only do a sanity-check if store and comparator are available. @@ -560,7 +567,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap. checkScanOrder(prevCell, cell, comparator); prevCell = cell; - ScanQueryMatcher.MatchCode qcode = matcher.match(cell); qcode = optimize(qcode, cell); switch(qcode) { @@ -581,6 +587,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } seekToNextRow(cell); + nextRow = true; break LOOP; } @@ -608,6 +615,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } seekToNextRow(cell); + nextRow = true; } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { seekAsDirection(matcher.getKeyForNextColumn(cell)); } else { @@ -615,14 +623,19 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { + // Compare here itself if the row has been crossed + // The comparison is any way going to happen at the HRegion level + checkMatchingRows(this.heap.peek()); break LOOP; } if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { + checkMatchingRows(this.heap.peek()); break LOOP; } continue; case DONE: + nextRow = true; return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); case DONE_SCAN: @@ -637,6 +650,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } seekToNextRow(cell); + nextRow = true; break; case SEEK_NEXT_COL: @@ -673,6 +687,23 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } } + private void checkMatchingRows(Cell cell) { + if(cell != null) { + if(!CellUtil.matchingRow(cell, prevCell)) { + nextRow = true; + } + } + } + + @Override + public boolean movedToNextRow() { + return nextRow; + } + + @Override + public void resetMovedtoNextRow() { + nextRow = false; + } /* * See if we should actually SEEK or rather just SKIP to the next Cell. * (see HBASE-13109)