From 863df6fee26034ee82cc95cf20fe05afcc0c914a Mon Sep 17 00:00:00 2001 From: zhangduo Date: Fri, 27 Feb 2015 15:03:46 +0800 Subject: [PATCH] HBASE-13097 Use same EventLoopGroup for different AsyncRpcClients if possible --- .../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 102 +++- .../apache/hadoop/hbase/ipc/AbstractTestIPC.java | 269 ++++++++++ .../org/apache/hadoop/hbase/ipc/TestAsyncIPC.java | 298 +++++++++++ .../hadoop/hbase/ipc/TestGlobalEventLoopGroup.java | 54 ++ .../java/org/apache/hadoop/hbase/ipc/TestIPC.java | 584 ++------------------- 5 files changed, 752 insertions(+), 555 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java index 192e583..71752f9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java @@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -63,11 +64,12 @@ import com.google.protobuf.RpcController; /** * Netty client for the requests and responses */ -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class AsyncRpcClient extends AbstractRpcClient { public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max"; - public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.useNativeTransport"; + public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport"; + public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup"; public static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(100, TimeUnit.MILLISECONDS); @@ -82,12 +84,54 @@ public class AsyncRpcClient extends AbstractRpcClient { protected final AtomicInteger callIdCnt = new AtomicInteger(); - private final EventLoopGroup eventLoopGroup; private final PoolMap connections; final FailedServers failedServers; - private final Bootstrap bootstrap; + @VisibleForTesting + final Bootstrap bootstrap; + + private final boolean useGlobalEventLoopGroup; + + @VisibleForTesting + static Pair> GLOBAL_EVENT_LOOP_GROUP; + + private synchronized static Pair> + getGlobalEventLoopGroup(Configuration conf) { + if (GLOBAL_EVENT_LOOP_GROUP == null) { + GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf); + if (LOG.isDebugEnabled()) { + LOG.debug("Create global event loop group " + + GLOBAL_EVENT_LOOP_GROUP.getFirst().getClass().getSimpleName()); + } + } + return GLOBAL_EVENT_LOOP_GROUP; + } + + private static Pair> createEventLoopGroup( + Configuration conf) { + // Max amount of threads to use. 0 lets Netty decide based on amount of cores + int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0); + + // Config to enable native transport. Does not seem to be stable at time of implementation + // although it is not extensively tested. + boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false); + + // Use the faster native epoll transport mechanism on linux if enabled + if (epollEnabled && JVM.isLinux()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Create EpollEventLoopGroup with maxThreads = " + maxThreads); + } + return new Pair>(new EpollEventLoopGroup(maxThreads, + Threads.newDaemonThreadFactory("AsyncRpcChannel")), EpollSocketChannel.class); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Create NioEventLoopGroup with maxThreads = " + maxThreads); + } + return new Pair>(new NioEventLoopGroup(maxThreads, + Threads.newDaemonThreadFactory("AsyncRpcChannel")), NioSocketChannel.class); + } + } /** * Constructor for tests @@ -106,23 +150,16 @@ public class AsyncRpcClient extends AbstractRpcClient { LOG.debug("Starting async Hbase RPC client"); } - // Max amount of threads to use. 0 lets Netty decide based on amount of cores - int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0); - - // Config to enable native transport. Does not seem to be stable at time of implementation - // although it is not extensively tested. - boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false); - - // Use the faster native epoll transport mechanism on linux if enabled - Class socketChannelClass; - if (epollEnabled && JVM.isLinux()) { - socketChannelClass = EpollSocketChannel.class; - this.eventLoopGroup = - new EpollEventLoopGroup(maxThreads, Threads.newDaemonThreadFactory("AsyncRpcChannel")); + Pair> eventLoopGroupAndChannelClass; + this.useGlobalEventLoopGroup = conf.getBoolean(USE_GLOBAL_EVENT_LOOP_GROUP, true); + if (useGlobalEventLoopGroup) { + eventLoopGroupAndChannelClass = getGlobalEventLoopGroup(configuration); } else { - socketChannelClass = NioSocketChannel.class; - this.eventLoopGroup = - new NioEventLoopGroup(maxThreads, Threads.newDaemonThreadFactory("AsyncRpcChannel")); + eventLoopGroupAndChannelClass = createEventLoopGroup(configuration); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Use " + (useGlobalEventLoopGroup ? "global" : "individual") + " event loop group " + + eventLoopGroupAndChannelClass.getFirst().getClass().getSimpleName()); } this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration)); @@ -133,7 +170,8 @@ public class AsyncRpcClient extends AbstractRpcClient { // Configure the default bootstrap. this.bootstrap = new Bootstrap(); - bootstrap.group(eventLoopGroup).channel(socketChannelClass) + bootstrap.group(eventLoopGroupAndChannelClass.getFirst()) + .channel(eventLoopGroupAndChannelClass.getSecond()) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.TCP_NODELAY, tcpNoDelay) .option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive) @@ -176,6 +214,9 @@ public class AsyncRpcClient extends AbstractRpcClient { protected Pair call(PayloadCarryingRpcController pcrc, Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, InetSocketAddress addr) throws IOException, InterruptedException { + if (pcrc == null) { + pcrc = new PayloadCarryingRpcController(); + } final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket); Promise promise = connection.callMethod(md, pcrc, param, returnType); @@ -236,6 +277,8 @@ public class AsyncRpcClient extends AbstractRpcClient { } } + private boolean closed = false; + /** * Close netty */ @@ -245,12 +288,18 @@ public class AsyncRpcClient extends AbstractRpcClient { } synchronized (connections) { + if (closed) { + return; + } + closed = true; for (AsyncRpcChannel conn : connections.values()) { conn.close(null); } } - - eventLoopGroup.shutdownGracefully(); + // do not close global EventLoopGroup. + if (!useGlobalEventLoopGroup) { + bootstrap.group().shutdownGracefully(); + } } /** @@ -287,10 +336,6 @@ public class AsyncRpcClient extends AbstractRpcClient { */ private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location, User ticket) throws StoppedRpcClientException, FailedServerException { - if (this.eventLoopGroup.isShuttingDown() || this.eventLoopGroup.isShutdown()) { - throw new StoppedRpcClientException(); - } - // Check if server is failed if (this.failedServers.isFailedServer(location)) { if (LOG.isDebugEnabled()) { @@ -305,6 +350,9 @@ public class AsyncRpcClient extends AbstractRpcClient { AsyncRpcChannel rpcChannel; synchronized (connections) { + if (closed) { + throw new StoppedRpcClientException(); + } rpcChannel = connections.get(hashCode); if (rpcChannel == null) { rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java new file mode 100644 index 0000000..32eb9f6 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.util.StringUtils; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.protobuf.BlockingService; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * Some basic ipc tests. + */ +public abstract class AbstractTestIPC { + + private static final Log LOG = LogFactory.getLog(AbstractTestIPC.class); + + private static byte[] CELL_BYTES = Bytes.toBytes("xyz"); + private static KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); + static byte[] BIG_CELL_BYTES = new byte[10 * 1024]; + static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES); + static final Configuration CONF = HBaseConfiguration.create(); + // We are using the test TestRpcServiceProtos generated classes and Service because they are + // available and basic with methods like 'echo', and ping. Below we make a blocking service + // by passing in implementation of blocking interface. We use this service in all tests that + // follow. + static final BlockingService SERVICE = + TestRpcServiceProtos.TestProtobufRpcProto + .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { + + @Override + public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) + throws ServiceException { + return null; + } + + @Override + public EmptyResponseProto error(RpcController controller, EmptyRequestProto request) + throws ServiceException { + return null; + } + + @Override + public EchoResponseProto echo(RpcController controller, EchoRequestProto request) + throws ServiceException { + if (controller instanceof PayloadCarryingRpcController) { + PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller; + // If cells, scan them to check we are able to iterate what we were given and since + // this is + // an echo, just put them back on the controller creating a new block. Tests our + // block + // building. + CellScanner cellScanner = pcrc.cellScanner(); + List list = null; + if (cellScanner != null) { + list = new ArrayList(); + try { + while (cellScanner.advance()) { + list.add(cellScanner.current()); + } + } catch (IOException e) { + throw new ServiceException(e); + } + } + cellScanner = CellUtil.createCellScanner(list); + ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner); + } + return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); + } + }); + + /** + * Instance of server. We actually don't do anything speical in here so could just use + * HBaseRpcServer directly. + */ + static class TestRpcServer extends RpcServer { + + TestRpcServer() throws IOException { + this(new FifoRpcScheduler(CONF, 1)); + } + + TestRpcServer(RpcScheduler scheduler) throws IOException { + super(null, "testRpcServer", Lists + .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress( + "localhost", 0), CONF, scheduler); + } + + @Override + public Pair call(BlockingService service, MethodDescriptor md, + Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) + throws IOException { + return super.call(service, md, param, cellScanner, receiveTime, status); + } + } + + protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration conf); + + /** + * Ensure we do not HAVE TO HAVE a codec. + * @throws InterruptedException + * @throws IOException + */ + @Test + public void testNoCodec() throws InterruptedException, IOException { + Configuration conf = HBaseConfiguration.create(); + AbstractRpcClient client = createRpcClientNoCodec(conf); + TestRpcServer rpcServer = new TestRpcServer(); + try { + rpcServer.start(); + InetSocketAddress address = rpcServer.getListenerAddress(); + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + final String message = "hello"; + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); + Pair r = + client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address); + assertTrue(r.getSecond() == null); + // Silly assertion that the message is in the returned pb. + assertTrue(r.getFirst().toString().contains(message)); + } finally { + client.close(); + rpcServer.stop(); + } + } + + protected abstract AbstractRpcClient createRpcClient(Configuration conf); + + /** + * It is hard to verify the compression is actually happening under the wraps. Hope that if + * unsupported, we'll get an exception out of some time (meantime, have to trace it manually to + * confirm that compression is happening down in the client and server). + * @throws IOException + * @throws InterruptedException + * @throws SecurityException + * @throws NoSuchMethodException + */ + @Test + public void testCompressCellBlock() throws IOException, InterruptedException, SecurityException, + NoSuchMethodException, ServiceException { + Configuration conf = new Configuration(HBaseConfiguration.create()); + conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName()); + List cells = new ArrayList(); + int count = 3; + for (int i = 0; i < count; i++) { + cells.add(CELL); + } + AbstractRpcClient client = createRpcClient(conf); + TestRpcServer rpcServer = new TestRpcServer(); + try { + rpcServer.start(); + InetSocketAddress address = rpcServer.getListenerAddress(); + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + + PayloadCarryingRpcController pcrc = + new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); + Pair r = + client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address); + int index = 0; + while (r.getSecond().advance()) { + assertTrue(CELL.equals(r.getSecond().current())); + index++; + } + assertEquals(count, index); + } finally { + client.close(); + rpcServer.stop(); + } + } + + protected abstract AbstractRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) + throws IOException; + + @Test + public void testRTEDuringConnectionSetup() throws Exception { + Configuration conf = HBaseConfiguration.create(); + TestRpcServer rpcServer = new TestRpcServer(); + AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf); + try { + rpcServer.start(); + InetSocketAddress address = rpcServer.getListenerAddress(); + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + client.call(null, md, param, null, User.getCurrent(), address); + fail("Expected an exception to have been thrown!"); + } catch (Exception e) { + LOG.info("Caught expected exception: " + e.toString()); + assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); + } finally { + client.close(); + rpcServer.stop(); + } + } + + /** Tests that the rpc scheduler is called when requests arrive. */ + @Test + public void testRpcScheduler() throws IOException, InterruptedException { + RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1)); + RpcServer rpcServer = new TestRpcServer(scheduler); + verify(scheduler).init((RpcScheduler.Context) anyObject()); + AbstractRpcClient client = createRpcClient(CONF); + try { + rpcServer.start(); + verify(scheduler).start(); + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + for (int i = 0; i < 10; i++) { + client.call( + new PayloadCarryingRpcController( + CellUtil.createCellScanner(ImmutableList. of(CELL))), md, param, md + .getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress()); + } + verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); + } finally { + rpcServer.stop(); + verify(scheduler).stop(); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java new file mode 100644 index 0000000..768871c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.util.StringUtils; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcChannel; + +@RunWith(Parameterized.class) +@Category({ RPCTests.class, SmallTests.class }) +public class TestAsyncIPC extends AbstractTestIPC { + + private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @Parameters + public static Collection parameters() { + List paramList = new ArrayList(); + paramList.add(new Object[] { false, false }); + paramList.add(new Object[] { false, true }); + paramList.add(new Object[] { true, false }); + paramList.add(new Object[] { true, true }); + return paramList; + } + + private final boolean useNativeTransport; + + private final boolean useGlobalEventLoopGroup; + + public TestAsyncIPC(boolean useNativeTransport, boolean useGlobalEventLoopGroup) { + this.useNativeTransport = useNativeTransport; + this.useGlobalEventLoopGroup = useGlobalEventLoopGroup; + } + + private void setConf(Configuration conf) { + conf.setBoolean(AsyncRpcClient.USE_NATIVE_TRANSPORT, useNativeTransport); + conf.setBoolean(AsyncRpcClient.USE_NATIVE_TRANSPORT, useGlobalEventLoopGroup); + if (useGlobalEventLoopGroup && AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP != null) { + if (useNativeTransport + && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof EpollEventLoopGroup) + || (!useNativeTransport + && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof NioEventLoopGroup))) { + AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst().shutdownGracefully(); + AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP = null; + } + } + } + + @Override + protected AsyncRpcClient createRpcClientNoCodec(Configuration conf) { + setConf(conf); + return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null) { + + @Override + Codec getCodec() { + return super.getCodec(); + } + + }; + } + + @Override + protected AsyncRpcClient createRpcClient(Configuration conf) { + setConf(conf); + return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); + } + + @Override + protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) { + setConf(conf); + return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null, + new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + promise.setFailure(new RuntimeException("Injected fault")); + } + }); + } + }); + } + + @Test + public void testAsyncConnectionSetup() throws Exception { + TestRpcServer rpcServer = new TestRpcServer(); + AsyncRpcClient client = createRpcClient(CONF); + try { + rpcServer.start(); + InetSocketAddress address = rpcServer.getListenerAddress(); + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + + RpcChannel channel = + client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(), + System.currentTimeMillis()), User.getCurrent(), 0); + + final AtomicBoolean done = new AtomicBoolean(false); + + channel.callMethod(md, new PayloadCarryingRpcController(), param, md.getOutputType() + .toProto(), new RpcCallback() { + @Override + public void run(Message parameter) { + done.set(true); + } + }); + + TEST_UTIL.waitFor(1000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return done.get(); + } + }); + } finally { + client.close(); + rpcServer.stop(); + } + } + + @Test + public void testRTEDuringAsyncConnectionSetup() throws Exception { + TestRpcServer rpcServer = new TestRpcServer(); + AsyncRpcClient client = createRpcClientRTEDuringConnectionSetup(CONF); + try { + rpcServer.start(); + InetSocketAddress address = rpcServer.getListenerAddress(); + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + + RpcChannel channel = + client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(), + System.currentTimeMillis()), User.getCurrent(), 0); + + final AtomicBoolean done = new AtomicBoolean(false); + + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.notifyOnFail(new RpcCallback() { + @Override + public void run(IOException e) { + done.set(true); + LOG.info("Caught expected exception: " + e.toString()); + assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); + } + }); + + channel.callMethod(md, controller, param, md.getOutputType().toProto(), + new RpcCallback() { + @Override + public void run(Message parameter) { + done.set(true); + fail("Expected an exception to have been thrown!"); + } + }); + + TEST_UTIL.waitFor(1000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return done.get(); + } + }); + } finally { + client.close(); + rpcServer.stop(); + } + } + + public static void main(String[] args) throws IOException, SecurityException, + NoSuchMethodException, InterruptedException { + if (args.length != 2) { + System.out.println("Usage: TestAsyncIPC "); + return; + } + // ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO); + // ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO); + int cycles = Integer.parseInt(args[0]); + int cellcount = Integer.parseInt(args[1]); + Configuration conf = HBaseConfiguration.create(); + TestRpcServer rpcServer = new TestRpcServer(); + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); + KeyValue kv = BIG_CELL; + Put p = new Put(CellUtil.cloneRow(kv)); + for (int i = 0; i < cellcount; i++) { + p.add(kv); + } + RowMutations rm = new RowMutations(CellUtil.cloneRow(kv)); + rm.add(p); + try { + rpcServer.start(); + InetSocketAddress address = rpcServer.getListenerAddress(); + long startTime = System.currentTimeMillis(); + User user = User.getCurrent(); + for (int i = 0; i < cycles; i++) { + List cells = new ArrayList(); + // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm); + ClientProtos.RegionAction.Builder builder = + RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells, + RegionAction.newBuilder(), ClientProtos.Action.newBuilder(), + MutationProto.newBuilder()); + builder.setRegion(RegionSpecifier + .newBuilder() + .setType(RegionSpecifierType.REGION_NAME) + .setValue( + ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()))); + if (i % 100000 == 0) { + LOG.info("" + i); + // Uncomment this for a thread dump every so often. + // ReflectionUtils.printThreadInfo(new PrintWriter(System.out), + // "Thread dump " + Thread.currentThread().getName()); + } + PayloadCarryingRpcController pcrc = + new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); + // Pair response = + client.call(pcrc, md, builder.build(), param, user, address); + /* + * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(), + * count); + */ + } + LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in " + + (System.currentTimeMillis() - startTime) + "ms"); + } finally { + client.close(); + rpcServer.stop(); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java new file mode 100644 index 0000000..60dbd1b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RPCTests.class, SmallTests.class }) +public class TestGlobalEventLoopGroup { + + @Test + public void test() { + Configuration conf = HBaseConfiguration.create(); + conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, true); + AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); + assertNotNull(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP); + AsyncRpcClient client1 = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); + assertSame(client.bootstrap.group(), client1.bootstrap.group()); + client1.close(); + assertFalse(client.bootstrap.group().isShuttingDown()); + + conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, false); + AsyncRpcClient client2 = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); + assertNotSame(client.bootstrap.group(), client2.bootstrap.group()); + client2.close(); + + client.close(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index 0933f52..67e4e4f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -1,76 +1,48 @@ /** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.hbase.ipc; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.internal.verification.VerificationModeFactory.times; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import javax.net.SocketFactory; -import com.google.protobuf.BlockingRpcChannel; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcChannel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOutboundHandlerAdapter; -import io.netty.channel.ChannelPromise; -import io.netty.channel.socket.SocketChannel; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScannable; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.testclassification.RPCTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; -import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; @@ -78,500 +50,55 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.util.StringUtils; -import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.protobuf.BlockingService; import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -/** - * Some basic ipc tests. - */ -@Category({RPCTests.class, SmallTests.class}) -public class TestIPC { - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - public static final Log LOG = LogFactory.getLog(TestIPC.class); - - static byte [] CELL_BYTES = Bytes.toBytes("xyz"); - static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); - static byte [] BIG_CELL_BYTES = new byte [10 * 1024]; - static Cell BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES); - private final static Configuration CONF = HBaseConfiguration.create(); - // We are using the test TestRpcServiceProtos generated classes and Service because they are - // available and basic with methods like 'echo', and ping. Below we make a blocking service - // by passing in implementation of blocking interface. We use this service in all tests that - // follow. - private static final BlockingService SERVICE = - TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService( - new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { - - @Override - public EmptyResponseProto ping(RpcController controller, - EmptyRequestProto request) throws ServiceException { - // TODO Auto-generated method stub - return null; - } +@Category({ RPCTests.class, SmallTests.class }) +public class TestIPC extends AbstractTestIPC { - @Override - public EmptyResponseProto error(RpcController controller, - EmptyRequestProto request) throws ServiceException { - // TODO Auto-generated method stub - return null; - } - - @Override - public EchoResponseProto echo(RpcController controller, EchoRequestProto request) - throws ServiceException { - if (controller instanceof PayloadCarryingRpcController) { - PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller; - // If cells, scan them to check we are able to iterate what we were given and since this is - // an echo, just put them back on the controller creating a new block. Tests our block - // building. - CellScanner cellScanner = pcrc.cellScanner(); - List list = null; - if (cellScanner != null) { - list = new ArrayList(); - try { - while(cellScanner.advance()) { - list.add(cellScanner.current()); - } - } catch (IOException e) { - throw new ServiceException(e); - } - } - cellScanner = CellUtil.createCellScanner(list); - ((PayloadCarryingRpcController)controller).setCellScanner(cellScanner); - } - return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); - } - }); - - /** - * Instance of server. We actually don't do anything speical in here so could just use - * HBaseRpcServer directly. - */ - private static class TestRpcServer extends RpcServer { - - TestRpcServer() throws IOException { - this(new FifoRpcScheduler(CONF, 1)); - } - - TestRpcServer(RpcScheduler scheduler) throws IOException { - super(null, "testRpcServer", - Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), - new InetSocketAddress("localhost", 0), CONF, scheduler); - } + private static final Log LOG = LogFactory.getLog(TestIPC.class); - @Override - public Pair call(BlockingService service, - MethodDescriptor md, Message param, CellScanner cellScanner, - long receiveTime, MonitoredRPCHandler status) throws IOException { - return super.call(service, md, param, cellScanner, receiveTime, status); - } - } - - /** - * Ensure we do not HAVE TO HAVE a codec. - * @throws InterruptedException - * @throws IOException - */ - @Test - public void testNoCodec() throws InterruptedException, IOException { - Configuration conf = HBaseConfiguration.create(); - RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) { + @Override + protected RpcClientImpl createRpcClientNoCodec(Configuration conf) { + return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) { @Override Codec getCodec() { return null; } }; - TestRpcServer rpcServer = new TestRpcServer(); - try { - rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - final String message = "hello"; - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); - Pair r = client.call(null, md, param, - md.getOutputType().toProto(), User.getCurrent(), address); - assertTrue(r.getSecond() == null); - // Silly assertion that the message is in the returned pb. - assertTrue(r.getFirst().toString().contains(message)); - } finally { - client.close(); - rpcServer.stop(); - } } - /** - * Ensure we do not HAVE TO HAVE a codec. - * - * @throws InterruptedException - * @throws IOException - */ - @Test public void testNoCodecAsync() throws InterruptedException, IOException, ServiceException { - Configuration conf = HBaseConfiguration.create(); - AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null) { - @Override Codec getCodec() { - return null; - } - }; - TestRpcServer rpcServer = new TestRpcServer(); - try { - rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - final String message = "hello"; - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); - - BlockingRpcChannel channel = client - .createBlockingRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(), - System.currentTimeMillis()), User.getCurrent(), 0); - - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); - Message response = - channel.callBlockingMethod(md, controller, param, md.getOutputType().toProto()); - - assertTrue(controller.cellScanner() == null); - // Silly assertion that the message is in the returned pb. - assertTrue(response.toString().contains(message)); - } finally { - client.close(); - rpcServer.stop(); - } - } - - /** - * It is hard to verify the compression is actually happening under the wraps. Hope that if - * unsupported, we'll get an exception out of some time (meantime, have to trace it manually - * to confirm that compression is happening down in the client and server). - * @throws IOException - * @throws InterruptedException - * @throws SecurityException - * @throws NoSuchMethodException - */ - @Test - public void testCompressCellBlock() - throws IOException, InterruptedException, SecurityException, NoSuchMethodException, - ServiceException { - Configuration conf = new Configuration(HBaseConfiguration.create()); - conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName()); - doSimpleTest(new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT)); - - // Another test for the async client - doAsyncSimpleTest(new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null)); - } - - private void doSimpleTest(final RpcClientImpl client) - throws InterruptedException, IOException { - TestRpcServer rpcServer = new TestRpcServer(); - List cells = new ArrayList(); - int count = 3; - for (int i = 0; i < count; i++) cells.add(CELL); - try { - rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - - PayloadCarryingRpcController pcrc = - new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); - Pair r = client - .call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address); - int index = 0; - while (r.getSecond().advance()) { - assertTrue(CELL.equals(r.getSecond().current())); - index++; - } - assertEquals(count, index); - } finally { - client.close(); - rpcServer.stop(); - } - } - - private void doAsyncSimpleTest(final AsyncRpcClient client) - throws InterruptedException, IOException, ServiceException { - TestRpcServer rpcServer = new TestRpcServer(); - List cells = new ArrayList(); - int count = 3; - for (int i = 0; i < count; i++) - cells.add(CELL); - try { - rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - - PayloadCarryingRpcController pcrc = - new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); - - BlockingRpcChannel channel = client.createBlockingRpcChannel( - ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()), - User.getCurrent(), 0); - - channel.callBlockingMethod(md, pcrc, param, md.getOutputType().toProto()); - - CellScanner cellScanner = pcrc.cellScanner(); - - int index = 0; - while (cellScanner.advance()) { - assertTrue(CELL.equals(cellScanner.current())); - index++; - } - assertEquals(count, index); - } finally { - client.close(); - rpcServer.stop(); - } + @Override + protected RpcClientImpl createRpcClient(Configuration conf) { + return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT); } - @Test - public void testRTEDuringConnectionSetup() throws Exception { - Configuration conf = HBaseConfiguration.create(); + @Override + protected RpcClientImpl createRpcClientRTEDuringConnectionSetup(Configuration conf) + throws IOException { SocketFactory spyFactory = spy(NetUtils.getDefaultSocketFactory(conf)); Mockito.doAnswer(new Answer() { @Override public Socket answer(InvocationOnMock invocation) throws Throwable { - Socket s = spy((Socket)invocation.callRealMethod()); + Socket s = spy((Socket) invocation.callRealMethod()); doThrow(new RuntimeException("Injected fault")).when(s).setSoTimeout(anyInt()); return s; } }).when(spyFactory).createSocket(); - TestRpcServer rpcServer = new TestRpcServer(); - RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory); - try { - rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - client.call(null, md, param, null, User.getCurrent(), address); - fail("Expected an exception to have been thrown!"); - } catch (Exception e) { - LOG.info("Caught expected exception: " + e.toString()); - assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); - } finally { - client.close(); - rpcServer.stop(); - } - } - - @Test - public void testRTEDuringAsyncBlockingConnectionSetup() throws Exception { - Configuration conf = HBaseConfiguration.create(); - - TestRpcServer rpcServer = new TestRpcServer(); - AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null, - new ChannelInitializer() { - - @Override protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() { - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) - throws Exception { - promise.setFailure(new RuntimeException("Injected fault")); - } - }); - } - }); - try { - rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - - BlockingRpcChannel channel = client.createBlockingRpcChannel( - ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()), - User.getCurrent(), 0); - - channel.callBlockingMethod(md, new PayloadCarryingRpcController(), param, - md.getOutputType().toProto()); - - fail("Expected an exception to have been thrown!"); - } catch (Exception e) { - LOG.info("Caught expected exception: " + e.toString()); - assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); - } finally { - client.close(); - rpcServer.stop(); - } + return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory); } - - @Test - public void testRTEDuringAsyncConnectionSetup() throws Exception { - Configuration conf = HBaseConfiguration.create(); - - TestRpcServer rpcServer = new TestRpcServer(); - AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null, - new ChannelInitializer() { - - @Override protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() { - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) - throws Exception { - promise.setFailure(new RuntimeException("Injected fault")); - } - }); - } - }); - try { - rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - - RpcChannel channel = client.createRpcChannel( - ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()), - User.getCurrent(), 0); - - final AtomicBoolean done = new AtomicBoolean(false); - - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); - controller.notifyOnFail(new RpcCallback() { - @Override - public void run(IOException e) { - done.set(true); - LOG.info("Caught expected exception: " + e.toString()); - assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); - } - }); - - channel.callMethod(md, controller, param, - md.getOutputType().toProto(), new RpcCallback() { - @Override - public void run(Message parameter) { - done.set(true); - fail("Expected an exception to have been thrown!"); - } - }); - - TEST_UTIL.waitFor(1000, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - return done.get(); - } - }); - } finally { - client.close(); - rpcServer.stop(); - } - } - - @Test - public void testAsyncConnectionSetup() throws Exception { - Configuration conf = HBaseConfiguration.create(); - - TestRpcServer rpcServer = new TestRpcServer(); - AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); - try { - rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - - RpcChannel channel = client.createRpcChannel( - ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()), - User.getCurrent(), 0); - - final AtomicBoolean done = new AtomicBoolean(false); - - channel.callMethod(md, new PayloadCarryingRpcController(), param, - md.getOutputType().toProto(), new RpcCallback() { - @Override - public void run(Message parameter) { - done.set(true); - } - }); - - TEST_UTIL.waitFor(1000, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - return done.get(); - } - }); - } finally { - client.close(); - rpcServer.stop(); - } - } - - /** Tests that the rpc scheduler is called when requests arrive. */ - @Test - public void testRpcScheduler() throws IOException, InterruptedException { - RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1)); - RpcServer rpcServer = new TestRpcServer(scheduler); - verify(scheduler).init((RpcScheduler.Context) anyObject()); - RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT); - try { - rpcServer.start(); - verify(scheduler).start(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - for (int i = 0; i < 10; i++) { - client.call( - new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL))), - md, param, md.getOutputType().toProto(), User.getCurrent(), - rpcServer.getListenerAddress()); - } - verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); - } finally { - rpcServer.stop(); - verify(scheduler).stop(); - } - } - - /** - * Tests that the rpc scheduler is called when requests arrive. - */ - @Test - public void testRpcSchedulerAsync() - throws IOException, InterruptedException, ServiceException { - RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1)); - RpcServer rpcServer = new TestRpcServer(scheduler); - verify(scheduler).init((RpcScheduler.Context) anyObject()); - AbstractRpcClient client = new AsyncRpcClient(CONF, HConstants.CLUSTER_ID_DEFAULT, null); - try { - rpcServer.start(); - verify(scheduler).start(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - ServerName serverName = ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), - rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()); - - for (int i = 0; i < 10; i++) { - BlockingRpcChannel channel = client.createBlockingRpcChannel( - serverName, User.getCurrent(), 0); - - channel.callBlockingMethod(md, - new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL))), - param, md.getOutputType().toProto()); - } - verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); - } finally { - rpcServer.stop(); - verify(scheduler).stop(); - } - } - - public static void main(String[] args) - throws IOException, SecurityException, NoSuchMethodException, InterruptedException { + public static void main(String[] args) throws IOException, SecurityException, + NoSuchMethodException, InterruptedException { if (args.length != 2) { System.out.println("Usage: TestIPC "); return; @@ -585,12 +112,12 @@ public class TestIPC { MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT); - KeyValue kv = KeyValueUtil.ensureKeyValue(BIG_CELL); - Put p = new Put(kv.getRow()); + KeyValue kv = BIG_CELL; + Put p = new Put(CellUtil.cloneRow(kv)); for (int i = 0; i < cellcount; i++) { p.add(kv); } - RowMutations rm = new RowMutations(kv.getRow()); + RowMutations rm = new RowMutations(CellUtil.cloneRow(kv)); rm.add(p); try { rpcServer.start(); @@ -600,35 +127,36 @@ public class TestIPC { for (int i = 0; i < cycles; i++) { List cells = new ArrayList(); // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm); - ClientProtos.RegionAction.Builder builder = RequestConverter.buildNoDataRegionAction( - HConstants.EMPTY_BYTE_ARRAY, rm, cells, - RegionAction.newBuilder(), - ClientProtos.Action.newBuilder(), - MutationProto.newBuilder()); - builder.setRegion(RegionSpecifier.newBuilder().setType(RegionSpecifierType.REGION_NAME). - setValue(ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()))); + ClientProtos.RegionAction.Builder builder = + RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells, + RegionAction.newBuilder(), ClientProtos.Action.newBuilder(), + MutationProto.newBuilder()); + builder.setRegion(RegionSpecifier + .newBuilder() + .setType(RegionSpecifierType.REGION_NAME) + .setValue( + ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()))); if (i % 100000 == 0) { LOG.info("" + i); // Uncomment this for a thread dump every so often. // ReflectionUtils.printThreadInfo(new PrintWriter(System.out), - // "Thread dump " + Thread.currentThread().getName()); + // "Thread dump " + Thread.currentThread().getName()); } PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); - Pair response = - client.call(pcrc, md, builder.build(), param, user, address); + // Pair response = + client.call(pcrc, md, builder.build(), param, user, address); /* - int count = 0; - while (p.getSecond().advance()) { - count++; - } - assertEquals(cells.size(), count);*/ + * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(), + * count); + */ } - LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in " + - (System.currentTimeMillis() - startTime) + "ms"); + LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in " + + (System.currentTimeMillis() - startTime) + "ms"); } finally { client.close(); rpcServer.stop(); } } + } -- 1.9.1