Index: conf/hama-default.xml
===================================================================
--- conf/hama-default.xml (revision 1618812)
+++ conf/hama-default.xml (working copy)
@@ -284,6 +284,11 @@
+ hama.messenger.runtime.compression
+ false
+ Set to true to enable runtime compression of messages; disabled (false) by default.
+
+
hama.messenger.compression.class
org.apache.hama.bsp.message.compress.SnappyCompressor
The message compression algorithm to choose. Default is null.
Index: core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (revision 1618812)
+++ core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (working copy)
@@ -87,17 +87,20 @@
try {
serialized = serialize(message);
- if (compressor != null && serialized.length > threshold) {
- bufferDos.writeBoolean(true);
- compressed = compressor.compress(serialized);
- bufferDos.writeInt(compressed.length);
- bufferDos.write(compressed);
-
- bundleLength += compressed.length;
+ if (compressor != null) {
+ if (serialized.length > threshold) {
+ bufferDos.writeBoolean(true);
+ compressed = compressor.compress(serialized);
+ bufferDos.writeInt(compressed.length);
+ bufferDos.write(compressed);
+ bundleLength += compressed.length;
+ } else {
+ bufferDos.writeBoolean(false);
+ bufferDos.write(serialized);
+ bundleLength += serialized.length;
+ }
} else {
- bufferDos.writeBoolean(false);
bufferDos.write(serialized);
-
bundleLength += serialized.length;
}
} catch (IOException e) {
@@ -114,7 +117,7 @@
public byte[] getBuffer() {
return byteBuffer.toByteArray();
}
-
+
public Iterator iterator() {
bis = new ByteArrayInputStream(byteBuffer.toByteArray());
dis = new DataInputStream(bis);
@@ -140,10 +143,13 @@
@Override
public M next() {
boolean isCompressed = false;
- try {
- isCompressed = dis.readBoolean();
- } catch (IOException e1) {
- e1.printStackTrace();
+
+ if (compressor != null) {
+ try {
+ isCompressed = dis.readBoolean();
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ }
}
Class clazz = null;
@@ -152,10 +158,12 @@
} catch (ClassNotFoundException e) {
LOG.error("Class was not found.", e);
}
+
msg = ReflectionUtils.newInstance(clazz, null);
try {
if (isCompressed) {
+ // LOG.debug(">>>>> decompressing .........");
int length = dis.readInt();
compressed = new byte[length];
dis.readFully(compressed);
@@ -215,7 +223,7 @@
@Override
public void readFields(DataInput in) throws IOException {
this.bundleSize = in.readInt();
-
+
if (this.bundleSize > 0) {
className = in.readUTF();
int bytesLength = in.readInt();
Index: core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (revision 1618812)
+++ core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (working copy)
@@ -353,8 +353,11 @@
throws IOException {
peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
bundle.getLength());
- bundle.setCompressor(compressor,
- conf.getLong("hama.messenger.compression.threshold", 512));
+
+ if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+ bundle.setCompressor(compressor,
+ conf.getLong("hama.messenger.compression.threshold", 512));
+ }
Iterator it = bundle.iterator();
while (it.hasNext()) {
Index: core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (revision 1618812)
+++ core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (working copy)
@@ -279,8 +279,10 @@
@Override
public void loopBackBundle(BSPMessageBundle bundle) throws IOException {
- bundle.setCompressor(compressor,
- conf.getLong("hama.messenger.compression.threshold", 128));
+ if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+ bundle.setCompressor(compressor,
+ conf.getLong("hama.messenger.compression.threshold", 128));
+ }
Iterator extends Writable> it = bundle.iterator();
while (it.hasNext()) {
Index: core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java (revision 1618812)
+++ core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java (working copy)
@@ -86,8 +86,10 @@
if (!outgoingBundles.containsKey(targetPeerAddress)) {
BSPMessageBundle bundle = new BSPMessageBundle();
- bundle.setCompressor(compressor,
- conf.getLong("hama.messenger.compression.threshold", 128));
+ if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+ bundle.setCompressor(compressor,
+ conf.getLong("hama.messenger.compression.threshold", 128));
+ }
outgoingBundles.put(targetPeerAddress, bundle);
}
return targetPeerAddress;
Index: core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
===================================================================
--- core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (revision 1618812)
+++ core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (working copy)
@@ -50,7 +50,8 @@
public TestBSPMasterGroomServer() {
configuration = new HamaConfiguration();
configuration.set("bsp.master.address", "localhost");
- configuration.set("hama.child.redirect.log.console", "true");
+ configuration.setBoolean("hama.child.redirect.log.console", true);
+ configuration.setBoolean("hama.messenger.runtime.compression", true);
assertEquals("Make sure master addr is set to localhost:", "localhost",
configuration.get("bsp.master.address"));
configuration.set("bsp.local.dir", "/tmp/hama-test");
Index: graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
===================================================================
--- graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java (revision 1618812)
+++ graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java (working copy)
@@ -119,7 +119,10 @@
if (!outgoingBundles.containsKey(targetPeerAddress)) {
BSPMessageBundle bundle = new BSPMessageBundle();
- bundle.setCompressor(compressor, conf.getLong("hama.messenger.compression.threshold", 128));
+ if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+ bundle.setCompressor(compressor,
+ conf.getLong("hama.messenger.compression.threshold", 128));
+ }
outgoingBundles.put(targetPeerAddress, bundle);
}
return targetPeerAddress;