diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 1bfd9a6..a152eaa 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -1158,18 +1158,18 @@ public class RpcClient { int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); int whatIsLeftToRead = totalSize - readSoFar; IOUtils.skipFully(in, whatIsLeftToRead); + return; } if (responseHeader.hasException()) { ExceptionResponse exceptionResponse = responseHeader.getException(); RemoteException re = createRemoteException(exceptionResponse); - if (expectedCall) call.setException(re); + call.setException(re); if (isFatalConnectionException(exceptionResponse)) { markClosed(re); } } else { Message value = null; - // Call may be null because it may have timeout and been cleaned up on this side already - if (expectedCall && call.responseDefaultType != null) { + if (call.responseDefaultType != null) { Builder builder = call.responseDefaultType.newBuilderForType(); builder.mergeDelimitedFrom(in); value = builder.build(); @@ -1181,9 +1181,7 @@ public class RpcClient { IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length); cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock); } - // it's possible that this call may have been cleaned up due to a RPC - // timeout, so check if it still exists before setting the value. - if (expectedCall) call.setResponse(value, cellBlockScanner); + call.setResponse(value, cellBlockScanner); } } catch (IOException e) { if (expectedCall) call.setException(e); @@ -1486,7 +1484,6 @@ public class RpcClient { @Override public void run(Object parameter) { connection.callSender.remove(cts); - call.callComplete(); } }); if (pcrc.isCanceled()) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index e3654cb..1880f98 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -870,22 +870,26 @@ public class RpcServer implements RpcServerInterface { /** * Take the list of the connections that want to write, and register them - * in the selector. + * in the selector. */ - private void registerWrites(){ + private void registerWrites() { Iterator it = writingCons.iterator(); - while (it.hasNext()){ + while (it.hasNext()) { Connection c = it.next(); it.remove(); SelectionKey sk = c.channel.keyFor(writeSelector); - if (sk == null){ - try { - c.channel.register(writeSelector, SelectionKey.OP_WRITE, c); - } catch (ClosedChannelException e) { - // ignore: the client went away. + try { + if (sk == null) { + try { + c.channel.register(writeSelector, SelectionKey.OP_WRITE, c); + } catch (ClosedChannelException e) { + // ignore: the client went away. + } + } else { + sk.interestOps(SelectionKey.OP_WRITE); } - } else { - sk.interestOps(SelectionKey.OP_WRITE); + } catch (CancelledKeyException e) { + // ignore: the client went away. } } }