diff --git a/pom.xml b/pom.xml
index 2aff5a8..527894a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -487,6 +487,11 @@
+    <dependency>
+      <groupId>com.ning</groupId>
+      <artifactId>compress-lzf</artifactId>
+      <version>0.6.0</version>
+    </dependency>
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
       <version>${guava.version}</version>
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java b/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java
index 4d8ecbd..affc5a3 100644
--- a/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java
@@ -58,6 +58,11 @@ public class ByteBufferOutputStream extends OutputStream {
return buf;
}
+ public void putInt(int i) {
+ checkSizeAndGrow(Bytes.SIZEOF_INT); // make room first; a raw putInt does not grow the buffer
+ buf.putInt(i);
+ }
+
private void checkSizeAndGrow(int extra) {
if ( (buf.position() + extra) > buf.limit()) {
// size calculation is complex, because we could overflow negative,
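
The new putInt() writes straight into the backing ByteBuffer so a caller can reserve a fixed-width length slot up front and backpatch it after serialization. A minimal standalone sketch of that placeholder pattern using a plain java.nio.ByteBuffer (the 0xdeadbeef sentinel mirrors the HBaseServer hunk below; the class name and sizes are made up for illustration):

    import java.nio.ByteBuffer;

    public class SizeBackpatchSketch {
        public static void main(String[] args) {
            ByteBuffer bb = ByteBuffer.allocate(1024);
            bb.putInt(0xdeadbeef);            // size placeholder, fixed at 4 bytes
            bb.putInt(42);                    // call id
            bb.put((byte) 0);                 // compression flag: raw
            bb.put(new byte[] {1, 2, 3});     // pretend payload
            int dataSize = bb.position() - 4; // exclude the size field itself
            bb.putInt(0, dataSize);           // overwrite the sentinel in place
            System.out.println("framed " + dataSize + " data bytes");
        }
    }
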
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
index 3451c4b..65585ef 100644
--- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.ipc;
+import com.ning.compress.lzf.LZFInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -454,8 +455,6 @@ public class HBaseClient {
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
NanoProfiler.split(call.id, "serAndSendParam");
- NanoProfiler.split(call.id, "serAndSendParam");
-
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
@@ -773,7 +772,13 @@ public class HBaseClient {
protected Object deserializeResult(ByteBuffer res) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(res);
- DataInputStream in = new DataInputStream(bbis);
+ int compressed = bbis.read(); // one-byte compression flag written by the server
+ DataInputStream in;
+ if (compressed == 1) {
+ in = new DataInputStream(new LZFInputStream(bbis));
+ } else {
+ in = new DataInputStream(bbis);
+ }
boolean isError = in.readBoolean();
if (isError) {
return new RemoteException(WritableUtils.readString(in),
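
With this change the response payload handed to deserializeResult starts with a one-byte codec flag: 1 means the rest of the stream is LZF-framed, 0 means it is raw. A self-contained sketch of just that convention, with plain java.io streams standing in for ByteBufferInputStream (class and method names are illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import com.ning.compress.lzf.LZFInputStream;

    public class ResponseDecodeSketch {
        // Mirrors the deserializeResult change: the first byte selects the codec.
        static DataInputStream wrap(InputStream raw) throws IOException {
            int compressed = raw.read(); // 1 = LZF-framed, 0 = plain
            return compressed == 1
                ? new DataInputStream(new LZFInputStream(raw))
                : new DataInputStream(raw);
        }

        public static void main(String[] args) throws IOException {
            byte[] frame = {0, 0, 0, 0, 7}; // flag 0, then a raw 4-byte int payload
            DataInputStream in = wrap(new ByteArrayInputStream(frame));
            System.out.println(in.readInt()); // prints 7
        }
    }
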
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
index 1eefa3e..3024905 100644
--- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
@@ -52,6 +52,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import com.ning.compress.lzf.LZFOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -1044,6 +1045,8 @@ public abstract class HBaseServer {
}
CurCall.set(null);
+ boolean compress = false;
+
int size = BUFFER_INITIAL_SIZE;
if (value instanceof WritableWithSize) {
// get the size hint.
@@ -1058,16 +1061,33 @@ public abstract class HBaseServer {
errorClass = ioe.getClass().getName();
error = StringUtils.stringifyException(ioe);
} else {
+
+ // compress responses over 50 KB; below that the LZF overhead isn't worth it
+ if (hint > 50 * 1024)
+ compress = true;
+
size = (int)hint;
}
}
}
+
+ long nstart = System.nanoTime();
+
ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
- DataOutputStream out = new DataOutputStream(buf);
- out.writeInt(0xdeadbeef); // size of response.
- out.writeInt(call.id); // write call id
- out.writeBoolean(error != null); // write error flag
+ DataOutputStream out;
+ buf.putInt(0xdeadbeef); // size placeholder; backpatched below once the length is known
+ buf.putInt(call.id); // call id
+
+ // one-byte compression flag, then an output stream to match:
+ if (compress) {
+ buf.write(1); // true
+ out = new DataOutputStream(new LZFOutputStream(buf));
+ } else {
+ buf.write(0); // false
+ out = new DataOutputStream(buf);
+ }
+ out.writeBoolean(error != null); // write error flag
if (error == null) {
value.write(out);
} else {
@@ -1075,13 +1095,20 @@ public abstract class HBaseServer {
WritableUtils.writeString(out, error);
}
+ out.flush(); // force any buffered LZF chunk into buf before framing
+
+ long serTime = (System.nanoTime() - nstart);
+
+ if (buf.size() > 5000)
+ LOG.warn("Serialized size: " + buf.size() + " in " + serTime
+ + " ns compressed: " + compress);
+
if (buf.size() > warnResponseSize) {
LOG.warn(getName()+", responseTooLarge for: "+call+": Size: "
+ StringUtils.humanReadableInt(buf.size()));
}
ByteBuffer bb = buf.getByteBuffer();
- // dont include the size field.
int dataSize = bb.limit() - Bytes.SIZEOF_INT;
bb.rewind();
bb.putInt(dataSize); // overwrite the 0xdeadbeef.
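
The 50 KB threshold in this hunk is a judgment call: LZF costs some CPU plus a few header bytes per chunk, so small responses go out unwrapped. For reference, here is a standalone round-trip through the same com.ning streams, which also shows why the flush() above matters: it forces the final LZF chunk into the buffer before the size field is backpatched. Buffer contents and sizes are arbitrary:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Arrays;
    import com.ning.compress.lzf.LZFInputStream;
    import com.ning.compress.lzf.LZFOutputStream;

    public class LzfRoundTrip {
        public static void main(String[] args) throws IOException {
            byte[] original = new byte[64 * 1024]; // highly compressible: all zeros
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            LZFOutputStream lzfOut = new LZFOutputStream(sink);
            lzfOut.write(original);
            lzfOut.flush(); // emit the pending chunk, as the server does before framing
            byte[] compressed = sink.toByteArray();

            byte[] restored = new byte[original.length];
            LZFInputStream lzfIn =
                new LZFInputStream(new ByteArrayInputStream(compressed));
            int off = 0, n;
            while ((n = lzfIn.read(restored, off, restored.length - off)) > 0) {
                off += n;
            }
            System.out.println("compressed " + original.length + " -> "
                + compressed.length + " bytes, equal="
                + Arrays.equals(original, restored));
        }
    }
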
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/NanoProfiler.java b/src/main/java/org/apache/hadoop/hbase/ipc/NanoProfiler.java
index 3aa55ad..5dcd4a2 100644
--- a/src/main/java/org/apache/hadoop/hbase/ipc/NanoProfiler.java
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/NanoProfiler.java
@@ -71,6 +71,10 @@ public class NanoProfiler {
static class WriterThread extends Thread {
+ public WriterThread() {
+ this.setDaemon(true); // daemon: the profiler writer must not block JVM shutdown
+ }
+
public void run() {
FileWriter fr;
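
Making the profiler's writer a daemon thread means an otherwise-finished JVM can exit even though this loop never returns. A tiny demonstration of the difference (the class name and sleep interval are made up):

    public class DaemonDemo {
        public static void main(String[] args) {
            Thread writer = new Thread(new Runnable() {
                public void run() {
                    while (true) {
                        try {
                            Thread.sleep(1000); // a real writer would flush samples here
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                }
            });
            writer.setDaemon(true); // without this, main() returning would not end the JVM
            writer.start();
            System.out.println("main done; JVM exits despite the looping writer");
        }
    }
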
diff --git a/src/test/java/org/apache/hadoop/hbase/TestSerialization.java b/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
index befcdaf..ca8de34 100644
--- a/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
+++ b/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
@@ -434,8 +434,9 @@ public class TestSerialization {
KeyValue kvA = new KeyValue(rowA, famA, qfA, valueA);
KeyValue kvB = new KeyValue(rowB, famB, qfB, valueB);
+ KeyValue kvA1 = new KeyValue(rowA, famB, qfA, valueB);
- Result result1 = new Result(new KeyValue[]{kvA, kvB});
+ Result result1 = new Result(new KeyValue[]{kvA, kvA1});
Result result2 = new Result(new KeyValue[]{kvB});
Result result3 = new Result(new KeyValue[]{kvB});