From de21ccc85ae1872faba1864f75108e6a7a7b414c Mon Sep 17 00:00:00 2001 From: Jurriaan Mous Date: Sat, 27 Dec 2014 13:31:35 +0100 Subject: [PATCH] HBASE-12761 On region jump ClientScanners should get next row start key instead of a skip. --- .../apache/hadoop/hbase/client/ClientScanner.java | 56 ++++++---------------- .../hbase/client/ClientSmallReversedScanner.java | 15 ++---- .../hadoop/hbase/client/ClientSmallScanner.java | 14 +----- 3 files changed, 21 insertions(+), 64 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 3f1ba84..32eb6d7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -91,7 +91,9 @@ public class ClientScanner extends AbstractClientScanner { */ public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException { + RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) + throws IOException { + if (LOG.isTraceEnabled()) { LOG.trace("Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow())); @@ -309,12 +311,13 @@ public class ClientScanner extends AbstractClientScanner { /** * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the - * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics - * framework because it doesn't support multi-instances of the same metrics on the same machine; - * for scan/map reduce scenarios, we will have multiple scans running at the same time. + * application or TableInputFormat.Later, we could push it to other systems. We don't use + * metrics framework because it doesn't support multi-instances of the same metrics on the + * same machine; for scan/map reduce scenarios, we will have multiple scans running at the same + * time. * - * By default, scan metrics are disabled; if the application wants to collect them, this behavior - * can be turned on by calling calling: + * By default, scan metrics are disabled; if the application wants to collect them, this + * behavior can be turned on by calling calling: * * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE)) */ @@ -346,35 +349,11 @@ public class ClientScanner extends AbstractClientScanner { boolean retryAfterOutOfOrderException = true; do { try { - if (skipFirst) { - // Skip only the first row (which was the last row of the last - // already-processed batch). - callable.setCaching(1); - values = call(scan, callable, caller, scannerTimeout); - // When the replica switch happens, we need to do certain operations - // again. The scannercallable will openScanner with the right startkey - // but we need to pick up from there. Bypass the rest of the loop - // and let the catch-up happen in the beginning of the loop as it - // happens for the cases where we see exceptions. Since only openScanner - // would have happened, values would be null - if (values == null && callable.switchedToADifferentReplica()) { - if (this.lastResult != null) { //only skip if there was something read earlier - skipFirst = true; - } - this.currentRegion = callable.getHRegionInfo(); - continue; - } - callable.setCaching(this.caching); - skipFirst = false; - } // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just // exhausted current region. values = call(scan, callable, caller, scannerTimeout); - if (skipFirst && values != null && values.length == 1) { - skipFirst = false; // Already skipped, unset it before scanning again - values = call(scan, callable, caller, scannerTimeout); - } + // When the replica switch happens, we need to do certain operations // again. The callable will openScanner with the right startkey // but we need to pick up from there. Bypass the rest of the loop @@ -382,9 +361,6 @@ public class ClientScanner extends AbstractClientScanner { // happens for the cases where we see exceptions. Since only openScanner // would have happened, values would be null if (values == null && callable.switchedToADifferentReplica()) { - if (this.lastResult != null) { //only skip if there was something read earlier - skipFirst = true; - } this.currentRegion = callable.getHRegionInfo(); continue; } @@ -423,15 +399,13 @@ public class ClientScanner extends AbstractClientScanner { if (this.lastResult != null) { // The region has moved. We need to open a brand new scanner at // the new location. - // Reset the startRow to the row we've seen last so that the new + // Reset the startRow to the row we've seen last +0 so that the new // scanner starts at the correct row. Otherwise we may see previously // returned rows again. // (ScannerCallable by now has "relocated" the correct region) - this.scan.setStartRow(this.lastResult.getRow()); - - // Skip first row returned. We already let it out on previous - // invocation. - skipFirst = true; + this.scan.setStartRow( + Bytes.add(this.lastResult.getRow(), new byte[1]) + ); } if (e instanceof OutOfOrderScannerNextException) { if (retryAfterOutOfOrderException) { @@ -451,7 +425,7 @@ public class ClientScanner extends AbstractClientScanner { continue; } long currentTime = System.currentTimeMillis(); - if (this.scanMetrics != null ) { + if (this.scanMetrics != null) { this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext); } lastNext = currentTime; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java index 86ff424..744f031 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.util.Bytes; public class ClientSmallReversedScanner extends ReversedClientScanner { private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class); private ScannerCallableWithReplicas smallScanCallable = null; - private byte[] skipRowOfFirstResult = null; /** * Create a new ReversibleClientScanner for the specified table Note that the @@ -62,7 +61,8 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException { - super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, primaryOperationTimeout); + super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, + primaryOperationTimeout); } /** @@ -80,7 +80,6 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { // Where to start the next getter byte[] localStartKey; int cacheNum = nbRows; - skipRowOfFirstResult = null; // if we're at end of table, close and return false to stop iterating if (this.currentRegion != null && currentRegionDone) { byte[] startKey = this.currentRegion.getStartKey(); @@ -99,8 +98,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { LOG.debug("Finished with region " + this.currentRegion); } } else if (this.lastResult != null) { - localStartKey = this.lastResult.getRow(); - skipRowOfFirstResult = this.lastResult.getRow(); + localStartKey = Bytes.add(this.lastResult.getRow(),new byte[1]); cacheNum++; } else { localStartKey = this.scan.getStartRow(); @@ -116,7 +114,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { rpcControllerFactory, getPool(), getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller); - if (this.scanMetrics != null && skipRowOfFirstResult == null) { + if (this.scanMetrics != null && lastResult != null) { this.scanMetrics.countOfRegions.incrementAndGet(); } return true; @@ -153,11 +151,6 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { if (values != null && values.length > 0) { for (int i = 0; i < values.length; i++) { Result rs = values[i]; - if (i == 0 && this.skipRowOfFirstResult != null - && Bytes.equals(skipRowOfFirstResult, rs.getRow())) { - // Skip the first result - continue; - } cache.add(rs); // We don't make Iterator here for (Cell cell : rs.rawCells()) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index 9fc9cc6..27c65d2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -52,9 +52,6 @@ import com.google.protobuf.ServiceException; public class ClientSmallScanner extends ClientScanner { private final Log LOG = LogFactory.getLog(this.getClass()); private ScannerCallableWithReplicas smallScanCallable = null; - // When fetching results from server, skip the first result if it has the same - // row with this one - private byte[] skipRowOfFirstResult = null; /** * Create a new ShortClientScanner for the specified table Note that the @@ -97,7 +94,6 @@ public class ClientSmallScanner extends ClientScanner { // Where to start the next getter byte[] localStartKey; int cacheNum = nbRows; - skipRowOfFirstResult = null; // if we're at end of table, close and return false to stop iterating if (this.currentRegion != null && currentRegionDone) { byte[] endKey = this.currentRegion.getEndKey(); @@ -114,8 +110,7 @@ public class ClientSmallScanner extends ClientScanner { LOG.debug("Finished with region " + this.currentRegion); } } else if (this.lastResult != null) { - localStartKey = this.lastResult.getRow(); - skipRowOfFirstResult = this.lastResult.getRow(); + localStartKey = Bytes.add(this.lastResult.getRow(),new byte[1]); cacheNum++; } else { localStartKey = this.scan.getStartRow(); @@ -129,7 +124,7 @@ public class ClientSmallScanner extends ClientScanner { getConnection(), getTable(), scan, getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller); - if (this.scanMetrics != null && skipRowOfFirstResult == null) { + if (this.scanMetrics != null && this.lastResult != null) { this.scanMetrics.countOfRegions.incrementAndGet(); } return true; @@ -218,11 +213,6 @@ public class ClientSmallScanner extends ClientScanner { if (values != null && values.length > 0) { for (int i = 0; i < values.length; i++) { Result rs = values[i]; - if (i == 0 && this.skipRowOfFirstResult != null - && Bytes.equals(skipRowOfFirstResult, rs.getRow())) { - // Skip the first result - continue; - } cache.add(rs); // We don't make Iterator here for (Cell cell : rs.rawCells()) { -- 1.9.3 (Apple Git-50)