From 8243cb1f7569d1c0fbf311fdb3d39d29d256a9e9 Mon Sep 17 00:00:00 2001
From: Phil Yang
Date: Mon, 7 Mar 2016 19:47:31 +0800
Subject: [PATCH] HBASE-15398 Cells loss or disorder when using family
 essential filter and partial scanning protocol

---
 .../apache/hadoop/hbase/client/ClientScanner.java |  32 +++-
 .../org/apache/hadoop/hbase/client/Result.java    |  12 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java | 183 ++++++++++++---------
 .../regionserver/ReversedRegionScannerImpl.java   |   6 +
 .../hbase/TestPartialResultsFromClientSide.java   |  85 +++++++++-
 5 files changed, 231 insertions(+), 87 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 1658e5b..9a7edfa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
@@ -44,7 +45,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -97,6 +101,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
   protected final int primaryOperationTimeout;
   private int retries;
   protected final ExecutorService pool;
+  protected boolean hasEssentialFilter = false;
 
   /**
    * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
@@ -157,7 +162,24 @@ public abstract class ClientScanner extends AbstractClientScanner {
 
   protected abstract void initCache();
 
-  protected void initializeScannerInConstruction() throws IOException{
+  private boolean checkEssentialFilter() throws IOException {
+    if (this.scan.getFilter() == null) {
+      return false;
+    }
+    Filter filter = this.scan.getFilter();
+    Set<byte[]> familySet = this.scan.getFamilyMap().isEmpty() ?
+        getConnection().getAdmin().getTableDescriptor(this.tableName).getFamiliesKeys() :
+        this.scan.getFamilyMap().keySet();
+    for (byte[] family : familySet) {
+      if (!filter.isFamilyEssential(family)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  protected void initializeScannerInConstruction() throws IOException {
+    hasEssentialFilter = checkEssentialFilter();
     // initialize the scanner
     nextScanner(this.caching, false);
   }
@@ -587,7 +609,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
     // implication of treating batching as equivalent to partial results is that it is possible
     // the caller will receive a result back where the number of cells in the result is less than
     // the batch size even though it may not be the last group of cells for that row.
-    if (allowPartials || isBatchSet) {
+    if (!hasEssentialFilter && (allowPartials || isBatchSet)) {
       addResultsToList(resultsToAddToCache, resultsFromServer, 0,
          (null == resultsFromServer ? 0 : resultsFromServer.length));
       return resultsToAddToCache;
@@ -601,7 +623,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
     // and thus there may be more partials server side that still need to be added to the partial
     // list before we form the complete Result
     if (!partialResults.isEmpty() && !heartbeatMessage) {
-      resultsToAddToCache.add(Result.createCompleteResult(partialResults));
+      resultsToAddToCache.add(Result.createCompleteResult(partialResults, hasEssentialFilter));
       clearPartialResults();
     }
@@ -658,7 +680,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
         // If the result is not a partial, it is a signal to us that it is the last Result we
         // need to form the complete Result client-side
         if (!result.isPartial()) {
-          resultsToAddToCache.add(Result.createCompleteResult(partialResults));
+          resultsToAddToCache.add(Result.createCompleteResult(partialResults, hasEssentialFilter));
           clearPartialResults();
         }
       } else {
@@ -666,7 +688,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
         // far. If our list of partials isn't empty, this is a signal to form the complete Result
         // since the row has now changed
         if (!partialResults.isEmpty()) {
-          resultsToAddToCache.add(Result.createCompleteResult(partialResults));
+          resultsToAddToCache.add(Result.createCompleteResult(partialResults, hasEssentialFilter));
           clearPartialResults();
         }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index d2a49c2..ad00a64 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -24,6 +24,7 @@ import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -734,7 +735,8 @@ public class Result implements CellScannable, CellScanner {
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append("keyvalues=");
+    sb.append("isPartial=" + isPartial());
+    sb.append(" keyvalues=");
     if(isEmpty()) {
       sb.append("NONE");
       return sb.toString();
@@ -788,7 +790,7 @@ public class Result implements CellScannable, CellScanner {
    * @throws IOException A complete result cannot be formed because the results in the partial list
    *           come from different rows
    */
-  public static Result createCompleteResult(List<Result> partialResults)
+  public static Result createCompleteResult(List<Result> partialResults, boolean shouldSort)
       throws IOException {
     List<Cell> cells = new ArrayList<Cell>();
     boolean stale = false;
@@ -828,7 +830,11 @@ public class Result implements CellScannable, CellScanner {
         }
       }
     }
-
+    if (shouldSort) {
+      // Sorting with COMPARATOR is OK here: META_COMPARATOR differs from it only in
+      // compareRows, and all of these cells share the same row.
+      Collections.sort(cells, CellComparator.COMPARATOR);
+    }
     return Result.create(cells, null, stale);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c090b54..48184d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5667,7 +5667,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         throws IOException {
       assert joinedContinuationRow != null;
       boolean moreValues = populateResult(results, this.joinedHeap, scannerContext,
-          joinedContinuationRow);
+          joinedContinuationRow, true);
 
       if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
         // We are done with this row, reset the continuation.
@@ -5685,10 +5685,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
      * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.
      * @param scannerContext
      * @param currentRowCell
+     * @param isJoinedHeapOrNoJoinedHeap whether this heap is the last heap read for this row
      * @return state of last call to {@link KeyValueHeap#next()}
      */
     private boolean populateResult(List<Cell> results, KeyValueHeap heap,
-        ScannerContext scannerContext, Cell currentRowCell) throws IOException {
+        ScannerContext scannerContext, Cell currentRowCell,
+        boolean isJoinedHeapOrNoJoinedHeap) throws IOException {
       Cell nextKv;
       boolean moreCellsInRow = false;
       boolean tmpKeepProgress = scannerContext.getKeepProgress();
@@ -5704,7 +5706,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         scannerContext.setKeepProgress(tmpKeepProgress);
 
         nextKv = heap.peek();
-        moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
+        moreCellsInRow = !isJoinedHeapOrNoJoinedHeap || moreCellsInRow(nextKv, currentRowCell);
         if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
         if (scannerContext.checkBatchLimit(limitScope)) {
           return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
@@ -5796,9 +5798,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
 
       // Let's see what we have in the storeHeap.
-      Cell current = this.storeHeap.peek();
-
-      boolean stopRow = isStopRow(current);
+      Cell currentStoreHeapTop = this.storeHeap.peek();
+      Cell currentJoinedHeapTop = this.joinedHeap != null ? this.joinedHeap.peek() : null;
+      if (joinedContinuationRow != null) {
+        if (currentJoinedHeapTop == null ||
+            !CellUtil.matchingRows(currentJoinedHeapTop, joinedContinuationRow)) {
+          // The joinedContinuationRow is exhausted, or the joined heap itself is done.
+          joinedContinuationRow = null;
+        }
+      }
+      boolean stopRow = isStopRow(currentStoreHeapTop);
       // When has filter row is true it means that the all the cells for a particular row must be
       // read before a filtering decision can be made. This means that filters where hasFilterRow
       // run the risk of encountering out of memory errors in the case that they are applied to a
@@ -5821,92 +5830,101 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // Check if we were getting data from the joinedHeap and hit the limit.
       // If not, then it's main path - getting results from storeHeap.
       if (joinedContinuationRow == null) {
-        // First, check if we are at a stop row. If so, there are no more results.
-        if (stopRow) {
-          if (hasFilterRow) {
-            filter.filterRowCells(results);
+        if ((currentStoreHeapTop != null)
+            && (currentJoinedHeapTop == null ||
+                (this instanceof ReversedRegionScannerImpl ?
+                    CellComparator.COMPARATOR.compareRows(currentStoreHeapTop, currentJoinedHeapTop)
+                        >= 0 :
+                    CellComparator.COMPARATOR.compareRows(currentStoreHeapTop, currentJoinedHeapTop)
+                        <= 0))) {
+          // First, check if we are at a stop row. If so, there are no more results.
+          if (stopRow) {
+            if (hasFilterRow) {
+              filter.filterRowCells(results);
+            }
+            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
           }
-          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-        }
 
-        // Check if rowkey filter wants to exclude this row. If so, loop to next.
-        // Technically, if we hit limits before on this row, we don't need this call.
-        if (filterRowKey(current)) {
-          incrementCountOfRowsFilteredMetric(scannerContext);
-          // Typically the count of rows scanned is incremented inside #populateResult. However,
-          // here we are filtering a row based purely on its row key, preventing us from calling
-          // #populateResult. Thus, perform the necessary increment here to rows scanned metric
-          incrementCountOfRowsScannedMetric(scannerContext);
-          boolean moreRows = nextRow(scannerContext, current);
-          if (!moreRows) {
-            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+          // Check if rowkey filter wants to exclude this row. If so, loop to next.
+          // Technically, if we hit limits before on this row, we don't need this call.
+          if (filterRowKey(currentStoreHeapTop)) {
+            incrementCountOfRowsFilteredMetric(scannerContext);
+            // Typically the count of rows scanned is incremented inside #populateResult. However,
+            // here we are filtering a row based purely on its row key, preventing us from calling
+            // #populateResult. Thus, perform the necessary increment here to rows scanned metric
+            incrementCountOfRowsScannedMetric(scannerContext);
+            boolean moreRows = nextRow(scannerContext, currentStoreHeapTop);
+            if (!moreRows) {
+              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+            }
+            results.clear();
+            continue;
           }
-          results.clear();
-          continue;
-        }
 
-        // Ok, we are good, let's try to get some results from the main heap.
-        populateResult(results, this.storeHeap, scannerContext, current);
+          // Ok, we are good, let's try to get some results from the main heap.
+          populateResult(results, this.storeHeap, scannerContext, currentStoreHeapTop,
+              this.joinedHeap == null);
 
-        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
-          if (hasFilterRow) {
-            throw new IncompatibleFilterException(
-                "Filter whose hasFilterRow() returns true is incompatible with scans that must "
-                    + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
+          if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
+            if (hasFilterRow) {
+              throw new IncompatibleFilterException(
+                  "Filter whose hasFilterRow() returns true is incompatible with scans that must "
+                      + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
+            }
+            return true;
           }
-          return true;
-        }
 
-        Cell nextKv = this.storeHeap.peek();
-        stopRow = nextKv == null || isStopRow(nextKv);
-        // save that the row was empty before filters applied to it.
-        final boolean isEmptyRow = results.isEmpty();
-
-        // We have the part of the row necessary for filtering (all of it, usually).
-        // First filter with the filterRow(List).
-        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
-        if (hasFilterRow) {
-          ret = filter.filterRowCellsWithRet(results);
-
-          // We don't know how the results have changed after being filtered. Must set progress
-          // according to contents of results now. However, a change in the results should not
-          // affect the time progress. Thus preserve whatever time progress has been made
-          long timeProgress = scannerContext.getTimeProgress();
-          if (scannerContext.getKeepProgress()) {
-            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
-              initialTimeProgress);
-          } else {
-            scannerContext.clearProgress();
-          }
-          scannerContext.setTimeProgress(timeProgress);
-          scannerContext.incrementBatchProgress(results.size());
-          for (Cell cell : results) {
-            scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
+          Cell nextKv = this.storeHeap.peek();
+          stopRow = nextKv == null || isStopRow(nextKv);
+          // save that the row was empty before filters applied to it.
+          final boolean isEmptyRow = results.isEmpty();
+
+          // We have the part of the row necessary for filtering (all of it, usually).
+          // First filter with the filterRow(List).
+          FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
+          if (hasFilterRow) {
+            ret = filter.filterRowCellsWithRet(results);
+
+            // We don't know how the results have changed after being filtered. Must set progress
+            // according to contents of results now. However, a change in the results should not
+            // affect the time progress. Thus preserve whatever time progress has been made
+            long timeProgress = scannerContext.getTimeProgress();
+            if (scannerContext.getKeepProgress()) {
+              scannerContext
+                  .setProgress(initialBatchProgress, initialSizeProgress, initialTimeProgress);
+            } else {
+              scannerContext.clearProgress();
+            }
+            scannerContext.setTimeProgress(timeProgress);
+            scannerContext.incrementBatchProgress(results.size());
+            for (Cell cell : results) {
+              scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
+            }
           }
-        }
 
-        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
-          incrementCountOfRowsFilteredMetric(scannerContext);
-          results.clear();
-          boolean moreRows = nextRow(scannerContext, current);
-          if (!moreRows) {
+          if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
+            incrementCountOfRowsFilteredMetric(scannerContext);
+            results.clear();
+            boolean moreRows = nextRow(scannerContext, currentStoreHeapTop);
+            if (!moreRows) {
+              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+            }
+
+            // This row was totally filtered out, if this is NOT the last row,
+            // we should continue on. Otherwise, nothing else to do.
+            if (!stopRow)
+              continue;
             return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
           }
-
-          // This row was totally filtered out, if this is NOT the last row,
-          // we should continue on. Otherwise, nothing else to do.
-          if (!stopRow) continue;
-          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
         }
-
       // Ok, we are done with storeHeap for this row.
       // Now we may need to fetch additional, non-essential data into row.
       // These values are not needed for filter to work, so we postpone their
       // fetch to (possibly) reduce amount of data loads from disk.
       if (this.joinedHeap != null) {
-        boolean mayHaveData = joinedHeapMayHaveData(current);
+        boolean mayHaveData = joinedHeapMayHaveData(currentStoreHeapTop);
         if (mayHaveData) {
-          joinedContinuationRow = current;
+          joinedContinuationRow = currentJoinedHeapTop;
           populateFromJoinedHeap(results, scannerContext);
 
           if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
@@ -5932,7 +5950,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // the case when SingleColumnValueExcludeFilter is used.
       if (results.isEmpty()) {
         incrementCountOfRowsFilteredMetric(scannerContext);
-        boolean moreRows = nextRow(scannerContext, current);
+        boolean moreRows = nextRow(scannerContext, currentStoreHeapTop);
         if (!moreRows) {
           return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
         }
@@ -5969,8 +5987,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     private boolean joinedHeapMayHaveData(Cell currentRowCell) throws IOException {
       Cell nextJoinedKv = joinedHeap.peek();
-      boolean matchCurrentRow =
-          nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, currentRowCell);
+      if (nextJoinedKv == null) {
+        return false;
+      }
+      if (currentRowCell == null) {
+        return true;
+      }
+      boolean matchCurrentRow = nextJoinedKv != null
+          && CellComparator.COMPARATOR.compareRows(nextJoinedKv, currentRowCell) <= 0;
       boolean matchAfterSeek = false;
 
       // If the next value in the joined heap does not match the current row, try to seek to the
@@ -6011,6 +6035,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           CellUtil.matchingRow(next, curRowCell)) {
         this.storeHeap.next(MOCKED_LIST);
       }
+      if (this.joinedHeap != null) {
+        while ((next = this.joinedHeap.peek()) != null && CellUtil.matchingRow(next, curRowCell)) {
+          this.joinedHeap.next(MOCKED_LIST);
+        }
+      }
       resetFilters();
 
       // Calling the hook in CP which allows it to do a fast forward
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
index ca09cdc..db5b3e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
@@ -67,10 +67,16 @@ class ReversedRegionScannerImpl extends RegionScannerImpl {
   @Override
   protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
+    if (curRowCell == null) {
+      return false;
+    }
     assert super.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
     byte[] row = new byte[curRowCell.getRowLength()];
     CellUtil.copyRowTo(curRowCell, row, 0);
     this.storeHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(row));
+    if (this.joinedHeap != null) {
+      this.joinedHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(row));
+    }
     resetFilters();
 
     // Calling the hook in CP which allows it to do a fast forward
     if (this.region.getCoprocessorHost() != null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
index a6f8373..5e0db9a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
 import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
 import org.apache.hadoop.hbase.filter.RandomRowFilter;
@@ -463,7 +465,7 @@ public class TestPartialResultsFromClientSide {
       partials.add(partialResult);
     } while (partialResult != null && partialResult.isPartial());
 
-    completeResult = Result.createCompleteResult(partials);
+    completeResult = Result.createCompleteResult(partials, false);
     oneShotResult = oneShotScanner.next();
 
     compareResults(completeResult, oneShotResult, null);
@@ -496,7 +498,7 @@ public class TestPartialResultsFromClientSide {
     assertFalse(Bytes.equals(r1.getRow(), r2.getRow()));
 
     try {
-      Result.createCompleteResult(partials);
+      Result.createCompleteResult(partials, false);
       fail("r1 and r2 are from different rows. It should not be possible to combine them into"
           + " a single result");
     } catch (IOException e) {
@@ -829,4 +831,83 @@ public class TestPartialResultsFromClientSide {
       testEquivalenceOfScanResults(TABLE, partialScan, oneshotScan);
     }
   }
+
+
+  public static class EssentialFilter extends FilterBase {
+
+    @Override
+    public ReturnCode filterKeyValue(Cell v) throws IOException {
+      return ReturnCode.INCLUDE;
+    }
+
+    public boolean isFamilyEssential(byte[] cf) {
+      return Bytes.equals(cf, FAMILIES[1]);
+    }
+
+    public static Filter parseFrom(final byte[] pbBytes) {
+      return new EssentialFilter();
+    }
+
+  }
+
+  private void assertCell(Cell cell, byte[] row, byte[] cf, byte[] cq) {
+    try {
+      assertArrayEquals(row,
+          Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+      assertArrayEquals(cf,
+          Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
+      assertArrayEquals(cq,
+          Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
+    } catch (AssertionError e) {
+      throw new AssertionError(
+          "expected " + Bytes.toString(row) + "/" + Bytes.toString(cf) + ":" + Bytes.toString(cq)
+              + " but was:" + cell.toString());
+    }
+  }
+
+  @Test
+  public void testEssentialHeapOrderForCompleteRow() throws IOException {
+    Table table =
+        createTestTable(TableName.valueOf("testEssentialHeapOrderForCompleteRow"), ROWS, FAMILIES,
+            QUALIFIERS, VALUE);
+    Scan scan = new Scan();
+    scan.setFilter(new EssentialFilter());
+    scan.setMaxResultSize(1);
+    ResultScanner scanner = table.getScanner(scan);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      Result result = scanner.next();
+      assertFalse(result.isPartial());
+      Cell[] row = result.rawCells();
+      assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, row.length);
+      for (int j = 0; j < NUM_FAMILIES; j++) {
+        for (int k = 0; k < NUM_QUALIFIERS; k++) {
+          assertCell(row[j * NUM_QUALIFIERS + k], ROWS[i], FAMILIES[j], QUALIFIERS[k]);
+        }
+      }
+    }
+    assertTrue(scanner.next() == null);
+  }
+
+  @Test
+  public void testEssentialHeapOrderForPartialRow() throws IOException {
+    Table table =
+        createTestTable(TableName.valueOf("testEssentialHeapOrderForPartialRow"), ROWS, FAMILIES,
+            QUALIFIERS, VALUE);
+    Scan scan = new Scan();
+    scan.setFilter(new EssentialFilter());
+    scan.setMaxResultSize(1);
+    scan.setAllowPartialResults(true);
+    ResultScanner scanner = table.getScanner(scan);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      Result result = scanner.next();
+      assertFalse(result.isPartial());
+      Cell[] row = result.rawCells();
+      assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, row.length);
+      for (int j = 0; j < NUM_FAMILIES; j++) {
+        for (int k = 0; k < NUM_QUALIFIERS; k++) {
+          assertCell(row[j * NUM_QUALIFIERS + k], ROWS[i], FAMILIES[j], QUALIFIERS[k]);
+        }
+      }
+    }
+    assertTrue(scanner.next() == null);
+  }
 }
\ No newline at end of file
-- 
2.5.4 (Apple Git-61)
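
---

For readers trying to reproduce the issue this patch fixes, below is a minimal client-side sketch. It is illustrative only: the table, family, qualifier, and value names are placeholders, and SingleColumnValueFilter merely stands in for any filter whose isFamilyEssential() returns false for some scanned family. Such a filter routes the non-essential families through the region's joined heap once on-demand column family loading is enabled, and combining that with a tiny max result size (which forces the partial-results protocol) is the scenario that could previously lose or reorder cells.

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.ResultScanner;
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
  import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
  import org.apache.hadoop.hbase.util.Bytes;

  public class EssentialFamilyScanExample {
    public static void main(String[] args) throws IOException {
      Configuration conf = HBaseConfiguration.create();
      // "demo_table", "cf_a", "cf_b", "q", and "v" are all hypothetical names.
      try (Connection conn = ConnectionFactory.createConnection(conf);
           Table table = conn.getTable(TableName.valueOf("demo_table"))) {
        // SingleColumnValueFilter is family-essential only for cf_a; any other
        // family (e.g. cf_b) is non-essential and fetched via the joined heap.
        SingleColumnValueFilter filter = new SingleColumnValueFilter(
            Bytes.toBytes("cf_a"), Bytes.toBytes("q"), CompareOp.EQUAL, Bytes.toBytes("v"));
        filter.setFilterIfMissing(true);

        Scan scan = new Scan();
        scan.setFilter(filter);
        // Enable lazy loading of non-essential families (the joined-heap path).
        scan.setLoadColumnFamiliesOnDemand(true);
        // A tiny max result size forces the partial-results protocol, the other
        // ingredient of the bug this patch addresses.
        scan.setMaxResultSize(1);

        try (ResultScanner scanner = table.getScanner(scan)) {
          for (Result result : scanner) {
            // With the fix, each Result is complete and its cells are sorted,
            // even though cf_b's cells were fetched from the joined heap.
            System.out.println(result);
          }
        }
      }
    }
  }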
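The client-side half of the fix is easiest to see in isolation. The sketch below is not the patched method itself, only a condensed restatement of what Result.createCompleteResult now does when shouldSort is true, under the same assumption the patch relies on: every partial belongs to a single row, so sorting with CellComparator.COMPARATOR alone (rather than META_COMPARATOR, which differs from it only in row comparison) is enough to restore canonical cell order after store-heap and joined-heap cells have interleaved across partials.

  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.List;

  import org.apache.hadoop.hbase.Cell;
  import org.apache.hadoop.hbase.CellComparator;
  import org.apache.hadoop.hbase.client.Result;

  // Condensed sketch of the merge step; error handling and the cross-row
  // sanity checks of the real createCompleteResult are omitted.
  public final class PartialMergeSketch {
    static Result merge(List<Result> partials, boolean stale) {
      List<Cell> cells = new ArrayList<Cell>();
      for (Result partial : partials) {
        if (!partial.isEmpty()) {  // listCells() returns null for empty Results
          cells.addAll(partial.listCells());
        }
      }
      // All cells share one row, so the plain comparator restores full order.
      Collections.sort(cells, CellComparator.COMPARATOR);
      return Result.create(cells, null, stale);
    }
  }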