diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 33b71ad..a48d73a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -867,6 +867,17 @@ public final class HConstants {
public static final String REGION_SERVER_HANDLER_COUNT = "hbase.regionserver.handler.count";
public static final int DEFAULT_REGION_SERVER_HANDLER_COUNT = 30;
+ /*
+ * REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT:
+ * -1 => Disable aborting
+ * 0 => Abort if even a single handler has died
+ * 0.x => Abort only when this percent of handlers have died
+ * 1 => Abort only when all of the handlers have died
+ */
+ public static final String REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT =
+ "hbase.regionserver.handler.abort.on.error.percent";
+ public static final double DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT = 0.5;
+
public static final String REGION_SERVER_META_HANDLER_COUNT =
"hbase.regionserver.metahandler.count";
public static final int DEFAULT_REGION_SERVER_META_HANDLER_COUNT = 10;
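For reference, a minimal sketch (not part of the patch; the class name and the 0.75 override are illustrative assumptions) of how the new constant is read through the standard Hadoop Configuration API:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    // Hypothetical example class, not part of the patch.
    public class HandlerAbortThresholdExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Override the shipped default of 0.5; -1 disables aborting entirely,
        // 0 aborts on the first dead handler, 1 aborts only when all handlers have died.
        conf.setDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT, 0.75);
        double threshold = conf.getDouble(
            HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
            HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
        System.out.println("handler abort threshold = " + threshold);
      }
    }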
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 5edf452..ea4db5e 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1452,4 +1452,12 @@ possible configurations would overwhelm and obscure the important.
     <name>hbase.http.staticuser.user</name>
     <value>dr.stack</value>
   </property>
+  <property>
+    <name>hbase.regionserver.handler.abort.on.error.percent</name>
+    <value>0.5</value>
+    <description>The percent of region server RPC handler threads that must fail
+    before the region server aborts: -1 disables aborting; 0 aborts if even a
+    single handler has died; 0.x aborts only when this percent of handlers have
+    died; 1 aborts only when all of the handlers have died.</description>
+  </property>
 </configuration>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
index 3ba842b..7e677c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -22,9 +22,11 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
@@ -40,12 +42,23 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final int maxQueueLength) {
- this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength);
+ this(name, handlerCount, numQueues, maxQueueLength, null, null);
}
-
+
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final int maxQueueLength, final Configuration conf, final Abortable abortable) {
+ this(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, maxQueueLength);
+ }
+
+ public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+ this(name, handlerCount, numQueues, null, null, queueClass, initargs);
+ }
+
+ public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final Configuration conf, final Abortable abortable,
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
- super(name, Math.max(handlerCount, numQueues));
+ super(name, Math.max(handlerCount, numQueues), conf, abortable);
queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
this.balancer = getBalancer(numQueues);
initializeQueues(numQueues, queueClass, initargs);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index b89a5d2..56bd96b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -18,8 +18,8 @@ package org.apache.hadoop.hbase.ipc;
*/
import java.nio.channels.ClosedChannelException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -111,6 +111,9 @@ public class CallRunner {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
errorThrowable = e;
error = StringUtils.stringifyException(e);
+ if (e instanceof Error) {
+ throw (Error)e;
+ }
} finally {
if (traceScope != null) {
traceScope.close();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index deac2f8..c0db56e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -26,9 +26,11 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
@@ -60,25 +62,35 @@ public class RWQueueRpcExecutor extends RpcExecutor {
private final int numScanQueues;
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
- final float readShare, final int maxQueueLength) {
- this(name, handlerCount, numQueues, readShare, maxQueueLength, 0, LinkedBlockingQueue.class);
+ final float readShare, final int maxQueueLength,
+ final Configuration conf, final Abortable abortable) {
+ this(name, handlerCount, numQueues, readShare, maxQueueLength, 0,
+ conf, abortable, LinkedBlockingQueue.class);
}
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final float scanShare, final int maxQueueLength) {
- this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength,
- LinkedBlockingQueue.class);
+ this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, null, null);
+ }
+
+ public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final float readShare, final float scanShare, final int maxQueueLength,
+ final Configuration conf, final Abortable abortable) {
+ this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength,
+ conf, abortable, LinkedBlockingQueue.class);
}
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
- final float readShare, final int maxQueueLength,
+ final float readShare, final int maxQueueLength,
+ final Configuration conf, final Abortable abortable,
final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
- this(name, handlerCount, numQueues, readShare, 0, maxQueueLength,
+ this(name, handlerCount, numQueues, readShare, 0, maxQueueLength, conf, abortable,
readQueueClass, readQueueInitArgs);
}
-
+
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
- final float readShare, final float scanShare, final int maxQueueLength,
+ final float readShare, final float scanShare, final int maxQueueLength,
+ final Configuration conf, final Abortable abortable,
final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 1b0934c..fcbc0bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -21,13 +21,17 @@ package org.apache.hadoop.hbase.ipc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -41,15 +45,26 @@ public abstract class RpcExecutor {
private final List<Thread> handlers;
private final int handlerCount;
private final String name;
+ private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
private boolean running;
+ private Configuration conf = null;
+ private Abortable abortable = null;
+
public RpcExecutor(final String name, final int handlerCount) {
this.handlers = new ArrayList<Thread>(handlerCount);
this.handlerCount = handlerCount;
this.name = Strings.nullToEmpty(name);
}
+ public RpcExecutor(final String name, final int handlerCount, final Configuration conf,
+ final Abortable abortable) {
+ this(name, handlerCount);
+ this.conf = conf;
+ this.abortable = abortable;
+ }
+
public void start(final int port) {
running = true;
startHandlers(port);
@@ -94,7 +109,7 @@ public abstract class RpcExecutor {
});
t.setDaemon(true);
t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
- ",queue=" + index + ",port=" + port);
+ ",queue=" + index + ",port=" + port);
t.start();
LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
handlers.add(t);
@@ -103,6 +118,9 @@ public abstract class RpcExecutor {
protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
boolean interrupted = false;
+ double handlerFailureThreshold =
+ conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
+ HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
try {
while (running) {
try {
@@ -110,10 +128,31 @@ public abstract class RpcExecutor {
try {
activeHandlerCount.incrementAndGet();
task.run();
- } catch (Throwable t) {
- LOG.error("RpcServer handler thread throws exception: ", t);
- throw t;
- } finally {
+ } catch (Throwable e) {
+ if (e instanceof Error) {
+ int failedCount = failedHandlerCount.incrementAndGet();
+ if (handlerFailureThreshold >= 0
+ && failedCount > handlerCount * handlerFailureThreshold) {
+ String message =
+ "Number of failed RpcServer handlers exceeded threshold "
+ + handlerFailureThreshold + "; failure cause: "
+ + StringUtils.stringifyException(e);
+ if (abortable != null) {
+ abortable.abort(message, e);
+ } else {
+ LOG.error("Received " + StringUtils.stringifyException(e)
+ + " but not aborting due to abortable being null");
+ throw e;
+ }
+ } else {
+ LOG.warn("RpcServer handler threads encountered errors "
+ + StringUtils.stringifyException(e));
+ }
+ } else {
+ LOG.warn("RpcServer handler threads encountered exceptions "
+ + StringUtils.stringifyException(e));
+ }
+ } finally {
activeHandlerCount.decrementAndGet();
}
} catch (InterruptedException e) {
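To make the abort check in consumerLoop concrete, a small standalone sketch (not part of the patch; it assumes the default handler count of 30 and mirrors the predicate above) of the same decision:

    // Hypothetical example class, not part of the patch.
    public class AbortDecisionSketch {
      // Mirrors the check made in RpcExecutor.consumerLoop above.
      static boolean shouldAbort(double threshold, int handlerCount, int failedCount) {
        return threshold >= 0 && failedCount > handlerCount * threshold;
      }

      public static void main(String[] args) {
        int handlerCount = 30;  // DEFAULT_REGION_SERVER_HANDLER_COUNT
        // With the default threshold of 0.5, the 16th failed handler trips the abort (16 > 15).
        System.out.println(shouldAbort(0.5, handlerCount, 15)); // false
        System.out.println(shouldAbort(0.5, handlerCount, 16)); // true
        System.out.println(shouldAbort(-1,  handlerCount, 30)); // false: aborting disabled
        System.out.println(shouldAbort(0,   handlerCount, 1));  // true: any failure aborts
      }
    }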
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index dd66570..b1203d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -22,11 +22,12 @@ import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
/**
@@ -92,6 +93,8 @@ public class SimpleRpcScheduler extends RpcScheduler {
/** What level a high priority call is at. */
private final int highPriorityLevel;
+
+ private Abortable abortable = null;
/**
* @param conf
@@ -107,11 +110,13 @@ public class SimpleRpcScheduler extends RpcScheduler {
int priorityHandlerCount,
int replicationHandlerCount,
PriorityFunction priority,
+ Abortable server,
int highPriorityLevel) {
int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
this.priority = priority;
this.highPriorityLevel = highPriorityLevel;
+ this.abortable = server;
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
@@ -127,32 +132,43 @@ public class SimpleRpcScheduler extends RpcScheduler {
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
- callqReadShare, callqScanShare, maxQueueLength,
+ callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
BoundedPriorityBlockingQueue.class, callPriority);
} else {
- callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
- callqReadShare, callqScanShare, maxQueueLength);
+ callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
+ callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
}
} else {
// multiple queues
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
- callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
- BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
+ callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
+ conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
} else {
callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount,
- numCallQueues, maxQueueLength);
+ numCallQueues, maxQueueLength, conf, abortable);
}
}
this.priorityExecutor =
priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", priorityHandlerCount,
- 1, maxQueueLength) : null;
+ 1, maxQueueLength, conf, abortable) : null;
this.replicationExecutor =
replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
- replicationHandlerCount, 1, maxQueueLength) : null;
+ replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
}
+ public SimpleRpcScheduler(
+ Configuration conf,
+ int handlerCount,
+ int priorityHandlerCount,
+ int replicationHandlerCount,
+ PriorityFunction priority,
+ int highPriorityLevel) {
+ this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, priority,
+ null, highPriorityLevel);
+ }
+
@Override
public void init(Context context) {
this.port = context.getListenerAddress().getPort();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 492b26d..103f705 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -792,7 +792,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
rpcServer = new RpcServer(rs, name, getServices(),
initialIsa, // BindAddress is IP we got for this server.
rs.conf,
- rpcSchedulerFactory.create(rs.conf, this));
+ rpcSchedulerFactory.create(rs.conf, this, rs));
scannerLeaseTimeoutPeriod = rs.conf.getInt(
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java
index 9e55f1f..bb7a912 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java
@@ -18,9 +18,10 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
@@ -34,5 +35,9 @@ public interface RpcSchedulerFactory {
/**
* Constructs a {@link org.apache.hadoop.hbase.ipc.RpcScheduler}.
*/
+ RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server);
+
+ @Deprecated
RpcScheduler create(Configuration conf, PriorityFunction priority);
+
}
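Because the interface gains a new method, RpcSchedulerFactory implementations outside this patch must add the Abortable-aware overload. A sketch of how such an implementation could adapt (the class name MyRpcSchedulerFactory is hypothetical; the pattern mirrors SimpleRpcSchedulerFactory below, keeping the deprecated two-argument create() as a thin delegate):

    package org.apache.hadoop.hbase.regionserver;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Abortable;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.ipc.PriorityFunction;
    import org.apache.hadoop.hbase.ipc.RpcScheduler;
    import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;

    // Hypothetical downstream factory, not part of the patch.
    public class MyRpcSchedulerFactory implements RpcSchedulerFactory {
      @Override
      @Deprecated
      public RpcScheduler create(Configuration conf, PriorityFunction priority) {
        // Old entry point: no Abortable is available, so handler failures cannot abort the server.
        return create(conf, priority, null);
      }

      @Override
      public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
        int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
            HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
        // Pass the Abortable through so RpcExecutor can abort once the failure threshold is hit.
        return new SimpleRpcScheduler(conf, handlerCount, 0, 0, priority, server,
            HConstants.QOS_THRESHOLD);
      }
    }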
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
index e1dba74..fc416ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
@@ -17,11 +17,12 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
@@ -32,17 +33,25 @@ import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
@Override
+ @Deprecated
public RpcScheduler create(Configuration conf, PriorityFunction priority) {
- int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
- HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+ return create(conf, priority, null);
+ }
+
+ @Override
+ public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
+ int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+ HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
return new SimpleRpcScheduler(
- conf,
- handlerCount,
- conf.getInt(HConstants.REGION_SERVER_META_HANDLER_COUNT,
- HConstants.DEFAULT_REGION_SERVER_META_HANDLER_COUNT),
- conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
- HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
- priority,
- HConstants.QOS_THRESHOLD);
+ conf,
+ handlerCount,
+ conf.getInt(HConstants.REGION_SERVER_META_HANDLER_COUNT,
+ HConstants.DEFAULT_REGION_SERVER_META_HANDLER_COUNT),
+ conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
+ HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
+ priority,
+ server,
+ HConstants.QOS_THRESHOLD);
}
+
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
index be16529..77267f6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
@@ -17,9 +17,11 @@
*/
package org.apache.hadoop.hbase.ipc;
+import java.nio.channels.SocketChannel;
+
+import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.security.UserProvider;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
@@ -31,11 +33,12 @@ public class TestCallRunner {
*/
@Test
public void testSimpleCall() {
- RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
- Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
- RpcServer.Call mockCall = Mockito.mock(RpcServer.Call.class);
- mockCall.connection = Mockito.mock(RpcServer.Connection.class);
- CallRunner cr = new CallRunner(mockRpcServer, mockCall, new UserProvider());
- cr.run();
+ RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
+ Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
+ RpcServer.Call mockCall = Mockito.mock(RpcServer.Call.class);
+ mockCall.connection = Mockito.mock(RpcServer.Connection.class);
+ mockCall.connection.channel = Mockito.mock(SocketChannel.class);
+ CallRunner cr = new CallRunner(mockRpcServer, mockCall, new UserProvider());
+ cr.run();
}
}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
new file mode 100644
index 0000000..ed94ce7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+@Category({RPCTests.class, SmallTests.class})
+public class TestRpcHandlerException {
+ public static final Log LOG = LogFactory.getLog(TestRpcHandlerException.class);
+ static String example = "xyz";
+ static byte[] CELL_BYTES = example.getBytes();
+ static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
+
+ private final static Configuration CONF = HBaseConfiguration.create();
+ RpcExecutor rpcExecutor = Mockito.mock(RpcExecutor.class);
+
+ // We are using the test TestRpcServiceProtos generated classes and Service because they are
+ // available and basic with methods like 'echo', and ping. Below we make a blocking service
+ // by passing in implementation of blocking interface. We use this service in all tests that
+ // follow.
+ private static final BlockingService SERVICE =
+ TestRpcServiceProtos.TestProtobufRpcProto
+ .newReflectiveBlockingService(new TestRpcServiceProtos
+ .TestProtobufRpcProto.BlockingInterface() {
+
+ @Override
+ public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
+ throws ServiceException {
+ return null;
+ }
+
+ @Override
+ public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
+ throws ServiceException {
+ return null;
+ }
+
+ @Override
+ public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
+ throws Error, RuntimeException {
+ if (controller instanceof PayloadCarryingRpcController) {
+ PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
+ // If cells are present, scan them to check we are able to iterate what we were given and,
+ // since this is an echo, just put them back on the controller creating a new block.
+ // Tests our block building.
+ CellScanner cellScanner = pcrc.cellScanner();
+ List<Cell> list = null;
+ if (cellScanner != null) {
+ list = new ArrayList<Cell>();
+ try {
+ while (cellScanner.advance()) {
+ list.add(cellScanner.current());
+ throw new StackOverflowError();
+ }
+ } catch (StackOverflowError e) {
+ throw e;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ cellScanner = CellUtil.createCellScanner(list);
+ ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
+ }
+ return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
+ }
+ });
+
+ /**
+ * Instance of server. We don't do anything special in here, so we could just use
+ * HBaseRpcServer directly.
+ */
+ private static class TestRpcServer extends RpcServer {
+
+ TestRpcServer() throws IOException {
+ this(new FifoRpcScheduler(CONF, 1));
+ }
+
+ TestRpcServer(RpcScheduler scheduler) throws IOException {
+ super(null, "testRpcServer",
+ Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
+ new InetSocketAddress("localhost", 0), CONF, scheduler);
+ }
+
+ @Override
+ public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
+ Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
+ throws IOException {
+ return super.call(service, md, param, cellScanner, receiveTime, status);
+ }
+ }
+
+ /** Tests that the RPC scheduler is called when requests arrive. When an RPC handler thread
+ * dies, the client will hang and the test will fail; the test is meant to exercise that
+ * behavior as a unit test.
+ */
+ private class AbortServer implements Abortable {
+ private boolean aborted = false;
+
+ @Override
+ public void abort(String why, Throwable e) {
+ aborted = true;
+ }
+
+ @Override
+ public boolean isAborted() {
+ return aborted;
+ }
+ }
+
+ /* This is a unit test to make sure the region server aborts when the number of RPC handler
+ * threads that have caught errors exceeds the threshold. The client will hang when the RS aborts.
+ */
+ @Ignore
+ @Test
+ public void testRpcScheduler() throws IOException, InterruptedException {
+ PriorityFunction qosFunction = mock(PriorityFunction.class);
+ Abortable abortable = new AbortServer();
+ RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable, 0);
+ RpcServer rpcServer = new TestRpcServer(scheduler);
+ RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT);
+ try {
+ rpcServer.start();
+ MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+ EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+ client.call(null, md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)), md
+ .getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0);
+ } catch (Throwable e) {
+ assert(abortable.isAborted() == true);
+ } finally {
+ rpcServer.stop();
+ }
+ }
+
+}