diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java index 8f6c162..957180f 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -265,7 +265,7 @@ public class HBaseClient { * otherwise, throw the timeout exception. */ private void handleTimeout(SocketTimeoutException e) throws IOException { - if (shouldCloseConnection.get() || !running.get() || + if (shouldCloseConnection.get() || !running.get() || remoteId.rpcTimeout > 0) { throw e; } @@ -545,6 +545,8 @@ public class HBaseClient { Call call = calls.get(id); + // Right now we don't use the response size for anything, but it must be read to skip past it. + int responseSize = in.readInt(); boolean isError = in.readBoolean(); // read if error if (isError) { //noinspection ThrowableInstanceNeverThrown diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index f36fe62..8170b93 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -1069,7 +1069,7 @@ public abstract class HBaseServer implements RpcServer { if (value instanceof WritableWithSize) { // get the size hint. WritableWithSize ohint = (WritableWithSize)value; - long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT; + long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + 2*Bytes.SIZEOF_INT; if (hint > 0) { if ((hint) > Integer.MAX_VALUE) { // oops, new problem. @@ -1085,6 +1085,7 @@ public abstract class HBaseServer implements RpcServer { ByteBufferOutputStream buf = new ByteBufferOutputStream(size); DataOutputStream out = new DataOutputStream(buf); out.writeInt(call.id); // write call id + out.writeInt(0xdeadbeef); // placeholder for size.
out.writeBoolean(error != null); // write error flag if (error == null) { @@ -1098,9 +1099,15 @@ public abstract class HBaseServer implements RpcServer { LOG.warn(getName()+", responseTooLarge for: "+call+": Size: " + StringUtils.humanReadableInt(buf.size())); } + ByteBuffer bb = buf.getByteBuffer(); + // the size excludes the leading call id and the size field itself (2 ints); it overwrites the placeholder after the 4-byte id. + int bufSiz = bb.remaining() - (2*Bytes.SIZEOF_INT); + bb.position(Bytes.SIZEOF_INT); + bb.putInt(bufSiz); + bb.position(0); - call.setResponse(buf.getByteBuffer()); + call.setResponse(bb); responder.doRespond(call); } catch (InterruptedException e) { if (running) { // unexpected -- log it