NOTE(review): line breaks and indentation in this patch were re-derived from
the hunk headers' line counts; spacing inside lines is as found and may differ
from the original files. Text outside the "diff --git" sections is commentary.

pom.xml: adds the com.ning:compress-lzf 0.6.0 dependency ahead of guava.
NOTE(review): the XML element tags of this hunk were missing from the copy
under review and are restored here assuming standard Maven dependency markup.
The header as found was "-487,6 +487,11"; two leading context lines carried no
text and could not be recovered, so the header below is narrowed to the
context that survives. Confirm against pom.xml.
diff --git a/pom.xml b/pom.xml
index 2aff5a8..527894a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -489,4 +489,9 @@
     <dependency>
+      <groupId>com.ning</groupId>
+      <artifactId>compress-lzf</artifactId>
+      <version>0.6.0</version>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
       <version>${guava.version}</version>

ByteBufferOutputStream: adds putInt(int), which writes an int straight into
the backing buffer. It now calls checkSizeAndGrow before the write, because
ByteBuffer.putInt throws BufferOverflowException when fewer than four bytes
remain. The hunk's new-side count is 13 to match; the index line is kept as
found, so its post-image id predates this adjustment.
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java b/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java
index 4d8ecbd..affc5a3 100644
--- a/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java
@@ -58,6 +58,13 @@ public class ByteBufferOutputStream extends OutputStream {
     return buf;
   }
 
+  public void putInt(int i) {
+    // Reserve room first: ByteBuffer.putInt throws BufferOverflowException
+    // when fewer than four bytes remain in the buffer.
+    checkSizeAndGrow(Integer.SIZE / Byte.SIZE);
+    buf.putInt(i);
+  }
+
   private void checkSizeAndGrow(int extra) {
     if ( (buf.position() + extra) > buf.limit()) {
       // size calculation is complex, because we could overflow negative,

HBaseClient: imports LZFInputStream, drops a duplicated NanoProfiler.split
call, and makes deserializeResult read a leading flag byte; when the flag is 1
the rest of the response is read through LZFInputStream.
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
index 3451c4b..65585ef 100644
--- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
@@ -20,6 +20,7 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import com.ning.compress.lzf.LZFInputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -454,8 +455,6 @@ public class HBaseClient {
 
         synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
           NanoProfiler.split(call.id, "serAndSendParam");
-          NanoProfiler.split(call.id, "serAndSendParam");
-
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + " sending #" + call.id);
 
@@ -773,7 +772,13 @@ public class HBaseClient {
 
   protected Object deserializeResult(ByteBuffer res) throws IOException {
     ByteBufferInputStream bbis = new ByteBufferInputStream(res);
-    DataInputStream in = new DataInputStream(bbis);
+    int compressed = bbis.read();
+    DataInputStream in;
+    if (compressed == 1) {
+      in = new DataInputStream(new LZFInputStream(bbis));
+    } else {
+      in = new DataInputStream(bbis);
+    }
     boolean isError = in.readBoolean();
     if (isError) {
       return new RemoteException(WritableUtils.readString(in),

HBaseServer: imports LZFOutputStream. When the WritableWithSize hint exceeds
50 * 1024 the response body is written through LZFOutputStream. The size
placeholder and call id are written with buf.putInt, followed by a flag byte
(1 when compressing, 0 otherwise). Serialization time is measured and logged
at WARN for buffers larger than 5000 bytes.
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
index 1eefa3e..3024905 100644
--- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
@@ -52,6 +52,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import com.ning.compress.lzf.LZFOutputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -1044,6 +1045,8 @@ public abstract class HBaseServer {
           }
           CurCall.set(null);
 
+          boolean compress = false;
+
           int size = BUFFER_INITIAL_SIZE;
           if (value instanceof WritableWithSize) {
             // get the size hint.
@@ -1058,16 +1061,33 @@ public abstract class HBaseServer {
                 errorClass = ioe.getClass().getName();
                 error = StringUtils.stringifyException(ioe);
               } else {
+
+                // we could compress!
+                if (hint > 50 * 1024)
+                  compress = true;
+
                 size = (int)hint;
               }
             }
           }
+
+          long nstart = System.nanoTime();
+
           ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
-          DataOutputStream out = new DataOutputStream(buf);
-          out.writeInt(0xdeadbeef); // size of response.
-          out.writeInt(call.id); // write call id
-          out.writeBoolean(error != null); // write error flag
+          // compressed flag:
+          DataOutputStream out;
+          buf.putInt(0xdeadbeef);
+          buf.putInt(call.id);
+
+          if (compress) {
+            buf.write(1); // true
+            out = new DataOutputStream(new LZFOutputStream(buf));
+          } else {
+            buf.write(0); // false
+            out = new DataOutputStream(buf);
+          }
+          out.writeBoolean(error != null); // write error flag
 
           if (error == null) {
             value.write(out);
           } else {
@@ -1075,13 +1095,20 @@ public abstract class HBaseServer {
             WritableUtils.writeString(out, error);
           }
 
+          out.flush(); // finished.. finally.
+
+          long serTime = (System.nanoTime() - nstart);
+
+          if (buf.size() > 5000)
+            LOG.warn("Serialized size: " + buf.size() + " in " + serTime +
+                " ns compressed: " + compress);
+
           if (buf.size() > warnResponseSize) {
             LOG.warn(getName()+", responseTooLarge for: "+call+": Size: "
                      + StringUtils.humanReadableInt(buf.size()));
           }
 
           ByteBuffer bb = buf.getByteBuffer();
-          // dont include the size field.
           int dataSize = bb.limit() - Bytes.SIZEOF_INT;
           bb.rewind();
           bb.putInt(dataSize); // overwrite the 0xdeadbeef.
NanoProfiler: WriterThread gains a constructor that marks the thread as a
daemon via setDaemon(true).
NOTE(review): line breaks and indentation in this section and the next were
re-derived from the hunk headers' line counts; confirm the placement of blank
context lines against the original files.
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/NanoProfiler.java b/src/main/java/org/apache/hadoop/hbase/ipc/NanoProfiler.java
index 3aa55ad..5dcd4a2 100644
--- a/src/main/java/org/apache/hadoop/hbase/ipc/NanoProfiler.java
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/NanoProfiler.java
@@ -71,6 +71,10 @@ public class NanoProfiler {
 
   static class WriterThread extends Thread {
 
+    public WriterThread() {
+      this.setDaemon(true);
+    }
+
     public void run() {
       FileWriter fr;
 

TestSerialization: adds a third KeyValue, kbA1 (rowA, famB, qfA, valueB), and
builds result1 from kvA and kbA1 instead of kvA and kvB.
diff --git a/src/test/java/org/apache/hadoop/hbase/TestSerialization.java b/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
index befcdaf..ca8de34 100644
--- a/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
+++ b/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
@@ -434,8 +434,9 @@ public class TestSerialization {
 
     KeyValue kvA = new KeyValue(rowA, famA, qfA, valueA);
     KeyValue kvB = new KeyValue(rowB, famB, qfB, valueB);
+    KeyValue kbA1 = new KeyValue(rowA, famB, qfA, valueB);
 
-    Result result1 = new Result(new KeyValue[]{kvA, kvB});
+    Result result1 = new Result(new KeyValue[]{kvA, kbA1});
     Result result2 = new Result(new KeyValue[]{kvB});
     Result result3 = new Result(new KeyValue[]{kvB});
 