From 3f19c6fcd891c1f9aca8d5af85f8ef27e4fb8620 Mon Sep 17 00:00:00 2001 From: manukranthk Date: Tue, 23 Sep 2014 19:15:09 -0700 Subject: [PATCH] Implement Preemptive Fast Fail --- .../hadoop/hbase/client/ClusterConnection.java | 3 + .../hadoop/hbase/client/ConnectionAdapter.java | 5 + .../hadoop/hbase/client/ConnectionManager.java | 10 +- .../apache/hadoop/hbase/client/FailureInfo.java | 58 ++++ .../hbase/client/FastFailInterceptorContext.java | 126 +++++++ .../org/apache/hadoop/hbase/client/HTable.java | 3 +- .../client/NoOpRetryableCallerInterceptor.java | 65 ++++ .../client/NoOpRetryingInterceptorContext.java | 41 +++ .../client/PreemptiveFastFailInterceptor.java | 385 +++++++++++++++++++++ .../hbase/client/RetryingCallerInterceptor.java | 94 +++++ .../client/RetryingCallerInterceptorContext.java | 67 ++++ .../client/RetryingCallerInterceptorFactory.java | 77 +++++ .../hadoop/hbase/client/RpcRetryingCaller.java | 21 +- .../hbase/client/RpcRetryingCallerFactory.java | 19 +- .../exceptions/ConnectionClosingException.java | 59 ++++ .../exceptions/PreemptiveFastFailException.java | 70 ++++ .../org/apache/hadoop/hbase/ipc/RpcClient.java | 18 +- .../java/org/apache/hadoop/hbase/HConstants.java | 27 ++ .../apache/hadoop/hbase/util/ExceptionUtil.java | 6 + .../apache/hadoop/hbase/client/TestFastFail.java | 313 +++++++++++++++++ .../hbase/client/TestFastFailWithoutTestUtil.java | 308 +++++++++++++++++ 21 files changed, 1763 insertions(+), 12 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryableCallerInterceptor.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptor.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorFactory.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosingException.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index ad67f63..78b97e8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -270,4 +270,7 @@ public interface ClusterConnection extends HConnection { * @return Default AsyncProcess associated with this connection. 
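Note on the ClusterConnection change below: the new getRetryingCallerInterceptor() accessor lets callers that build their own RpcRetryingCallerFactory reuse the connection's interceptor. A minimal sketch of the consuming side, mirroring the HTable hunk later in this patch (configuration and connection stand for the table's Configuration and its ClusterConnection):

    RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(
        configuration, connection.getRetryingCallerInterceptor());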
*/ AsyncProcess getAsyncProcess(); + + public RetryingCallerInterceptor getRetryingCallerInterceptor(); } + diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java index 1e569b8..4ec60f1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java @@ -440,4 +440,9 @@ class ConnectionAdapter implements ClusterConnection { public AsyncProcess getAsyncProcess() { return wrappedConnection.getAsyncProcess(); } + + @Override + public RetryingCallerInterceptor getRetryingCallerInterceptor() { + return RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 80e22b9..dc42d9f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -601,6 +601,8 @@ class ConnectionManager { private RpcControllerFactory rpcControllerFactory; + private RetryingCallerInterceptor interceptor; + /** * Cluster registry of basic info such as clusterid and meta region location. */ @@ -631,7 +633,8 @@ class ConnectionManager { retrieveClusterId(); this.rpcClient = new RpcClient(this.conf, this.clusterId); - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); + this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); // Do we publish the status? @@ -2532,6 +2535,11 @@ class ConnectionManager { master.close(); } } + + @Override + public RetryingCallerInterceptor getRetryingCallerInterceptor() { + return this.interceptor; + } } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java new file mode 100644 index 0000000..e8b2231 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Keeps track of repeated failures to any region server. Multiple threads manipulate the contents + * of this thread. + * + * Access to the members is guarded by the concurrent nature of the members inherently. 
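For context, PreemptiveFastFailInterceptor (added later in this patch) keeps one FailureInfo per region server in a ConcurrentMap and updates it on every connectivity failure. A minimal sketch of that update path, with the enclosing class omitted and recordFailure being a hypothetical helper name; it mirrors handleFailureToServer in the interceptor:

    ConcurrentMap<ServerName, FailureInfo> repeatedFailuresMap =
        new ConcurrentHashMap<ServerName, FailureInfo>();

    void recordFailure(ServerName server) {
      long now = EnvironmentEdgeManager.currentTime();
      FailureInfo fInfo = repeatedFailuresMap.get(server);
      if (fInfo == null) {
        fInfo = new FailureInfo(now);
        FailureInfo existing = repeatedFailuresMap.putIfAbsent(server, fInfo);
        if (existing != null) {
          fInfo = existing; // another thread created the entry first; keep its counters
        }
      }
      fInfo.timeOfLatestAttemptMilliSec = now;
      fInfo.numConsecutiveFailures.incrementAndGet();
    }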
+ * + */ +public class FailureInfo { + // The number of consecutive failures. + public final AtomicLong numConsecutiveFailures = new AtomicLong(); + // The time when the server started to become unresponsive + // Once set, this would never be updated. + public final long timeOfFirstFailureMilliSec; + // The time when the client last tried to contact the server. + // This is only updated by one client at a time + public volatile long timeOfLatestAttemptMilliSec; + // Used to keep track of concurrent attempts to contact the server. + // In Fast fail mode, we want just one client thread to try to connect + // the rest of the client threads will fail fast. + public final AtomicBoolean exclusivelyRetringInspiteOfFastFail = new AtomicBoolean( + false); + + @Override + public String toString() { + return "FailureInfo: numConsecutiveFailures = " + + numConsecutiveFailures + " timeOfFirstFailureMilliSec = " + + timeOfFirstFailureMilliSec + " timeOfLatestAttemptMilliSec = " + + timeOfLatestAttemptMilliSec + + " exclusivelyRetringInspiteOfFastFail = " + + exclusivelyRetringInspiteOfFastFail.get(); + } + + FailureInfo(long firstFailureTime) { + this.timeOfFirstFailureMilliSec = firstFailureTime; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java new file mode 100644 index 0000000..5c4e563 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.lang.mutable.MutableBoolean; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class FastFailInterceptorContext extends + RetryingCallerInterceptorContext { + + // The variable that indicates whether we were able to connect with the server + // in the last run + private MutableBoolean couldNotCommunicateWithServer = new MutableBoolean( + false); + + // The variable which indicates whether this was a retry or the first time + private boolean didTry = false; + + // The failure info that is associated with the machine which we are trying to + // contact as part of this attempt. + private FailureInfo fInfo = null; + + // Variable indicating that the thread that is currently executing the + // operation is in a mode where it would retry instead of failing fast, so + // that we can figure out whether making contact with the server is + // possible or not. 
+ private boolean retryDespiteFastFailMode = false; + + // The server that would be contacted to successfully complete this operation. + private ServerName server; + + // The number of the retry we are currenty doing. + private int tries; + + public MutableBoolean getCouldNotCommunicateWithServer() { + return couldNotCommunicateWithServer; + } + + public FailureInfo getFailureInfo() { + return fInfo; + } + + public ServerName getServer() { + return server; + } + + public int getTries() { + return tries; + } + + public boolean didTry() { + return didTry; + } + + public boolean isRetryDespiteFastFailMode() { + return retryDespiteFastFailMode; + } + + public void setCouldNotCommunicateWithServer( + MutableBoolean couldNotCommunicateWithServer) { + this.couldNotCommunicateWithServer = couldNotCommunicateWithServer; + } + + public void setDidTry(boolean didTry) { + this.didTry = didTry; + } + + public void setFailureInfo(FailureInfo fInfo) { + this.fInfo = fInfo; + } + + public void setRetryDespiteFastFailMode(boolean retryDespiteFastFailMode) { + this.retryDespiteFastFailMode = retryDespiteFastFailMode; + } + + public void setServer(ServerName server) { + this.server = server; + } + + public void setTries(int tries) { + this.tries = tries; + } + + public void clear() { + server = null; + fInfo = null; + didTry = false; + couldNotCommunicateWithServer = new MutableBoolean(false); + retryDespiteFastFailMode = false; + tries = 0; + } + + public FastFailInterceptorContext prepare(RetryingCallable callable) { + return prepare(callable, 0); + } + + public FastFailInterceptorContext prepare(RetryingCallable callable, + int tries) { + if (callable instanceof RegionServerCallable) { + RegionServerCallable retryingCallable = (RegionServerCallable) callable; + server = retryingCallable.getLocation().getServerName(); + } + this.tries = tries; + return this; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 8a6575e..be120aa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -349,7 +349,8 @@ public class HTable implements HTableInterface, RegionLocator { this.retries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration, + connection.getRetryingCallerInterceptor()); this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); // puts need to track errors globally due to how the APIs currently work. ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryableCallerInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryableCallerInterceptor.java new file mode 100644 index 0000000..911fa60 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryableCallerInterceptor.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; + +/** + * Class that acts as a NoOpInterceptor. This class is used in case the + * {@link RetryingCallerInterceptor} was not configured correctly or an + * {@link RetryingCallerInterceptor} was never configured in the first place. + * + */ +public class NoOpRetryableCallerInterceptor extends RetryingCallerInterceptor { + private static final RetryingCallerInterceptorContext NO_OP_CONTEXT = + new NoOpRetryingInterceptorContext(); + + public NoOpRetryableCallerInterceptor() { + } + + public NoOpRetryableCallerInterceptor(Configuration conf) { + super(); + } + + @Override + public void intercept( + RetryingCallerInterceptorContext abstractRetryingCallerInterceptorContext) + throws PreemptiveFastFailException { + } + + @Override + public void handleFailure(RetryingCallerInterceptorContext context, + Throwable t) throws IOException { + } + + @Override + public void updateFailureInfo(RetryingCallerInterceptorContext context) { + } + + @Override + public RetryingCallerInterceptorContext createEmptyContext() { + return NO_OP_CONTEXT; + } + + @Override + public String toString() { + return "NoOpRetryableCallerInterceptor"; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java new file mode 100644 index 0000000..d4d759d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +public class NoOpRetryingInterceptorContext extends RetryingCallerInterceptorContext { + + @Override + public void clear() { + // Do Nothing + } + + @Override + public RetryingCallerInterceptorContext prepare( + RetryingCallable callable) { + // Do Nothing + return this; + } + + @Override + public RetryingCallerInterceptorContext prepare( + RetryingCallable callable, int tries) { + // Do Nothing + return this; + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java new file mode 100644 index 0000000..5f57e4d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java @@ -0,0 +1,385 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.EOFException; +import java.io.IOException; +import java.io.SyncFailedException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; +import java.nio.channels.ClosedChannelException; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.lang.mutable.MutableBoolean; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; +import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; +import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.ipc.RemoteException; + +public class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { + + public static final Log LOG = LogFactory + .getLog(PreemptiveFastFailInterceptor.class); + // amount of time to wait before we consider a server to be in fast fail + // mode + protected final long fastFailThresholdMilliSec; + // Keeps track of failures when we cannot talk to a server. Helps in + // fast failing clients if the server is down for a long time. + protected final ConcurrentMap repeatedFailuresMap = + new ConcurrentHashMap(); + // We populate repeatedFailuresMap every time there is a failure. So, to + // keep it + // from growing unbounded, we garbage collect the failure information + // every cleanupInterval. 
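The threshold above and the cleanup interval declared below are read from configuration in the constructor; the keys, and the enable switch consumed by RetryingCallerInterceptorFactory, are added to HConstants later in this patch. A hedged client-side configuration sketch using the patch defaults (Configuration and HBaseConfiguration imports assumed):

    Configuration conf = HBaseConfiguration.create();
    // Master switch; without it the factory hands out the no-op interceptor.
    conf.setBoolean("hbase.client.enable.fast.fail.mode", true);
    // How long a server must stay unreachable before clients enter fast fail mode (default 60s).
    conf.setLong("hbase.client.fastfail.threshold", 60000);
    // How often stale entries are purged from repeatedFailuresMap (default 10 min).
    conf.setLong("hbase.client.fast.fail.cleanup.duration", 600000);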
+ protected final long failureMapCleanupIntervalMilliSec; + protected volatile long lastFailureMapCleanupTimeMilliSec; + // clear failure Info. Used to clean out all entries. + // A safety valve, in case the client does not exit the + // fast fail mode for any reason. + private long fastFailClearingTimeMilliSec; + private final ThreadLocal threadRetryingInFastFailMode = + new ThreadLocal(); + + public PreemptiveFastFailInterceptor(Configuration conf) { + this.fastFailThresholdMilliSec = conf.getLong( + HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, + HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT); + this.failureMapCleanupIntervalMilliSec = conf.getLong( + HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS, + HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT); + lastFailureMapCleanupTimeMilliSec = EnvironmentEdgeManager.currentTime(); + } + + public void intercept(FastFailInterceptorContext context) + throws PreemptiveFastFailException { + context.setFailureInfo(repeatedFailuresMap.get(context.getServer())); + if (inFastFailMode(context.getServer()) && !currentThreadInFastFailMode()) { + // In Fast-fail mode, all but one thread will fast fail. Check + // if we are that one chosen thread. + context.setRetryDespiteFastFailMode(shouldRetryInspiteOfFastFail(context + .getFailureInfo())); + if (!context.isRetryDespiteFastFailMode()) { // we don't have to retry + LOG.debug("Throwing PFFE : " + context.getFailureInfo() + " tries : " + + context.getTries()); + throw new PreemptiveFastFailException( + context.getFailureInfo().numConsecutiveFailures.get(), + context.getFailureInfo().timeOfFirstFailureMilliSec, + context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer()); + } + } + context.setDidTry(true); + } + + public void handleFailure(FastFailInterceptorContext context, + Throwable t) throws IOException { + handleThrowable(t, context.getServer(), + context.getCouldNotCommunicateWithServer()); + } + + public void updateFailureInfo(FastFailInterceptorContext context) { + updateFailureInfoForServer(context.getServer(), context.getFailureInfo(), + context.didTry(), context.getCouldNotCommunicateWithServer() + .booleanValue(), context.isRetryDespiteFastFailMode()); + } + + /** + * Handles failures encountered when communicating with a server. + * + * Updates the FailureInfo in repeatedFailuresMap to reflect the failure. + * Throws RepeatedConnectException if the client is in Fast fail mode. + * + * @param serverName + * @param t + * - the throwable to be handled. 
+ * @throws PreemptiveFastFailException + */ + private void handleFailureToServer(ServerName serverName, Throwable t) { + if (serverName == null || t == null) { + return; + } + long currentTime = EnvironmentEdgeManager.currentTime(); + FailureInfo fInfo = repeatedFailuresMap.get(serverName); + if (fInfo == null) { + fInfo = new FailureInfo(currentTime); + FailureInfo oldfInfo = repeatedFailuresMap.putIfAbsent(serverName, fInfo); + + if (oldfInfo != null) { + fInfo = oldfInfo; + } + } + fInfo.timeOfLatestAttemptMilliSec = currentTime; + fInfo.numConsecutiveFailures.incrementAndGet(); + } + + public void handleThrowable(Throwable t1, ServerName serverName, + MutableBoolean couldNotCommunicateWithServer) throws IOException { + Throwable t2 = translateException(t1); + boolean isLocalException = !(t2 instanceof RemoteException); + if (isLocalException && isConnectionException(t2)) { + couldNotCommunicateWithServer.setValue(true); + handleFailureToServer(serverName, t2); + } + } + + private Throwable translateException(Throwable t) throws IOException { + if (t instanceof NoSuchMethodError) { + // We probably can't recover from this exception by retrying. + LOG.error(t); + throw (NoSuchMethodError) t; + } + + if (t instanceof NullPointerException) { + // The same here. This is probably a bug. + LOG.error(t.getMessage(), t); + throw (NullPointerException) t; + } + + if (t instanceof UndeclaredThrowableException) { + t = t.getCause(); + } + if (t instanceof RemoteException) { + t = ((RemoteException) t).unwrapRemoteException(); + } + if (t instanceof DoNotRetryIOException) { + throw (DoNotRetryIOException) t; + } + if (t instanceof Error) { + throw (Error) t; + } + return t; + } + + /** + * Check if the exception is something that indicates that we cannot + * contact/communicate with the server. + * + * @param e + * @return + */ + private boolean isConnectionException(Throwable e) { + if (e == null) + return false; + // This list covers most connectivity exceptions but not all. + // For example, in SocketOutputStream a plain IOException is thrown + // at times when the channel is closed. + return (e instanceof SocketTimeoutException + || e instanceof ConnectException || e instanceof ClosedChannelException + || e instanceof SyncFailedException || e instanceof EOFException + || e instanceof TimeoutException + || e instanceof ConnectionClosingException || e instanceof FailedServerException); + } + + /** + * Occasionally cleans up unused information in repeatedFailuresMap. + * + * repeatedFailuresMap stores the failure information for all remote hosts + * that had failures. In order to avoid these from growing indefinitely, + * occassionallyCleanupFailureInformation() will clear these up once every + * cleanupInterval ms. + */ + protected void occasionallyCleanupFailureInformation() { + long now = System.currentTimeMillis(); + if (!(now > lastFailureMapCleanupTimeMilliSec + + failureMapCleanupIntervalMilliSec)) + return; + + // remove entries that haven't been attempted in a while + // No synchronization needed. It is okay if multiple threads try to + // remove the entry again and again from a concurrent hash map. 
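One observation on the cleanup loop that follows: it compares against fastFailClearingTimeMilliSec, but that field is never assigned in this patch (the constructor only sets the threshold and the cleanup interval), so it stays 0 and any entry with a recent attempt takes the "been failing for a long time" branch on the next cleanup pass. Presumably it was meant to come from configuration; one possible initialization, shown only as an assumption and reusing constants this patch already defines:

    this.fastFailClearingTimeMilliSec = conf.getLong(
        HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
        HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT);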
+ StringBuilder sb = new StringBuilder(); + for (Entry entry : repeatedFailuresMap.entrySet()) { + if (now > entry.getValue().timeOfLatestAttemptMilliSec + + failureMapCleanupIntervalMilliSec) { // no recent failures + repeatedFailuresMap.remove(entry.getKey()); + } else if (now > entry.getValue().timeOfFirstFailureMilliSec + + this.fastFailClearingTimeMilliSec) { // been failing for a long + // time + LOG.error(entry.getKey() + + " been failing for a long time. clearing out." + + entry.getValue().toString()); + repeatedFailuresMap.remove(entry.getKey()); + } else { + sb.append(entry.getKey().toString()).append(" failing ") + .append(entry.getValue().toString()).append("\n"); + } + } + if (sb.length() > 0 + // If there are multiple threads cleaning up, try to see that only one + // will log the msg. + && now > this.lastFailureMapCleanupTimeMilliSec + + this.failureMapCleanupIntervalMilliSec) { + LOG.warn("Preemptive failure enabled for : " + sb.toString()); + } + lastFailureMapCleanupTimeMilliSec = now; + } + + /** + * Checks to see if we are in the Fast fail mode for requests to the server. + * + * If a client is unable to contact a server for more than + * fastFailThresholdMilliSec the client will get into fast fail mode. + * + * @param server + * @return true if the client is in fast fail mode for the server. + */ + private boolean inFastFailMode(ServerName server) { + FailureInfo fInfo = repeatedFailuresMap.get(server); + // if fInfo is null --> The server is considered good. + // If the server is bad, wait long enough to believe that the server is + // down. + return (fInfo != null && + EnvironmentEdgeManager.currentTime() > + (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec)); + } + + /** + * Checks to see if the current thread is already in FastFail mode for *some* + * server. + * + * @return true, if the thread is already in FF mode. + */ + private boolean currentThreadInFastFailMode() { + return (this.threadRetryingInFastFailMode.get() != null && (this.threadRetryingInFastFailMode + .get().booleanValue() == true)); + } + + /** + * Check to see if the client should try to connnect to the server, inspite of + * knowing that it is in the fast fail mode. + * + * The idea here is that we want just one client thread to be actively trying + * to reconnect, while all the other threads trying to reach the server will + * short circuit. + * + * @param fInfo + * @return true if the client should try to connect to the server. + */ + protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) { + // We believe that the server is down, But, we want to have just one + // client + // actively trying to connect. If we are the chosen one, we will retry + // and not throw an exception. 
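Threads that do not win the gate below receive a PreemptiveFastFailException from intercept() and the RPC is never attempted; RpcRetryingCaller rethrows it without consuming retries. A hedged sketch of what that looks like to application code (the table instance and row key are placeholders; Get and Bytes come from the usual client and util packages):

    try {
      table.get(new Get(Bytes.toBytes("somerow")));
    } catch (PreemptiveFastFailException pffe) {
      // Raised before the server was contacted; these accessors describe the ongoing outage.
      long consecutiveFailures = pffe.getFailureCount();
      long firstFailureAtMs = pffe.getFirstFailureAt();
      long lastAttemptAtMs = pffe.getLastAttemptAt();
    }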
+ if (fInfo != null + && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true)) { + MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode + .get(); + if (threadAlreadyInFF == null) { + threadAlreadyInFF = new MutableBoolean(); + this.threadRetryingInFastFailMode.set(threadAlreadyInFF); + } + threadAlreadyInFF.setValue(true); + return true; + } else { + return false; + } + } + + /** + * + * This function updates the Failure info for a particular server after the + * attempt to + * + * @param server + * @param fInfo + * @param couldNotCommunicate + * @param retryDespiteFastFailMode + */ + private void updateFailureInfoForServer(ServerName server, + FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate, + boolean retryDespiteFastFailMode) { + if (server == null || fInfo == null || didTry == false) + return; + + // If we were able to connect to the server, reset the failure + // information. + if (couldNotCommunicate == false) { + LOG.info("Clearing out PFFE for server " + server.getServerName()); + repeatedFailuresMap.remove(server); + } else { + // update time of last attempt + long currentTime = System.currentTimeMillis(); + fInfo.timeOfLatestAttemptMilliSec = currentTime; + + // Release the lock if we were retrying inspite of FastFail + if (retryDespiteFastFailMode) { + fInfo.exclusivelyRetringInspiteOfFastFail.set(false); + threadRetryingInFastFailMode.get().setValue(false); + } + } + + occasionallyCleanupFailureInformation(); + } + + public void updateFailureInfoForServer(ServerName server, boolean didTry, + boolean couldNotCommunicate) { + FailureInfo fInfo = repeatedFailuresMap.get(server); + boolean retryDespiteFastFailMode = false; + if (inFastFailMode(server) && !currentThreadInFastFailMode()) { + // In Fast-fail mode, all but one thread will fast fail. Check + // if we are that one chosen thread. + retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo); + } + + updateFailureInfoForServer(server, fInfo, didTry, couldNotCommunicate, + retryDespiteFastFailMode); + } + + @Override + public void intercept(RetryingCallerInterceptorContext context) + throws PreemptiveFastFailException { + if (context instanceof FastFailInterceptorContext) { + intercept((FastFailInterceptorContext) context); + } + } + + @Override + public void handleFailure(RetryingCallerInterceptorContext context, + Throwable t) throws IOException { + if (context instanceof FastFailInterceptorContext) { + handleFailure((FastFailInterceptorContext) context, t); + } + } + + @Override + public void updateFailureInfo(RetryingCallerInterceptorContext context) { + if (context instanceof FastFailInterceptorContext) { + updateFailureInfo((FastFailInterceptorContext) context); + } + } + + @Override + public RetryingCallerInterceptorContext createEmptyContext() { + return new FastFailInterceptorContext(); + } + + @Override + public String toString() { + return "PreemptiveFastFailInterceptor"; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptor.java new file mode 100644 index 0000000..0ffbb7e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptor.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +/** + * This class is designed to fit into the RetryingCaller class which forms the + * central piece of intelligence for the client side retries for most calls. + * + * One can extend this class and intercept the RetryingCaller and add additional + * logic into the execution of a simple HTable operations like get, delete etc. + * + * Concrete implementations of this calls are supposed to the thread safe. The + * object is used across threads to identify the fast failing threads. + * + * For a concrete use case see {@link PreemptiveFastFailInterceptor} + * + * Example use case : + * try { + * interceptor.intercept + * doAction() + * } catch (Exception e) { + * interceptor.handleFailure + * } finally { + * interceptor.updateFaulireInfo + * } + * + * The {@link RetryingCallerInterceptor} also acts as a factory + * for getting a new {@link RetryingCallerInterceptorContext}. + * + */ +public abstract class RetryingCallerInterceptor { + + protected RetryingCallerInterceptor() { + // Empty constructor protected for NoOpRetryableCallerInterceptor + } + + /** + * This returns the context object for the current call. + * + * @return context : the context that needs to be used during this call. + */ + public abstract RetryingCallerInterceptorContext createEmptyContext(); + + /** + * Call this function in case we caught a failure during retries. + * + * @param context + * : The context object that we obtained previously. + * @param t + * : The exception that we caught in this particular try + * @throws IOException + */ + public abstract void handleFailure(RetryingCallerInterceptorContext context, + Throwable t) throws IOException; + + /** + * Call this function alongside the actual call done on the callable. + * + * @param abstractRetryingCallerInterceptorContext + * @throws PreemptiveFastFailException + */ + public abstract void intercept( + RetryingCallerInterceptorContext abstractRetryingCallerInterceptorContext) + throws IOException; + + /** + * Call this function to update at the end of the retry. This is not necessary + * to happen. + * + * @param context + */ + public abstract void updateFailureInfo( + RetryingCallerInterceptorContext context); + + @Override + public abstract String toString(); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java new file mode 100644 index 0000000..fed0d3b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
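Spelling out the "Example use case" from the RetryingCallerInterceptor javadoc above (where updateFaulireInfo is a typo for updateFailureInfo): the sequence below mirrors how RpcRetryingCaller.callWithRetries is wired later in this patch; callable, callTimeout and the enclosing retry method are assumed:

    RetryingCallerInterceptorContext context = interceptor.createEmptyContext();
    context.clear();
    for (int tries = 0;; tries++) {
      try {
        interceptor.intercept(context.prepare(callable, tries));
        return callable.call(callTimeout);
      } catch (Throwable t) {
        interceptor.handleFailure(context, t); // let the interceptor record the failure
        // normal retry bookkeeping and backoff sleep happen here
      } finally {
        interceptor.updateFailureInfo(context);
      }
    }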
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * The context object used in the {@link RpcRetryingCaller} to enable + * {@link RetryingCallerInterceptor} to intercept calls. + * {@link RetryingCallerInterceptorContext} is the piece of information unique + * to a retrying call that transfers information from the call into the + * {@link RetryingCallerInterceptor} so that {@link RetryingCallerInterceptor} + * can take appropriate action according to the specific logic + * + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class RetryingCallerInterceptorContext { + protected RetryingCallerInterceptorContext() { } + /** + * This function clears the internal state of the context object. + */ + public abstract void clear(); + + /** + * This prepares the context object by populating it with information specific + * to the implementation of the {@link RetryingCallerInterceptor} along with + * which this will be used. + * + * @param callable + * : The {@link RetryingCallable} that contains the information about + * the call that is being made. + * @return + */ + public abstract RetryingCallerInterceptorContext prepare( + RetryingCallable callable); + + /** + * Telescopic extension that takes which of the many retries we are currently + * in. + * + * @param callable + * : The {@link RetryingCallable} that contains the information about + * the call that is being made. + * @param tries + * : The retry number that we are currently in. + * @return + */ + public abstract RetryingCallerInterceptorContext prepare( + RetryingCallable callable, int tries); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorFactory.java new file mode 100644 index 0000000..efae2d3 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorFactory.java @@ -0,0 +1,77 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import java.lang.reflect.Constructor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation; + +/** + * Factory implementation to provide the {@link HConnectionImplementation} with + * the implementation of the {@link RetryingCallerInterceptor} that we would use + * to intercept the {@link RpcRetryingCaller} during the course of their calls. + * + */ +public class RetryingCallerInterceptorFactory { + private static final Log LOG = LogFactory + .getLog(RetryingCallerInterceptorFactory.class); + private Configuration conf; + private final boolean failFast; + public static final RetryingCallerInterceptor NO_OP_INTERCEPTOR = + new NoOpRetryableCallerInterceptor(null); + + public RetryingCallerInterceptorFactory(Configuration conf) { + this.conf = conf; + failFast = conf.getBoolean(HConstants.HBASE_CLIENT_ENABLE_FAST_FAIL_MODE, + HConstants.HBASE_CLIENT_ENABLE_FAST_FAIL_MODE_DEFAULT); + } + + /** + * This builds the implementation of {@link RetryingCallerInterceptor} that we + * specify in the conf and returns the same. + * + * To use {@link PreemptiveFastFailInterceptor}, set + * {@link HConstants.HBASE_CLIENT_ENABLE_FAST_FAIL_MODE} to true. + * {@link HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL} is defaulted to + * {@link PreemptiveFastFailInterceptor} + * + * @return + */ + public RetryingCallerInterceptor build() { + if (failFast) { + try { + LOG.trace("Using PreemptiveFastFailInterceptor for intercepting the RpcRetryingCaller"); + Class c = conf.getClass( + HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL, + PreemptiveFastFailInterceptor.class); + Constructor constructor = c + .getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + return (RetryingCallerInterceptor) constructor.newInstance(conf); + } catch (Exception e) { + return new PreemptiveFastFailInterceptor(conf); + } + } + return NO_OP_INTERCEPTOR; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index a10f561..6afb29c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -31,6 +31,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.ipc.RemoteException; @@ -61,10 +62,19 @@ public class RpcRetryingCaller { private final long pause; private final int retries; private final AtomicBoolean cancelled = new AtomicBoolean(false); + private final RetryingCallerInterceptor interceptor; + private final RetryingCallerInterceptorContext context; public RpcRetryingCaller(long pause, int retries) { + this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR); + } + + public RpcRetryingCaller(long pause, int retries, + RetryingCallerInterceptor interceptor) { this.pause = pause; this.retries = retries; + this.interceptor = 
interceptor; + context = interceptor.createEmptyContext(); } private int getRemainingTime(int callTimeout) { @@ -83,7 +93,7 @@ public class RpcRetryingCaller { return remainingTime; } } - + public void cancel(){ cancelled.set(true); synchronized (cancelled){ @@ -104,11 +114,15 @@ public class RpcRetryingCaller { List exceptions = new ArrayList(); this.globalStartTime = EnvironmentEdgeManager.currentTime(); + context.clear(); for (int tries = 0;; tries++) { long expectedSleep; try { callable.prepare(tries != 0); // if called with false, check table status on ZK + interceptor.intercept(context.prepare(callable, tries)); return callable.call(getRemainingTime(callTimeout)); + } catch (PreemptiveFastFailException e) { + throw e; } catch (Throwable t) { ExceptionUtil.rethrowIfInterrupt(t); if (LOG.isTraceEnabled()) { @@ -118,6 +132,7 @@ public class RpcRetryingCaller { } // translateException throws exception when should not retry: i.e. when request is bad. + interceptor.handleFailure(context, t); t = translateException(t); callable.throwable(t, retries != 1); RetriesExhaustedException.ThrowableWithExtraContext qt = @@ -139,6 +154,8 @@ public class RpcRetryingCaller { ": " + callable.getExceptionMessageAdditionalDetail(); throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t)); } + } finally { + interceptor.updateFailureInfo(context); } try { if (expectedSleep > 0) { @@ -188,7 +205,7 @@ public class RpcRetryingCaller { } } } - + /** * Get the good or the remote exception if any, throws the DoNotRetryIOException. * @param t the throwable to analyze diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 9b070a5..7626229 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -26,32 +28,45 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; */ public class RpcRetryingCallerFactory { + private static final Log LOG = + LogFactory.getLog(RpcRetryingCallerFactory.class); /** Configuration key for a custom {@link RpcRetryingCaller} */ public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class"; protected final Configuration conf; private final long pause; private final int retries; + private final RetryingCallerInterceptor interceptor; public RpcRetryingCallerFactory(Configuration conf) { + this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR); + } + + public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) { this.conf = conf; pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.interceptor = interceptor; } public RpcRetryingCaller newCaller() { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. 
- return new RpcRetryingCaller(pause, retries); + return new RpcRetryingCaller(pause, retries, interceptor); } public static RpcRetryingCallerFactory instantiate(Configuration configuration) { + return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR); + } + + public static RpcRetryingCallerFactory instantiate(Configuration configuration, + RetryingCallerInterceptor interceptor) { String clazzName = RpcRetryingCallerFactory.class.getName(); String rpcCallerFactoryClazz = configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); if (rpcCallerFactoryClazz.equals(clazzName)) { - return new RpcRetryingCallerFactory(configuration); + return new RpcRetryingCallerFactory(configuration, interceptor); } return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, new Class[] { Configuration.class }, new Object[] { configuration }); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosingException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosingException.java new file mode 100644 index 0000000..cb8e5df --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosingException.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.exceptions; + +/** +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** +* Thrown when the client believes that we are trying to communicate to has +* been repeatedly unresponsive for a while. +* +* On receiving such an exception. The HConnectionManager will skip all +* retries and fast fail the operation. 
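ConnectionClosingException, defined just below, is one of the types PreemptiveFastFailInterceptor.isConnectionException() counts as a connectivity failure, which is why the RpcClient hunks later in this patch throw it where a plain IOException used to be thrown. A condensed sketch of that classification (the list is abridged; see isConnectionException for the full set):

    boolean connectivityProblem = e instanceof ConnectionClosingException
        || e instanceof ConnectException
        || e instanceof SocketTimeoutException
        || e instanceof ClosedChannelException;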
+*/ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class ConnectionClosingException extends IOException { + public ConnectionClosingException(String string) { + super(string); + } + + private static final long serialVersionUID = -8980028569652624236L; +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java new file mode 100644 index 0000000..7fb53e8 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java @@ -0,0 +1,70 @@ +package org.apache.hadoop.hbase.exceptions; + +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +import java.net.ConnectException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.ServerName; + +/** + * Thrown when the client believes that we are trying to communicate to has + * been repeatedly unresponsive for a while. + * + * On receiving such an exception. The HConnectionManager will skip all + * retries and fast fail the operation. + */ + @InterfaceAudience.Public + @InterfaceStability.Evolving + public class PreemptiveFastFailException extends ConnectException { + private static final long serialVersionUID = 7129103682617007177L; + private long failureCount, timeOfFirstFailureMilliSec, timeOfLatestAttemptMilliSec; + + /** + * @param count + * @param timeOfFirstFailureMilliSec + * @param timeOfLatestAttemptMilliSec + * @param serverName + */ + public PreemptiveFastFailException(long count, long timeOfFirstFailureMilliSec, + long timeOfLatestAttemptMilliSec, ServerName serverName) { + super("Exception happened " + count + " times. 
to" + serverName); + this.failureCount = count; + this.timeOfFirstFailureMilliSec = timeOfFirstFailureMilliSec; + this.timeOfLatestAttemptMilliSec = timeOfLatestAttemptMilliSec; + } + + public long getFirstFailureAt() { + return timeOfFirstFailureMilliSec; + } + + public long getLastAttemptAt() { + return timeOfLatestAttemptMilliSec; + } + + public long getFailureCount() { + return failureCount; + } + + public boolean wasOperationAttemptedByServer() { + return false; + } + } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 1bfd9a6..00d3512 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -27,6 +27,7 @@ import com.google.protobuf.Message.Builder; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import com.google.protobuf.TextFormat; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodec; +import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta; @@ -76,6 +78,7 @@ import org.htrace.TraceScope; import javax.net.SocketFactory; import javax.security.sasl.SaslException; + import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.Closeable; @@ -501,7 +504,7 @@ public class RpcClient { private void cleanup() { assert shouldCloseConnection.get(); - IOException ie = new IOException("Connection to " + server + " is closing."); + IOException ie = new ConnectionClosingException("Connection to " + server + " is closing."); while (true) { CallFuture cts = callsToWrite.poll(); if (cts == null) { @@ -716,7 +719,7 @@ public class RpcClient { */ private void checkIsOpen() throws IOException { if (shouldCloseConnection.get()) { - throw new IOException(getName() + " is closing"); + throw new ConnectionClosingException(getName() + " is closing"); } } @@ -906,7 +909,7 @@ public class RpcClient { } if (shouldCloseConnection.get()){ - throw new IOException("This connection is closing"); + throw new ConnectionClosingException("This connection is closing"); } if (failedServers.isFailedServer(remoteId.getAddress())) { @@ -1251,7 +1254,7 @@ public class RpcClient { itor.remove(); } else if (allCalls) { long waitTime = EnvironmentEdgeManager.currentTime() - c.getStartTime(); - IOException ie = new IOException("Connection to " + getRemoteAddress() + IOException ie = new ConnectionClosingException("Connection to " + getRemoteAddress() + " is closing. 
Call id=" + c.id + ", waitTime=" + waitTime); c.setException(ie); itor.remove(); @@ -1507,8 +1510,8 @@ public class RpcClient { break; } if (connection.shouldCloseConnection.get()) { - throw new IOException("Call id=" + call.id + " on server " - + addr + " aborted: connection is closing"); + throw new ConnectionClosingException("Call id=" + call.id + + " on server " + addr + " aborted: connection is closing"); } try { synchronized (call) { @@ -1556,6 +1559,9 @@ public class RpcClient { } else if (exception instanceof SocketTimeoutException) { return (SocketTimeoutException)new SocketTimeoutException("Call to " + addr + " failed because " + exception).initCause(exception); + } else if (exception instanceof ConnectionClosingException){ + return (ConnectionClosingException) new ConnectionClosingException( + "Call to " + addr + " failed on local exception: " + exception).initCause(exception); } else { return (IOException)new IOException("Call to " + addr + " failed on local exception: " + exception).initCause(exception); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index ba152c0..1f9c378 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1041,6 +1041,33 @@ public final class HConstants { */ public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size"; + /** + * HConstants for fast fail on the client side follow + */ + /** + * Config for enabling/disabling the fast fail mode. + */ + public static final String HBASE_CLIENT_ENABLE_FAST_FAIL_MODE = + "hbase.client.enable.fast.fail.mode"; + + public static final boolean HBASE_CLIENT_ENABLE_FAST_FAIL_MODE_DEFAULT = + false; + + public static final String HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS = + "hbase.client.fastfail.threshold"; + + public static final long HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT = + 60000; + + public static final String HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS = + "hbase.client.fast.fail.cleanup.duration"; + + public static final long HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT = + 600000; + + public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL = + "hbase.client.fast.fail.interceptor.impl"; + private HConstants() { // Can't be instantiated with this ctor. 
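The keys above are consumed by RetryingCallerInterceptorFactory.build() from earlier in this patch: fast fail must be enabled, and an overriding interceptor class needs a Configuration constructor because the factory instantiates it reflectively (falling back to PreemptiveFastFailInterceptor on any reflection failure). A hedged sketch; MyInterceptor is a hypothetical subclass of PreemptiveFastFailInterceptor, and the Connection setup matches the new TestFastFail test:

    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.HBASE_CLIENT_ENABLE_FAST_FAIL_MODE, true);
    conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL,
        MyInterceptor.class, PreemptiveFastFailInterceptor.class);
    Connection connection = ConnectionFactory.createConnection(conf);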
} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java index d56055a..13ea56f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java @@ -49,6 +49,12 @@ public class ExceptionUtil { InterruptedIOException iie = asInterrupt(t); if (iie != null) throw iie; } + + public static void rethrowIfInterruptedOrInFastFail(Throwable t) + throws InterruptedIOException { + InterruptedIOException iie = asInterrupt(t); + if (iie != null) throw iie; + } /** * @return an InterruptedIOException if t was an interruption, null otherwise diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java new file mode 100644 index 0000000..d312bf0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({MediumTests.class, ClientTests.class}) +public class TestFastFail { + final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final Random random = new Random(); + private static int SLAVES = 3; + private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final int SLEEPTIME = 1000; + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(SLAVES); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + // Nothing to do. + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + // Nothing to do. + } + + @Test + public void testClientRelearning() throws IOException, InterruptedException { + Admin admin = TEST_UTIL.getHBaseAdmin(); + + final String tableName = "testClientRelearningExperiment"; + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes + .toBytes(tableName))); + desc.addFamily(new HColumnDescriptor(FAMILY)); + admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 32); + final long numRows = 1000; + + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, SLEEPTIME * 100); + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, SLEEPTIME / 10); + conf.setBoolean(HConstants.HBASE_CLIENT_ENABLE_FAST_FAIL_MODE, true); + conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0); + conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL, + MyPreemptiveFastFailInterceptor.class, + PreemptiveFastFailInterceptor.class); + + final Connection connection = ConnectionFactory.createConnection(conf); + + /** + * Write numRows worth of data, so that the workers can arbitrarily read. 
+     */
+    try (Table table = connection.getTable(TableName.valueOf(tableName))) {
+      writeData(table, numRows);
+    }
+
+    /**
+     * The number of threads that are going to perform actions against the test
+     * table.
+     */
+    int nThreads = 200;
+    ExecutorService service = Executors.newFixedThreadPool(nThreads);
+    final CountDownLatch continueOtherHalf = new CountDownLatch(1);
+    final CountDownLatch doneHalfway = new CountDownLatch(nThreads);
+
+    final AtomicInteger numSuccessfullThreads = new AtomicInteger(0);
+    final AtomicInteger numFailedThreads = new AtomicInteger(0);
+
+    // The total time taken for the threads to perform the second get.
+    final AtomicLong totalTimeTaken = new AtomicLong(0);
+    final AtomicInteger numBlockedWorkers = new AtomicInteger(0);
+    final AtomicInteger numPreemptiveFastFailExceptions = new AtomicInteger(0);
+
+    List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
+    for (int i = 0; i < nThreads; i++) {
+      futures.add(service.submit(new Callable<Boolean>() {
+        /**
+         * The workers are going to perform a couple of reads. The second read
+         * follows the killing of a regionserver, so that we make sure that
+         * some of the threads run into PreemptiveFastFailException.
+         */
+        public Boolean call() throws Exception {
+          try (Table table = connection.getTable(TableName.valueOf(tableName))) {
+            Thread.sleep(Math.abs(random.nextInt()) % 100); // Add some jitter here
+            byte[] row = longToByteArrayKey(Math.abs(random.nextLong()) % numRows);
+            Get g = new Get(row);
+            g.addColumn(FAMILY, QUALIFIER);
+            try {
+              table.get(g);
+            } catch (Exception e) {
+              LOG.debug("Get failed : ", e);
+              doneHalfway.countDown();
+              return false;
+            }
+
+            // Done with one get, proceeding to do the next one.
+            doneHalfway.countDown();
+            continueOtherHalf.await();
+
+            long startTime = System.currentTimeMillis();
+            g = new Get(row);
+            g.addColumn(FAMILY, QUALIFIER);
+            try {
+              table.get(g);
+              // The get was successful.
+              numSuccessfullThreads.addAndGet(1);
+            } catch (Exception e) {
+              if (e instanceof PreemptiveFastFailException) {
+                // We were issued a PreemptiveFastFailException.
+                numPreemptiveFastFailExceptions.addAndGet(1);
+              }
+              // Irrespective of PFFE, the request failed.
+              numFailedThreads.addAndGet(1);
+              return false;
+            } finally {
+              long enTime = System.currentTimeMillis();
+              totalTimeTaken.addAndGet(enTime - startTime);
+              if ((enTime - startTime) >= SLEEPTIME) {
+                // Consider the slow workers as the blocked workers.
+                // This assumes that the threads go full throttle at performing
+                // actions. In case the thread scheduling itself is as slow as
+                // SLEEPTIME, then this test might fail and we might have to
+                // set it to a higher number on slower machines.
+                numBlockedWorkers.addAndGet(1);
+              }
+            }
+            return true;
+          } catch (Exception e) {
+            LOG.error("Caught unknown exception", e);
+            doneHalfway.countDown();
+            return false;
+          }
+        }
+      }));
+    }
+
+    doneHalfway.await();
+
+    ClusterStatus status = TEST_UTIL.getHBaseCluster().getClusterStatus();
+
+    // Kill a regionserver.
+    TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer().stop();
+    TEST_UTIL.getHBaseCluster().getRegionServer(0).stop("Testing");
+
+    // Let the threads continue going.
+    continueOtherHalf.countDown();
+
+    Thread.sleep(2 * SLEEPTIME);
+    // Restore the cluster.
+    TEST_UTIL.getHBaseCluster().restoreClusterStatus(status);
+
+    int numThreadsReturnedFalse = 0;
+    int numThreadsReturnedTrue = 0;
+    int numThreadsThrewExceptions = 0;
+    for (Future<Boolean> f : futures) {
+      try {
+        numThreadsReturnedTrue += f.get() ? 1 : 0;
+        numThreadsReturnedFalse += f.get() ? 0 : 1;
+      } catch (Exception e) {
+        numThreadsThrewExceptions++;
+      }
+    }
+    LOG.debug("numThreadsReturnedFalse:"
+        + numThreadsReturnedFalse
+        + " numThreadsReturnedTrue:"
+        + numThreadsReturnedTrue
+        + " numThreadsThrewExceptions:"
+        + numThreadsThrewExceptions
+        + " numFailedThreads:"
+        + numFailedThreads.get()
+        + " numSuccessfullThreads:"
+        + numSuccessfullThreads.get()
+        + " numBlockedWorkers:"
+        + numBlockedWorkers.get()
+        + " totalTimeWaited: "
+        + totalTimeTaken.get()
+            / (numBlockedWorkers.get() == 0 ? Long.MAX_VALUE : numBlockedWorkers.get())
+        + " numPFFEs: " + numPreemptiveFastFailExceptions.get());
+
+    assertEquals("The sum of the successful and the failed "
+        + "threads should equal the total number of threads that we spawned",
+        nThreads, numFailedThreads.get() + numSuccessfullThreads.get());
+    assertEquals(
+        "All the failures should be coming from the second get",
+        numFailedThreads.get(), numThreadsReturnedFalse);
+    assertEquals("The number of threads that threw execution exceptions "
+        + "should be 0", numThreadsThrewExceptions, 0);
+    assertEquals("The number of threads that returned true should equal the"
+        + " number of successful threads", numThreadsReturnedTrue,
+        numSuccessfullThreads.get());
+    assertTrue(
+        "There should be at least one thread that retried instead of failing",
+        MyPreemptiveFastFailInterceptor.numBraveSouls.get() > 0);
+    assertTrue(
+        "There should be at least one PreemptiveFastFail exception,"
+            + " otherwise the test makes little sense."
+            + " numPreemptiveFastFailExceptions: "
+            + numPreemptiveFastFailExceptions.get(),
+        numPreemptiveFastFailExceptions.get() > 0);
+    assertTrue(
+        "Only a few threads should ideally be waiting for the dead "
+            + "regionserver to come back. numBlockedWorkers:"
+            + numBlockedWorkers.get() + " threads that retried : "
+            + MyPreemptiveFastFailInterceptor.numBraveSouls.get(),
+        numBlockedWorkers.get() <= MyPreemptiveFastFailInterceptor.numBraveSouls.get());
+  }
+
+  public static class MyPreemptiveFastFailInterceptor extends
+      PreemptiveFastFailInterceptor {
+    public static AtomicInteger numBraveSouls = new AtomicInteger();
+
+    @Override
+    protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
+      boolean ret = super.shouldRetryInspiteOfFastFail(fInfo);
+      if (ret)
+        numBraveSouls.addAndGet(1);
+      return ret;
+    }
+
+    public MyPreemptiveFastFailInterceptor(Configuration conf) {
+      super(conf);
+    }
+  }
+
+  private byte[] longToByteArrayKey(long rowKey) {
+    return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
+  }
+
+  public void writeData(Table table, long numRows) throws IOException,
+      InterruptedException {
+    table.flushCommits();
+    for (long i = 0; i < numRows; i++) {
+      byte[] rowKey = longToByteArrayKey(i);
+      Put put = new Put(rowKey);
+      byte[] value = rowKey; // value is the same as the row key
+      put.add(FAMILY, QUALIFIER, value);
+      table.put(put);
+    }
+    LOG.info("Written all puts.");
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java
new file mode 100644
index 0000000..5aa0d11
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.*; + +import java.io.EOFException; +import java.io.IOException; +import java.io.SyncFailedException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; +import java.nio.channels.ClosedChannelException; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.ipc.RemoteException; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ SmallTests.class, ClientTests.class }) +public class TestFastFailWithoutTestUtil { + @Test + public void testInterceptorFactoryMethods() { + Configuration conf = HBaseConfiguration.create(); + conf.setBoolean(HConstants.HBASE_CLIENT_ENABLE_FAST_FAIL_MODE, true); + RetryingCallerInterceptorFactory interceptorFactory = new RetryingCallerInterceptorFactory( + conf); + + RetryingCallerInterceptor interceptorBeforeCast = interceptorFactory + .build(); + assertTrue("We should be getting a PreemptiveFastFailInterceptor", + interceptorBeforeCast instanceof PreemptiveFastFailInterceptor); + PreemptiveFastFailInterceptor interceptor = + (PreemptiveFastFailInterceptor) interceptorBeforeCast; + + RetryingCallerInterceptorContext contextBeforeCast = interceptor + .createEmptyContext(); + assertTrue( + "We should be getting a FastFailInterceptorContext since we are interacting with the" + + " PreemptiveFastFailInterceptor", + contextBeforeCast instanceof FastFailInterceptorContext); + + FastFailInterceptorContext context = (FastFailInterceptorContext) contextBeforeCast; + assertTrue(context != null); + + conf = HBaseConfiguration.create(); + interceptorFactory = new RetryingCallerInterceptorFactory(conf); + + interceptorBeforeCast = interceptorFactory.build(); + assertTrue( + "We should be getting a NoOpRetryableCallerInterceptor since we disabled PFFE", + interceptorBeforeCast instanceof NoOpRetryableCallerInterceptor); + + contextBeforeCast = interceptorBeforeCast.createEmptyContext(); + assertTrue( + "We should be getting a NoOpRetryingInterceptorContext from NoOpRetryableCallerInterceptor", + contextBeforeCast instanceof NoOpRetryingInterceptorContext); + + assertTrue(context != null); + } + + @Test + public void testInterceptorContextClear() { + PreemptiveFastFailInterceptor interceptor = createPreemptiveInterceptor(); + FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor + 
        .createEmptyContext();
+    context.clear();
+    assertFalse(context.getCouldNotCommunicateWithServer().booleanValue());
+    assertEquals(context.didTry(), false);
+    assertEquals(context.getFailureInfo(), null);
+    assertEquals(context.getServer(), null);
+    assertEquals(context.getTries(), 0);
+  }
+
+  @Test
+  public void testInterceptorContextPrepare() throws IOException {
+    PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil
+        .createPreemptiveInterceptor();
+    FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor
+        .createEmptyContext();
+    RetryingCallable<Boolean> callable = new RegionServerCallable<Boolean>(null,
+        null, null) {
+      @Override
+      public Boolean call(int callTimeout) throws Exception {
+        return true;
+      }
+
+      @Override
+      protected HRegionLocation getLocation() {
+        return new HRegionLocation(null, ServerName.valueOf("localhost", 1234,
+            987654321));
+      }
+    };
+    context.prepare(callable);
+    ServerName server = getSomeServerName();
+    assertEquals(context.getServer(), server);
+    context.clear();
+    context.prepare(callable, 2);
+    assertEquals(context.getServer(), server);
+  }
+
+  @Test
+  public void testInterceptorIntercept100Times() throws IOException, InterruptedException {
+    for (int i = 0; i < 100; i++) {
+      testInterceptorIntercept();
+    }
+  }
+
+  public void testInterceptorIntercept() throws IOException,
+      InterruptedException {
+    Configuration conf = HBaseConfiguration.create();
+    long CLEANUP_TIMEOUT = 50;
+    long FAST_FAIL_THRESHOLD = 10;
+    conf.setBoolean(HConstants.HBASE_CLIENT_ENABLE_FAST_FAIL_MODE, true);
+    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
+        CLEANUP_TIMEOUT);
+    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
+        FAST_FAIL_THRESHOLD);
+
+    PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil
+        .createPreemptiveInterceptor(conf);
+    FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor
+        .createEmptyContext();
+
+    RetryingCallable<Boolean> callable = getRetryingCallable(getSomeServerName());
+
+    // Let's simulate some workflow here.
+    int tries = 0;
+    context.prepare(callable, tries);
+    interceptor.intercept(context);
+    interceptor.handleFailure(context, new ConnectException(
+        "Failed to connect to server"));
+    interceptor.updateFailureInfo(context);
+    assertTrue("The interceptor should have updated didTry to true",
+        context.didTry());
+    assertTrue(
+        "The call shouldn't have been successful if there was a ConnectException",
+        context.getCouldNotCommunicateWithServer().booleanValue());
+    assertNull(
+        "On the first failure, the FailureInfo is generated for the server but it is"
+            + " not assigned to the context yet. It gets assigned on the next"
+            + " intercept.",
+        context.getFailureInfo());
+    assertEquals(context.getTries(), tries);
+    assertFalse(
+        "We are still in the first attempt, so we don't set this variable to true yet.",
+        context.isRetryDespiteFastFailMode());
+
+    Thread.sleep(FAST_FAIL_THRESHOLD + 1); // We sleep so as to make sure that we
+                                           // actually consider this server as a
+                                           // dead server in the next attempt.
+    tries++;
+
+    context.prepare(callable, tries);
+    interceptor.intercept(context);
+    interceptor.handleFailure(context, new ConnectException(
+        "Failed to connect to server"));
+    interceptor.updateFailureInfo(context);
+    assertTrue("didTry should remain true", context.didTry());
+    assertTrue(
+        "The call shouldn't have been successful if there was a ConnectException",
+        context.getCouldNotCommunicateWithServer().booleanValue());
+    assertNotNull(
+        "The context is updated with a failureInfo this time, since we already gave it a try.",
+        context.getFailureInfo());
+    assertEquals(context.getTries(), tries);
+    assertTrue(
+        "Since we are the only client thread here, we should be given permission to retry"
+            + " despite the fast fail mode.",
+        context.isRetryDespiteFastFailMode());
+    context.clear();
+
+    Thread.sleep(CLEANUP_TIMEOUT); // Let's try to clean up the data in the fast
+                                   // fail failure maps.
+
+    tries++;
+
+    context.clear();
+    context.prepare(callable, tries);
+    interceptor.occasionallyCleanupFailureInformation();
+    assertNull("The cleanup should have cleared the server",
+        interceptor.repeatedFailuresMap.get(context.getServer()));
+    interceptor.intercept(context);
+    interceptor.handleFailure(context, new ConnectException(
+        "Failed to connect to server"));
+    interceptor.updateFailureInfo(context);
+    assertTrue("didTry should remain true", context.didTry());
+    assertTrue(
+        "The call shouldn't have been successful if there was a ConnectException",
+        context.getCouldNotCommunicateWithServer().booleanValue());
+    assertNull("The failureInfo is cleared off from the maps.",
+        context.getFailureInfo());
+    assertEquals(context.getTries(), tries);
+    assertFalse(
+        "The cleanup reset the failure info, so this counts as a first failure again and"
+            + " retryDespiteFastFailMode should be false.",
+        context.isRetryDespiteFastFailMode());
+    context.clear();
+  }
+
+  @Test
+  public void testExceptionsIdentifiedByInterceptor() throws IOException {
+    Exception[] networkexceptions = new Exception[] {
+        new ConnectException("Mary is unwell"),
+        new SocketTimeoutException("Mike is too late"),
+        new ClosedChannelException(),
+        new SyncFailedException("Dave is not on the same page"),
+        new TimeoutException("Mike is late again"),
+        new EOFException("This is the end..."),
"), new ConnectionClosingException("Its closing")}; + + Exception[] nonNetworkExceptions = new Exception[] {new IOException("Bob died"), + new RemoteException("Bob's cousin died", null) + }; + + Configuration conf = HBaseConfiguration.create(); + long CLEANUP_TIMEOUT = 0; + long FAST_FAIL_THRESHOLD = 1000000; + conf.setBoolean(HConstants.HBASE_CLIENT_ENABLE_FAST_FAIL_MODE, true); + conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS, + CLEANUP_TIMEOUT); + conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, + FAST_FAIL_THRESHOLD); + for (Exception e : networkexceptions) { + PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil + .createPreemptiveInterceptor(conf); + FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor + .createEmptyContext(); + + RetryingCallable callable = getRetryingCallable(getSomeServerName()); + context.prepare(callable, 0); + interceptor.intercept(context); + interceptor.handleFailure(context, e); + interceptor.updateFailureInfo(context); + assertTrue( + "The call shouldn't have been successful if there was a ConnectException", + context.getCouldNotCommunicateWithServer().booleanValue()); + } + for (Exception e : nonNetworkExceptions) { + PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil + .createPreemptiveInterceptor(conf); + FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor + .createEmptyContext(); + + RetryingCallable callable = getRetryingCallable(getSomeServerName()); + context.prepare(callable, 0); + interceptor.intercept(context); + interceptor.handleFailure(context, e); + interceptor.updateFailureInfo(context); + assertFalse( + "The call shouldn't have been successful if there was a ConnectException", + context.getCouldNotCommunicateWithServer().booleanValue()); + } + } + + protected static ServerName getSomeServerName() { + return ServerName.valueOf("localhost", 1234, 987654321); + } + + protected static RegionServerCallable getRetryingCallable( + final ServerName serverName) { + return new RegionServerCallable(null, null, null) { + @Override + public T call(int callTimeout) throws Exception { + return null; + } + + @Override + protected HRegionLocation getLocation() { + return new HRegionLocation(null, serverName); + } + }; + } + + protected static PreemptiveFastFailInterceptor createPreemptiveInterceptor( + Configuration conf) { + conf.setBoolean(HConstants.HBASE_CLIENT_ENABLE_FAST_FAIL_MODE, true); + RetryingCallerInterceptorFactory interceptorFactory = new RetryingCallerInterceptorFactory( + conf); + RetryingCallerInterceptor interceptorBeforeCast = interceptorFactory + .build(); + return (PreemptiveFastFailInterceptor) interceptorBeforeCast; + } + + static PreemptiveFastFailInterceptor createPreemptiveInterceptor() { + Configuration conf = HBaseConfiguration.create(); + conf.setBoolean(HConstants.HBASE_CLIENT_ENABLE_FAST_FAIL_MODE, true); + return createPreemptiveInterceptor(conf); + } +} -- 1.9.4