.../apache/hadoop/hbase/regionserver/HRegion.java | 55 +++++++++++++++--- .../hadoop/hbase/regionserver/KeyValueHeap.java | 28 ++++++++++ .../hadoop/hbase/regionserver/KeyValueScanner.java | 24 ++++++++ .../hbase/regionserver/NonLazyKeyValueScanner.java | 15 +++++ .../regionserver/ReversedRegionScannerImpl.java | 4 ++ .../hbase/regionserver/StoreFileScanner.java | 33 ++++++++--- .../hadoop/hbase/regionserver/StoreScanner.java | 65 ++++++++++++++++++++-- 7 files changed, 204 insertions(+), 20 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a8ffa8d..b57c953 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -146,6 +146,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Stor import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner.NextRowState; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; @@ -5644,7 +5645,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi scannerContext.setKeepProgress(tmpKeepProgress); nextKv = heap.peek(); - moreCellsInRow = moreCellsInRow(nextKv, currentRowCell); + + moreCellsInRow = moreCellsInRow(heap, nextKv, currentRowCell); + //System.out.println("nextkv "+nextKv + " currentRowcell "+currentRowCell); + //System.out.println(moreCellsInRow +" "+matchingRows(nextKv, currentRowCell)); if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext); if (scannerContext.checkBatchLimit(limitScope)) { return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); @@ -5664,6 +5668,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return nextKv != null; } + protected boolean moreCellsInRow(KeyValueHeap heap, + Cell nextKv, Cell currentRowCell) { + if (nextKv == null) { + return false; + } else { + switch (heap.getNextRowState()) { + case INIT: + return CellUtil.matchingRow(nextKv, currentRowCell); + case MOVED_TO_NEXT_ROW: + return false; + default: + return true; + } + } + } + /** * Based on the nextKv in the heap, and the current row, decide whether or not there are more * cells to be read in the heap. If the row of the nextKv in the heap matches the current row @@ -5672,7 +5692,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @param currentRowCell * @return true When there are more cells in the row to be read */ - private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) { + protected boolean matchingRows(final Cell nextKv, Cell currentRowCell) { return nextKv != null && CellUtil.matchingRow(nextKv, currentRowCell); } @@ -5777,7 +5797,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // here we are filtering a row based purely on its row key, preventing us from calling // #populateResult. Thus, perform the necessary increment here to rows scanned metric incrementCountOfRowsScannedMetric(scannerContext); + boolean moreRows = nextRow(scannerContext, current); + // Reset only here because the #populateResult is not going + // to be called here if the next Cell is also filtered out by + // filterRowKey if (!moreRows) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } @@ -5794,6 +5818,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi "Filter whose hasFilterRow() returns true is incompatible with scans that must " + " stop mid-row because of a limit. ScannerContext:" + scannerContext); } + this.storeHeap.resetNextRowState(); return true; } @@ -5832,13 +5857,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!moreRows) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } - // This row was totally filtered out, if this is NOT the last row, // we should continue on. Otherwise, nothing else to do. if (!stopRow) continue; return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } - // Ok, we are done with storeHeap for this row. // Now we may need to fetch additional, non-essential data into row. // These values are not needed for filter to work, so we postpone their @@ -5848,8 +5871,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (mayHaveData) { joinedContinuationRow = current; populateFromJoinedHeap(results, scannerContext); - if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { + resetNextRowState(); return true; } } @@ -5858,12 +5881,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Populating from the joined heap was stopped by limits, populate some more. populateFromJoinedHeap(results, scannerContext); if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { + resetNextRowState(); return true; } } // We may have just called populateFromJoinedMap and hit the limits. If that is // the case, we need to call it again on the next next() invocation. if (joinedContinuationRow != null) { + resetNextRowState(); return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); } @@ -5878,7 +5903,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } if (!stopRow) continue; } - + resetNextRowState(); if (stopRow) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } else { @@ -5887,6 +5912,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + private void resetNextRowState() { + if(this.storeHeap != null) { + // reset here + this.storeHeap.resetNextRowState(); + } + if (this.joinedHeap != null) { + // Reset here + this.joinedHeap.resetNextRowState(); + } + } + protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) { if (scannerContext == null || !scannerContext.isTrackingMetrics()) return; @@ -5945,10 +5981,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException { assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read."; Cell next; - while ((next = this.storeHeap.peek()) != null && - CellUtil.matchingRow(next, curRowCell)) { + while ((next = this.storeHeap.peek()) != null && ( + moreCellsInRow(storeHeap, next, curRowCell))) { this.storeHeap.next(MOCKED_LIST); } + // Better to call reset here because we are sure that we have + // moved to the next row + this.storeHeap.resetNextRowState(); resetFilters(); // Calling the hook in CP which allows it to do a fast forward diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 806eeb5..fcffa8e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -434,4 +434,32 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner } } } + + @Override + public NextRowState getNextRowState() { + if (this.current != null) { + return this.current.getNextRowState(); + } + return KeyValueScanner.NextRowState.INIT; + } + + @Override + public void resetNextRowState() { + if (this.current != null) { + this.current.resetNextRowState(); + } + if (this.heap != null) { + for (KeyValueScanner scanner : this.heap) { + scanner.resetNextRowState(); + } + } + } + + @Override + public boolean isFakeCell() { + if(this.current != null) { + return this.current.isFakeCell(); + } + return false; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java index 9a62b8f..c289d74 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java @@ -162,4 +162,28 @@ public interface KeyValueScanner extends Shipper { * if known, or null otherwise */ public Cell getNextIndexedKey(); + + /** + * @return the state of the scanner if it has moved to the next row or not + */ + public NextRowState getNextRowState(); + + /** + * Reset the scanners state to indicate that they are in a new row + */ + public void resetNextRowState(); + + /** + * Indicates the scanner's state if it has moved to the next row + */ + public enum NextRowState { + MOVED_TO_NEXT_ROW, // If the scanner has moved to the next row + CURRENT_ROW, // If the scanner has not moved to the next row + INIT; // The state of the scanner before the scanners starts scanning any row + } + /** + * @return true if the cell peeked/retrieved is a fakeCell + * @return + */ + public boolean isFakeCell(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java index 9a9036b..c9a46d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java @@ -76,4 +76,19 @@ public abstract class NonLazyKeyValueScanner implements KeyValueScanner { public void shipped() throws IOException { // do nothing } + + @Override + public NextRowState getNextRowState() { + return NextRowState.INIT; + } + + @Override + public void resetNextRowState() { + // No op + } + + @Override + public boolean isFakeCell() { + return false; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java index ca09cdc..f4a7e4f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java @@ -79,4 +79,8 @@ class ReversedRegionScannerImpl extends RegionScannerImpl { return true; } + @Override + protected boolean moreCellsInRow(KeyValueHeap heap, Cell nextKv, Cell currentRowCell) { + return matchingRows(nextKv, currentRowCell); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 108b889..5cda39e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -48,6 +48,7 @@ public class StoreFileScanner implements KeyValueScanner { private final StoreFile.Reader reader; private final HFileScanner hfs; private Cell cur = null; + private boolean fakeCell = false; private boolean realSeekDone; private boolean delayedReseek; @@ -149,7 +150,7 @@ public class StoreFileScanner implements KeyValueScanner { // only seek if we aren't at the end. cur == null implies 'end'. if (cur != null) { hfs.next(); - setCurrentCell(hfs.getCell()); + setCurrentCell(hfs.getCell(), false); if (hasMVCCInfo || this.reader.isBulkLoaded()) { skipKVsNewerThanReadpoint(); } @@ -172,7 +173,7 @@ public class StoreFileScanner implements KeyValueScanner { return false; } - setCurrentCell(hfs.getCell()); + setCurrentCell(hfs.getCell(), false); if (!hasMVCCInfo && this.reader.isBulkLoaded()) { return skipKVsNewerThanReadpoint(); @@ -198,7 +199,7 @@ public class StoreFileScanner implements KeyValueScanner { this.cur = null; return false; } - setCurrentCell(hfs.getCell()); + setCurrentCell(hfs.getCell(), false); if (!hasMVCCInfo && this.reader.isBulkLoaded()) { return skipKVsNewerThanReadpoint(); @@ -216,8 +217,9 @@ public class StoreFileScanner implements KeyValueScanner { } } - protected void setCurrentCell(Cell newVal) throws IOException { + protected void setCurrentCell(Cell newVal, boolean fakeCell) throws IOException { this.cur = newVal; + this.fakeCell = fakeCell; if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) { CellUtil.setSequenceId(cur, this.reader.getSequenceID()); } @@ -231,7 +233,7 @@ public class StoreFileScanner implements KeyValueScanner { && cur != null && (cur.getSequenceId() > readPt)) { hfs.next(); - setCurrentCell(hfs.getCell()); + setCurrentCell(hfs.getCell(), false); if (this.stopSkippingKVsIfNextRow && getComparator().compareRows(cur, startKV) > 0) { return false; @@ -354,7 +356,7 @@ public class StoreFileScanner implements KeyValueScanner { // a higher timestamp than the max timestamp in this file. We know that // the next point when we have to consider this file again is when we // pass the max timestamp of this file (with the same row/column). - setCurrentCell(CellUtil.createFirstOnRowColTS(kv, maxTimestampInFile)); + setCurrentCell(CellUtil.createFirstOnRowColTS(kv, maxTimestampInFile), true); } else { // This will be the case e.g. when we need to seek to the next // row/column, and we don't know exactly what they are, so we set the @@ -372,7 +374,7 @@ public class StoreFileScanner implements KeyValueScanner { // key/value and the store scanner will progress to the next column. This // is obviously not a "real real" seek, but unlike the fake KV earlier in // this method, we want this to be propagated to ScanQueryMatcher. - setCurrentCell(CellUtil.createLastOnRowCol(kv)); + setCurrentCell(CellUtil.createLastOnRowCol(kv), true); realSeekDone = true; return true; @@ -446,7 +448,7 @@ public class StoreFileScanner implements KeyValueScanner { return false; } - setCurrentCell(hfs.getCell()); + setCurrentCell(hfs.getCell(), false); this.stopSkippingKVsIfNextRow = true; boolean resultOfSkipKVs; try { @@ -504,4 +506,19 @@ public class StoreFileScanner implements KeyValueScanner { public void shipped() throws IOException { this.hfs.shipped(); } + + @Override + public NextRowState getNextRowState() { + return NextRowState.INIT; + } + + @Override + public void resetNextRowState() { + // No op + } + + @Override + public boolean isFakeCell() { + return fakeCell; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 2cb0b61..75362a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -95,6 +95,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner */ private long kvsScanned = 0; private Cell prevCell = null; + // Indicates the current state if the current store scanner has moved + // to the next row + private KeyValueScanner.NextRowState nextRow = NextRowState.INIT; + // Indicates if the NextRowState previously set to MOVED + private boolean wasInNextRow = false; /** We don't ever expect to change this, the constant is just for clarity. */ static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true; @@ -501,7 +506,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public boolean next(List outResult, ScannerContext scannerContext) throws IOException { lock.lock(); - + // Indicates if the cell that is passed to the scanner is a fake one. + // This happens when the scanner initially seeks to a key which is actually + // not present in the store + boolean fakeCell = false; try { if (scannerContext == null) { throw new IllegalArgumentException("Scanner context cannot be null"); @@ -531,8 +539,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // rows. Else it is possible we are still traversing the same row so we must perform the row // comparison. if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null || - !CellUtil.matchingRow(cell, matcher.curCell)) { + wasInNextRow || !CellUtil.matchingRow(cell, matcher.curCell)) { this.countPerRow = 0; + wasInNextRow = false; matcher.setToNewRow(cell); } @@ -556,11 +565,21 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap. + // when we go with ExplicitColumnTracker we seek to a fake key and that fake + // key could have a row that is not the actual row. In such cases though + // we have moved to the new row we still can't say that we have moved to the + // new row + if (this.heap.isFakeCell()) { + fakeCell = true; + } else { + fakeCell = false; + } + nextRow = NextRowState.CURRENT_ROW; checkScanOrder(prevCell, cell, comparator); prevCell = cell; - ScanQueryMatcher.MatchCode qcode = matcher.match(cell); qcode = optimize(qcode, cell); + //System.out.println(qcode); switch(qcode) { case INCLUDE: case INCLUDE_AND_SEEK_NEXT_ROW: @@ -579,6 +598,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } seekToNextRow(cell); + if (!fakeCell) { + nextRow = NextRowState.MOVED_TO_NEXT_ROW; + } break LOOP; } @@ -606,6 +628,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } seekToNextRow(cell); + if (!fakeCell) { + nextRow = NextRowState.MOVED_TO_NEXT_ROW; + } } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { seekAsDirection(matcher.getKeyForNextColumn(cell)); } else { @@ -613,15 +638,23 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { + // Compare here itself if the row has been crossed + // The comparison is any way going to happen at the HRegion level + checkMatchingRows(this.heap.peek()); break LOOP; } if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { + checkMatchingRows(this.heap.peek()); break LOOP; } continue; case DONE: - return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); + NextState nextState = scannerContext.setScannerState(NextState.MORE_VALUES); + if (!fakeCell) { + nextRow = NextRowState.MOVED_TO_NEXT_ROW; + } + return nextState.hasMoreValues(); case DONE_SCAN: close(false);// Do all cleanup except heap.close() @@ -635,6 +668,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } seekToNextRow(cell); + if (!fakeCell) { + nextRow = NextRowState.MOVED_TO_NEXT_ROW; + } break; case SEEK_NEXT_COL: @@ -671,6 +707,27 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } } + private void checkMatchingRows(Cell cell) { + if (cell != null) { + if (!CellUtil.matchingRow(cell, prevCell)) { + nextRow = NextRowState.MOVED_TO_NEXT_ROW; + } + } + } + + @Override + public NextRowState getNextRowState() { + return nextRow; + } + + @Override + public void resetNextRowState() { + if (nextRow == NextRowState.MOVED_TO_NEXT_ROW) { + wasInNextRow = true; + } + nextRow = NextRowState.INIT; + } + /* * See if we should actually SEEK or rather just SKIP to the next Cell. * (see HBASE-13109)