From 388ddd948fb95002a8a0b8f1f18fd2f7e717c6b3 Mon Sep 17 00:00:00 2001 From: Jonathan Lawlor Date: Tue, 10 Mar 2015 10:03:15 -0700 Subject: [PATCH] HBASE-13193: RegionScannerImpl filters should not be reset if a partial Result is returned --- .../apache/hadoop/hbase/client/ClientScanner.java | 123 +++++++++++++-------- .../apache/hadoop/hbase/regionserver/HRegion.java | 14 ++- .../hadoop/hbase/regionserver/InternalScanner.java | 5 + .../hbase/TestPartialResultsFromClientSide.java | 44 +++++++- 4 files changed, 133 insertions(+), 53 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 9993974..bfbe718 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -71,6 +71,12 @@ public class ClientScanner extends AbstractClientScanner { * result. */ protected final LinkedList partialResults = new LinkedList(); + /** + * The row for which we are accumulating partial Results (i.e. the row of the Results stored + * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync + * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()} + */ + protected byte[] partialResultsRow = null; protected final int caching; protected long lastNext; // Keep lastResult returned successfully in case we have to reset scanner. @@ -378,7 +384,7 @@ public class ClientScanner extends AbstractClientScanner { } catch (DoNotRetryIOException e) { // An exception was thrown which makes any partial results that we were collecting // invalid. The scanner will need to be reset to the beginning of a row. - partialResults.clear(); + clearPartialResults(); // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us // to reset the scanner and come back in again. @@ -515,7 +521,7 @@ public class ClientScanner extends AbstractClientScanner { if (resultsFromServer == null || resultsFromServer.length == 0) { if (!partialResults.isEmpty()) { resultsToAddToCache.add(Result.createCompleteResult(partialResults)); - partialResults.clear(); + clearPartialResults(); } return resultsToAddToCache; @@ -534,67 +540,65 @@ public class ClientScanner extends AbstractClientScanner { LOG.trace(sb.toString()); } - // There are four possibilities cases that can occur while handling partial results + // There are three possibilities cases that can occur while handling partial results // // 1. (partial != null && partialResults.isEmpty()) // This is the first partial result that we have received. It should be added to // the list of partialResults and await the next RPC request at which point another // portion of the complete result will be received // - // 2. (partial != null && !partialResults.isEmpty()) - // a. values.length == 1 - // Since partialResults contains some elements, it means that we are expecting to receive - // the remainder of the complete result within this RPC response. The fact that a partial result - // was returned and it's the ONLY result returned indicates that we are still receiving - // fragments of the complete result. The Result can be completely formed only when we have - // received all of the fragments and thus in this case we simply add the partial result to - // our list. - // - // b. values.length > 1 - // More than one result has been returned from the server. The fact that we are accumulating - // partials in partialList and we just received more than one result back from the server - // indicates that the FIRST result we received from the server must be the final fragment that - // can be used to complete our result. What this means is that the partial that we received is - // a partial result for a different row, and at this point we should combine the existing - // partials into a complete result, clear the partialList, and begin accumulating partials for - // a new row + // 2. !partialResults.isEmpty() + // Since our partialResults list is not empty it means that we have been accumulating partial + // Results for a particular row. We cannot form the complete/whole Result for that row until + // all partials for the row have been received. Thus we loop through all of the Results + // returned from the server and determine whether or not all partial Results for the row have + // been received. We know that we have received all of the partial Results for the row when: + // i) We notice a row change in the Results + // ii) We see a Result for the partial row that is NOT marked as a partial Result // - // 3. (partial == null && !partialResults.isEmpty()) - // No partial was received but we are accumulating partials in our list. That means the final - // fragment of the complete result will be the first Result in values[]. We use it to create the - // complete Result, clear the list, and add it to the list of Results that must be added to the - // cache. All other Results in values[] are added after the complete result to maintain proper - // ordering - // - // 4. (partial == null && partialResults.isEmpty()) + // 3. (partial == null && partialResults.isEmpty()) // Business as usual. We are not accumulating partial results and there wasn't a partial result // in the RPC response. This means that all of the results we received from the server are // complete and can be added directly to the cache if (partial != null && partialResults.isEmpty()) { - partialResults.add(partial); + addToPartialResults(partial); // Exclude the last result, it's a partial addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1); - } else if (partial != null && !partialResults.isEmpty()) { - if (resultsFromServer.length > 1) { - Result finalResult = resultsFromServer[0]; - partialResults.add(finalResult); - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); - partialResults.clear(); + } else if (!partialResults.isEmpty()) { + for (int i = 0; i < resultsFromServer.length; i++) { + Result result = resultsFromServer[i]; + + // This result is from the same row as the partial Results. Add it to the list of partials + // and check if it was the last partial Result for that row + if (Bytes.equals(partialResultsRow, result.getRow())) { + addToPartialResults(result); + + // If the result is not a partial, it is a signal to us that it is the last Result we + // need to form the complete Result client-side + if (!result.isPartial()) { + resultsToAddToCache.add(Result.createCompleteResult(partialResults)); + clearPartialResults(); + } + } else { + // The row of this result differs from the row of the partial results we have received so + // far. If our list of partials isn't empty, this is a signal to form the complete Result + // since the row has now changed + if (!partialResults.isEmpty()) { + resultsToAddToCache.add(Result.createCompleteResult(partialResults)); + clearPartialResults(); + } - // Exclude first result, it was used to form our complete result - // Exclude last result, it's a partial result - addResultsToList(resultsToAddToCache, resultsFromServer, 1, resultsFromServer.length - 1); + // It's possible that in one response from the server we receive the final partial for + // one row and receive a partial for a different row. Thus, make sure that all Results + // are added to the proper list + if (result.isPartial()) { + addToPartialResults(result); + } else { + resultsToAddToCache.add(result); + } + } } - partialResults.add(partial); - } else if (partial == null && !partialResults.isEmpty()) { - Result finalResult = resultsFromServer[0]; - partialResults.add(finalResult); - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); - partialResults.clear(); - - // Exclude the first result, it was used to form our complete result - addResultsToList(resultsToAddToCache, resultsFromServer, 1, resultsFromServer.length); } else { // partial == null && partialResults.isEmpty() -- business as usual addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length); } @@ -603,6 +607,31 @@ public class ClientScanner extends AbstractClientScanner { } /** + * A convenience method for adding a Result to our list of partials. This method ensure that only + * Results that belong to the same row as the other partials can be added to the list. + * @param result The result that we want to add to our list of partial Results + * @throws IOException + */ + private void addToPartialResults(final Result result) throws IOException { + final byte[] row = result.getRow(); + if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) { + throw new IOException("Partial result row does not match. All partial results must come " + + "from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + "row: " + + Bytes.toString(row)); + } + partialResultsRow = row; + partialResults.add(result); + } + + /** + * Convenience method for clearing the list of partials and resetting the partialResultsRow. + */ + private void clearPartialResults() { + partialResults.clear(); + partialResultsRow = null; + } + + /** * Helper method for adding results between the indices [start, end) to the outputList * @param outputList the list that results will be added to * @param inputArray the array that results are taken from diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 71b5646..3b1f267 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5538,13 +5538,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // state = nextInternal(tmpList, batchLimit, remainingResultSize); outResults.addAll(tmpList); } - // State should never be null, this is a precautionary measure - if (state == null) { - if (LOG.isTraceEnabled()) LOG.trace("State was null. Defaulting to no more values state"); - state = NextState.makeState(NextState.State.NO_MORE_VALUES); + // Invalid states should never be returned. Receiving an invalid state means that we have + // no clue how to proceed. Throw an exception. + if (!NextState.isValidState(state)) { + throw new IOException("Invalid state returned from nextInternal. state:" + state); } - resetFilters(); + // If the size limit was reached it means a partial Result is being returned. Returning a + // partial Result means that we should not reset the filters; filters should only be reset in + // between rows + if (!state.sizeLimitReached()) resetFilters(); + if (isFilterDoneInternal()) { state = NextState.makeState(NextState.State.NO_MORE_VALUES, state.getResultSize()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java index e68dc75..ea5a75f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java @@ -201,6 +201,11 @@ public interface InternalScanner extends Closeable { return resultSize >= 0; } + @Override + public String toString() { + return "State: " + state + " resultSize: " + resultSize; + } + /** * Helper method to centralize all checks as to whether or not the state is valid. * @param state diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java index 75fa6e3..e7c3813 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java @@ -24,7 +24,9 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,6 +37,11 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; +import org.apache.hadoop.hbase.filter.ColumnRangeFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter; import org.apache.hadoop.hbase.filter.RandomRowFilter; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -501,7 +508,7 @@ public class TestPartialResultsFromClientSide { * because the entire row needs to be read for the include/exclude decision to be made */ @Test - public void testNoPartialResultsWhenFilterPresent() throws Exception { + public void testNoPartialResultsWhenRowFilterPresent() throws Exception { Scan scan = new Scan(); scan.setMaxResultSize(1); scan.setAllowPartialResults(true); @@ -784,4 +791,39 @@ public class TestPartialResultsFromClientSide { scanner.close(); return numCells; } + + /** + * Test partial Result re-assembly in the presence of different filters. The Results from the + * partial scanner should match the Results returned from a scanner that receives all of the + * results in one RPC to the server. The partial scanner is tested with a variety of different + * result sizes (all of which are less than the size necessary to fetch an entire row) + * @throws Exception + */ + @Test + public void testPartialResultsWithColumnFilter() throws Exception { + testPartialResultsWithColumnFilter(new FirstKeyOnlyFilter()); + testPartialResultsWithColumnFilter(new ColumnPrefixFilter(Bytes.toBytes("testQualifier5"))); + testPartialResultsWithColumnFilter(new ColumnRangeFilter(Bytes.toBytes("testQualifer1"), true, + Bytes.toBytes("testQualifier7"), true)); + + Set qualifiers = new LinkedHashSet<>(); + qualifiers.add(Bytes.toBytes("testQualifier5")); + testPartialResultsWithColumnFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers)); + } + + public void testPartialResultsWithColumnFilter(Filter filter) throws Exception { + assertTrue(!filter.hasFilterRow()); + + Scan partialScan = new Scan(); + partialScan.setFilter(filter); + + Scan oneshotScan = new Scan(); + oneshotScan.setFilter(filter); + oneshotScan.setMaxResultSize(Long.MAX_VALUE); + + for (int i = 1; i <= NUM_COLS; i++) { + partialScan.setMaxResultSize(getResultSizeForNumberOfCells(i)); + testEquivalenceOfScanResults(TABLE, partialScan, oneshotScan); + } + } } -- 1.9.3 (Apple Git-50)