diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 8414290..3fb547b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -262,7 +262,7 @@ public class AsyncRpcChannel { handleSaslConnectionFailure(retryCount, cause, realTicket); // Try to reconnect - AsyncRpcClient.WHEEL_TIMER.newTimeout(new TimerTask() { + client.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { connect(bootstrap); @@ -289,7 +289,7 @@ public class AsyncRpcChannel { */ private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) { if (connectCounter < client.maxRetries) { - AsyncRpcClient.WHEEL_TIMER.newTimeout(new TimerTask() { + client.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { connect(bootstrap); } @@ -339,7 +339,7 @@ public class AsyncRpcChannel { // Add timeout for cleanup if none is present if (cleanupTimer == null && call.getRpcTimeout() > 0) { cleanupTimer = - AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, call.getRpcTimeout(), + client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS); } if (!connected) { @@ -601,7 +601,7 @@ public class AsyncRpcChannel { } if (nextCleanupTaskDelay > 0) { cleanupTimer = - AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, nextCleanupTaskDelay, + client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS); } else { cleanupTimer = null; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java index 192e583..5fc5fed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java @@ -29,6 +29,8 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; @@ -69,8 +71,9 @@ public class AsyncRpcClient extends AbstractRpcClient { public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max"; public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.useNativeTransport"; - public static final HashedWheelTimer WHEEL_TIMER = - new HashedWheelTimer(100, TimeUnit.MILLISECONDS); + private static final HashedWheelTimer WHEEL_TIMER = + new HashedWheelTimer(Threads.newDaemonThreadFactory("AsyncRpcChannel-timer"), + 100, TimeUnit.MILLISECONDS); private static final ChannelInitializer DEFAULT_CHANNEL_INITIALIZER = new ChannelInitializer() { @@ -412,4 +415,8 @@ public class AsyncRpcClient extends AbstractRpcClient { this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done); } } + + Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { + return WHEEL_TIMER.newTimeout(task, delay, unit); + } }