From df10b2a96789de6d83974a4688dc4f1cc791f3b7 Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Wed, 6 Apr 2016 21:33:14 +0800 Subject: [PATCH] HBASE-15484 Correct the semantic of batch and partial --- .../apache/hadoop/hbase/client/ClientScanner.java | 151 +++++++++++++++++++-- .../org/apache/hadoop/hbase/client/Result.java | 77 +---------- .../hbase/TestPartialResultsFromClientSide.java | 103 +++++++------- 3 files changed, 199 insertions(+), 132 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 3b6b83a..bd1649f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -78,6 +78,10 @@ public abstract class ClientScanner extends AbstractClientScanner { * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()} */ protected byte[] partialResultsRow = null; + + private LinkedList tempBatchedCells = new LinkedList<>(); + private int numOfTempBatchedCells = 0; + private boolean isTempBatchedCellsStale = false; /** * The last cell from a not full Row which is added to cache */ @@ -394,9 +398,9 @@ public abstract class ClientScanner extends AbstractClientScanner { // We don't expect that the server will have more results for us if // it doesn't tell us otherwise. We rely on the size or count of results boolean serverHasMoreResults = false; - boolean allResultsSkipped = false; + boolean continueScanInCurrentRegion = false; do { - allResultsSkipped = false; + continueScanInCurrentRegion = false; try { // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just @@ -495,18 +499,56 @@ public abstract class ClientScanner extends AbstractClientScanner { getResultsToAddToCache(values, callable.isHeartbeatMessage()); if (!resultsToAddToCache.isEmpty()) { for (Result rs : resultsToAddToCache) { + boolean mustTheLastResultOfRow = scan.getBatch() > 0 && scan.getBatch() > rs.size() + && !rs.isPartial(); rs = filterLoadedCell(rs); if (rs == null) { + // If we will drop some results because we have loaded them to cache, we must continue + // to scan this region in next rpc. + // Set this flag to true to prevent doneWithRegion return true. + continueScanInCurrentRegion = true; continue; } + if (scan.getBatch() > 0 && !scan.getAllowPartialResults()) { + // If user setBatch but not setAllowPartialResults, we should keep the size of results + // equals to getBatch. In other words, setBatch and setAllowPartialResults are not same. + if (!tempBatchedCells.isEmpty() && compareRows(rs.rawCells()[0], + tempBatchedCells.get(0)[0]) != 0) { + // Cells in tempBatchedCells is the last part of that row. + cache.addAll(createBatchedResults(tempBatchedCells, scan.getBatch(), + isTempBatchedCellsStale, true)); + numOfTempBatchedCells = 0; + isTempBatchedCellsStale = false; + } - cache.add(rs); + // Add this rs to temp list + tempBatchedCells.add(rs.rawCells()); + isTempBatchedCellsStale = isTempBatchedCellsStale || rs.isStale(); + numOfTempBatchedCells += rs.size(); + + if (mustTheLastResultOfRow || numOfTempBatchedCells >= scan.getBatch()) { + // We have enough cells to make a complete Result or this Result is the last part. + cache.addAll( + createBatchedResults(tempBatchedCells, scan.getBatch(), + isTempBatchedCellsStale, + mustTheLastResultOfRow)); + if (tempBatchedCells.isEmpty()) { + numOfTempBatchedCells = 0; + isTempBatchedCellsStale = false; + } else { + numOfTempBatchedCells = tempBatchedCells.get(0).length; + } + } + } else { + // normal logic + cache.add(rs); + } long estimatedHeapSizeOfResult = calcEstimatedSize(rs); countdown--; remainingResultSize -= estimatedHeapSizeOfResult; addEstimatedSize(estimatedHeapSizeOfResult); this.lastResult = rs; - if (this.lastResult.isPartial() || scan.getBatch() > 0 ) { + if (this.lastResult.isPartial() || scan.getBatch() > 0) { updateLastCellLoadedToCache(this.lastResult); } else { this.lastCellLoadedToCache = null; @@ -514,7 +556,7 @@ public abstract class ClientScanner extends AbstractClientScanner { } if (cache.isEmpty()) { // all result has been seen before, we need scan more. - allResultsSkipped = true; + continueScanInCurrentRegion = true; continue; } } @@ -530,6 +572,7 @@ public abstract class ClientScanner extends AbstractClientScanner { } break; } + continueScanInCurrentRegion = true; continue; } @@ -546,7 +589,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // !partialResults.isEmpty() means that we are still accumulating partial Results for a // row. We should not change scanners before we receive all the partial Results for that // row. - } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage()) + } while (continueScanInCurrentRegion || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)))); } @@ -623,7 +666,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // and thus there may be more partials server side that still need to be added to the partial // list before we form the complete Result if (!partialResults.isEmpty() && !heartbeatMessage) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); + resultsToAddToCache.add(createCompleteResult(partialResults)); clearPartialResults(); } @@ -680,7 +723,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // If the result is not a partial, it is a signal to us that it is the last Result we // need to form the complete Result client-side if (!result.isPartial()) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); + resultsToAddToCache.add(createCompleteResult(partialResults)); clearPartialResults(); } } else { @@ -688,7 +731,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // far. If our list of partials isn't empty, this is a signal to form the complete Result // since the row has now changed if (!partialResults.isEmpty()) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); + resultsToAddToCache.add(createCompleteResult(partialResults)); clearPartialResults(); } @@ -823,13 +866,15 @@ public abstract class ClientScanner extends AbstractClientScanner { * ReversedScanner only reverses rows, not columns. */ private int compare(Cell a, Cell b) { + int r = compareRows(a, b); + return r != 0 ? r : CellComparator.compareWithoutRow(a, b); + } + + private int compareRows(Cell a, Cell b){ CellComparator comparator = currentRegion != null && currentRegion.isMetaRegion() ? CellComparator.META_COMPARATOR : CellComparator.COMPARATOR; int r = comparator.compareRows(a, b); - if (r != 0) { - return this.scan.isReversed() ? -r : r; - } - return CellComparator.compareWithoutRow(a, b); + return this.scan.isReversed() ? -r : r; } private Result filterLoadedCell(Result result) { @@ -861,4 +906,84 @@ public abstract class ClientScanner extends AbstractClientScanner { Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length); return Result.create(list, result.getExists(), result.isStale(), result.isPartial()); } + + + /** + * Forms a single result from the partial results in the partialResults list. This method is + * useful for reconstructing partial results on the client side. + * @param partialResults list of partial cells + * @return The complete result that is formed by combining all of the partial results together + */ + @VisibleForTesting + public static Result createCompleteResult(List partialResults) { + if (partialResults.size() == 1) { + // fast-forward if we need not merge Cell arrays + Result result = partialResults.get(0); + return Result.create(result.rawCells(), result.getExists(), result.isStale()); + } + boolean stale = false; + int count = 0; + for (Result result : partialResults) { + count += result.size(); + } + Cell[] array = new Cell[count]; + int index = 0; + for (Result result : partialResults) { + System.arraycopy(result.rawCells(), 0, array, index, result.size()); + index += result.size(); + stale = stale || result.isStale(); + } + return Result.create(array, null, stale); + } + + /** + * Forms a group of batched results. + * This method will change the list by LinkedList.poll(). And may add the remaining cells to head + * if complete is false. + * If complete is false and the last part is less than batch size, + * it'll addFirst to LinkedList with remaining cells. + * @param complete true if they are last part of this row, false if there may be more + */ + @VisibleForTesting + public static List createBatchedResults(LinkedList list, int batch, + boolean stale, boolean complete) { + int count = 0; + Cell[] tmp = new Cell[batch]; + List results = new ArrayList<>(); + Cell[] cells; + while ((cells = list.poll()) != null) { + if (count == 0 && cells.length == batch) { + // fast-forward if we need not merge Cell arrays + results.add(Result.create(cells, null, stale)); + } else { + if (count + cells.length <= batch) { + System.arraycopy(cells, 0, tmp, count, cells.length); + count += cells.length; + if (count == batch) { + results.add(Result.create(tmp, null, stale)); + count = 0; + tmp = new Cell[batch]; + } + } else { + System.arraycopy(cells, 0, tmp, count, batch - count); + results.add(Result.create(tmp, null, stale)); + tmp = new Cell[batch]; + int pos = batch - count; + count = cells.length - pos; + System.arraycopy(cells, pos, tmp, 0, count); + } + } + } + if (count > 0) { + // count must less than batch here + if (complete) { + Cell[] tmp2 = Arrays.copyOf(tmp, count); + results.add(Result.create(tmp2, null, stale)); + } else { + Cell[] tmp2 = Arrays.copyOf(tmp, count); + list.addFirst(tmp2); + } + } + return results; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index d2a49c2..ee2b346 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -24,7 +24,9 @@ import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -781,74 +783,6 @@ public class Result implements CellScannable, CellScanner { } /** - * Forms a single result from the partial results in the partialResults list. This method is - * useful for reconstructing partial results on the client side. - * @param partialResults list of partial results - * @return The complete result that is formed by combining all of the partial results together - * @throws IOException A complete result cannot be formed because the results in the partial list - * come from different rows - */ - public static Result createCompleteResult(List partialResults) - throws IOException { - List cells = new ArrayList(); - boolean stale = false; - byte[] prevRow = null; - byte[] currentRow = null; - - if (partialResults != null && !partialResults.isEmpty()) { - for (int i = 0; i < partialResults.size(); i++) { - Result r = partialResults.get(i); - currentRow = r.getRow(); - if (prevRow != null && !Bytes.equals(prevRow, currentRow)) { - throw new IOException( - "Cannot form complete result. Rows of partial results do not match." + - " Partial Results: " + partialResults); - } - - // Ensure that all Results except the last one are marked as partials. The last result - // may not be marked as a partial because Results are only marked as partials when - // the scan on the server side must be stopped due to reaching the maxResultSize. - // Visualizing it makes it easier to understand: - // maxResultSize: 2 cells - // (-x-) represents cell number x in a row - // Example: row1: -1- -2- -3- -4- -5- (5 cells total) - // How row1 will be returned by the server as partial Results: - // Result1: -1- -2- (2 cells, size limit reached, mark as partial) - // Result2: -3- -4- (2 cells, size limit reached, mark as partial) - // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial) - if (i != (partialResults.size() - 1) && !r.isPartial()) { - throw new IOException( - "Cannot form complete result. Result is missing partial flag. " + - "Partial Results: " + partialResults); - } - prevRow = currentRow; - stale = stale || r.isStale(); - for (Cell c : r.rawCells()) { - cells.add(c); - } - } - } - - return Result.create(cells, null, stale); - } - - /** - * Get total size of raw cells - * @param result - * @return Total size. - */ - public static long getTotalSizeOfCells(Result result) { - long size = 0; - if (result.isEmpty()) { - return size; - } - for (Cell c : result.rawCells()) { - size += CellUtil.estimatedHeapSizeOf(c); - } - return size; - } - - /** * Copy another Result into this one. Needed for the old Mapred framework * @throws UnsupportedOperationException if invoked on instance of EMPTY_RESULT * (which is supposed to be immutable). @@ -899,9 +833,10 @@ public class Result implements CellScannable, CellScanner { } /** - * Whether or not the result is a partial result. Partial results contain a subset of the cells - * for a row and should be combined with a result representing the remaining cells in that row to - * form a complete (non-partial) result. + * Whether or not the result is a partial result. + * Considering getBatch == MaxInt if don't setBatch, if rawcells().length < scan.getBatch() and + * result is not the last part of this row, isPartial will return true, otherwise return false. + * So if don't setAllowPartialResults(true), isPartial should always be false. * @return Whether or not the result is a partial result */ public boolean isPartial() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java index c6a2525..2c005ac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -433,7 +434,7 @@ public class TestPartialResultsFromClientSide { } /** - * Test the method {@link Result#createCompleteResult(List)} + * Test the method {@link ClientScanner#createCompleteResult} * @throws Exception */ @Test @@ -455,18 +456,20 @@ public class TestPartialResultsFromClientSide { ResultScanner oneShotScanner = TABLE.getScanner(oneShotScan); ArrayList partials = new ArrayList<>(); + for (int i = 0; i < NUM_ROWS; i++) { Result partialResult = null; Result completeResult = null; Result oneShotResult = null; partials.clear(); - + int count = 0; do { partialResult = partialScanner.next(); partials.add(partialResult); - } while (partialResult != null && partialResult.isPartial()); + count += partialResult.rawCells().length; + } while (partialResult.isPartial()); - completeResult = Result.createCompleteResult(partials); + completeResult = ClientScanner.createCompleteResult(partials); oneShotResult = oneShotScanner.next(); compareResults(completeResult, oneShotResult, null); @@ -480,35 +483,6 @@ public class TestPartialResultsFromClientSide { } /** - * When reconstructing the complete result from its partials we ensure that the row of each - * partial result is the same. If one of the rows differs, an exception is thrown. - */ - @Test - public void testExceptionThrownOnMismatchedPartialResults() throws IOException { - assertTrue(NUM_ROWS >= 2); - - ArrayList partials = new ArrayList<>(); - Scan scan = new Scan(); - scan.setMaxResultSize(Long.MAX_VALUE); - ResultScanner scanner = TABLE.getScanner(scan); - Result r1 = scanner.next(); - partials.add(r1); - Result r2 = scanner.next(); - partials.add(r2); - - assertFalse(Bytes.equals(r1.getRow(), r2.getRow())); - - try { - Result.createCompleteResult(partials); - fail("r1 and r2 are from different rows. It should not be possible to combine them into" - + " a single result"); - } catch (IOException e) { - } - - scanner.close(); - } - - /** * When a scan has a filter where {@link org.apache.hadoop.hbase.filter.Filter#hasFilterRow()} is * true, the scanner should not return partial results. The scanner cannot return partial results * because the entire row needs to be read for the include/exclude decision to be made @@ -809,8 +783,9 @@ public class TestPartialResultsFromClientSide { public void testPartialResultsWithColumnFilter() throws Exception { testPartialResultsWithColumnFilter(new FirstKeyOnlyFilter()); testPartialResultsWithColumnFilter(new ColumnPrefixFilter(Bytes.toBytes("testQualifier5"))); - testPartialResultsWithColumnFilter(new ColumnRangeFilter(Bytes.toBytes("testQualifer1"), true, - Bytes.toBytes("testQualifier7"), true)); + testPartialResultsWithColumnFilter( + new ColumnRangeFilter(Bytes.toBytes("testQualifer1"), true, Bytes.toBytes("testQualifier7"), + true)); Set qualifiers = new LinkedHashSet<>(); qualifiers.add(Bytes.toBytes("testQualifier5")); @@ -1002,42 +977,74 @@ public class TestPartialResultsFromClientSide { assertFalse(result3.isPartial()); } - @Test public void testBatchingResultWhenRegionMove() throws IOException { + // If user setBatch(5) and rpc returns 3+5+5+5+3 cells, + // we should return 5+5+5+5+1 to user. + // setBatch doesn't mean setAllowPartialResult(true) Table table = createTestTable(TableName.valueOf("testBatchingResultWhenRegionMove"), ROWS, FAMILIES, QUALIFIERS, VALUE); + Put put = new Put(ROWS[1]); + put.addColumn(FAMILIES[0], QUALIFIERS[1], new byte[VALUE_SIZE * 10]); + table.put(put); + Delete delete = new Delete(ROWS[1]); + delete.addColumn(FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + table.delete(delete); moveRegion(table, 1); Scan scan = new Scan(); scan.setCaching(1); - scan.setBatch(1); + scan.setBatch(5); + scan.setMaxResultSize(VALUE_SIZE * 6); ResultScanner scanner = table.getScanner(scan); - for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) { + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS / 5 - 1; i++) { scanner.next(); } Result result1 = scanner.next(); - assertEquals(1, result1.rawCells().length); - Cell c1 = result1.rawCells()[0]; - assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + assertEquals(5, result1.rawCells().length); + assertCell(result1.rawCells()[0], ROWS[0], FAMILIES[NUM_FAMILIES - 1], + QUALIFIERS[NUM_QUALIFIERS - 5]); + assertCell(result1.rawCells()[4], ROWS[0], FAMILIES[NUM_FAMILIES - 1], + QUALIFIERS[NUM_QUALIFIERS - 1]); + assertFalse(result1.isPartial()); moveRegion(table, 2); Result result2 = scanner.next(); - assertEquals(1, result2.rawCells().length); - Cell c2 = result2.rawCells()[0]; - assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); + assertEquals(5, result2.rawCells().length); + assertCell(result2.rawCells()[0], ROWS[1], FAMILIES[0], QUALIFIERS[0]); + assertCell(result2.rawCells()[4], ROWS[1], FAMILIES[0], QUALIFIERS[4]); + assertFalse(result2.isPartial()); moveRegion(table, 3); Result result3 = scanner.next(); - assertEquals(1, result3.rawCells().length); - Cell c3 = result3.rawCells()[0]; - assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]); + assertEquals(5, result3.rawCells().length); + assertCell(result3.rawCells()[0], ROWS[1], FAMILIES[0], QUALIFIERS[5]); + assertCell(result3.rawCells()[4], ROWS[1], FAMILIES[0], QUALIFIERS[9]); + assertFalse(result3.isPartial()); + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS / 5 - 3; i++) { + Result result = scanner.next(); + assertEquals(5, result.rawCells().length); + assertFalse(result.isPartial()); + } + Result result = scanner.next(); + assertEquals(4, result.rawCells().length); + assertFalse(result.isPartial()); + for (int i = 2; i < NUM_ROWS; i++) { + for (int j = 0; j < NUM_FAMILIES; j++) { + for (int k = 0; k < NUM_QUALIFIERS; k += 5) { + result = scanner.next(); + assertCell(result.rawCells()[0], ROWS[i], FAMILIES[j], QUALIFIERS[k]); + assertEquals(5, result.rawCells().length); + assertFalse(result.isPartial()); + } + } + } + assertNull(scanner.next()); } - } \ No newline at end of file -- 2.6.4 (Apple Git-63)