From 895c863973f9d3ac0e7b77cdd42f9ac26a04896e Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Tue, 1 Mar 2016 10:37:14 +0800 Subject: [PATCH] HBASE-15325 ResultScanner allowing partial result will miss the rest of the row if the region is moved between two rpc requests --- .../apache/hadoop/hbase/client/ClientScanner.java | 79 +++++++- .../hbase/client/ScannerCallableWithReplicas.java | 15 +- .../org/apache/hadoop/hbase/CellComparator.java | 2 +- .../hadoop/hbase/regionserver/RSRpcServices.java | 2 + .../hadoop/hbase/regionserver/ScannerContext.java | 3 +- .../hbase/TestPartialResultsFromClientSide.java | 219 ++++++++++++++++++++- 6 files changed, 308 insertions(+), 12 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 1658e5b..bd72426 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -77,6 +78,7 @@ public abstract class ClientScanner extends AbstractClientScanner { * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()} */ protected byte[] partialResultsRow = null; + protected Cell lastCellOfLoadCache = null; protected final int caching; protected long lastNext; // Keep lastResult returned successfully in case we have to reset scanner. 
@@ -391,7 +393,9 @@ public abstract class ClientScanner extends AbstractClientScanner { // We don't expect that the server will have more results for us if // it doesn't tell us otherwise. We rely on the size or count of results boolean serverHasMoreResults = false; + boolean allResultsSkipped = false; do { + allResultsSkipped = false; try { // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just @@ -455,10 +459,15 @@ public abstract class ClientScanner extends AbstractClientScanner { // scanner starts at the correct row. Otherwise we may see previously // returned rows again. // (ScannerCallable by now has "relocated" the correct region) - if (scan.isReversed()) { - scan.setStartRow(createClosestRowBefore(lastResult.getRow())); + if (!this.lastResult.isPartial()) { + if (scan.isReversed()) { + scan.setStartRow(createClosestRowBefore(lastResult.getRow())); + } else { + scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1])); + } } else { - scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1])); + // we need to rescan this row because we only loaded part of it before + scan.setStartRow(lastResult.getRow()); } } if (e instanceof OutOfOrderScannerNextException) { @@ -491,12 +500,24 @@ public abstract class ClientScanner extends AbstractClientScanner { getResultsToAddToCache(values, callable.isHeartbeatMessage()); if (!resultsToAddToCache.isEmpty()) { for (Result rs : resultsToAddToCache) { + if (this.lastResult != null && this.lastResult.isPartial()) { + rs = filterLoadedCell(rs); + if (rs == null) { + continue; + } + } cache.add(rs); long estimatedHeapSizeOfResult = calcEstimatedSize(rs); countdown--; remainingResultSize -= estimatedHeapSizeOfResult; addEstimatedSize(estimatedHeapSizeOfResult); this.lastResult = rs; + updateLastCellOfLoadCache(this.lastResult); + } + if (cache.isEmpty()) { + // all results have been seen before, so we need to scan more.
+ allResultsSkipped = true; + continue; } } @@ -525,8 +546,9 @@ public abstract class ClientScanner extends AbstractClientScanner { // !partialResults.isEmpty() means that we are still accumulating partial Results for a // row. We should not change scanners before we receive all the partial Results for that // row. - } while (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) - && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))); + } while (allResultsSkipped || + (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) + && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)))); } /** @@ -788,4 +810,51 @@ public abstract class ClientScanner extends AbstractClientScanner { } return false; } + + protected void updateLastCellOfLoadCache(Result result) { + if (result.rawCells().length == 0) { + return; + } + this.lastCellOfLoadCache = result.rawCells()[result.rawCells().length - 1]; + } + + private int compare(Cell a, Cell b) { + int r = CellComparator.COMPARATOR.compareRows(a, b); + if (r != 0) { + return this.scan.isReversed() ? -r : r; + } + return CellComparator.compareWithoutRow(a, b); + } + + private Result filterLoadedCell(Result result) { + // we only filter result when last result is partial + // so lastCellOfLoadCache and result should have the same row key. + // However, if we 1) read some cells; 1.1) delete this row at the same time; 2) move region; + // 3) read more cells, then lastCellOfLoadCache and result will not be in the same row. + if (lastCellOfLoadCache == null || result.rawCells().length == 0) { + return result; + } + if (compare(this.lastCellOfLoadCache, result.rawCells()[0]) < 0) { + // The first cell of this result is larger than the last cell of loadcache. + // If the user does not allow partial results, this must be true.
+ return result; + } + if (compare(this.lastCellOfLoadCache, result.rawCells()[result.rawCells().length - 1]) >= 0) { + // The last cell of this result is not larger than the last cell of loadcache, so skip all. + return null; + } + + int index = 0; + while (index < result.rawCells().length) { + if (compare(this.lastCellOfLoadCache, result.rawCells()[index]) < 0) { + break; + } + index++; + } + List<Cell> list = new ArrayList<>(result.rawCells().length - index); + for (; index < result.rawCells().length; index++) { + list.add(result.rawCells()[index]); + } + return Result.create(list, result.getExists(), result.isStale(), result.isPartial()); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index a197e90..ed868b9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -178,6 +178,12 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { if (r != null && r.getSecond() != null) { updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); } + if(r!=null&&r.getFirst()!=null) { + LOG.info("get results:"); + for (Result result : r.getFirst()) { + LOG.info(result.isPartial() + " " + result.toString()); + } + } return r == null ? null : r.getFirst(); //great we got a response } } catch (ExecutionException e) { @@ -225,7 +231,12 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { if (currentScannerCallable != scanner) replicaSwitched.set(true); currentScannerCallable = scanner; // store where to start the replica scanner from if we need to.
- if (result != null && result.length != 0) this.lastResult = result[result.length - 1]; + if (result != null && result.length != 0) { + this.lastResult = result[result.length - 1]; + LOG.info("updateCurrentlyServingReplica " + result.length + " " + result[result.length - 1] + .isPartial() + " " + result[result.length - 1].toString()); + } + if (LOG.isTraceEnabled()) { LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId + " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId()); @@ -294,7 +305,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { continue; //this was already scheduled earlier } ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id); - setStartRowForReplicaCallable(s); + //setStartRowForReplicaCallable(s); outstandingCallables.add(s); RetryingRPC retryingOnReplica = new RetryingRPC(s); cs.submit(retryingOnReplica, scannerTimeout, id); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java index d869b3e..a5e26cf 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java @@ -378,7 +378,7 @@ public class CellComparator implements Comparator<Cell>, Serializable { roffset, rlength); } - private static int compareWithoutRow(final Cell left, final Cell right) { + public static int compareWithoutRow(final Cell left, final Cell right) { // If the column is not specified, the "minimum" key type appears the // latest in the sorted order, regardless of the timestamp.
This is used // for specifying the last key/value in a given row, because there is no diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 7bcde52..dfed77d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2549,6 +2549,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, ClientProtos.Scan protoScan = request.getScan(); boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); Scan scan = ProtobufUtil.toScan(protoScan); + + LOG.info("Scan rpc request: " + Bytes.toString(scan.getStartRow())); // if the request doesn't set this, get the default region setting. if (!isLoadingCfsOnDemandSet) { scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java index 6674443..32dcf86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java @@ -226,7 +226,8 @@ public class ScannerContext { */ boolean partialResultFormed() { return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW - || scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW; + || scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW + || scannerState == NextState.BATCH_LIMIT_REACHED; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java index a6f8373..16447b5 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter; import org.apache.hadoop.hbase.filter.RandomRowFilter; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -65,7 +67,7 @@ public class TestPartialResultsFromClientSide { private static final Log LOG = LogFactory.getLog(TestPartialResultsFromClientSide.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - + private final static int MINICLUSTER_SIZE = 5; private static Table TABLE = null; /** @@ -99,7 +101,8 @@ public class TestPartialResultsFromClientSide { @BeforeClass public static void setUpBeforeClass() throws Exception { - TEST_UTIL.startMiniCluster(3); + TEST_UTIL.startMiniCluster(MINICLUSTER_SIZE); + TEST_UTIL.getAdmin().setBalancerRunning(false, true); TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); } @@ -430,7 +433,7 @@ public class TestPartialResultsFromClientSide { } /** - * Test the method {@link Result#createCompleteResult(List, Result)} + * Test the method {@link Result#createCompleteResult(List)} * @throws Exception */ @Test @@ -829,4 +832,214 @@ public class TestPartialResultsFromClientSide { testEquivalenceOfScanResults(TABLE, partialScan, oneshotScan); } } + + private void moveRegion(Table table, int index) throws IOException{ + List<Pair<HRegionInfo, ServerName>> regions = MetaTableAccessor +
.getTableRegionsAndLocations(TEST_UTIL.getConnection(), + table.getName()); + assertEquals(1, regions.size()); + HRegionInfo regionInfo = regions.get(0).getFirst(); + ServerName name = TEST_UTIL.getHBaseCluster().getRegionServer(index).getServerName(); + TEST_UTIL.getAdmin().move(regionInfo.getEncodedNameAsBytes(), + Bytes.toBytes(name.getServerName())); + } + + private void assertCell(Cell cell, byte[] row, byte[] cf, byte[] cq) { + assertArrayEquals(row, + Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); + assertArrayEquals(cf, + Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())); + assertArrayEquals(cq, + Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); + } + + @Test + public void testPartialResultWhenRegionMove() throws IOException { + Table table=createTestTable(TableName.valueOf("testPartialResultWhenRegionMove"), + ROWS, FAMILIES, QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setAllowPartialResults(true); + ResultScanner scanner = table.getScanner(scan); + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) { + scanner.next(); + } + Result result1 = scanner.next(); + assertEquals(1, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + assertFalse(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(1, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); + assertTrue(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(1, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]); + assertTrue(result3.isPartial()); + + } + + @Test + public void 
testReversedPartialResultWhenRegionMove() throws IOException { + Table table=createTestTable(TableName.valueOf("testReversedPartialResultWhenRegionMove"), + ROWS, FAMILIES, QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setAllowPartialResults(true); + scan.setReversed(true); + ResultScanner scanner = table.getScanner(scan); + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS-1; i++) { + scanner.next(); + } + Result result1 = scanner.next(); + assertEquals(1, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[NUM_ROWS-1], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + assertFalse(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(1, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[0]); + assertTrue(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(1, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[1]); + assertTrue(result3.isPartial()); + + } + + @Test + public void testCompleteResultWhenRegionMove() throws IOException { + Table table=createTestTable(TableName.valueOf("testCompleteResultWhenRegionMove"), + ROWS, FAMILIES, QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setCaching(1); + ResultScanner scanner = table.getScanner(scan); + + Result result1 = scanner.next(); + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[0], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, 
ROWS[1], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[2], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result3.isPartial()); + + } + + @Test + public void testReversedCompleteResultWhenRegionMove() throws IOException { + Table table=createTestTable(TableName.valueOf("testReversedCompleteResultWhenRegionMove"), + ROWS, FAMILIES, QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setCaching(1); + scan.setReversed(true); + ResultScanner scanner = table.getScanner(scan); + + Result result1 = scanner.next(); + assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[NUM_ROWS-1], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[NUM_ROWS-3], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result3.isPartial()); + + } + + @Test + public void testBatchingResultWhenRegionMove() throws IOException { + Table table = + createTestTable(TableName.valueOf("testBatchingResultWhenRegionMove"), ROWS, + FAMILIES, QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setCaching(1); + scan.setBatch(1); + + ResultScanner scanner = table.getScanner(scan); + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) { + scanner.next(); + } + Result result1 = 
scanner.next(); + assertEquals(1, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + assertTrue(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(1, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); + assertTrue(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(1, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]); + assertTrue(result3.isPartial()); + + } } \ No newline at end of file -- 2.5.4 (Apple Git-61)