diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 8414290..3fb547b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -262,7 +262,7 @@ public class AsyncRpcChannel { handleSaslConnectionFailure(retryCount, cause, realTicket); // Try to reconnect - AsyncRpcClient.WHEEL_TIMER.newTimeout(new TimerTask() { + client.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { connect(bootstrap); @@ -289,7 +289,7 @@ public class AsyncRpcChannel { */ private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) { if (connectCounter < client.maxRetries) { - AsyncRpcClient.WHEEL_TIMER.newTimeout(new TimerTask() { + client.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { connect(bootstrap); } @@ -339,7 +339,7 @@ public class AsyncRpcChannel { // Add timeout for cleanup if none is present if (cleanupTimer == null && call.getRpcTimeout() > 0) { cleanupTimer = - AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, call.getRpcTimeout(), + client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS); } if (!connected) { @@ -601,7 +601,7 @@ public class AsyncRpcChannel { } if (nextCleanupTaskDelay > 0) { cleanupTimer = - AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, nextCleanupTaskDelay, + client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS); } else { cleanupTimer = null; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java index 192e583..b2e2341 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java @@ -17,6 +17,22 @@ */ package org.apache.hadoop.hbase.ipc; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcChannel; +import com.google.protobuf.RpcController; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; @@ -29,19 +45,11 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; @@ -53,13 +61,6 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.Threads; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcChannel; -import com.google.protobuf.RpcController; - /** * Netty client for the requests and responses */ @@ -69,9 +70,6 @@ public class AsyncRpcClient extends AbstractRpcClient { public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max"; public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.useNativeTransport"; - public static final HashedWheelTimer WHEEL_TIMER = - new HashedWheelTimer(100, TimeUnit.MILLISECONDS); - private static final ChannelInitializer DEFAULT_CHANNEL_INITIALIZER = new ChannelInitializer() { @Override @@ -88,6 +86,7 @@ public class AsyncRpcClient extends AbstractRpcClient { final FailedServers failedServers; private final Bootstrap bootstrap; + private final HashedWheelTimer timer; /** * Constructor for tests @@ -113,16 +112,24 @@ public class AsyncRpcClient extends AbstractRpcClient { // although it is not extensively tested. boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false); + this.timer = new HashedWheelTimer(Threads + .newDaemonThreadFactory( + "AsyncRpcChannel-" + clusterId + "-" + localAddress.toString() + "-timers"), + 100, TimeUnit.MILLISECONDS); + + final ThreadFactory ourThreadFactory = Threads + .newDaemonThreadFactory( + "AsyncRpcChannel-" + clusterId + "-" + localAddress.toString() + "-workers"); // Use the faster native epoll transport mechanism on linux if enabled Class socketChannelClass; if (epollEnabled && JVM.isLinux()) { socketChannelClass = EpollSocketChannel.class; this.eventLoopGroup = - new EpollEventLoopGroup(maxThreads, Threads.newDaemonThreadFactory("AsyncRpcChannel")); + new EpollEventLoopGroup(maxThreads, ourThreadFactory); } else { socketChannelClass = NioSocketChannel.class; this.eventLoopGroup = - new NioEventLoopGroup(maxThreads, Threads.newDaemonThreadFactory("AsyncRpcChannel")); + new NioEventLoopGroup(maxThreads, ourThreadFactory); } this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration)); @@ -244,13 +251,17 @@ public class AsyncRpcClient extends AbstractRpcClient { LOG.debug("Stopping async HBase RPC client"); } - synchronized (connections) { - for (AsyncRpcChannel conn : connections.values()) { - conn.close(null); + try { + synchronized (connections) { + for (AsyncRpcChannel conn : connections.values()) { + conn.close(null); + } } - } - eventLoopGroup.shutdownGracefully(); + eventLoopGroup.shutdownGracefully(); + } finally { + timer.stop(); + } } /** @@ -412,4 +423,9 @@ public class AsyncRpcClient extends AbstractRpcClient { this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done); } } + + public Timeout newTimeout(TimerTask task, long delay, + TimeUnit unit) { + return timer.newTimeout(task, delay, unit); + } }