From 0b35410aadd5ed2e218c3b90cab67e69a46b5768 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Wed, 31 Jan 2018 11:02:59 +0800 Subject: [PATCH] HBASE-19855 Refactor RegionScannerImpl.nextInternal method --- .../apache/hadoop/hbase/regionserver/HRegion.java | 177 ++++++++++----------- 1 file changed, 83 insertions(+), 94 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index ecc9a74..b32ffff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -6392,6 +6392,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Used to check time limit LimitScope limitScope = LimitScope.BETWEEN_CELLS; + // When has filter row is true it means that all the cells for a particular row must be + // read before a filtering decision can be made. This means that filters where hasFilterRow + // run the risk of encountering out of memory errors in the case that they are applied to a + // table that has very large rows. + boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow(); + + // If filter#hasFilterRow is true, partial results are not allowed since allowing them + // would prevent the filters from being evaluated. Thus, if it is true, change the + // scope of any limits that could potentially create partial results to + // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row + if (hasFilterRow) { + if (LOG.isTraceEnabled()) { + LOG.trace("filter#hasFilterRow is true which prevents partial results from being " + + " formed. 
Changing scope of limits that may create partials"); + } + scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS); + scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS); + limitScope = LimitScope.BETWEEN_ROWS; + } + // The loop here is used only when at some point during the next we determine // that due to effects of filters or otherwise, we have an empty row in the result. // Then we loop and try again. Otherwise, we must get out on the first iteration via return, @@ -6407,52 +6427,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } else { scannerContext.clearProgress(); } - if (rpcCall.isPresent()) { - // If a user specifies a too-restrictive or too-slow scanner, the - // client might time out and disconnect while the server side - // is still processing the request. We should abort aggressively - // in that case. - long afterTime = rpcCall.get().disconnectSince(); - if (afterTime >= 0) { - throw new CallerDisconnectedException( - "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + - this + " after " + afterTime + " ms, since " + - "caller disconnected"); - } + + throwExceptionIfCallerDisconnected(rpcCall); + + // Read nothing as the rowkey was filtered, but still need to check time limit + if (scannerContext.checkTimeLimit(limitScope)) { + return true; } // Let's see what we have in the storeHeap. Cell current = this.storeHeap.peek(); - boolean shouldStop = shouldStop(current); - // When has filter row is true it means that the all the cells for a particular row must be - // read before a filtering decision can be made. This means that filters where hasFilterRow - // run the risk of enLongAddering out of memory errors in the case that they are applied to a - // table that has very large rows. 
- boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow(); - - // If filter#hasFilterRow is true, partial results are not allowed since allowing them - // would prevent the filters from being evaluated. Thus, if it is true, change the - // scope of any limits that could potentially create partial results to - // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row - if (hasFilterRow) { - if (LOG.isTraceEnabled()) { - LOG.trace("filter#hasFilterRow is true which prevents partial results from being " - + " formed. Changing scope of limits that may create partials"); - } - scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS); - scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS); - limitScope = LimitScope.BETWEEN_ROWS; - } - - if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { - if (hasFilterRow) { - throw new IncompatibleFilterException( - "Filter whose hasFilterRow() returns true is incompatible with scans that must " + - " stop mid-row because of a limit. ScannerContext:" + scannerContext); - } - return true; - } // Check if we were getting data from the joinedHeap and hit the limit. // If not, then it's main path - getting results from storeHeap. @@ -6468,7 +6453,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Check if rowkey filter wants to exclude this row. If so, loop to next. // Technically, if we hit limits before on this row, we don't need this call. if (filterRowKey(current)) { - incrementCountOfRowsFilteredMetric(scannerContext); // early check, see HBASE-16296 if (isFilterDoneInternal()) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); @@ -6477,16 +6461,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // here we are filtering a row based purely on its row key, preventing us from calling // #populateResult. 
Thus, perform the necessary increment here to rows scanned metric incrementCountOfRowsScannedMetric(scannerContext); - boolean moreRows = nextRow(scannerContext, current); - if (!moreRows) { - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + if (!hasMoreValuesWhenReadNothingFromCurrentRow(scannerContext, current)) { + return false; } results.clear(); - - // Read nothing as the rowkey was filtered, but still need to check time limit - if (scannerContext.checkTimeLimit(limitScope)) { - return true; - } continue; } @@ -6511,40 +6489,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED; if (hasFilterRow) { ret = filter.filterRowCellsWithRet(results); - - // We don't know how the results have changed after being filtered. Must set progress - // according to contents of results now. - if (scannerContext.getKeepProgress()) { - scannerContext.setProgress(initialBatchProgress, initialSizeProgress, - initialHeapSizeProgress); - } else { - scannerContext.clearProgress(); - } - scannerContext.incrementBatchProgress(results.size()); - for (Cell cell : results) { - scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell), - PrivateCellUtil.estimatedHeapSizeOf(cell)); - } + resetProgressAfterFilterRowCells(results, scannerContext, initialBatchProgress, + initialSizeProgress, initialHeapSizeProgress); } if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) { - incrementCountOfRowsFilteredMetric(scannerContext); results.clear(); - boolean moreRows = nextRow(scannerContext, current); - if (!moreRows) { + if (!hasMoreValuesWhenReadNothingFromCurrentRow(scannerContext, current) || + shouldStop) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } - - // This row was totally filtered out, if this is NOT the last row, - // we should continue on. 
Otherwise, nothing else to do. - if (!shouldStop) { - // Read nothing as the cells was filtered, but still need to check time limit - if (scannerContext.checkTimeLimit(limitScope)) { - return true; - } - continue; - } - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + continue; } // Ok, we are done with storeHeap for this row. @@ -6555,36 +6510,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean mayHaveData = joinedHeapMayHaveData(current); if (mayHaveData) { joinedContinuationRow = current; - populateFromJoinedHeap(results, scannerContext); - - if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { - return true; - } } } - } else { - // Populating from the joined heap was stopped by limits, populate some more. + } + + if (joinedContinuationRow != null) { + // Populating from the joined heap populateFromJoinedHeap(results, scannerContext); if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { return true; } } - // We may have just called populateFromJoinedMap and hit the limits. If that is - // the case, we need to call it again on the next next() invocation. - if (joinedContinuationRow != null) { - return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); - } // Finally, we are done with both joinedHeap and storeHeap. // Double check to prevent empty rows from appearing in result. It could be // the case when SingleColumnValueExcludeFilter is used. 
if (results.isEmpty()) { - incrementCountOfRowsFilteredMetric(scannerContext); - boolean moreRows = nextRow(scannerContext, current); - if (!moreRows) { + if (!hasMoreValuesWhenReadNothingFromCurrentRow(scannerContext, current) || + shouldStop) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } - if (!shouldStop) continue; + continue; } if (shouldStop) { @@ -6595,6 +6541,49 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + /** + * If a user specifies a too-restrictive or too-slow scanner, the client might time out and + * disconnect while the server side is still processing the request. We should abort + * aggressively in that case. + */ + private void throwExceptionIfCallerDisconnected(Optional rpcCall) + throws CallerDisconnectedException { + if (rpcCall.isPresent()) { + long afterTime = rpcCall.get().disconnectSince(); + if (afterTime >= 0) { + throw new CallerDisconnectedException( + "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this + + " after " + afterTime + " ms, since " + "caller disconnected"); + } + } + } + + /** + * Only call this after {@link org.apache.hadoop.hbase.filter.Filter#filterRowCells(List)}. As + * we don't know how the results have changed after being filtered, reset progress according to + * contents of results now. 
+ */ + private void resetProgressAfterFilterRowCells(List results, ScannerContext scannerContext, + int initialBatchProgress, long initialSizeProgress, long initialHeapSizeProgress) { + if (scannerContext.getKeepProgress()) { + scannerContext.setProgress(initialBatchProgress, initialSizeProgress, + initialHeapSizeProgress); + } else { + scannerContext.clearProgress(); + } + scannerContext.incrementBatchProgress(results.size()); + for (Cell cell : results) { + scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell), + PrivateCellUtil.estimatedHeapSizeOf(cell)); + } + } + + private boolean hasMoreValuesWhenReadNothingFromCurrentRow(ScannerContext scannerContext, + Cell current) throws IOException { + incrementCountOfRowsFilteredMetric(scannerContext); + return nextRow(scannerContext, current); + } + protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) { filteredReadRequestsCount.increment(); -- 1.9.1