From 4fa0159dff54186155ad83da217ed3fbb3f9bee6 Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Wed, 6 Apr 2016 23:59:14 +0800 Subject: [PATCH] HBASE-15484 Correct the semantic of batch and partial --- .../apache/hadoop/hbase/client/ClientScanner.java | 199 +++++++++++++++++---- .../hbase/client/ClientSmallReversedScanner.java | 6 +- .../hadoop/hbase/client/ClientSmallScanner.java | 6 +- .../org/apache/hadoop/hbase/client/Result.java | 80 +-------- .../hbase/TestPartialResultsFromClientSide.java | 103 ++++++----- 5 files changed, 236 insertions(+), 158 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 3b6b83a..d63ca04 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -78,14 +78,23 @@ public abstract class ClientScanner extends AbstractClientScanner { * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()} */ protected byte[] partialResultsRow = null; + + /** + * A list of cells which are being regrouped to batched results. + * This list is only used when user setBatch and don't setAllowPartialResults(true). + */ + private LinkedList regroupingBatchedCells = new LinkedList<>(); + private int numOfRegroupingBatchedCells = 0; + private boolean isRegroupingBatchedCellsStale = false; + /** * The last cell from a not full Row which is added to cache */ protected Cell lastCellLoadedToCache = null; protected final int caching; protected long lastNext; - // Keep lastResult returned successfully in case we have to reset scanner. - protected Result lastResult = null; + // Keep lastResultLoadedToCache returned successfully in case we have to reset scanner. + protected Result lastResultLoadedToCache = null; protected final long maxScannerResultSize; private final ClusterConnection connection; private final TableName tableName; @@ -394,9 +403,9 @@ public abstract class ClientScanner extends AbstractClientScanner { // We don't expect that the server will have more results for us if // it doesn't tell us otherwise. We rely on the size or count of results boolean serverHasMoreResults = false; - boolean allResultsSkipped = false; + boolean continueScanInCurrentRegion = false; do { - allResultsSkipped = false; + continueScanInCurrentRegion = false; try { // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just @@ -411,6 +420,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // Any accumulated partial results are no longer valid since the callable will // openScanner with the correct startkey and we must pick up from there clearPartialResults(); + clearRegroupingBatchedCells(); this.currentRegion = callable.getHRegionInfo(); continue; } @@ -419,6 +429,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // An exception was thrown which makes any partial results that we were collecting // invalid. The scanner will need to be reset to the beginning of a row. clearPartialResults(); + clearRegroupingBatchedCells(); // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us // to reset the scanner and come back in again. if (e instanceof UnknownScannerException) { @@ -450,20 +461,20 @@ public abstract class ClientScanner extends AbstractClientScanner { } } // Else, its signal from depths of ScannerCallable that we need to reset the scanner. - if (this.lastResult != null) { + if (this.lastResultLoadedToCache != null) { // The region has moved. We need to open a brand new scanner at the new location. // Reset the startRow to the row we've seen last so that the new scanner starts at // the correct row. Otherwise we may see previously returned rows again. // (ScannerCallable by now has "relocated" the correct region) - if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) { + if (!this.lastResultLoadedToCache.isPartial() && scan.getBatch() < 0 ) { if (scan.isReversed()) { - scan.setStartRow(createClosestRowBefore(lastResult.getRow())); + scan.setStartRow(createClosestRowBefore(lastResultLoadedToCache.getRow())); } else { - scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1])); + scan.setStartRow(Bytes.add(lastResultLoadedToCache.getRow(), new byte[1])); } } else { // we need rescan this row because we only loaded partial row before - scan.setStartRow(lastResult.getRow()); + scan.setStartRow(lastResultLoadedToCache.getRow()); } } if (e instanceof OutOfOrderScannerNextException) { @@ -495,26 +506,62 @@ public abstract class ClientScanner extends AbstractClientScanner { getResultsToAddToCache(values, callable.isHeartbeatMessage()); if (!resultsToAddToCache.isEmpty()) { for (Result rs : resultsToAddToCache) { + boolean mustTheLastResultOfRow = scan.getBatch() > 0 && scan.getBatch() > rs.size() + && !rs.isPartial(); + long estimatedHeapSizeOfResult = calcEstimatedSize(rs); + countdown--; + remainingResultSize -= estimatedHeapSizeOfResult; + addEstimatedSize(estimatedHeapSizeOfResult); rs = filterLoadedCell(rs); if (rs == null) { continue; } + if (scan.getBatch() > 0 && !scan.getAllowPartialResults()) { + // If user setBatch but not setAllowPartialResults, we should keep the size of results + // equals to getBatch. In other words, setBatch and setAllowPartialResults are not same. + if (!regroupingBatchedCells.isEmpty() + && compareRows(rs.rawCells()[0], regroupingBatchedCells.get(0)[0]) != 0) { + // Cells in regroupingBatchedCells is the last part of that row. + List list = createBatchedResults(regroupingBatchedCells, scan.getBatch(), + isRegroupingBatchedCellsStale, true); + cache.addAll(list); + clearRegroupingBatchedCells(); + this.lastResultLoadedToCache = list.get(list.size() - 1); + } - cache.add(rs); - long estimatedHeapSizeOfResult = calcEstimatedSize(rs); - countdown--; - remainingResultSize -= estimatedHeapSizeOfResult; - addEstimatedSize(estimatedHeapSizeOfResult); - this.lastResult = rs; - if (this.lastResult.isPartial() || scan.getBatch() > 0 ) { - updateLastCellLoadedToCache(this.lastResult); + // Add this rs to temp list + regroupingBatchedCells.add(rs.rawCells()); + isRegroupingBatchedCellsStale = isRegroupingBatchedCellsStale || rs.isStale(); + numOfRegroupingBatchedCells += rs.size(); + + if (mustTheLastResultOfRow || numOfRegroupingBatchedCells >= scan.getBatch()) { + // We have enough cells to make a complete Result or this Result is the last part. + List list = createBatchedResults(regroupingBatchedCells, scan.getBatch(), + isRegroupingBatchedCellsStale, mustTheLastResultOfRow); + cache.addAll(list); + this.lastResultLoadedToCache = list.get(list.size() - 1); + if (regroupingBatchedCells.isEmpty()) { + clearRegroupingBatchedCells(); + } else { + numOfRegroupingBatchedCells = regroupingBatchedCells.get(0).length; + } + } } else { - this.lastCellLoadedToCache = null; + // normal logic + cache.add(rs); + this.lastResultLoadedToCache = rs; + } + if (lastResultLoadedToCache != null) { + if (this.lastResultLoadedToCache.isPartial() || scan.getBatch() > 0) { + updateLastCellLoadedToCache(this.lastResultLoadedToCache); + } else { + this.lastCellLoadedToCache = null; + } } } if (cache.isEmpty()) { // all result has been seen before, we need scan more. - allResultsSkipped = true; + continueScanInCurrentRegion = true; continue; } } @@ -530,6 +577,7 @@ public abstract class ClientScanner extends AbstractClientScanner { } break; } + continueScanInCurrentRegion = true; continue; } @@ -546,7 +594,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // !partialResults.isEmpty() means that we are still accumulating partial Results for a // row. We should not change scanners before we receive all the partial Results for that // row. - } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage()) + } while (continueScanInCurrentRegion || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)))); } @@ -604,11 +652,8 @@ public abstract class ClientScanner extends AbstractClientScanner { final boolean allowPartials = scan != null && scan.getAllowPartialResults(); // If the caller has indicated in their scan that they are okay with seeing partial results, - // then simply add all results to the list. Note that since scan batching also returns results - // for a row in pieces we treat batch being set as equivalent to allowing partials. The - // implication of treating batching as equivalent to partial results is that it is possible - // the caller will receive a result back where the number of cells in the result is less than - // the batch size even though it may not be the last group of cells for that row. + // then simply add all results to the list. Note allowPartial and setBatch are not same, but + // we can return here because we will handle batching later. if (allowPartials || isBatchSet) { addResultsToList(resultsToAddToCache, resultsFromServer, 0, (null == resultsFromServer ? 0 : resultsFromServer.length)); @@ -623,7 +668,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // and thus there may be more partials server side that still need to be added to the partial // list before we form the complete Result if (!partialResults.isEmpty() && !heartbeatMessage) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); + resultsToAddToCache.add(createCompleteResult(partialResults)); clearPartialResults(); } @@ -680,7 +725,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // If the result is not a partial, it is a signal to us that it is the last Result we // need to form the complete Result client-side if (!result.isPartial()) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); + resultsToAddToCache.add(createCompleteResult(partialResults)); clearPartialResults(); } } else { @@ -688,7 +733,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // far. If our list of partials isn't empty, this is a signal to form the complete Result // since the row has now changed if (!partialResults.isEmpty()) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); + resultsToAddToCache.add(createCompleteResult(partialResults)); clearPartialResults(); } @@ -734,6 +779,12 @@ public abstract class ClientScanner extends AbstractClientScanner { partialResultsRow = null; } + private void clearRegroupingBatchedCells() { + regroupingBatchedCells.clear(); + numOfRegroupingBatchedCells = 0; + isRegroupingBatchedCellsStale = false; + } + /** * Helper method for adding results between the indices [start, end) to the outputList * @param outputList the list that results will be added to @@ -823,13 +874,15 @@ public abstract class ClientScanner extends AbstractClientScanner { * ReversedScanner only reverses rows, not columns. */ private int compare(Cell a, Cell b) { + int r = compareRows(a, b); + return r != 0 ? r : CellComparator.compareWithoutRow(a, b); + } + + private int compareRows(Cell a, Cell b){ CellComparator comparator = currentRegion != null && currentRegion.isMetaRegion() ? CellComparator.META_COMPARATOR : CellComparator.COMPARATOR; int r = comparator.compareRows(a, b); - if (r != 0) { - return this.scan.isReversed() ? -r : r; - } - return CellComparator.compareWithoutRow(a, b); + return this.scan.isReversed() ? -r : r; } private Result filterLoadedCell(Result result) { @@ -859,6 +912,88 @@ public abstract class ClientScanner extends AbstractClientScanner { index++; } Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length); + // Here the partial flag is only used when setAllowPartialResults(true) because we will regroup + // all results if setBatch and not allowing partial. return Result.create(list, result.getExists(), result.isStale(), result.isPartial()); } + + + /** + * Forms a single result from the partial results in the partialResults list. This method is + * useful for reconstructing partial results on the client side. + * @param partialResults list of partial cells + * @return The complete result that is formed by combining all of the partial results together + */ + @VisibleForTesting + public static Result createCompleteResult(List partialResults) { + if (partialResults.size() == 1) { + // fast-forward if we need not merge Cell arrays + Result result = partialResults.get(0); + return Result.create(result.rawCells(), result.getExists(), result.isStale()); + } + boolean stale = false; + int count = 0; + for (Result result : partialResults) { + count += result.size(); + } + Cell[] array = new Cell[count]; + int index = 0; + for (Result result : partialResults) { + System.arraycopy(result.rawCells(), 0, array, index, result.size()); + index += result.size(); + stale = stale || result.isStale(); + } + return Result.create(array, null, stale); + } + + /** + * Forms a group of batched results. + * This method will change the list by LinkedList.poll(). And may add the remaining cells to head + * if complete is false. + * If complete is false and the last part is less than batch size, + * it'll addFirst to LinkedList with remaining cells. + * @param complete true if they are last part of this row, false if there may be more + */ + @VisibleForTesting + public static List createBatchedResults(LinkedList list, int batch, + boolean stale, boolean complete) { + int count = 0; + Cell[] tmp = new Cell[batch]; + List results = new ArrayList<>(); + Cell[] cells; + while ((cells = list.poll()) != null) { + if (count == 0 && cells.length == batch) { + // fast-forward if we need not merge Cell arrays + results.add(Result.create(cells, null, stale)); + } else { + if (count + cells.length <= batch) { + System.arraycopy(cells, 0, tmp, count, cells.length); + count += cells.length; + if (count == batch) { + results.add(Result.create(tmp, null, stale)); + count = 0; + tmp = new Cell[batch]; + } + } else { + System.arraycopy(cells, 0, tmp, count, batch - count); + results.add(Result.create(tmp, null, stale)); + tmp = new Cell[batch]; + int pos = batch - count; + count = cells.length - pos; + System.arraycopy(cells, pos, tmp, 0, count); + } + } + } + if (count > 0) { + // count must less than batch here + if (complete) { + Cell[] tmp2 = Arrays.copyOf(tmp, count); + results.add(Result.create(tmp2, null, stale)); + } else { + Cell[] tmp2 = Arrays.copyOf(tmp, count); + list.addFirst(tmp2); + } + } + return results; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java index 5fac93a..8028c4d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java @@ -153,9 +153,9 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { if (LOG.isDebugEnabled()) { LOG.debug("Finished with region " + this.currentRegion); } - } else if (this.lastResult != null) { + } else if (this.lastResultLoadedToCache != null) { regionChanged = false; - localStartKey = createClosestRowBefore(lastResult.getRow()); + localStartKey = createClosestRowBefore(lastResultLoadedToCache.getRow()); } else { localStartKey = this.scan.getStartRow(); } @@ -226,7 +226,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell); } countdown--; - this.lastResult = rs; + this.lastResultLoadedToCache = rs; } } if (smallScanCallable.hasMoreResultsContext()) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index f9bdd55..74e9c4f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -156,9 +156,9 @@ public class ClientSmallScanner extends ClientSimpleScanner { if (LOG.isTraceEnabled()) { LOG.trace("Finished with region " + this.currentRegion); } - } else if (this.lastResult != null) { + } else if (this.lastResultLoadedToCache != null) { regionChanged = false; - localStartKey = Bytes.add(lastResult.getRow(), new byte[1]); + localStartKey = Bytes.add(lastResultLoadedToCache.getRow(), new byte[1]); } else { localStartKey = this.scan.getStartRow(); } @@ -272,7 +272,7 @@ public class ClientSmallScanner extends ClientSimpleScanner { remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell); } countdown--; - this.lastResult = rs; + this.lastResultLoadedToCache = rs; } } if (smallScanCallable.hasMoreResultsContext()) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index d2a49c2..c0cf815 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -24,7 +24,9 @@ import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -781,74 +783,6 @@ public class Result implements CellScannable, CellScanner { } /** - * Forms a single result from the partial results in the partialResults list. This method is - * useful for reconstructing partial results on the client side. - * @param partialResults list of partial results - * @return The complete result that is formed by combining all of the partial results together - * @throws IOException A complete result cannot be formed because the results in the partial list - * come from different rows - */ - public static Result createCompleteResult(List partialResults) - throws IOException { - List cells = new ArrayList(); - boolean stale = false; - byte[] prevRow = null; - byte[] currentRow = null; - - if (partialResults != null && !partialResults.isEmpty()) { - for (int i = 0; i < partialResults.size(); i++) { - Result r = partialResults.get(i); - currentRow = r.getRow(); - if (prevRow != null && !Bytes.equals(prevRow, currentRow)) { - throw new IOException( - "Cannot form complete result. Rows of partial results do not match." + - " Partial Results: " + partialResults); - } - - // Ensure that all Results except the last one are marked as partials. The last result - // may not be marked as a partial because Results are only marked as partials when - // the scan on the server side must be stopped due to reaching the maxResultSize. - // Visualizing it makes it easier to understand: - // maxResultSize: 2 cells - // (-x-) represents cell number x in a row - // Example: row1: -1- -2- -3- -4- -5- (5 cells total) - // How row1 will be returned by the server as partial Results: - // Result1: -1- -2- (2 cells, size limit reached, mark as partial) - // Result2: -3- -4- (2 cells, size limit reached, mark as partial) - // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial) - if (i != (partialResults.size() - 1) && !r.isPartial()) { - throw new IOException( - "Cannot form complete result. Result is missing partial flag. " + - "Partial Results: " + partialResults); - } - prevRow = currentRow; - stale = stale || r.isStale(); - for (Cell c : r.rawCells()) { - cells.add(c); - } - } - } - - return Result.create(cells, null, stale); - } - - /** - * Get total size of raw cells - * @param result - * @return Total size. - */ - public static long getTotalSizeOfCells(Result result) { - long size = 0; - if (result.isEmpty()) { - return size; - } - for (Cell c : result.rawCells()) { - size += CellUtil.estimatedHeapSizeOf(c); - } - return size; - } - - /** * Copy another Result into this one. Needed for the old Mapred framework * @throws UnsupportedOperationException if invoked on instance of EMPTY_RESULT * (which is supposed to be immutable). @@ -899,10 +833,12 @@ public class Result implements CellScannable, CellScanner { } /** - * Whether or not the result is a partial result. Partial results contain a subset of the cells - * for a row and should be combined with a result representing the remaining cells in that row to - * form a complete (non-partial) result. - * @return Whether or not the result is a partial result + * Whether or not this result from scanner is a partial result. + * The result will never be partial unless use {@link Scan#setAllowPartialResults(boolean)} + * while scanning. If allowing partial results, the result will still not be partial if + * ClientScanner is sure that the Result is the last one of this row. Otherwise the result will + * be partial if the number of cells in this result is less than expected, which means less than + * batch size (if setBatch) or not a complete row (if not setBatch). */ public boolean isPartial() { return partial; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java index c6a2525..2c005ac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -433,7 +434,7 @@ public class TestPartialResultsFromClientSide { } /** - * Test the method {@link Result#createCompleteResult(List)} + * Test the method {@link ClientScanner#createCompleteResult} * @throws Exception */ @Test @@ -455,18 +456,20 @@ public class TestPartialResultsFromClientSide { ResultScanner oneShotScanner = TABLE.getScanner(oneShotScan); ArrayList partials = new ArrayList<>(); + for (int i = 0; i < NUM_ROWS; i++) { Result partialResult = null; Result completeResult = null; Result oneShotResult = null; partials.clear(); - + int count = 0; do { partialResult = partialScanner.next(); partials.add(partialResult); - } while (partialResult != null && partialResult.isPartial()); + count += partialResult.rawCells().length; + } while (partialResult.isPartial()); - completeResult = Result.createCompleteResult(partials); + completeResult = ClientScanner.createCompleteResult(partials); oneShotResult = oneShotScanner.next(); compareResults(completeResult, oneShotResult, null); @@ -480,35 +483,6 @@ public class TestPartialResultsFromClientSide { } /** - * When reconstructing the complete result from its partials we ensure that the row of each - * partial result is the same. If one of the rows differs, an exception is thrown. - */ - @Test - public void testExceptionThrownOnMismatchedPartialResults() throws IOException { - assertTrue(NUM_ROWS >= 2); - - ArrayList partials = new ArrayList<>(); - Scan scan = new Scan(); - scan.setMaxResultSize(Long.MAX_VALUE); - ResultScanner scanner = TABLE.getScanner(scan); - Result r1 = scanner.next(); - partials.add(r1); - Result r2 = scanner.next(); - partials.add(r2); - - assertFalse(Bytes.equals(r1.getRow(), r2.getRow())); - - try { - Result.createCompleteResult(partials); - fail("r1 and r2 are from different rows. It should not be possible to combine them into" - + " a single result"); - } catch (IOException e) { - } - - scanner.close(); - } - - /** * When a scan has a filter where {@link org.apache.hadoop.hbase.filter.Filter#hasFilterRow()} is * true, the scanner should not return partial results. The scanner cannot return partial results * because the entire row needs to be read for the include/exclude decision to be made @@ -809,8 +783,9 @@ public class TestPartialResultsFromClientSide { public void testPartialResultsWithColumnFilter() throws Exception { testPartialResultsWithColumnFilter(new FirstKeyOnlyFilter()); testPartialResultsWithColumnFilter(new ColumnPrefixFilter(Bytes.toBytes("testQualifier5"))); - testPartialResultsWithColumnFilter(new ColumnRangeFilter(Bytes.toBytes("testQualifer1"), true, - Bytes.toBytes("testQualifier7"), true)); + testPartialResultsWithColumnFilter( + new ColumnRangeFilter(Bytes.toBytes("testQualifer1"), true, Bytes.toBytes("testQualifier7"), + true)); Set qualifiers = new LinkedHashSet<>(); qualifiers.add(Bytes.toBytes("testQualifier5")); @@ -1002,42 +977,74 @@ public class TestPartialResultsFromClientSide { assertFalse(result3.isPartial()); } - @Test public void testBatchingResultWhenRegionMove() throws IOException { + // If user setBatch(5) and rpc returns 3+5+5+5+3 cells, + // we should return 5+5+5+5+1 to user. + // setBatch doesn't mean setAllowPartialResult(true) Table table = createTestTable(TableName.valueOf("testBatchingResultWhenRegionMove"), ROWS, FAMILIES, QUALIFIERS, VALUE); + Put put = new Put(ROWS[1]); + put.addColumn(FAMILIES[0], QUALIFIERS[1], new byte[VALUE_SIZE * 10]); + table.put(put); + Delete delete = new Delete(ROWS[1]); + delete.addColumn(FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + table.delete(delete); moveRegion(table, 1); Scan scan = new Scan(); scan.setCaching(1); - scan.setBatch(1); + scan.setBatch(5); + scan.setMaxResultSize(VALUE_SIZE * 6); ResultScanner scanner = table.getScanner(scan); - for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) { + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS / 5 - 1; i++) { scanner.next(); } Result result1 = scanner.next(); - assertEquals(1, result1.rawCells().length); - Cell c1 = result1.rawCells()[0]; - assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + assertEquals(5, result1.rawCells().length); + assertCell(result1.rawCells()[0], ROWS[0], FAMILIES[NUM_FAMILIES - 1], + QUALIFIERS[NUM_QUALIFIERS - 5]); + assertCell(result1.rawCells()[4], ROWS[0], FAMILIES[NUM_FAMILIES - 1], + QUALIFIERS[NUM_QUALIFIERS - 1]); + assertFalse(result1.isPartial()); moveRegion(table, 2); Result result2 = scanner.next(); - assertEquals(1, result2.rawCells().length); - Cell c2 = result2.rawCells()[0]; - assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); + assertEquals(5, result2.rawCells().length); + assertCell(result2.rawCells()[0], ROWS[1], FAMILIES[0], QUALIFIERS[0]); + assertCell(result2.rawCells()[4], ROWS[1], FAMILIES[0], QUALIFIERS[4]); + assertFalse(result2.isPartial()); moveRegion(table, 3); Result result3 = scanner.next(); - assertEquals(1, result3.rawCells().length); - Cell c3 = result3.rawCells()[0]; - assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]); + assertEquals(5, result3.rawCells().length); + assertCell(result3.rawCells()[0], ROWS[1], FAMILIES[0], QUALIFIERS[5]); + assertCell(result3.rawCells()[4], ROWS[1], FAMILIES[0], QUALIFIERS[9]); + assertFalse(result3.isPartial()); + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS / 5 - 3; i++) { + Result result = scanner.next(); + assertEquals(5, result.rawCells().length); + assertFalse(result.isPartial()); + } + Result result = scanner.next(); + assertEquals(4, result.rawCells().length); + assertFalse(result.isPartial()); + for (int i = 2; i < NUM_ROWS; i++) { + for (int j = 0; j < NUM_FAMILIES; j++) { + for (int k = 0; k < NUM_QUALIFIERS; k += 5) { + result = scanner.next(); + assertCell(result.rawCells()[0], ROWS[i], FAMILIES[j], QUALIFIERS[k]); + assertEquals(5, result.rawCells().length); + assertFalse(result.isPartial()); + } + } + } + assertNull(scanner.next()); } - } \ No newline at end of file -- 2.5.4 (Apple Git-61)