From 4488a7d1a4dabb7be11deda2a173c86d0bca084b Mon Sep 17 00:00:00 2001 From: Vikas Vishwakarma Date: Wed, 11 Jul 2018 12:50:39 +0530 Subject: [PATCH] HBASE-20866 HBase 1.x scan performance degradation compared to 0.98 version --- .../apache/hadoop/hbase/client/ClientScanner.java | 106 ++++++++++----------- 1 file changed, 52 insertions(+), 54 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index ecaab99..6607e18 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -67,6 +67,8 @@ public class ClientScanner extends AbstractClientScanner { protected HRegionInfo currentRegion = null; protected ScannerCallableWithReplicas callable = null; protected final LinkedList cache = new LinkedList(); + protected long remainingResultSize; + protected int countdown; /** * A list of partial results that have been returned from the server. This list should only * contain results if this scanner does not have enough partial results to form the complete @@ -408,8 +410,8 @@ public class ClientScanner extends AbstractClientScanner { */ protected void loadCache() throws IOException { Result[] values = null; - long remainingResultSize = maxScannerResultSize; - int countdown = this.caching; + remainingResultSize = maxScannerResultSize; + countdown = this.caching; // This is possible if we just stopped at the boundary of a region in the previous call. if (callable == null) { if (!nextScanner(countdown, false)) { @@ -522,27 +524,7 @@ public class ClientScanner extends AbstractClientScanner { // Groom the array of Results that we received back from the server before adding that // Results to the scanner's cache. If partial results are not allowed to be seen by the // caller, all book keeping will be performed within this method. - List resultsToAddToCache = - getResultsToAddToCache(values, callable.isHeartbeatMessage()); - if (!resultsToAddToCache.isEmpty()) { - for (Result rs : resultsToAddToCache) { - rs = filterLoadedCell(rs); - if (rs == null) { - continue; - } - cache.add(rs); - for (Cell cell : rs.rawCells()) { - remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell); - } - countdown--; - this.lastResult = rs; - if (this.lastResult.isPartial() || scan.getBatch() > 0) { - updateLastCellLoadedToCache(this.lastResult); - } else { - this.lastCellLoadedToCache = null; - } - } - } + loadResultsToCache(values, callable.isHeartbeatMessage()); boolean exhausted = regionExhausted(values); if (callable.isHeartbeatMessage()) { if (!cache.isEmpty()) { @@ -593,34 +575,31 @@ public class ClientScanner extends AbstractClientScanner { /** * This method ensures all of our book keeping regarding partial results is kept up to date. This * method should be called once we know that the results we received back from the RPC request do - * not contain errors. We return a list of results that should be added to the cache. In general, + * not contain errors. We groom a list of results that should be added to the cache. In general, * this list will contain all NON-partial results from the input array (unless the client has * specified that they are okay with receiving partial results) * @param resultsFromServer The array of {@link Result}s returned from the server * @param heartbeatMessage Flag indicating whether or not the response received from the server * represented a complete response, or a heartbeat message that was sent to keep the * client-server connection alive - * @return the list of results that should be added to the cache. * @throws IOException */ - protected List getResultsToAddToCache(Result[] resultsFromServer, + protected void loadResultsToCache(Result[] resultsFromServer, boolean heartbeatMessage) throws IOException { - int resultSize = resultsFromServer != null ? resultsFromServer.length : 0; - List resultsToAddToCache = new ArrayList(resultSize); final boolean isBatchSet = scan != null && scan.getBatch() > 0; final boolean allowPartials = scan != null && scan.getAllowPartialResults(); // If the caller has indicated in their scan that they are okay with seeing partial results, - // then simply add all results to the list. Note that since scan batching also returns results + // then simply add all results to the cache. Note that since scan batching also returns results // for a row in pieces we treat batch being set as equivalent to allowing partials. The // implication of treating batching as equivalent to partial results is that it is possible // the caller will receive a result back where the number of cells in the result is less than // the batch size even though it may not be the last group of cells for that row. if (allowPartials || isBatchSet) { - addResultsToList(resultsToAddToCache, resultsFromServer, 0, + addResultArrayToCache(resultsFromServer, 0, (null == resultsFromServer ? 0 : resultsFromServer.length)); - return resultsToAddToCache; + return; } // If no results were returned it indicates that either we have the all the partial results @@ -631,11 +610,11 @@ public class ClientScanner extends AbstractClientScanner { // and thus there may be more partials server side that still need to be added to the partial // list before we form the complete Result if (!partialResults.isEmpty() && !heartbeatMessage) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); + addResultToCache(Result.createCompleteResult(partialResults)); clearPartialResults(); } - return resultsToAddToCache; + return; } // In every RPC response there should be at most a single partial result. Furthermore, if @@ -675,7 +654,7 @@ public class ClientScanner extends AbstractClientScanner { addToPartialResults(partial); // Exclude the last result, it's a partial - addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1); + addResultArrayToCache(resultsFromServer, 0, resultsFromServer.length - 1); } else if (!partialResults.isEmpty()) { for (int i = 0; i < resultsFromServer.length; i++) { Result result = resultsFromServer[i]; @@ -688,7 +667,7 @@ public class ClientScanner extends AbstractClientScanner { // If the result is not a partial, it is a signal to us that it is the last Result we // need to form the complete Result client-side if (!result.isPartial()) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); + addResultToCache(Result.createCompleteResult(partialResults)); clearPartialResults(); } } else { @@ -696,7 +675,7 @@ public class ClientScanner extends AbstractClientScanner { // far. If our list of partials isn't empty, this is a signal to form the complete Result // since the row has now changed if (!partialResults.isEmpty()) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); + addResultToCache(Result.createCompleteResult(partialResults)); clearPartialResults(); } @@ -706,15 +685,49 @@ public class ClientScanner extends AbstractClientScanner { if (result.isPartial()) { addToPartialResults(result); } else { - resultsToAddToCache.add(result); + addResultToCache(result); } } } } else { // partial == null && partialResults.isEmpty() -- business as usual - addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length); + addResultArrayToCache(resultsFromServer, 0, resultsFromServer.length); + } + } + + /** + * Add result array received from server to cache + * @param resultsToAddToCache + * @param start + * @param end + */ + private void addResultArrayToCache(Result[] resultsToAddToCache, int start, int end) { + if (resultsToAddToCache != null) { + for (int r = start; r < end; r++) { + addResultToCache(resultsToAddToCache[r]); + } } + } - return resultsToAddToCache; + /** + * Add result received from server or result constructed from partials to cache + * @param rs + */ + private void addResultToCache(Result rs) { + rs = filterLoadedCell(rs); + if (rs == null) { + return; + } + cache.add(rs); + for (Cell cell : rs.rawCells()) { + remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell); + } + countdown--; + this.lastResult = rs; + if (this.lastResult.isPartial() || scan.getBatch() > 0) { + updateLastCellLoadedToCache(this.lastResult); + } else { + this.lastCellLoadedToCache = null; + } } /** @@ -742,21 +755,6 @@ public class ClientScanner extends AbstractClientScanner { partialResultsRow = null; } - /** - * Helper method for adding results between the indices [start, end) to the outputList - * @param outputList the list that results will be added to - * @param inputArray the array that results are taken from - * @param start beginning index (inclusive) - * @param end ending index (exclusive) - */ - private void addResultsToList(List outputList, Result[] inputArray, int start, int end) { - if (inputArray == null || start < 0 || end > inputArray.length) return; - - for (int i = start; i < end; i++) { - outputList.add(inputArray[i]); - } - } - @Override public void close() { if (!scanMetricsPublished) writeScanMetrics(); -- 2.7.4