diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 323ec8b..0d7d680 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -850,6 +850,17 @@ public final class HConstants {
public static final String REGION_SERVER_HANDLER_COUNT = "hbase.regionserver.handler.count";
public static final int DEFAULT_REGION_SERVER_HANDLER_COUNT = 30;
+ /*
+ * REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT:
+ * -1 => Disable aborting
+ * 0 => Abort if even a single handler has died
+ * 0.x => Abort only when this percent of handlers have died
+ * 1 => Abort only all of the handers have died
+ */
+ public static final String REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT =
+ "hbase.regionserver.handler.abort.on.error.percent";
+ public static final double DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT = -1;
+
public static final String REGION_SERVER_META_HANDLER_COUNT =
"hbase.regionserver.metahandler.count";
public static final int DEFAULT_REGION_SERVER_META_HANDLER_COUNT = 10;
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index f285cf7..20a50fa 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1239,4 +1239,12 @@ possible configurations would overwhelm and obscure the important.
with the user issuing the mutation
+
+ hbase.regionserver.handler.abort.on.error.percent
+ -1
+ The percent of region server RPC threads failed to abort RS.
+ -1 Disable aborting; 0 Abort if even a single handler has died;
+ 0.x Abort only when this percent of handlers have died;
+ 1 Abort only all of the handers have died.
+
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
index 2418cf7..2a6fd9c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -19,13 +19,14 @@ package org.apache.hadoop.hbase.ipc;
import java.util.ArrayList;
import java.util.List;
-import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
@@ -41,12 +42,18 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final int maxQueueLength) {
- this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength);
+ this(name, handlerCount, numQueues, maxQueueLength, null, null);
+ }
+
+ public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final int maxQueueLength, final Configuration conf, final Abortable abortable) {
+ this(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, maxQueueLength);
}
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final Configuration conf, final Abortable abortable,
final Class extends BlockingQueue> queueClass, Object... initargs) {
- super(name, Math.max(handlerCount, numQueues));
+ super(name, Math.max(handlerCount, numQueues), conf, abortable);
queues = new ArrayList>(numQueues);
this.balancer = getBalancer(numQueues);
initializeQueues(numQueues, queueClass, initargs);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 18e2902..e2b003b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -111,6 +111,9 @@ public class CallRunner {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
errorThrowable = e;
error = StringUtils.stringifyException(e);
+ if (e instanceof Error) {
+ throw (Error)e;
+ }
} finally {
if (traceScope != null) {
traceScope.close();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index ddab8fa..9a3d8cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -26,9 +26,11 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
@@ -56,24 +58,32 @@ public class RWQueueRpcExecutor extends RpcExecutor {
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength) {
- this(name, handlerCount, numQueues, readShare, maxQueueLength,
+ this(name, handlerCount, numQueues, readShare, maxQueueLength, null, null);
+ }
+
+ public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final float readShare, final int maxQueueLength, final Configuration conf, final Abortable abortable) {
+ this(name, handlerCount, numQueues, readShare, maxQueueLength, conf, abortable,
LinkedBlockingQueue.class);
}
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength,
+ final Configuration conf, final Abortable abortable,
final Class extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare),
+ conf, abortable,
LinkedBlockingQueue.class, new Object[] {maxQueueLength},
readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
}
public RWQueueRpcExecutor(final String name, final int writeHandlers, final int readHandlers,
final int numWriteQueues, final int numReadQueues,
+ final Configuration conf, final Abortable abortable,
final Class extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
final Class extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
- super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues));
+ super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues), conf, abortable);
this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
this.readHandlersCount = Math.max(readHandlers, numReadQueues);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 233c26e..bb6fdf2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -20,14 +20,18 @@ package org.apache.hadoop.hbase.ipc;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -41,15 +45,26 @@ public abstract class RpcExecutor {
private final List handlers;
private final int handlerCount;
private final String name;
+ private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
private boolean running;
+ private Configuration conf = null;
+ private Abortable abortable = null;
+
public RpcExecutor(final String name, final int handlerCount) {
this.handlers = new ArrayList(handlerCount);
this.handlerCount = handlerCount;
this.name = Strings.nullToEmpty(name);
}
+ public RpcExecutor(final String name, final int handlerCount, final Configuration conf,
+ final Abortable abortable) {
+ this(name, handlerCount);
+ this.conf = conf;
+ this.abortable = abortable;
+ }
+
public void start(final int port) {
running = true;
startHandlers(port);
@@ -103,6 +118,9 @@ public abstract class RpcExecutor {
protected void consumerLoop(final BlockingQueue myQueue) {
boolean interrupted = false;
+ double handlerFailureThreshhold =
+ conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
+ HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
try {
while (running) {
try {
@@ -111,11 +129,24 @@ public abstract class RpcExecutor {
activeHandlerCount.incrementAndGet();
task.run();
} catch (Error e) {
- LOG.error("RpcServer handler thread throws error: ", e);
- throw e;
- } catch (RuntimeException e) {
- LOG.error("RpcServer handler thread throws exception: ", e);
- throw e;
+ int failedCount = failedHandlerCount.incrementAndGet();
+ if (handlerFailureThreshhold >= 0
+ && failedCount > handlerCount * handlerFailureThreshhold) {
+ String message =
+ "Number of failed RpcServer handler exceeded threshhold "
+ + handlerFailureThreshhold + " with failed reason: "
+ + StringUtils.stringifyException(e);
+ if (abortable != null) {
+ abortable.abort(message, e);
+ } else {
+ LOG.error("Received " + StringUtils.stringifyException(e)
+ + " but not aborting due to abortable being null");
+ throw e;
+ }
+ } else {
+ LOG.warn("RpcServer handler threads encountered errors "
+ + StringUtils.stringifyException(e));
+ }
} finally {
activeHandlerCount.decrementAndGet();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index a44ec89..5f6f11f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -19,11 +19,12 @@ package org.apache.hadoop.hbase.ipc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* A scheduler that maintains isolated handler pools for general, high-priority and replication
@@ -49,6 +50,8 @@ public class SimpleRpcScheduler extends RpcScheduler {
/** What level a high priority call is at. */
private final int highPriorityLevel;
+
+ private final Abortable abortable;
/**
* @param conf
@@ -64,12 +67,14 @@ public class SimpleRpcScheduler extends RpcScheduler {
int priorityHandlerCount,
int replicationHandlerCount,
PriorityFunction priority,
+ Abortable abortable,
int highPriorityLevel) {
int maxQueueLength = conf.getInt(CALL_QUEUE_MAX_LENGTH_CONF_KEY,
conf.getInt("ipc.server.max.callqueue.length",
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER));
this.priority = priority;
this.highPriorityLevel = highPriorityLevel;
+ this.abortable = abortable;
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY,
conf.getFloat("ipc.server.callqueue.read.share", 0));
@@ -83,19 +88,19 @@ public class SimpleRpcScheduler extends RpcScheduler {
if (numCallQueues > 1 && callqReadShare > 0) {
// multiple read/write queues
callExecutor = new RWQueueRpcExecutor("RW.Default", handlerCount, numCallQueues,
- callqReadShare, maxQueueLength);
+ callqReadShare, maxQueueLength, conf, abortable);
} else {
// multiple queues
callExecutor = new BalancedQueueRpcExecutor("B.Default", handlerCount,
- numCallQueues, maxQueueLength);
+ numCallQueues, maxQueueLength, conf, abortable);
}
- this.priorityExecutor =
- priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", priorityHandlerCount,
- 1, maxQueueLength) : null;
- this.replicationExecutor =
- replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
- replicationHandlerCount, 1, maxQueueLength) : null;
+ this.priorityExecutor =
+ priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", priorityHandlerCount,
+ 1, maxQueueLength, conf, abortable) : null;
+ this.replicationExecutor =
+ replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
+ replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
index b8daa52..a2f1cf8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
@@ -41,6 +41,7 @@ public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
server,
+ server,
HConstants.QOS_THRESHOLD);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 5040e75..435f874 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -68,7 +68,7 @@ public class TestSimpleRpcScheduler {
public void testBasic() throws IOException, InterruptedException {
PriorityFunction qosFunction = mock(PriorityFunction.class);
RpcScheduler scheduler = new SimpleRpcScheduler(
- conf, 10, 0, 0, qosFunction, 0);
+ conf, 10, 0, 0, qosFunction, null, 0);
scheduler.init(CONTEXT);
scheduler.start();
CallRunner task = createMockTask();
@@ -110,7 +110,7 @@ public class TestSimpleRpcScheduler {
}
RpcScheduler scheduler = new SimpleRpcScheduler(
- conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS);
+ conf, 1, 1 ,1, qosFunction, null, HConstants.HIGH_QOS);
scheduler.init(CONTEXT);
scheduler.start();
for (CallRunner task : tasks) {