diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java index b2f133b..04b9a24 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java @@ -221,7 +221,6 @@ static Rpc createEmbedded(RpcDispatcher dispatcher) { private final Channel channel; private final Collection listeners; private final EventExecutorGroup egroup; - private final Object channelLock; private volatile RpcDispatcher dispatcher; private Rpc(RpcConfiguration config, Channel channel, EventExecutorGroup egroup) { @@ -229,7 +228,6 @@ private Rpc(RpcConfiguration config, Channel channel, EventExecutorGroup egroup) Preconditions.checkArgument(egroup != null); this.config = config; this.channel = channel; - this.channelLock = new Object(); this.dispatcher = null; this.egroup = egroup; this.listeners = Lists.newLinkedList(); @@ -271,13 +269,13 @@ public boolean isActive() { * @param retType Type of expected reply. * @return A future used to monitor the operation. */ - public Future call(Object msg, Class retType) { + public Future call(final Object msg, Class retType) { Preconditions.checkArgument(msg != null); Preconditions.checkState(channel.isActive(), "RPC channel is closed."); try { final long id = rpcId.getAndIncrement(); final Promise promise = createPromise(); - ChannelFutureListener listener = new ChannelFutureListener() { + final ChannelFutureListener listener = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture cf) { if (!cf.isSuccess() && !promise.isDone()) { @@ -290,10 +288,13 @@ public void operationComplete(ChannelFuture cf) { }; dispatcher.registerRpc(id, promise, msg.getClass().getName()); - synchronized (channelLock) { - channel.write(new MessageHeader(id, Rpc.MessageType.CALL)).addListener(listener); - channel.writeAndFlush(msg).addListener(listener); - } + channel.eventLoop().submit(new Runnable() { + @Override + public void run() { + channel.write(new MessageHeader(id, Rpc.MessageType.CALL)).addListener(listener); + channel.writeAndFlush(msg).addListener(listener); + } + }); return promise; } catch (Exception e) { throw Throwables.propagate(e); diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java index 77c3d02..5a4801c 100644 --- a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java +++ b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java @@ -282,6 +282,9 @@ private void transfer(Rpc serverRpc, Rpc clientRpc) { EmbeddedChannel client = (EmbeddedChannel) clientRpc.getChannel(); EmbeddedChannel server = (EmbeddedChannel) serverRpc.getChannel(); + server.runPendingTasks(); + client.runPendingTasks(); + int count = 0; while (!client.outboundMessages().isEmpty()) { server.writeInbound(client.readOutbound());