From 9791b92abc6885d577176794147a3f6950f98803 Mon Sep 17 00:00:00 2001
From: Phil Yang
Date: Wed, 9 Mar 2016 14:45:55 +0800
Subject: [PATCH] HBASE-15398 Cell loss or disorder when using family essential
 filter and partial scanning protocol

---
 .../apache/hadoop/hbase/client/ClientScanner.java  |   7 +-
 .../org/apache/hadoop/hbase/client/Result.java     |   6 +
 .../java/org/apache/hadoop/hbase/client/Scan.java  |  15 ++
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 268 ++++++++++++---------
 .../hadoop/hbase/regionserver/RSRpcServices.java   |   1 +
 .../regionserver/ReversedRegionScannerImpl.java    |  17 +-
 .../hbase/TestPartialResultsFromClientSide.java    |  88 +++++++
 7 files changed, 279 insertions(+), 123 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 1658e5b..83a9c56 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
@@ -45,6 +47,7 @@ import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;

 /**
@@ -97,6 +100,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
   protected final int primaryOperationTimeout;
   private int retries;
   protected final ExecutorService pool;
+  protected boolean hasNonEssentialFamily = false;

   /**
    * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
@@ -157,7 +161,8 @@ public abstract class ClientScanner extends AbstractClientScanner {

   protected abstract void initCache();

-  protected void initializeScannerInConstruction() throws IOException{
+  protected void initializeScannerInConstruction() throws IOException {
     // initialize the scanner
     nextScanner(this.caching, false);
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index d2a49c2..bd64bed 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -24,6 +24,7 @@ import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -829,6 +830,11 @@ public class Result implements CellScannable, CellScanner {
       }
     }

+    // We need to sort because the server may send cells out of order. See HBASE-15398.
+    // COMPARATOR is fine here because META_COMPARATOR differs only in compareRows,
+    // and all cells in this list share the same row.
+    Collections.sort(cells, CellComparator.COMPARATOR);
+
     return Result.create(cells, null, stale);
   }
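For context, a minimal sketch of the reordering this sort guarantees when partial Results are stitched back together on the client. The row, family, and value names are hypothetical, not part of the patch:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellComparator;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    List<Cell> cells = new ArrayList<Cell>();
    // With two server-side heaps, one row's families may arrive out of order:
    cells.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f2"),
        Bytes.toBytes("q"), Bytes.toBytes("v2")));
    cells.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f1"),
        Bytes.toBytes("q"), Bytes.toBytes("v1")));
    Collections.sort(cells, CellComparator.COMPARATOR);
    // cells is now ordered f1 before f2, the order Result's lookups rely on.

Without the sort, Result's per-row lookup methods, which binary-search the cell array, can miss cells that are out of comparator order.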
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 1892f54..d2d2ea0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;

@@ -985,4 +986,18 @@ public class Scan extends Query {
     this.asyncPrefetch = asyncPrefetch;
     return this;
   }
+
+  /**
+   * Check whether this scan's filter declares any of the given families as not essential.
+   * See {@link Filter#isFamilyEssential(byte[])}.
+   */
+  public boolean checkFilterHavingNonEssentialFamily(Set<byte[]> familySet) throws IOException {
+    if (this.getFilter() == null) {
+      return false;
+    }
+    Filter filter = this.getFilter();
+    for (byte[] family : familySet) {
+      if (!filter.isFamilyEssential(family)) {
+        return true;
+      }
+    }
+    return false;
+  }
 }
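To make the check concrete: SingleColumnValueFilter with setFilterIfMissing(true) is the canonical filter that reports the other families as non-essential, so a scan such as the following (hypothetical family, qualifier, and value; not part of the patch) is exactly what the server-side guard in HRegion#getScanner below now rejects:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    Scan scan = new Scan();
    SingleColumnValueFilter filter = new SingleColumnValueFilter(
        Bytes.toBytes("cf1"), Bytes.toBytes("q"),
        CompareFilter.CompareOp.EQUAL, Bytes.toBytes("v"));
    filter.setFilterIfMissing(true); // families other than cf1 become non-essential
    scan.setFilter(filter);
    scan.setAllowPartialResults(true); // now fails fast with DoNotRetryIOException

Before this patch such a scan could silently return rows with missing or disordered cells; failing fast is the safer contract.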
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c090b54..ab6fa58 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
@@ -2579,8 +2580,19 @@
   }

   public RegionScanner getScanner(Scan scan, boolean copyCellsFromSharedMem) throws IOException {
-    RegionScanner scanner = getScanner(scan, null, copyCellsFromSharedMem);
-    return scanner;
+    boolean hasNonEssentialFamily = scan.checkFilterHavingNonEssentialFamily(
+        scan.getFamilyMap().size() > 0 ?
+            scan.getFamilyMap().keySet() :
+            getTableDesc().getFamiliesKeys());
+    if (hasNonEssentialFamily && scan.doLoadColumnFamiliesOnDemand()
+        && (scan.getAllowPartialResults() || scan.getBatch() > 0)) {
+      // See https://issues.apache.org/jira/browse/HBASE-15398
+      throw new DoNotRetryIOException("cannot use setAllowPartialResults(true) or setBatch()"
+          + " when the filter declares some column family as not essential");
+    }
+    return getScanner(scan, null, copyCellsFromSharedMem);
   }

   protected RegionScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners,
@@ -5631,7 +5643,12 @@
           // partial Result means that we should not reset the filters; filters
           // should only be reset in
           // between rows
-          if (!scannerContext.partialResultFormed()) resetFilters();
+          if (!scannerContext.partialResultFormed()) {
+            resetFilters();
+            if (!outResults.isEmpty()) {
+              incrementCountOfRowsScannedMetric(scannerContext);
+            }
+          }

           if (isFilterDoneInternal()) {
             moreValues = false;
@@ -5665,17 +5682,17 @@
      */
     private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
         throws IOException {
-      assert joinedContinuationRow != null;
-      boolean moreValues = populateResult(results, this.joinedHeap, scannerContext,
-          joinedContinuationRow);
+      boolean moreValues = populateRowFromHeap(results, this.joinedHeap, scannerContext,
+          this.joinedContinuationRow, true);
       if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
         // We are done with this row.
-        joinedContinuationRow = null;
+        // As the data is obtained from two independent heaps, we need to ensure that the
+        // result list is sorted, because Result relies on that. Otherwise we would have to
+        // return a partial result and let the client sort the cells (see the Result change).
+        sort(results, comparator);
       }
-      // As the data is obtained from two independent heaps, we need to
-      // ensure that result list is sorted, because Result relies on that.
-      sort(results, comparator);
+
       return moreValues;
     }
@@ -5683,38 +5700,42 @@
      * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is
      * reached, or remainingResultSize (if not -1) is reached
      * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row
      *          before this call.
-     * @param scannerContext
-     * @param currentRowCell
+     * @param isLastHeapForRow whether this heap is the last heap to be read for the current row
      * @return state of last call to {@link KeyValueHeap#next()}
      */
-    private boolean populateResult(List<Cell> results, KeyValueHeap heap,
-        ScannerContext scannerContext, Cell currentRowCell) throws IOException {
+    private boolean populateRowFromHeap(List<Cell> results, KeyValueHeap heap,
+        ScannerContext scannerContext, Cell currentRowCell, boolean isLastHeapForRow)
+        throws IOException {
       Cell nextKv;
       boolean moreCellsInRow = false;
       boolean tmpKeepProgress = scannerContext.getKeepProgress();
       // Scanning between column families and thus the scope is between cells
       LimitScope limitScope = LimitScope.BETWEEN_CELLS;
+      // First skip cells whose row sorts before the current row; they belong to rows
+      // this heap no longer needs (see Case 4 in nextInternal below).
+      while ((nextKv = heap.peek()) != null && compareRows(nextKv, currentRowCell) < 0) {
+        heap.next(MOCKED_LIST);
+      }
       try {
         do {
           // We want to maintain any progress that is made towards the limits while scanning across
           // different column families. To do this, we toggle the keep progress flag on during calls
           // to the StoreScanner to ensure that any progress made thus far is not wiped away.
-          scannerContext.setKeepProgress(true);
-          heap.next(results, scannerContext);
-          scannerContext.setKeepProgress(tmpKeepProgress);
-
+          if (compareRows(nextKv, currentRowCell) == 0) {
+            scannerContext.setKeepProgress(true);
+            heap.next(results, scannerContext);
+            scannerContext.setKeepProgress(tmpKeepProgress);
+          }
           nextKv = heap.peek();
           moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
-          if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
+          boolean mustSetMidRowState = !isLastHeapForRow || moreCellsInRow;
           if (scannerContext.checkBatchLimit(limitScope)) {
             return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
           } else if (scannerContext.checkSizeLimit(limitScope)) {
             ScannerContext.NextState state =
-                moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
+                mustSetMidRowState ? NextState.SIZE_LIMIT_REACHED_MID_ROW
+                    : NextState.SIZE_LIMIT_REACHED;
             return scannerContext.setScannerState(state).hasMoreValues();
           } else if (scannerContext.checkTimeLimit(limitScope)) {
             ScannerContext.NextState state =
-                moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
+                mustSetMidRowState ? NextState.TIME_LIMIT_REACHED_MID_ROW
+                    : NextState.TIME_LIMIT_REACHED;
             return scannerContext.setScannerState(state).hasMoreValues();
           }
         } while (moreCellsInRow);
@@ -5748,6 +5769,42 @@
       return this.filter != null && this.filter.filterAllRemaining();
     }

+    /**
+     * Compare the rows of two cells. A null cell sorts after any other (an exhausted heap
+     * never wins), and the result is negated for reversed scans so that "smaller" always
+     * means "scanned earlier".
+     */
+    private int compareRows(Cell a, Cell b) {
+      if (a == null) {
+        return b == null ? 0 : 1;
+      }
+      if (b == null) {
+        return -1;
+      }
+      int c = this.comparator.compareRows(a, b);
+      return (this instanceof ReversedRegionScannerImpl) ? -c : c;
+    }
+
+    private int compareRows(Cell a, byte[] row) {
+      if (a == null) {
+        return row == null ? 0 : 1;
+      }
+      if (row == null) {
+        return -1;
+      }
+      int c = this.comparator.compareRows(a, row, 0, row.length);
+      return (this instanceof ReversedRegionScannerImpl) ? -c : c;
+    }
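To illustrate why the comparison is negated, here is a small fragment with hypothetical rows (not part of the patch). A reversed scanner visits larger rows first, so the "earlier" cell is the one with the larger row key:

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    byte[] f = Bytes.toBytes("f");
    byte[] q = Bytes.toBytes("q");
    byte[] v = Bytes.toBytes("v");
    Cell a = new KeyValue(Bytes.toBytes("row1"), f, q, v);
    Cell b = new KeyValue(Bytes.toBytes("row2"), f, q, v);
    // Forward scan:  compareRows(a, b) < 0, so row1 is consumed before row2.
    // Reversed scan: the raw comparison is negated, compareRows(a, b) > 0,
    // so row2 is correctly treated as the earlier row in scan order.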
+    /**
+     * Main logic of the region scanner. Returns once the cells gathered so far should be
+     * merged into one Result for the RPC response.
+     */
     private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
         throws IOException {
       if (!results.isEmpty()) {
@@ -5764,19 +5821,18 @@
       int initialBatchProgress = scannerContext.getBatchProgress();
       long initialSizeProgress = scannerContext.getSizeProgress();
       long initialTimeProgress = scannerContext.getTimeProgress();
-
       // The loop here is used only when at some point during the next we determine
       // that due to effects of filters or otherwise, we have an empty row in the result.
       // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
       // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
       // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow)).
       while (true) {
+        assert results.isEmpty();
         // Starting to scan a new row. Reset the scanner progress according to whether or not
         // progress should be kept.
         if (scannerContext.getKeepProgress()) {
           // Progress should be kept. Reset to initial values seen at start of method invocation.
-          scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
-              initialTimeProgress);
+          scannerContext.setProgress(initialBatchProgress, initialSizeProgress, initialTimeProgress);
         } else {
           scannerContext.clearProgress();
         }
@@ -5795,10 +5851,6 @@
           }
         }

-        // Let's see what we have in the storeHeap.
-        Cell current = this.storeHeap.peek();
-
-        boolean stopRow = isStopRow(current);
         // When hasFilterRow is true it means that all the cells for a particular row must be
         // read before a filtering decision can be made. This means that filters where hasFilterRow
         // run the risk of encountering out of memory errors in the case that they are applied to a
@@ -5818,36 +5870,50 @@
           scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
         }

-        // Check if we were getting data from the joinedHeap and hit the limit.
-        // If not, then it's main path - getting results from storeHeap.
-        if (joinedContinuationRow == null) {
-          // First, check if we are at a stop row. If so, there are no more results.
-          if (stopRow) {
-            if (hasFilterRow) {
-              filter.filterRowCells(results);
-            }
-            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-          }
+        // We have two heaps here: storeHeap, whose cells are subject to the filter, and
+        // joinedHeap, whose cells are not. There are 4 possible cases:
+        // 1) joinedHeap == null
+        //    We only need to scan storeHeap.
+        // 2) storeHeap.peek().getRow() <= joinedHeap.peek().getRow()
+        //    We should scan storeHeap until its next cell's row is greater than the row of
+        //    joinedHeap's next cell.
+        // 3) joinedHeap.peek().getRow() < storeHeap.peek().getRow()
+        //    and joinedHeap's current row was already half-read (we stopped mid-row on a limit).
+        //    We should scan joinedHeap first, until its next cell's row is not less than the
+        //    row of storeHeap's next cell.
+        // 4) joinedHeap.peek().getRow() < storeHeap.peek().getRow()
+        //    and joinedHeap's current row has not been read yet. This means joinedHeap holds a
+        //    row that storeHeap does not (the row has cells only in non-essential families), so
+        //    it can never pass the filter and we need to skip it.
+        // NOTE: all row comparisons must take reversed scanning into account, hence compareRows.
+
+        // Let's see what we have in the two heaps.
+        Cell currentStoreHeapTop = this.storeHeap.peek();
+        Cell currentJoinedHeapTop = this.joinedHeap != null ? this.joinedHeap.peek() : null;
+
+        boolean stopRow = isStopRow();
+        if (stopRow) {
+          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+        }
+        if (joinedHeap == null || compareRows(currentStoreHeapTop, currentJoinedHeapTop) <= 0) {
+          // Cases 1 and 2.
+          joinedContinuationRow = currentStoreHeapTop;
+          // We should read from storeHeap until its row is larger than joinedHeap's.

           // Check if rowkey filter wants to exclude this row. If so, loop to next.
           // Technically, if we hit limits before on this row, we don't need this call.
-          if (filterRowKey(current)) {
-            incrementCountOfRowsFilteredMetric(scannerContext);
-            // Typically the count of rows scanned is incremented inside #populateResult. However,
-            // here we are filtering a row based purely on its row key, preventing us from calling
-            // #populateResult. Thus, perform the necessary increment here to rows scanned metric
-            incrementCountOfRowsScannedMetric(scannerContext);
-            boolean moreRows = nextRow(scannerContext, current);
+          if (filterRowKey(currentStoreHeapTop)) {
+            boolean moreRows = seekToNextRowForTwoHeaps(scannerContext, currentStoreHeapTop);
             if (!moreRows) {
               return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
             }
-            results.clear();
             continue;
           }
           // Ok, we are good, let's try to get some results from the main heap.
-          populateResult(results, this.storeHeap, scannerContext, current);
-
+          populateRowFromHeap(results, this.storeHeap, scannerContext, currentStoreHeapTop,
+              this.joinedHeap == null);
           if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
             if (hasFilterRow) {
               throw new IncompatibleFilterException(
                   "Filter whose hasFilterRow() returns true is incompatible with scans that"
                       + " must stop mid-row because of a limit");
             }
             return true;
           }

-          Cell nextKv = this.storeHeap.peek();
-          stopRow = nextKv == null || isStopRow(nextKv);
-          // save that the row was empty before filters applied to it.
-          final boolean isEmptyRow = results.isEmpty();
-
           // We have the part of the row necessary for filtering (all of it, usually).
           // First filter with the filterRow(List).
           FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
           if (hasFilterRow) {
             ret = filter.filterRowCellsWithRet(results);
-
             // We don't know how the results have changed after being filtered. Must set progress
             // according to contents of results now. However, a change in the results should not
             // affect the time progress. Thus preserve whatever time progress has been made
             long timeProgress = scannerContext.getTimeProgress();
             if (scannerContext.getKeepProgress()) {
-              scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
-                  initialTimeProgress);
+              scannerContext
+                  .setProgress(initialBatchProgress, initialSizeProgress, initialTimeProgress);
             } else {
               scannerContext.clearProgress();
             }
@@ -5885,60 +5945,29 @@
             }
           }

-          if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
-            incrementCountOfRowsFilteredMetric(scannerContext);
-            results.clear();
-            boolean moreRows = nextRow(scannerContext, current);
+          if (results.isEmpty() || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
+            boolean moreRows = seekToNextRowForTwoHeaps(scannerContext, currentStoreHeapTop);
             if (!moreRows) {
               return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
             }
-
-            // This row was totally filtered out, if this is NOT the last row,
-            // we should continue on. Otherwise, nothing else to do.
-            if (!stopRow) continue;
-            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+            results.clear();
+            continue;
           }

-          // Ok, we are done with storeHeap for this row.
-          // Now we may need to fetch additional, non-essential data into row.
-          // These values are not needed for filter to work, so we postpone their
-          // fetch to (possibly) reduce amount of data loads from disk.
-          if (this.joinedHeap != null) {
-            boolean mayHaveData = joinedHeapMayHaveData(current);
-            if (mayHaveData) {
-              joinedContinuationRow = current;
-              populateFromJoinedHeap(results, scannerContext);
-
-              if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
-                return true;
-              }
-            }
-          }
-        } else {
-          // Populating from the joined heap was stopped by limits, populate some more.
+        }
+
+        // Ok, we are done with storeHeap for this row.
+        // Now we may need to fetch additional, non-essential data into the row.
+        // These values are not needed for the filter to work, so we postpone their
+        // fetch to (possibly) reduce the amount of data loaded from disk.
+        if (this.joinedHeap != null && joinedContinuationRow != null) {
+          // Case 3.
           populateFromJoinedHeap(results, scannerContext);
           if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
             return true;
           }
         }

-        // We may have just called populateFromJoinedMap and hit the limits. If that is
-        // the case, we need to call it again on the next next() invocation.
-        if (joinedContinuationRow != null) {
-          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
-        }
-
-        // Finally, we are done with both joinedHeap and storeHeap.
-        // Double check to prevent empty rows from appearing in result. It could be
-        // the case when SingleColumnValueExcludeFilter is used.
-        if (results.isEmpty()) {
-          incrementCountOfRowsFilteredMetric(scannerContext);
-          boolean moreRows = nextRow(scannerContext, current);
-          if (!moreRows) {
-            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-          }
-          if (!stopRow) continue;
-        }
-
+        stopRow = isStopRow();
         if (stopRow) {
           return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
         } else {
@@ -5969,8 +5998,14 @@

     private boolean joinedHeapMayHaveData(Cell currentRowCell) throws IOException {
       Cell nextJoinedKv = joinedHeap.peek();
-      boolean matchCurrentRow =
-          nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, currentRowCell);
+      if (nextJoinedKv == null) {
+        return false;
+      }
+      if (currentRowCell == null) {
+        return true;
+      }
+      boolean matchCurrentRow =
+          CellComparator.COMPARATOR.compareRows(nextJoinedKv, currentRowCell) <= 0;
       boolean matchAfterSeek = false;

       // If the next value in the joined heap does not match the current row, try to seek to the
@@ -6004,25 +6039,32 @@
       return filter != null && filter.filterRowKey(current);
     }

-    protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
-      assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
+    /** Seek both heaps past the current row; used when the row is filtered out or empty. */
+    protected boolean seekToNextRowForTwoHeaps(ScannerContext scannerContext, Cell curRowCell)
+        throws IOException {
       Cell next;
-      while ((next = this.storeHeap.peek()) != null &&
-          CellUtil.matchingRow(next, curRowCell)) {
+      while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRow(next, curRowCell)) {
         this.storeHeap.next(MOCKED_LIST);
       }
+      if (this.joinedHeap != null) {
+        while ((next = this.joinedHeap.peek()) != null
+            && comparator.compareRows(next, curRowCell) <= 0) {
+          this.joinedHeap.next(MOCKED_LIST);
+        }
+      }
       resetFilters();
+      joinedContinuationRow = null;
+      incrementCountOfRowsScannedMetric(scannerContext);
+      incrementCountOfRowsFilteredMetric(scannerContext);

       // Calling the hook in CP which allows it to do a fast forward
-      return this.region.getCoprocessorHost() == null
-          || this.region.getCoprocessorHost()
-              .postScannerFilterRow(this, curRowCell);
+      return this.region.getCoprocessorHost() == null
+          || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
     }

-    protected boolean isStopRow(Cell currentRowCell) {
-      return currentRowCell == null
-          || (stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0,
-              stopRow.length) >= isScan);
+    /** A row is a stop row only once both heaps have passed it. */
+    protected boolean isStopRow() {
+      Cell currentStoreHeapCell = this.storeHeap.peek();
+      Cell currentJoinHeapCell = this.joinedHeap == null ? null : this.joinedHeap.peek();
+      return (currentStoreHeapCell == null || compareRows(currentStoreHeapCell, stopRow) >= isScan)
+          && (currentJoinHeapCell == null || compareRows(currentJoinHeapCell, stopRow) >= isScan);
     }

     @Override
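Stepping back from the diff, here is a self-contained toy model (plain Java, hypothetical rows, not HBase code) of the discipline the rewritten nextInternal enforces: whichever heap currently holds the smaller row is served first, same-row cells from both heaps are emitted together, and a row that exists only in the joined heap is skipped because it can never pass the filter:

    import java.util.Arrays;
    import java.util.PriorityQueue;

    public class TwoHeapMergeSketch {
      public static void main(String[] args) {
        // storeHeap: rows with essential families; joinedHeap: rows with non-essential ones.
        PriorityQueue<String> storeHeap = new PriorityQueue<>(Arrays.asList("r1", "r2", "r4"));
        PriorityQueue<String> joinedHeap = new PriorityQueue<>(Arrays.asList("r2", "r3"));
        while (!storeHeap.isEmpty() || !joinedHeap.isEmpty()) {
          String storeTop = storeHeap.peek();
          String joinedTop = joinedHeap.peek();
          if (joinedTop == null || (storeTop != null && storeTop.compareTo(joinedTop) <= 0)) {
            // Cases 1 and 2: the filtered (essential) cells come first ...
            System.out.println(storeTop + ": essential families");
            storeHeap.poll();
            if (storeTop.equals(joinedTop)) {
              // ... then the same row's non-essential cells, so the Result stays whole.
              System.out.println(storeTop + ": non-essential families");
              joinedHeap.poll();
            }
          } else {
            // Case 4: r3 exists only in the joined heap; it can never pass the filter.
            joinedHeap.poll();
          }
        }
      }
    }

Case 3, resuming a half-read joined row after a size or time limit, is elided here; in the patch it is carried by joinedContinuationRow together with the row-skipping loop at the top of populateRowFromHeap.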
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 035b2d1..acd53dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
index ca09cdc..913262b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
@@ -58,20 +58,19 @@
   }

   @Override
-  protected boolean isStopRow(Cell currentRowCell) {
-    return currentRowCell == null
-        || (super.stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0,
-            stopRow.length) <= super.isScan);
-  }
-
-  @Override
-  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell)
+  protected boolean seekToNextRowForTwoHeaps(ScannerContext scannerContext, Cell curRowCell)
       throws IOException {
-    assert super.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
+    if (curRowCell == null) {
+      return false;
+    }
     byte[] row = new byte[curRowCell.getRowLength()];
     CellUtil.copyRowTo(curRowCell, row, 0);
+    // A reversed scan walks rows in descending order, so "next row" means seeking
+    // both heaps to the previous row.
     this.storeHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(row));
+    if (this.joinedHeap != null) {
+      this.joinedHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(row));
+    }
     resetFilters();
+    incrementCountOfRowsScannedMetric(scannerContext);

     // Calling the hook in CP which allows it to do a fast forward
     if (this.region.getCoprocessorHost() != null) {
       return this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
index a6f8373..0d9acdc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase;

+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -37,9 +38,11 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
 import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
 import org.apache.hadoop.hbase.filter.RandomRowFilter;
@@ -829,4 +832,89 @@ public class TestPartialResultsFromClientSide {
       testEquivalenceOfScanResults(TABLE, partialScan, oneshotScan);
     }
   }
+
+  /** A pass-everything filter that marks every family except FAMILIES[1] as non-essential. */
+  public static class EssentialFilter extends FilterBase {
+
+    @Override
+    public ReturnCode filterKeyValue(Cell v) throws IOException {
+      return ReturnCode.INCLUDE;
+    }
+
+    @Override
+    public boolean isFamilyEssential(byte[] cf) {
+      return Bytes.equals(cf, FAMILIES[1]);
+    }
+
+    public static Filter parseFrom(final byte[] pbBytes) {
+      return new EssentialFilter();
+    }
+  }
+
+  private void assertCell(Cell cell, byte[] row, byte[] cf, byte[] cq) {
+    try {
+      assertArrayEquals(row,
+          Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+      assertArrayEquals(cf,
+          Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
+      assertArrayEquals(cq,
+          Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
+    } catch (AssertionError e) {
+      throw new AssertionError(
+          "expected " + Bytes.toString(row) + "/" + Bytes.toString(cf) + ":" + Bytes.toString(cq)
+              + " but was: " + cell.toString());
+    }
+  }
+
+  @Test
+  public void testEssentialHeapOrderForCompleteRow() throws IOException {
+    Table table =
+        createTestTable(TableName.valueOf("testEssentialHeapOrderForCompleteRow"), ROWS, FAMILIES,
+            QUALIFIERS, VALUE);
+    Scan scan = new Scan();
+    scan.setFilter(new EssentialFilter());
+    scan.setMaxResultSize(1);
+    ResultScanner scanner = table.getScanner(scan);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      Result result = scanner.next();
+      assertFalse(result.isPartial());
+      Cell[] row = result.rawCells();
+      assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, row.length);
+      for (int j = 0; j < NUM_FAMILIES; j++) {
+        for (int k = 0; k < NUM_QUALIFIERS; k++) {
+          // Cells are family-major within the row, so the index is j * NUM_QUALIFIERS + k.
+          assertCell(row[j * NUM_QUALIFIERS + k], ROWS[i], FAMILIES[j], QUALIFIERS[k]);
+        }
+      }
+    }
+    assertTrue(scanner.next() == null);
+  }
+
+  @Test
+  public void testEssentialHeapOrderForPartialRow() throws IOException {
+    Table table =
+        createTestTable(TableName.valueOf("testEssentialHeapOrderForPartialRow"), ROWS, FAMILIES,
+            QUALIFIERS, VALUE);
+    Scan scan = new Scan();
+    scan.setFilter(new EssentialFilter());
+    scan.setMaxResultSize(1);
+    scan.setAllowPartialResults(true);
+    try {
+      ResultScanner scanner = table.getScanner(scan);
+      for (int i = 0; i < NUM_ROWS; i++) {
+        Result result = scanner.next();
+        assertFalse(result.isPartial());
+        Cell[] row = result.rawCells();
+        assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, row.length);
+        for (int j = 0; j < NUM_FAMILIES; j++) {
+          for (int k = 0; k < NUM_QUALIFIERS; k++) {
+            assertCell(row[j * NUM_QUALIFIERS + k], ROWS[i], FAMILIES[j], QUALIFIERS[k]);
+          }
+        }
+      }
+      assertTrue(scanner.next() == null);
+    } catch (DoNotRetryIOException e) {
+      // expected: this combination is rejected by HRegion#getScanner
+      return;
+    }
+    throw new IOException("expected a DoNotRetryIOException; this scan must be rejected");
+  }
 }
\ No newline at end of file
--
2.5.4 (Apple Git-61)
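The same guard covers batch mode. A hedged sketch of the other rejected combination, reusing the test's EssentialFilter and assuming a Table handle like the tests use (this snippet is not part of the patch):

    Scan scan = new Scan();
    scan.setFilter(new EssentialFilter());
    scan.setBatch(5); // batching also splits rows mid-way
    try (ResultScanner scanner = table.getScanner(scan)) {
      scanner.next();
    } catch (DoNotRetryIOException expected) {
      // scan.getBatch() > 0 trips the same check in HRegion#getScanner
      // as setAllowPartialResults(true) does.
    }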