From 6c042e0b8a6020685b9c735b569f00791c7a3ffd Mon Sep 17 00:00:00 2001
From: Phil Yang
Date: Fri, 18 Mar 2016 17:00:17 +0800
Subject: [PATCH] HBASE-15398 Cells loss or disorder when using family
 essential filter and partial scanning protocol

---
 .../org/apache/hadoop/hbase/filter/Filter.java     |   4 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 286 +++++++++++++--------
 .../regionserver/ReversedRegionScannerImpl.java    |  17 +-
 .../hbase/TestPartialResultsFromClientSide.java    | 141 ++++++++--
 4 files changed, 309 insertions(+), 139 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/Filter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
index 22ca8ac..f4148ef 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
@@ -238,7 +238,9 @@ public abstract class Filter {
    * true here. But some could have more sophisticated logic which could significantly reduce
    * scanning process by not even touching columns until we are 100% sure that it's data is needed
    * in result.
-   *
+   * If your filter does not always return true here, its hasFilterRow() must return true;
+   * otherwise the scan will be rejected by the server. (See HBASE-15398)
+   *
    * Concrete implementers can signal a failure condition in their code by throwing an
    * {@link IOException}.
    *
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a35b9f1..b12779f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -115,10 +115,12 @@ import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterWrapper;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -2582,8 +2584,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }

   public RegionScanner getScanner(Scan scan, boolean copyCellsFromSharedMem) throws IOException {
-    RegionScanner scanner = getScanner(scan, null, copyCellsFromSharedMem);
-    return scanner;
+    return getScanner(scan, null, copyCellsFromSharedMem);
   }

   protected RegionScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners,
@@ -5488,6 +5489,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         boolean copyCellsFromSharedMem) throws IOException {
       this.region = region;
+      if (scan.doLoadColumnFamiliesOnDemand()) {
+        boolean hasUnessentialFamily = checkFilterHavingUnessentialFamily(scan,
+            scan.getFamilyMap().size() > 0 ? scan.getFamilyMap().keySet()
+                : region.getTableDesc().getFamiliesKeys());
+        if (hasUnessentialFamily) {
+          // See https://issues.apache.org/jira/browse/HBASE-15398
+          if (scan.getAllowPartialResults() || scan.getBatch() > 0) {
+            throw new DoNotRetryIOException("Cannot setAllowPartialResults(true) or setBatch"
+                + " when using a filter for which some family is not essential");
+          }
+          if (scan.getFilter() != null && !scan.getFilter().hasFilterRow()) {
+            throw new DoNotRetryIOException("Illegal filter. Cannot use a filter whose"
+                + " hasFilterRow() returns false when some family is not essential");
+          }
+        }
+      }
       this.maxResultSize = scan.getMaxResultSize();
       if (scan.hasFilter()) {
         this.filter = new FilterWrapper(scan.getFilter());
@@ -5637,7 +5654,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // partial Result means that we should not reset the filters; filters
       // should only be reset in
       // between rows
-      if (!scannerContext.partialResultFormed()) resetFilters();
+      if (!scannerContext.partialResultFormed()) {
+        resetFilters();
+        if (!outResults.isEmpty()) {
+          incrementCountOfRowsScannedMetric(scannerContext);
+        }
+      }

       if (isFilterDoneInternal()) {
         moreValues = false;
@@ -5671,17 +5693,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
      */
     private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
         throws IOException {
-      assert joinedContinuationRow != null;
-      boolean moreValues = populateResult(results, this.joinedHeap, scannerContext,
-          joinedContinuationRow);
+      boolean moreValues = populateRowFromHeap(results, this.joinedHeap, scannerContext,
+          this.joinedContinuationRow, true);

       if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
         // We are done with this row, reset the continuation.
-        joinedContinuationRow = null;
+        // As the data is obtained from two independent heaps, we need to ensure that the
+        // result list is sorted, because Result relies on that; otherwise we would have to
+        // return a partial result to the client and let the client sort the cells.
+        sort(results, comparator);
       }
-      // As the data is obtained from two independent heaps, we need to
-      // ensure that result list is sorted, because Result relies on that.
-      sort(results, comparator);
+
       return moreValues;
     }

@@ -5689,38 +5711,42 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
      * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is
      * reached, or remainingResultSize (if not -1) is reaced
      * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.
-     * @param scannerContext
-     * @param currentRowCell
+     * @param isJoinedHeapOrNoJoinedHeap whether this heap is the last heap scanned for this row
      * @return state of last call to {@link KeyValueHeap#next()}
      */
-    private boolean populateResult(List<Cell> results, KeyValueHeap heap,
-        ScannerContext scannerContext, Cell currentRowCell) throws IOException {
+    private boolean populateRowFromHeap(List<Cell> results, KeyValueHeap heap,
+        ScannerContext scannerContext, Cell currentRowCell, boolean isJoinedHeapOrNoJoinedHeap)
+        throws IOException {
       Cell nextKv;
       boolean moreCellsInRow = false;
       boolean tmpKeepProgress = scannerContext.getKeepProgress();
       // Scanning between column families and thus the scope is between cells
       LimitScope limitScope = LimitScope.BETWEEN_CELLS;
+      // The heap may still be positioned on an earlier row (e.g. a row present only in the
+      // joined heap); skip those cells first.
+      while ((nextKv = heap.peek()) != null && compareRows(nextKv, currentRowCell) < 0) {
+        heap.next(MOCKED_LIST);
+      }
       try {
         do {
           // We want to maintain any progress that is made towards the limits while scanning across
           // different column families. To do this, we toggle the keep progress flag on during calls
           // to the StoreScanner to ensure that any progress made thus far is not wiped away.
-          scannerContext.setKeepProgress(true);
-          heap.next(results, scannerContext);
-          scannerContext.setKeepProgress(tmpKeepProgress);
-
+          if (compareRows(nextKv, currentRowCell) == 0) {
+            scannerContext.setKeepProgress(true);
+            heap.next(results, scannerContext);
+            scannerContext.setKeepProgress(tmpKeepProgress);
+          }
           nextKv = heap.peek();
           moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
-          if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
+          boolean mustSetMidRowState = !isJoinedHeapOrNoJoinedHeap || moreCellsInRow;
           if (scannerContext.checkBatchLimit(limitScope)) {
             return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
           } else if (scannerContext.checkSizeLimit(limitScope)) {
             ScannerContext.NextState state =
-                moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED;
+                mustSetMidRowState ? NextState.SIZE_LIMIT_REACHED_MID_ROW
+                    : NextState.SIZE_LIMIT_REACHED;
             return scannerContext.setScannerState(state).hasMoreValues();
           } else if (scannerContext.checkTimeLimit(limitScope)) {
             ScannerContext.NextState state =
-                moreCellsInRow? NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED;
+                mustSetMidRowState ? NextState.TIME_LIMIT_REACHED_MID_ROW
+                    : NextState.TIME_LIMIT_REACHED;
             return scannerContext.setScannerState(state).hasMoreValues();
           }
         } while (moreCellsInRow);
@@ -5754,6 +5780,42 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return this.filter != null && this.filter.filterAllRemaining();
     }

+    /**
+     * Compares two cells by row, treating a null cell (an exhausted heap) as greater than any
+     * other cell, and inverting the result for reversed scans.
+     */
+    private int compareRows(Cell a, Cell b) {
+      if (a == null) {
+        return (b == null) ? 0 : 1;
+      }
+      if (b == null) {
+        return -1;
+      }
+      int c = this.comparator.compareRows(a, b);
+      return (this instanceof ReversedRegionScannerImpl) ? -c : c;
+    }
+
+    private int compareRows(Cell a, byte[] row) {
+      if (a == null) {
+        return (row == null) ? 0 : 1;
+      }
+      if (row == null) {
+        return -1;
+      }
+      int c = this.comparator.compareRows(a, row, 0, row.length);
+      return (this instanceof ReversedRegionScannerImpl) ? -c : c;
+    }
+
+    /**
+     * Main logic of the region scanner. Returns when the cells gathered so far should be
+     * merged into one Result for the RPC response.
+     */
     private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
         throws IOException {
       if (!results.isEmpty()) {
@@ -5770,19 +5832,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       int initialBatchProgress = scannerContext.getBatchProgress();
       long initialSizeProgress = scannerContext.getSizeProgress();
       long initialTimeProgress = scannerContext.getTimeProgress();
-
       // The loop here is used only when at some point during the next we determine
       // that due to effects of filters or otherwise, we have an empty row in the result.
       // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
       // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
       // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).
       while (true) {
+        assert results.isEmpty();
         // Starting to scan a new row. Reset the scanner progress according to whether or not
         // progress should be kept.
         if (scannerContext.getKeepProgress()) {
           // Progress should be kept. Reset to initial values seen at start of method invocation.
-          scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
-              initialTimeProgress);
+          scannerContext.setProgress(initialBatchProgress, initialSizeProgress, initialTimeProgress);
         } else {
           scannerContext.clearProgress();
         }
@@ -5801,10 +5862,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           }
         }

-        // Let's see what we have in the storeHeap.
-        Cell current = this.storeHeap.peek();
-
-        boolean stopRow = isStopRow(current);
         // When has filter row is true it means that the all the cells for a particular row must be
         // read before a filtering decision can be made. This means that filters where hasFilterRow
         // run the risk of encountering out of memory errors in the case that they are applied to a
@@ -5824,36 +5881,50 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
         }

-        // Check if we were getting data from the joinedHeap and hit the limit.
-        // If not, then it's main path - getting results from storeHeap.
-        if (joinedContinuationRow == null) {
-          // First, check if we are at a stop row. If so, there are no more results.
-          if (stopRow) {
-            if (hasFilterRow) {
-              filter.filterRowCells(results);
-            }
-            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-          }
+        // We have two heaps here: storeHeap, whose cells are run through the filter, and
+        // joinedHeap, whose cells are not. There are 4 possible cases:
+        // 1) joinedHeap == null
+        //    In this case we only need to scan storeHeap.
+        // 2) storeHeap.peek().getRow() <= joinedHeap.peek().getRow()
+        //    In this case we should scan storeHeap until its next cell's row is greater than
+        //    joinedHeap's next cell's row.
+        // 3) joinedHeap.peek().getRow() < storeHeap.peek().getRow()
+        //    && joinedHeap's next cell's row was half-read before
+        //    In this case we should scan joinedHeap first, until its next cell's row is not less
+        //    than storeHeap's next cell's row.
+        // 4) joinedHeap.peek().getRow() < storeHeap.peek().getRow()
+        //    && joinedHeap's next cell's row was not read before
+        //    It means that joinedHeap has a row that storeHeap does not; we need to skip
+        //    this row. (?)
+        // NOTE: Row comparisons must take reversed scanning into account.

+        // Let's see what we have in the two heaps.
+        Cell currentStoreHeapTop = this.storeHeap.peek();
+        Cell currentJoinedHeapTop = this.joinedHeap != null ? this.joinedHeap.peek() : null;
+
+        boolean stopRow = isStopRow();
+        if (stopRow) {
+          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+        }
+        if (joinedHeap == null || compareRows(currentStoreHeapTop, currentJoinedHeapTop) <= 0) {
+          // Cases 1 and 2.

+          joinedContinuationRow = currentStoreHeapTop;
+          // We should read from storeHeap until its row is greater than joinedHeap's.
           // Check if rowkey filter wants to exclude this row. If so, loop to next.
           // Technically, if we hit limits before on this row, we don't need this call.
-          if (filterRowKey(current)) {
-            incrementCountOfRowsFilteredMetric(scannerContext);
-            // Typically the count of rows scanned is incremented inside #populateResult. However,
-            // here we are filtering a row based purely on its row key, preventing us from calling
-            // #populateResult. Thus, perform the necessary increment here to rows scanned metric
-            incrementCountOfRowsScannedMetric(scannerContext);
-            boolean moreRows = nextRow(scannerContext, current);
+          if (filterRowKey(currentStoreHeapTop)) {
+            boolean moreRows = seekToNextRowForTwoHeaps(scannerContext, currentStoreHeapTop);
             if (!moreRows) {
               return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
             }
-            results.clear();
             continue;
           }

           // Ok, we are good, let's try to get some results from the main heap.
-          populateResult(results, this.storeHeap, scannerContext, current);
-
+          populateRowFromHeap(results, this.storeHeap, scannerContext, currentStoreHeapTop,
+              this.joinedHeap == null);
           if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
             if (hasFilterRow) {
               throw new IncompatibleFilterException(
@@ -5863,24 +5934,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             return true;
           }

-          Cell nextKv = this.storeHeap.peek();
-          stopRow = nextKv == null || isStopRow(nextKv);
-          // save that the row was empty before filters applied to it.
-          final boolean isEmptyRow = results.isEmpty();
-
           // We have the part of the row necessary for filtering (all of it, usually).
           // First filter with the filterRow(List).
           FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
           if (hasFilterRow) {
             ret = filter.filterRowCellsWithRet(results);
-
             // We don't know how the results have changed after being filtered. Must set progress
             // according to contents of results now. However, a change in the results should not
             // affect the time progress. Thus preserve whatever time progress has been made
             long timeProgress = scannerContext.getTimeProgress();
             if (scannerContext.getKeepProgress()) {
-              scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
-                  initialTimeProgress);
+              scannerContext
+                  .setProgress(initialBatchProgress, initialSizeProgress, initialTimeProgress);
             } else {
               scannerContext.clearProgress();
             }
@@ -5891,60 +5956,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             }
           }

-          if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
-            incrementCountOfRowsFilteredMetric(scannerContext);
-            results.clear();
-            boolean moreRows = nextRow(scannerContext, current);
+          if (results.isEmpty() || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
+            boolean moreRows = seekToNextRowForTwoHeaps(scannerContext, currentStoreHeapTop);
             if (!moreRows) {
               return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
             }
-
-            // This row was totally filtered out, if this is NOT the last row,
-            // we should continue on. Otherwise, nothing else to do.
-            if (!stopRow) continue;
-            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+            results.clear();
+            continue;
           }

-          // Ok, we are done with storeHeap for this row.
-          // Now we may need to fetch additional, non-essential data into row.
-          // These values are not needed for filter to work, so we postpone their
-          // fetch to (possibly) reduce amount of data loads from disk.
-          if (this.joinedHeap != null) {
-            boolean mayHaveData = joinedHeapMayHaveData(current);
-            if (mayHaveData) {
-              joinedContinuationRow = current;
-              populateFromJoinedHeap(results, scannerContext);
-
-              if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
-                return true;
-              }
-            }
-          }
-        } else {
-          // Populating from the joined heap was stopped by limits, populate some more.
+        }
+
+        // Ok, we are done with storeHeap for this row.
+        // Now we may need to fetch additional, non-essential data for the row.
+        // These values are not needed for the filter to work, so we postpone their
+        // fetch to (possibly) reduce the amount of data loaded from disk.
+        if (this.joinedHeap != null && joinedContinuationRow != null) {
+          // Case 3.
           populateFromJoinedHeap(results, scannerContext);
           if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
             return true;
           }
         }

-        // We may have just called populateFromJoinedMap and hit the limits. If that is
-        // the case, we need to call it again on the next next() invocation.
-        if (joinedContinuationRow != null) {
-          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
-        }
-
-        // Finally, we are done with both joinedHeap and storeHeap.
-        // Double check to prevent empty rows from appearing in result. It could be
-        // the case when SingleColumnValueExcludeFilter is used.
-        if (results.isEmpty()) {
-          incrementCountOfRowsFilteredMetric(scannerContext);
-          boolean moreRows = nextRow(scannerContext, current);
-          if (!moreRows) {
-            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-          }
-          if (!stopRow) continue;
-        }
-
+        stopRow = isStopRow();
         if (stopRow) {
           return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
         } else {
@@ -5975,8 +6009,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     private boolean joinedHeapMayHaveData(Cell currentRowCell)
         throws IOException {
       Cell nextJoinedKv = joinedHeap.peek();
-      boolean matchCurrentRow =
-          nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, currentRowCell);
+      if (nextJoinedKv == null) {
+        return false;
+      }
+      if (currentRowCell == null) {
+        return true;
+      }
+      boolean matchCurrentRow = nextJoinedKv != null
+          && CellComparator.COMPARATOR.compareRows(nextJoinedKv, currentRowCell) <= 0;
       boolean matchAfterSeek = false;

       // If the next value in the joined heap does not match the current row, try to seek to the
@@ -6010,25 +6050,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return filter != null && filter.filterRowKey(current);
     }

-    protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
-      assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
+    protected boolean seekToNextRowForTwoHeaps(ScannerContext scannerContext, Cell curRowCell)
+        throws IOException {
       Cell next;
-      while ((next = this.storeHeap.peek()) != null &&
-          CellUtil.matchingRow(next, curRowCell)) {
+      while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRow(next, curRowCell)) {
         this.storeHeap.next(MOCKED_LIST);
       }
+      if (this.joinedHeap != null) {
+        while ((next = this.joinedHeap.peek()) != null
+            && comparator.compareRows(next, curRowCell) <= 0) {
+          this.joinedHeap.next(MOCKED_LIST);
+        }
+      }
       resetFilters();
+      joinedContinuationRow = null;
+      incrementCountOfRowsScannedMetric(scannerContext);
+      incrementCountOfRowsFilteredMetric(scannerContext);

       // Calling the hook in CP which allows it to do a fast forward
-      return this.region.getCoprocessorHost() == null
-          || this.region.getCoprocessorHost()
-              .postScannerFilterRow(this, curRowCell);
+      return this.region.getCoprocessorHost() == null || this.region.getCoprocessorHost()
+          .postScannerFilterRow(this, curRowCell);
     }

-    protected boolean isStopRow(Cell currentRowCell) {
-      return currentRowCell == null
-          || (stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0, stopRow
-              .length) >= isScan);
+    protected boolean isStopRow() {
+      Cell currentStoreHeapCell = this.storeHeap.peek();
+      Cell currentJoinedHeapCell = this.joinedHeap == null ? null : this.joinedHeap.peek();
+      // The scan is finished only when both heaps have passed the stop row (or are exhausted).
+      return (currentStoreHeapCell == null || compareRows(currentStoreHeapCell, stopRow) >= isScan)
+          && (currentJoinedHeapCell == null
+              || compareRows(currentJoinedHeapCell, stopRow) >= isScan);
     }

     @Override
@@ -6110,6 +6157,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // callback
       this.close();
     }
+
+    private boolean checkFilterHavingUnessentialFamily(Scan scan, Set<byte[]> familySet)
+        throws IOException {
+      if (scan.getFilter() == null) {
+        return false;
+      }
+      Filter filter = scan.getFilter();
+      for (byte[] family : familySet) {
+        if (!filter.isFamilyEssential(family)) {
+          return true;
+        }
+      }
+      return false;
+    }
   }

   // Utility methods
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
index ca09cdc..913262b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
@@ -58,20 +58,19 @@ class ReversedRegionScannerImpl extends RegionScannerImpl {
   }

   @Override
-  protected boolean isStopRow(Cell currentRowCell) {
-    return currentRowCell == null
-        || (super.stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0,
-            stopRow.length) <= super.isScan);
-  }
-
-  @Override
-  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell)
+  protected boolean seekToNextRowForTwoHeaps(ScannerContext scannerContext, Cell curRowCell)
       throws IOException {
-    assert super.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
+    if (curRowCell == null) {
+      return false;
+    }
     byte[] row = new byte[curRowCell.getRowLength()];
     CellUtil.copyRowTo(curRowCell, row, 0);
     this.storeHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(row));
+    if (this.joinedHeap != null) {
+      this.joinedHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(row));
+    }
     resetFilters();
+    incrementCountOfRowsScannedMetric(scannerContext);

     // Calling the hook in CP which allows it to do a fast forward
     if (this.region.getCoprocessorHost() != null) {
       return this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
index c6a2525..069b3d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
 import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
 import org.apache.hadoop.hbase.filter.RandomRowFilter;
@@ -833,10 +834,25 @@ public class TestPartialResultsFromClientSide {
     }
   }

-  private void moveRegion(Table table, int index) throws IOException{
-    List<Pair<HRegionInfo, ServerName>> regions = MetaTableAccessor
-        .getTableRegionsAndLocations(TEST_UTIL.getConnection(),
-            table.getName());
+  private void assertCell(Cell cell, byte[] row, byte[] cf, byte[] cq) {
+    try {
+      assertArrayEquals(row,
+          Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+      assertArrayEquals(cf,
+          Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
+      assertArrayEquals(cq,
+          Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
+              cell.getQualifierLength()));
+    } catch (AssertionError e) {
+      throw new AssertionError(
+          "expected " + Bytes.toString(row) + "/" + Bytes.toString(cf) + ":" + Bytes.toString(cq)
+              + " but was: " + cell.toString());
+    }
+  }
+
+  private void moveRegion(Table table, int index) throws IOException {
+    List<Pair<HRegionInfo, ServerName>> regions =
+        MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), table.getName());
     assertEquals(1, regions.size());
     HRegionInfo regionInfo = regions.get(0).getFirst();
     ServerName name = TEST_UTIL.getHBaseCluster().getRegionServer(index).getServerName();
@@ -844,19 +860,11 @@ public class TestPartialResultsFromClientSide {
         Bytes.toBytes(name.getServerName()));
   }

-  private void assertCell(Cell cell, byte[] row, byte[] cf, byte[] cq) {
-    assertArrayEquals(row,
-        Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
-    assertArrayEquals(cf,
-        Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
-    assertArrayEquals(cq,
-        Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
-  }
-
   @Test
   public void testPartialResultWhenRegionMove() throws IOException {
-    Table table=createTestTable(TableName.valueOf("testPartialResultWhenRegionMove"),
-        ROWS, FAMILIES, QUALIFIERS, VALUE);
+    Table table =
+        createTestTable(TableName.valueOf("testPartialResultWhenRegionMove"), ROWS, FAMILIES,
+            QUALIFIERS, VALUE);

     moveRegion(table, 1);
@@ -917,7 +925,7 @@ public class TestPartialResultsFromClientSide {
     Result result2 = scanner.next();
     assertEquals(1, result2.rawCells().length);
     Cell c2 = result2.rawCells()[0];
-    assertCell(c2, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[0]);
+    assertCell(c2, ROWS[NUM_ROWS - 2], FAMILIES[0], QUALIFIERS[0]);
     assertTrue(result2.isPartial());

     moveRegion(table, 3);
@@ -1039,5 +1047,106 @@ public class TestPartialResultsFromClientSide {
     assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]);
   }

+  public static class EssentialFilter extends FilterBase {
+
+    @Override
+    public ReturnCode filterKeyValue(Cell v) throws IOException {
+      return ReturnCode.INCLUDE;
+    }
+
+    @Override
+    public boolean isFamilyEssential(byte[] cf) {
+      return Bytes.equals(cf, FAMILIES[1]);
+    }
+
+    @Override
+    public boolean hasFilterRow() {
+      return true;
+    }
+
+    public static Filter parseFrom(final byte[] pbBytes) {
+      return new EssentialFilter();
+    }
+  }
+
+  public static class CellLevelEssentialFilter extends EssentialFilter {
+
+    @Override
+    public boolean hasFilterRow() {
+      return false;
+    }
+
+    public static Filter parseFrom(final byte[] pbBytes) {
+      return new CellLevelEssentialFilter();
+    }
+  }
+
+  @Test
+  public void testEssentialHeapOrderForCompleteRow() throws IOException {
+    Table table =
+        createTestTable(TableName.valueOf("testEssentialHeapOrderForCompleteRow"), ROWS, FAMILIES,
+            QUALIFIERS, VALUE);
+    Scan scan = new Scan();
+    scan.setFilter(new EssentialFilter());
+    scan.setMaxResultSize(1);
+    ResultScanner scanner = table.getScanner(scan);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      Result result = scanner.next();
+      assertFalse(result.isPartial());
+      Cell[] row = result.rawCells();
+      assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, row.length);
+      for (int j = 0; j < NUM_FAMILIES; j++) {
+        for (int k = 0; k < NUM_QUALIFIERS; k++) {
+          assertCell(row[j * NUM_QUALIFIERS + k], ROWS[i], FAMILIES[j], QUALIFIERS[k]);
+        }
+      }
+    }
+    assertTrue(scanner.next() == null);
+  }
+
+  @Test(expected = DoNotRetryIOException.class)
+  public void testEssentialHeapOrderForPartialRow() throws IOException {
+    Table table =
+        createTestTable(TableName.valueOf("testEssentialHeapOrderForPartialRow"), ROWS, FAMILIES,
+            QUALIFIERS, VALUE);
+    Scan scan = new Scan();
+    scan.setFilter(new EssentialFilter());
+    scan.setMaxResultSize(1);
+    scan.setAllowPartialResults(true);
+    ResultScanner scanner = table.getScanner(scan);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      Result result = scanner.next();
+      assertFalse(result.isPartial());
+      Cell[] row = result.rawCells();
+      assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, row.length);
+      for (int j = 0; j < NUM_FAMILIES; j++) {
+        for (int k = 0; k < NUM_QUALIFIERS; k++) {
+          assertCell(row[j * NUM_QUALIFIERS + k], ROWS[i], FAMILIES[j], QUALIFIERS[k]);
+        }
+      }
+    }
+    assertTrue(scanner.next() == null);
+  }
+
+  @Test(expected = DoNotRetryIOException.class)
+  public void testCellLevelEssentialFilterBanned() throws IOException {
+    Table table =
+        createTestTable(TableName.valueOf("testCellLevelEssentialFilterBanned"), ROWS, FAMILIES,
+            QUALIFIERS, VALUE);
+    Scan scan = new Scan();
+    scan.setFilter(new CellLevelEssentialFilter());
+    scan.setMaxResultSize(1);
+    scan.setAllowPartialResults(true);
+    ResultScanner scanner = table.getScanner(scan);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      Result result = scanner.next();
+      assertFalse(result.isPartial());
+      Cell[] row = result.rawCells();
+      assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, row.length);
+      for (int j = 0; j < NUM_FAMILIES; j++) {
+        for (int k = 0; k < NUM_QUALIFIERS; k++) {
+          assertCell(row[j * NUM_QUALIFIERS + k], ROWS[i], FAMILIES[j], QUALIFIERS[k]);
+        }
+      }
+    }
+    assertTrue(scanner.next() == null);
+  }
 }
\ No newline at end of file
-- 
2.5.4 (Apple Git-61)
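
Usage note for filter implementers: the server-side check added above means that
isFamilyEssential() and hasFilterRow() must be kept consistent. Below is a minimal sketch of a
compliant filter. The class name MetaOnlyFilter and the family name "meta" are hypothetical,
chosen only for illustration; FilterBase, Filter.ReturnCode, isFamilyEssential(), hasFilterRow()
and the Scan setters are the existing HBase client APIs this patch builds on.

    import java.io.IOException;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.filter.FilterBase;
    import org.apache.hadoop.hbase.util.Bytes;

    /**
     * Hypothetical filter: only the "meta" family is essential, so all other
     * families are loaded lazily through the joined heap. Because
     * isFamilyEssential() can return false, HBASE-15398 requires hasFilterRow()
     * to return true; otherwise the server rejects the scan.
     */
    public class MetaOnlyFilter extends FilterBase {

      private static final byte[] ESSENTIAL_FAMILY = Bytes.toBytes("meta");

      @Override
      public ReturnCode filterKeyValue(Cell cell) throws IOException {
        // Per-cell decision; this sketch simply keeps every cell.
        return ReturnCode.INCLUDE;
      }

      @Override
      public boolean isFamilyEssential(byte[] family) {
        // Only "meta" must be read before the row-level filtering decision.
        return Bytes.equals(family, ESSENTIAL_FAMILY);
      }

      @Override
      public boolean hasFilterRow() {
        // Required once isFamilyEssential() can return false: a row-level
        // decision point lets the server assemble whole rows and never ship a
        // half-read row to the client.
        return true;
      }
    }

Under the rules this patch enforces, such a filter would be used like this (again a sketch):

    Scan scan = new Scan();
    scan.setLoadColumnFamiliesOnDemand(true); // enables the storeHeap/joinedHeap split
    scan.setFilter(new MetaOnlyFilter());
    // scan.setAllowPartialResults(true); // now rejected with DoNotRetryIOException
    // scan.setBatch(100);                // likewise rejected for this filter

Note that a filter shipped to the server additionally needs toByteArray()/parseFrom() for
protobuf serialization, as the test filters in this patch show.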