From 7b7a2650798a9ebd65cd36fa6ce67ded8f1981ae Mon Sep 17 00:00:00 2001 From: zhangduo Date: Fri, 26 Aug 2016 15:10:37 +0800 Subject: [PATCH] HBASE-16435 Implement RpcChannel for RpcClientImpl --- .../apache/hadoop/hbase/ipc/AbstractRpcClient.java | 291 ++++++++++++--------- .../apache/hadoop/hbase/ipc/AsyncRpcChannel.java | 3 +- .../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 171 +++--------- .../java/org/apache/hadoop/hbase/ipc/Call.java | 80 +++--- .../hbase/ipc/PayloadCarryingRpcController.java | 28 +- .../org/apache/hadoop/hbase/ipc/RpcClient.java | 6 +- .../org/apache/hadoop/hbase/ipc/RpcClientImpl.java | 181 ++++++------- .../hadoop/hbase/client/TestClientTimeouts.java | 5 +- .../apache/hadoop/hbase/ipc/AbstractTestIPC.java | 128 +++++++-- .../org/apache/hadoop/hbase/ipc/TestAsyncIPC.java | 8 +- .../java/org/apache/hadoop/hbase/ipc/TestIPC.java | 10 +- .../hadoop/hbase/ipc/TestRpcHandlerException.java | 20 +- 12 files changed, 454 insertions(+), 477 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 3d3339a..bd06107 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -19,23 +19,29 @@ package org.apache.hadoop.hbase.ipc; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcChannel; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import io.netty.util.HashedWheelTimer; + import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.SocketTimeoutException; import java.net.UnknownHostException; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -46,8 +52,8 @@ import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PoolMap; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.io.compress.CompressionCodec; /** @@ -58,6 +64,9 @@ public abstract class AbstractRpcClient implements RpcClient { // Log level is being changed in tests public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class); + protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer( + Threads.newDaemonThreadFactory("AsyncRpcChannel-timer"), 10, TimeUnit.MILLISECONDS); + protected final Configuration conf; protected String clusterId; protected final SocketAddress localAddr; @@ -68,7 +77,7 @@ public abstract class AbstractRpcClient implements RpcClient { protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this 
// time (in ms), it will be closed at any moment. - protected final int maxRetries; //the max. no. of retries for socket connections + protected final int maxRetries; // the max. no. of retries for socket connections protected final long failureSleep; // Time to sleep before retry on failure. protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm protected final boolean tcpKeepAlive; // if T then use keepalives @@ -82,7 +91,6 @@ public abstract class AbstractRpcClient implements RpcClient { /** * Construct an IPC client for the cluster clusterId - * * @param conf configuration * @param clusterId the cluster id * @param localAddr client socket bind address. @@ -95,7 +103,7 @@ public abstract class AbstractRpcClient implements RpcClient { this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true); this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT; this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, - HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true); this.ipcUtil = new IPCUtil(conf); @@ -105,7 +113,7 @@ public abstract class AbstractRpcClient implements RpcClient { this.codec = getCodec(); this.compressor = getCompressor(conf); this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, - IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT); this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ); this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE); @@ -113,23 +121,19 @@ public abstract class AbstractRpcClient implements RpcClient { // login the server principal (if using secure Hadoop) if (LOG.isDebugEnabled()) { - LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + - ", tcpKeepAlive=" + this.tcpKeepAlive + - ", tcpNoDelay=" + this.tcpNoDelay + - ", connectTO=" + this.connectTO + - ", readTO=" + this.readTO + - ", writeTO=" + this.writeTO + - ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose + - ", maxRetries=" + this.maxRetries + - ", fallbackAllowed=" + this.fallbackAllowed + - ", bind address=" + (this.localAddr != null ? this.localAddr : "null")); + LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive=" + + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO + + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose=" + + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed=" + + this.fallbackAllowed + ", bind address=" + + (this.localAddr != null ? this.localAddr : "null")); } } @VisibleForTesting public static String getDefaultCodec(final Configuration c) { // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because - // Configuration will complain -- then no default codec (and we'll pb everything). Else + // Configuration will complain -- then no default codec (and we'll pb everything). 
Else // default is KeyValueCodec return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName()); } @@ -146,7 +150,7 @@ public abstract class AbstractRpcClient implements RpcClient { return null; } try { - return (Codec)Class.forName(className).newInstance(); + return (Codec) Class.forName(className).newInstance(); } catch (Exception e) { throw new RuntimeException("Failed getting codec " + className, e); } @@ -168,38 +172,32 @@ public abstract class AbstractRpcClient implements RpcClient { return null; } try { - return (CompressionCodec)Class.forName(className).newInstance(); + return (CompressionCodec) Class.forName(className).newInstance(); } catch (Exception e) { throw new RuntimeException("Failed getting compressor " + className, e); } } /** - * Return the pool type specified in the configuration, which must be set to - * either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or - * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, - * otherwise default to the former. - * - * For applications with many user threads, use a small round-robin pool. For - * applications with few user threads, you may want to try using a - * thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient} - * instances should not exceed the operating system's hard limit on the number of - * connections. - * + * Return the pool type specified in the configuration, which must be set to either + * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or + * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the + * former. For applications with many user threads, use a small round-robin pool. For applications + * with few user threads, you may want to try using a thread-local pool. In any case, the number + * of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed the operating + * system's hard limit on the number of connections. * @param config configuration * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal} */ protected static PoolMap.PoolType getPoolType(Configuration config) { - return PoolMap.PoolType - .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin, - PoolMap.PoolType.ThreadLocal); + return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), + PoolMap.PoolType.RoundRobin, PoolMap.PoolType.ThreadLocal); } /** - * Return the pool size specified in the configuration, which is applicable only if - * the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}. - * + * Return the pool size specified in the configuration, which is applicable only if the pool type + * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}. * @param config configuration * @return the maximum pool size */ @@ -210,86 +208,73 @@ public abstract class AbstractRpcClient implements RpcClient { /** * Make a blocking call. Throws exceptions if there are network problems or if the remote code * threw an exception. - * * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. - * {@link UserProvider#getCurrent()} makes a new instance of User each time so - * will be a - * new Connection each time. + * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a + * new Connection each time. 
* @return A pair with the Message response and the Cell data (if any). */ Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc, Message param, Message returnType, final User ticket, final InetSocketAddress isa) throws ServiceException { - if (pcrc == null) { - pcrc = new PayloadCarryingRpcController(); - } - - Pair val; + BlockingRpcCallback done = new BlockingRpcCallback<>(); + callMethod(md, pcrc, param, returnType, ticket, isa, done); + Message val; try { - final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); - cs.setStartTime(EnvironmentEdgeManager.currentTime()); - val = call(pcrc, md, param, returnType, ticket, isa, cs); - // Shove the results into controller so can be carried across the proxy/pb service void. - pcrc.setCellScanner(val.getSecond()); - - cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime()); - if (metrics != null) { - metrics.updateRpc(md, param, cs); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); - } - return val.getFirst(); - } catch (Throwable e) { + val = done.get(); + } catch (IOException e) { throw new ServiceException(e); } + if (pcrc.failed()) { + throw new ServiceException(pcrc.getFailed()); + } else { + return val; + } } - /** - * Make a call, passing param, to the IPC server running at - * address which is servicing the protocol protocol, - * with the ticket credentials, returning the value. - * Throws exceptions if there are network problems or if the remote code - * threw an exception. - * - * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. - * {@link UserProvider#getCurrent()} makes a new instance of User each time so - * will be a - * new Connection each time. - * @return A pair with the Message response and the Cell data (if any). 
- * @throws InterruptedException if call is interrupted - * @throws java.io.IOException if transport failed - */ - protected abstract Pair call(PayloadCarryingRpcController pcrc, - Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, - InetSocketAddress isa, MetricsConnection.CallStats callStats) - throws IOException, InterruptedException; + void callMethod(final Descriptors.MethodDescriptor md, + final PayloadCarryingRpcController pcrc, final Message param, Message returnType, + final User ticket, final InetSocketAddress isa, final RpcCallback done) { + final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); + cs.setStartTime(EnvironmentEdgeManager.currentTime()); + call(pcrc, md, param, returnType, ticket, isa, new RpcCallback() { + + @Override + public void run(Message parameter) { + cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime()); + if (metrics != null) { + metrics.updateRpc(md, param, cs); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); + } + done.run(parameter); + } + + }, cs); + } + + protected abstract void call(PayloadCarryingRpcController pcrc, Descriptors.MethodDescriptor md, + Message param, Message returnType, User ticket, InetSocketAddress isa, + RpcCallback callback, MetricsConnection.CallStats callStats); + + private InetSocketAddress createAddr(ServerName sn) throws UnknownHostException { + InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort()); + if (addr.isUnresolved()) { + throw new UnknownHostException("can not resolve " + sn.getServerName()); + } + return addr; + } @Override public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket, - int defaultOperationTimeout) throws UnknownHostException { - return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout); + int rpcTimeout) throws UnknownHostException { + return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout); } - /** - * Configure a payload carrying controller - * @param controller to configure - * @param channelOperationTimeout timeout for operation - * @return configured payload controller - */ - static PayloadCarryingRpcController configurePayloadCarryingRpcController( - RpcController controller, int channelOperationTimeout) { - PayloadCarryingRpcController pcrc; - if (controller != null && controller instanceof PayloadCarryingRpcController) { - pcrc = (PayloadCarryingRpcController) controller; - if (!pcrc.hasCallTimeout()) { - pcrc.setCallTimeout(channelOperationTimeout); - } - } else { - pcrc = new PayloadCarryingRpcController(); - pcrc.setCallTimeout(channelOperationTimeout); - } - return pcrc; + @Override + public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) + throws UnknownHostException { + return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout); } /** @@ -305,17 +290,57 @@ public abstract class AbstractRpcClient implements RpcClient { protected IOException wrapException(InetSocketAddress addr, Exception exception) { if (exception instanceof ConnectException) { // connection refused; include the host:port in the error - return (ConnectException) new ConnectException("Call to " + addr - + " failed on connection exception: " + exception).initCause(exception); + return (ConnectException) new ConnectException( + "Call to " + addr + " failed on connection exception: " + exception).initCause(exception); } else if 
(exception instanceof SocketTimeoutException) { - return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr - + " failed because " + exception).initCause(exception); + return (SocketTimeoutException) new SocketTimeoutException( + "Call to " + addr + " failed because " + exception).initCause(exception); } else if (exception instanceof ConnectionClosingException) { - return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr - + " failed on local exception: " + exception).initCause(exception); + return (ConnectionClosingException) new ConnectionClosingException( + "Call to " + addr + " failed on local exception: " + exception).initCause(exception); } else { - return (IOException) new IOException("Call to " + addr + " failed on local exception: " - + exception).initCause(exception); + return (IOException) new IOException( + "Call to " + addr + " failed on local exception: " + exception).initCause(exception); + } + } + + private static class AbstractRpcChannel { + + protected final InetSocketAddress addr; + + protected final AbstractRpcClient rpcClient; + + protected final User ticket; + + protected final int rpcTimeout; + + protected AbstractRpcChannel(AbstractRpcClient rpcClient, InetSocketAddress addr, User ticket, + int rpcTimeout) { + this.addr = addr; + this.rpcClient = rpcClient; + this.ticket = ticket; + this.rpcTimeout = rpcTimeout; + } + + /** + * Configure a payload carrying controller + * @param controller to configure + * @param channelOperationTimeout timeout for operation + * @return configured payload controller + */ + protected PayloadCarryingRpcController configurePayloadCarryingRpcController( + RpcController controller) { + PayloadCarryingRpcController pcrc; + if (controller != null) { + pcrc = (PayloadCarryingRpcController) controller; + if (!pcrc.hasCallTimeout()) { + pcrc.setCallTimeout(rpcTimeout); + } + } else { + pcrc = new PayloadCarryingRpcController(); + pcrc.setCallTimeout(rpcTimeout); + } + return pcrc; } } @@ -323,35 +348,41 @@ public abstract class AbstractRpcClient implements RpcClient { * Blocking rpc channel that goes via hbase rpc. 
*/ @VisibleForTesting - public static class BlockingRpcChannelImplementation implements BlockingRpcChannel { - private final InetSocketAddress isa; - private final AbstractRpcClient rpcClient; - private final User ticket; - private final int channelOperationTimeout; + public static class BlockingRpcChannelImplementation extends AbstractRpcChannel + implements BlockingRpcChannel { - /** - * @param channelOperationTimeout - the default timeout when no timeout is given - */ - protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient, - final ServerName sn, final User ticket, int channelOperationTimeout) - throws UnknownHostException { - this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); - if (this.isa.isUnresolved()) { - throw new UnknownHostException(sn.getHostname()); - } - this.rpcClient = rpcClient; - this.ticket = ticket; - this.channelOperationTimeout = channelOperationTimeout; + protected BlockingRpcChannelImplementation(AbstractRpcClient rpcClient, InetSocketAddress addr, + User ticket, int rpcTimeout) { + super(rpcClient, addr, ticket, rpcTimeout); } @Override public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param, Message returnType) throws ServiceException { - PayloadCarryingRpcController pcrc = configurePayloadCarryingRpcController( - controller, - channelOperationTimeout); + return rpcClient.callBlockingMethod(md, configurePayloadCarryingRpcController(controller), + param, returnType, ticket, addr); + } + } - return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa); + /** + * Async rpc channel that goes via hbase rpc. + */ + public static class RpcChannelImplementation extends AbstractRpcChannel implements RpcChannel { + + protected RpcChannelImplementation(AbstractRpcClient rpcClient, InetSocketAddress addr, + User ticket, int rpcTimeout) throws UnknownHostException { + super(rpcClient, addr, ticket, rpcTimeout); + } + + @Override + public void callMethod(MethodDescriptor md, RpcController controller, Message param, + Message returnType, RpcCallback done) { + // This method does not throw any exceptions, so the caller must provide a + // PayloadCarryingRpcController which is used to pass the exceptions. 
+ this.rpcClient.callMethod(md, + configurePayloadCarryingRpcController(Preconditions.checkNotNull(controller, + "RpcController can not be null for async rpc call")), + param, returnType, ticket, addr, done); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 9550f2a..a0d8f11 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -31,6 +31,7 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.Promise; import java.io.IOException; import java.net.ConnectException; @@ -323,7 +324,7 @@ public class AsyncRpcChannel { * @param priority for request * @return Promise for the response Message */ - public io.netty.util.concurrent.Promise callMethod( + public Promise callMethod( final Descriptors.MethodDescriptor method, final Message request,final CellScanner cellScanner, R responsePrototype, MessageConverter messageConverter, IOExceptionConverter diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java index 3d343b4..331b040 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java @@ -18,11 +18,9 @@ package org.apache.hadoop.hbase.ipc; import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcChannel; -import com.google.protobuf.RpcController; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -34,7 +32,6 @@ import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; @@ -45,9 +42,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -59,6 +54,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.MetricsConnection; +import org.apache.hadoop.hbase.client.MetricsConnection.CallStats; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.JVM; import org.apache.hadoop.hbase.util.Pair; @@ -77,10 +73,6 @@ public class AsyncRpcClient extends AbstractRpcClient { public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport"; public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup"; - private static final HashedWheelTimer WHEEL_TIMER = - new 
HashedWheelTimer(Threads.newDaemonThreadFactory("AsyncRpcChannel-timer"), - 100, TimeUnit.MILLISECONDS); - private static final ChannelInitializer DEFAULT_CHANNEL_INITIALIZER = new ChannelInitializer() { @Override @@ -216,55 +208,44 @@ public class AsyncRpcClient extends AbstractRpcClient { this(configuration, clusterId, localAddress, metrics, null); } - /** - * Make a call, passing param, to the IPC server running at - * address which is servicing the protocol protocol, - * with the ticket credentials, returning the value. - * Throws exceptions if there are network problems or if the remote code - * threw an exception. - * - * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. - * {@link org.apache.hadoop.hbase.security.UserProvider#getCurrent()} makes a new - * instance of User each time so will be a new Connection each time. - * @return A pair with the Message response and the Cell data (if any). - * @throws InterruptedException if call is interrupted - * @throws java.io.IOException if a connection failure is encountered - */ @Override - protected Pair call(PayloadCarryingRpcController pcrc, - Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, - InetSocketAddress addr, MetricsConnection.CallStats callStats) - throws IOException, InterruptedException { - if (pcrc == null) { - pcrc = new PayloadCarryingRpcController(); - } - final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket); + protected void call(final PayloadCarryingRpcController pcrc, MethodDescriptor md, Message param, + Message returnType, User ticket, InetSocketAddress addr, final RpcCallback callback, + CallStats callStats) { + try { + final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket); - final Promise promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType, - getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(), + final Promise promise = connection.callMethod(md, param, pcrc.cellScanner(), + returnType, getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(), pcrc.getPriority()); - pcrc.notifyOnCancel(new RpcCallback() { - @Override - public void run(Object parameter) { - // Will automatically fail the promise with CancellationException - promise.cancel(true); - } - }); + pcrc.notifyOnCancel(new RpcCallback() { + @Override + public void run(Object parameter) { + // Will automatically fail the promise with CancellationException + promise.cancel(true); + } + }); + promise.addListener(new FutureListener() { - long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0; - try { - Message response = timeout > 0 ? 
promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get(); - return new Pair<>(response, pcrc.cellScanner()); - } catch (ExecutionException e) { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } else { - throw wrapException(addr, (Exception) e.getCause()); - } - } catch (TimeoutException e) { - CallTimeoutException cte = new CallTimeoutException(promise.toString()); - throw wrapException(addr, cte); + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + callback.run(future.getNow()); + } else { + Throwable error = future.cause(); + if (error instanceof IOException) { + pcrc.setFailed((IOException) error); + } else { + pcrc.setFailed(new IOException(error)); + } + callback.run(null); + } + } + }); + } catch (IOException e) { + pcrc.setFailed(e); + callback.run(null); } } @@ -280,52 +261,6 @@ public class AsyncRpcClient extends AbstractRpcClient { }; } - /** - * Call method async - */ - private void callMethod(final Descriptors.MethodDescriptor md, - final PayloadCarryingRpcController pcrc, final Message param, Message returnType, User ticket, - InetSocketAddress addr, final RpcCallback done) { - final AsyncRpcChannel connection; - try { - connection = createRpcChannel(md.getService().getName(), addr, ticket); - - FutureListener listener = - new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - Throwable cause = future.cause(); - if (cause instanceof IOException) { - pcrc.setFailed((IOException) cause); - } else { - pcrc.setFailed(new IOException(cause)); - } - } else { - try { - done.run(future.get()); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - if (cause instanceof IOException) { - pcrc.setFailed((IOException) cause); - } else { - pcrc.setFailed(new IOException(cause)); - } - } catch (InterruptedException e) { - pcrc.setFailed(new IOException(e)); - } - } - } - }; - connection.callMethod(md, param, pcrc.cellScanner(), returnType, - getMessageConverterWithRpcController(pcrc), null, - pcrc.getCallTimeout(), pcrc.getPriority()) - .addListener(listener); - } catch (StoppedRpcClientException|FailedServerException e) { - pcrc.setFailed(e); - } - } - private boolean closed = false; /** @@ -461,42 +396,6 @@ public class AsyncRpcClient extends AbstractRpcClient { } } - @Override - public RpcChannel createProtobufRpcChannel(final ServerName sn, final User user, int rpcTimeout) { - return new RpcChannelImplementation(this, sn, user, rpcTimeout); - } - - /** - * Blocking rpc channel that goes via hbase rpc. 
- */ - @VisibleForTesting - public static class RpcChannelImplementation implements RpcChannel { - private final InetSocketAddress isa; - private final AsyncRpcClient rpcClient; - private final User ticket; - private final int channelOperationTimeout; - - /** - * @param channelOperationTimeout - the default timeout when no timeout is given - */ - protected RpcChannelImplementation(final AsyncRpcClient rpcClient, - final ServerName sn, final User ticket, int channelOperationTimeout) { - this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); - this.rpcClient = rpcClient; - this.ticket = ticket; - this.channelOperationTimeout = channelOperationTimeout; - } - - @Override - public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, - Message param, Message returnType, RpcCallback done) { - PayloadCarryingRpcController pcrc = - configurePayloadCarryingRpcController(controller, channelOperationTimeout); - - this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done); - } - } - /** * Get a new timeout on this RPC client * @param task to run at timeout diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java index 73bc0e2..41148e3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java @@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.ipc; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; + +import io.netty.util.Timeout; import java.io.IOException; @@ -33,11 +36,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; public class Call { final int id; // call id final Message param; // rpc request method param object - /** - * Optionally has cells when making call. Optionally has cells set on response. Used - * passing cells to the rpc and receiving the response. - */ - CellScanner cells; + /** Optionally has cells when making call. */ + final CellScanner requestCells; + + /** Optionally has cells set on response. */ + CellScanner responseCells; Message response; // value, null if error // The return type. Used to create shell into which we deserialize the response if any. Message responseDefaultType; @@ -46,18 +49,21 @@ public class Call { final Descriptors.MethodDescriptor md; final int timeout; // timeout in millisecond for this call; 0 means infinite. final MetricsConnection.CallStats callStats; + final RpcCallback callback; + Timeout timeoutTask; protected Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells, final Message responseDefaultType, int timeout, - MetricsConnection.CallStats callStats) { + RpcCallback callback, MetricsConnection.CallStats callStats) { this.param = param; this.md = md; - this.cells = cells; + this.requestCells = cells; this.callStats = callStats; this.callStats.setStartTime(EnvironmentEdgeManager.currentTime()); this.responseDefaultType = responseDefaultType; this.id = id; this.timeout = timeout; + this.callback = callback; } /** @@ -80,48 +86,62 @@ public class Call { } } - public int remainingTime() { - if (timeout == 0) { - return Integer.MAX_VALUE; - } - - int remaining = timeout - (int) (EnvironmentEdgeManager.currentTime() - getStartTime()); - return remaining > 0 ? 
remaining : 0; - } - @Override public String toString() { return "callId: " + this.id + " methodName: " + this.md.getName() + " param {" + (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + "}"; } - /** Indicate when the call is complete and the - * value or error are available. Notifies by default. */ - protected synchronized void callComplete() { - this.done = true; - notify(); // notify caller + /** + * called from timeoutTask, prevent self cancel + */ + public void setTimeout(IOException error) { + synchronized (this) { + if (done) { + return; + } + this.done = true; + this.error = error; + } + callback.run(this); + } + + private void callComplete() { + if (timeoutTask != null) { + timeoutTask.cancel(); + } + callback.run(this); } - /** Set the exception when there is an error. - * Notify the caller the call is done. - * + /** + * Set the exception when there is an error. Notify the caller the call is done. * @param error exception thrown by the call; either local or remote */ public void setException(IOException error) { - this.error = error; + synchronized (this) { + if (done) { + return; + } + this.done = true; + this.error = error; + } callComplete(); } /** - * Set the return value when there is no error. - * Notify the caller the call is done. - * + * Set the return value when there is no error. Notify the caller the call is done. * @param response return value of the call. * @param cells Can be null */ public void setResponse(Message response, final CellScanner cells) { - this.response = response; - this.cells = cells; + synchronized (this) { + if (done) { + return; + } + this.done = true; + this.response = response; + this.responseCells = cells; + } callComplete(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java index d9877dc..641eb4a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java @@ -46,7 +46,6 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl protected volatile Integer callTimeout; protected volatile boolean cancelled = false; protected final AtomicReference> cancellationCb = new AtomicReference<>(null); - protected final AtomicReference> failureCb = new AtomicReference<>(null); private IOException exception; public static final int PRIORITY_UNSET = -1; @@ -118,7 +117,6 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl cellScanner = null; exception = null; cancelled = false; - failureCb.set(null); cancellationCb.set(null); callTimeout = null; } @@ -170,25 +168,9 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl } } - /** - * Notify a callback on error. 
- * For use in async rpc clients - * - * @param failureCb the callback to call on error - */ - public void notifyOnFail(RpcCallback failureCb) { - this.failureCb.set(failureCb); - if (this.exception != null) { - failureCb.run(this.exception); - } - } - @Override public void setFailed(String reason) { this.exception = new IOException(reason); - if (this.failureCb.get() != null) { - this.failureCb.get().run(this.exception); - } } /** @@ -199,9 +181,13 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl */ public void setFailed(IOException e) { this.exception = e; - if (this.failureCb.get() != null) { - this.failureCb.get().run(this.exception); - } + } + + /** + * @return the exception + */ + public IOException getFailed() { + return exception; } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index a8ec628..26a5739 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -30,7 +30,8 @@ import org.apache.hadoop.hbase.security.User; /** * Interface for RpcClient implementations so ConnectionManager can handle it. */ -@InterfaceAudience.Private public interface RpcClient extends Closeable { +@InterfaceAudience.Private +public interface RpcClient extends Closeable { String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry"; int FAILED_SERVER_EXPIRY_DEFAULT = 2000; String IDLE_TIME = "hbase.ipc.client.connection.minIdleTimeBeforeClose"; @@ -79,7 +80,8 @@ import org.apache.hadoop.hbase.security.User; * * @return A rpc channel that goes via this rpc client instance. */ - RpcChannel createProtobufRpcChannel(final ServerName sn, final User user, int rpcTimeout); + RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout) + throws IOException; /** * Interrupt the connections to the given server. 
This should be called if the server diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index 37b9afd..5966849 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -22,7 +22,9 @@ import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.Message.Builder; import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcChannel; + +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@ -51,6 +53,7 @@ import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -66,6 +69,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.MetricsConnection; +import org.apache.hadoop.hbase.client.MetricsConnection.CallStats; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -86,7 +90,6 @@ import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; @@ -204,9 +207,9 @@ public class RpcClientImpl extends AbstractRpcClient { protected final BlockingQueue callsToWrite; - public CallFuture sendCall(Call call, int priority, Span span) + public CallFuture sendCall(final Call call, int priority, Span span) throws InterruptedException, IOException { - CallFuture cts = new CallFuture(call, priority, span); + final CallFuture cts = new CallFuture(call, priority, span); if (!callsToWrite.offer(cts)) { throw new IOException("Can't add the call " + call.id + " to the write queue. callsToWrite.size()=" + callsToWrite.size()); @@ -237,7 +240,7 @@ public class RpcClientImpl extends AbstractRpcClient { // By removing the call from the expected call list, we make the list smaller, but // it means as well that we don't know how many calls we cancelled. 
calls.remove(cts.call.id); - cts.call.callComplete(); + cts.call.setResponse(null, null); } /** @@ -896,7 +899,7 @@ public class RpcClientImpl extends AbstractRpcClient { } builder.setMethodName(call.md.getName()); builder.setRequestParam(call.param != null); - ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells); + ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.requestCells); if (cellBlock != null) { CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); cellBlockBuilder.setLength(cellBlock.limit()); @@ -1218,119 +1221,79 @@ public class RpcClientImpl extends AbstractRpcClient { } /** - * Make a call, passing param, to the IPC server running at - * address which is servicing the protocol protocol, - * with the ticket credentials, returning the value. - * Throws exceptions if there are network problems or if the remote code - * threw an exception. + * Make a call, passing param, to the IPC server running at {@code address} which is + * servicing the {@code protocol} protocol, with the {@code ticket} credentials, returning the + * value. Exception and Cell data(if any) will be filled into the PayloadCarryingRpcController + * passed in. + *

* @param ticket Be careful which ticket you pass. A new user will mean a new Connection. * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a * new Connection each time. - * @return A pair with the Message response and the Cell data (if any). - * @throws InterruptedException if the call is interrupted - * @throws IOException if something fails on the connection */ @Override - protected Pair call(PayloadCarryingRpcController pcrc, MethodDescriptor md, - Message param, Message returnType, User ticket, InetSocketAddress addr, - MetricsConnection.CallStats callStats) - throws IOException, InterruptedException { - if (pcrc == null) { - pcrc = new PayloadCarryingRpcController(); - } - - Call call = this.call(md, param, returnType, pcrc, ticket, addr, callStats); - - return new Pair<>(call.response, call.cells); - } - - - /** - * Make a call, passing param, to the IPC server running at - * address which is servicing the protocol protocol, - * with the ticket credentials, returning the value. - * Throws exceptions if there are network problems or if the remote code - * threw an exception. - * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. - * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a - * new Connection each time. - * @return A Call - * @throws InterruptedException if the call is interrupted - * @throws IOException if something fails on the connection - */ - private Call call(MethodDescriptor method, Message request, - R responsePrototype, PayloadCarryingRpcController pcrc, User ticket, - InetSocketAddress addr, MetricsConnection.CallStats callStats) - throws IOException, InterruptedException { - + protected void call(final PayloadCarryingRpcController pcrc, MethodDescriptor md, Message param, + Message returnType, User ticket, final InetSocketAddress addr, final RpcCallback callback, + CallStats callStats) { CellScanner cells = pcrc.cellScanner(); - - final Call call = new Call(callIdCnt.getAndIncrement(), method, request, cells, - responsePrototype, pcrc.getCallTimeout(), callStats); - - final Connection connection = getConnection(ticket, call, addr); - final CallFuture cts; - if (connection.callSender != null) { - cts = connection.callSender.sendCall(call, pcrc.getPriority(), Trace.currentSpan()); - pcrc.notifyOnCancel(new RpcCallback() { - @Override - public void run(Object parameter) { - connection.callSender.remove(cts); - } - }); - if (pcrc.isCanceled()) { - // To finish if the call was cancelled before we set the notification (race condition) - call.callComplete(); - return call; - } - } else { - cts = null; - connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan()); - } - - while (!call.done) { - if (call.checkAndSetTimeout()) { - if (cts != null){ - connection.callSender.remove(cts); - } - break; - } - if (connection.shouldCloseConnection.get()) { - throw new ConnectionClosingException("Call id=" + call.id + - " on server " + addr + " aborted: connection is closing"); - } - try { - synchronized (call) { - if (call.done){ - break; + final Call call = new Call(callIdCnt.getAndIncrement(), md, param, cells, returnType, + pcrc.getCallTimeout(), new RpcCallback() { + + @Override + public void run(Call c) { + if (c.error != null) { + if (c.error instanceof RemoteException) { + c.error.fillInStackTrace(); + pcrc.setFailed(c.error); + } else { + // local exception + pcrc.setFailed(wrapException(addr, c.error)); + } + } else if (c.responseCells != null) { + 
pcrc.setCellScanner(c.responseCells); + } + callback.run(c.response); } - call.wait(Math.min(call.remainingTime(), 1000) + 1); - } - } catch (InterruptedException e) { - call.setException(new InterruptedIOException()); - if (cts != null) { + }, callStats); + try { + final Connection connection = getConnection(ticket, call, addr); + if (call.timeout > 0) { + call.timeoutTask = WHEEL_TIMER.newTimeout(new TimerTask() { + + @Override + public void run(Timeout timeout) throws Exception { + call.setTimeout(new IOException( + "Timed out waiting for response, timeout = " + call.timeout + "ms, waitTime = " + + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + "ms")); + // there is a race that if the call is timed out before adding to calls, then the call + // maybe left in the calls for ever if the remote server does not return. + connection.calls.remove(call.id); + } + }, call.timeout, TimeUnit.MILLISECONDS); + } + if (connection.callSender != null) { + cts = connection.callSender.sendCall(call, pcrc.getPriority(), Trace.currentSpan()); + pcrc.notifyOnCancel(new RpcCallback() { + @Override + public void run(Object parameter) { + connection.callSender.remove(cts); + } + }); + if (pcrc.isCanceled()) { + // To finish if the call was cancelled before we set the notification (race condition) connection.callSender.remove(cts); } - throw e; - } - } - - if (call.error != null) { - if (call.error instanceof RemoteException) { - call.error.fillInStackTrace(); - throw call.error; - } - // local exception - throw wrapException(addr, call.error); + } else { + cts = null; + connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan()); + } + } catch (IOException e) { + pcrc.setFailed(e); + callback.run(null); + } catch (InterruptedException e) { + pcrc.setFailed((IOException) new InterruptedIOException().initCause(e)); + callback.run(null); } - - return call; - } - - @Override - public RpcChannel createProtobufRpcChannel(ServerName sn, User user, int rpcTimeout) { - throw new UnsupportedOperationException(); } /** @@ -1379,4 +1342,6 @@ public class RpcClientImpl extends AbstractRpcClient { return connection; } + + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java index f49c558..f085d97 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.SocketTimeoutException; import java.net.UnknownHostException; @@ -154,8 +155,8 @@ public class TestClientTimeouts { private static AtomicInteger invokations = new AtomicInteger(); RandomTimeoutBlockingRpcChannel(final RpcClientImpl rpcClient, final ServerName sn, - final User ticket, final int rpcTimeout) throws UnknownHostException { - super(rpcClient, sn, ticket, rpcTimeout); + final User ticket, final int rpcTimeout) { + super(rpcClient, new InetSocketAddress(sn.getHostname(), sn.getPort()), ticket, rpcTimeout); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 4cfa25c..186a2e2 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -17,9 +17,7 @@ */ package org.apache.hadoop.hbase.ipc; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -31,6 +29,7 @@ import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.BlockingService; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; +import com.google.protobuf.RpcChannel; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -41,6 +40,7 @@ import java.net.InetSocketAddress; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.MetricsConnection; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; @@ -167,9 +167,10 @@ public abstract class AbstractTestIPC { * Ensure we do not HAVE TO HAVE a codec. * @throws InterruptedException * @throws IOException + * @throws ServiceException */ @Test - public void testNoCodec() throws InterruptedException, IOException { + public void testNoCodec() throws InterruptedException, IOException, ServiceException { Configuration conf = HBaseConfiguration.create(); TestRpcServer rpcServer = new TestRpcServer(); try (AbstractRpcClient client = createRpcClientNoCodec(conf)) { @@ -181,12 +182,12 @@ public abstract class AbstractTestIPC { if (address == null) { throw new IOException("Listener channel is closed"); } - Pair r = - client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address, - new MetricsConnection.CallStats()); - assertTrue(r.getSecond() == null); + PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(); + Message ret = client.callBlockingMethod(md, pcrc, param, + EchoResponseProto.getDefaultInstance(), User.getCurrent(), address); + assertNull(pcrc.cellScanner()); // Silly assertion that the message is in the returned pb. 
- assertTrue(r.getFirst().toString().contains(message)); + assertTrue(ret.toString().contains(message)); } finally { rpcServer.stop(); } @@ -224,12 +225,11 @@ public abstract class AbstractTestIPC { if (address == null) { throw new IOException("Listener channel is closed"); } - Pair r = - client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address, - new MetricsConnection.CallStats()); + client.callBlockingMethod(md, pcrc, param, EchoResponseProto.getDefaultInstance(), + User.getCurrent(), address); int index = 0; - while (r.getSecond().advance()) { - assertTrue(CELL.equals(r.getSecond().current())); + while (pcrc.cellScanner().advance()) { + assertTrue(CELL.equals(pcrc.cellScanner().current())); index++; } assertEquals(count, index); @@ -253,8 +253,8 @@ public abstract class AbstractTestIPC { if (address == null) { throw new IOException("Listener channel is closed"); } - client.call(null, md, param, null, User.getCurrent(), address, - new MetricsConnection.CallStats()); + client.callBlockingMethod(md, new PayloadCarryingRpcController(), param, + EchoResponseProto.getDefaultInstance(), User.getCurrent(), address); fail("Expected an exception to have been thrown!"); } catch (Exception e) { LOG.info("Caught expected exception: " + e.toString()); @@ -264,9 +264,10 @@ public abstract class AbstractTestIPC { } } - /** Tests that the rpc scheduler is called when requests arrive. */ + /** Tests that the rpc scheduler is called when requests arrive. + * @throws ServiceException */ @Test - public void testRpcScheduler() throws IOException, InterruptedException { + public void testRpcScheduler() throws IOException, InterruptedException, ServiceException { RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1)); RpcServer rpcServer = new TestRpcServer(scheduler, CONF); verify(scheduler).init((RpcScheduler.Context) anyObject()); @@ -281,10 +282,10 @@ public abstract class AbstractTestIPC { throw new IOException("Listener channel is closed"); } for (int i = 0; i < 10; i++) { - client.call(new PayloadCarryingRpcController( - CellUtil.createCellScanner(ImmutableList. of(CELL))), md, param, - md.getOutputType().toProto(), User.getCurrent(), address, - new MetricsConnection.CallStats()); + client.callBlockingMethod(md, + new PayloadCarryingRpcController( + CellUtil.createCellScanner(ImmutableList. of(CELL))), + param, md.getOutputType().toProto(), User.getCurrent(), address); } verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); } finally { @@ -311,12 +312,12 @@ public abstract class AbstractTestIPC { throw new IOException("Listener channel is closed"); } try { - client.call(new PayloadCarryingRpcController( - CellUtil.createCellScanner(ImmutableList. of(CELL))), md, param, - md.getOutputType().toProto(), User.getCurrent(), address, - new MetricsConnection.CallStats()); + client.callBlockingMethod(md, + new PayloadCarryingRpcController( + CellUtil.createCellScanner(ImmutableList. 
of(CELL))), + param, md.getOutputType().toProto(), User.getCurrent(), address); fail("RPC should have failed because it exceeds max request size"); - } catch(IOException ex) { + } catch(ServiceException ex) { // pass } } finally { @@ -409,4 +410,75 @@ public abstract class AbstractTestIPC { .wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException")) .getCause() instanceof CallTimeoutException); } + + @Test + public void testAsyncProtobufConnectionSetup() throws Exception { + TestRpcServer rpcServer = new TestRpcServer(); + try (RpcClient client = createRpcClient(CONF)) { + rpcServer.start(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + + RpcChannel channel = client.createRpcChannel( + ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()), + User.getCurrent(), 0); + + final AtomicBoolean done = new AtomicBoolean(false); + + channel.callMethod(md, new PayloadCarryingRpcController(), param, + md.getOutputType().toProto(), new com.google.protobuf.RpcCallback() { + @Override + public void run(Message parameter) { + done.set(true); + } + }); + + TEST_UTIL.waitFor(1000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return done.get(); + } + }); + } finally { + rpcServer.stop(); + } + } + + @Test + public void testAsyncEcho() throws IOException { + Configuration conf = HBaseConfiguration.create(); + TestRpcServer rpcServer = new TestRpcServer(); + try (AbstractRpcClient client = createRpcClient(conf)) { + rpcServer.start(); + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + int num = 10; + List pcrcList = new ArrayList<>(); + List> callbackList = new ArrayList<>(); + for (int i = 0; i < num; i++) { + PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(); + BlockingRpcCallback done = new BlockingRpcCallback<>(); + client.callMethod(md, pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), + EchoResponseProto.getDefaultInstance(), User.getCurrent(), address, done); + pcrcList.add(pcrc); + callbackList.add(done); + } + for (int i = 0; i < num; i++) { + PayloadCarryingRpcController pcrc = pcrcList.get(i); + assertFalse(pcrc.failed()); + assertNull(pcrc.cellScanner()); + assertEquals("hello-" + i, ((EchoResponseProto) callbackList.get(i).get()).getMessage()); + } + } finally { + rpcServer.stop(); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java index 7efe198..78d059f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.ipc; import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.ServiceException; + import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundHandlerAdapter; @@ -42,7 +44,6 @@ import 
org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.codec.Codec; @@ -137,7 +138,7 @@ public class TestAsyncIPC extends AbstractTestIPC { } public static void main(String[] args) throws IOException, SecurityException, - NoSuchMethodException, InterruptedException { + NoSuchMethodException, InterruptedException, ServiceException { if (args.length != 2) { System.out.println("Usage: TestAsyncIPC "); return; @@ -186,8 +187,7 @@ public class TestAsyncIPC extends AbstractTestIPC { PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); // Pair response = - client.call(pcrc, md, builder.build(), param, user, address, - new MetricsConnection.CallStats()); + client.callBlockingMethod(md, pcrc, builder.build(), param, user, address); /* * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(), * count); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index 56de07d..e815114 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -24,6 +24,8 @@ import static org.mockito.Mockito.spy; import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; @@ -41,7 +43,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.codec.Codec; @@ -98,7 +99,7 @@ public class TestIPC extends AbstractTestIPC { } public static void main(String[] args) throws IOException, SecurityException, - NoSuchMethodException, InterruptedException { + NoSuchMethodException, InterruptedException, ServiceException { if (args.length != 2) { System.out.println("Usage: TestIPC "); return; @@ -147,9 +148,8 @@ public class TestIPC extends AbstractTestIPC { } PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); - // Pair response = - client.call(pcrc, md, builder.build(), param, user, address, - new MetricsConnection.CallStats()); + client.callBlockingMethod(md, pcrc, builder.build(), param, user, address); + /* * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(), * count); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java index a37ba11..dad6bd0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.ipc; +import static org.mockito.Mockito.mock; + import com.google.common.collect.ImmutableList; 
import com.google.common.collect.Lists; import com.google.protobuf.BlockingService; @@ -24,6 +26,12 @@ import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,7 +42,6 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; @@ -50,13 +57,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; - -import static org.mockito.Mockito.mock; - @Category({RPCTests.class, SmallTests.class}) public class TestRpcHandlerException { private static final Log LOG = LogFactory.getLog(TestRpcHandlerException.class); @@ -184,8 +184,8 @@ public class TestRpcHandlerException { if (address == null) { throw new IOException("Listener channel is closed"); } - client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(), - address, new MetricsConnection.CallStats()); + client.callBlockingMethod(md, controller, param, md.getOutputType().toProto(), + User.getCurrent(), address); } catch (Throwable e) { assert(abortable.isAborted() == true); } finally { -- 2.7.4
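
A minimal sketch of how caller code is expected to drive the async RpcChannel this patch adds, modelled on testAsyncProtobufConnectionSetup in AbstractTestIPC. The RpcClient, ServerName and MethodDescriptor are assumed to come from the caller (in the test the descriptor is SERVICE.getDescriptorForType().findMethodByName("echo")); the echo protos are the ones the patch's tests use. The key contract from RpcChannelImplementation is that callMethod never throws, so a PayloadCarryingRpcController is mandatory and carries the failure (getFailed) and any cell payload (cellScanner) back to the caller.

import java.io.IOException;

import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.security.User;

public class AsyncRpcChannelUsageSketch {

  /**
   * Fires one async "echo" call on the channel added by this patch. The MethodDescriptor and
   * RpcClient are supplied by the caller; this is a sketch, not part of the patch.
   */
  static void asyncEcho(RpcClient client, ServerName sn, MethodDescriptor md) throws IOException {
    // New in this patch: both AsyncRpcClient and RpcClientImpl hand out a non-blocking channel.
    RpcChannel channel = client.createRpcChannel(sn, User.getCurrent(), 0);

    // callMethod never throws; failures and cell payloads travel via the controller,
    // so a PayloadCarryingRpcController is mandatory for async calls.
    final PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController();
    // Optional per-call timeout; otherwise the channel applies its configured rpcTimeout.
    pcrc.setCallTimeout(5000);
    EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
    channel.callMethod(md, pcrc, request, EchoResponseProto.getDefaultInstance(),
        new RpcCallback<Message>() {
          @Override
          public void run(Message response) {
            if (pcrc.failed()) {
              System.err.println("echo failed: " + pcrc.getFailed());
            } else {
              System.out.println("echo returned: " + ((EchoResponseProto) response).getMessage());
            }
          }
        });
  }
}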
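
The blocking path is now just a wrapper around the async one: callBlockingMethod parks on a BlockingRpcCallback and rethrows pcrc.getFailed() as a ServiceException. The same pattern lets user code pipeline several calls on one channel and only block when collecting the results, as testAsyncEcho does. The sketch below assumes the channel and method descriptor from the previous example.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcChannel;

import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;

public class PipelinedEchoSketch {

  /** Issues ten echoes without blocking, then collects them, mirroring testAsyncEcho. */
  static void pipelinedEchoes(RpcChannel channel, MethodDescriptor md) throws IOException {
    List<PayloadCarryingRpcController> controllers = new ArrayList<>();
    List<BlockingRpcCallback<Message>> callbacks = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController();
      BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
      // Nothing here blocks or throws; the call is queued and the callback fires later.
      channel.callMethod(md, pcrc,
          EchoRequestProto.newBuilder().setMessage("hello-" + i).build(),
          EchoResponseProto.getDefaultInstance(), done);
      controllers.add(pcrc);
      callbacks.add(done);
    }
    for (int i = 0; i < 10; i++) {
      Message response = callbacks.get(i).get(); // get() parks until the callback has run
      PayloadCarryingRpcController pcrc = controllers.get(i);
      if (pcrc.failed()) {
        throw pcrc.getFailed(); // same contract the new callBlockingMethod relies on
      }
      System.out.println(((EchoResponseProto) response).getMessage());
    }
  }
}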
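
RpcClientImpl no longer polls with call.wait() for timeouts; instead, call timeouts are scheduled on the shared netty HashedWheelTimer (WHEEL_TIMER, 10 ms tick) that this patch moves up into AbstractRpcClient, and the pending task is cancelled in Call.callComplete() when a response or error arrives first. The standalone sketch below illustrates that race in isolation; the class and variable names are illustrative, only the timer construction mirrors the patch.

import java.util.concurrent.TimeUnit;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;

import org.apache.hadoop.hbase.util.Threads;

public class WheelTimerTimeoutSketch {

  // Same construction as AbstractRpcClient.WHEEL_TIMER in the patch: daemon threads, 10 ms tick.
  private static final HashedWheelTimer TIMER = new HashedWheelTimer(
      Threads.newDaemonThreadFactory("AsyncRpcChannel-timer"), 10, TimeUnit.MILLISECONDS);

  public static void main(String[] args) throws Exception {
    final long timeoutMs = 200;
    // Schedule the "call timed out" side of the race, as RpcClientImpl.call() now does.
    Timeout timeoutTask = TIMER.newTimeout(new TimerTask() {
      @Override
      public void run(Timeout timeout) throws Exception {
        System.err.println("Timed out waiting for response, timeout = " + timeoutMs + "ms");
      }
    }, timeoutMs, TimeUnit.MILLISECONDS);

    // When the response arrives first, the pending timeout task is simply cancelled,
    // which is what Call.callComplete() does with call.timeoutTask.
    boolean responseArrivedInTime = true; // stand-in for the real I/O path
    if (responseArrivedInTime) {
      timeoutTask.cancel();
      System.out.println("response handled, timeout task cancelled");
    }

    TIMER.stop(); // release the timer thread
  }
}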