From 055ac94bc1a3f3bf03e73c6585c948a7a5a65c54 Mon Sep 17 00:00:00 2001 From: rahulgidwani Date: Wed, 22 Apr 2015 11:27:04 -0700 Subject: [PATCH] Backport of HBASE-13527 https://issues.apache.org/jira/browse/HBASE-13527 --- .../apache/hadoop/hbase/client/ClientScanner.java | 10 +++++ .../org/apache/hadoop/hbase/client/HTable.java | 6 +++ .../hadoop/hbase/client/TableConfiguration.java | 10 +++++ .../hbase/client/TestScannersFromClientSide.java | 50 ++++++++++++++++++++++ 4 files changed, 76 insertions(+) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 9874cd6..e2a41fe 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.util.LinkedList; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -214,6 +215,10 @@ public class ClientScanner extends AbstractClientScanner { return lastNext; } + protected long getMaxResultSize() { + return maxScannerResultSize; + } + // returns true if the passed region endKey protected boolean checkScanStopRow(final byte [] endKey) { if (this.scan.getStopRow().length > 0) { @@ -342,6 +347,11 @@ public class ClientScanner extends AbstractClientScanner { return null; } + @VisibleForTesting + public int getCacheSize() { + return cache != null ? cache.size() : 0; + } + /** * Contact the servers to load more {@link Result}s in the cache. 
*/ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index a190237..736064e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -134,6 +134,7 @@ public class HTable implements HTableInterface { private boolean autoFlush; protected long currentWriteBufferSize; protected int scannerCaching; + protected long scannerMaxResultSize; private ExecutorService pool; // For Multi private boolean closed; private int operationTimeout; @@ -377,6 +378,7 @@ public class HTable implements HTableInterface { this.autoFlush = true; this.currentWriteBufferSize = 0; this.scannerCaching = tableConfiguration.getScannerCaching(); + this.scannerMaxResultSize = tableConfiguration.getScannerMaxResultSize(); if (this.rpcCallerFactory == null) { this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration, @@ -760,6 +762,10 @@ public class HTable implements HTableInterface { scan.setCaching(getScannerCaching()); } + if (scan.getMaxResultSize() <= 0) { + scan.setMaxResultSize(scannerMaxResultSize); + } + if (scan.isReversed()) { if (scan.isSmall()) { return new ClientSmallReversedScanner(getConfiguration(), scan, getName(), diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java index 11d56de..9789cbf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java @@ -44,6 +44,8 @@ public class TableConfiguration { private final int scannerCaching; + private final long scannerMaxResultSize; + private final int retries; private final int maxKeyValueSize; @@ -65,6 +67,9 @@ public class TableConfiguration { this.scannerCaching = 
conf.getInt( HConstants.HBASE_CLIENT_SCANNER_CACHING, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); + this.scannerMaxResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + this.retries = conf.getInt( HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); @@ -82,6 +87,7 @@ public class TableConfiguration { this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; + this.scannerMaxResultSize = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE; this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; this.maxKeyValueSize = -1; } @@ -102,6 +108,10 @@ public class TableConfiguration { return scannerCaching; } + public long getScannerMaxResultSize() { + return scannerMaxResultSize; + } + public int getRetriesNumber() { return retries; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index 235a440..7003f83 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -601,6 +601,56 @@ public class TestScannersFromClientSide { verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region"); } + @Test + public void testMaxResultSizeIsSetToDefault() throws Exception { + TableName TABLE = TableName.valueOf("testMaxResultSizeIsSetToDefault"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + + // The max result size we expect the scan to use by default. 
+ long expectedMaxResultSize = + TEST_UTIL.getConfiguration().getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + + int numRows = 5; + byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); + + int numQualifiers = 10; + byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); + + // Specify the cell size such that a single row will be larger than the default + // value of maxResultSize. This means that Scan RPCs should return at most a single + // result back to the client. + int cellSize = (int) (expectedMaxResultSize / (numQualifiers - 1)); + byte[] cellValue = Bytes.createMaxByteArray(cellSize); + + Put put; + List puts = new ArrayList(); + for (byte[] ROW1 : ROWS) { + put = new Put(ROW1); + for (byte[] QUALIFIER1 : QUALIFIERS) { + KeyValue kv = new KeyValue(ROW1, FAMILY, QUALIFIER1, cellValue); + put.add(kv); + } + puts.add(put); + } + ht.put(puts); + + // Create a scan with the default configuration. + Scan scan = new Scan(); + + ResultScanner scanner = ht.getScanner(scan); + assertTrue(scanner instanceof ClientScanner); + ClientScanner clientScanner = (ClientScanner) scanner; + + // Call next to issue a single RPC to the server + scanner.next(); + + // The scanner should have, at most, a single result in its cache. If more results exist + // in the cache, it means that more than the expected max result size was fetched. + assertTrue("The cache contains: " + clientScanner.getCacheSize() + " results", + clientScanner.getCacheSize() <= 1); + } + static void verifyResult(Result result, List expKvList, boolean toLog, String msg) { -- 2.1.0