Index: hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRPCCompression.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRPCCompression.java	(revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRPCCompression.java	(revision 0)
@@ -0,0 +1,113 @@
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestRPCCompression {
+  final Log LOG = LogFactory.getLog(getClass());
+  private static HBaseTestingUtility TEST_UTIL;
+  private static int SLAVES = 1;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // start the cluster
+    Configuration conf = HBaseConfiguration.create();
+    TEST_UTIL = new HBaseTestingUtility(conf);
+    TEST_UTIL.startMiniCluster(SLAVES);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Nothing to do.
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @After
+  public void tearDown() throws Exception {
+    // Nothing to do.
+  }
+
+  @Test
+  public void testCompressedRPC() throws Exception {
+    byte[] TABLE = Bytes.toBytes("testRPCCompression");
+    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
+    byte [] QUALIFIER = Bytes.toBytes("testQualifier");
+    byte [] VALUE = Bytes.toBytes("testValue");
+
+    // create a table
+    TEST_UTIL.createTable(TABLE, FAMILIES);
+    LOG.debug("Created table " + new String(TABLE));
+
+    // open the table with compressed RPC
+    Configuration conf = HBaseConfiguration.create();
+    String zkPortStr = TEST_UTIL.getConfiguration().get(
+        "hbase.zookeeper.property.clientPort");
+    conf.setInt("hbase.zookeeper.property.clientPort",
+        Integer.parseInt(zkPortStr));
+    conf.set(HConstants.HBASE_RPC_COMPRESSION_KEY,
+        Compression.Algorithm.GZ.getName());
+    HTable table = new HTable(conf, TABLE);
+
+    // put some values
+    byte [][] ROWS = { Bytes.toBytes("a"), Bytes.toBytes("b") };
+    for (int i = 0; i < ROWS.length; i++) {
+      Put put = new Put(ROWS[i]);
+      put.add(FAMILIES[0], QUALIFIER, VALUE);
+      table.put(put);
+    }
+    LOG.debug("Wrote some puts to table " + new String(TABLE));
+
+    // flush the table
+    table.flushCommits();
+    LOG.debug("Flushed table " + new String(TABLE));
+
+    // read back the values
+    for (int i = 0; i < ROWS.length; i++) {
+      Get get = new Get(ROWS[i]);
+      get.addColumn(FAMILIES[0], QUALIFIER);
+      Result result = table.get(get);
+
+      assertEquals(new String(VALUE),
+          new String(result.getValue(FAMILIES[0], QUALIFIER)));
+    }
+    LOG.debug("Read and verified from table " + new String(TABLE));
+  }
+}
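The test above doubles as usage documentation: compression is requested purely through client
configuration. A minimal sketch of the client-side opt-in with this patch applied (the table,
row, and column names are illustrative, not part of the patch):

    Configuration conf = HBaseConfiguration.create();
    // Request GZ compression for the request/response bodies of every RPC
    // issued through connections built from this configuration.
    conf.set(HConstants.HBASE_RPC_COMPRESSION_KEY,
        Compression.Algorithm.GZ.getName());
    HTable table = new HTable(conf, Bytes.toBytes("myTable"));
    table.put(new Put(Bytes.toBytes("row1"))
        .add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
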
Index: hbase-server/src/main/protobuf/RPC.proto
===================================================================
--- hbase-server/src/main/protobuf/RPC.proto	(revision 1397358)
+++ hbase-server/src/main/protobuf/RPC.proto	(working copy)
@@ -78,6 +78,9 @@
   /** Monotonically increasing callId, mostly to keep track of RPCs */
   required uint32 callId = 1;
   optional RPCTInfo tinfo = 2;
+  /** The compression algorithm applied to the request bytes
+   */
+  optional string compressionAlgo = 3;
 }
 /**
  * The RPC request body
@@ -114,6 +117,9 @@
     FATAL = 2;
   }
   required Status status = 2;
+  /** The compression algorithm applied to the response bytes
+   */
+  optional string compressionAlgo = 3;
 }
 /**
  * The RPC response body
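To make the two new wire fields concrete, here is a small sketch using the regenerated classes
from the next hunk (values are illustrative). Because the field is optional, a peer that
predates this patch simply never sets it, and has/get fall back to the empty default:

    RpcRequestHeader header = RpcRequestHeader.newBuilder()
        .setCallId(42)
        .setCompressionAlgo(Compression.Algorithm.GZ.getName())  // "gz"
        .build();
    if (header.hasCompressionAlgo()) {
      // Resolve the name back to a codec, as the server side does.
      Compression.Algorithm algo =
          Compression.getCompressionAlgorithmByName(header.getCompressionAlgo());
    }
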
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java	(revision 1397358)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java	(working copy)
@@ -1110,6 +1110,10 @@
     boolean hasTinfo();
     org.apache.hadoop.hbase.protobuf.generated.Tracing.RPCTInfo getTinfo();
     org.apache.hadoop.hbase.protobuf.generated.Tracing.RPCTInfoOrBuilder getTinfoOrBuilder();
+
+    // optional string compressionAlgo = 3;
+    boolean hasCompressionAlgo();
+    String getCompressionAlgo();
   }
   public static final class RpcRequestHeader extends
       com.google.protobuf.GeneratedMessage
@@ -1163,9 +1167,42 @@
       return tinfo_;
     }
 
+    // optional string compressionAlgo = 3;
+    public static final int COMPRESSIONALGO_FIELD_NUMBER = 3;
+    private java.lang.Object compressionAlgo_;
+    public boolean hasCompressionAlgo() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public String getCompressionAlgo() {
+      java.lang.Object ref = compressionAlgo_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs =
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          compressionAlgo_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getCompressionAlgoBytes() {
+      java.lang.Object ref = compressionAlgo_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b =
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        compressionAlgo_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
       callId_ = 0;
       tinfo_ = org.apache.hadoop.hbase.protobuf.generated.Tracing.RPCTInfo.getDefaultInstance();
+      compressionAlgo_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1189,6 +1226,9 @@
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeMessage(2, tinfo_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(3, getCompressionAlgoBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1206,6 +1246,10 @@
         size += com.google.protobuf.CodedOutputStream
          .computeMessageSize(2, tinfo_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(3, getCompressionAlgoBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1239,6 +1283,11 @@
         result = result && getTinfo()
             .equals(other.getTinfo());
       }
+      result = result && (hasCompressionAlgo() == other.hasCompressionAlgo());
+      if (hasCompressionAlgo()) {
+        result = result && getCompressionAlgo()
+            .equals(other.getCompressionAlgo());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1256,6 +1305,10 @@
         hash = (37 * hash) + TINFO_FIELD_NUMBER;
         hash = (53 * hash) + getTinfo().hashCode();
       }
+      if (hasCompressionAlgo()) {
+        hash = (37 * hash) + COMPRESSIONALGO_FIELD_NUMBER;
+        hash = (53 * hash) + getCompressionAlgo().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -1381,6 +1434,8 @@
         tinfoBuilder_.clear();
       }
       bitField0_ = (bitField0_ & ~0x00000002);
+      compressionAlgo_ = "";
+      bitField0_ = (bitField0_ & ~0x00000004);
       return this;
     }
 
@@ -1431,6 +1486,10 @@
       } else {
         result.tinfo_ = tinfoBuilder_.build();
       }
+      if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+        to_bitField0_ |= 0x00000004;
+      }
+      result.compressionAlgo_ = compressionAlgo_;
       result.bitField0_ = to_bitField0_;
       onBuilt();
       return result;
@@ -1453,6 +1512,9 @@
       if (other.hasTinfo()) {
         mergeTinfo(other.getTinfo());
       }
+      if (other.hasCompressionAlgo()) {
+        setCompressionAlgo(other.getCompressionAlgo());
+      }
       this.mergeUnknownFields(other.getUnknownFields());
       return this;
     }
@@ -1502,6 +1564,11 @@
             setTinfo(subBuilder.buildPartial());
             break;
           }
+          case 26: {
+            bitField0_ |= 0x00000004;
+            compressionAlgo_ = input.readBytes();
+            break;
+          }
         }
       }
     }
@@ -1619,6 +1686,42 @@
       return tinfoBuilder_;
     }
 
+    // optional string compressionAlgo = 3;
+    private java.lang.Object compressionAlgo_ = "";
+    public boolean hasCompressionAlgo() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public String getCompressionAlgo() {
+      java.lang.Object ref = compressionAlgo_;
+      if (!(ref instanceof String)) {
+        String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+        compressionAlgo_ = s;
+        return s;
+      } else {
+        return (String) ref;
+      }
+    }
+    public Builder setCompressionAlgo(String value) {
+      if (value == null) {
+        throw new NullPointerException();
+      }
+      bitField0_ |= 0x00000004;
+      compressionAlgo_ = value;
+      onChanged();
+      return this;
+    }
+    public Builder clearCompressionAlgo() {
+      bitField0_ = (bitField0_ & ~0x00000004);
+      compressionAlgo_ = getDefaultInstance().getCompressionAlgo();
+      onChanged();
+      return this;
+    }
+    void setCompressionAlgo(com.google.protobuf.ByteString value) {
+      bitField0_ |= 0x00000004;
+      compressionAlgo_ = value;
+      onChanged();
+    }
+
     // @@protoc_insertion_point(builder_scope:RpcRequestHeader)
   }
@@ -2294,6 +2397,10 @@
     // required .RpcResponseHeader.Status status = 2;
     boolean hasStatus();
     org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status getStatus();
+
+    // optional string compressionAlgo = 3;
+    boolean hasCompressionAlgo();
+    String getCompressionAlgo();
   }
   public static final class RpcResponseHeader extends
       com.google.protobuf.GeneratedMessage
@@ -2416,9 +2523,42 @@
       return status_;
     }
 
+    // optional string compressionAlgo = 3;
+    public static final int COMPRESSIONALGO_FIELD_NUMBER = 3;
+    private java.lang.Object compressionAlgo_;
+    public boolean hasCompressionAlgo() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public String getCompressionAlgo() {
+      java.lang.Object ref = compressionAlgo_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs =
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          compressionAlgo_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getCompressionAlgoBytes() {
+      java.lang.Object ref = compressionAlgo_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b =
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        compressionAlgo_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
       callId_ = 0;
       status_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status.SUCCESS;
+      compressionAlgo_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -2446,6 +2586,9 @@
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeEnum(2, status_.getNumber());
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(3, getCompressionAlgoBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -2463,6 +2606,10 @@
         size += com.google.protobuf.CodedOutputStream
          .computeEnumSize(2, status_.getNumber());
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(3, getCompressionAlgoBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -2496,6 +2643,11 @@
         result = result && (getStatus()
            == other.getStatus());
       }
+      result = result && (hasCompressionAlgo() == other.hasCompressionAlgo());
+      if (hasCompressionAlgo()) {
+        result = result && getCompressionAlgo()
+            .equals(other.getCompressionAlgo());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -2513,6 +2665,10 @@
         hash = (37 * hash) + STATUS_FIELD_NUMBER;
         hash = (53 * hash) + hashEnum(getStatus());
       }
+      if (hasCompressionAlgo()) {
+        hash = (37 * hash) + COMPRESSIONALGO_FIELD_NUMBER;
+        hash = (53 * hash) + getCompressionAlgo().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -2633,6 +2789,8 @@
       bitField0_ = (bitField0_ & ~0x00000001);
       status_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status.SUCCESS;
       bitField0_ = (bitField0_ & ~0x00000002);
+      compressionAlgo_ = "";
+      bitField0_ = (bitField0_ & ~0x00000004);
       return this;
     }
 
@@ -2679,6 +2837,10 @@
         to_bitField0_ |= 0x00000002;
       }
       result.status_ = status_;
+      if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+        to_bitField0_ |= 0x00000004;
+      }
+      result.compressionAlgo_ = compressionAlgo_;
       result.bitField0_ = to_bitField0_;
       onBuilt();
       return result;
@@ -2701,6 +2863,9 @@
       if (other.hasStatus()) {
         setStatus(other.getStatus());
       }
+      if (other.hasCompressionAlgo()) {
+        setCompressionAlgo(other.getCompressionAlgo());
+      }
       this.mergeUnknownFields(other.getUnknownFields());
       return this;
     }
@@ -2756,6 +2921,11 @@
             }
             break;
           }
+          case 26: {
+            bitField0_ |= 0x00000004;
+            compressionAlgo_ = input.readBytes();
+            break;
+          }
         }
       }
     }
@@ -2807,6 +2977,42 @@
       return this;
     }
 
+    // optional string compressionAlgo = 3;
+    private java.lang.Object compressionAlgo_ = "";
+    public boolean hasCompressionAlgo() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public String getCompressionAlgo() {
+      java.lang.Object ref = compressionAlgo_;
+      if (!(ref instanceof String)) {
+        String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+        compressionAlgo_ = s;
+        return s;
+      } else {
+        return (String) ref;
+      }
+    }
+    public Builder setCompressionAlgo(String value) {
+      if (value == null) {
+        throw new NullPointerException();
+      }
+      bitField0_ |= 0x00000004;
+      compressionAlgo_ = value;
+      onChanged();
+      return this;
+    }
+    public Builder clearCompressionAlgo() {
+      bitField0_ = (bitField0_ & ~0x00000004);
+      compressionAlgo_ = getDefaultInstance().getCompressionAlgo();
+      onChanged();
+      return this;
+    }
+    void setCompressionAlgo(com.google.protobuf.ByteString value) {
+      bitField0_ |= 0x00000004;
+      compressionAlgo_ = value;
+      onChanged();
+    }
+
     // @@protoc_insertion_point(builder_scope:RpcResponseHeader)
   }
 
@@ -3760,18 +3966,19 @@
       "\030\002 \001(\t\"w\n\020ConnectionHeader\022\"\n\010userInfo\030\001" +
      " \001(\0132\020.UserInformation\022?\n\010protocol\030\002 \001(\t" +
      ":-org.apache.hadoop.hbase.client.ClientP" +
-      "rotocol\"<\n\020RpcRequestHeader\022\016\n\006callId\030\001 " +
-      "\002(\r\022\030\n\005tinfo\030\002 \001(\0132\t.RPCTInfo\"n\n\016RpcRequ" +
-      "estBody\022\022\n\nmethodName\030\001 \002(\t\022\035\n\025clientPro" +
-      "tocolVersion\030\002 \001(\004\022\017\n\007request\030\003 \001(\014\022\030\n\020r" +
-      "equestClassName\030\004 \001(\t\"{\n\021RpcResponseHead",
-      "er\022\016\n\006callId\030\001 \002(\r\022)\n\006status\030\002 \002(\0162\031.Rpc" +
-      "ResponseHeader.Status\"+\n\006Status\022\013\n\007SUCCE" +
-      "SS\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002\"#\n\017RpcRespons" +
-      "eBody\022\020\n\010response\030\001 \001(\014\"9\n\014RpcException\022" +
-      "\025\n\rexceptionName\030\001 \002(\t\022\022\n\nstackTrace\030\002 \001" +
-      "(\tB<\n*org.apache.hadoop.hbase.protobuf.g" +
-      "eneratedB\tRPCProtosH\001\240\001\001"
+      "rotocol\"U\n\020RpcRequestHeader\022\016\n\006callId\030\001 " +
+      "\002(\r\022\030\n\005tinfo\030\002 \001(\0132\t.RPCTInfo\022\027\n\017compres" +
+      "sionAlgo\030\003 \001(\t\"n\n\016RpcRequestBody\022\022\n\nmeth" +
+      "odName\030\001 \002(\t\022\035\n\025clientProtocolVersion\030\002 " +
+      "\001(\004\022\017\n\007request\030\003 \001(\014\022\030\n\020requestClassName",
+      "\030\004 \001(\t\"\224\001\n\021RpcResponseHeader\022\016\n\006callId\030\001" +
+      " \002(\r\022)\n\006status\030\002 \002(\0162\031.RpcResponseHeader" +
+      ".Status\022\027\n\017compressionAlgo\030\003 \001(\t\"+\n\006Stat" +
+      "us\022\013\n\007SUCCESS\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002\"#\n" +
"\017RpcResponseBody\022\020\n\010response\030\001 \001(\014\"9\n\014Rp" + + "cException\022\025\n\rexceptionName\030\001 \002(\t\022\022\n\nsta" + + "ckTrace\030\002 \001(\tB<\n*org.apache.hadoop.hbase" + + ".protobuf.generatedB\tRPCProtosH\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -3799,7 +4006,7 @@ internal_static_RpcRequestHeader_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RpcRequestHeader_descriptor, - new java.lang.String[] { "CallId", "Tinfo", }, + new java.lang.String[] { "CallId", "Tinfo", "CompressionAlgo", }, org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader.class, org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader.Builder.class); internal_static_RpcRequestBody_descriptor = @@ -3815,7 +4022,7 @@ internal_static_RpcResponseHeader_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RpcResponseHeader_descriptor, - new java.lang.String[] { "CallId", "Status", }, + new java.lang.String[] { "CallId", "Status", "CompressionAlgo", }, org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.class, org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Builder.class); internal_static_RpcResponseBody_descriptor = Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1397358) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy) @@ -26,6 +26,8 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.BindException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -75,6 +77,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status; +import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.security.User; @@ -88,6 +91,8 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.RPC.VersionMismatch; import org.apache.hadoop.security.AccessControlException; @@ -326,6 +331,8 @@ // set at call completion protected long size; // size of current call protected boolean isError; + protected Compression.Algorithm compressionAlgo = + Compression.Algorithm.NONE; protected TraceInfo tinfo; public Call(int id, RpcRequestBody rpcRequestBody, Connection connection, @@ -365,12 +372,15 @@ } else { buf = new ByteBufferOutputStream(BUFFER_INITIAL_SIZE); } - DataOutputStream out = new DataOutputStream(buf); + DataOutputStream rawOS = new DataOutputStream(buf); + DataOutputStream out = rawOS; + Compressor compressor = null; try { RpcResponseHeader.Builder builder = RpcResponseHeader.newBuilder(); // Call 
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java	(revision 1397358)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java	(working copy)
@@ -26,6 +26,8 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -75,6 +77,7 @@
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
+import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.security.User;
@@ -88,6 +91,8 @@
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.security.AccessControlException;
@@ -326,6 +331,8 @@
     // set at call completion
     protected long size;                          // size of current call
     protected boolean isError;
+    protected Compression.Algorithm compressionAlgo =
+        Compression.Algorithm.NONE;
     protected TraceInfo tinfo;
 
     public Call(int id, RpcRequestBody rpcRequestBody, Connection connection,
@@ -365,12 +372,15 @@
       } else {
         buf = new ByteBufferOutputStream(BUFFER_INITIAL_SIZE);
       }
-      DataOutputStream out = new DataOutputStream(buf);
+      DataOutputStream rawOS = new DataOutputStream(buf);
+      DataOutputStream out = rawOS;
+      Compressor compressor = null;
       try {
         RpcResponseHeader.Builder builder = RpcResponseHeader.newBuilder();
         // Call id.
         builder.setCallId(this.id);
         builder.setStatus(status);
+        builder.setCompressionAlgo(compressionAlgo.getName());
         builder.build().writeDelimitedTo(out);
         if (error != null) {
           RpcException.Builder b = RpcException.newBuilder();
@@ -378,15 +388,31 @@
           b.setStackTrace(error);
           b.build().writeDelimitedTo(out);
         } else {
+          if (compressionAlgo != Compression.Algorithm.NONE) {
+            compressor = getRPCCompression().getCompressor();
+            OutputStream compressedOutputStream =
+                getRPCCompression().createCompressionStream(rawOS, compressor, 0);
+            out = new DataOutputStream(compressedOutputStream);
+          }
           if (value != null) {
             ((Message)value).writeDelimitedTo(out);
           }
+          try {
+            out.flush();
+            buf.flush();
+          } catch (IOException e) {
+            LOG.error("Error flushing the compressed response", e);
+          }
         }
         if (connection.useWrap) {
           wrapWithSasl(buf);
         }
       } catch (IOException e) {
         LOG.warn("Exception while creating response " + e);
+      } finally {
+        if (compressor != null) {
+          getRPCCompression().returnCompressor(compressor);
+        }
       }
       ByteBuffer bb = buf.getByteBuffer();
       bb.position(0);
@@ -485,6 +511,13 @@
         this.responder.doRespond(this);
       }
     }
+    public void setRPCCompression(Compression.Algorithm compressionAlgo) {
+      this.compressionAlgo = compressionAlgo;
+    }
+
+    public Compression.Algorithm getRPCCompression() {
+      return this.compressionAlgo;
+    }
   }
 
   /** Listens on the socket. Creates jobs for the handler threads*/
@@ -1624,7 +1657,19 @@
       }
 
       RpcRequestBody rpcRequestBody;
+      Decompressor decompressor = null;
+      Compression.Algorithm rpcCompression = null;
       try {
+        if (request.hasCompressionAlgo()) {
+          rpcCompression =
+              Compression.getCompressionAlgorithmByName(request.getCompressionAlgo());
+          if (rpcCompression != Compression.Algorithm.NONE) {
+            decompressor = rpcCompression.getDecompressor();
+            InputStream is = rpcCompression.createDecompressionStream(
+                dis, decompressor, 0);
+            dis = new DataInputStream(is);
+          }
+        }
         rpcRequestBody = RpcRequestBody.parseDelimitedFrom(dis);
       } catch (Throwable t) {
         LOG.warn("Unable to read call parameters for client " +
@@ -1638,6 +1683,10 @@
             "IPC server unable to read call parameters: " + t.getMessage());
         responder.doRespond(readParamsFailedCall);
         return;
+      } finally {
+        if (decompressor != null) {
+          rpcCompression.returnDecompressor(decompressor);
+        }
       }
 
       Call call;
@@ -1648,6 +1697,9 @@
       } else {
         call = new Call(id, rpcRequestBody, this, responder, callSize, null);
       }
+      if (request.hasCompressionAlgo()) {
+        call.setRPCCompression(rpcCompression);
+      }
       callQueueSize.add(callSize);
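Both sides lean on the pooled codec API of org.apache.hadoop.hbase.io.hfile.Compression. A
standalone sketch of the compress/decompress round trip that setResponse() and processData()
perform, using the same calls as the patch (GZ and a 0 buffer size, as in the patch); note
the sketch uses close() to finish the compressed stream, where the in-line server code relies
on flush() being sufficient:

    Compression.Algorithm algo = Compression.Algorithm.GZ;
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    Compressor compressor = algo.getCompressor();
    try {
      OutputStream cos = algo.createCompressionStream(buf, compressor, 0);
      cos.write(Bytes.toBytes("rpc body"));
      cos.close();  // finish the compressed stream
    } finally {
      algo.returnCompressor(compressor);  // pooled codecs must be returned
    }
    Decompressor decompressor = algo.getDecompressor();
    try {
      InputStream cis = algo.createDecompressionStream(
          new ByteArrayInputStream(buf.toByteArray()), decompressor, 0);
      byte[] roundTripped = new byte[8];  // "rpc body" is 8 bytes
      new DataInputStream(cis).readFully(roundTripped);
    } finally {
      algo.returnDecompressor(decompressor);
    }
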
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java	(revision 1397358)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java	(working copy)
@@ -53,6 +53,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
@@ -74,8 +75,11 @@
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PoolMap;
 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
@@ -89,6 +93,7 @@
 
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Message;
+import com.google.protobuf.ServiceException;
 import com.google.protobuf.Message.Builder;
 
@@ -263,6 +268,9 @@
     IOException error;                            // exception, null if value
     boolean done;                                 // true when call is done
     long startTime;
+    protected Compression.Algorithm compressionAlgo =
+        Compression.Algorithm.NONE;
+    protected int version = HBaseServer.CURRENT_VERSION;
 
     protected Call(RpcRequestBody param) {
       this.param = param;
@@ -302,6 +310,15 @@
     public long getStartTime() {
       return this.startTime;
     }
+
+    public void setRPCCompression(Compression.Algorithm compressionAlgo) {
+      this.compressionAlgo = compressionAlgo;
+    }
+
+    public Compression.Algorithm getRPCCompression() {
+      return this.compressionAlgo;
+    }
+
   }
 
   protected static Map<String, TokenSelector<? extends TokenIdentifier>> tokenHandlers =
      new HashMap<String, TokenSelector<? extends TokenIdentifier>>();
@@ -934,6 +951,7 @@
       RpcRequestHeader.Builder headerBuilder =
           RPCProtos.RpcRequestHeader.newBuilder();
       headerBuilder.setCallId(call.id);
+      headerBuilder.setCompressionAlgo(call.compressionAlgo.getName());
 
       if (Trace.isTracing()) {
         Span s = Trace.currentTrace();
@@ -942,17 +960,41 @@
             .setTraceId(s.getTraceId()));
       }
 
+      int requestSerializedSize = call.param.getSerializedSize();
+      DataOutputBuffer d = new DataOutputBuffer(requestSerializedSize);
+      if (call.compressionAlgo != Compression.Algorithm.NONE) {
+        //pass the bytes through the compressor
+        Compressor compressor = null;
+        try {
+          compressor = call.getRPCCompression().getCompressor();
+          OutputStream compressedOutputStream = null;
+          compressedOutputStream = call.getRPCCompression().createCompressionStream(
+              d, compressor, 0);
+          call.param.writeDelimitedTo(compressedOutputStream);
+          compressedOutputStream.flush();
+          d.flush();
+          requestSerializedSize = d.getLength();
+        } finally {
+          if (compressor != null) {
+            call.getRPCCompression().returnCompressor(compressor);
+          }
+        }
+      }
+
       //noinspection SynchronizeOnNonFinalField
       synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
         RpcRequestHeader header = headerBuilder.build();
         int serializedHeaderSize = header.getSerializedSize();
-        int requestSerializedSize = call.param.getSerializedSize();
         this.out.writeInt(serializedHeaderSize +
           CodedOutputStream.computeRawVarint32Size(serializedHeaderSize) +
           requestSerializedSize +
           CodedOutputStream.computeRawVarint32Size(requestSerializedSize));
         header.writeDelimitedTo(this.out);
-        call.param.writeDelimitedTo(this.out);
+        if (call.compressionAlgo != Compression.Algorithm.NONE) {
+          this.out.write(d.getData(), 0, d.getLength());
+        } else {
+          call.param.writeDelimitedTo(this.out);
+        }
         this.out.flush();
       }
     } catch(IOException e) {
@@ -968,7 +1010,8 @@
         return;
       }
       touch();
-
+      Compression.Algorithm rpcCompression = null;
+      Decompressor decompressor = null;
       try {
         // See HBaseServer.Call.setResponse for where we write out the response.
        // It writes the call.id (int), a boolean signifying any error (and if
@@ -989,6 +1032,17 @@
 
         Status status = response.getStatus();
         if (status == Status.SUCCESS) {
+          if (response.hasCompressionAlgo()) {
+            String compressionAlgoName = response.getCompressionAlgo();
+            rpcCompression =
+                Compression.getCompressionAlgorithmByName(compressionAlgoName);
+            if (rpcCompression != Compression.Algorithm.NONE) {
+              decompressor = rpcCompression.getDecompressor();
+              InputStream is = rpcCompression.createDecompressionStream(
+                  in, decompressor, 0);
+              in = new DataInputStream(is);
+            }
+          }
           Message rpcResponseType;
           try {
             rpcResponseType = ProtobufRpcEngine.Invoker.getReturnProtoType(
@@ -1037,6 +1091,9 @@
       if (remoteId.rpcTimeout > 0) {
         cleanupCalls(remoteId.rpcTimeout);
       }
+      if (decompressor != null) {
+        rpcCompression.returnDecompressor(decompressor);
+      }
     }
   }
 
@@ -1263,17 +1320,18 @@
   public Message call(RpcRequestBody param, InetSocketAddress addr,
                       User ticket, int rpcTimeout)
                       throws IOException, InterruptedException {
-    return call(param, addr, null, ticket, rpcTimeout);
+    return call(param, addr, null, ticket, rpcTimeout, Compression.Algorithm.NONE);
   }
 
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code> which is servicing the <code>protocol</code> protocol,
    * with the <code>ticket</code> credentials, returning the value.
    * Throws exceptions if there are network problems or if the remote code
-   * threw an exception. */
+   * threw an exception.
+   * @param rpcCompression TODO*/
   public Message call(RpcRequestBody param, InetSocketAddress addr,
                       Class<? extends VersionedProtocol> protocol,
-                      User ticket, int rpcTimeout)
+                      User ticket, int rpcTimeout, Compression.Algorithm rpcCompression)
                       throws InterruptedException, IOException {
     Call call = new Call(param);
     Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);
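For reference when reviewing sendParam's framing: the length prefix must account for the
varint that writeDelimitedTo() emits before each message. A sketch of the arithmetic for the
uncompressed path (names local to the sketch):

    int headerSize = header.getSerializedSize();
    int bodySize = body.getSerializedSize();
    // frame = [4-byte length][varint + header][varint + body]
    int frameLength = headerSize
        + CodedOutputStream.computeRawVarint32Size(headerSize)
        + bodySize
        + CodedOutputStream.computeRawVarint32Size(bodySize);
    out.writeInt(frameLength);
    header.writeDelimitedTo(out);
    body.writeDelimitedTo(out);
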
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java	(revision 1397358)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java	(working copy)
@@ -36,6 +36,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.client.Operation;
+import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
@@ -48,6 +49,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Objects;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.apache.hadoop.hbase.HConstants;
 
 import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
@@ -96,6 +98,7 @@
     private boolean isClosed = false;
     final private int rpcTimeout;
     private final long clientProtocolVersion;
+    private final Compression.Algorithm rpcCompression;
 
     public Invoker(Class<? extends VersionedProtocol> protocol,
         InetSocketAddress addr, User ticket, Configuration conf,
@@ -119,6 +122,13 @@
               protocol, e);
         }
       }
+      Compression.Algorithm rpcCompression = HConstants.DEFAULT_HBASE_RPC_COMPRESSION;
+      String compressionAlgo = conf.get(HConstants.HBASE_RPC_COMPRESSION_KEY);
+      if (compressionAlgo != null) {
+        rpcCompression =
+            Compression.getCompressionAlgorithmByName(compressionAlgo);
+      }
+      this.rpcCompression = rpcCompression;
     }
 
     private RpcRequestBody constructRpcRequest(Method method,
@@ -175,7 +185,7 @@
       RpcRequestBody rpcRequest = constructRpcRequest(method, args);
       Message val = null;
       try {
-        val = client.call(rpcRequest, address, protocol, ticket, rpcTimeout);
+        val = client.call(rpcRequest, address, protocol, ticket, rpcTimeout, rpcCompression);
 
         if (LOG.isDebugEnabled()) {
           long callTime = System.currentTimeMillis() - startTime;
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java	(revision 1397358)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java	(working copy)
@@ -25,6 +25,7 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -473,6 +474,13 @@
    */
   public static String HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY =
       "hbase.client.scanner.max.result.size";
 
+  /**
+   * The compression algorithm applied to each RPC, and its default value
+   */
+  public static String HBASE_RPC_COMPRESSION_KEY = "hbase.rpc.compression";
+  public static Compression.Algorithm DEFAULT_HBASE_RPC_COMPRESSION =
+      Compression.Algorithm.NONE;
+
   /**
    * Maximum number of bytes returned when calling a scanner's next method.
    * Note that when a single row is larger than this limit the row is still
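Finally, a small sketch of how the two new constants are meant to be consumed, mirroring the
ProtobufRpcEngine hunk above (the helper method name is illustrative, not part of the patch):

    static Compression.Algorithm resolveRpcCompression(Configuration conf) {
      // An unset key falls back to DEFAULT_HBASE_RPC_COMPRESSION (NONE),
      // preserving today's uncompressed behavior.
      String name = conf.get(HConstants.HBASE_RPC_COMPRESSION_KEY);
      return name == null
          ? HConstants.DEFAULT_HBASE_RPC_COMPRESSION
          : Compression.getCompressionAlgorithmByName(name);
    }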