From d9e95fc341c34f7caf723bf29b36263753439fe6 Mon Sep 17 00:00:00 2001 From: Jonathan Lawlor Date: Tue, 24 Mar 2015 15:52:46 -0700 Subject: [PATCH] HBASE-11544 [Ergonomics] hbase.client.scanner.caching is dogged and will try to return batch even if it means OOME --- .../hbase/client/ClientSideRegionScanner.java | 8 +- .../apache/hadoop/hbase/regionserver/HRegion.java | 219 ++++++------ .../hadoop/hbase/regionserver/InternalScanner.java | 200 ++--------- .../hadoop/hbase/regionserver/KeyValueHeap.java | 30 +- .../hadoop/hbase/regionserver/RSRpcServices.java | 49 ++- .../hadoop/hbase/regionserver/RegionScanner.java | 42 +-- .../hadoop/hbase/regionserver/ScannerContext.java | 387 +++++++++++++++++++++ .../hadoop/hbase/regionserver/StoreFlusher.java | 6 +- .../hadoop/hbase/regionserver/StoreScanner.java | 71 ++-- .../hbase/regionserver/compactions/Compactor.java | 6 +- .../hbase/security/access/AccessController.java | 6 +- .../hbase/TestPartialResultsFromClientSide.java | 8 +- .../coprocessor/TestCoprocessorInterface.java | 19 +- .../coprocessor/TestRegionObserverInterface.java | 12 +- .../hbase/regionserver/TestAtomicOperation.java | 6 +- .../hadoop/hbase/regionserver/TestHRegion.java | 6 +- .../hbase/regionserver/TestStripeCompactor.java | 12 +- .../compactions/TestStripeCompactionPolicy.java | 14 +- 18 files changed, 639 insertions(+), 462 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java index a80a07e..bd752d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -30,9 +29,11 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.mortbay.log.Log; /** @@ -72,10 +73,7 @@ public class ClientSideRegionScanner extends AbstractClientScanner { public Result next() throws IOException { values.clear(); - // negative values indicate no limits - final long remainingResultSize = -1; - final int batchLimit = -1; - scanner.nextRaw(values, batchLimit, remainingResultSize); + scanner.nextRaw(values, ScannerContext.NO_LIMIT); if (values.isEmpty()) { //we are done return null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 3b1f267..6c3b94e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -142,6 +142,7 @@ import 
org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry; +import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; @@ -5384,6 +5385,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // protected final byte[] stopRow; private final FilterWrapper filter; private int batch; + private ScannerContext scannerContext; protected int isScan; private boolean filterClosed = false; private long readPt; @@ -5407,6 +5409,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } this.batch = scan.getBatch(); + scannerContext = new ScannerContext(); + scannerContext.setBatchLimit(batch); + if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) { this.stopRow = null; } else { @@ -5485,16 +5490,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // public NextState next(List outResults) throws IOException { // apply the batching limit by default - return next(outResults, batch); + return next(outResults, scannerContext); } @Override - public NextState next(List outResults, int limit) throws IOException { - return next(outResults, limit, -1); - } - - @Override - public synchronized NextState next(List outResults, int limit, long remainingResultSize) + public synchronized NextState next(List outResults, ScannerContext scannerContext) throws IOException { if (this.filterClosed) { throw new UnknownScannerException("Scanner was closed (timed out?) " + @@ -5504,7 +5504,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // startRegionOperation(Operation.SCAN); readRequestsCount.increment(); try { - return nextRaw(outResults, limit, remainingResultSize); + return nextRaw(outResults, scannerContext); } finally { closeRegionOperation(Operation.SCAN); } @@ -5512,17 +5512,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // @Override public NextState nextRaw(List outResults) throws IOException { - return nextRaw(outResults, batch); + // Use the RegionScanner's context by default + return nextRaw(outResults, scannerContext); } @Override - public NextState nextRaw(List outResults, int limit) - throws IOException { - return nextRaw(outResults, limit, -1); - } - - @Override - public NextState nextRaw(List outResults, int batchLimit, long remainingResultSize) + public NextState nextRaw(List outResults, ScannerContext scannerContext) throws IOException { if (storeHeap == null) { // scanner is closed @@ -5532,10 +5527,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (outResults.isEmpty()) { // Usually outResults is empty. This is true when next is called // to handle scan or get operation. 
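Illustration (outside the patch): after this change a caller drives the scan by handing a ScannerContext to nextRaw and inspecting the returned NextState. A minimal sketch, assuming a RegionScanner named scanner whose region operation and synchronization are already handled as the javadoc requires; the batch value is illustrative:

    List<Cell> cells = new ArrayList<Cell>();
    ScannerContext context = new ScannerContext();
    context.setBatchLimit(100); // illustrative: at most 100 cells per nextRaw call
    boolean moreRows;
    do {
      moreRows = NextState.hasMoreValues(scanner.nextRaw(cells, context));
      // consume the cells for this row (or partial row, if a limit fired mid-row)
      cells.clear();
    } while (moreRows);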
- state = nextInternal(outResults, batchLimit, remainingResultSize); + state = nextInternal(outResults, scannerContext); } else { List tmpList = new ArrayList(); - state = nextInternal(tmpList, batchLimit, remainingResultSize); + state = nextInternal(tmpList, scannerContext); outResults.addAll(tmpList); } // Invalid states should never be returned. Receiving an invalid state means that we have @@ -5547,10 +5542,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // If the size limit was reached it means a partial Result is being returned. Returning a // partial Result means that we should not reset the filters; filters should only be reset in // between rows - if (!state.sizeLimitReached()) resetFilters(); + if (state != NextState.SIZE_LIMIT_REACHED_MID_ROW) resetFilters(); if (isFilterDoneInternal()) { - state = NextState.makeState(NextState.State.NO_MORE_VALUES, state.getResultSize()); + state = NextState.NO_MORE_VALUES; } return state; } @@ -5559,14 +5554,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @return the state the joinedHeap returned on the call to * {@link KeyValueHeap#next(List, ScannerContext)} */ - private NextState populateFromJoinedHeap(List results, int limit, long resultSize) + private NextState populateFromJoinedHeap(List results, ScannerContext scannerContext) throws IOException { assert joinedContinuationRow != null; NextState state = - populateResult(results, this.joinedHeap, limit, resultSize, + populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(), joinedContinuationRow.getRowLength()); - if (state != null && !state.batchLimitReached() && !state.sizeLimitReached()) { + if (!state.limitReached()) { // We are done with this row, reset the continuation. joinedContinuationRow = null; } @@ -5580,46 +5575,40 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * Fetches records with currentRow into results list, until the next row or a batch or size * limit tracked by the {@link ScannerContext} is reached * @param heap KeyValueHeap to fetch data from. It must be positioned on correct row before call. - * @param remainingResultSize The remaining space within our result size limit. A negative value - * indicate no limit - * @param batchLimit Max amount of KVs to place in result list, -1 means no limit. + * @param scannerContext context encapsulating the limits to enforce and the progress made + * towards them * @param currentRow Byte array with key we are fetching. 
* @param offset offset for currentRow * @param length length for currentRow * @return state of last call to {@link KeyValueHeap#next()} */ - private NextState populateResult(List results, KeyValueHeap heap, int batchLimit, - long remainingResultSize, byte[] currentRow, int offset, short length) throws IOException { + private NextState populateResult(List results, KeyValueHeap heap, + ScannerContext scannerContext, byte[] currentRow, int offset, short length) + throws IOException { Cell nextKv; boolean moreCellsInRow = false; - long accumulatedResultSize = 0; - List tmpResults = new ArrayList(); + boolean tmpKeepProgress = scannerContext.keepProgress(); + // Scanning between column families and thus the scope is between cells + LimitScope limitScope = LimitScope.BETWEEN_CELLS; do { - int remainingBatchLimit = batchLimit - results.size(); - NextState heapState = - heap.next(tmpResults, remainingBatchLimit, remainingResultSize - accumulatedResultSize); - results.addAll(tmpResults); - accumulatedResultSize += calculateResultSize(tmpResults, heapState); - tmpResults.clear(); - - if (batchLimit > 0 && results.size() == batchLimit) { - return NextState.makeState(NextState.State.BATCH_LIMIT_REACHED, accumulatedResultSize); - } + // We want to maintain any progress that is made towards the limits while scanning across + // different column families. To do this, we toggle the keep progress flag on during calls + // to the StoreScanner to ensure that any progress made thus far is not wiped away. + scannerContext.setKeepProgress(true); + heap.next(results, scannerContext); + scannerContext.setKeepProgress(tmpKeepProgress); nextKv = heap.peek(); moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length); - boolean sizeLimitReached = - remainingResultSize > 0 && accumulatedResultSize >= remainingResultSize; - if (moreCellsInRow && sizeLimitReached) { - return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, accumulatedResultSize); + + if (scannerContext.checkBatchLimit(limitScope)) { + return NextState.BATCH_LIMIT_REACHED; + } else if (scannerContext.checkSizeLimit(limitScope)) { + return moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW + : NextState.SIZE_LIMIT_REACHED; } } while (moreCellsInRow); - if (nextKv != null) { - return NextState.makeState(NextState.State.MORE_VALUES, accumulatedResultSize); - } else { - return NextState.makeState(NextState.State.NO_MORE_VALUES, accumulatedResultSize); - } + return nextKv != null ? NextState.MORE_VALUES : NextState.NO_MORE_VALUES; } /** @@ -5637,30 +5626,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length); } - /** - * Calculates the size of the results. If the state of the scanner that these results came from - * indicates that an estimate of the result size has already been generated, we can skip the - * calculation and use that instead. - * @param results List of cells we want to calculate size of - * @param state The state returned from the scanner that generated these results - * @return aggregate size of results - */ - private long calculateResultSize(List results, NextState state) { - if (results == null || results.isEmpty()) return 0; - - // In general, the state should contain the estimate because the result size used to - // determine when the scan has exceeded its size limit. If the estimate is contained in the - // state then we can avoid an unnecesasry calculation. 
- if (state != null && state.hasResultSizeEstimate()) return state.getResultSize(); - - long size = 0; - for (Cell c : results) { - size += CellUtil.estimatedHeapSizeOf(c); - } - - return size; - } - /* * @return True if a filter rules the scanner is over, done. */ @@ -5673,20 +5638,37 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return this.filter != null && this.filter.filterAllRemaining(); } - private NextState nextInternal(List results, int batchLimit, long remainingResultSize) + private NextState nextInternal(List results, ScannerContext scannerContext) throws IOException { if (!results.isEmpty()) { throw new IllegalArgumentException("First parameter should be an empty list"); } - // Estimate of the size (heap size) of the results returned from this method - long resultSize = 0; + if (scannerContext == null) { + throw new IllegalArgumentException("Scanner context cannot be null"); + } RpcCallContext rpcCall = RpcServer.getCurrentCall(); + + // Save the initial progress from the Scanner context in these local variables. The progress + // may need to be reset a few times if rows are being filtered out so we save the initial + // progress. + int initialBatchProgress = scannerContext.getBatchProgress(); + long initialSizeProgress = scannerContext.getSizeProgress(); + // The loop here is used only when at some point during the next we determine // that due to effects of filters or otherwise, we have an empty row in the result. // Then we loop and try again. Otherwise, we must get out on the first iteration via return, // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row, // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow). while (true) { + // Starting to scan a new row. Reset the scanner progress according to whether or not + // progress should be kept. + if (scannerContext.keepProgress()) { + // Progress should be kept. Reset to initial values seen at start of method invocation. + scannerContext.setProgress(initialBatchProgress, initialSizeProgress); + } else { + scannerContext.clearProgress(); + } + if (rpcCall != null) { // If a user specifies a too-restrictive or too-slow scanner, the // client might time out and disconnect while the server side @@ -5718,14 +5700,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // If filter#hasFilterRow is true, partial results are not allowed since allowing them // would prevent the filters from being evaluated. Thus, if it is true, change the - // remainingResultSize to -1 so that the entire row's worth of cells are fetched. - if (hasFilterRow && remainingResultSize > 0) { - remainingResultSize = -1; + // scope of any limits that could potentially create partial results to + // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row + if (hasFilterRow) { if (LOG.isTraceEnabled()) { - LOG.trace("filter#hasFilterRow is true which prevents partial results from being " + - " formed. The remainingResultSize of: " + remainingResultSize + " will not " + - " be considered when fetching the cells for this row."); + LOG.trace("filter#hasFilterRow is true which prevents partial results from being " + + " formed. 
Changing scope of limits that may create partials"); } + scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS); } NextState joinedHeapState; @@ -5737,22 +5719,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (hasFilterRow) { filter.filterRowCells(results); } - return NextState.makeState(NextState.State.NO_MORE_VALUES, resultSize); + return NextState.NO_MORE_VALUES; } // Check if rowkey filter wants to exclude this row. If so, loop to next. // Technically, if we hit limits before on this row, we don't need this call. if (filterRowKey(currentRow, offset, length)) { boolean moreRows = nextRow(currentRow, offset, length); - if (!moreRows) return NextState.makeState(NextState.State.NO_MORE_VALUES, resultSize); + if (!moreRows) return NextState.NO_MORE_VALUES; results.clear(); continue; } NextState storeHeapState = - populateResult(results, this.storeHeap, batchLimit, remainingResultSize, currentRow, - offset, length); - resultSize += calculateResultSize(results, storeHeapState); + populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length); + // Invalid states should never be returned. If one is seen, throw exception // since we have no way of telling how we should proceed if (!NextState.isValidState(storeHeapState)) { @@ -5760,24 +5741,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } // Ok, we are good, let's try to get some results from the main heap. - if (storeHeapState.batchLimitReached()) { + if (storeHeapState.limitReached()) { if (hasFilterRow) { throw new IncompatibleFilterException( - "Filter whose hasFilterRow() returns true is incompatible with scan with limit!"); + "Filter whose hasFilterRow() returns true is incompatible with scans that must " + + " stop mid-row because of a limit. State:" + storeHeapState); } - // We hit the batch limit. - return NextState.makeState(NextState.State.BATCH_LIMIT_REACHED, resultSize); - } else if (storeHeapState.sizeLimitReached()) { - if (hasFilterRow) { - // We try to guard against this case above when remainingResultSize is set to -1 if - // hasFilterRow is true. In the even that the guard doesn't work, an exception must be - // thrown - throw new IncompatibleFilterException( - "Filter whose hasFilterRows() returns true is incompatible with scans that" - + " return partial results"); - } - // We hit the size limit. - return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize); + return storeHeapState; } Cell nextKv = this.storeHeap.peek(); stopRow = nextKv == null || @@ -5790,17 +5760,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED; if (hasFilterRow) { ret = filter.filterRowCellsWithRet(results); + + // We don't know how the results have changed after being filtered. Must set progress + // according to contents of results now. 
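To make the scope widening above concrete, a small sketch (assumes same-package access, since the scope setter and the check methods are package-private; the byte counts are illustrative): a size limit demoted to BETWEEN_ROWS scope is invisible to mid-row checks, which is exactly how hasFilterRow rows are kept whole.

    ScannerContext ctx = new ScannerContext();
    ctx.setSizeLimit(LimitScope.BETWEEN_CELLS, 1024L); // may normally stop mid-row
    ctx.setSizeLimitScope(LimitScope.BETWEEN_ROWS);    // widened: rows stay whole
    ctx.incrementSizeProgress(4096L);                  // already past the limit
    ctx.checkSizeLimit(LimitScope.BETWEEN_CELLS);      // false: not enforceable mid-row
    ctx.checkSizeLimit(LimitScope.BETWEEN_ROWS);       // true: fires at the row boundary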
+ if (scannerContext.keepProgress()) { + scannerContext.setProgress(initialBatchProgress, initialSizeProgress); + } else { + scannerContext.clearProgress(); + } + scannerContext.incrementBatchProgress(results.size()); + for (Cell cell : results) { + scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell)); + } } if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) { results.clear(); boolean moreRows = nextRow(currentRow, offset, length); - if (!moreRows) return NextState.makeState(NextState.State.NO_MORE_VALUES, 0); + if (!moreRows) return NextState.NO_MORE_VALUES; // This row was totally filtered out, if this is NOT the last row, // we should continue on. Otherwise, nothing else to do. if (!stopRow) continue; - return NextState.makeState(NextState.State.NO_MORE_VALUES, 0); + return NextState.NO_MORE_VALUES; } // Ok, we are done with storeHeap for this row. @@ -5818,31 +5800,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // currentRow, offset, length)); if (mayHaveData) { joinedContinuationRow = current; - joinedHeapState = - populateFromJoinedHeap(results, batchLimit, remainingResultSize - resultSize); - resultSize += - joinedHeapState != null && joinedHeapState.hasResultSizeEstimate() ? - joinedHeapState.getResultSize() : 0; - if (joinedHeapState != null && joinedHeapState.sizeLimitReached()) { - return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize); + joinedHeapState = populateFromJoinedHeap(results, scannerContext); + + if (joinedHeapState.limitReached()) { + return joinedHeapState; } } } } else { // Populating from the joined heap was stopped by limits, populate some more. - joinedHeapState = - populateFromJoinedHeap(results, batchLimit, remainingResultSize - resultSize); - resultSize += - joinedHeapState != null && joinedHeapState.hasResultSizeEstimate() ? - joinedHeapState.getResultSize() : 0; - if (joinedHeapState != null && joinedHeapState.sizeLimitReached()) { - return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize); + joinedHeapState = populateFromJoinedHeap(results, scannerContext); + if (joinedHeapState.limitReached()) { + return joinedHeapState; } } // We may have just called populateFromJoinedMap and hit the limits. If that is // the case, we need to call it again on the next next() invocation. if (joinedContinuationRow != null) { - return NextState.makeState(NextState.State.MORE_VALUES, resultSize); + return NextState.MORE_VALUES; } // Finally, we are done with both joinedHeap and storeHeap. @@ -5850,15 +5825,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // the case when SingleColumnValueExcludeFilter is used. if (results.isEmpty()) { boolean moreRows = nextRow(currentRow, offset, length); - if (!moreRows) return NextState.makeState(NextState.State.NO_MORE_VALUES, 0); + if (!moreRows) return NextState.NO_MORE_VALUES; if (!stopRow) continue; } // We are done. Return the result. 
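The keepProgress handling above is what lets one ScannerContext span many rows. A sketch under assumed values (setKeepProgress and checkSizeLimit are package-private, so this presumes same-package code; scanner and the size budget are illustrative):

    long maxResultSize = 2L * 1024 * 1024; // illustrative 2 MB RPC budget
    ScannerContext ctx = new ScannerContext();
    ctx.setSizeLimit(LimitScope.BETWEEN_ROWS, maxResultSize);
    ctx.setKeepProgress(true); // size progress survives across nextRaw() calls
    List<Cell> cells = new ArrayList<Cell>();
    while (!ctx.checkSizeLimit(LimitScope.BETWEEN_ROWS)
        && NextState.hasMoreValues(scanner.nextRaw(cells, ctx))) {
      // with keepProgress == false the context would be cleared on every call
      // and this RPC-wide size limit could never trip
      cells.clear();
    }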
if (stopRow) { - return NextState.makeState(NextState.State.NO_MORE_VALUES, resultSize); + return NextState.NO_MORE_VALUES; } else { - return NextState.makeState(NextState.State.MORE_VALUES, resultSize); + return NextState.MORE_VALUES; } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java index ea5a75f..614c7a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java @@ -41,186 +41,50 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; */ @InterfaceAudience.Private public interface InternalScanner extends Closeable { + /** - * This class encapsulates all the meaningful state information that we would like the know about - * after a call to {@link InternalScanner#next(List)}. While this is not an enum, a restriction on - * the possible states is implied through the exposed {@link #makeState(State)} method. + * The possible states a scanner may be in following a call to {@link InternalScanner#next(List)} */ - public static class NextState { - /** - * The possible states we want to restrict ourselves to. This enum is not sufficient to - * encapsulate all of the state information since some of the fields of the state must be - * dynamic (e.g. resultSize). - */ - public enum State { - MORE_VALUES(true), - NO_MORE_VALUES(false), - SIZE_LIMIT_REACHED(true), - BATCH_LIMIT_REACHED(true); - - private boolean moreValues; - - private State(final boolean moreValues) { - this.moreValues = moreValues; - } - - /** - * @return true when the state indicates that more values may follow those that have been - * returned - */ - public boolean hasMoreValues() { - return this.moreValues; - } - } - - /** - * state variables - */ - private final State state; - private long resultSize; + public enum NextState { + MORE_VALUES(true, false), + NO_MORE_VALUES(false, false), + SIZE_LIMIT_REACHED(true, true), /** - * Value to use for resultSize when the size has not been calculated. Must be a negative number - * so that {@link NextState#hasResultSizeEstimate()} returns false. + * Special case of size limit reached to indicate that the size limit was reached in the middle + * of a row and thus a partial results was formed */ - private static final long DEFAULT_RESULT_SIZE = -1; - - private NextState(State state, long resultSize) { - this.state = state; - this.resultSize = resultSize; - } + SIZE_LIMIT_REACHED_MID_ROW(true, true), + BATCH_LIMIT_REACHED(true, true); - /** - * @param state - * @return An instance of {@link NextState} where the size of the results returned from the call - * to {@link InternalScanner#next(List)} is unknown. It it the responsibility of the - * caller of {@link InternalScanner#next(List)} to calculate the result size if needed - */ - public static NextState makeState(final State state) { - return makeState(state, DEFAULT_RESULT_SIZE); + private boolean moreValues; + private boolean limitReached; + private NextState(boolean moreValues, boolean limitReached) { + this.moreValues = moreValues; + this.limitReached = limitReached; } /** - * @param state - * @param resultSize - * @return An instance of {@link NextState} where the size of the values returned from the call - * to {@link InternalScanner#next(List)} is known. 
The caller can avoid recalculating - * the result size by using the cached value retrievable via {@link #getResultSize()} - */ - public static NextState makeState(final State state, long resultSize) { - switch (state) { - case MORE_VALUES: - return createMoreValuesState(resultSize); - case NO_MORE_VALUES: - return createNoMoreValuesState(resultSize); - case BATCH_LIMIT_REACHED: - return createBatchLimitReachedState(resultSize); - case SIZE_LIMIT_REACHED: - return createSizeLimitReachedState(resultSize); - default: - // If the state is not recognized, default to no more value state - return createNoMoreValuesState(resultSize); - } - } - - /** - * Convenience method for creating a state that indicates that more values can be scanned - * @param resultSize estimate of the size (heap size) of the values returned from the call to - * {@link InternalScanner#next(List)} - */ - private static NextState createMoreValuesState(long resultSize) { - return new NextState(State.MORE_VALUES, resultSize); - } - - /** - * Convenience method for creating a state that indicates that no more values can be scanned. - * @param resultSize estimate of the size (heap size) of the values returned from the call to - * {@link InternalScanner#next(List)} - */ - private static NextState createNoMoreValuesState(long resultSize) { - return new NextState(State.NO_MORE_VALUES, resultSize); - } - - /** - * Convenience method for creating a state that indicates that the scan stopped because the - * batch limit was exceeded - * @param resultSize estimate of the size (heap size) of the values returned from the call to - * {@link InternalScanner#next(List)} - */ - private static NextState createBatchLimitReachedState(long resultSize) { - return new NextState(State.BATCH_LIMIT_REACHED, resultSize); - } - - /** - * Convenience method for creating a state that indicates that the scan stopped due to the size - * limit - * @param resultSize estimate of the size (heap size) of the values returned from the call to - * {@link InternalScanner#next(List)} - */ - private static NextState createSizeLimitReachedState(long resultSize) { - return new NextState(State.SIZE_LIMIT_REACHED, resultSize); - } - - /** - * @return true when the scanner has more values to be scanned following the values returned by - * the call to {@link InternalScanner#next(List)} + * @return true when the state indicates that more values may follow those that have been + * returned */ public boolean hasMoreValues() { - return this.state.hasMoreValues(); - } - - /** - * @return true when the scanner had to stop scanning because it reached the batch limit - */ - public boolean batchLimitReached() { - return this.state == State.BATCH_LIMIT_REACHED; - } - - /** - * @return true when the scanner had to stop scanning because it reached the size limit - */ - public boolean sizeLimitReached() { - return this.state == State.SIZE_LIMIT_REACHED; - } - - /** - * @return The size (heap size) of the values that were returned from the call to - * {@link InternalScanner#next(List)}. This value should only be used if - * {@link #hasResultSizeEstimate()} returns true. - */ - public long getResultSize() { - return resultSize; + return this.moreValues; } /** - * @return true when an estimate for the size of the values returned by - * {@link InternalScanner#next(List)} was provided. 
If false, it is the responsibility - * of the caller to calculate the result size + * @return true when the state indicates that a limit has been reached and scan should stop */ - public boolean hasResultSizeEstimate() { - return resultSize >= 0; + public boolean limitReached() { + return this.limitReached; } - - @Override - public String toString() { - return "State: " + state + " resultSize: " + resultSize; - } - - /** - * Helper method to centralize all checks as to whether or not the state is valid. - * @param state - * @return true when the state is valid - */ + public static boolean isValidState(NextState state) { return state != null; } - /** - * @param state - * @return true when the state is non null and indicates that more values exist - */ public static boolean hasMoreValues(NextState state) { - return state != null && state.hasMoreValues(); + return isValidState(state) && state.hasMoreValues(); } } @@ -234,26 +98,14 @@ public interface InternalScanner extends Closeable { NextState next(List results) throws IOException; /** - * Grab the next row's worth of values with a limit on the number of values to return. - * @param result return output array - * @param limit limit on row count to get - * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this - * one, false if scanner is done - * @throws IOException e - */ - NextState next(List result, int limit) throws IOException; - - /** - * Grab the next row's worth of values with a limit on the number of values to return as well as a - * restriction on the size of the list of values that are returned. + * Grab the next row's worth of values. * @param result return output array - * @param limit limit on row count to get - * @param remainingResultSize limit on the size of the result being returned + * @param scannerContext context encapsulating the limits to enforce on this call and tracking + * the progress made towards them * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this * one, false if scanner is done * @throws IOException e */ - NextState next(List result, int limit, long remainingResultSize) throws IOException; + NextState next(List result, ScannerContext scannerContext) throws IOException; /** * Closes the scanner and releases any resources it has allocated diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index beb23cf..ef69bfe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -128,20 +128,21 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner * This can ONLY be called when you are using Scanners that implement InternalScanner as well as * KeyValueScanner (a {@link StoreScanner}). 
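For reference, the five enum constants above collapse the old state objects into two booleans; their truth table, written as assertions that mirror the constructor arguments:

    assert NextState.MORE_VALUES.hasMoreValues() && !NextState.MORE_VALUES.limitReached();
    assert !NextState.NO_MORE_VALUES.hasMoreValues() && !NextState.NO_MORE_VALUES.limitReached();
    assert NextState.SIZE_LIMIT_REACHED.hasMoreValues() && NextState.SIZE_LIMIT_REACHED.limitReached();
    // the MID_ROW variant additionally signals that the accumulated cells form a partial Result
    assert NextState.SIZE_LIMIT_REACHED_MID_ROW.hasMoreValues()
        && NextState.SIZE_LIMIT_REACHED_MID_ROW.limitReached();
    assert NextState.BATCH_LIMIT_REACHED.hasMoreValues() && NextState.BATCH_LIMIT_REACHED.limitReached();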
* @param result - * @param limit * @return state where NextState#hasMoreValues() is true if more keys exist after this * one, false if scanner is done */ - public NextState next(List result, int limit) throws IOException { - return next(result, limit, -1); + @Override + public NextState next(List result) throws IOException { + return next(result, ScannerContext.NO_LIMIT); } - public NextState next(List result, int limit, long remainingResultSize) throws IOException { + @Override + public NextState next(List result, ScannerContext scannerContext) throws IOException { if (this.current == null) { - return NextState.makeState(NextState.State.NO_MORE_VALUES); + return NextState.NO_MORE_VALUES; } InternalScanner currentAsInternal = (InternalScanner)this.current; - NextState state = currentAsInternal.next(result, limit, remainingResultSize); + NextState state = currentAsInternal.next(result, scannerContext); // Invalid states should never be returned. Receiving an invalid state means that we have // no clue how to proceed. Throw an exception. if (!NextState.isValidState(state)) { @@ -163,26 +164,11 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner } this.current = pollRealKV(); if (this.current == null) { - state = NextState.makeState(NextState.State.NO_MORE_VALUES, state.getResultSize()); + state = NextState.NO_MORE_VALUES; } return state; } - /** - * Gets the next row of keys from the top-most scanner. - *

- * This method takes care of updating the heap. - *

- * This can ONLY be called when you are using Scanners that implement InternalScanner as well as - * KeyValueScanner (a {@link StoreScanner}). - * @param result - * @return state where NextState#hasMoreValues() is true if more keys exist after this - * one, false if scanner is done - */ - public NextState next(List result) throws IOException { - return next(result, -1); - } - protected static class KVScannerComparator implements Comparator { protected KVComparator kvComparator; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 4316a85..9fdbacb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -152,6 +152,7 @@ import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; +import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -2158,47 +2159,45 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // correct ordering of partial results and so we prevent partial results from being // formed. boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0; - boolean enforceMaxResultSizeAtCellLevel = + boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan; + final LimitScope sizeScope = + allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; + + // Configure with limits for this RPC + ScannerContext scannerContext = new ScannerContext(); + scannerContext.setSizeLimit(sizeScope, maxResultSize); + scannerContext.setBatchLimit(scanner.getBatch()); + // Keep progress so that progress towards limits is not wiped away between calls to + // RegionScanner#nextRaw below. + scannerContext.setKeepProgress(true); while (i < rows) { // Stop collecting results if we have exceeded maxResultSize - if (currentScanResultSize >= maxResultSize) { + if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS)) { break; } - // A negative remainingResultSize communicates that there is no limit on the size - // of the results. - final long remainingResultSize = - enforceMaxResultSizeAtCellLevel ? maxResultSize - currentScanResultSize - : -1; + // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The + // batch limit is a limit on the number of cells per Result. Thus, if progress is + // being tracked (i.e. scannerContext.keepProgress() is true) then we need to + // reset the batch progress between nextRaw invocations since we don't want the + // batch progress from previous calls to affect future calls + scannerContext.setBatchProgress(0); // Collect values to be returned here - NextState state = - scanner.nextRaw(values, scanner.getBatch(), remainingResultSize); - // Invalid states should never be returned. 
If one is seen, throw exception - // to stop the scan -- We have no way of telling how we should proceed + NextState state = scanner.nextRaw(values, scannerContext); if (!NextState.isValidState(state)) { throw new IOException("NextState returned from call to nextRaw was invalid"); } - if (!values.isEmpty()) { - // The state should always contain an estimate of the result size because that - // estimate must be used to decide when partial results are formed. - boolean skipResultSizeCalculation = state.hasResultSizeEstimate(); - if (skipResultSizeCalculation) currentScanResultSize += state.getResultSize(); + if (!values.isEmpty()) { for (Cell cell : values) { totalCellSize += CellUtil.estimatedSerializedSizeOf(cell); - - // If the calculation can't be skipped, then do it now. - if (!skipResultSizeCalculation) { - currentScanResultSize += CellUtil.estimatedHeapSizeOfWithoutTags(cell); - } } - // The size limit was reached. This means there are more cells remaining in - // the row but we had to stop because we exceeded our max result size. This - // indicates that we are returning a partial result - final boolean partial = state != null && state.sizeLimitReached(); + // The size limit was reached in the middle of a row. This indicates that we are + // returning a partial result + final boolean partial = state == NextState.SIZE_LIMIT_REACHED_MID_ROW; results.add(Result.create(values, null, stale, partial)); i++; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java index 26f9aef..b30d167 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; * RegionScanner describes iterators over rows in an HRegion. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) -@InterfaceStability.Stable +@InterfaceStability.Evolving public interface RegionScanner extends InternalScanner { /** * @return The RegionInfo for this scanner. @@ -74,35 +74,23 @@ public interface RegionScanner extends InternalScanner { int getBatch(); /** - * Grab the next row's worth of values with the default limit on the number of values to return. - * This is a special internal method to be called from coprocessor hooks to avoid expensive setup. - * Caller must set the thread's readpoint, start and close a region operation, an synchronize on - * the scanner object. Caller should maintain and update metrics. See - * {@link #nextRaw(List, int, long)} + * Grab the next row's worth of values. This is a special internal method to be called from + * coprocessor hooks to avoid expensive setup. Caller must set the thread's readpoint, start and + * close a region operation, and synchronize on the scanner object. Caller should maintain and + * update metrics. See {@link #nextRaw(List, ScannerContext)} * @param result return output array * @return a state where NextState#hasMoreValues() is true when more rows exist, false when * scanner is done. * @throws IOException e */ NextState nextRaw(List result) throws IOException; - - /** - * Grab the next row's worth of values with the default limit on the number of values to return. - * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
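A worked trace of the RPC accounting above, under illustrative settings (batch = 10 cells per Result, maxResultSize = 100 KB at BETWEEN_CELLS scope, one 25-cell row at roughly 6 KB of heap per cell):

    // call 1: batch progress reset to 0; after 10 cells -> BATCH_LIMIT_REACHED;
    //   size progress (~60 KB) is kept; Result #1 holds 10 cells, partial = false
    // call 2: batch progress reset again; after 7 more cells size progress is
    //   ~102 KB >= 100 KB mid-row -> SIZE_LIMIT_REACHED_MID_ROW; Result #2 is partial
    // loop top: checkSizeLimit(LimitScope.BETWEEN_ROWS) now returns true, ending the
    //   RPC; the row's remaining 8 cells arrive on the client's next scan RPC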
- Caller must set the thread's readpoint, start and close a region operation, an synchronize on - * the scanner object. Caller should maintain and - * update metrics. See - * {@link #nextRaw(List, int, long)} - * @param result return output array - * @param limit limit on row count to get - * @return a state where NextState#hasMoreValues() is true when more rows exist, false when - * scanner is done. - * @throws IOException e - */ - NextState nextRaw(List result, int limit) throws IOException; - + /** - * Grab the next row's worth of values with a limit on the number of values to return as well as a - * limit on the heap size of those values. This is a special internal method to be called from + * Grab the next row's worth of values. The {@link ScannerContext} is used to enforce and track + * any limits associated with this call. Any progress that exists in the {@link ScannerContext} + * prior to calling this method will be LOST if {@link ScannerContext#keepProgress()} is false. + * Upon returning from this method, the {@link ScannerContext} will contain information about the + * progress made towards the limits. This is a special internal method to be called from * coprocessor hooks to avoid expensive setup. Caller must set the thread's readpoint, start and * close a region operation, and synchronize on the scanner object. Example:

    * HRegion region = ...;
@@ -120,13 +108,13 @@ public interface RegionScanner extends InternalScanner {
    * }
    * 
* @param result return output array - * @param limit limit on row count to get - * @param remainingResultSize the space remaining within the restriction on the result size. - * Negative values indicate no limit + * @param scannerContext The {@link ScannerContext} instance encapsulating all limits that should + * be tracked during calls to this method. The progress towards these limits can be + * tracked within this instance. * @return a state where NextState#hasMoreValues() is true when more rows exist, false when * scanner is done. * @throws IOException e */ - NextState nextRaw(List result, int limit, final long remainingResultSize) + NextState nextRaw(List result, ScannerContext scannerContext) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java new file mode 100644 index 0000000..38f70b2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java @@ -0,0 +1,387 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * ScannerContext encapsulates limit tracking AND progress towards those limits during invocations + * of {@link InternalScanner#next(java.util.List)} and {@link RegionScanner#next(java.util.List)}. + */ +@InterfaceAudience.Private +public class ScannerContext { + private final Log LOG = LogFactory.getLog(this.getClass()); + + /** + * Two sets of the same fields. One for the limits, another for the progress towards those limits + */ + LimitFields limits; + LimitFields progress; + + /** + * When true, any attempt to mutate this object will fail silently. This allows us to create a + * static object that can be used when limits should not be enforced. + */ + boolean readonly = false; + + /** + * Used as an indication to invocations of {@link InternalScanner#next(java.util.List)} and + * {@link RegionScanner#next(java.util.List)} that, if true, the progress tracked within this + * {@link ScannerContext} instance should be considered while evaluating the limits. Useful for + * enforcing a set of limits across multiple calls (i.e. the limit may not be reached in a single + * invocation, but any progress made should be considered in future invocations) + *

+ * Defaulting this value to false means that, by default, any tracked progress will be wiped clean + * on invocations to {@link InternalScanner#next(java.util.List)} and + * {@link RegionScanner#next(java.util.List)} and the call will be treated as though no progress + * has been made towards the limits so far. + */ + boolean keepProgress = false; + + /** + * Use this instance whenever limits do not need to be enforced. The instance is readonly, thus + * any attempt to mutate this object will fail silently. All limit checks will return false to + * indicate that no limit has been reached. + */ + public static ScannerContext NO_LIMIT = new ScannerContext(true); + + public ScannerContext() { + this(false); + } + + ScannerContext(boolean readonly) { + // If readonly, all attempted mutations will fail silently and all limits checks will return + // false (i.e. indicate no limit has been reached). + this.readonly = readonly; + + // Limits are left with the default values. Limits are only enforced once set. + limits = new LimitFields(); + + // Progress fields are initialized to 0 + progress = new LimitFields(0, 0); + } + + /** + * @return true if the progress tracked so far in this instance will be considered during an + * invocation of {@link InternalScanner#next(java.util.List)} or + * {@link RegionScanner#next(java.util.List)}. false when the progress tracked so far + * should not be considered and should instead be wiped away via {@link #clearProgress()} + */ + boolean keepProgress() { + return keepProgress; + } + + void setKeepProgress(boolean keepProgress) { + if (!readonly) { + this.keepProgress = keepProgress; + } + } + + /** + * Progress towards the batch limit has been made. Increment internal tracking of batch progress + */ + void incrementBatchProgress(int batch) { + if (!readonly) { + int currentBatch = progress.getBatch(); + progress.setBatch(currentBatch + batch); + } + } + + /** + * Progress towards the size limit has been made. Increment internal tracking of size progress + */ + void incrementSizeProgress(long size) { + if (!readonly) { + long currentSize = progress.getSize(); + progress.setSize(currentSize + size); + } + } + + int getBatchProgress() { + return progress.getBatch(); + } + + long getSizeProgress() { + return progress.getSize(); + } + + void setProgress(int batchProgress, long sizeProgress) { + if (!readonly) { + setBatchProgress(batchProgress); + setSizeProgress(sizeProgress); + } + } + + void setSizeProgress(long sizeProgress) { + if (!readonly) { + progress.setSize(sizeProgress); + } + } + + void setBatchProgress(int batchProgress) { + if (!readonly) { + progress.setBatch(batchProgress); + } + } + + /** + * Clear away any progress that has been made so far. 
All progress fields are reset to initial + * values + */ + void clearProgress() { + if (!readonly) { + progress.setFields(0, 0); + } + } + + /** + * @param checkerScope + * @return true if the batch limit can be enforced in the checker's scope + */ + boolean hasBatchLimit(LimitScope checkerScope) { + return limits.canEnforceBatchLimitFromScope(checkerScope) && limits.getBatch() > 0; + } + + /** + * @param checkerScope + * @return true if the size limit can be enforced in the checker's scope + */ + boolean hasSizeLimit(LimitScope checkerScope) { + return limits.canEnforceSizeLimitFromScope(checkerScope) && limits.getSize() > 0; + } + + /** + * @param checkerScope + * @return true if any limit can be enforced within the checker's scope + */ + boolean hasAnyLimit(LimitScope checkerScope) { + return hasBatchLimit(checkerScope) || hasSizeLimit(checkerScope); + } + + public void setBatchLimit(int batchLimit) { + if (!readonly) { + // The setter for batch limit does not consider the LimitScope since batch will always be + // enforced in between cells (there is no way to enforce a batch limit from any other scope) + limits.setBatch(batchLimit); + } + } + + /** + * @param scope The scope in which the size limit will be enforced + * @param sizeLimit The size limit to be enforced + */ + public void setSizeLimit(LimitScope scope, long sizeLimit) { + if (!readonly) { + setSizeLimitScope(scope); + limits.setSize(sizeLimit); + } + } + + /** + * @param scope The scope in which the size limit will be enforced + */ + void setSizeLimitScope(LimitScope scope) { + if (!readonly) { + limits.setSizeScope(scope); + } + } + + /** + * @param checkerScope The scope that the limit is being checked from + * @return true when the limit is enforceable from the checker's scope and it has been reached + */ + boolean checkBatchLimit(LimitScope checkerScope) { + return hasBatchLimit(checkerScope) && progress.getBatch() >= limits.getBatch(); + } + + /** + * @param checkerScope The scope that the limit is being checked from + * @return true when the limit is enforceable from the checker's scope and it has been reached + */ + boolean checkSizeLimit(LimitScope checkerScope) { + return hasSizeLimit(checkerScope) && progress.getSize() >= limits.getSize(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{"); + + sb.append("limits:"); + sb.append(limits); + + sb.append(", progress:"); + sb.append(progress); + + sb.append(", readonly:"); + sb.append(readonly); + + sb.append(", keepProgress:"); + sb.append(keepProgress); + + sb.append("}"); + return sb.toString(); + } + + /** + * The various scopes where a limit can be enforced. Used to differentiate when a limit should be + * enforced or not. + */ + public enum LimitScope { + /** + * Enforcing a limit between rows means that the limit will not be considered until all the + * cells for a particular row have been retrieved + */ + BETWEEN_ROWS(0), + + /** + * Enforcing a limit between cells means that the limit will be considered after each full cell + * has been retrieved + */ + BETWEEN_CELLS(1); + + /** + * When enforcing a limit, we must check that the scope is appropriate for enforcement. + *

+ * To communicate this concept, each scope has a depth. A limit will be enforced if the depth of + * the checker's scope is less than or equal to the limit's scope. This means that when checking + * limits, the checker must know their own scope (i.e. are they checking the limits between + * rows, between cells, etc...) + */ + int depth; + LimitScope(int depth) { + this.depth = depth; + } + + int depth() { + return depth; + } + + /** + * @param checkerScope The scope in which the limit is being checked + * @return true when the checker is in a scope that indicates the limit can be enforced. Limits + * can be enforced from "higher or equal" scopes (i.e. the checker's scope is at a + * lesser depth than the limit) + */ + boolean canEnforceLimitFromScope(LimitScope checkerScope) { + return checkerScope != null && checkerScope.depth() <= depth; + } + } + + /** + * The different fields that can be used as limits in calls to + * {@link InternalScanner#next(java.util.List)} and {@link RegionScanner#next(java.util.List)} + */ + private static class LimitFields { + /** + * Default values of the limit fields. Defined such that if a field does NOT change from its + * default, it will not be enforced + */ + private static int DEFAULT_BATCH = -1; + private static long DEFAULT_SIZE = -1L; + + /** + * The various limits that may be enforced and their accompanying scopes + */ + int batch = DEFAULT_BATCH; + // The batch scope cannot be changed. It is always between cells thus no setter for this field + LimitScope batchScope = LimitScope.BETWEEN_CELLS; + + long size = DEFAULT_SIZE; + LimitScope sizeScope = LimitScope.BETWEEN_ROWS; + + LimitFields() { + } + + LimitFields(int batch, long size) { + setFields(batch, size); + } + + /** + * Convenience setter for all fields of this object + * @param batch + * @param size + */ + void setFields(int batch, long size) { + setBatch(batch); + setSize(size); + } + + int getBatch() { + return this.batch; + } + + void setBatch(int batch) { + this.batch = batch; + } + + /** + * @param checkerScope + * @return true when the limit can be enforced from the scope of the checker + */ + boolean canEnforceBatchLimitFromScope(LimitScope checkerScope) { + return this.batchScope.canEnforceLimitFromScope(checkerScope); + } + + long getSize() { + return this.size; + } + + void setSize(long size) { + this.size = size; + } + + /** + * Change the scope in which the size limit is enforced + */ + void setSizeScope(LimitScope scope) { + this.sizeScope = scope; + } + + /** + * @param checkerScope + * @return true when the limit can be enforced from the scope of the checker + */ + boolean canEnforceSizeLimitFromScope(LimitScope checkerScope) { + return this.sizeScope.canEnforceLimitFromScope(checkerScope); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{"); + + sb.append("batch:"); + sb.append(batch); + + sb.append(", batchScope:"); + sb.append(batchScope); + + sb.append(", size:"); + sb.append(size); + + sb.append(", sizeScope:"); + sb.append(sizeScope); + + sb.append("}"); + return sb.toString(); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index 831673d..1267c1e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -110,10 +110,14 @@ abstract class StoreFlusher { 
Compactor.CellSink sink, long smallestReadPoint) throws IOException { int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); + + ScannerContext scannerContext = new ScannerContext(); + scannerContext.setBatchLimit(compactionKVMax); + List kvs = new ArrayList(); boolean hasMore; do { - hasMore = NextState.hasMoreValues(scanner.next(kvs, compactionKVMax)); + hasMore = NextState.hasMoreValues(scanner.next(kvs, scannerContext)); if (!kvs.isEmpty()) { for (Cell c : kvs) { // If we know that this KV is going to be included always, then let us diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 7ce4e0b..6fbe92b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; +import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -442,16 +443,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } } - /** - * Get the next row of values from this Store. - * @param outResult - * @param limit - * @return true if there are more rows, false if scanner is done - */ @Override - public NextState next(List outResult, int limit) throws IOException { - // -1 means no limit - return next(outResult, limit, -1); + public NextState next(List outResult) throws IOException { + return next(outResult, ScannerContext.NO_LIMIT); } /** @@ -462,25 +456,24 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner * @return true if there are more rows, false if scanner is done */ @Override - public NextState next(List outResult, int limit, long remainingResultSize) - throws IOException { + public NextState next(List outResult, ScannerContext scannerContext) throws IOException { lock.lock(); try { if (checkReseek()) { - return NextState.makeState(NextState.State.MORE_VALUES, 0); + return NextState.MORE_VALUES; } // if the heap was left null, then the scanners had previously run out anyways, close and // return. if (this.heap == null) { close(); - return NextState.makeState(NextState.State.NO_MORE_VALUES, 0); + return NextState.NO_MORE_VALUES; } Cell peeked = this.heap.peek(); if (peeked == null) { close(); - return NextState.makeState(NextState.State.NO_MORE_VALUES, 0); + return NextState.NO_MORE_VALUES; } // only call setRow if the row changes; avoids confusing the query matcher @@ -489,16 +482,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner int offset = peeked.getRowOffset(); short length = peeked.getRowLength(); - // If limit < 0 and remainingResultSize < 0 we can skip the row comparison because we know - // the row has changed. Else it is possible we are still traversing the same row so we - // must perform the row comparison. 
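For comparison with the flush and compaction call sites above, both now funnel their only limit (cells per call) through the same ScannerContext. A minimal sketch, assuming any InternalScanner named scanner and a compactionKVMax value read from hbase.hstore.compaction.kv.max:

    ScannerContext ctx = new ScannerContext();
    ctx.setBatchLimit(compactionKVMax); // cap on cells returned per next() call
    List<Cell> cells = new ArrayList<Cell>();
    boolean hasMore;
    do {
      hasMore = NextState.hasMoreValues(scanner.next(cells, ctx));
      // hand the cells to the sink/writer, then reuse the list
      cells.clear();
    } while (hasMore);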
- if ((limit < 0 && remainingResultSize < 0) || matcher.row == null
- || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset,
- matcher.rowLength)) {
- this.countPerRow = 0;
- matcher.setRow(row, offset, length);
+ // If no limit exists in the scope LimitScope.BETWEEN_CELLS then we are sure we are changing
+ // rows. Otherwise it is possible we are still traversing the same row so we must perform the
+ // row comparison.
+ if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.row == null ||
+ !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
+ this.countPerRow = 0;
+ matcher.setRow(row, offset, length);
}
+ // Clear progress unless the invoker has indicated it should be kept.
+ if (!scannerContext.keepProgress()) scannerContext.clearProgress();
+
Cell cell;

// Only do a sanity-check if store and comparator are available.
@@ -507,7 +502,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
int count = 0;
long totalBytesRead = 0;
- long totalHeapSize = 0;

LOOP: while((cell = this.heap.peek()) != null) {
if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
@@ -532,7 +526,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.countPerRow > (storeLimit + storeOffset)) {
// do what SEEK_NEXT_ROW does.
if (!matcher.moreRowsMayExistAfter(cell)) {
- return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
+ return NextState.NO_MORE_VALUES;
}
seekToNextRow(cell);
break LOOP;
@@ -542,9 +536,15 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// also update metric accordingly
if (this.countPerRow > storeOffset) {
outResult.add(cell);
+
+ // Update local tracking information
count++;
totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
- totalHeapSize += CellUtil.estimatedHeapSizeOf(cell);
+
+ // Update the progress of the scanner context
+ scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
+ scannerContext.incrementBatchProgress(1);
+
if (totalBytesRead > maxRowSize) {
throw new RowTooBigException("Max row size allowed: " + maxRowSize
+ ", but the row is bigger than that.");
@@ -553,7 +553,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
if (!matcher.moreRowsMayExistAfter(cell)) {
- return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
+ return NextState.NO_MORE_VALUES;
}
seekToNextRow(cell);
} else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
@@ -562,26 +562,26 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.heap.next();
}
- if (limit > 0 && (count == limit)) {
+ if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
break LOOP;
}
- if (remainingResultSize > 0 && (totalHeapSize >= remainingResultSize)) {
+ if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
break LOOP;
}
continue;
case DONE:
- return NextState.makeState(NextState.State.MORE_VALUES, totalHeapSize);
+ return NextState.MORE_VALUES;
case DONE_SCAN:
close();
- return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize);
+ return NextState.NO_MORE_VALUES;
case SEEK_NEXT_ROW:
// This is just a relatively simple end of scan fix, to short-cut end
// us if there is an endKey in the scan.
if (!matcher.moreRowsMayExistAfter(cell)) { - return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize); + return NextState.NO_MORE_VALUES; } seekToNextRow(cell); @@ -611,12 +611,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } if (count > 0) { - return NextState.makeState(NextState.State.MORE_VALUES, totalHeapSize); + return NextState.MORE_VALUES; } // No more keys close(); - return NextState.makeState(NextState.State.NO_MORE_VALUES, totalHeapSize); + return NextState.NO_MORE_VALUES; } finally { lock.unlock(); } @@ -655,11 +655,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return qcode; } - @Override - public NextState next(List outResult) throws IOException { - return next(outResult, -1); - } - // Implementation of ChangedReadersObserver @Override public void updateReaders() throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 3c3ea6b..f045a08 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; @@ -246,10 +247,13 @@ public abstract class Compactor { store.getRegionInfo().getRegionNameAsString() + "#" + store.getFamily().getNameAsString(); long now = 0; boolean hasMore; + ScannerContext scannerContext = new ScannerContext(); + scannerContext.setBatchLimit(compactionKVMax); + throughputController.start(compactionName); try { do { - hasMore = NextState.hasMoreValues(scanner.next(cells, compactionKVMax)); + hasMore = NextState.hasMoreValues(scanner.next(cells, scannerContext)); if (LOG.isDebugEnabled()) { now = EnvironmentEdgeManager.currentTime(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index 140534d..54c611b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; @@ -811,10 +812,13 @@ public class AccessController extends BaseMasterAndRegionObserver boolean foundColumn = false; try { boolean more = false; + ScannerContext scannerContext = new ScannerContext(); + scannerContext.setBatchLimit(1); + do { cells.clear(); // scan with limit as 1 to 
hold down memory use on wide rows - more = NextState.hasMoreValues(scanner.next(cells, 1)); + more = NextState.hasMoreValues(scanner.next(cells, scannerContext)); for (Cell cell: cells) { if (LOG.isTraceEnabled()) { LOG.trace("Found cell " + cell); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java index e7c3813..eef955e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java @@ -409,6 +409,7 @@ public class TestPartialResultsFromClientSide { scan.setBatch(batch); ResultScanner scanner = TABLE.getScanner(scan); Result result = scanner.next(); + int repCount = 0; while ((result = scanner.next()) != null) { assertTrue(result.rawCells() != null); @@ -416,11 +417,12 @@ public class TestPartialResultsFromClientSide { if (result.isPartial()) { final String error = "Cells:" + result.rawCells().length + " Batch size:" + batch - + " cellsPerPartialResult:" + cellsPerPartialResult; + + " cellsPerPartialResult:" + cellsPerPartialResult + " rep:" + repCount; assertTrue(error, result.rawCells().length <= Math.min(batch, cellsPerPartialResult)); } else { assertTrue(result.rawCells().length <= batch); } + repCount++; } scanner.close(); @@ -458,7 +460,7 @@ public class TestPartialResultsFromClientSide { do { partialResult = partialScanner.next(); partials.add(partialResult); - } while (partialResult.isPartial()); + } while (partialResult != null && partialResult.isPartial()); completeResult = Result.createCompleteResult(partials); oneShotResult = oneShotScanner.next(); @@ -696,7 +698,7 @@ public class TestPartialResultsFromClientSide { LOG.info("r2: " + r2); } - final String failureMessage = "Results r1:" + r1 + " r2:" + r2 + " are not equivalent"; + final String failureMessage = "Results r1:" + r1 + " \nr2:" + r2 + " are not equivalent"; if (r1 == null && r2 == null) fail(failureMessage); else if (r1 == null || r2 == null) fail(failureMessage); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java index 75fe93d..043dee3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.SplitTransaction; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; @@ -90,14 +91,9 @@ public class TestCoprocessorInterface { } @Override - public NextState next(List result, int limit) throws IOException { - return delegate.next(result, limit); - } - - @Override - public NextState next(List result, int limit, long remainingResultSize) + public NextState next(List result, ScannerContext scannerContext) throws IOException { - return delegate.next(result, limit, remainingResultSize); + return delegate.next(result, scannerContext); } @Override @@ -107,14 
+103,9 @@ public class TestCoprocessorInterface { } @Override - public NextState nextRaw(List result, int limit) throws IOException { - return delegate.nextRaw(result, limit); - } - - @Override - public NextState nextRaw(List result, int limit, long remainingResultSize) + public NextState nextRaw(List result, ScannerContext context) throws IOException { - return delegate.nextRaw(result, limit, remainingResultSize); + return delegate.nextRaw(result, context); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index a4963ae..75a60dc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.testclassification.CoprocessorTests; @@ -434,22 +435,17 @@ public class TestRegionObserverInterface { return new InternalScanner() { @Override public NextState next(List results) throws IOException { - return next(results, -1); + return next(results, ScannerContext.NO_LIMIT); } @Override - public NextState next(List results, int limit) throws IOException { - return next(results, limit, -1); - } - - @Override - public NextState next(List results, int limit, long remainingResultSize) + public NextState next(List results, ScannerContext scannerContext) throws IOException { List internalResults = new ArrayList(); boolean hasMore; NextState state; do { - state = scanner.next(internalResults, limit, remainingResultSize); + state = scanner.next(internalResults, scannerContext); hasMore = state != null && state.hasMoreValues(); if (!internalResults.isEmpty()) { long row = Bytes.toLong(CellUtil.cloneValue(internalResults.get(0))); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index 9a2c23b..96b1dba 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -60,11 +60,11 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.HeapSize; -import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -561,7 +561,9 @@ public class TestAtomicOperation { Scan s = new Scan(); RegionScanner scanner = region.getScanner(s); List results = new ArrayList(); - scanner.next(results, 2); + ScannerContext 
scannerContext = new ScannerContext(); + scannerContext.setBatchLimit(2); + scanner.next(results, scannerContext); for (Cell keyValue : results) { assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue))); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index ca5135d..c11e985 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -131,8 +131,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.HRegion.RowLock; import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem; -import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; @@ -3317,8 +3317,10 @@ public class TestHRegion { List results = new ArrayList(); int index = 0; + ScannerContext scannerContext = new ScannerContext(); + scannerContext.setBatchLimit(3); while (true) { - boolean more = NextState.hasMoreValues(s.next(results, 3)); + boolean more = NextState.hasMoreValues(s.next(results, scannerContext)); if ((index >> 1) < 5) { if (index % 2 == 0) assertEquals(results.size(), 3); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java index 06bbd54..8c3efa5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java @@ -238,21 +238,17 @@ public class TestStripeCompactor { @Override public NextState next(List results) throws IOException { - if (kvs.isEmpty()) return NextState.makeState(NextState.State.NO_MORE_VALUES); + if (kvs.isEmpty()) return NextState.NO_MORE_VALUES; results.add(kvs.remove(0)); if (!kvs.isEmpty()) { - return NextState.makeState(NextState.State.MORE_VALUES); + return NextState.MORE_VALUES; } else { - return NextState.makeState(NextState.State.NO_MORE_VALUES); + return NextState.NO_MORE_VALUES; } } - @Override - public NextState next(List result, int limit) throws IOException { - return next(result); - } @Override - public NextState next(List result, int limit, long remainingResultSize) + public NextState next(List result, ScannerContext scannerContext) throws IOException { return next(result); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index 3294f6d..e2c54d9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.regionserver.BloomType; import 
org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; import org.apache.hadoop.hbase.regionserver.StoreFile; @@ -775,23 +776,18 @@ public class TestStripeCompactionPolicy { @Override public NextState next(List results) throws IOException { - if (kvs.isEmpty()) return NextState.makeState(NextState.State.NO_MORE_VALUES); + if (kvs.isEmpty()) return NextState.NO_MORE_VALUES; results.add(kvs.remove(0)); if (!kvs.isEmpty()) { - return NextState.makeState(NextState.State.MORE_VALUES); + return NextState.MORE_VALUES; } else { - return NextState.makeState(NextState.State.NO_MORE_VALUES); + return NextState.NO_MORE_VALUES; } } @Override - public NextState next(List result, int limit) throws IOException { - return next(result); - } - - @Override - public NextState next(List result, int limit, long remainingResultSize) + public NextState next(List result, ScannerContext scannerContext) throws IOException { return next(result); } -- 1.9.3 (Apple Git-50)
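
A caller-side sketch of the ScannerContext API introduced above, mirroring the
call sites updated in Compactor, StoreFlusher and AccessController. The class,
the drain() helper and the 100-cell limit are hypothetical stand-ins; only the
ScannerContext and NextState calls are taken from this patch:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
import org.apache.hadoop.hbase.regionserver.ScannerContext;

public class ScannerContextUsageSketch {
  /**
   * Drains a scanner without materializing more than 100 cells per call.
   * next() now returns once the batch limit is reached, so a very wide row no
   * longer forces the caller to buffer the whole row in memory at once.
   */
  public static void drain(InternalScanner scanner) throws IOException {
    ScannerContext scannerContext = new ScannerContext();
    scannerContext.setBatchLimit(100); // hypothetical per-call cell limit

    List<Cell> cells = new ArrayList<Cell>();
    boolean hasMore;
    do {
      hasMore = NextState.hasMoreValues(scanner.next(cells, scannerContext));
      for (Cell cell : cells) {
        // process the cell ...
      }
      cells.clear();
    } while (hasMore);
  }
}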
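
The producer side of the same contract, condensed from the StoreScanner changes
above: clear stale progress, report progress as cells are accumulated, and stop
as soon as a limit that is enforceable between cells is reached. The class and
the Queue-backed cell source are hypothetical, and the sketch is placed in the
org.apache.hadoop.hbase.regionserver package on the assumption that the
progress and check methods are package-private:

package org.apache.hadoop.hbase.regionserver;

import java.util.List;
import java.util.Queue;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;

public class ScannerContextProducerSketch {
  static NextState next(Queue<Cell> cellSource, List<Cell> outResult,
      ScannerContext scannerContext) {
    // Progress is reset on each call unless the invoker asked to keep it,
    // as StoreScanner.next() does above.
    if (!scannerContext.keepProgress()) scannerContext.clearProgress();

    Cell cell;
    while ((cell = cellSource.poll()) != null) {
      outResult.add(cell);

      // Report progress so that mid-row (BETWEEN_CELLS) checks can see it.
      scannerContext.incrementBatchProgress(1);
      scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));

      // Return early once any limit enforceable between cells is reached;
      // a size limit scoped to BETWEEN_ROWS would not trip these checks.
      if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)
          || scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
        return NextState.MORE_VALUES;
      }
    }
    return outResult.isEmpty() ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
  }
}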