diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java index 4621109..1365411 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -33,14 +33,10 @@ import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.UnknownHostException; -import java.util.Hashtable; import java.util.Iterator; -import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.net.SocketFactory; @@ -280,7 +276,7 @@ public class HBaseClient { * otherwise, throw the timeout exception. */ private void handleTimeout(SocketTimeoutException e) throws IOException { - if (shouldCloseConnection.get() || !running.get() || + if (shouldCloseConnection.get() || !running.get() || remoteId.rpcTimeout > 0) { throw e; } @@ -552,14 +548,24 @@ public class HBaseClient { touch(); try { - int id = in.readInt(); // try to read an id + // See HBaseServer.Call.setResponse for where we write out the response. + // It writes the call.id (int), a flag byte, then optionally the length + // of the response (int) followed by data. + + // Read the call id. + int id = in.readInt(); if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + id); - Call call = calls.remove(id); - boolean isError = in.readBoolean(); // read if error + // Read the flag byte + byte flag = in.readByte(); + boolean isError = ResponseFlag.isError(flag); + if (ResponseFlag.isLength(flag)) { + // Currently length if present is unused. + in.readInt(); + } if (isError) { //noinspection ThrowableInstanceNeverThrown call.setException(new RemoteException( WritableUtils.readString(in), diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index 36b0560..1db05cb 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -298,7 +298,8 @@ public abstract class HBaseServer implements RpcServer { if (result instanceof WritableWithSize) { // get the size hint. WritableWithSize ohint = (WritableWithSize) result; - long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT; + long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + + (2 * Bytes.SIZEOF_INT); if (hint > Integer.MAX_VALUE) { // oops, new problem. IOException ioe = @@ -313,8 +314,15 @@ public abstract class HBaseServer implements RpcServer { ByteBufferOutputStream buf = new ByteBufferOutputStream(size); DataOutputStream out = new DataOutputStream(buf); try { - out.writeInt(this.id); // write call id - out.writeBoolean(error != null); // write error flag + // Call id. + out.writeInt(this.id); + // Write flag. + byte flag = (error != null)? + ResponseFlag.getErrorAndLengthSet(): ResponseFlag.getLengthSetOnly(); + out.writeByte(flag); + // Place holder for length set later below after we + // fill the buffer with data. + out.writeInt(0xdeadbeef); } catch (IOException e) { errorClass = e.getClass().getName(); error = StringUtils.stringifyException(e); @@ -331,7 +339,16 @@ public abstract class HBaseServer implements RpcServer { LOG.warn("Error sending response to call: ", e); } - this.response = buf.getByteBuffer(); + // Set the length into the ByteBuffer after call id and after + // byte flag. + ByteBuffer bb = buf.getByteBuffer(); + int bufSiz = bb.remaining(); + // Move to the size location in our ByteBuffer past call.id + // and past the byte flag. + bb.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE); + bb.putInt(bufSiz); + bb.position(0); + this.response = bb; } @Override