Index: src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (revision 1331811) +++ src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (working copy) @@ -35,9 +35,11 @@ import org.apache.commons.logging.*; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.io.HbaseObjectWritable; +import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; @@ -128,6 +130,8 @@ private HBaseClient client; private boolean isClosed = false; final private int rpcTimeout; + private Compression.Algorithm rpcCompression = + HConstants.DEFAULT_HBASE_RPC_COMPRESSION; public Invoker(Class protocol, InetSocketAddress address, User ticket, @@ -137,6 +141,11 @@ this.ticket = ticket; this.client = CLIENTS.getClient(conf, factory); this.rpcTimeout = rpcTimeout; + String compressionAlgo = conf.get(HConstants.HBASE_RPC_COMPRESSION_KEY); + if (compressionAlgo != null) { + rpcCompression = + Compression.getCompressionAlgorithmByName(compressionAlgo); + } } public Object invoke(Object proxy, Method method, Object[] args) @@ -149,7 +158,7 @@ HbaseObjectWritable value = (HbaseObjectWritable) client.call(new Invocation(method, args), address, - protocol, ticket, rpcTimeout); + protocol, ticket, rpcTimeout, rpcCompression); if (logDebug) { // FIGURE HOW TO TURN THIS OFF! long callTime = System.currentTimeMillis() - startTime; @@ -212,9 +221,15 @@ for (int i = 0; i < params.length; i++) invocations[i] = new Invocation(method, params[i]); HBaseClient client = CLIENTS.getClient(conf); + Compression.Algorithm rpcCompression = Compression.Algorithm.NONE; + String compressionAlgo = conf.get(HConstants.HBASE_RPC_COMPRESSION_KEY); + if (compressionAlgo != null) { + rpcCompression = + Compression.getCompressionAlgorithmByName(compressionAlgo); + } try { Writable[] wrappedValues = - client.call(invocations, addrs, protocol, ticket); + client.call(invocations, addrs, protocol, ticket, rpcCompression); if (method.getReturnType() == Void.TYPE) { return null; Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1331811) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy) @@ -25,6 +25,8 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.BindException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -61,6 +63,8 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.hbase.io.WritableWithSize; +import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.security.User; @@ -68,6 +72,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.ipc.RPC.VersionMismatch; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; @@ -92,8 +98,12 @@ * The first four bytes of Hadoop RPC connections */ public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes()); - public static final byte CURRENT_VERSION = 3; + public static final byte VERSION_3 = 3; + // 4 : includes support for compression on RPCs + public static final byte VERSION_COMPRESSED_RPC = 4; + public static final byte CURRENT_VERSION = VERSION_COMPRESSED_RPC; + /** * How many calls/handler are allowed in the queue. */ @@ -269,6 +279,9 @@ // set at call completion protected long size; // size of current call protected boolean isError; + protected Compression.Algorithm compressionAlgo = + Compression.Algorithm.NONE; + protected int version = CURRENT_VERSION; // version used for the call public Call(int id, Writable param, Connection connection, Responder responder, long size) { @@ -283,6 +296,22 @@ this.size = size; } + public void setVersion(int version) { + this.version = version; + } + + public int getVersion() { + return version; + } + + public void setRPCCompression(Compression.Algorithm compressionAlgo) { + this.compressionAlgo = compressionAlgo; + } + + public Compression.Algorithm getRPCCompression() { + return this.compressionAlgo; + } + @Override public String toString() { return param.toString() + " from " + connection.toString(); @@ -308,7 +337,6 @@ result = new HbaseObjectWritable(value); } } - int size = BUFFER_INITIAL_SIZE; if (result instanceof WritableWithSize) { // get the size hint. @@ -325,13 +353,14 @@ size = (int)hint; } } - ByteBufferOutputStream buf = new ByteBufferOutputStream(size); - DataOutputStream out = new DataOutputStream(buf); + DataOutputStream rawOS = new DataOutputStream(buf); + DataOutputStream out = rawOS; + Compressor compressor = null; try { - // Call id. + // 1. write call id uncompressed out.writeInt(this.id); - // Write flag. + // 2. write error flag uncompressed byte flag = (error != null)? ResponseFlag.getErrorAndLengthSet(): ResponseFlag.getLengthSetOnly(); out.writeByte(flag); @@ -343,9 +372,19 @@ errorClass = e.getClass().getName(); error = StringUtils.stringifyException(e); } - try { if (error == null) { + if (getVersion() >= VERSION_COMPRESSED_RPC) { + // 3. write the compression type for the rest of the response + out.writeUTF(getRPCCompression().getName()); + // 4. create a compressed output stream if compression was enabled + if (getRPCCompression() != Compression.Algorithm.NONE) { + compressor = getRPCCompression().getCompressor(); + OutputStream compressedOutputStream = + getRPCCompression().createCompressionStream(rawOS, compressor, 0); + out = new DataOutputStream(compressedOutputStream); + } + } result.write(out); } else { WritableUtils.writeString(out, errorClass); @@ -355,6 +394,15 @@ LOG.warn("Error sending response to call: ", e); } + try { + out.flush(); + buf.flush(); + + getRPCCompression().returnCompressor(compressor); + } catch (IOException e) { + LOG.error("Got ", e); + } + // Set the length into the ByteBuffer after call id and after // byte flag. ByteBuffer bb = buf.getByteBuffer(); @@ -364,6 +412,7 @@ bb.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE); bb.putInt(bufSiz); bb.position(0); + this.response = bb; } @@ -1045,6 +1094,7 @@ protected class Connection { private boolean versionRead = false; //if initial signature and //version are read + private int version = -1; private boolean headerRead = false; //if the connection header that //follows version is read. protected SocketChannel channel; @@ -1146,15 +1196,18 @@ if (count <= 0) { return count; } - int version = versionBuffer.get(0); + version = versionBuffer.get(0); dataLengthBuffer.flip(); - if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) { + if (!HEADER.equals(dataLengthBuffer) || + version < VERSION_3 || version > CURRENT_VERSION) { //Warning is ok since this is not supposed to happen. LOG.warn("Incorrect header or version mismatch from " + hostAddress + ":" + remotePort + - " got version " + version + - " expected version " + CURRENT_VERSION); + " got header " + dataLengthBuffer + + ", version " + version + + " supported versions [" + VERSION_3 + + " ... " + CURRENT_VERSION + "]"); setupBadVersionResponse(version); return -1; } @@ -1242,9 +1295,16 @@ } protected void processData(byte[] buf) throws IOException, InterruptedException { - DataInputStream dis = + DataInputStream uncompressedIs = new DataInputStream(new ByteArrayInputStream(buf)); - int id = dis.readInt(); // try to read an id + Compression.Algorithm rxCompression = null; + Compression.Algorithm txCompression = Algorithm.NONE; + DataInputStream dis = uncompressedIs; + Decompressor decompressor = null; + + // 1. read the call id uncompressed + int id = uncompressedIs.readInt(); + long callSize = buf.length; if (LOG.isDebugEnabled()) { @@ -1263,6 +1323,28 @@ return; } + if (version >= VERSION_COMPRESSED_RPC) { + + // 2. read the compression used for the request + String rxCompressionAlgoName = uncompressedIs.readUTF(); + rxCompression = + Compression.getCompressionAlgorithmByName(rxCompressionAlgoName); + + // 3. read the compression requested for the response + String txCompressionAlgoName = uncompressedIs.readUTF(); + txCompression = + Compression.getCompressionAlgorithmByName(txCompressionAlgoName); + + // 4. set up a decompressor to read the rest of the request + if (rxCompression != Compression.Algorithm.NONE) { + decompressor = rxCompression.getDecompressor(); + InputStream is = rxCompression.createDecompressionStream( + uncompressedIs, decompressor, 0); + dis = new DataInputStream(is); + } + } + + // 5. read the rest of the params Writable param; try { param = ReflectionUtils.newInstance(paramClass, conf);//read param @@ -1279,8 +1361,13 @@ "IPC server unable to read call parameters: " + t.getMessage()); responder.doRespond(readParamsFailedCall); return; + } finally { + if (decompressor != null) + rxCompression.returnDecompressor(decompressor); } Call call = new Call(id, param, this, responder, callSize); + call.setRPCCompression(txCompression); + call.setVersion(version); callQueueSize.add(callSize); if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) { Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (revision 1331811) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (working copy) @@ -22,11 +22,13 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; @@ -45,6 +47,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.PoolMap; @@ -53,6 +56,8 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.ReflectionUtils; @@ -165,6 +170,9 @@ IOException error; // exception, null if value boolean done; // true when call is done long startTime; + protected Compression.Algorithm compressionAlgo = + Compression.Algorithm.NONE; + protected int version = HBaseServer.CURRENT_VERSION; protected Call(Writable param) { this.param = param; @@ -204,6 +212,22 @@ public long getStartTime() { return this.startTime; } + + public void setVersion(int version) { + this.version = version; + } + + public int getVersion() { + return version; + } + + public void setRPCCompression(Compression.Algorithm compressionAlgo) { + this.compressionAlgo = compressionAlgo; + } + + public Compression.Algorithm getRPCCompression() { + return this.compressionAlgo; + } } /** Thread that reads responses and notifies callers. Each connection owns a @@ -255,7 +279,8 @@ protected synchronized boolean addCall(Call call) { if (shouldCloseConnection.get()) return false; - calls.put(call.id, call); + calls.put(new Integer(call.id), call); + LOG.debug("IPC Client (" + socketFactory.hashCode() + ") added call " + call.id); notify(); return true; } @@ -348,7 +373,7 @@ * the connection thread that waits for responses. * @throws java.io.IOException e */ - protected synchronized void setupIOstreams() + protected synchronized void setupIOstreams(byte version) throws IOException, InterruptedException { if (socket != null || shouldCloseConnection.get()) { @@ -364,7 +389,7 @@ (new PingInputStream(NetUtils.getInputStream(socket)))); this.out = new DataOutputStream (new BufferedOutputStream(NetUtils.getOutputStream(socket))); - writeHeader(); + writeHeader(version); // update last activity time touch(); @@ -431,9 +456,9 @@ /* Write the header for each connection * Out is not synchronized because only the first thread does this. */ - private void writeHeader() throws IOException { + private void writeHeader(byte version) throws IOException { out.write(HBaseServer.HEADER.array()); - out.write(HBaseServer.CURRENT_VERSION); + out.write(version); //When there are more fields we can have ConnectionHeader Writable. DataOutputBuffer buf = new DataOutputBuffer(); header.write(buf); @@ -527,29 +552,57 @@ // For serializing the data to be written. - final DataOutputBuffer d = new DataOutputBuffer(); + DataOutputStream uncompressedOS = null; + DataOutputStream outOS = null; + Compressor compressor = null; try { - if (LOG.isDebugEnabled()) + if (LOG.isDebugEnabled()) { LOG.debug(getName() + " sending #" + call.id); + } - d.writeInt(0xdeadbeef); // placeholder for data length - d.writeInt(call.id); - call.param.write(d); - byte[] data = d.getData(); - int dataLength = d.getLength(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + uncompressedOS = new DataOutputStream(baos); + outOS = uncompressedOS; + + uncompressedOS.writeInt(0xdeadbeef); // placeholder for data length + uncompressedOS.writeInt(call.id); + + // preserve backwards compatibility + if (call.getRPCCompression() != Compression.Algorithm.NONE) { + // 2. write the compression algo used to compress the request being sent + uncompressedOS.writeUTF(call.getRPCCompression().getName()); + // 3. write the compression algo to use for the response + uncompressedOS.writeUTF(call.getRPCCompression().getName()); + // 4. setup the compressor + compressor = call.getRPCCompression().getCompressor(); + OutputStream compressedOutputStream = + call.getRPCCompression().createCompressionStream( + uncompressedOS, compressor, 0); + outOS = new DataOutputStream(compressedOutputStream); + } + call.param.write(outOS); + outOS.flush(); + baos.flush(); + byte[] data = baos.toByteArray(); + int dataLength = data.length; // fill in the placeholder Bytes.putInt(data, 0, dataLength - 4); //noinspection SynchronizeOnNonFinalField synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC out.write(data, 0, dataLength); out.flush(); + } } catch(IOException e) { markClosed(e); } finally { //the buffer is just an in-memory buffer, but it is still polite to // close early - IOUtils.closeStream(d); + if (outOS != uncompressedOS) { + IOUtils.closeStream(outOS); + } + IOUtils.closeStream(uncompressedOS); + call.getRPCCompression().returnCompressor(compressor); } } @@ -567,30 +620,49 @@ // It writes the call.id (int), a flag byte, then optionally the length // of the response (int) followed by data. + DataInputStream localIn = in; + Decompressor decompressor = null; + Compression.Algorithm rpcCompression = null; + // Read the call id. - int id = in.readInt(); + int id = localIn.readInt(); if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + id); - Call call = calls.remove(id); - - // Read the flag byte - byte flag = in.readByte(); + Call call = calls.remove(new Integer(id)); + // 2. Read the flag byte + byte flag = localIn.readByte(); boolean isError = ResponseFlag.isError(flag); if (ResponseFlag.isLength(flag)) { // Currently length if present is unused. - in.readInt(); + localIn.readInt(); } - int state = in.readInt(); // Read the state. Currently unused. + // 3. Read the state. Currently unused. + int state = localIn.readInt(); if (isError) { if (call != null) { //noinspection ThrowableInstanceNeverThrown - call.setException(new RemoteException(WritableUtils.readString(in), - WritableUtils.readString(in))); + call.setException(new RemoteException(WritableUtils.readString(localIn), + WritableUtils.readString(localIn))); } } else { + if (call.getVersion() >= HBaseServer.VERSION_COMPRESSED_RPC) { + // 4. read the compression type used for the rest of the response + String compressionAlgoName = localIn.readUTF(); + rpcCompression = + Compression.getCompressionAlgorithmByName(compressionAlgoName); + // 5. setup the correct decompressor (if any) + if (rpcCompression != Compression.Algorithm.NONE) { + decompressor = rpcCompression.getDecompressor(); + InputStream is = rpcCompression.createDecompressionStream( + localIn, decompressor, 0); + localIn = new DataInputStream(is); + } + } Writable value = ReflectionUtils.newInstance(valueClass, conf); - value.readFields(in); // read value + value.readFields(localIn); // read value + if (decompressor != null) + rpcCompression.returnDecompressor(decompressor); // it's possible that this call may have been cleaned up due to a RPC // timeout, so check if it still exists before setting the value. if (call != null) { @@ -598,6 +670,7 @@ } } } catch (IOException e) { + LOG.debug("eating ", e); if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) { // Clean up open calls but don't treat this as a fatal condition, // since we expect certain responses to not make it by the specified @@ -875,13 +948,14 @@ */ public Writable call(Writable param, InetSocketAddress address) throws IOException, InterruptedException { - return call(param, address, null, 0); + return call(param, address, null, 0, Compression.Algorithm.NONE); } public Writable call(Writable param, InetSocketAddress addr, - User ticket, int rpcTimeout) + User ticket, int rpcTimeout, + Compression.Algorithm rpcCompression) throws IOException, InterruptedException { - return call(param, addr, null, ticket, rpcTimeout); + return call(param, addr, null, ticket, rpcTimeout, rpcCompression); } /** Make a call, passing param, to the IPC server running at @@ -891,9 +965,11 @@ * threw an exception. */ public Writable call(Writable param, InetSocketAddress addr, Class protocol, - User ticket, int rpcTimeout) + User ticket, int rpcTimeout, + Compression.Algorithm rpcCompression) throws InterruptedException, IOException { Call call = new Call(param); + call.setRPCCompression(rpcCompression); Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call); connection.sendParam(call); // send the parameter boolean interrupted = false; @@ -965,12 +1041,13 @@ * @param addresses socket addresses * @return Writable[] * @throws IOException e - * @deprecated Use {@link #call(Writable[], InetSocketAddress[], Class, User)} instead + * @deprecated Use {@link #call(Writable[], InetSocketAddress[], Class, User, Compression.Algorithm)} instead */ @Deprecated - public Writable[] call(Writable[] params, InetSocketAddress[] addresses) + public Writable[] call(Writable[] params, InetSocketAddress[] addresses, + Compression.Algorithm rpcCompression) throws IOException, InterruptedException { - return call(params, addresses, null, null); + return call(params, addresses, null, null, rpcCompression); } /** Makes a set of calls in parallel. Each parameter is sent to the @@ -979,7 +1056,7 @@ * contains nulls for calls that timed out or errored. */ public Writable[] call(Writable[] params, InetSocketAddress[] addresses, Class protocol, - User ticket) + User ticket, Compression.Algorithm rpcCompression) throws IOException, InterruptedException { if (addresses.length == 0) return new Writable[0]; @@ -989,6 +1066,7 @@ synchronized (results) { for (int i = 0; i < params.length; i++) { ParallelCall call = new ParallelCall(params[i], results, i); + call.setRPCCompression(rpcCompression); try { Connection connection = getConnection(addresses[i], protocol, ticket, 0, call); @@ -1022,12 +1100,19 @@ // the client is stopped throw new IOException("The client is stopped"); } + // RPC compression is only supported from version 4, so make backward compatible + byte version = HBaseServer.CURRENT_VERSION; + if (call.getRPCCompression() == Compression.Algorithm.NONE) { + version = HBaseServer.VERSION_3; + } + call.setVersion(version); Connection connection; /* we could avoid this allocation for each RPC by having a * connectionsId object and with set() method. We need to manage the * refs for keys in HashMap properly. For now its ok. */ - ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout); + ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout, + call.getVersion()); do { synchronized (connections) { connection = connections.get(remoteId); @@ -1042,7 +1127,7 @@ //block above. The reason for that is if the server happens to be slow, //it will take longer to establish a connection and that will slow the //entire system down. - connection.setupIOstreams(); + connection.setupIOstreams(version); return connection; } @@ -1056,15 +1141,17 @@ final int rpcTimeout; Class protocol; private static final int PRIME = 16777619; + final private int version; ConnectionId(InetSocketAddress address, Class protocol, User ticket, - int rpcTimeout) { + int rpcTimeout, int version) { this.protocol = protocol; this.address = address; this.ticket = ticket; this.rpcTimeout = rpcTimeout; + this.version = version; } InetSocketAddress getAddress() { @@ -1085,7 +1172,8 @@ ConnectionId id = (ConnectionId) obj; return address.equals(id.address) && protocol == id.protocol && ((ticket != null && ticket.equals(id.ticket)) || - (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout; + (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout + && version == id.version; } return false; } @@ -1094,7 +1182,7 @@ public int hashCode() { return (address.hashCode() + PRIME * ( PRIME * System.identityHashCode(protocol) ^ - (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout; + (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout ^ version; } } } Index: src/test/java/org/apache/hadoop/hbase/ipc/TestRPCCompression.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/ipc/TestRPCCompression.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/ipc/TestRPCCompression.java (revision 0) @@ -0,0 +1,113 @@ +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestRPCCompression { + final Log LOG = LogFactory.getLog(getClass()); + private static HBaseTestingUtility TEST_UTIL; + private static int SLAVES = 1; + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // start the cluster + Configuration conf = HBaseConfiguration.create(); + TEST_UTIL = new HBaseTestingUtility(conf); + TEST_UTIL.startMiniCluster(SLAVES); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + // Nothing to do. + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + // Nothing to do. + } + + @Test + public void testCompressedRPC() throws Exception { + byte[] TABLE = Bytes.toBytes("testRPCCompression"); + byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") }; + byte [] QUALIFIER = Bytes.toBytes("testQualifier"); + byte [] VALUE = Bytes.toBytes("testValue"); + + // create a table + TEST_UTIL.createTable(TABLE, FAMILIES); + LOG.debug("Created table " + new String(TABLE)); + + // open the table with compressed RPC + Configuration conf = HBaseConfiguration.create(); + String zkPortStr = TEST_UTIL.getConfiguration().get( + "hbase.zookeeper.property.clientPort"); + conf.setInt("hbase.zookeeper.property.clientPort", + Integer.parseInt(zkPortStr)); + conf.set(HConstants.HBASE_RPC_COMPRESSION_KEY, + Compression.Algorithm.GZ.getName()); + HTable table = new HTable(conf, TABLE); + + // put some values + byte [][] ROWS = { Bytes.toBytes("a"), Bytes.toBytes("b") }; + for (int i = 0; i < ROWS.length; i++) { + Put put = new Put(ROWS[i]); + put.add(FAMILIES[0], QUALIFIER, VALUE); + table.put(put); + } + LOG.debug("Wrote some puts to table " + new String(TABLE)); + + // flush the table + table.flushCommits(); + LOG.debug("Flushed table " + new String(TABLE)); + + // read back the values + for (int i = 0; i < ROWS.length; i++) { + Get get = new Get(ROWS[i]); + get.addColumn(FAMILIES[0], QUALIFIER); + Result result = table.get(get); + + assertEquals(new String(VALUE), + new String(result.getValue(FAMILIES[0], QUALIFIER))); + } + LOG.debug("Read and verified from table " + new String(TABLE)); + } +} Index: src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1331811) +++ src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -25,6 +25,7 @@ import java.util.UUID; import java.util.regex.Pattern; +import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.util.Bytes; @@ -444,6 +445,13 @@ public static String HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY = "hbase.client.scanner.max.result.size"; /** + * compression for each RPC and its default value + */ + public static String HBASE_RPC_COMPRESSION_KEY = "hbase.rpc.compression"; + public static Compression.Algorithm DEFAULT_HBASE_RPC_COMPRESSION = + Compression.Algorithm.NONE; + + /** * Maximum number of bytes returned when calling a scanner's next method. * Note that when a single row is larger than this limit the row is still * returned completely.