commit cddcc28fc1682bed32d0dd775d66db5ee7b92240 Author: Jonathan Lawlor Date: Wed Apr 1 13:05:30 2015 -0700 HBASE-13374 Small scanners (with particular configurations) do not return all rows Signed-off-by: Enis Soztutar Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java index 3dd19fd..21491ad 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java @@ -49,7 +49,6 @@ import java.io.IOException; public class ClientSmallReversedScanner extends ReversedClientScanner { private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class); private RegionServerCallable smallScanCallable = null; - private byte[] skipRowOfFirstResult = null; private SmallScannerCallableFactory callableFactory; /** @@ -108,7 +107,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { // Where to start the next getter byte[] localStartKey; int cacheNum = nbRows; - skipRowOfFirstResult = null; + boolean regionChanged = true; // if we're at end of table, close and return false to stop iterating if (this.currentRegion != null && currentRegionDone) { byte[] startKey = this.currentRegion.getStartKey(); @@ -127,9 +126,8 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { LOG.debug("Finished with region " + this.currentRegion); } } else if (this.lastResult != null) { - localStartKey = this.lastResult.getRow(); - skipRowOfFirstResult = this.lastResult.getRow(); - cacheNum++; + regionChanged = false; + localStartKey = createClosestRowBefore(lastResult.getRow()); } else { localStartKey = this.scan.getStartRow(); } @@ -142,7 +140,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { smallScanCallable = callableFactory.getCallable( scan, getConnection(), getTable(), localStartKey, cacheNum, this.rpcControllerFactory); - if (this.scanMetrics != null && skipRowOfFirstResult == null) { + if (this.scanMetrics != null && regionChanged) { this.scanMetrics.countOfRegions.incrementAndGet(); } return true; @@ -191,11 +189,6 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { if (values != null && values.length > 0) { for (int i = 0; i < values.length; i++) { Result rs = values[i]; - if (i == 0 && this.skipRowOfFirstResult != null - && Bytes.equals(skipRowOfFirstResult, rs.getRow())) { - // Skip the first result - continue; - } cache.add(rs); for (Cell kv : rs.rawCells()) { remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize(); diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index 3b35e4a..1ff7788 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -53,9 +53,6 @@ import com.google.protobuf.ServiceException; public class ClientSmallScanner extends ClientScanner { private final Log LOG = LogFactory.getLog(this.getClass()); private RegionServerCallable smallScanCallable = null; - // When fetching results from server, skip the first result if it has the same - // row with this one - private byte[] skipRowOfFirstResult = null; private SmallScannerCallableFactory callableFactory; /** @@ -159,7 +156,7 @@ public class ClientSmallScanner extends ClientScanner { // Where to start the next getter byte[] localStartKey; int cacheNum = nbRows; - skipRowOfFirstResult = null; + boolean regionChanged = true; // if we're at end of table, close and return false to stop iterating if (this.currentRegion != null && currentRegionDone) { byte[] endKey = this.currentRegion.getEndKey(); @@ -176,9 +173,8 @@ public class ClientSmallScanner extends ClientScanner { LOG.debug("Finished with region " + this.currentRegion); } } else if (this.lastResult != null) { - localStartKey = this.lastResult.getRow(); - skipRowOfFirstResult = this.lastResult.getRow(); - cacheNum++; + regionChanged = false; + localStartKey = Bytes.add(lastResult.getRow(), new byte[1]); } else { localStartKey = this.scan.getStartRow(); } @@ -189,7 +185,7 @@ public class ClientSmallScanner extends ClientScanner { } smallScanCallable = callableFactory.getCallable( scan, getConnection(), getTable(), localStartKey, cacheNum, rpcControllerFactory); - if (this.scanMetrics != null && skipRowOfFirstResult == null) { + if (this.scanMetrics != null && regionChanged) { this.scanMetrics.countOfRegions.incrementAndGet(); } return true; @@ -238,11 +234,6 @@ public class ClientSmallScanner extends ClientScanner { if (values != null && values.length > 0) { for (int i = 0; i < values.length; i++) { Result rs = values[i]; - if (i == 0 && this.skipRowOfFirstResult != null - && Bytes.equals(skipRowOfFirstResult, rs.getRow())) { - // Skip the first result - continue; - } cache.add(rs); for (Cell kv : rs.rawCells()) { remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index c16d770..235a440 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -24,8 +24,10 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTestConst; @@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConfigUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -69,6 +72,8 @@ public class TestScannersFromClientSide { */ @BeforeClass public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024); TEST_UTIL.startMiniCluster(3); } @@ -171,6 +176,85 @@ public class TestScannersFromClientSide { } + @Test + public void testSmallScan() throws Exception { + TableName TABLE = TableName.valueOf("testSmallScan"); + + int numRows = 10; + byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); + + int numQualifiers = 10; + byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); + + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + + Put put; + List puts = new ArrayList(); + for (int row = 0; row < ROWS.length; row++) { + put = new Put(ROWS[row]); + for (int qual = 0; qual < QUALIFIERS.length; qual++) { + KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], VALUE); + put.add(kv); + } + puts.add(put); + } + ht.put(puts); + + int expectedRows = numRows; + int expectedCols = numRows * numQualifiers; + + // Test normal and reversed + testSmallScan(ht, true, expectedRows, expectedCols); + testSmallScan(ht, false, expectedRows, expectedCols); + } + + /** + * Run through a variety of test configurations with a small scan + * @param table + * @param reversed + * @param rows + * @param columns + * @throws Exception + */ + public void testSmallScan(HTable table, boolean reversed, int rows, int columns) throws Exception { + Scan baseScan = new Scan(); + baseScan.setReversed(reversed); + baseScan.setSmall(true); + + Scan scan = new Scan(baseScan); + verifyExpectedCounts(table, scan, rows, columns); + + scan = new Scan(baseScan); + scan.setMaxResultSize(1); + verifyExpectedCounts(table, scan, rows, columns); + + scan = new Scan(baseScan); + scan.setMaxResultSize(1); + scan.setCaching(Integer.MAX_VALUE); + verifyExpectedCounts(table, scan, rows, columns); + } + + private void verifyExpectedCounts(HTable table, Scan scan, int expectedRowCount, + int expectedCellCount) throws Exception { + ResultScanner scanner = table.getScanner(scan); + + int rowCount = 0; + int cellCount = 0; + Result r = null; + while ((r = scanner.next()) != null) { + rowCount++; + for (Cell c : r.rawCells()) { + cellCount++; + } + } + + assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount, + expectedRowCount == rowCount); + assertTrue("Expected cell count: " + expectedCellCount + " Actual cell count: " + cellCount, + expectedCellCount == cellCount); + scanner.close(); + } + /** * Test from client side for get with maxResultPerCF set *