From 6da33a565f811a86c0ef3f98c86fdb47806aa77b Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 26 Feb 2015 21:47:27 +0800 Subject: [PATCH] HBASE-13097 Use same EventLoopGroup for different AsyncRpcClients if possible --- .../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 95 ++++++++++++++++------ 1 file changed, 69 insertions(+), 26 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java index 192e583..ffec7d2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java @@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -63,11 +64,12 @@ import com.google.protobuf.RpcController; /** * Netty client for the requests and responses */ -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class AsyncRpcClient extends AbstractRpcClient { public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max"; - public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.useNativeTransport"; + public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport"; + public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup"; public static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(100, TimeUnit.MILLISECONDS); @@ -82,13 +84,53 @@ public class AsyncRpcClient extends AbstractRpcClient { protected final AtomicInteger callIdCnt = new AtomicInteger(); - private final EventLoopGroup eventLoopGroup; private final PoolMap connections; final FailedServers failedServers; private final Bootstrap bootstrap; + private final boolean useGlobalEventLoopGroup; + + private static Pair> GLOBAL_EVENT_LOOP_GROUP; + + private synchronized static Pair> + getGlobalEventLoopGroup(Configuration conf) { + if (GLOBAL_EVENT_LOOP_GROUP == null) { + GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf); + if (LOG.isDebugEnabled()) { + LOG.debug("Create global event loop group " + + GLOBAL_EVENT_LOOP_GROUP.getFirst().getClass().getSimpleName()); + } + } + return GLOBAL_EVENT_LOOP_GROUP; + } + + private static Pair> createEventLoopGroup( + Configuration conf) { + // Max amount of threads to use. 0 lets Netty decide based on amount of cores + int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0); + + // Config to enable native transport. Does not seem to be stable at time of implementation + // although it is not extensively tested. + boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false); + + // Use the faster native epoll transport mechanism on linux if enabled + if (epollEnabled && JVM.isLinux()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Create EpollEventLoopGroup with maxThreads = " + maxThreads); + } + return new Pair>(new EpollEventLoopGroup(maxThreads, + Threads.newDaemonThreadFactory("AsyncRpcChannel")), EpollSocketChannel.class); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Create NioEventLoopGroup with maxThreads = " + maxThreads); + } + return new Pair>(new NioEventLoopGroup(maxThreads, + Threads.newDaemonThreadFactory("AsyncRpcChannel")), NioSocketChannel.class); + } + } + /** * Constructor for tests * @@ -106,23 +148,16 @@ public class AsyncRpcClient extends AbstractRpcClient { LOG.debug("Starting async Hbase RPC client"); } - // Max amount of threads to use. 0 lets Netty decide based on amount of cores - int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0); - - // Config to enable native transport. Does not seem to be stable at time of implementation - // although it is not extensively tested. - boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false); - - // Use the faster native epoll transport mechanism on linux if enabled - Class socketChannelClass; - if (epollEnabled && JVM.isLinux()) { - socketChannelClass = EpollSocketChannel.class; - this.eventLoopGroup = - new EpollEventLoopGroup(maxThreads, Threads.newDaemonThreadFactory("AsyncRpcChannel")); + Pair> eventLoopGroupAndChannelClass; + this.useGlobalEventLoopGroup = conf.getBoolean(USE_GLOBAL_EVENT_LOOP_GROUP, true); + if (useGlobalEventLoopGroup) { + eventLoopGroupAndChannelClass = getGlobalEventLoopGroup(configuration); } else { - socketChannelClass = NioSocketChannel.class; - this.eventLoopGroup = - new NioEventLoopGroup(maxThreads, Threads.newDaemonThreadFactory("AsyncRpcChannel")); + eventLoopGroupAndChannelClass = createEventLoopGroup(configuration); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Use " + (useGlobalEventLoopGroup ? "global" : "individual") + " event loop group " + + eventLoopGroupAndChannelClass.getFirst().getClass().getSimpleName()); } this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration)); @@ -133,7 +168,8 @@ public class AsyncRpcClient extends AbstractRpcClient { // Configure the default bootstrap. this.bootstrap = new Bootstrap(); - bootstrap.group(eventLoopGroup).channel(socketChannelClass) + bootstrap.group(eventLoopGroupAndChannelClass.getFirst()) + .channel(eventLoopGroupAndChannelClass.getSecond()) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.TCP_NODELAY, tcpNoDelay) .option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive) @@ -236,6 +272,8 @@ public class AsyncRpcClient extends AbstractRpcClient { } } + private boolean closed = false; + /** * Close netty */ @@ -245,12 +283,18 @@ public class AsyncRpcClient extends AbstractRpcClient { } synchronized (connections) { + if (closed) { + return; + } + closed = true; for (AsyncRpcChannel conn : connections.values()) { conn.close(null); } } - - eventLoopGroup.shutdownGracefully(); + // do not close global EventLoopGroup. + if (!useGlobalEventLoopGroup) { + bootstrap.group().shutdownGracefully(); + } } /** @@ -287,10 +331,6 @@ public class AsyncRpcClient extends AbstractRpcClient { */ private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location, User ticket) throws StoppedRpcClientException, FailedServerException { - if (this.eventLoopGroup.isShuttingDown() || this.eventLoopGroup.isShutdown()) { - throw new StoppedRpcClientException(); - } - // Check if server is failed if (this.failedServers.isFailedServer(location)) { if (LOG.isDebugEnabled()) { @@ -305,6 +345,9 @@ public class AsyncRpcClient extends AbstractRpcClient { AsyncRpcChannel rpcChannel; synchronized (connections) { + if (closed) { + throw new StoppedRpcClientException(); + } rpcChannel = connections.get(hashCode); if (rpcChannel == null) { rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location); -- 1.9.1