From aab8f4824544c6a6f867cbb72661765f9576282d Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Mon, 14 Mar 2016 19:53:18 +0800 Subject: [PATCH] HBASE-15325 ResultScanner allowing partial result will miss the rest of the row if the region is moved between two rpc requests --- .../apache/hadoop/hbase/client/ClientScanner.java | 473 ++++++++++++--------- .../org/apache/hadoop/hbase/client/Result.java | 119 +++--- .../org/apache/hadoop/hbase/CellComparator.java | 2 +- .../hbase/TestPartialResultsFromClientSide.java | 294 +++++++++++-- 4 files changed, 584 insertions(+), 304 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 22a56e3..ab8e9f3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -54,49 +55,53 @@ import java.util.concurrent.ExecutorService; */ @InterfaceAudience.Private public abstract class ClientScanner extends AbstractClientScanner { - private static final Log LOG = LogFactory.getLog(ClientScanner.class); - // A byte array in which all elements are the max byte, and it is used to - // construct closest front row - static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); - protected Scan scan; - protected boolean closed = false; - // Current region scanner is against. Gets cleared if current region goes - // wonky: e.g. if it splits on us. - protected HRegionInfo currentRegion = null; - protected ScannerCallableWithReplicas callable = null; - protected Queue cache; - /** - * A list of partial results that have been returned from the server. This list should only - * contain results if this scanner does not have enough partial results to form the complete - * result. - */ - protected final LinkedList partialResults = new LinkedList(); - /** - * The row for which we are accumulating partial Results (i.e. the row of the Results stored - * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync - * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()} - */ - protected byte[] partialResultsRow = null; - protected final int caching; - protected long lastNext; - // Keep lastResult returned successfully in case we have to reset scanner. - protected Result lastResult = null; - protected final long maxScannerResultSize; - private final ClusterConnection connection; - private final TableName tableName; - protected final int scannerTimeout; - protected boolean scanMetricsPublished = false; - protected RpcRetryingCaller caller; - protected RpcControllerFactory rpcControllerFactory; - protected Configuration conf; - //The timeout on the primary. Applicable if there are multiple replicas for a region - //In that case, we will only wait for this much timeout on the primary before going - //to the replicas and trying the same scan. Note that the retries will still happen - //on each replica and the first successful results will be taken. A timeout of 0 is - //disallowed. 
- protected final int primaryOperationTimeout; - private int retries; - protected final ExecutorService pool; + private static final Log LOG = LogFactory.getLog(ClientScanner.class); + // A byte array in which all elements are the max byte, and it is used to + // construct closest front row + static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); + protected Scan scan; + protected boolean closed = false; + // Current region scanner is against. Gets cleared if current region goes + // wonky: e.g. if it splits on us. + protected HRegionInfo currentRegion = null; + protected ScannerCallableWithReplicas callable = null; + protected Queue cache; + /** + * A list of partial results that have been returned from the server. This list should only + * contain results if this scanner does not have enough partial results to form the complete + * result. + */ + protected final LinkedList partialResults = new LinkedList<>(); + /** + * The row for which we are accumulating partial Results (i.e. the row of the Results stored + * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync + * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()} + */ + protected byte[] partialResultsRow = null; + protected boolean isPartialResultStale = false; + protected int numOfPartialCells = 0; + + protected Cell lastCellOfLoadCache = null; + protected final int caching; + protected long lastNext; + // Keep lastResult returned successfully in case we have to reset scanner. + protected Result lastResult = null; + protected final long maxScannerResultSize; + private final ClusterConnection connection; + private final TableName tableName; + protected final int scannerTimeout; + protected boolean scanMetricsPublished = false; + protected RpcRetryingCaller caller; + protected RpcControllerFactory rpcControllerFactory; + protected Configuration conf; + //The timeout on the primary. Applicable if there are multiple replicas for a region + //In that case, we will only wait for this much timeout on the primary before going + //to the replicas and trying the same scan. Note that the retries will still happen + //on each replica and the first successful results will be taken. A timeout of 0 is + //disallowed. 
+ protected final int primaryOperationTimeout; + private int retries; + protected final ExecutorService pool; /** * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start @@ -111,49 +116,48 @@ public abstract class ClientScanner extends AbstractClientScanner { ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Scan table=" + tableName - + ", startRow=" + Bytes.toStringBinary(scan.getStartRow())); - } - this.scan = scan; - this.tableName = tableName; - this.lastNext = System.currentTimeMillis(); - this.connection = connection; - this.pool = pool; - this.primaryOperationTimeout = primaryOperationTimeout; - this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - if (scan.getMaxResultSize() > 0) { - this.maxScannerResultSize = scan.getMaxResultSize(); - } else { - this.maxScannerResultSize = conf.getLong( - HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + if (LOG.isTraceEnabled()) { + LOG.trace("Scan table=" + tableName + ", startRow=" + + Bytes.toStringBinary(scan.getStartRow())); + } + this.scan = scan; + this.tableName = tableName; + this.lastNext = System.currentTimeMillis(); + this.connection = connection; + this.pool = pool; + this.primaryOperationTimeout = primaryOperationTimeout; + this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + if (scan.getMaxResultSize() > 0) { + this.maxScannerResultSize = scan.getMaxResultSize(); + } else { + this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); - } - this.scannerTimeout = HBaseConfiguration.getInt(conf, - HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, - HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); + } + this.scannerTimeout = HBaseConfiguration + .getInt(conf, HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); - // check if application wants to collect scan metrics - initScanMetrics(scan); + // check if application wants to collect scan metrics + initScanMetrics(scan); - // Use the caching from the Scan. If not set, use the default cache setting for this table. - if (this.scan.getCaching() > 0) { - this.caching = this.scan.getCaching(); - } else { - this.caching = conf.getInt( - HConstants.HBASE_CLIENT_SCANNER_CACHING, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); - } + // Use the caching from the Scan. If not set, use the default cache setting for this table. + if (this.scan.getCaching() > 0) { + this.caching = this.scan.getCaching(); + } else { + this.caching = conf.getInt( + HConstants.HBASE_CLIENT_SCANNER_CACHING, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); + } - this.caller = rpcFactory. 
newCaller(); - this.rpcControllerFactory = controllerFactory; + this.caller = rpcFactory.newCaller(); + this.rpcControllerFactory = controllerFactory; - this.conf = conf; - initCache(); - initializeScannerInConstruction(); - } + this.conf = conf; + initCache(); + initializeScannerInConstruction(); + } protected abstract void initCache(); @@ -389,7 +393,10 @@ public abstract class ClientScanner extends AbstractClientScanner { // We don't expect that the server will have more results for us if // it doesn't tell us otherwise. We rely on the size or count of results boolean serverHasMoreResults = false; + // A flag to make sure we must scan this region in next rpc right now. + boolean mustRescanInCurrentRegion = false; do { + mustRescanInCurrentRegion = false; try { // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just @@ -421,10 +428,10 @@ public abstract class ClientScanner extends AbstractClientScanner { // id against the new region server; reset the scanner. if (timeout < System.currentTimeMillis()) { LOG.info("For hints related to the following exception, please try taking a look at: " - + "https://hbase.apache.org/book.html#trouble.client.scantimeout"); + + "https://hbase.apache.org/book.html#trouble.client.scantimeout"); long elapsed = System.currentTimeMillis() - lastNext; - ScannerTimeoutException ex = - new ScannerTimeoutException(elapsed + "ms passed since the last invocation, " + ScannerTimeoutException ex = new ScannerTimeoutException( + elapsed + "ms passed since the last invocation, " + "timeout is currently set to " + scannerTimeout); ex.initCause(e); throw ex; @@ -448,10 +455,15 @@ public abstract class ClientScanner extends AbstractClientScanner { // Reset the startRow to the row we've seen last so that the new scanner starts at // the correct row. Otherwise we may see previously returned rows again. // (ScannerCallable by now has "relocated" the correct region) - if (scan.isReversed()) { - scan.setStartRow(createClosestRowBefore(lastResult.getRow())); + if (!this.lastResult.isPartial() && scan.getBatch() < 0) { + if (scan.isReversed()) { + scan.setStartRow(createClosestRowBefore(lastResult.getRow())); + } else { + scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1])); + } } else { - scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1])); + // we need rescan this row because we only loaded partial row before + scan.setStartRow(lastResult.getRow()); } } if (e instanceof OutOfOrderScannerNextException) { @@ -459,8 +471,8 @@ public abstract class ClientScanner extends AbstractClientScanner { retryAfterOutOfOrderException = false; } else { // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE? - throw new DoNotRetryIOException("Failed after retry of " + - "OutOfOrderScannerNextException: was there a rpc timeout?", e); + throw new DoNotRetryIOException("Failed after retry of " + + "OutOfOrderScannerNextException: was there a rpc timeout?", e); } } // Clear region. @@ -476,20 +488,34 @@ public abstract class ClientScanner extends AbstractClientScanner { this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext); } lastNext = currentTime; + + if (this.lastCellOfLoadCache != null && values != null && values.length > 0 && + compare(this.lastCellOfLoadCache, values[0].rawCells()[0]) >= 0) { + // If we will drop some results because we have loaded them to cache, we must continue to + // scan this region in next rpc. 
+ // Set this flag to true to prevent doneWithRegion return true. + mustRescanInCurrentRegion = true; + } // Groom the array of Results that we received back from the server before adding that // Results to the scanner's cache. If partial results are not allowed to be seen by the // caller, all book keeping will be performed within this method. List resultsToAddToCache = getResultsToAddToCache(values, callable.isHeartbeatMessage()); - if (!resultsToAddToCache.isEmpty()) { - for (Result rs : resultsToAddToCache) { - cache.add(rs); - long estimatedHeapSizeOfResult = calcEstimatedSize(rs); - countdown--; - remainingResultSize -= estimatedHeapSizeOfResult; - addEstimatedSize(estimatedHeapSizeOfResult); - this.lastResult = rs; - } + + for (Result rs : resultsToAddToCache) { + cache.add(rs); + long estimatedHeapSizeOfResult = calcEstimatedSize(rs); + countdown--; + remainingResultSize -= estimatedHeapSizeOfResult; + addEstimatedSize(estimatedHeapSizeOfResult); + this.lastResult = rs; + updateLastCellOfLoadCache(this.lastResult); + } + + if (cache.isEmpty() && values != null && values.length > 0 && partialResults.size() == 0) { + // all result has been seen before, we need scan more. + mustRescanInCurrentRegion = true; + continue; } if (callable.isHeartbeatMessage()) { if (cache.size() > 0) { @@ -499,10 +525,11 @@ public abstract class ClientScanner extends AbstractClientScanner { // unnecesary delays to the caller if (LOG.isTraceEnabled()) { LOG.trace("Heartbeat message received and cache contains Results." - + " Breaking out of scan loop"); + + " Breaking out of scan loop"); } break; } + mustRescanInCurrentRegion = true; continue; } @@ -515,19 +542,19 @@ public abstract class ClientScanner extends AbstractClientScanner { // as it keeps the outer loop logic the same. serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty(); } + // Values == null means server-side filter has determined we must STOP // !partialResults.isEmpty() means that we are still accumulating partial Results for a // row. We should not change scanners before we receive all the partial Results for that // row. - } while ((callable != null && callable.isHeartbeatMessage()) - || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) - && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)))); + } while (mustRescanInCurrentRegion || + (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) + &&(!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)))); } /** * @param remainingResultSize * @param remainingRows - * @param regionHasMoreResults * @return true when the current region has been exhausted. When the current region has been * exhausted, the region must be changed before scanning can continue */ @@ -560,7 +587,7 @@ public abstract class ClientScanner extends AbstractClientScanner { * not contain errors. We return a list of results that should be added to the cache. 
In general, * this list will contain all NON-partial results from the input array (unless the client has * specified that they are okay with receiving partial results) - * @param resultsFromServer The array of {@link Result}s returned from the server + * @param origionResultsFromServer The array of {@link Result}s returned from the server * @param heartbeatMessage Flag indicating whether or not the response received from the server * represented a complete response, or a heartbeat message that was sent to keep the * client-server connection alive @@ -568,120 +595,81 @@ public abstract class ClientScanner extends AbstractClientScanner { * @throws IOException */ protected List - getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage) + getResultsToAddToCache(Result[] origionResultsFromServer, boolean heartbeatMessage) throws IOException { - int resultSize = resultsFromServer != null ? resultsFromServer.length : 0; - List resultsToAddToCache = new ArrayList(resultSize); - - final boolean isBatchSet = scan != null && scan.getBatch() > 0; - final boolean allowPartials = scan != null && scan.getAllowPartialResults(); - + List filteredResults = filterResultsFromServer(origionResultsFromServer); + List resultsToAddToCache = new ArrayList<>(filteredResults.size()); // If the caller has indicated in their scan that they are okay with seeing partial results, - // then simply add all results to the list. Note that since scan batching also returns results - // for a row in pieces we treat batch being set as equivalent to allowing partials. The - // implication of treating batching as equivalent to partial results is that it is possible - // the caller will receive a result back where the number of cells in the result is less than - // the batch size even though it may not be the last group of cells for that row. - if (allowPartials || isBatchSet) { - addResultsToList(resultsToAddToCache, resultsFromServer, 0, - (null == resultsFromServer ? 0 : resultsFromServer.length)); + // then simply add all results to the list. + // Set batch limit and say allowed partial result is not same (HBASE-15325) + if (scan.getAllowPartialResults()) { + resultsToAddToCache.addAll(filteredResults); return resultsToAddToCache; } // If no results were returned it indicates that either we have the all the partial results // necessary to construct the complete result or the server had to send a heartbeat message // to the client to keep the client-server connection alive - if (resultsFromServer == null || resultsFromServer.length == 0) { + if (filteredResults.isEmpty()) { // If this response was an empty heartbeat message, then we have not exhausted the region // and thus there may be more partials server side that still need to be added to the partial // list before we form the complete Result - if (!partialResults.isEmpty() && !heartbeatMessage) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); - clearPartialResults(); + if ((origionResultsFromServer == null || origionResultsFromServer.length == 0) + && !partialResults.isEmpty() && !heartbeatMessage) { + completeCurrentPartialRow(resultsToAddToCache); } - return resultsToAddToCache; } - // In every RPC response there should be at most a single partial result. Furthermore, if - // there is a partial result, it is guaranteed to be in the last position of the array. - Result last = resultsFromServer[resultsFromServer.length - 1]; - Result partial = last.isPartial() ? 
last : null; - - if (LOG.isTraceEnabled()) { - StringBuilder sb = new StringBuilder(); - sb.append("number results from RPC: ").append(resultsFromServer.length).append(","); - sb.append("partial != null: ").append(partial != null).append(","); - sb.append("number of partials so far: ").append(partialResults.size()); - LOG.trace(sb.toString()); - } - - // There are three possibilities cases that can occur while handling partial results - // - // 1. (partial != null && partialResults.isEmpty()) - // This is the first partial result that we have received. It should be added to - // the list of partialResults and await the next RPC request at which point another - // portion of the complete result will be received - // - // 2. !partialResults.isEmpty() - // Since our partialResults list is not empty it means that we have been accumulating partial - // Results for a particular row. We cannot form the complete/whole Result for that row until - // all partials for the row have been received. Thus we loop through all of the Results - // returned from the server and determine whether or not all partial Results for the row have - // been received. We know that we have received all of the partial Results for the row when: - // i) We notice a row change in the Results - // ii) We see a Result for the partial row that is NOT marked as a partial Result - // - // 3. (partial == null && partialResults.isEmpty()) - // Business as usual. We are not accumulating partial results and there wasn't a partial result - // in the RPC response. This means that all of the results we received from the server are - // complete and can be added directly to the cache - if (partial != null && partialResults.isEmpty()) { - addToPartialResults(partial); - - // Exclude the last result, it's a partial - addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1); - } else if (!partialResults.isEmpty()) { - for (int i = 0; i < resultsFromServer.length; i++) { - Result result = resultsFromServer[i]; - - // This result is from the same row as the partial Results. Add it to the list of partials - // and check if it was the last partial Result for that row - if (Bytes.equals(partialResultsRow, result.getRow())) { - addToPartialResults(result); - - // If the result is not a partial, it is a signal to us that it is the last Result we - // need to form the complete Result client-side - if (!result.isPartial()) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); - clearPartialResults(); - } - } else { - // The row of this result differs from the row of the partial results we have received so - // far. If our list of partials isn't empty, this is a signal to form the complete Result - // since the row has now changed - if (!partialResults.isEmpty()) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); - clearPartialResults(); - } + // If user setBatch(5) and rpc returns(after filterResultsFromServer) 3+5+5+5+3 cells, + // we should return 5+5+5+5+1 to user. In this case, the first Result with 3 cells must be + // partial because if it had 5 and we filterd two of them, we have changed the status + // to partial in filterLoadedCell. + for (Result result : filteredResults) { + // if partialResultsRow is null, Bytes.equals will return false. + if (!Bytes.equals(partialResultsRow, result.getRow())) { + // This result is a new row. We should add partialResults as a complete row to cache first. 
+ completeCurrentPartialRow(resultsToAddToCache); + } - // It's possible that in one response from the server we receive the final partial for - // one row and receive a partial for a different row. Thus, make sure that all Results - // are added to the proper list - if (result.isPartial()) { - addToPartialResults(result); - } else { - resultsToAddToCache.add(result); - } + addToPartialResults(result); + if (scan.getBatch() > 0 && numOfPartialCells >= scan.getBatch()) { + List batchedResults = + Result.createValidBatchedResults(partialResults, scan.getBatch(), isPartialResultStale); + // remaining partialResults has at most one Cell[] + if(partialResults.size()>0) { + numOfPartialCells = partialResults.get(0).length; + }else { + numOfPartialCells = 0; + } + if (!batchedResults.isEmpty()) { + resultsToAddToCache.addAll(batchedResults); } } - } else { // partial == null && partialResults.isEmpty() -- business as usual - addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length); - } + if (!result.isPartial() && (scan.getBatch() < 0 + || scan.getBatch() > 0 && result.size() < scan.getBatch())) { + // It is the last part of this row. + completeCurrentPartialRow(resultsToAddToCache); + } + } return resultsToAddToCache; } + private void completeCurrentPartialRow(List list) + throws IOException { + if (partialResultsRow == null) { + return; + } + if (scan.getBatch() > 0) { + list.addAll(Result.createCompleteBatchedResults( + partialResults, scan.getBatch(), isPartialResultStale)); + } else { + list.add(Result.createCompleteResult(partialResults, isPartialResultStale)); + } + clearPartialResults(); + } + /** * A convenience method for adding a Result to our list of partials. This method ensure that only * Results that belong to the same row as the other partials can be added to the list. @@ -696,7 +684,17 @@ public abstract class ClientScanner extends AbstractClientScanner { + Bytes.toString(row)); } partialResultsRow = row; - partialResults.add(result); + partialResults.add(result.rawCells()); + isPartialResultStale = isPartialResultStale || result.isStale(); + numOfPartialCells += result.size(); + } + + private int countOfCells(List list){ + int count = 0; + for(Result result:list){ + count+=result.size(); + } + return count; } /** @@ -705,22 +703,10 @@ public abstract class ClientScanner extends AbstractClientScanner { private void clearPartialResults() { partialResults.clear(); partialResultsRow = null; + isPartialResultStale = false; + numOfPartialCells = 0; } - /** - * Helper method for adding results between the indices [start, end) to the outputList - * @param outputList the list that results will be added to - * @param inputArray the array that results are taken from - * @param start beginning index (inclusive) - * @param end ending index (exclusive) - */ - private void addResultsToList(List outputList, Result[] inputArray, int start, int end) { - if (inputArray == null || start < 0 || end > inputArray.length) return; - - for (int i = start; i < end; i++) { - outputList.add(inputArray[i]); - } - } @Override public void close() { @@ -783,4 +769,69 @@ public abstract class ClientScanner extends AbstractClientScanner { } return false; } + + protected void updateLastCellOfLoadCache(Result result) { + if (result.rawCells().length == 0) { + return; + } + this.lastCellOfLoadCache = result.rawCells()[result.rawCells().length - 1]; + } + + private int compare(Cell a, Cell b) { + CellComparator comparator = currentRegion != null && currentRegion.isMetaRegion() ? 
+ CellComparator.META_COMPARATOR : + CellComparator.COMPARATOR; + int r = comparator.compareRows(a, b); + if (r != 0) { + return this.scan.isReversed() ? -r : r; + } + return CellComparator.compareWithoutRow(a, b); + } + + private List filterResultsFromServer(Result[] results) { + List list = new ArrayList<>(); + if (results == null || results.length == 0) { + return list; + } + for (Result r : results) { + r = filterLoadedCell(r); + if (r != null) { + list.add(r); + } + } + return list; + } + + private Result filterLoadedCell(Result result) { + // we only filter result when last result is partial + // so lastCellOfLoadCache and result should have same row key. + // However, if 1) read some cells; 1.1) delete this row at the same time 2) move region; + // 3) read more cell. lastCellOfLoadCache and result will be not at same row. + if (lastCellOfLoadCache == null || result.rawCells().length == 0) { + return result; + } + if (compare(this.lastCellOfLoadCache, result.rawCells()[0]) < 0) { + // The first cell of this result is larger than the last cell of loadcache. + // If user do not allow partial result, it must be true. + return result; + } + if (compare(this.lastCellOfLoadCache, result.rawCells()[result.rawCells().length - 1]) >= 0) { + // The last cell of this result is smaller than the last cell of loadcache, skip all. + return null; + } + + int index = 0; + while (index < result.rawCells().length) { + if (compare(this.lastCellOfLoadCache, result.rawCells()[index]) < 0) { + break; + } + index++; + } + List list = new ArrayList<>(result.rawCells().length - index); + for (; index < result.rawCells().length; index++) { + list.add(result.rawCells()[index]); + } + // We mark this partial to be a flag that part of cells dropped + return Result.create(list, result.getExists(), result.isStale(), true); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index d2a49c2..ecf2a2b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -24,7 +24,9 @@ import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -783,69 +785,76 @@ public class Result implements CellScannable, CellScanner { /** * Forms a single result from the partial results in the partialResults list. This method is * useful for reconstructing partial results on the client side. 
- * @param partialResults list of partial results + * @param partialResults list of partial cells * @return The complete result that is formed by combining all of the partial results together - * @throws IOException A complete result cannot be formed because the results in the partial list - * come from different rows */ - public static Result createCompleteResult(List partialResults) + public static Result createCompleteResult(List partialResults, boolean stale) throws IOException { - List cells = new ArrayList(); - boolean stale = false; - byte[] prevRow = null; - byte[] currentRow = null; - - if (partialResults != null && !partialResults.isEmpty()) { - for (int i = 0; i < partialResults.size(); i++) { - Result r = partialResults.get(i); - currentRow = r.getRow(); - if (prevRow != null && !Bytes.equals(prevRow, currentRow)) { - throw new IOException( - "Cannot form complete result. Rows of partial results do not match." + - " Partial Results: " + partialResults); - } - - // Ensure that all Results except the last one are marked as partials. The last result - // may not be marked as a partial because Results are only marked as partials when - // the scan on the server side must be stopped due to reaching the maxResultSize. - // Visualizing it makes it easier to understand: - // maxResultSize: 2 cells - // (-x-) represents cell number x in a row - // Example: row1: -1- -2- -3- -4- -5- (5 cells total) - // How row1 will be returned by the server as partial Results: - // Result1: -1- -2- (2 cells, size limit reached, mark as partial) - // Result2: -3- -4- (2 cells, size limit reached, mark as partial) - // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial) - if (i != (partialResults.size() - 1) && !r.isPartial()) { - throw new IOException( - "Cannot form complete result. Result is missing partial flag. " + - "Partial Results: " + partialResults); - } - prevRow = currentRow; - stale = stale || r.isStale(); - for (Cell c : r.rawCells()) { - cells.add(c); + List results = new ArrayList(); + for (Cell[] cells : partialResults) { + Collections.addAll(results,cells); + } + return Result.create(results, null, stale); + } + + /** + * Forms a group of batched results. Caller must make sure we won't have more cells in this row. + * The last result may have less Cells, but it is complete, too. + * This method will not change the list. + * @param batch The size of batch that user set. + * @return A list of Results, all of them are not partial. + */ + public static List createCompleteBatchedResults(List list, int batch, boolean stale) { + int count = 0; + Cell[] tmp = new Cell[batch]; + List results = new ArrayList<>(); + for (Cell[] cells : list) { + for (Cell c : cells) { + tmp[count++] = c; + if (count == batch) { + results.add(new Result(tmp, null, stale, false)); + count = 0; + tmp = new Cell[batch]; } } } - - return Result.create(cells, null, stale); - } - - /** - * Get total size of raw cells - * @param result - * @return Total size. - */ - public static long getTotalSizeOfCells(Result result) { - long size = 0; - if (result.isEmpty()) { - return size; + if (count > 0) { + // count must less than batch here + Cell[] tmp2 = Arrays.copyOf(tmp, count); + results.add(new Result(tmp2, null, stale, false)); + } + return results; + } + + /** + * Forms a group of batched results. This method will change the list by LinkedList.poll. + * If the last part is less than batch size, it'll addFirst to LinkedList with remaining cells. 
+ * @param list + * @param batch + * @return + */ + public static List createValidBatchedResults(LinkedList list, int batch, + boolean stale) { + int count = 0; + Cell[] tmp = new Cell[batch]; + List results = new ArrayList<>(); + Cell[] cells; + while ((cells = list.poll()) != null) { + for (Cell c : cells) { + tmp[count++] = c; + if (count == batch) { + results.add(new Result(tmp, null, stale, false)); + count = 0; + tmp = new Cell[batch]; + } + } } - for (Cell c : result.rawCells()) { - size += CellUtil.estimatedHeapSizeOf(c); + if (count > 0) { + // count must less than batch here + Cell[] tmp2 = Arrays.copyOf(tmp, count); + list.addFirst(tmp2); } - return size; + return results; } /** diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java index d869b3e..a5e26cf 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java @@ -378,7 +378,7 @@ public class CellComparator implements Comparator, Serializable { roffset, rlength); } - private static int compareWithoutRow(final Cell left, final Cell right) { + public static int compareWithoutRow(final Cell left, final Cell right) { // If the column is not specified, the "minimum" key type appears the // latest in the sorted order, regardless of the timestamp. This is used // for specifying the last key/value in a given row, because there is no diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java index a6f8373..3c2ab4c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java @@ -17,8 +17,10 @@ */ package org.apache.hadoop.hbase; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter; import org.apache.hadoop.hbase.filter.RandomRowFilter; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -65,7 +68,7 @@ public class TestPartialResultsFromClientSide { private static final Log LOG = LogFactory.getLog(TestPartialResultsFromClientSide.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - + private final static int MINICLUSTER_SIZE = 5; private static Table TABLE = null; /** @@ -99,7 +102,9 @@ public class TestPartialResultsFromClientSide { @BeforeClass public static void setUpBeforeClass() throws Exception { - TEST_UTIL.startMiniCluster(3); + TEST_UTIL.startMiniCluster(MINICLUSTER_SIZE); + TEST_UTIL.getAdmin().setBalancerRunning(false, true); + TEST_UTIL.getAdmin().setSplitOrMergeEnabled(false, true); TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); } @@ -430,7 +435,7 @@ public class TestPartialResultsFromClientSide { } /** - * Test the method {@link Result#createCompleteResult(List, Result)} + * Test the method {@link 
Result#createCompleteResult(List,boolean)} * @throws Exception */ @Test @@ -451,7 +456,7 @@ public class TestPartialResultsFromClientSide { oneShotScan.setMaxResultSize(Long.MAX_VALUE); ResultScanner oneShotScanner = TABLE.getScanner(oneShotScan); - ArrayList partials = new ArrayList<>(); + ArrayList partials = new ArrayList<>(); for (int i = 0; i < NUM_ROWS; i++) { Result partialResult = null; Result completeResult = null; @@ -460,10 +465,10 @@ public class TestPartialResultsFromClientSide { do { partialResult = partialScanner.next(); - partials.add(partialResult); + partials.add(partialResult.rawCells()); } while (partialResult != null && partialResult.isPartial()); - completeResult = Result.createCompleteResult(partials); + completeResult = Result.createCompleteResult(partials, false); oneShotResult = oneShotScanner.next(); compareResults(completeResult, oneShotResult, null); @@ -477,35 +482,6 @@ public class TestPartialResultsFromClientSide { } /** - * When reconstructing the complete result from its partials we ensure that the row of each - * partial result is the same. If one of the rows differs, an exception is thrown. - */ - @Test - public void testExceptionThrownOnMismatchedPartialResults() throws IOException { - assertTrue(NUM_ROWS >= 2); - - ArrayList partials = new ArrayList<>(); - Scan scan = new Scan(); - scan.setMaxResultSize(Long.MAX_VALUE); - ResultScanner scanner = TABLE.getScanner(scan); - Result r1 = scanner.next(); - partials.add(r1); - Result r2 = scanner.next(); - partials.add(r2); - - assertFalse(Bytes.equals(r1.getRow(), r2.getRow())); - - try { - Result.createCompleteResult(partials); - fail("r1 and r2 are from different rows. It should not be possible to combine them into" - + " a single result"); - } catch (IOException e) { - } - - scanner.close(); - } - - /** * When a scan has a filter where {@link org.apache.hadoop.hbase.filter.Filter#hasFilterRow()} is * true, the scanner should not return partial results. 
The scanner cannot return partial results * because the entire row needs to be read for the include/exclude decision to be made @@ -806,8 +782,9 @@ public class TestPartialResultsFromClientSide { public void testPartialResultsWithColumnFilter() throws Exception { testPartialResultsWithColumnFilter(new FirstKeyOnlyFilter()); testPartialResultsWithColumnFilter(new ColumnPrefixFilter(Bytes.toBytes("testQualifier5"))); - testPartialResultsWithColumnFilter(new ColumnRangeFilter(Bytes.toBytes("testQualifer1"), true, - Bytes.toBytes("testQualifier7"), true)); + testPartialResultsWithColumnFilter( + new ColumnRangeFilter(Bytes.toBytes("testQualifer1"), true, Bytes.toBytes("testQualifier7"), + true)); Set qualifiers = new LinkedHashSet<>(); qualifiers.add(Bytes.toBytes("testQualifier5")); @@ -829,4 +806,247 @@ public class TestPartialResultsFromClientSide { testEquivalenceOfScanResults(TABLE, partialScan, oneshotScan); } } + + private void moveRegion(Table table, int index) throws IOException{ + List> regions = MetaTableAccessor + .getTableRegionsAndLocations(TEST_UTIL.getConnection(), + table.getName()); + assertEquals(1, regions.size()); + HRegionInfo regionInfo = regions.get(0).getFirst(); + ServerName name = TEST_UTIL.getHBaseCluster().getRegionServer(index).getServerName(); + TEST_UTIL.getAdmin().move(regionInfo.getEncodedNameAsBytes(), + Bytes.toBytes(name.getServerName())); + } + + private void assertCell(Cell cell, byte[] row, byte[] cf, byte[] cq) { + try { + assertArrayEquals(row, Bytes.copy(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength())); + assertArrayEquals(cf, Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength())); + assertArrayEquals(cq, + Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength())); + } catch (AssertionError e) { + throw new AssertionError( + "expected " + Bytes.toString(row) + "/" + Bytes.toString(cf) + ":" + Bytes.toString(cq) + + " but was:" + cell.toString()); + } + } + + @Test + public void testPartialResultWhenRegionMove() throws IOException { + Table table=createTestTable(TableName.valueOf("testPartialResultWhenRegionMove"), + ROWS, FAMILIES, QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setAllowPartialResults(true); + ResultScanner scanner = table.getScanner(scan); + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) { + scanner.next(); + } + Result result1 = scanner.next(); + assertEquals(1, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + assertFalse(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(1, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); + assertTrue(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(1, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]); + assertTrue(result3.isPartial()); + + } + + @Test + public void testReversedPartialResultWhenRegionMove() throws IOException { + Table table=createTestTable(TableName.valueOf("testReversedPartialResultWhenRegionMove"), + ROWS, FAMILIES, QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setAllowPartialResults(true); + scan.setReversed(true); + ResultScanner 
scanner = table.getScanner(scan); + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) { + scanner.next(); + } + Result result1 = scanner.next(); + assertEquals(1, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[NUM_ROWS-1], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + assertFalse(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(1, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[NUM_ROWS - 2], FAMILIES[0], QUALIFIERS[0]); + assertTrue(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(1, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[NUM_ROWS - 2], FAMILIES[0], QUALIFIERS[1]); + assertTrue(result3.isPartial()); + + } + + @Test + public void testCompleteResultWhenRegionMove() throws IOException { + Table table=createTestTable(TableName.valueOf("testCompleteResultWhenRegionMove"), + ROWS, FAMILIES, QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setCaching(1); + ResultScanner scanner = table.getScanner(scan); + + Result result1 = scanner.next(); + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[0], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[2], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result3.isPartial()); + + } + + @Test + public void testReversedCompleteResultWhenRegionMove() throws IOException { + Table table=createTestTable(TableName.valueOf("testReversedCompleteResultWhenRegionMove"), + ROWS, FAMILIES, QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setCaching(1); + scan.setReversed(true); + ResultScanner scanner = table.getScanner(scan); + + Result result1 = scanner.next(); + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[NUM_ROWS - 1], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[NUM_ROWS - 2], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[NUM_ROWS - 3], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result3.isPartial()); + + } + + @Test + public void testBatchingResultWhenRegionMove() throws IOException { + // If user setBatch(5) and rpc returns 3+5+5+5+3 cells, + // we should return 5+5+5+5+1 to user. 
+ // setBatch doesn't mean setAllowPartialResult(true) + Table table = + createTestTable(TableName.valueOf("testBatchingResultWhenRegionMove"), ROWS, + FAMILIES, QUALIFIERS, VALUE); + Put put = new Put(ROWS[1]); + put.addColumn(FAMILIES[0], QUALIFIERS[1], new byte[VALUE_SIZE * 10]); + table.put(put); + Delete delete = new Delete(ROWS[1]); + delete.addColumn(FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + table.delete(delete); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setCaching(1); + scan.setBatch(5); + scan.setMaxResultSize(VALUE_SIZE * 6); + + ResultScanner scanner = table.getScanner(scan); + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS / 5 - 1; i++) { + scanner.next(); + } + Result result1 = scanner.next(); + assertEquals(5, result1.rawCells().length); + assertCell(result1.rawCells()[0], ROWS[0], FAMILIES[NUM_FAMILIES - 1], + QUALIFIERS[NUM_QUALIFIERS - 5]); + assertCell(result1.rawCells()[4], ROWS[0], FAMILIES[NUM_FAMILIES - 1], + QUALIFIERS[NUM_QUALIFIERS - 1]); + assertFalse(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(5, result2.rawCells().length); + assertCell(result2.rawCells()[0], ROWS[1], FAMILIES[0], QUALIFIERS[0]); + assertCell(result2.rawCells()[4], ROWS[1], FAMILIES[0], QUALIFIERS[4]); + assertFalse(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(5, result3.rawCells().length); + assertCell(result3.rawCells()[0], ROWS[1], FAMILIES[0], QUALIFIERS[5]); + assertCell(result3.rawCells()[4], ROWS[1], FAMILIES[0], QUALIFIERS[9]); + assertFalse(result3.isPartial()); + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS / 5 - 3; i++) { + Result result = scanner.next(); + assertEquals(5, result.rawCells().length); + assertFalse(result.isPartial()); + } + Result result = scanner.next(); + assertEquals(4, result.rawCells().length); + assertFalse(result.isPartial()); + for (int i = 0; i < (NUM_ROWS-2) * NUM_FAMILIES * NUM_QUALIFIERS / 5; i++) { + result = scanner.next(); + assertEquals(5, result.rawCells().length); + assertFalse(result.isPartial()); + } + assertNull(scanner.next()); + } + } \ No newline at end of file -- 2.5.4 (Apple Git-61)
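Editorial note (not part of the patch): the client-visible behaviour this change protects is easiest to see in a small scan loop. The sketch below is illustrative only — the class name and table name are made up — and it compiles against the hbase-client API used throughout the patch (Scan#setAllowPartialResults, Result#isPartial). It sets a tiny max result size so the server is forced to return rows in pieces, which is exactly the situation where a region move between two scan RPCs could previously drop the rest of a row.

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class PartialScanExample {
  public static void main(String[] args) throws IOException {
    try (Connection conn = ConnectionFactory.createConnection();
         Table table = conn.getTable(TableName.valueOf("testTable"))) { // hypothetical table
      Scan scan = new Scan();
      scan.setAllowPartialResults(true); // caller agrees to receive rows in pieces
      scan.setMaxResultSize(1);          // force the server to cut every row into partial Results
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result r : scanner) {
          // With HBASE-15325 fixed, this loop sees every cell of every row exactly once,
          // even if the region is moved between two scan RPCs. r.isPartial() tells the
          // caller whether more cells of the same row will follow in later Results.
          System.out.println(r.isPartial() + " " + r.size());
        }
      }
    }
  }
}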
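Editorial note (not part of the patch): when the scanner is reopened after an exception and the last Result handed out was partial, the patch restarts the scan at the same row and then drops the cells it already delivered by comparing them against lastCellOfLoadCache (see filterLoadedCell). The generic sketch below shows only that trimming step; it uses strings and a Comparator instead of HBase Cells, so every name in it is illustrative.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class TrimAlreadySeen {

  /**
   * Returns the suffix of {@code rescanned} that sorts strictly after {@code lastDelivered},
   * mirroring how the patched ClientScanner skips cells it has already loaded into its cache
   * when it must re-read the same row after a scanner reset.
   */
  static <T> List<T> trim(List<T> rescanned, T lastDelivered, Comparator<T> order) {
    int index = 0;
    while (index < rescanned.size() && order.compare(lastDelivered, rescanned.get(index)) >= 0) {
      index++; // skip everything the caller has already seen
    }
    return new ArrayList<T>(rescanned.subList(index, rescanned.size()));
  }

  public static void main(String[] args) {
    // The first RPC delivered cells "a".."c" before the region moved; the retry re-reads
    // the whole row, so the already-delivered prefix has to be filtered out again.
    List<String> rescannedRow = Arrays.asList("a", "b", "c", "d", "e");
    System.out.println(trim(rescannedRow, "c", Comparator.<String>naturalOrder())); // [d, e]
  }
}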
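Editorial note (not part of the patch): the patch changes partialResults from a list of Result objects to a list of Cell[] chunks and tracks staleness separately in isPartialResultStale, so reassembling a row becomes a plain concatenation plus an OR of the per-chunk stale flags. A minimal sketch of that reassembly, compiling against hbase-client and using the same Result.create overload the patched createCompleteResult calls (row and ordering checks omitted):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;

public class ReassembleRow {

  /**
   * Concatenates the Cell[] chunks accumulated for one row into a single, non-partial
   * Result, OR-ing together the per-chunk stale flags the scanner tracked along the way.
   */
  static Result reassemble(List<Cell[]> chunksOfOneRow, boolean anyChunkWasStale) {
    List<Cell> cells = new ArrayList<Cell>();
    for (Cell[] chunk : chunksOfOneRow) {
      Collections.addAll(cells, chunk);
    }
    // Same call the patched Result.createCompleteResult makes: exists is left null and
    // the reassembled Result is not marked partial.
    return Result.create(cells, null, anyChunkWasStale);
  }
}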
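Editorial note (not part of the patch): setBatch(n) adds a second constraint on top of partial results — the caller should see Results of exactly n cells, with only the final Result of a row allowed to be smaller, even though the chunking coming back from the server after filtering may be uneven. This is the 3+5+5+5+3 -> 5+5+5+5+1 case called out in the patch's own comments and exercised by testBatchingResultWhenRegionMove. The regrouping that createCompleteBatchedResults / createValidBatchedResults perform is, at its core, the generic repacking sketched below (integers stand in for cells; all names are illustrative).

import java.util.ArrayList;
import java.util.List;

public class RepackIntoBatches {

  /**
   * Repacks unevenly sized chunks into groups of exactly {@code batch} elements;
   * only the final group may be smaller.
   */
  static <T> List<List<T>> repack(List<List<T>> chunks, int batch) {
    List<List<T>> out = new ArrayList<List<T>>();
    List<T> current = new ArrayList<T>(batch);
    for (List<T> chunk : chunks) {
      for (T item : chunk) {
        current.add(item);
        if (current.size() == batch) {
          out.add(current);
          current = new ArrayList<T>(batch);
        }
      }
    }
    if (!current.isEmpty()) {
      out.add(current); // the leftover tail, smaller than batch, closes the row
    }
    return out;
  }

  public static void main(String[] args) {
    // Chunk sizes 3+5+5+5+3 (21 cells) come back from the server after filtering ...
    List<List<Integer>> chunks = new ArrayList<List<Integer>>();
    int next = 0;
    for (int size : new int[] { 3, 5, 5, 5, 3 }) {
      List<Integer> chunk = new ArrayList<Integer>();
      for (int i = 0; i < size; i++) {
        chunk.add(next++);
      }
      chunks.add(chunk);
    }
    // ... and are handed to the caller as 5+5+5+5+1.
    for (List<Integer> group : repack(chunks, 5)) {
      System.out.println(group.size());
    }
  }
}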