Index: conf/hama-default.xml =================================================================== --- conf/hama-default.xml (revision 1618812) +++ conf/hama-default.xml (working copy) @@ -284,6 +284,11 @@ + hama.messenger.runtime.compression + false + True if you want to enable runtime compression + + hama.messenger.compression.class org.apache.hama.bsp.message.compress.SnappyCompressor The message compression algorithm to choose. Default is null. Index: core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (revision 1618812) +++ core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (working copy) @@ -87,17 +87,20 @@ try { serialized = serialize(message); - if (compressor != null && serialized.length > threshold) { - bufferDos.writeBoolean(true); - compressed = compressor.compress(serialized); - bufferDos.writeInt(compressed.length); - bufferDos.write(compressed); - - bundleLength += compressed.length; + if (compressor != null) { + if (serialized.length > threshold) { + bufferDos.writeBoolean(true); + compressed = compressor.compress(serialized); + bufferDos.writeInt(compressed.length); + bufferDos.write(compressed); + bundleLength += compressed.length; + } else { + bufferDos.writeBoolean(false); + bufferDos.write(serialized); + bundleLength += serialized.length; + } } else { - bufferDos.writeBoolean(false); bufferDos.write(serialized); - bundleLength += serialized.length; } } catch (IOException e) { @@ -114,7 +117,7 @@ public byte[] getBuffer() { return byteBuffer.toByteArray(); } - + public Iterator iterator() { bis = new ByteArrayInputStream(byteBuffer.toByteArray()); dis = new DataInputStream(bis); @@ -140,10 +143,13 @@ @Override public M next() { boolean isCompressed = false; - try { - isCompressed = dis.readBoolean(); - } catch (IOException e1) { - e1.printStackTrace(); + + if (compressor != null) { + try { + isCompressed = dis.readBoolean(); + } catch (IOException e1) { + e1.printStackTrace(); + } } Class clazz = null; @@ -152,10 +158,12 @@ } catch (ClassNotFoundException e) { LOG.error("Class was not found.", e); } + msg = ReflectionUtils.newInstance(clazz, null); try { if (isCompressed) { + // LOG.debug(">>>>> decompressing ........."); int length = dis.readInt(); compressed = new byte[length]; dis.readFully(compressed); @@ -215,7 +223,7 @@ @Override public void readFields(DataInput in) throws IOException { this.bundleSize = in.readInt(); - + if (this.bundleSize > 0) { className = in.readUTF(); int bytesLength = in.readInt(); Index: core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (revision 1618812) +++ core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (working copy) @@ -353,8 +353,11 @@ throws IOException { peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED, bundle.getLength()); - bundle.setCompressor(compressor, - conf.getLong("hama.messenger.compression.threshold", 512)); + + if (conf.getBoolean("hama.messenger.runtime.compression", false)) { + bundle.setCompressor(compressor, + conf.getLong("hama.messenger.compression.threshold", 512)); + } Iterator it = bundle.iterator(); while (it.hasNext()) { Index: core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (revision 1618812) +++ core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (working copy) @@ -279,8 +279,10 @@ @Override public void loopBackBundle(BSPMessageBundle bundle) throws IOException { - bundle.setCompressor(compressor, - conf.getLong("hama.messenger.compression.threshold", 128)); + if (conf.getBoolean("hama.messenger.runtime.compression", false)) { + bundle.setCompressor(compressor, + conf.getLong("hama.messenger.compression.threshold", 128)); + } Iterator it = bundle.iterator(); while (it.hasNext()) { Index: core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java (revision 1618812) +++ core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java (working copy) @@ -86,8 +86,10 @@ if (!outgoingBundles.containsKey(targetPeerAddress)) { BSPMessageBundle bundle = new BSPMessageBundle(); - bundle.setCompressor(compressor, - conf.getLong("hama.messenger.compression.threshold", 128)); + if (conf.getBoolean("hama.messenger.runtime.compression", false)) { + bundle.setCompressor(compressor, + conf.getLong("hama.messenger.compression.threshold", 128)); + } outgoingBundles.put(targetPeerAddress, bundle); } return targetPeerAddress; Index: core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java =================================================================== --- core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (revision 1618812) +++ core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (working copy) @@ -50,7 +50,8 @@ public TestBSPMasterGroomServer() { configuration = new HamaConfiguration(); configuration.set("bsp.master.address", "localhost"); - configuration.set("hama.child.redirect.log.console", "true"); + configuration.setBoolean("hama.child.redirect.log.console", true); + configuration.setBoolean("hama.messenger.runtime.compression", true); assertEquals("Make sure master addr is set to localhost:", "localhost", configuration.get("bsp.master.address")); configuration.set("bsp.local.dir", "/tmp/hama-test"); Index: graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java (revision 1618812) +++ graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java (working copy) @@ -119,7 +119,10 @@ if (!outgoingBundles.containsKey(targetPeerAddress)) { BSPMessageBundle bundle = new BSPMessageBundle(); - bundle.setCompressor(compressor, conf.getLong("hama.messenger.compression.threshold", 128)); + if (conf.getBoolean("hama.messenger.runtime.compression", false)) { + bundle.setCompressor(compressor, + conf.getLong("hama.messenger.compression.threshold", 128)); + } outgoingBundles.put(targetPeerAddress, bundle); } return targetPeerAddress;