From cd1a33165033d013611ba700b7699727b1fe0198 Mon Sep 17 00:00:00 2001 From: manukranthk Date: Tue, 23 Sep 2014 19:15:09 -0700 Subject: [PATCH] Implement Preemptive Fast Fail --- .../AbstractRetryingCallerInterceptorContext.java | 29 ++ .../hadoop/hbase/client/ClusterConnection.java | 3 + .../hadoop/hbase/client/ConnectionAdapter.java | 5 + .../hadoop/hbase/client/ConnectionManager.java | 10 +- .../apache/hadoop/hbase/client/FailureInfo.java | 60 ++++ .../org/apache/hadoop/hbase/client/HTable.java | 3 +- .../client/NoOpRetryableCallerInterceptor.java | 62 ++++ .../client/NoOpRetryingInterceptorContext.java | 41 +++ .../client/PreemptiveFastFailInterceptor.java | 389 +++++++++++++++++++++ .../client/RetryableCallerInterceptorFactory.java | 58 +++ .../hbase/client/RetryingCallerInterceptor.java | 51 +++ .../client/RetryingCallerInterceptorContext.java | 115 ++++++ .../hadoop/hbase/client/RpcRetryingCaller.java | 21 +- .../hbase/client/RpcRetryingCallerFactory.java | 19 +- .../exceptions/ConnectionClosingException.java | 59 ++++ .../exceptions/PreemptiveFastFailException.java | 68 ++++ .../org/apache/hadoop/hbase/ipc/RpcClient.java | 18 +- .../java/org/apache/hadoop/hbase/HConstants.java | 27 ++ .../apache/hadoop/hbase/util/ExceptionUtil.java | 6 + .../apache/hadoop/hbase/client/TestFastFail.java | 295 ++++++++++++++++ 20 files changed, 1327 insertions(+), 12 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRetryingCallerInterceptorContext.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryableCallerInterceptor.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java create mode 100644 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryableCallerInterceptorFactory.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptor.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosingException.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRetryingCallerInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRetryingCallerInterceptorContext.java new file mode 100644 index 0000000..0566719 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRetryingCallerInterceptorContext.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +public abstract class AbstractRetryingCallerInterceptorContext { + + public abstract void clear(); + + public abstract AbstractRetryingCallerInterceptorContext prepare( + RetryingCallable callable); + + public abstract AbstractRetryingCallerInterceptorContext prepare( + RetryingCallable callable, int tries); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index ad67f63..78b97e8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -270,4 +270,7 @@ public interface ClusterConnection extends HConnection { * @return Default AsyncProcess associated with this connection. */ AsyncProcess getAsyncProcess(); + + public RetryingCallerInterceptor getRetryingCallerInterceptor(); } + diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java index 1e569b8..1f39610 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java @@ -440,4 +440,9 @@ class ConnectionAdapter implements ClusterConnection { public AsyncProcess getAsyncProcess() { return wrappedConnection.getAsyncProcess(); } + + @Override + public RetryingCallerInterceptor getRetryingCallerInterceptor() { + return RetryableCallerInterceptorFactory.NO_OP_INTERCEPTOR; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 80e22b9..e485112 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -601,6 +601,8 @@ class ConnectionManager { private RpcControllerFactory rpcControllerFactory; + private RetryingCallerInterceptor interceptor; + /** * Cluster registry of basic info such as clusterid and meta region location. */ @@ -631,7 +633,8 @@ class ConnectionManager { retrieveClusterId(); this.rpcClient = new RpcClient(this.conf, this.clusterId); - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); + this.interceptor = (new RetryableCallerInterceptorFactory(conf)).build(); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); // Do we publish the status? @@ -2532,6 +2535,11 @@ class ConnectionManager { master.close(); } } + + @Override + public RetryingCallerInterceptor getRetryingCallerInterceptor() { + return this.interceptor; + } } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java new file mode 100644 index 0000000..3895cb9 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Keeps track of repeated failures to any region server. + * + */ +public class FailureInfo { + // The number of consecutive failures. + public final AtomicLong numConsecutiveFailures = new AtomicLong(); + // The time when the server started to become unresponsive + // Once set, this would never be updated. + public long timeOfFirstFailureMilliSec; + // The time when the client last tried to contact the server. + // This is only updated by one client at a time + public volatile long timeOfLatestAttemptMilliSec; + // The time when the client last cleared cache for regions assigned + // to the server. Used to ensure we don't clearCache too often. + public volatile long timeOfLatestCacheClearMilliSec; + // Used to keep track of concurrent attempts to contact the server. + // In Fast fail mode, we want just one client thread to try to connect + // the rest of the client threads will fail fast. 
+ public final AtomicBoolean exclusivelyRetringInspiteOfFastFail = new AtomicBoolean( + false); + + @Override + public String toString() { + return "FailureInfo: numConsecutiveFailures = " + + numConsecutiveFailures + " timeOfFirstFailureMilliSec = " + + timeOfFirstFailureMilliSec + " timeOfLatestAttemptMilliSec = " + + timeOfLatestAttemptMilliSec + + " timeOfLatestCacheClearMilliSec = " + + timeOfLatestCacheClearMilliSec + + " exclusivelyRetringInspiteOfFastFail = " + + exclusivelyRetringInspiteOfFastFail.get(); + } + + FailureInfo(long firstFailureTime) { + this.timeOfFirstFailureMilliSec = firstFailureTime; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index e19272b..c9f103d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -342,7 +342,8 @@ public class HTable implements HTableInterface, RegionLocator { this.retries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration, + connection.getRetryingCallerInterceptor()); this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); // puts need to track errors globally due to how the APIs currently work. 
ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryableCallerInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryableCallerInterceptor.java new file mode 100644 index 0000000..9083b9e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryableCallerInterceptor.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; + +public class NoOpRetryableCallerInterceptor extends RetryingCallerInterceptor { + private static final Log LOG =LogFactory.getLog(NoOpRetryableCallerInterceptor.class); + public NoOpRetryableCallerInterceptor() {} + public NoOpRetryableCallerInterceptor(Configuration conf) { + super(); + } + + @Override + public void intercept( + AbstractRetryingCallerInterceptorContext abstractRetryingCallerInterceptorContext) + throws PreemptiveFastFailException { + } + + @Override + public void handleFailure(AbstractRetryingCallerInterceptorContext context, + Throwable t) throws IOException { + } + + @Override + public void updateFailureInfo(AbstractRetryingCallerInterceptorContext context) { + } + + private static final AbstractRetryingCallerInterceptorContext NO_OP_CONTEXT = + new NoOpRetryingInterceptorContext(); + + @Override + public AbstractRetryingCallerInterceptorContext createEmptyContext() { + // TODO Auto-generated method stub + return NO_OP_CONTEXT; + } + + @Override + public String toString() { + return "NoOpRetryableCallerInterceptor"; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java new file mode 100644 index 0000000..0063e29 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +public class NoOpRetryingInterceptorContext extends AbstractRetryingCallerInterceptorContext { + + @Override + public void clear() { + // Do Nothing + } + + @Override + public AbstractRetryingCallerInterceptorContext prepare( + RetryingCallable callable) { + // Do Nothing + return this; + } + + @Override + public AbstractRetryingCallerInterceptorContext prepare( + RetryingCallable callable, int tries) { + // Do Nothing + return this; + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java new file mode 100644 index 0000000..434cd63 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java @@ -0,0 +1,389 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.EOFException; +import java.io.IOException; +import java.io.SyncFailedException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; +import java.nio.channels.ClosedChannelException; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.lang.mutable.MutableBoolean; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; +import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; +import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.ipc.RemoteException; + +import com.google.common.net.HostAndPort; + +public class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { + + public static final Log LOG = LogFactory.getLog(PreemptiveFastFailInterceptor.class); + // amount of time to wait before we consider a server to be in fast fail + // mode + protected final long fastFailThresholdMilliSec; + // Keeps track of failures when we cannot talk to a server. 
Helps in + // fast failing clients if the server is down for a long time. + protected final ConcurrentMap repeatedFailuresMap = + new ConcurrentHashMap(); + // We populate repeatedFailuresMap every time there is a failure. So, to + // keep it + // from growing unbounded, we garbage collect the failure information + // every cleanupInterval. + protected final long failureMapCleanupIntervalMilliSec; + protected volatile long lastFailureMapCleanupTimeMilliSec; + // clear failure Info. Used to clean out all entries. + // A safety valve, in case the client does not exit the + // fast fail mode for any reason. + private long fastFailClearingTimeMilliSec; + private final ThreadLocal threadRetryingInFastFailMode = + new ThreadLocal(); + public PreemptiveFastFailInterceptor(Configuration conf) { + super(conf); + this.fastFailThresholdMilliSec = conf.getLong( + HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, + HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT); + this.failureMapCleanupIntervalMilliSec = conf.getLong( + HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS, + HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT); + lastFailureMapCleanupTimeMilliSec = EnvironmentEdgeManager.currentTime(); + } + + public void intercept(RetryingCallerInterceptorContext context) throws PreemptiveFastFailException { + context.setfInfo(repeatedFailuresMap.get(context.getServer())); + if (inFastFailMode(context.getServer()) && !currentThreadInFastFailMode()) { + // In Fast-fail mode, all but one thread will fast fail. Check + // if we are that one chosen thread. 
+ context.setRetryDespiteFastFailMode(shouldRetryInspiteOfFastFail(context.getfInfo())); + if (!context.isRetryDespiteFastFailMode()) { // we don't have to retry + LOG.debug("Throwing PFFE : " + context.getfInfo() + + " tries : " + context.getTries()); + throw new PreemptiveFastFailException( + context.getfInfo().numConsecutiveFailures.get(), + context.getfInfo().timeOfFirstFailureMilliSec, + context.getfInfo().timeOfLatestAttemptMilliSec, context.getServer().getHostText()); + } + context.setDidTry(true); + } + } + + public void handleFailure(RetryingCallerInterceptorContext context, + Throwable t) throws IOException { + handleFailureToServer(context.getServer(), t); + handleThrowable(t, context.getServer(), + context.getCouldNotCommunicateWithServer()); + } + + public void updateFailureInfo(RetryingCallerInterceptorContext context) { + updateFailureInfoForServer(context.getServer(), context.getfInfo(), + context.isDidTry(), context.getCouldNotCommunicateWithServer().booleanValue(), + context.isRetryDespiteFastFailMode()); + } + + public void handleThrowable(Throwable t1, HostAndPort server, + MutableBoolean couldNotCommunicateWithServer) throws IOException { + Throwable t2 = translateException(t1); + boolean isLocalException = !(t2 instanceof RemoteException); + // translateException throws DoNotRetryException or any + // non-IOException. + LOG.debug("Handling this as a candidate to PFFE : isLocalException: " + isLocalException + " isNetworkException(t2): " + isConnectionException(t2)); + if (isLocalException && isConnectionException(t2)) { + LOG.debug("Handling this as a candidate to PFFE"); + couldNotCommunicateWithServer.setValue(true); + handleFailureToServer(server, t2); + } + } + + private Throwable translateException(Throwable t) throws IOException { + if (t instanceof NoSuchMethodError) { + // We probably can't recover from this exception by retrying. 
+ LOG.error(t); + throw (NoSuchMethodError)t; + } + + if (t instanceof NullPointerException) { + // The same here. This is probably a bug. + LOG.error(t.getMessage(), t); + throw (NullPointerException)t; + } + + if (t instanceof UndeclaredThrowableException) { + t = t.getCause(); + } + if (t instanceof RemoteException) { + t = ((RemoteException)t).unwrapRemoteException(); + } + if (t instanceof DoNotRetryIOException) { + throw (DoNotRetryIOException)t; + } + if (t instanceof Error) { + throw (Error)t; + } + return t; + } + + /** + * Check if the exception is something that indicates that we cannot + * contact/communicate with the server. + * + * @param e + * @return true if the exception indicates a connectivity problem with the server + */ + private boolean isConnectionException(Throwable e) { + if (e == null) return false; + // This list covers most connectivity exceptions but not all. + // For example, in SocketOutputStream a plain IOException is thrown + // at times when the channel is closed. + return (e instanceof SocketTimeoutException || + e instanceof ConnectException || + e instanceof ClosedChannelException || + e instanceof SyncFailedException || + e instanceof EOFException || + e instanceof TimeoutException || + e instanceof ConnectionClosingException || + e instanceof FailedServerException); + } + + /** + * Occasionally cleans up unused information in repeatedFailuresMap. + * + * repeatedFailuresMap stores the failure information for all remote hosts + * that had failures. To keep these from growing indefinitely, + * occasionallyCleanupFailureInformation() will clear these up once every + * cleanupInterval ms. + */ + private void occasionallyCleanupFailureInformation() { + long now = System.currentTimeMillis(); + if (!(now > lastFailureMapCleanupTimeMilliSec + + failureMapCleanupIntervalMilliSec)) + return; + + // remove entries that haven't been attempted in a while + // No synchronization needed. It is okay if multiple threads try to + // remove the entry again and again from a concurrent hash map.
+ StringBuilder sb = new StringBuilder(); + for (Entry entry : repeatedFailuresMap + .entrySet()) { + if (now > entry.getValue().timeOfLatestAttemptMilliSec + + failureMapCleanupIntervalMilliSec) { // no recent failures + repeatedFailuresMap.remove(entry.getKey()); + } else if (now > entry.getValue().timeOfFirstFailureMilliSec + + this.fastFailClearingTimeMilliSec) { // been failing for a long + // time + LOG.error(entry.getKey() + + " been failing for a long time. clearing out." + + entry.getValue().toString()); + repeatedFailuresMap.remove(entry.getKey()); + } else { + sb.append(entry.getKey().toString()).append(" failing ").append(entry.getValue().toString()) + .append("\n"); + } + } + if (sb.length() > 0 + // If there are multiple threads cleaning up, try to see that only one + // will log the msg. + && now > this.lastFailureMapCleanupTimeMilliSec + + this.failureMapCleanupIntervalMilliSec) { + LOG.warn("Preemptive failure enabled for : " + sb.toString()); + } + lastFailureMapCleanupTimeMilliSec = now; + } + + /** + * Handles failures encountered when communicating with a server. + * + * Updates the FailureInfo in repeatedFailuresMap to reflect the failure. + * Throws RepeatedConnectException if the client is in Fast fail mode. + * + * @param server + * @param t + * - the throwable to be handled. + * @throws PreemptiveFastFailException + */ + private void handleFailureToServer(HostAndPort server, Throwable t) { + if (server == null || t == null) { + return; + } + long currentTime = EnvironmentEdgeManager.currentTime(); + FailureInfo fInfo = repeatedFailuresMap.get(server); + if (fInfo == null) { + fInfo = new FailureInfo(currentTime); + FailureInfo oldfInfo = repeatedFailuresMap.putIfAbsent(server, fInfo); + + if (oldfInfo != null) { + fInfo = oldfInfo; + } + } + fInfo.timeOfLatestAttemptMilliSec = currentTime; + fInfo.numConsecutiveFailures.incrementAndGet(); + } + + /** + * Checks to see if we are in the Fast fail mode for requests to the server. 
+ * + * If a client is unable to contact a server for more than + * fastFailThresholdMilliSec the client will get into fast fail mode. + * + * @param server + * @return true if the client is in fast fail mode for the server. + */ + private boolean inFastFailMode(HostAndPort server) { + FailureInfo fInfo = repeatedFailuresMap.get(server); + // if fInfo is null --> The server is considered good. + // If the server is bad, wait long enough to believe that the server is + // down. + return (fInfo != null && System.currentTimeMillis() > + (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec)); + } + + /** + * Checks to see if the current thread is already in FastFail mode for + * *some* server. + * + * @return true, if the thread is already in FF mode. + */ + private boolean currentThreadInFastFailMode() { + return (this.threadRetryingInFastFailMode.get() != null && + (this.threadRetryingInFastFailMode.get().booleanValue() == true)); + } + + /** + * Check to see if the client should try to connnect to the server, inspite + * of knowing that it is in the fast fail mode. + * + * The idea here is that we want just one client thread to be actively + * trying to reconnect, while all the other threads trying to reach the + * server will short circuit. + * + * @param fInfo + * @return true if the client should try to connect to the server. + */ + protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) { + // We believe that the server is down, But, we want to have just one + // client + // actively trying to connect. If we are the chosen one, we will retry + // and not throw an exception. 
+ if (fInfo != null + && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, + true)) { + MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode + .get(); + if (threadAlreadyInFF == null) { + threadAlreadyInFF = new MutableBoolean(); + this.threadRetryingInFastFailMode.set(threadAlreadyInFF); + } + threadAlreadyInFF.setValue(true); + return true; + } else { + return false; + } + } + + + /** + * updates the failure information for the server. + * + * @param server + * @param fInfo + * @param couldNotCommunicate + * @param retryDespiteFastFailMode + */ + private void updateFailureInfoForServer(HostAndPort server, + FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate, + boolean retryDespiteFastFailMode) { + if (server == null || fInfo == null || didTry == false) + return; + + // If we were able to connect to the server, reset the failure + // information. + if (couldNotCommunicate == false) { + LOG.info("Clearing out PFFE for server " + server.getHostText()); + repeatedFailuresMap.remove(server); + } else { + // update time of last attempt + long currentTime = System.currentTimeMillis(); + fInfo.timeOfLatestAttemptMilliSec = currentTime; + + // Release the lock if we were retrying inspite of FastFail + if (retryDespiteFastFailMode) { + fInfo.exclusivelyRetringInspiteOfFastFail.set(false); + threadRetryingInFastFailMode.get().setValue(false); + } + } + + occasionallyCleanupFailureInformation(); + } + + public void updateFailureInfoForServer(HostAndPort server, + boolean didTry, boolean couldNotCommunicate) { + FailureInfo fInfo = repeatedFailuresMap.get(server); + boolean retryDespiteFastFailMode = false; + if (inFastFailMode(server) && !currentThreadInFastFailMode()) { + // In Fast-fail mode, all but one thread will fast fail. Check + // if we are that one chosen thread. 
+ retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo); + } + + updateFailureInfoForServer(server, fInfo, didTry, couldNotCommunicate, retryDespiteFastFailMode); + } + + @Override + public void intercept(AbstractRetryingCallerInterceptorContext context) + throws PreemptiveFastFailException { + if (context instanceof RetryingCallerInterceptorContext) { + intercept((RetryingCallerInterceptorContext)context); + } + } + + @Override + public void handleFailure(AbstractRetryingCallerInterceptorContext context, + Throwable t) throws IOException { + if (context instanceof RetryingCallerInterceptorContext) { + handleFailure((RetryingCallerInterceptorContext)context, t); + } + } + + @Override + public void updateFailureInfo(AbstractRetryingCallerInterceptorContext context) { + if (context instanceof RetryingCallerInterceptorContext) { + updateFailureInfo((RetryingCallerInterceptorContext)context); + } + } + + @Override + public AbstractRetryingCallerInterceptorContext createEmptyContext() { + return new RetryingCallerInterceptorContext(); + } + + @Override + public String toString() { + return "PreemptiveFastFailInterceptor"; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryableCallerInterceptorFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryableCallerInterceptorFactory.java new file mode 100644 index 0000000..6ffe052 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryableCallerInterceptorFactory.java @@ -0,0 +1,58 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.lang.reflect.Constructor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; + +public class RetryableCallerInterceptorFactory { + @SuppressWarnings("unused") + private static final Log LOG = + LogFactory.getLog(RetryableCallerInterceptorFactory.class); + private Configuration conf; + private final boolean failFast; + public static final RetryingCallerInterceptor NO_OP_INTERCEPTOR = + new NoOpRetryableCallerInterceptor(null); + public RetryableCallerInterceptorFactory(Configuration conf) { + this.conf = conf; + failFast = conf.getBoolean(HConstants.HBASE_CLIENT_ENABLE_FAST_FAIL_MODE, + HConstants.HBASE_CLIENT_ENABLE_FAST_FAIL_MODE_DEFAULT); + } + + public RetryingCallerInterceptor build() { + if (failFast) { + try { + Class c = conf.getClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL, + PreemptiveFastFailInterceptor.class); + // Default HCM#HCI is not accessible; make it so before invoking. 
+ Constructor constructor = + c.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + return (RetryingCallerInterceptor) constructor.newInstance(conf); + } catch (Exception e) { + return new PreemptiveFastFailInterceptor(conf); + } + } + return NO_OP_INTERCEPTOR; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptor.java new file mode 100644 index 0000000..30fcbe9 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptor.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; + +public abstract class RetryingCallerInterceptor { + private Configuration conf; + + protected RetryingCallerInterceptor() { + // Empty constructor for NoOpRetryableCallerInterceptor + } + + public RetryingCallerInterceptor(Configuration conf) { + this.conf = conf; + } + + public abstract AbstractRetryingCallerInterceptorContext createEmptyContext(); + + public abstract void handleFailure( + AbstractRetryingCallerInterceptorContext context, Throwable t) + throws IOException; + + public abstract void intercept( + AbstractRetryingCallerInterceptorContext abstractRetryingCallerInterceptorContext) + throws PreemptiveFastFailException; + + @Override + public abstract String toString(); + + public abstract void updateFailureInfo( + AbstractRetryingCallerInterceptorContext context); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java new file mode 100644 index 0000000..0b95b9a --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.lang.mutable.MutableBoolean; + +import com.google.common.net.HostAndPort; + +public class RetryingCallerInterceptorContext extends AbstractRetryingCallerInterceptorContext{ + private MutableBoolean couldNotCommunicateWithServer = + new MutableBoolean(false); + + private boolean didTry = false; + + private FailureInfo fInfo = null; + + private boolean retryDespiteFastFailMode = false; + + private HostAndPort server; + + private long startTimeMs; + + private int tries; + + public void clear() { + startTimeMs = -1; + server = null; + fInfo = null; + didTry = false; + couldNotCommunicateWithServer = new MutableBoolean(false); + retryDespiteFastFailMode = false; + } + + public MutableBoolean getCouldNotCommunicateWithServer() { + return couldNotCommunicateWithServer; + } + + public FailureInfo getfInfo() { + return fInfo; + } + + public HostAndPort getServer() { + return server; + } + + public long getStartTimeMs() { + return startTimeMs; + } + + public int getTries() { + return tries; + } + public boolean isDidTry() { + return didTry; + } + public boolean isRetryDespiteFastFailMode() { + return retryDespiteFastFailMode; + } + public RetryingCallerInterceptorContext prepare(RetryingCallable callable) { + return prepare(callable, 0); + } + public RetryingCallerInterceptorContext prepare(RetryingCallable callable, + int tries) { + if (callable instanceof RegionServerCallable) { + RegionServerCallable retryingCallable = + (RegionServerCallable)callable; + server = + 
HostAndPort.fromString(retryingCallable.getLocation().getHostnamePort()); + } + this.tries = tries; + return this; + } + public void setCouldNotCommunicateWithServer( + MutableBoolean couldNotCommunicateWithServer) { + this.couldNotCommunicateWithServer = couldNotCommunicateWithServer; + } + public void setDidTry(boolean didTry) { + this.didTry = didTry; + } + + public void setfInfo(FailureInfo fInfo) { + this.fInfo = fInfo; + } + + public void setRetryDespiteFastFailMode(boolean retryDespiteFastFailMode) { + this.retryDespiteFastFailMode = retryDespiteFastFailMode; + } + + public void setServer(HostAndPort server) { + this.server = server; + } + + public void setStartTimeMs(long startTimeMs) { + this.startTimeMs = startTimeMs; + } + + public void setTries(int tries) { + this.tries = tries; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index a10f561..c639592 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -31,6 +31,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.ipc.RemoteException; @@ -61,10 +62,19 @@ public class RpcRetryingCaller { private final long pause; private final int retries; private final AtomicBoolean cancelled = new AtomicBoolean(false); + private final RetryingCallerInterceptor interceptor; + private final AbstractRetryingCallerInterceptorContext context; public RpcRetryingCaller(long pause, int retries) { + this(pause, 
retries, RetryableCallerInterceptorFactory.NO_OP_INTERCEPTOR); + } + + public RpcRetryingCaller(long pause, int retries, + RetryingCallerInterceptor interceptor) { this.pause = pause; this.retries = retries; + this.interceptor = interceptor; + context = interceptor.createEmptyContext(); } private int getRemainingTime(int callTimeout) { @@ -83,7 +93,7 @@ public class RpcRetryingCaller { return remainingTime; } } - + public void cancel(){ cancelled.set(true); synchronized (cancelled){ @@ -104,11 +114,15 @@ public class RpcRetryingCaller { List exceptions = new ArrayList(); this.globalStartTime = EnvironmentEdgeManager.currentTime(); + context.clear(); for (int tries = 0;; tries++) { long expectedSleep; try { callable.prepare(tries != 0); // if called with false, check table status on ZK + interceptor.intercept(context.prepare(callable, tries)); return callable.call(getRemainingTime(callTimeout)); + } catch (PreemptiveFastFailException e) { + throw e; } catch (Throwable t) { ExceptionUtil.rethrowIfInterrupt(t); if (LOG.isTraceEnabled()) { @@ -118,6 +132,7 @@ public class RpcRetryingCaller { } // translateException throws exception when should not retry: i.e. when request is bad. + interceptor.handleFailure(context, t); t = translateException(t); callable.throwable(t, retries != 1); RetriesExhaustedException.ThrowableWithExtraContext qt = @@ -139,6 +154,8 @@ public class RpcRetryingCaller { ": " + callable.getExceptionMessageAdditionalDetail(); throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t)); } + } finally { + interceptor.updateFailureInfo(context); } try { if (expectedSleep > 0) { @@ -188,7 +205,7 @@ public class RpcRetryingCaller { } } } - + /** * Get the good or the remote exception if any, throws the DoNotRetryIOException. 
* @param t the throwable to analyze diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 9b070a5..924b7fa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -26,32 +28,45 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; */ public class RpcRetryingCallerFactory { + private static final Log LOG = + LogFactory.getLog(RpcRetryingCallerFactory.class); /** Configuration key for a custom {@link RpcRetryingCaller} */ public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class"; protected final Configuration conf; private final long pause; private final int retries; + private final RetryingCallerInterceptor interceptor; public RpcRetryingCallerFactory(Configuration conf) { + this(conf, RetryableCallerInterceptorFactory.NO_OP_INTERCEPTOR); + } + + public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) { this.conf = conf; pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.interceptor = interceptor; } public RpcRetryingCaller newCaller() { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. 
- return new RpcRetryingCaller(pause, retries); + return new RpcRetryingCaller(pause, retries, interceptor); } public static RpcRetryingCallerFactory instantiate(Configuration configuration) { + return instantiate(configuration, RetryableCallerInterceptorFactory.NO_OP_INTERCEPTOR); + } + + public static RpcRetryingCallerFactory instantiate(Configuration configuration, + RetryingCallerInterceptor interceptor) { String clazzName = RpcRetryingCallerFactory.class.getName(); String rpcCallerFactoryClazz = configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); if (rpcCallerFactoryClazz.equals(clazzName)) { - return new RpcRetryingCallerFactory(configuration); + return new RpcRetryingCallerFactory(configuration, interceptor); } return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, new Class[] { Configuration.class }, new Object[] { configuration }); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosingException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosingException.java new file mode 100644 index 0000000..cb8e5df --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosingException.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.exceptions; + +/** +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** +* Thrown when the client believes that we are trying to communicate to has +* been repeatedly unresponsive for a while. +* +* On receiving such an exception. The HConnectionManager will skip all +* retries and fast fail the operation. 
+*/ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class ConnectionClosingException extends IOException { + public ConnectionClosingException(String string) { + super(string); + } + + private static final long serialVersionUID = -8980028569652624236L; +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java new file mode 100644 index 0000000..0e857ed --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java @@ -0,0 +1,68 @@ +package org.apache.hadoop.hbase.exceptions; + +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +import java.net.ConnectException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Thrown when the client believes that we are trying to communicate to has + * been repeatedly unresponsive for a while. + * + * On receiving such an exception. The HConnectionManager will skip all + * retries and fast fail the operation. 
+ */ + @InterfaceAudience.Public + @InterfaceStability.Evolving + public class PreemptiveFastFailException extends ConnectException { + private static final long serialVersionUID = 7129103682617007177L; + private long failureCount, timeOfFirstFailureMilliSec, timeOfLatestAttemptMilliSec; + + /** + * @param count + * @param timeOfFirstFailureMilliSec + * @param timeOfLatestAttemptMilliSec + * @param serverName + */ + public PreemptiveFastFailException(long count, long timeOfFirstFailureMilliSec, + long timeOfLatestAttemptMilliSec, String serverName) { + super("Exception happened " + count + " times. to" + serverName); + this.failureCount = count; + this.timeOfFirstFailureMilliSec = timeOfFirstFailureMilliSec; + this.timeOfLatestAttemptMilliSec = timeOfLatestAttemptMilliSec; + } + + public long getFirstFailureAt() { + return timeOfFirstFailureMilliSec; + } + + public long getLastAttemptAt() { + return timeOfLatestAttemptMilliSec; + } + + public long getFailureCount() { + return failureCount; + } + + public boolean wasOperationAttemptedByServer() { + return false; + } + } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 1bfd9a6..00d3512 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -27,6 +27,7 @@ import com.google.protobuf.Message.Builder; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import com.google.protobuf.TextFormat; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.codec.Codec; import 
org.apache.hadoop.hbase.codec.KeyValueCodec; +import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta; @@ -76,6 +78,7 @@ import org.htrace.TraceScope; import javax.net.SocketFactory; import javax.security.sasl.SaslException; + import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.Closeable; @@ -501,7 +504,7 @@ public class RpcClient { private void cleanup() { assert shouldCloseConnection.get(); - IOException ie = new IOException("Connection to " + server + " is closing."); + IOException ie = new ConnectionClosingException("Connection to " + server + " is closing."); while (true) { CallFuture cts = callsToWrite.poll(); if (cts == null) { @@ -716,7 +719,7 @@ public class RpcClient { */ private void checkIsOpen() throws IOException { if (shouldCloseConnection.get()) { - throw new IOException(getName() + " is closing"); + throw new ConnectionClosingException(getName() + " is closing"); } } @@ -906,7 +909,7 @@ public class RpcClient { } if (shouldCloseConnection.get()){ - throw new IOException("This connection is closing"); + throw new ConnectionClosingException("This connection is closing"); } if (failedServers.isFailedServer(remoteId.getAddress())) { @@ -1251,7 +1254,7 @@ public class RpcClient { itor.remove(); } else if (allCalls) { long waitTime = EnvironmentEdgeManager.currentTime() - c.getStartTime(); - IOException ie = new IOException("Connection to " + getRemoteAddress() + IOException ie = new ConnectionClosingException("Connection to " + getRemoteAddress() + " is closing. 
Call id=" + c.id + ", waitTime=" + waitTime); c.setException(ie); itor.remove(); @@ -1507,8 +1510,8 @@ public class RpcClient { break; } if (connection.shouldCloseConnection.get()) { - throw new IOException("Call id=" + call.id + " on server " - + addr + " aborted: connection is closing"); + throw new ConnectionClosingException("Call id=" + call.id + + " on server " + addr + " aborted: connection is closing"); } try { synchronized (call) { @@ -1556,6 +1559,9 @@ public class RpcClient { } else if (exception instanceof SocketTimeoutException) { return (SocketTimeoutException)new SocketTimeoutException("Call to " + addr + " failed because " + exception).initCause(exception); + } else if (exception instanceof ConnectionClosingException){ + return (ConnectionClosingException) new ConnectionClosingException( + "Call to " + addr + " failed on local exception: " + exception).initCause(exception); } else { return (IOException)new IOException("Call to " + addr + " failed on local exception: " + exception).initCause(exception); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 6945eb0..a3798d4 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1042,6 +1042,33 @@ public final class HConstants { */ public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size"; + /** + * HConstants for fast fail on the client side follow + */ + /** + * Config for enabling/disabling the fast fail mode. 
+ */ + public static final String HBASE_CLIENT_ENABLE_FAST_FAIL_MODE = + "hbase.client.enable.fast.fail.mode"; + + public static final boolean HBASE_CLIENT_ENABLE_FAST_FAIL_MODE_DEFAULT = + false; + + public static final String HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS = + "hbase.client.fastfail.threshold"; + + public static final long HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT = + 60000; + + public static final String HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS = + "hbase.client.fast.fail.cleanup.duration"; + + public static final long HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT = + 600000; + + public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL = + "hbase.client.fast.fail.interceptor.impl"; + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java index d56055a..13ea56f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java @@ -49,6 +49,12 @@ public class ExceptionUtil { InterruptedIOException iie = asInterrupt(t); if (iie != null) throw iie; } + + public static void rethrowIfInterruptedOrInFastFail(Throwable t) + throws InterruptedIOException { + InterruptedIOException iie = asInterrupt(t); + if (iie != null) throw iie; + } /** * @return an InterruptedIOException if t was an interruption, null otherwise diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java new file mode 100644 index 0000000..1d8223a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestFastFail { + final Log LOG = 
LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL + = new HBaseTestingUtility(); + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final Random random = new Random(); + private static int SLAVES = 3; + private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final int SLEEPTIME = 1000; + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(SLAVES); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + // Nothing to do. + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + // Nothing to do. + } + + @Test + public void testClientRelearning() throws IOException, InterruptedException { + Admin admin = TEST_UTIL.getHBaseAdmin(); + + final String tableName = "testClientRelearningExperiment"; + HTableDescriptor desc = new HTableDescriptor( + TableName.valueOf(Bytes.toBytes(tableName))); + desc.addFamily(new HColumnDescriptor(FAMILY)); + admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 32); + final long numRows = 1000; + + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, SLEEPTIME * 100); + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, SLEEPTIME/10); + conf.setBoolean(HConstants.HBASE_CLIENT_ENABLE_FAST_FAIL_MODE, true); + conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0); + conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL, + MyPreemptiveFastFailInterceptor.class, + PreemptiveFastFailInterceptor.class); + + final Connection connection = ConnectionFactory.createConnection(conf); + + /** + * Write numRows worth of data, so that the 
workers can arbitrarily read. + */ + try (Table table = connection.getTable(TableName.valueOf(tableName));) { + writeData(table, numRows); + } + + /** + * The number of threads that are going to perform actions against the + * test table. + */ + int nThreads = 200; + ExecutorService service = + Executors.newFixedThreadPool(nThreads); + final CountDownLatch continueOtherHalf = new CountDownLatch(1); + final CountDownLatch doneHalfway = new CountDownLatch(nThreads); + + final AtomicInteger numSuccessfullThreads = new AtomicInteger(0); + final AtomicInteger numFailedThreads = new AtomicInteger(0); + + // The total time taken for the threads to perform the second put; + final AtomicLong totalTimeTaken = new AtomicLong(0); + final AtomicInteger numBlockedWorkers = new AtomicInteger(0); + final AtomicInteger numPreemptiveFastFailExceptions = new AtomicInteger(0); + + List> futures = new ArrayList>(); + for (int i = 0; i < nThreads; i++) { + futures.add(service.submit(new Callable() { + /** + * The workers are going to perform a couple of reads. The second + * read will follow the killing of a regionserver so that we make sure + * that some of threads go into PreemptiveFastFailExcception + */ + public Boolean call() throws Exception { + try (Table table = + connection.getTable(TableName.valueOf(tableName))) { + Thread.sleep(Math.abs(random.nextInt()) % 100); // Add some jitter here + byte[] row = longToByteArrayKey(Math.abs(random.nextLong()) % numRows); + Get g = new Get(row); + g.addColumn(FAMILY, QUALIFIER); + try { + table.get(g); + } catch (Exception e) { + LOG.debug("Get failed : ", e); + doneHalfway.countDown(); + return false; + } + + // Done with one get, proceeding to do the next one. 
+ doneHalfway.countDown(); + continueOtherHalf.await(); + + long startTime = System.currentTimeMillis(); + g = new Get(row); + g.addColumn(FAMILY, QUALIFIER); + try { + table.get(g); + // The get was successful + numSuccessfullThreads.addAndGet(1); + } catch (Exception e) { + if (e instanceof PreemptiveFastFailException) { + // We were issued a PreemptiveFastFailException + numPreemptiveFastFailExceptions.addAndGet(1); + } + // Irrespective of PFFE, the request failed. + numFailedThreads.addAndGet(1); + return false; + } finally { + long enTime = System.currentTimeMillis(); + totalTimeTaken.addAndGet(enTime - startTime); + if ((enTime - startTime) >= SLEEPTIME) { + // Considering the slow workers as the blockedWorkers. + // This assumes that the threads go full throttle at performing + // actions. In case the thread scheduling itself is as slow as + // SLEEPTIME, then this test might fail and so, we might have + // set it to a higher number on slower machines. + numBlockedWorkers.addAndGet(1); + } + } + return true; + } catch (Exception e) { + LOG.error("Caught unknown exception", e); + doneHalfway.countDown(); + return false; + } + } + })); + } + + doneHalfway.await(); + + ClusterStatus status = TEST_UTIL.getHBaseCluster().getClusterStatus(); + + // Kill a regionserver + TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer().stop(); + TEST_UTIL.getHBaseCluster().getRegionServer(0).stop("Testing"); + + // Let the threads continue going + continueOtherHalf.countDown(); + + Thread.sleep(2 * SLEEPTIME); + // Restore the cluster + TEST_UTIL.getHBaseCluster().restoreClusterStatus(status); + + int numThreadsReturnedFalse = 0; + int numThreadsReturnedTrue = 0; + int numThreadsThrewExceptions = 0; + for (Future f : futures) { + try { + numThreadsReturnedTrue += f.get() ? 1 : 0; + numThreadsReturnedFalse += f.get() ? 
0 : 1; + } catch (Exception e) { + numThreadsThrewExceptions++; + } + } + LOG.debug("numThreadsReturnedFalse:" + numThreadsReturnedFalse + + " numThreadsReturnedTrue:" + numThreadsReturnedTrue + + " numThreadsThrewExceptions:" + numThreadsThrewExceptions + + " numFailedThreads:" + numFailedThreads.get() + + " numSuccessfullThreads:" + numSuccessfullThreads.get() + + " numBlockedWorkers:" + numBlockedWorkers.get() + + " totalTimeWaited: " + totalTimeTaken.get() / + (numBlockedWorkers.get() == 0 ? Long.MAX_VALUE : numBlockedWorkers.get()) + + " numPFFEs: " + numPreemptiveFastFailExceptions.get()); + + assertEquals("The expected number of all the successfull and the failed " + + "threads should equal the total number of threads that we spawned", + nThreads, numFailedThreads.get() + numSuccessfullThreads.get()); + assertEquals("All the failures should be coming from the secondput failure", + numFailedThreads.get(), numThreadsReturnedFalse); + assertEquals("Number of threads that threw execution exceptions " + + "otherwise should be 0", numThreadsThrewExceptions, 0); + assertEquals("The regionservers that returned true should equal to the" + + " number of successful threads", numThreadsReturnedTrue, + numSuccessfullThreads.get()); + assertTrue("There should be atleast one thread that retried instead of failing", + MyPreemptiveFastFailInterceptor.numBraveSouls.get() > 0); + assertTrue("There should be atleast one PreemptiveFastFail exception," + + " otherwise, the test makes little sense." + + "numPreemptiveFastFailExceptions: " + numPreemptiveFastFailExceptions.get(), + numPreemptiveFastFailExceptions.get() > 0); + assertTrue("Only few thread should ideally be waiting for the dead " + + "regionserver to be coming back. 
numBlockedWorkers:" + + numBlockedWorkers.get() + " threads that retried : " + + MyPreemptiveFastFailInterceptor.numBraveSouls.get(), + numBlockedWorkers.get()<= MyPreemptiveFastFailInterceptor.numBraveSouls.get()); + } + + public static class MyPreemptiveFastFailInterceptor extends PreemptiveFastFailInterceptor { + public static AtomicInteger numBraveSouls = new AtomicInteger(); + @Override + protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) { + boolean ret = super.shouldRetryInspiteOfFastFail(fInfo); + if (ret) numBraveSouls.addAndGet(1); + return ret; + } + + public MyPreemptiveFastFailInterceptor(Configuration conf) { + super(conf); + } + } + + private byte[] longToByteArrayKey(long rowKey) { + return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes(); + } + + public void writeData(Table table, long numRows) + throws IOException, InterruptedException { + table.flushCommits(); + for (long i = 0; i < numRows; i++) { + byte [] rowKey = longToByteArrayKey(i); + Put put = new Put(rowKey); + byte[] value = rowKey; // value is the same as the row key + put.add(FAMILY, QUALIFIER, value); + table.put(put); + } + LOG.info("Written all puts."); + } + +} -- 1.9.4