commit 820f629423f21fbd1dcc7a383955443a2595fd5d
Author: Enis Soztutar
Date:   Fri Jan 2 13:07:57 2015 -0800

    HBASE-12028 Abort the RegionServer, when it's handler threads die (Alicia Ying Shu)

diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 33b71ad..f67449b 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -867,6 +867,17 @@ public final class HConstants {
   public static final String REGION_SERVER_HANDLER_COUNT = "hbase.regionserver.handler.count";
   public static final int DEFAULT_REGION_SERVER_HANDLER_COUNT = 30;
 
+  /*
+   * REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT:
+   * -1  => Disable aborting
+   * 0   => Abort if even a single handler has died
+   * 0.x => Abort only when this percent of handlers have died
+   * 1   => Abort only when all of the handlers have died
+   */
+  public static final String REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT =
+      "hbase.regionserver.handler.abort.on.error.percent";
+  public static final double DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT = 0.5;
+
   public static final String REGION_SERVER_META_HANDLER_COUNT =
       "hbase.regionserver.metahandler.count";
   public static final int DEFAULT_REGION_SERVER_META_HANDLER_COUNT = 10;
diff --git hbase-common/src/main/resources/hbase-default.xml hbase-common/src/main/resources/hbase-default.xml
index 5edf452..e412b8f 100644
--- hbase-common/src/main/resources/hbase-default.xml
+++ hbase-common/src/main/resources/hbase-default.xml
@@ -1452,4 +1452,12 @@ possible configurations would overwhelm and obscure the important.
     <name>hbase.http.staticuser.user</name>
     <value>dr.stack</value>
   </property>
+  <property>
+    <name>hbase.regionserver.handler.abort.on.error.percent</name>
+    <value>0.5</value>
+    <description>The percent of region server RPC handler threads that must fail before the region server aborts.
+    -1 Disable aborting; 0 Abort if even a single handler has died;
+    0.x Abort only when this percent of handlers have died;
+    1 Abort only when all of the handlers have died.</description>
+  </property>
 </configuration>
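The new setting is easiest to read as a failure budget against hbase.regionserver.handler.count. Below is a minimal standalone sketch (illustrative only, not part of the patch) of how the two values combine, assuming the shipped defaults of 30 handlers and 0.5; the class name is made up for the example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

// Sketch: turn the percent into a concrete "how many handlers may die" budget.
public class HandlerAbortBudget {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);                   // 30 by default
    double percent = conf.getDouble(
        HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
        HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);  // 0.5 by default
    if (percent < 0) {
      System.out.println("Aborting on handler death is disabled (-1)");
    } else {
      // RpcExecutor aborts once failedCount > handlerCount * percent.
      System.out.println("Abort once more than " + (handlerCount * percent)
          + " of " + handlerCount + " handler threads have died");
    }
  }
}

With the defaults this tolerates 15 dead handler threads; setting the percent to -1 keeps the old keep-running behavior.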
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
index 3ba842b..56424df 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -22,9 +22,11 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
@@ -40,12 +42,23 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
 
   public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
       final int maxQueueLength) {
-    this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength);
+    this(name, handlerCount, numQueues, maxQueueLength, null, null);
+  }
+
+  public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+      final int maxQueueLength, final Configuration conf, final Abortable abortable) {
+    this(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, maxQueueLength);
+  }
+
+  public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+      final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+    this(name, handlerCount, numQueues, null, null, queueClass, initargs);
   }
 
   public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+      final Configuration conf, final Abortable abortable,
       final Class<? extends BlockingQueue> queueClass, Object... initargs) {
-    super(name, Math.max(handlerCount, numQueues));
+    super(name, Math.max(handlerCount, numQueues), conf, abortable);
     queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
     this.balancer = getBalancer(numQueues);
     initializeQueues(numQueues, queueClass, initargs);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index b89a5d2..56bd96b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -18,8 +18,8 @@ package org.apache.hadoop.hbase.ipc;
  */
 import java.nio.channels.ClosedChannelException;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -111,6 +111,9 @@ public class CallRunner {
       RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
       errorThrowable = e;
       error = StringUtils.stringifyException(e);
+      if (e instanceof Error) {
+        throw (Error) e;
+      }
     } finally {
       if (traceScope != null) {
         traceScope.close();
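The CallRunner change is one half of the contract: ordinary exceptions are still logged and turned into an RPC error response, while JVM Errors are rethrown so the handler loop in RpcExecutor can count them against the abort threshold. A small self-contained sketch of that rethrow pattern (illustrative only, not HBase code; names are made up):

// Exceptions are handled locally, Errors escape to the surrounding handler loop.
public class RethrowErrorPattern {
  interface Task {
    void run() throws Throwable;
  }

  static void runGuarded(Task task) {
    try {
      task.run();
    } catch (Throwable t) {
      System.err.println("call failed, answered locally: " + t);  // what CallRunner logs
      if (t instanceof Error) {
        throw (Error) t;  // only Errors propagate to the handler thread
      }
    }
  }

  public static void main(String[] args) {
    runGuarded(() -> { throw new RuntimeException("swallowed after logging"); });
    try {
      runGuarded(() -> { throw new StackOverflowError(); });
    } catch (Error e) {
      System.out.println("handler loop would now count: " + e);
    }
  }
}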
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index deac2f8..2b58680 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -26,9 +26,11 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
@@ -60,25 +62,35 @@ public class RWQueueRpcExecutor extends RpcExecutor {
   private final int numScanQueues;
 
   public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
-      final float readShare, final int maxQueueLength) {
-    this(name, handlerCount, numQueues, readShare, maxQueueLength, 0, LinkedBlockingQueue.class);
+      final float readShare, final int maxQueueLength,
+      final Configuration conf, final Abortable abortable) {
+    this(name, handlerCount, numQueues, readShare, maxQueueLength, 0,
+      conf, abortable, LinkedBlockingQueue.class);
   }
 
   public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
       final float readShare, final float scanShare, final int maxQueueLength) {
+    this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, null, null);
+  }
+
+  public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+      final float readShare, final float scanShare, final int maxQueueLength,
+      final Configuration conf, final Abortable abortable) {
     this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength,
-      LinkedBlockingQueue.class);
+      conf, abortable, LinkedBlockingQueue.class);
   }
 
   public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
       final float readShare, final int maxQueueLength,
+      final Configuration conf, final Abortable abortable,
       final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
-    this(name, handlerCount, numQueues, readShare, 0, maxQueueLength,
+    this(name, handlerCount, numQueues, readShare, 0, maxQueueLength, conf, abortable,
       readQueueClass, readQueueInitArgs);
   }
 
   public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
       final float readShare, final float scanShare, final int maxQueueLength,
+      final Configuration conf, final Abortable abortable,
       final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
     this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
       calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 1b0934c..709429d 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -21,13 +21,17 @@ package org.apache.hadoop.hbase.ipc;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -41,15 +45,26 @@ public abstract class RpcExecutor {
   private final List<Thread> handlers;
   private final int handlerCount;
   private final String name;
+  private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
 
   private boolean running;
 
+  private Configuration conf = null;
+  private Abortable abortable = null;
+
   public RpcExecutor(final String name, final int handlerCount) {
     this.handlers = new ArrayList<Thread>(handlerCount);
     this.handlerCount = handlerCount;
     this.name = Strings.nullToEmpty(name);
   }
 
+  public RpcExecutor(final String name, final int handlerCount, final Configuration conf,
+      final Abortable abortable) {
+    this(name, handlerCount);
+    this.conf = conf;
+    this.abortable = abortable;
+  }
+
   public void start(final int port) {
     running = true;
     startHandlers(port);
@@ -94,7 +109,7 @@ public abstract class RpcExecutor {
       });
       t.setDaemon(true);
       t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
-        ",queue=" + index + ",port=" + port);
+          ",queue=" + index + ",port=" + port);
       t.start();
       LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
       handlers.add(t);
@@ -103,6 +118,9 @@
   protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
     boolean interrupted = false;
+    double handlerFailureThreshold =
+        conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
+          HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
     try {
       while (running) {
         try {
@@ -110,9 +128,30 @@
           try {
             activeHandlerCount.incrementAndGet();
             task.run();
-          } catch (Throwable t) {
-            LOG.error("RpcServer handler thread throws exception: ", t);
-            throw t;
+          } catch (Throwable e) {
+            if (e instanceof Error) {
+              int failedCount = failedHandlerCount.incrementAndGet();
+              if (handlerFailureThreshold >= 0
+                  && failedCount > handlerCount * handlerFailureThreshold) {
+                String message =
+                    "Number of failed RpcServer handlers exceeded threshold "
+                        + handlerFailureThreshold + " with failure reason: "
+                        + StringUtils.stringifyException(e);
+                if (abortable != null) {
+                  abortable.abort(message, e);
+                } else {
+                  LOG.error("Received " + StringUtils.stringifyException(e)
+                      + " but not aborting because the abortable is null");
+                  throw e;
+                }
+              } else {
+                LOG.warn("RpcServer handler thread encountered an error: "
+                    + StringUtils.stringifyException(e));
+              }
+            } else {
+              LOG.warn("RpcServer handler thread encountered an exception: "
+                  + StringUtils.stringifyException(e));
+            }
           } finally {
             activeHandlerCount.decrementAndGet();
           }
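Pulled out of the diff context, the decision added to consumerLoop() is a simple counter check: every Error bumps a counter shared by the executor's handler threads, and the executor only escalates once the counter exceeds handlerCount * threshold. A standalone sketch of just that arithmetic (class and variable names are made up for illustration):

import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the threshold arithmetic used by the patched consumerLoop().
public class AbortDecisionSketch {
  private static final AtomicInteger failedHandlerCount = new AtomicInteger(0);

  static boolean shouldAbort(int handlerCount, double threshold) {
    if (threshold < 0) {
      return false;                                  // -1 disables aborting
    }
    int failedCount = failedHandlerCount.incrementAndGet();
    return failedCount > handlerCount * threshold;   // same comparison as the patch
  }

  public static void main(String[] args) {
    int handlerCount = 30;
    double threshold = 0.5;                          // the shipped default
    for (int failure = 1; failure <= handlerCount; failure++) {
      if (shouldAbort(handlerCount, threshold)) {
        System.out.println("failure #" + failure + " -> abort the region server");
        return;
      }
      System.out.println("failure #" + failure + " -> warn and keep running");
    }
  }
}

With 30 handlers and 0.5, failures 1 through 15 only log a warning; the 16th triggers the abort.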
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index dd66570..cbe8adc 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -22,11 +22,12 @@ import java.util.Comparator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
 
 /**
@@ -93,6 +94,8 @@ public class SimpleRpcScheduler extends RpcScheduler {
   /** What level a high priority call is at. */
   private final int highPriorityLevel;
 
+  private Abortable abortable = null;
+
   /**
    * @param conf
    * @param handlerCount the number of handler threads that will be used to process calls
@@ -107,11 +110,13 @@ public class SimpleRpcScheduler extends RpcScheduler {
       int priorityHandlerCount,
       int replicationHandlerCount,
       PriorityFunction priority,
+      Abortable server,
       int highPriorityLevel) {
     int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
       handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
     this.priority = priority;
     this.highPriorityLevel = highPriorityLevel;
+    this.abortable = server;
 
     String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
     float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
@@ -127,30 +132,41 @@ public class SimpleRpcScheduler extends RpcScheduler {
       if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
         callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
-          callqReadShare, callqScanShare, maxQueueLength,
+          callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
           BoundedPriorityBlockingQueue.class, callPriority);
       } else {
         callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
-          callqReadShare, callqScanShare, maxQueueLength);
+          callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
       }
     } else {
       // multiple queues
       if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
         callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
-          BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
+          conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
       } else {
         callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount,
-            numCallQueues, maxQueueLength);
+            numCallQueues, maxQueueLength, conf, abortable);
       }
     }
 
     this.priorityExecutor = priorityHandlerCount > 0 ?
       new BalancedQueueRpcExecutor("Priority", priorityHandlerCount,
-        1, maxQueueLength) : null;
+        1, maxQueueLength, conf, abortable) : null;
     this.replicationExecutor = replicationHandlerCount > 0 ?
       new BalancedQueueRpcExecutor("Replication",
-        replicationHandlerCount, 1, maxQueueLength) : null;
+        replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
+  }
+
+  public SimpleRpcScheduler(
+      Configuration conf,
+      int handlerCount,
+      int priorityHandlerCount,
+      int replicationHandlerCount,
+      PriorityFunction priority,
+      int highPriorityLevel) {
+    this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, priority,
+        null, highPriorityLevel);
   }
 
   @Override
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 492b26d..103f705 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -792,7 +792,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     rpcServer = new RpcServer(rs, name, getServices(),
       initialIsa, // BindAddress is IP we got for this server.
       rs.conf,
-      rpcSchedulerFactory.create(rs.conf, this));
+      rpcSchedulerFactory.create(rs.conf, this, rs));
 
     scannerLeaseTimeoutPeriod = rs.conf.getInt(
       HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java
index 9e55f1f..f554781 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java
@@ -18,9 +18,10 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
 import org.apache.hadoop.hbase.ipc.RpcScheduler;
 
@@ -34,5 +35,9 @@ public interface RpcSchedulerFactory {
   /**
    * Constructs a {@link org.apache.hadoop.hbase.ipc.RpcScheduler}.
    */
+  RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server);
+
+  @Deprecated
   RpcScheduler create(Configuration conf, PriorityFunction priority);
+
 }
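On the wiring side, the only thing the factory needs from the region server is the Abortable interface, which is why RSRpcServices can simply pass rs through (HRegionServer implements Abortable, and RSRpcServices itself is the PriorityFunction). A sketch of building a scheduler against a stand-in Abortable; LoggingAbortable and the null PriorityFunction are placeholders for illustration only, not how the production server does it:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;

// Sketch of the new wiring: anything that can shut itself down can be the Abortable.
public class SchedulerWiringSketch {
  static class LoggingAbortable implements Abortable {
    private volatile boolean aborted = false;

    @Override
    public void abort(String why, Throwable e) {
      aborted = true;
      System.err.println("abort requested: " + why);
    }

    @Override
    public boolean isAborted() {
      return aborted;
    }
  }

  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.setDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT, 0.5);
    PriorityFunction priority = null;  // in the real server this is RSRpcServices itself
    RpcScheduler scheduler =
        new SimpleRpcSchedulerFactory().create(conf, priority, new LoggingAbortable());
    // RpcServer takes it from here (init/start), exactly as RSRpcServices does above.
    System.out.println("built " + scheduler.getClass().getSimpleName()
        + " with an Abortable for handler-death escalation");
  }
}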
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
index e1dba74..b044a43 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
@@ -17,11 +17,12 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
 import org.apache.hadoop.hbase.ipc.RpcScheduler;
 import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
@@ -32,17 +33,25 @@ import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
 public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
 
   @Override
+  @Deprecated
   public RpcScheduler create(Configuration conf, PriorityFunction priority) {
+    return create(conf, priority, null);
+  }
+
+  @Override
+  public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
     int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
-      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
     return new SimpleRpcScheduler(
-      conf,
-      handlerCount,
-      conf.getInt(HConstants.REGION_SERVER_META_HANDLER_COUNT,
-        HConstants.DEFAULT_REGION_SERVER_META_HANDLER_COUNT),
-      conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
-        HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
-      priority,
-      HConstants.QOS_THRESHOLD);
+        conf,
+        handlerCount,
+        conf.getInt(HConstants.REGION_SERVER_META_HANDLER_COUNT,
+            HConstants.DEFAULT_REGION_SERVER_META_HANDLER_COUNT),
+        conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
+            HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
+        priority,
+        server,
+        HConstants.QOS_THRESHOLD);
   }
+
 }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
new file mode 100644
index 0000000..b1e0b69
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+@Category({RPCTests.class, SmallTests.class})
+public class TestRpcHandlerException {
+  public static final Log LOG = LogFactory.getLog(TestRpcHandlerException.class);
+  static String example = "xyz";
+  static byte[] CELL_BYTES = example.getBytes();
+  static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
+
+  private final static Configuration CONF = HBaseConfiguration.create();
+  RpcExecutor rpcExecutor = Mockito.mock(RpcExecutor.class);
+
+  // We are using the test TestRpcServiceProtos generated classes and Service because they are
+  // available and basic, with methods like 'echo' and 'ping'. Below we make a blocking service
+  // by passing in an implementation of the blocking interface. We use this service in all the
+  // tests that follow.
+  private static final BlockingService SERVICE =
+      TestRpcServiceProtos.TestProtobufRpcProto
+          .newReflectiveBlockingService(new TestRpcServiceProtos
+              .TestProtobufRpcProto.BlockingInterface() {
+
+            @Override
+            public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
+                throws ServiceException {
+              return null;
+            }
+
+            @Override
+            public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
+                throws ServiceException {
+              return null;
+            }
+
+            @Override
+            public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
+                throws Error, RuntimeException {
+              if (controller instanceof PayloadCarryingRpcController) {
+                PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
+                // If cells, scan them to check we are able to iterate what we were given and,
+                // since this is an echo, just put them back on the controller creating a new
+                // block. Tests our block building.
+                CellScanner cellScanner = pcrc.cellScanner();
+                List<Cell> list = null;
+                if (cellScanner != null) {
+                  list = new ArrayList<Cell>();
+                  try {
+                    while (cellScanner.advance()) {
+                      list.add(cellScanner.current());
+                      throw new StackOverflowError();
+                    }
+                  } catch (StackOverflowError e) {
+                    throw e;
+                  } catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                }
+                cellScanner = CellUtil.createCellScanner(list);
+                ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
+              }
+              return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
+            }
+          });
+
+  /**
+   * Instance of server. We actually don't do anything special in here so could just use
+   * HBaseRpcServer directly.
+   */
+  private static class TestRpcServer extends RpcServer {
+
+    TestRpcServer() throws IOException {
+      this(new FifoRpcScheduler(CONF, 1));
+    }
+
+    TestRpcServer(RpcScheduler scheduler) throws IOException {
+      super(null, "testRpcServer",
+          Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
+          new InetSocketAddress("localhost", 0), CONF, scheduler);
+    }
+
+    @Override
+    public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
+        Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
+        throws IOException {
+      return super.call(service, md, param, cellScanner, receiveTime, status);
+    }
+  }
+
+  /**
+   * Tests that the RPC scheduler is called when requests arrive. When an RPC handler thread
+   * dies, the client will hang and the test will fail. The test is meant as a unit test of
+   * this behavior.
+   */
+  private class AbortServer implements Abortable {
+    private boolean aborted = false;
+
+    @Override
+    public void abort(String why, Throwable e) {
+      aborted = true;
+    }
+
+    @Override
+    public boolean isAborted() {
+      return aborted;
+    }
+  }
+
+  /*
+   * This is a unit test to make sure the region server is aborted when the number of RPC
+   * handler threads that caught errors exceeds the threshold. The client will hang when the
+   * RS aborts.
+   */
+  @Ignore
+  @Test
+  public void testRpcScheduler() throws IOException, InterruptedException {
+    PriorityFunction qosFunction = mock(PriorityFunction.class);
+    Abortable abortable = new AbortServer();
+    RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable, 0);
+    RpcServer rpcServer = new TestRpcServer(scheduler);
+    RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT);
+    try {
+      rpcServer.start();
+      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+      client.call(null, md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)),
+          md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0);
+    } catch (Throwable e) {
+      assert(abortable.isAborted() == true);
+    } finally {
+      rpcServer.stop();
+    }
+  }
+}
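Per the documented semantics of the new setting, a percent of 0 aborts on the very first dead handler, which is one way to make the scenario above trip on the first echo call that throws StackOverflowError. A hypothetical tweak along those lines (not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

// Hypothetical: with the percent at 0, failedCount > handlerCount * 0 holds for the
// first Error, so the AbortServer flag flips as soon as one handler dies.
public class SingleFailureAbortConfig {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.setDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT, 0.0);
    // Hand this conf to SimpleRpcScheduler / TestRpcServer instead of the plain CONF.
    System.out.println("abort threshold percent = "
        + conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT, 0.5));
  }
}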