diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
index 0489684..5dde16c 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
@@ -221,7 +221,6 @@ static Rpc createEmbedded(RpcDispatcher dispatcher) {
   private final Channel channel;
   private final Collection<Listener> listeners;
   private final EventExecutorGroup egroup;
-  private final Object channelLock;
   private volatile RpcDispatcher dispatcher;
 
   private Rpc(RpcConfiguration config, Channel channel, EventExecutorGroup egroup) {
@@ -229,7 +228,6 @@ private Rpc(RpcConfiguration config, Channel channel, EventExecutorGroup egroup)
     Preconditions.checkArgument(egroup != null);
     this.config = config;
     this.channel = channel;
-    this.channelLock = new Object();
     this.dispatcher = null;
     this.egroup = egroup;
     this.listeners = Lists.newLinkedList();
@@ -271,13 +269,13 @@ public boolean isActive() {
    * @param retType Type of expected reply.
    * @return A future used to monitor the operation.
    */
-  public <T> Future<T> call(Object msg, Class<T> retType) {
+  public <T> Future<T> call(final Object msg, Class<T> retType) {
     Preconditions.checkArgument(msg != null);
     Preconditions.checkState(channel.isActive(), "RPC channel is closed.");
     try {
       final long id = rpcId.getAndIncrement();
       final Promise<T> promise = createPromise();
-      ChannelFutureListener listener = new ChannelFutureListener() {
+      final ChannelFutureListener listener = new ChannelFutureListener() {
           @Override
           public void operationComplete(ChannelFuture cf) {
             if (!cf.isSuccess() && !promise.isDone()) {
@@ -290,10 +288,13 @@ public void operationComplete(ChannelFuture cf) {
       };
 
       dispatcher.registerRpc(id, promise, msg.getClass().getName());
-      synchronized (channelLock) {
-        channel.write(new MessageHeader(id, Rpc.MessageType.CALL)).addListener(listener);
-        channel.writeAndFlush(msg).addListener(listener);
-      }
+      channel.eventLoop().submit(new Runnable() {
+        @Override
+        public void run() {
+          channel.write(new MessageHeader(id, Rpc.MessageType.CALL)).addListener(listener);
+          channel.writeAndFlush(msg).addListener(listener);
+        }
+      });
       return promise;
     } catch (Exception e) {
       throw Throwables.propagate(e);
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
index ebafd13..2b6ab29 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
@@ -152,12 +152,7 @@ private void handleError(ChannelHandlerContext ctx, Object msg, OutstandingRpc r
 
   @Override
   public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("[%s] Caught exception in channel pipeline.", name()), cause);
-    } else {
-      LOG.info("[{}] Closing channel due to exception in pipeline ({}).", name(),
-        cause.getMessage());
-    }
+    LOG.error(String.format("[%s] Closing channel due to exception in pipeline.", name()), cause);
 
     if (lastHeader != null) {
       // There's an RPC waiting for a reply. Exception was most probably caught while processing
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
index 77c3d02..5a4801c 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
@@ -282,6 +282,9 @@ private void transfer(Rpc serverRpc, Rpc clientRpc) {
     EmbeddedChannel client = (EmbeddedChannel) clientRpc.getChannel();
     EmbeddedChannel server = (EmbeddedChannel) serverRpc.getChannel();
 
+    server.runPendingTasks();
+    client.runPendingTasks();
+
    int count = 0;
    while (!client.outboundMessages().isEmpty()) {
      server.writeInbound(client.readOutbound());
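
Reviewer note (not part of the patch): the Rpc.call() change replaces the channelLock mutex with a task submitted to the channel's event loop. Because a Netty EventLoop runs submitted tasks sequentially on a single thread, the MessageHeader and payload writes of one call can no longer interleave with another caller's pair, and no lock is held across the write calls. Below is a minimal, self-contained sketch of that pattern, assuming Netty 4; the class and method names (OrderedWriteSketch, sendPair) are made up for illustration, and EmbeddedChannel stands in for a real connection.

// Illustrative sketch only: two logical "calls" are each queued as a single
// event-loop task, so their header/payload writes stay adjacent even when the
// callers run on different threads. Names here are hypothetical.
import io.netty.channel.Channel;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;

public class OrderedWriteSketch {

  // Same shape as the new Rpc.call() write path: no lock is taken; the EventLoop's
  // single thread executes submitted tasks one after another, preserving order.
  static void sendPair(final Channel channel, final Object header, final Object payload) {
    channel.eventLoop().submit(new Runnable() {
      @Override
      public void run() {
        channel.write(header);
        channel.writeAndFlush(payload);
      }
    });
  }

  public static void main(String[] args) {
    EmbeddedChannel channel = new EmbeddedChannel(new ChannelInboundHandlerAdapter());

    sendPair(channel, "header-1", "payload-1");
    sendPair(channel, "header-2", "payload-2");

    // EmbeddedChannel leaves submitted tasks queued until runPendingTasks() is
    // called -- the same reason the TestRpc hunk adds those calls.
    channel.runPendingTasks();

    Object msg;
    while ((msg = channel.readOutbound()) != null) {
      System.out.println(msg);  // header-1, payload-1, header-2, payload-2
    }
    channel.finish();
  }
}

The design point: correctness now comes from the event loop's single-threaded task execution rather than a shared monitor, so caller threads hand off the writes instead of blocking on a lock around channel I/O.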
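
A related note on the TestRpc hunk: because call() now defers its writes to an event-loop task, an EmbeddedChannel keeps that task queued until something runs it. Without the added server.runPendingTasks()/client.runPendingTasks() calls, transfer() would find client.outboundMessages() empty, nothing would be shuttled between the two channels, and the in-flight calls could never complete.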