From 859b2f6d4dee997c222481b6998c87bf23578566 Mon Sep 17 00:00:00 2001 From: Mikhail Antonov Date: Tue, 1 Mar 2016 15:59:24 -0800 Subject: [PATCH] HBASE-15137 CallTimeoutException and CallQueueTooBigException should trigger PFFE --- .../hbase/client/FastFailInterceptorContext.java | 15 ++++- .../client/PreemptiveFastFailInterceptor.java | 35 ++++++++--- .../exceptions/PreemptiveFastFailException.java | 24 ++++++++ .../apache/hadoop/hbase/client/TestFastFail.java | 68 ++++++++++++++++++++++ 4 files changed, 131 insertions(+), 11 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java index 9eb56bc..3cbdfb3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java @@ -27,8 +27,10 @@ class FastFailInterceptorContext extends // The variable that indicates whether we were able to connect with the server // in the last run - private MutableBoolean couldNotCommunicateWithServer = new MutableBoolean( - false); + private MutableBoolean couldNotCommunicateWithServer = new MutableBoolean(false); + + // If set, we guarantee that no modifications went to server + private MutableBoolean guaranteedClientSideOnly = new MutableBoolean(false); // The variable which indicates whether this was a retry or the first time private boolean didTry = false; @@ -53,6 +55,10 @@ class FastFailInterceptorContext extends return couldNotCommunicateWithServer; } + public MutableBoolean getGuaranteedClientSideOnly() { + return guaranteedClientSideOnly; + } + public FailureInfo getFailureInfo() { return fInfo; } @@ -78,6 +84,10 @@ class FastFailInterceptorContext extends this.couldNotCommunicateWithServer = couldNotCommunicateWithServer; } + public void setGuaranteedClientSideOnly(MutableBoolean guaranteedClientSideOnly) { + this.guaranteedClientSideOnly = guaranteedClientSideOnly; + } + public void setDidTry(boolean didTry) { this.didTry = didTry; } @@ -103,6 +113,7 @@ class FastFailInterceptorContext extends fInfo = null; didTry = false; couldNotCommunicateWithServer.setValue(false); + guaranteedClientSideOnly.setValue(false); retryDespiteFastFailMode = false; tries = 0; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java index 64cd03d..6e4053a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java @@ -33,13 +33,16 @@ import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; +import org.apache.hadoop.hbase.ipc.CallTimeoutException; import org.apache.hadoop.hbase.ipc.FailedServerException; +import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.ipc.RemoteException; @@ -124,7 +127,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { throw new PreemptiveFastFailException( context.getFailureInfo().numConsecutiveFailures.get(), context.getFailureInfo().timeOfFirstFailureMilliSec, - context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer()); + context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer(), + context.getGuaranteedClientSideOnly().isTrue()); } } context.setDidTry(true); @@ -133,7 +137,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { public void handleFailure(FastFailInterceptorContext context, Throwable t) throws IOException { handleThrowable(t, context.getServer(), - context.getCouldNotCommunicateWithServer()); + context.getCouldNotCommunicateWithServer(), + context.getGuaranteedClientSideOnly()); } public void updateFailureInfo(FastFailInterceptorContext context) { @@ -172,15 +177,27 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { } public void handleThrowable(Throwable t1, ServerName serverName, - MutableBoolean couldNotCommunicateWithServer) throws IOException { + MutableBoolean couldNotCommunicateWithServer, + MutableBoolean guaranteedClientSideOnly) throws IOException { Throwable t2 = translateException(t1); boolean isLocalException = !(t2 instanceof RemoteException); - if (isLocalException && isConnectionException(t2)) { + boolean isCallQueueTooBig = isCallQueueTooBigException(t2); + if ((isLocalException && isConnectionException(t2)) || isCallQueueTooBig) { couldNotCommunicateWithServer.setValue(true); + guaranteedClientSideOnly.setValue(!(t2 instanceof CallTimeoutException)); handleFailureToServer(serverName, t2); } } + protected boolean isCallQueueTooBigException(Throwable t) { + if (t instanceof RemoteWithExtrasException) { + return CallQueueTooBigException.class.getName().equals( + ((RemoteWithExtrasException) t).getClassName().trim()); + } else { + return false; + } + } + private Throwable translateException(Throwable t) throws IOException { if (t instanceof NoSuchMethodError) { // We probably can't recover from this exception by retrying. @@ -222,11 +239,11 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { // This list covers most connectivity exceptions but not all. // For example, in SocketOutputStream a plain IOException is thrown // at times when the channel is closed. - return (e instanceof SocketTimeoutException - || e instanceof ConnectException || e instanceof ClosedChannelException - || e instanceof SyncFailedException || e instanceof EOFException - || e instanceof TimeoutException - || e instanceof ConnectionClosingException || e instanceof FailedServerException); + return (e instanceof SocketTimeoutException || e instanceof ConnectException + || e instanceof ClosedChannelException || e instanceof SyncFailedException + || e instanceof EOFException || e instanceof TimeoutException + || e instanceof CallTimeoutException || e instanceof ConnectionClosingException + || e instanceof FailedServerException); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java index 6ca1d88..ad4f2e6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java @@ -38,6 +38,9 @@ import org.apache.hadoop.hbase.ServerName; private static final long serialVersionUID = 7129103682617007177L; private long failureCount, timeOfFirstFailureMilliSec, timeOfLatestAttemptMilliSec; + // If set, we guarantee that no modifications went to server + private boolean guaranteedClientSideOnly; + /** * @param count * @param timeOfFirstFailureMilliSec @@ -52,6 +55,23 @@ import org.apache.hadoop.hbase.ServerName; this.timeOfLatestAttemptMilliSec = timeOfLatestAttemptMilliSec; } + /** + * @param count + * @param timeOfFirstFailureMilliSec + * @param timeOfLatestAttemptMilliSec + * @param serverName + * @param guaranteedClientSideOnly + */ + public PreemptiveFastFailException(long count, long timeOfFirstFailureMilliSec, + long timeOfLatestAttemptMilliSec, ServerName serverName, + boolean guaranteedClientSideOnly) { + super("Exception happened " + count + " times. to" + serverName); + this.failureCount = count; + this.timeOfFirstFailureMilliSec = timeOfFirstFailureMilliSec; + this.timeOfLatestAttemptMilliSec = timeOfLatestAttemptMilliSec; + this.guaranteedClientSideOnly = guaranteedClientSideOnly; + } + public long getFirstFailureAt() { return timeOfFirstFailureMilliSec; } @@ -67,4 +87,8 @@ import org.apache.hadoop.hbase.ServerName; public boolean wasOperationAttemptedByServer() { return false; } + + public boolean isGuaranteedClientSideOnly() { + return guaranteedClientSideOnly; + } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java index 5ceef01..ca06838 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java @@ -35,15 +35,21 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; +import org.apache.hadoop.hbase.ipc.Call; +import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException; +import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HBaseConfTool; import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator; import org.junit.After; import org.junit.AfterClass; @@ -85,6 +91,7 @@ public class TestFastFail { @Before public void setUp() throws Exception { MyPreemptiveFastFailInterceptor.numBraveSouls.set(0); + CallQueueTooBigPffeInterceptor.numCallQueueTooBig.set(0); } /** @@ -285,6 +292,49 @@ public class TestFastFail { .get()); } + @Test + public void testCallQueueTooBigException() throws Exception { + Admin admin = TEST_UTIL.getAdmin(); + + final String tableName = "testCallQueueTooBigException"; + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes + .toBytes(tableName))); + desc.addFamily(new HColumnDescriptor(FAMILY)); + admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 3); + + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 100); + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 500); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + + conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); + conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0); + conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL, + CallQueueTooBigPffeInterceptor.class, PreemptiveFastFailInterceptor.class); + + final Connection connection = ConnectionFactory.createConnection(conf); + + // Set max call queues size to 0 + SimpleRpcScheduler srs = (SimpleRpcScheduler) + TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer().getScheduler(); + Configuration newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + newConf.setInt("hbase.ipc.server.max.callqueue.length", 0); + srs.onConfigurationChange(newConf); + + try (Table table = connection.getTable(TableName.valueOf(tableName))) { + Get get = new Get(new byte[1]); + table.get(get); + } catch (Throwable ex) { + } + + assertEquals("There should have been 1 hit", 1, + CallQueueTooBigPffeInterceptor.numCallQueueTooBig.get()); + + newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + newConf.setInt("hbase.ipc.server.max.callqueue.length", 250); + srs.onConfigurationChange(newConf); + } + public static class MyPreemptiveFastFailInterceptor extends PreemptiveFastFailInterceptor { public static AtomicInteger numBraveSouls = new AtomicInteger(); @@ -302,6 +352,24 @@ public class TestFastFail { } } + public static class CallQueueTooBigPffeInterceptor extends + PreemptiveFastFailInterceptor { + public static AtomicInteger numCallQueueTooBig = new AtomicInteger(); + + @Override + protected boolean isCallQueueTooBigException(Throwable t) { + boolean ret = super.isCallQueueTooBigException(t); + if (ret) { + numCallQueueTooBig.incrementAndGet(); + } + return ret; + } + + public CallQueueTooBigPffeInterceptor(Configuration conf) { + super(conf); + } + } + private byte[] longToByteArrayKey(long rowKey) { return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes(); } -- 1.9.5