From f9fee1a9259590eaa6c39a6cdf07bce6cdc9c50c Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Mon, 21 Mar 2016 11:34:36 +0800 Subject: [PATCH] HBASE-15325 ResultScanner allowing partial result will miss the rest of the row if the region is moved between two rpc requests --- .../apache/hadoop/hbase/client/ClientScanner.java | 91 ++++++++- .../java/org/apache/hadoop/hbase/KeyValue.java | 9 +- .../hbase/TestPartialResultsFromClientSide.java | 218 ++++++++++++++++++++- 3 files changed, 304 insertions(+), 14 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 562b1f2..9dd705b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -27,9 +27,11 @@ import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.KeyValue.MetaComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -38,7 +40,6 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; -import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -78,6 +79,10 @@ public class ClientScanner extends AbstractClientScanner { * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()} */ protected byte[] partialResultsRow = null; + /** + * The last cell from a not full Row which is added to cache + */ + protected Cell lastCellLoadedToCache = null; protected final int caching; protected long lastNext; // Keep lastResult returned successfully in case we have to reset scanner. @@ -393,7 +398,9 @@ public class ClientScanner extends AbstractClientScanner { // We don't expect that the server will have more results for us if // it doesn't tell us otherwise. We rely on the size or count of results boolean serverHasMoreResults = false; + boolean allResultsSkipped = false; do { + allResultsSkipped = false; try { // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just @@ -452,10 +459,15 @@ public class ClientScanner extends AbstractClientScanner { // Reset the startRow to the row we've seen last so that the new scanner starts at // the correct row. Otherwise we may see previously returned rows again. // (ScannerCallable by now has "relocated" the correct region) - if (scan.isReversed()) { - scan.setStartRow(createClosestRowBefore(lastResult.getRow())); + if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) { + if (scan.isReversed()) { + scan.setStartRow(createClosestRowBefore(lastResult.getRow())); + } else { + scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1])); + } } else { - scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1])); + // we need rescan this row because we only loaded partial row before + scan.setStartRow(lastResult.getRow()); } } if (e instanceof OutOfOrderScannerNextException) { @@ -487,13 +499,26 @@ public class ClientScanner extends AbstractClientScanner { getResultsToAddToCache(values, callable.isHeartbeatMessage()); if (!resultsToAddToCache.isEmpty()) { for (Result rs : resultsToAddToCache) { + rs = filterLoadedCell(rs); + if (rs == null) { + continue; + } cache.add(rs); - // We don't make Iterator here for (Cell cell : rs.rawCells()) { remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell); } countdown--; this.lastResult = rs; + if (this.lastResult.isPartial() || scan.getBatch() > 0 ) { + updateLastCellLoadedToCache(this.lastResult); + } else { + this.lastCellLoadedToCache = null; + } + } + if (cache.isEmpty()) { + // all result has been seen before, we need scan more. + allResultsSkipped = true; + continue; } } if (callable.isHeartbeatMessage()) { @@ -524,7 +549,7 @@ public class ClientScanner extends AbstractClientScanner { // !partialResults.isEmpty() means that we are still accumulating partial Results for a // row. We should not change scanners before we receive all the partial Results for that // row. - } while ((callable != null && callable.isHeartbeatMessage()) + } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage()) || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults) && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)))); } @@ -767,4 +792,58 @@ public class ClientScanner extends AbstractClientScanner { } return false; } + + protected void updateLastCellLoadedToCache(Result result) { + if (result.rawCells().length == 0) { + return; + } + this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1]; + } + + /** + * Compare two Cells considering reversed scanner. + * ReversedScanner only reverses rows, not columns. + */ + private int compare(Cell a, Cell b) { + int r = 0; + if (currentRegion != null && currentRegion.isMetaRegion()) { + r = CellComparator.compareRows(a, b); + } else { + r = MetaComparator.compareRows(a, b); + } + if (r != 0) { + return this.scan.isReversed() ? -r : r; + } + return CellComparator.compareWithoutRow(a, b); + } + + private Result filterLoadedCell(Result result) { + // we only filter result when last result is partial + // so lastCellLoadedToCache and result should have same row key. + // However, if 1) read some cells; 1.1) delete this row at the same time 2) move region; + // 3) read more cell. lastCellLoadedToCache and result will be not at same row. + if (lastCellLoadedToCache == null || result.rawCells().length == 0) { + return result; + } + if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) { + // The first cell of this result is larger than the last cell of loadcache. + // If user do not allow partial result, it must be true. + return result; + } + if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) { + // The last cell of this result is smaller than the last cell of loadcache, skip all. + return null; + } + + // The first one must not in filtered result, we start at the second. + int index = 1; + while (index < result.rawCells().length) { + if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) { + break; + } + index++; + } + Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length); + return Result.create(list, result.getExists(), result.isStale(), result.isPartial()); + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 0060055..13a46f3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -1795,8 +1795,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, return compare(left, right); } - @Override - public int compareRows(byte [] left, int loffset, int llength, + public static int compareRows(byte [] left, int loffset, int llength, byte [] right, int roffset, int rlength) { int leftDelimiter = getDelimiter(left, loffset, llength, HConstants.DELIMITER); @@ -1829,7 +1828,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, // Now compare middlesection of row. lpart = (leftFarDelimiter < 0 ? llength + loffset: leftFarDelimiter) - leftDelimiter; rpart = (rightFarDelimiter < 0 ? rlength + roffset: rightFarDelimiter)- rightDelimiter; - result = super.compareRows(left, leftDelimiter, lpart, right, rightDelimiter, rpart); + result = KVComparator.compareRows(left, leftDelimiter, lpart, right, rightDelimiter, rpart); if (result != 0) { return result; } else { @@ -2029,7 +2028,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, * @param right * @return Result comparing rows. */ - public int compareRows(final Cell left, final Cell right) { + public static int compareRows(final Cell left, final Cell right) { return compareRows(left.getRowArray(),left.getRowOffset(), left.getRowLength(), right.getRowArray(), right.getRowOffset(), right.getRowLength()); } @@ -2044,7 +2043,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, * @param rlength * @return 0 if equal, <0 if left smaller, >0 if right smaller */ - public int compareRows(byte [] left, int loffset, int llength, + public static int compareRows(byte [] left, int loffset, int llength, byte [] right, int roffset, int rlength) { return Bytes.compareTo(left, loffset, llength, right, roffset, rlength); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java index 3794e59..1d4618c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter; import org.apache.hadoop.hbase.filter.RandomRowFilter; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -65,7 +67,7 @@ public class TestPartialResultsFromClientSide { private static final Log LOG = LogFactory.getLog(TestPartialResultsFromClientSide.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - + private final static int MINICLUSTER_SIZE = 5; private static Table TABLE = null; /** @@ -99,7 +101,8 @@ public class TestPartialResultsFromClientSide { @BeforeClass public static void setUpBeforeClass() throws Exception { - TEST_UTIL.startMiniCluster(3); + TEST_UTIL.startMiniCluster(MINICLUSTER_SIZE); + TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true); TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); } @@ -430,7 +433,7 @@ public class TestPartialResultsFromClientSide { } /** - * Test the method {@link Result#createCompleteResult(List, Result)} + * Test the method {@link Result#createCompleteResult(List)} * @throws Exception */ @Test @@ -829,4 +832,213 @@ public class TestPartialResultsFromClientSide { testEquivalenceOfScanResults(TABLE, partialScan, oneshotScan); } } + + + private void moveRegion(Table table, int index) throws IOException{ + List> regions = MetaTableAccessor + .getTableRegionsAndLocations(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConnection(), + table.getName()); + assertEquals(1, regions.size()); + HRegionInfo regionInfo = regions.get(0).getFirst(); + ServerName name = TEST_UTIL.getHBaseCluster().getRegionServer(index).getServerName(); + TEST_UTIL.getHBaseAdmin().move(regionInfo.getEncodedNameAsBytes(), + Bytes.toBytes(name.getServerName())); + } + + private void assertCell(Cell cell, byte[] row, byte[] cf, byte[] cq) { + assertArrayEquals(row, + Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); + assertArrayEquals(cf, + Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())); + assertArrayEquals(cq, + Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); + } + + @Test + public void testPartialResultWhenRegionMove() throws IOException { + Table table=createTestTable(TableName.valueOf("testPartialResultWhenRegionMove"), + ROWS, FAMILIES, QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setAllowPartialResults(true); + ResultScanner scanner = table.getScanner(scan); + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) { + scanner.next(); + } + Result result1 = scanner.next(); + assertEquals(1, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + assertFalse(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(1, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); + assertTrue(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(1, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]); + assertTrue(result3.isPartial()); + + } + + @Test + public void testReversedPartialResultWhenRegionMove() throws IOException { + Table table=createTestTable(TableName.valueOf("testReversedPartialResultWhenRegionMove"), + ROWS, FAMILIES, QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setAllowPartialResults(true); + scan.setReversed(true); + ResultScanner scanner = table.getScanner(scan); + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS-1; i++) { + scanner.next(); + } + Result result1 = scanner.next(); + assertEquals(1, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[NUM_ROWS-1], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + assertFalse(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(1, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[0]); + assertTrue(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(1, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[1]); + assertTrue(result3.isPartial()); + + } + + @Test + public void testCompleteResultWhenRegionMove() throws IOException { + Table table=createTestTable(TableName.valueOf("testCompleteResultWhenRegionMove"), + ROWS, FAMILIES, QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setCaching(1); + ResultScanner scanner = table.getScanner(scan); + + Result result1 = scanner.next(); + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[0], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[2], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result3.isPartial()); + + } + + @Test + public void testReversedCompleteResultWhenRegionMove() throws IOException { + Table table=createTestTable(TableName.valueOf("testReversedCompleteResultWhenRegionMove"), + ROWS, FAMILIES, QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setCaching(1); + scan.setReversed(true); + ResultScanner scanner = table.getScanner(scan); + + Result result1 = scanner.next(); + assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[NUM_ROWS-1], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result1.isPartial()); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result2.isPartial()); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[NUM_ROWS-3], FAMILIES[0], QUALIFIERS[0]); + assertFalse(result3.isPartial()); + + } + + @Test + public void testBatchingResultWhenRegionMove() throws IOException { + Table table = + createTestTable(TableName.valueOf("testBatchingResultWhenRegionMove"), ROWS, FAMILIES, + QUALIFIERS, VALUE); + + moveRegion(table, 1); + + Scan scan = new Scan(); + scan.setCaching(1); + scan.setBatch(1); + + ResultScanner scanner = table.getScanner(scan); + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) { + scanner.next(); + } + Result result1 = scanner.next(); + assertEquals(1, result1.rawCells().length); + Cell c1 = result1.rawCells()[0]; + assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + + moveRegion(table, 2); + + Result result2 = scanner.next(); + assertEquals(1, result2.rawCells().length); + Cell c2 = result2.rawCells()[0]; + assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); + + moveRegion(table, 3); + + Result result3 = scanner.next(); + assertEquals(1, result3.rawCells().length); + Cell c3 = result3.rawCells()[0]; + assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]); + } + + } -- 2.5.4 (Apple Git-61)