From e1eea41eebc467a3498f07b4e3b20ecb3c466fe2 Mon Sep 17 00:00:00 2001 From: James Moore Date: Thu, 2 Mar 2017 12:47:23 -0500 Subject: [PATCH] allow Scanners to Fast Fail --- .../apache/hadoop/hbase/client/ClientScanner.java | 4 ++- .../hbase/client/ClientSmallReversedScanner.java | 25 +++++++++--------- .../hadoop/hbase/client/ClientSmallScanner.java | 6 ++--- .../hbase/client/FastFailInterceptorContext.java | 1 - .../client/PreemptiveFastFailInterceptor.java | 1 + .../hadoop/hbase/client/ReversedClientScanner.java | 2 +- .../hbase/client/RpcRetryingCallerFactory.java | 8 +++--- .../hbase/client/ScannerCallableWithReplicas.java | 30 +++++++++++++--------- .../hadoop/hbase/client/TestClientScanner.java | 2 +- .../client/TestClientSmallReversedScanner.java | 2 +- .../hbase/client/TestClientSmallScanner.java | 2 +- 11 files changed, 46 insertions(+), 37 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 053814c..b9e08b2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -95,6 +95,7 @@ public class ClientScanner extends AbstractClientScanner { protected boolean scanMetricsPublished = false; protected RpcRetryingCaller caller; protected RpcControllerFactory rpcControllerFactory; + protected RpcRetryingCallerFactory rpcRetryingCallerFactory; protected Configuration conf; //The timeout on the primary. Applicable if there are multiple replicas for a region //In that case, we will only wait for this much timeout on the primary before going @@ -156,6 +157,7 @@ public class ClientScanner extends AbstractClientScanner { } this.caller = rpcFactory. newCaller(); + this.rpcRetryingCallerFactory = rpcFactory; this.rpcControllerFactory = controllerFactory; this.conf = conf; @@ -336,7 +338,7 @@ public class ClientScanner extends AbstractClientScanner { this.rpcControllerFactory); s.setCaching(nbRows); ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(), - s, pool, primaryOperationTimeout, scan, + s, pool, rpcRetryingCallerFactory, primaryOperationTimeout, scan, retries, scannerTimeout, caching, conf, caller); return sr; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java index bd5575a..d870444 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java @@ -20,15 +20,18 @@ package org.apache.hadoop.hbase.client; -import com.google.protobuf.ServiceException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.ExecutorService; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -39,10 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.Bytes; import com.google.common.annotations.VisibleForTesting; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.concurrent.ExecutorService; +import com.google.protobuf.ServiceException; /** *

@@ -180,10 +180,9 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { + Bytes.toStringBinary(localStartKey) + "'"); } - smallReversedScanCallable = - callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(), - localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(), - getRetries(), getScannerTimeout(), getConf(), caller, isFirstRegionToLocate); + smallReversedScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(), + localStartKey, cacheNum, rpcControllerFactory, getPool(), rpcRetryingCallerFactory, + getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller, isFirstRegionToLocate); if (this.scanMetrics != null && regionChanged) { this.scanMetrics.countOfRegions.incrementAndGet(); @@ -322,8 +321,8 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, - int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller caller, + RpcControllerFactory controllerFactory, ExecutorService pool, RpcRetryingCallerFactory rpcRetryingCallerFactory, + int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller caller, boolean isFirstRegionToLocate) { byte[] locateStartRow = null; if (isFirstRegionToLocate @@ -337,7 +336,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { SmallReversedScannerCallable s = new SmallReversedScannerCallable(connection, table, scan, scanMetrics, locateStartRow, controllerFactory, cacheNum, 0); ScannerCallableWithReplicas scannerCallableWithReplicas = - new ScannerCallableWithReplicas(table, connection, s, pool, primaryOperationTimeout, scan, + new ScannerCallableWithReplicas(table, connection, s, pool, rpcRetryingCallerFactory, primaryOperationTimeout, scan, retries, scannerTimeout, cacheNum, conf, caller); return scannerCallableWithReplicas; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index b1554fd..d49674e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -169,7 +169,7 @@ public class ClientSmallScanner extends ClientScanner { + Bytes.toStringBinary(localStartKey) + "'"); } smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan, - getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(), + getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(), rpcRetryingCallerFactory, getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller); if (this.scanMetrics != null && regionChanged) { this.scanMetrics.countOfRegions.incrementAndGet(); @@ -301,14 +301,14 @@ public class ClientSmallScanner extends ClientScanner { public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, + RpcControllerFactory controllerFactory, ExecutorService pool, RpcRetryingCallerFactory rpcRetryingCallerFactory, int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller caller) { scan.setStartRow(localStartKey); SmallScannerCallable s = new SmallScannerCallable( connection, table, scan, scanMetrics, controllerFactory, cacheNum, 0); ScannerCallableWithReplicas scannerCallableWithReplicas = new ScannerCallableWithReplicas(table, connection, - s, pool, primaryOperationTimeout, scan, retries, + s, pool, rpcRetryingCallerFactory, primaryOperationTimeout, scan, retries, scannerTimeout, cacheNum, conf, caller); return scannerCallableWithReplicas; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java index 9eb56bc..d6d7014 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; @InterfaceAudience.Private class FastFailInterceptorContext extends RetryingCallerInterceptorContext { - // The variable that indicates whether we were able to connect with the server // in the last run private MutableBoolean couldNotCommunicateWithServer = new MutableBoolean( diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java index ce21ee9..4b62f4e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java @@ -253,6 +253,7 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { for (Entry entry : repeatedFailuresMap.entrySet()) { if (now > entry.getValue().timeOfLatestAttemptMilliSec + failureMapCleanupIntervalMilliSec) { // no recent failures + LOG.info("removing failure for " + entry.getKey()); repeatedFailuresMap.remove(entry.getKey()); } else if (now > entry.getValue().timeOfFirstFailureMilliSec + this.fastFailClearingTimeMilliSec) { // been failing for a long diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index 13b164d..3a7b06d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -136,7 +136,7 @@ public class ReversedClientScanner extends ClientScanner { locateStartRow, this.rpcControllerFactory); s.setCaching(nbRows); ScannerCallableWithReplicas sr = - new ScannerCallableWithReplicas(getTable(), getConnection(), s, pool, + new ScannerCallableWithReplicas(getTable(), getConnection(), s, pool, rpcRetryingCallerFactory, primaryOperationTimeout, scan, getRetries(), getScannerTimeout(), caching, getConf(), caller); return sr; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 09b70b8..2809d28 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -101,12 +101,14 @@ public class RpcRetryingCallerFactory { } public static RpcRetryingCallerFactory instantiate(Configuration configuration) { - return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null); + RetryingCallerInterceptorFactory factory = new RetryingCallerInterceptorFactory(configuration); + return instantiate(configuration, factory.build(), null); } public static RpcRetryingCallerFactory instantiate(Configuration configuration, ServerStatisticTracker stats) { - return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats); + return instantiate(configuration, (new RetryingCallerInterceptorFactory(configuration)).build(), + stats); } public static RpcRetryingCallerFactory instantiate(Configuration configuration, @@ -127,4 +129,4 @@ public class RpcRetryingCallerFactory { factory.setStatisticTracker(stats); return factory; } -} \ No newline at end of file +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 4d5bb0f..e6c935b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -35,11 +35,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -58,7 +58,7 @@ import com.google.common.annotations.VisibleForTesting; * */ @InterfaceAudience.Private -class ScannerCallableWithReplicas implements RetryingCallable { +class ScannerCallableWithReplicas extends RegionServerCallable { private static final Log LOG = LogFactory.getLog(ScannerCallableWithReplicas.class); volatile ScannerCallable currentScannerCallable; AtomicBoolean replicaSwitched = new AtomicBoolean(false); @@ -69,6 +69,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { private final int retries; private Result lastResult; private final RpcRetryingCaller caller; + private final RpcRetryingCallerFactory rpcRetryingCallerFactory; private final TableName tableName; private Configuration conf; private int scannerTimeout; @@ -76,12 +77,14 @@ class ScannerCallableWithReplicas implements RetryingCallable { private boolean someRPCcancelled = false; //required for testing purposes only public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, - ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, + ScannerCallable baseCallable, ExecutorService pool, RpcRetryingCallerFactory rpcRetryingCallerFactory, int timeBeforeReplicas, Scan scan, int retries, int scannerTimeout, int caching, Configuration conf, RpcRetryingCaller caller) { + super(cConnection,tableName,scan.getStartRow()); this.currentScannerCallable = baseCallable; this.cConnection = cConnection; this.pool = pool; + this.rpcRetryingCallerFactory = rpcRetryingCallerFactory; if (timeBeforeReplicas < 0) { throw new IllegalArgumentException("Invalid value of operation timeout on the primary"); } @@ -161,9 +164,8 @@ class ScannerCallableWithReplicas implements RetryingCallable { // We want to accomodate some RPCs for redundant replica scans (but are still in progress) ResultBoundedCompletionService> cs = new ResultBoundedCompletionService>( - RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool, + rpcRetryingCallerFactory, pool, rl.size() * 5); - AtomicBoolean done = new AtomicBoolean(false); replicaSwitched.set(false); // submit call for the primary replica. @@ -247,7 +249,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { // want to wait for the "close" to happen yet. The "wait" will happen when // the table is closed (when the awaitTermination of the underlying pool is called) s.setClose(); - final RetryingRPC r = new RetryingRPC(s); + final RetryingRPC r = new RetryingRPC(s, rpcRetryingCallerFactory); pool.submit(new Callable(){ @Override public Void call() throws Exception { @@ -282,7 +284,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { private void addCallsForCurrentReplica( ResultBoundedCompletionService> cs, RegionLocations rl) { - RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); + RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable, rpcRetryingCallerFactory); outstandingCallables.add(currentScannerCallable); cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id); } @@ -300,7 +302,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id); setStartRowForReplicaCallable(s); outstandingCallables.add(s); - RetryingRPC retryingOnReplica = new RetryingRPC(s); + RetryingRPC retryingOnReplica = new RetryingRPC(s, rpcRetryingCallerFactory); cs.submit(retryingOnReplica, scannerTimeout, id); } } @@ -335,22 +337,25 @@ class ScannerCallableWithReplicas implements RetryingCallable { return someRPCcancelled; } - class RetryingRPC implements RetryingCallable>, Cancellable { + class RetryingRPC extends RegionServerCallable> implements Cancellable { final ScannerCallable callable; + final RpcRetryingCallerFactory rpcRetryingCallerFactory; RpcRetryingCaller caller; private volatile boolean cancelled = false; - RetryingRPC(ScannerCallable callable) { + RetryingRPC(ScannerCallable callable, RpcRetryingCallerFactory rpcRetryingCallerFactory) { + super(callable.cConnection,callable.tableName,callable.getRow()); this.callable = callable; + this.rpcRetryingCallerFactory = rpcRetryingCallerFactory; // For the Consistency.STRONG (default case), we reuse the caller // to keep compatibility with what is done in the past // For the Consistency.TIMELINE case, we can't reuse the caller // since we could be making parallel RPCs (caller.callWithRetries is synchronized // and we can't invoke it multiple times at the same time) this.caller = ScannerCallableWithReplicas.this.caller; + if (scan.getConsistency() == Consistency.TIMELINE) { - this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf) - .newCaller(); + this.caller = rpcRetryingCallerFactory.newCaller(); } } @@ -374,6 +379,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { } callable.prepare(reload); + this.location = callable.getLocation(); } @Override diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index 44a742f..84807aa 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -120,7 +120,7 @@ public class TestClientScanner { this.rpcControllerFactory); s.setCaching(nbRows); ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(getTable(), getConnection(), - s, pool, primaryOperationTimeout, scan, + s, pool, rpcRetryingCallerFactory, primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller); return sr; } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java index 57b52e6..d0f3ed8 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java @@ -108,7 +108,7 @@ public class TestClientSmallReversedScanner { @Override public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, - RpcControllerFactory controllerFactory, ExecutorService pool, + RpcControllerFactory controllerFactory, ExecutorService pool, RpcRetryingCallerFactory rpcRetryingCallerFactory, int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller caller, boolean isFirstRegionToLocate) { return callableWithReplicas; diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java index 90bf4bb..05b050b 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java @@ -107,7 +107,7 @@ public class TestClientSmallScanner { @Override public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, - RpcControllerFactory controllerFactory, ExecutorService pool, + RpcControllerFactory controllerFactory, ExecutorService pool, RpcRetryingCallerFactory rpcRetryingCallerFactory, int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller caller) { return callableWithReplicas; -- 2.7.4 (Apple Git-66)