From 919dde0fa23aa4fa70fa3282428b30bd4642e223 Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Fri, 18 Mar 2016 20:48:23 +0800 Subject: [PATCH] HBASE-15484 Correct the semantic of batch and partial --- .../apache/hadoop/hbase/client/ClientScanner.java | 327 ++++++++++++--------- .../org/apache/hadoop/hbase/client/Result.java | 77 +---- .../hbase/TestPartialResultsFromClientSide.java | 101 +++---- 3 files changed, 246 insertions(+), 259 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 3b6b83a..b7fc0c9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -43,6 +43,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Queue; @@ -71,13 +72,15 @@ public abstract class ClientScanner extends AbstractClientScanner { * contain results if this scanner does not have enough partial results to form the complete * result. */ - protected final LinkedList partialResults = new LinkedList(); + protected final LinkedList partialResults = new LinkedList<>(); /** * The row for which we are accumulating partial Results (i.e. the row of the Results stored * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()} */ protected byte[] partialResultsRow = null; + protected boolean isPartialResultStale = false; + protected int numOfPartialCells = 0; /** * The last cell from a not full Row which is added to cache */ @@ -394,9 +397,10 @@ public abstract class ClientScanner extends AbstractClientScanner { // We don't expect that the server will have more results for us if // it doesn't tell us otherwise. We rely on the size or count of results boolean serverHasMoreResults = false; - boolean allResultsSkipped = false; + // A flag to make sure we must scan this region in next rpc right now. + boolean continueScanInCurrentRegion = false; do { - allResultsSkipped = false; + continueScanInCurrentRegion = false; try { // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just @@ -488,36 +492,36 @@ public abstract class ClientScanner extends AbstractClientScanner { this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext); } lastNext = currentTime; + if (this.lastCellLoadedToCache != null && values != null && values.length > 0 && + compare(this.lastCellLoadedToCache, values[0].rawCells()[0]) >= 0) { + // If we will drop some results because we have loaded them to cache, we must continue to + // scan this region in next rpc. + // Set this flag to true to prevent doneWithRegion return true. + continueScanInCurrentRegion = true; + } // Groom the array of Results that we received back from the server before adding that // Results to the scanner's cache. If partial results are not allowed to be seen by the // caller, all book keeping will be performed within this method. List resultsToAddToCache = getResultsToAddToCache(values, callable.isHeartbeatMessage()); - if (!resultsToAddToCache.isEmpty()) { - for (Result rs : resultsToAddToCache) { - rs = filterLoadedCell(rs); - if (rs == null) { - continue; - } - - cache.add(rs); - long estimatedHeapSizeOfResult = calcEstimatedSize(rs); - countdown--; - remainingResultSize -= estimatedHeapSizeOfResult; - addEstimatedSize(estimatedHeapSizeOfResult); - this.lastResult = rs; - if (this.lastResult.isPartial() || scan.getBatch() > 0 ) { - updateLastCellLoadedToCache(this.lastResult); - } else { - this.lastCellLoadedToCache = null; - } - } - if (cache.isEmpty()) { - // all result has been seen before, we need scan more. - allResultsSkipped = true; - continue; + for (Result rs : resultsToAddToCache) { + cache.add(rs); + long estimatedHeapSizeOfResult = calcEstimatedSize(rs); + countdown--; + remainingResultSize -= estimatedHeapSizeOfResult; + addEstimatedSize(estimatedHeapSizeOfResult); + this.lastResult = rs; + if (this.lastResult.isPartial() || scan.getBatch() > 0 ) { + updateLastCellLoadedToCache(this.lastResult); + } else { + this.lastCellLoadedToCache = null; } } + if (cache.isEmpty() && values != null && values.length > 0 && partialResults.isEmpty()) { + // all result has been seen before, we need scan more. + continueScanInCurrentRegion = true; + continue; + } if (callable.isHeartbeatMessage()) { if (cache.size() > 0) { // Caller of this method just wants a Result. If we see a heartbeat message, it means @@ -530,6 +534,7 @@ public abstract class ClientScanner extends AbstractClientScanner { } break; } + continueScanInCurrentRegion = true; continue; } @@ -546,7 +551,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // !partialResults.isEmpty() means that we are still accumulating partial Results for a // row. We should not change scanners before we receive all the partial Results for that // row. - } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage()) + } while (continueScanInCurrentRegion || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)))); } @@ -587,7 +592,7 @@ public abstract class ClientScanner extends AbstractClientScanner { * not contain errors. We return a list of results that should be added to the cache. In general, * this list will contain all NON-partial results from the input array (unless the client has * specified that they are okay with receiving partial results) - * @param resultsFromServer The array of {@link Result}s returned from the server + * @param origionResultsFromServer The array of {@link Result}s returned from the server * @param heartbeatMessage Flag indicating whether or not the response received from the server * represented a complete response, or a heartbeat message that was sent to keep the * client-server connection alive @@ -595,119 +600,78 @@ public abstract class ClientScanner extends AbstractClientScanner { * @throws IOException */ protected List - getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage) + getResultsToAddToCache(Result[] origionResultsFromServer, boolean heartbeatMessage) throws IOException { - int resultSize = resultsFromServer != null ? resultsFromServer.length : 0; - List resultsToAddToCache = new ArrayList(resultSize); - - final boolean isBatchSet = scan != null && scan.getBatch() > 0; - final boolean allowPartials = scan != null && scan.getAllowPartialResults(); + List filteredResults = filterResultsFromServer(origionResultsFromServer); + List resultsToAddToCache = new ArrayList<>(filteredResults.size()); // If the caller has indicated in their scan that they are okay with seeing partial results, - // then simply add all results to the list. Note that since scan batching also returns results - // for a row in pieces we treat batch being set as equivalent to allowing partials. The - // implication of treating batching as equivalent to partial results is that it is possible - // the caller will receive a result back where the number of cells in the result is less than - // the batch size even though it may not be the last group of cells for that row. - if (allowPartials || isBatchSet) { - addResultsToList(resultsToAddToCache, resultsFromServer, 0, - (null == resultsFromServer ? 0 : resultsFromServer.length)); + // then simply add all results to the list. + // Set batch limit and say allowed partial result is not same (HBASE-15484) + if (scan.getAllowPartialResults()) { + resultsToAddToCache.addAll(filteredResults); return resultsToAddToCache; } // If no results were returned it indicates that either we have the all the partial results // necessary to construct the complete result or the server had to send a heartbeat message // to the client to keep the client-server connection alive - if (resultsFromServer == null || resultsFromServer.length == 0) { + if (filteredResults.isEmpty()) { // If this response was an empty heartbeat message, then we have not exhausted the region // and thus there may be more partials server side that still need to be added to the partial // list before we form the complete Result - if (!partialResults.isEmpty() && !heartbeatMessage) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); - clearPartialResults(); + if ((origionResultsFromServer == null || origionResultsFromServer.length == 0) + && !partialResults.isEmpty() && !heartbeatMessage) { + completeCurrentPartialRow(resultsToAddToCache); } - - return resultsToAddToCache; } - - // In every RPC response there should be at most a single partial result. Furthermore, if - // there is a partial result, it is guaranteed to be in the last position of the array. - Result last = resultsFromServer[resultsFromServer.length - 1]; - Result partial = last.isPartial() ? last : null; - - if (LOG.isTraceEnabled()) { - StringBuilder sb = new StringBuilder(); - sb.append("number results from RPC: ").append(resultsFromServer.length).append(","); - sb.append("partial != null: ").append(partial != null).append(","); - sb.append("number of partials so far: ").append(partialResults.size()); - LOG.trace(sb.toString()); - } - - // There are three possibilities cases that can occur while handling partial results - // - // 1. (partial != null && partialResults.isEmpty()) - // This is the first partial result that we have received. It should be added to - // the list of partialResults and await the next RPC request at which point another - // portion of the complete result will be received - // - // 2. !partialResults.isEmpty() - // Since our partialResults list is not empty it means that we have been accumulating partial - // Results for a particular row. We cannot form the complete/whole Result for that row until - // all partials for the row have been received. Thus we loop through all of the Results - // returned from the server and determine whether or not all partial Results for the row have - // been received. We know that we have received all of the partial Results for the row when: - // i) We notice a row change in the Results - // ii) We see a Result for the partial row that is NOT marked as a partial Result - // - // 3. (partial == null && partialResults.isEmpty()) - // Business as usual. We are not accumulating partial results and there wasn't a partial result - // in the RPC response. This means that all of the results we received from the server are - // complete and can be added directly to the cache - if (partial != null && partialResults.isEmpty()) { - addToPartialResults(partial); - - // Exclude the last result, it's a partial - addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1); - } else if (!partialResults.isEmpty()) { - for (int i = 0; i < resultsFromServer.length; i++) { - Result result = resultsFromServer[i]; - - // This result is from the same row as the partial Results. Add it to the list of partials - // and check if it was the last partial Result for that row - if (Bytes.equals(partialResultsRow, result.getRow())) { - addToPartialResults(result); - - // If the result is not a partial, it is a signal to us that it is the last Result we - // need to form the complete Result client-side - if (!result.isPartial()) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); - clearPartialResults(); - } + // If user setBatch(5) and rpc returns(after filterResultsFromServer) 3+5+5+5+3 cells, + // we should return 5+5+5+5+1 to user. In this case, the first Result with 3 cells must be + // partial because if it had 5 and we filterd two of them, we have changed the status + // to partial in filterLoadedCell. + for (Result result : filteredResults) { + // if partialResultsRow is null, Bytes.equals will return false. + if (!Bytes.equals(partialResultsRow, result.getRow())) { + // This result is a new row. We should add partialResults as a complete row to cache first. + completeCurrentPartialRow(resultsToAddToCache); + } + addToPartialResults(result); + if (scan.getBatch() > 0 && numOfPartialCells >= scan.getBatch()) { + List batchedResults = createBatchedResults(partialResults, scan.getBatch(), + isPartialResultStale, false); + // remaining partialResults has at most one Cell[] + if (partialResults.size() > 0) { + numOfPartialCells = partialResults.get(0).length; } else { - // The row of this result differs from the row of the partial results we have received so - // far. If our list of partials isn't empty, this is a signal to form the complete Result - // since the row has now changed - if (!partialResults.isEmpty()) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); - clearPartialResults(); - } - - // It's possible that in one response from the server we receive the final partial for - // one row and receive a partial for a different row. Thus, make sure that all Results - // are added to the proper list - if (result.isPartial()) { - addToPartialResults(result); - } else { - resultsToAddToCache.add(result); - } + clearPartialResults(); + } + if (!batchedResults.isEmpty()) { + resultsToAddToCache.addAll(batchedResults); } } - } else { // partial == null && partialResults.isEmpty() -- business as usual - addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length); - } + if (!result.isPartial() && (scan.getBatch() < 0 || + scan.getBatch() > 0 && result.size() < scan.getBatch())) { + // It is the last part of this row. + completeCurrentPartialRow(resultsToAddToCache); + } + } return resultsToAddToCache; } + private void completeCurrentPartialRow(List list) + throws IOException { + if (partialResultsRow == null) { + return; + } + if (scan.getBatch() > 0) { + list.addAll( + createBatchedResults(partialResults, scan.getBatch(), isPartialResultStale, true)); + } else { + list.add(createCompleteResult(partialResults, isPartialResultStale, numOfPartialCells)); + } + clearPartialResults(); + } + /** * A convenience method for adding a Result to our list of partials. This method ensure that only @@ -723,30 +687,41 @@ public abstract class ClientScanner extends AbstractClientScanner { + Bytes.toString(row)); } partialResultsRow = row; - partialResults.add(result); + partialResults.add(result.rawCells()); + isPartialResultStale = isPartialResultStale || result.isStale(); + numOfPartialCells += result.size(); } + private List filterResultsFromServer(Result[] results) { + List list = new ArrayList<>(); + if (results == null || results.length == 0) { + return list; + } + boolean skipFilter = false; + for (Result r : results) { + if (skipFilter) { + list.add(r); + } else { + int oriSize = r.size(); + r = filterLoadedCell(r); + if (r != null) { + list.add(r); + if (oriSize == r.size()) { + skipFilter = true; + } + } + } + } + return list; + } /** * Convenience method for clearing the list of partials and resetting the partialResultsRow. */ private void clearPartialResults() { partialResults.clear(); partialResultsRow = null; - } - - /** - * Helper method for adding results between the indices [start, end) to the outputList - * @param outputList the list that results will be added to - * @param inputArray the array that results are taken from - * @param start beginning index (inclusive) - * @param end ending index (exclusive) - */ - private void addResultsToList(List outputList, Result[] inputArray, int start, int end) { - if (inputArray == null || start < 0 || end > inputArray.length) return; - - for (int i = start; i < end; i++) { - outputList.add(inputArray[i]); - } + isPartialResultStale = false; + numOfPartialCells = 0; } @Override @@ -859,6 +834,82 @@ public abstract class ClientScanner extends AbstractClientScanner { index++; } Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length); - return Result.create(list, result.getExists(), result.isStale(), result.isPartial()); + // We mark this partial to be a flag that part of cells dropped + return Result.create(list, result.getExists(), result.isStale(), true); + } + + /** + * Forms a single result from the partial results in the partialResults list. This method is + * useful for reconstructing partial results on the client side. + * @param partialResults list of partial cells + * @return The complete result that is formed by combining all of the partial results together + * @throws IOException A complete result cannot be formed because the results in the partial list + * come from different rows + */ + @VisibleForTesting + public static Result createCompleteResult(List partialResults, boolean stale, int count) + throws IOException { + if (partialResults.size() == 1) { + // fast-forward if we need not merge Cell arrays + return Result.create(partialResults.get(0), null, stale); + } + Cell[] array = new Cell[count]; + int index = 0; + for (Cell[] cells : partialResults) { + System.arraycopy(cells, 0, array, index, cells.length); + index += cells.length; + } + return Result.create(array, null, stale); + } + + /** + * Forms a group of batched results. + * This method will change the list by LinkedList.poll(). And may add the remaining cells to head + * if complete is false. + * If complete is false and the last part is less than batch size, + * it'll addFirst to LinkedList with remaining cells. + * @param complete true if they are last part of this row, false if there may be more + */ + @VisibleForTesting + public static List createBatchedResults(LinkedList list, int batch, + boolean stale, boolean complete) { + int count = 0; + Cell[] tmp = new Cell[batch]; + List results = new ArrayList<>(); + Cell[] cells; + while ((cells = list.poll()) != null) { + if (count == 0 && cells.length == batch) { + // fast-forward if we need not merge Cell arrays + results.add(Result.create(cells, null, stale)); + } else { + if (count + cells.length <= batch) { + System.arraycopy(cells, 0, tmp, count, cells.length); + count += cells.length; + if (count == batch) { + results.add(Result.create(tmp, null, stale)); + count = 0; + tmp = new Cell[batch]; + } + } else { + System.arraycopy(cells, 0, tmp, count, batch - count); + results.add(Result.create(tmp, null, stale)); + tmp = new Cell[batch]; + int pos = batch - count; + count = cells.length - pos; + System.arraycopy(cells, pos, tmp, 0, count); + } + } + } + if (count > 0) { + // count must less than batch here + if (complete) { + Cell[] tmp2 = Arrays.copyOf(tmp, count); + results.add(Result.create(tmp2, null, stale)); + } else { + Cell[] tmp2 = Arrays.copyOf(tmp, count); + list.addFirst(tmp2); + } + } + return results; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index d2a49c2..ee2b346 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -24,7 +24,9 @@ import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -781,74 +783,6 @@ public class Result implements CellScannable, CellScanner { } /** - * Forms a single result from the partial results in the partialResults list. This method is - * useful for reconstructing partial results on the client side. - * @param partialResults list of partial results - * @return The complete result that is formed by combining all of the partial results together - * @throws IOException A complete result cannot be formed because the results in the partial list - * come from different rows - */ - public static Result createCompleteResult(List partialResults) - throws IOException { - List cells = new ArrayList(); - boolean stale = false; - byte[] prevRow = null; - byte[] currentRow = null; - - if (partialResults != null && !partialResults.isEmpty()) { - for (int i = 0; i < partialResults.size(); i++) { - Result r = partialResults.get(i); - currentRow = r.getRow(); - if (prevRow != null && !Bytes.equals(prevRow, currentRow)) { - throw new IOException( - "Cannot form complete result. Rows of partial results do not match." + - " Partial Results: " + partialResults); - } - - // Ensure that all Results except the last one are marked as partials. The last result - // may not be marked as a partial because Results are only marked as partials when - // the scan on the server side must be stopped due to reaching the maxResultSize. - // Visualizing it makes it easier to understand: - // maxResultSize: 2 cells - // (-x-) represents cell number x in a row - // Example: row1: -1- -2- -3- -4- -5- (5 cells total) - // How row1 will be returned by the server as partial Results: - // Result1: -1- -2- (2 cells, size limit reached, mark as partial) - // Result2: -3- -4- (2 cells, size limit reached, mark as partial) - // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial) - if (i != (partialResults.size() - 1) && !r.isPartial()) { - throw new IOException( - "Cannot form complete result. Result is missing partial flag. " + - "Partial Results: " + partialResults); - } - prevRow = currentRow; - stale = stale || r.isStale(); - for (Cell c : r.rawCells()) { - cells.add(c); - } - } - } - - return Result.create(cells, null, stale); - } - - /** - * Get total size of raw cells - * @param result - * @return Total size. - */ - public static long getTotalSizeOfCells(Result result) { - long size = 0; - if (result.isEmpty()) { - return size; - } - for (Cell c : result.rawCells()) { - size += CellUtil.estimatedHeapSizeOf(c); - } - return size; - } - - /** * Copy another Result into this one. Needed for the old Mapred framework * @throws UnsupportedOperationException if invoked on instance of EMPTY_RESULT * (which is supposed to be immutable). @@ -899,9 +833,10 @@ public class Result implements CellScannable, CellScanner { } /** - * Whether or not the result is a partial result. Partial results contain a subset of the cells - * for a row and should be combined with a result representing the remaining cells in that row to - * form a complete (non-partial) result. + * Whether or not the result is a partial result. + * Considering getBatch == MaxInt if don't setBatch, if rawcells().length < scan.getBatch() and + * result is not the last part of this row, isPartial will return true, otherwise return false. + * So if don't setAllowPartialResults(true), isPartial should always be false. * @return Whether or not the result is a partial result */ public boolean isPartial() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java index c6a2525..caac9a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -433,7 +434,7 @@ public class TestPartialResultsFromClientSide { } /** - * Test the method {@link Result#createCompleteResult(List)} + * Test the method {@link ClientScanner#createCompleteResult(List, boolean, int)} * @throws Exception */ @Test @@ -454,19 +455,21 @@ public class TestPartialResultsFromClientSide { oneShotScan.setMaxResultSize(Long.MAX_VALUE); ResultScanner oneShotScanner = TABLE.getScanner(oneShotScan); - ArrayList partials = new ArrayList<>(); + ArrayList partials = new ArrayList<>(); + for (int i = 0; i < NUM_ROWS; i++) { Result partialResult = null; Result completeResult = null; Result oneShotResult = null; partials.clear(); - + int count = 0; do { partialResult = partialScanner.next(); - partials.add(partialResult); - } while (partialResult != null && partialResult.isPartial()); + partials.add(partialResult.rawCells()); + count += partialResult.rawCells().length; + } while (partialResult.isPartial()); - completeResult = Result.createCompleteResult(partials); + completeResult = ClientScanner.createCompleteResult(partials, false, count); oneShotResult = oneShotScanner.next(); compareResults(completeResult, oneShotResult, null); @@ -480,35 +483,6 @@ public class TestPartialResultsFromClientSide { } /** - * When reconstructing the complete result from its partials we ensure that the row of each - * partial result is the same. If one of the rows differs, an exception is thrown. - */ - @Test - public void testExceptionThrownOnMismatchedPartialResults() throws IOException { - assertTrue(NUM_ROWS >= 2); - - ArrayList partials = new ArrayList<>(); - Scan scan = new Scan(); - scan.setMaxResultSize(Long.MAX_VALUE); - ResultScanner scanner = TABLE.getScanner(scan); - Result r1 = scanner.next(); - partials.add(r1); - Result r2 = scanner.next(); - partials.add(r2); - - assertFalse(Bytes.equals(r1.getRow(), r2.getRow())); - - try { - Result.createCompleteResult(partials); - fail("r1 and r2 are from different rows. It should not be possible to combine them into" - + " a single result"); - } catch (IOException e) { - } - - scanner.close(); - } - - /** * When a scan has a filter where {@link org.apache.hadoop.hbase.filter.Filter#hasFilterRow()} is * true, the scanner should not return partial results. The scanner cannot return partial results * because the entire row needs to be read for the include/exclude decision to be made @@ -809,8 +783,9 @@ public class TestPartialResultsFromClientSide { public void testPartialResultsWithColumnFilter() throws Exception { testPartialResultsWithColumnFilter(new FirstKeyOnlyFilter()); testPartialResultsWithColumnFilter(new ColumnPrefixFilter(Bytes.toBytes("testQualifier5"))); - testPartialResultsWithColumnFilter(new ColumnRangeFilter(Bytes.toBytes("testQualifer1"), true, - Bytes.toBytes("testQualifier7"), true)); + testPartialResultsWithColumnFilter( + new ColumnRangeFilter(Bytes.toBytes("testQualifer1"), true, Bytes.toBytes("testQualifier7"), + true)); Set qualifiers = new LinkedHashSet<>(); qualifiers.add(Bytes.toBytes("testQualifier5")); @@ -1002,42 +977,68 @@ public class TestPartialResultsFromClientSide { assertFalse(result3.isPartial()); } - @Test public void testBatchingResultWhenRegionMove() throws IOException { + // If user setBatch(5) and rpc returns 3+5+5+5+3 cells, + // we should return 5+5+5+5+1 to user. + // setBatch doesn't mean setAllowPartialResult(true) Table table = createTestTable(TableName.valueOf("testBatchingResultWhenRegionMove"), ROWS, FAMILIES, QUALIFIERS, VALUE); + Put put = new Put(ROWS[1]); + put.addColumn(FAMILIES[0], QUALIFIERS[1], new byte[VALUE_SIZE * 10]); + table.put(put); + Delete delete = new Delete(ROWS[1]); + delete.addColumn(FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + table.delete(delete); moveRegion(table, 1); Scan scan = new Scan(); scan.setCaching(1); - scan.setBatch(1); + scan.setBatch(5); + scan.setMaxResultSize(VALUE_SIZE * 6); ResultScanner scanner = table.getScanner(scan); - for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) { + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS / 5 - 1; i++) { scanner.next(); } Result result1 = scanner.next(); - assertEquals(1, result1.rawCells().length); - Cell c1 = result1.rawCells()[0]; - assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + assertEquals(5, result1.rawCells().length); + assertCell(result1.rawCells()[0], ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 5]); + assertCell(result1.rawCells()[4], ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + assertFalse(result1.isPartial()); moveRegion(table, 2); Result result2 = scanner.next(); - assertEquals(1, result2.rawCells().length); - Cell c2 = result2.rawCells()[0]; - assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); + assertEquals(5, result2.rawCells().length); + assertCell(result2.rawCells()[0], ROWS[1], FAMILIES[0], QUALIFIERS[0]); + assertCell(result2.rawCells()[4], ROWS[1], FAMILIES[0], QUALIFIERS[4]); + assertFalse(result2.isPartial()); moveRegion(table, 3); Result result3 = scanner.next(); - assertEquals(1, result3.rawCells().length); - Cell c3 = result3.rawCells()[0]; - assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]); - } + assertEquals(5, result3.rawCells().length); + assertCell(result3.rawCells()[0], ROWS[1], FAMILIES[0], QUALIFIERS[5]); + assertCell(result3.rawCells()[4], ROWS[1], FAMILIES[0], QUALIFIERS[9]); + assertFalse(result3.isPartial()); + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS / 5 - 3; i++) { + Result result = scanner.next(); + assertEquals(5, result.rawCells().length); + assertFalse(result.isPartial()); + } + Result result = scanner.next(); + assertEquals(4, result.rawCells().length); + assertFalse(result.isPartial()); + for (int i = 0; i < (NUM_ROWS - 2) * NUM_FAMILIES * NUM_QUALIFIERS / 5; i++) { + result = scanner.next(); + assertEquals(5, result.rawCells().length); + assertFalse(result.isPartial()); + } + assertNull(scanner.next()); + } } \ No newline at end of file -- 2.5.4 (Apple Git-61)