.../java/org/apache/hadoop/hbase/util/Bytes.java | 22 +++++++++++++ .../org/apache/hadoop/hbase/ipc/RpcServer.java | 37 ++++++++++++++++++---- 2 files changed, 53 insertions(+), 6 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java index 987f1e2..2d17c56 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java @@ -36,6 +36,7 @@ import java.util.Iterator; import java.util.List; import com.google.protobuf.ByteString; +import com.google.protobuf.CodedOutputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -950,6 +951,27 @@ public class Bytes implements Comparable { } /** + * Convert an int value and write it to CodedOutputStream. Big-endian. Same as + * what DataOutputStream.writeInt does. + * + * @param val + * value + * @throws IOException + */ + public static void toBytes(int val, CodedOutputStream cos) throws IOException { + byte[] b = new byte[4]; + for (int i = 3; i > 0; i--) { + b[i] = (byte) val; + val >>>= 8; + } + b[0] = (byte) val; + for (int i = 0; i <= 3; i++) { + // Instead of iterating store in 4 vars and write as RawByte? + cos.writeRawByte(b[i]); + } + } + + /** * Converts a byte array to an int value * @param bytes byte array * @return the int value diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index a9c64a3..28bc09e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -131,7 +131,9 @@ import org.codehaus.jackson.map.ObjectMapper; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.BlockingService; import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.HBaseZeroCopyByteString; import com.google.protobuf.Message; import com.google.protobuf.ServiceException; import com.google.protobuf.TextFormat; @@ -429,12 +431,35 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // Organize the response as a set of bytebuffers rather than collect it all together inside // one big byte array; save on allocations. - ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header); - ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result); - int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) + - (this.cellBlock == null? 0: this.cellBlock.limit()); - ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize)); - bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock); + int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, + resultVintSize = 0; + if (header != null) { + headerSerializedSize = header.getSerializedSize(); + headerVintSize = CodedOutputStream.computeRawVarint32Size(headerSerializedSize); + } + if (result != null) { + resultSerializedSize = result.getSerializedSize(); + resultVintSize = CodedOutputStream.computeRawVarint32Size(resultSerializedSize); + } + // calculate the total size + int totalSize = headerSerializedSize + headerVintSize + + (result == null ? 0 : (resultSerializedSize + resultVintSize)) + + (this.cellBlock == null ? 0 : this.cellBlock.limit()); + byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize + + resultVintSize + Bytes.SIZEOF_INT]; + CodedOutputStream cos = CodedOutputStream.newInstance(b); + Bytes.toBytes(totalSize, cos); + if (header != null) { + cos.writeMessageNoTag(header); + } + if (result != null) { + cos.writeMessageNoTag(result); + } + cos.flush(); + cos.checkNoSpaceLeft(); + + bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock); + if (connection.useWrap) { bc = wrapWithSasl(bc); }