Index: core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java	(revision 1406464)
+++ core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java	(working copy)
@@ -17,13 +17,16 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
@@ -73,6 +76,80 @@
     return mergeList;
   }
 
+  /**
+   * Estimates the serialized size of this bundle without serializing every
+   * message. A bundle that is empty, or that holds a single message class with
+   * fewer than 20 messages, is serialized completely and measured exactly.
+   * Otherwise 20 randomly chosen messages per class are serialized and their
+   * average size is extrapolated to the total number of messages.
+   *
+   * @return the approximate size of the serialized bundle in bytes
+   * @throws IOException if the bundle or a sampled message cannot be written
+   */
+  public long getApproximateSize() throws IOException {
+    final int sample = 20;
+    // long accumulators: the estimate targets large bundles, whose size can
+    // exceed the int range.
+    long sampledBytes = 0;
+    long samplesTaken = 0;
+    long totalMsgs = 0;
+    long classNames = 0;
+
+    // NOTE(review): the type arguments were lost from this patch; this assumes
+    // the map is a HashMap<String, LinkedList<M>> - confirm against the field.
+    for (Map.Entry<String, LinkedList<M>> e : messages.entrySet()) {
+      LinkedList<M> c = e.getValue();
+
+      if (messages.size() == 1 && c.size() < sample) {
+        // Small bundle: measuring it exactly is cheaper than sampling.
+        return measureSerializedSize();
+      }
+
+      classNames += e.getKey().length();
+      totalMsgs += c.size();
+      if (c.isEmpty()) {
+        continue; // nothing to sample for this class
+      }
+
+      for (int i = 0; i < sample; i++) {
+        // Math.random() is in [0, 1), so every index, including the last
+        // one, can be drawn.
+        int idx = (int) (Math.random() * c.size());
+        DataOutputStream dos = new DataOutputStream(
+            new ByteArrayOutputStream());
+        c.get(idx).write(dos);
+        dos.close();
+        sampledBytes += dos.size();
+        samplesTaken++;
+      }
+    }
+
+    if (samplesTaken == 0) {
+      // Empty bundle, or only empty message lists: there is nothing to
+      // average, so measure exactly instead of dividing by zero.
+      return measureSerializedSize();
+    }
+
+    // Multiply before dividing so the average is not truncated to whole
+    // bytes. The trailing 4 is kept from the original estimate; presumably
+    // it is the int-sized mapping count written by write() - confirm.
+    return (totalMsgs * sampledBytes) / samplesTaken + classNames + 4;
+  }
+
+  /**
+   * Serializes the whole bundle into memory and reports how many bytes that
+   * took.
+   *
+   * @return the exact serialized size of this bundle in bytes
+   * @throws IOException if the bundle cannot be written
+   */
+  private long measureSerializedSize() throws IOException {
+    DataOutputStream dos = new DataOutputStream(new ByteArrayOutputStream());
+    write(dos);
+    dos.close();
+    return dos.size();
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     // writes the k/v mapping size
Index: core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java	(revision 1406464)
+++ core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java	(working copy)
@@ -32,7 +32,6 @@
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
-import org.apache.hama.util.CompressionUtil;
 import org.apache.hama.util.LRUCache;
 
 /**
@@ -102,18 +101,12 @@
           + " to transfer messages to!");
     } else {
       if (compressor != null) {
-        float bundleSize = CompressionUtil.getBundleSize(bundle);
-        if (bundleSize > conf.getLong("hama.messenger.compression.threshold",
-            1048576)) {
+        if (bundle.getApproximateSize() > conf.getLong(
+            "hama.messenger.compression.threshold", 1048576)) {
           BSPCompressedBundle compMsgBundle = compressor.compressBundle(bundle);
-          if (CompressionUtil.getCompressionRatio(
-              (float) compMsgBundle.getData().length, bundleSize) < 1.0) {
-            bspPeerConnection.put(compMsgBundle);
-          } else {
-            bspPeerConnection.put(bundle);
-          }
+          bspPeerConnection.put(compMsgBundle);
         } else {
           bspPeerConnection.put(bundle);
         }
       } else {
         bspPeerConnection.put(bundle);
Index: core/src/main/java/org/apache/hama/util/CompressionUtil.java
===================================================================
--- core/src/main/java/org/apache/hama/util/CompressionUtil.java	(revision 1406464)
+++ core/src/main/java/org/apache/hama/util/CompressionUtil.java	(working copy)
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.util;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.hama.bsp.BSPMessageBundle;
-
-public class CompressionUtil {
-
-  /**
-   * Calculates the compression ratio. A compression ratio of less than 1 is
-   * desirable.
-   * 
-   * @param compressedSize
-   * @param bundleSize
-   * @return the compression ratio
-   * @throws IOException
-   */
-  public static float getCompressionRatio(float compressedSize, float bundleSize)
-      throws IOException {
-    return (compressedSize / bundleSize);
-  }
-
-  public static float getBundleSize(BSPMessageBundle bundle)
-      throws IOException {
-    DataOutputStream dos = new DataOutputStream(new ByteArrayOutputStream());
-    bundle.write(dos);
-    dos.close();
-
-    return dos.size();
-  }
-}
Index: core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
===================================================================
--- core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java	(revision 1406464)
+++ core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java	(working copy)
@@ -28,10 +28,28 @@
 
 import junit.framework.TestCase;
 
-import org.apache.hadoop.io.BytesWritable;;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
 
 public class TestBSPMessageBundle extends TestCase {
 
+  public void testApproximateSize() throws IOException {
+    BSPMessageBundle bundle = new BSPMessageBundle();
+    for (int i = 0; i < 100; i++) {
+      bundle.addMessage(new IntWritable(i));
+    }
+
+    // The estimate is based on random sampling, so take it once per check
+    // and report the value when the bounds are violated.
+    long size = bundle.getApproximateSize();
+    assertTrue("unexpected estimate: " + size, size > 400 && size < 500);
+
+    bundle = new BSPMessageBundle();
+    bundle.addMessage(new IntWritable(1));
+    size = bundle.getApproximateSize();
+    assertTrue("unexpected estimate: " + size, size > 40 && size < 50);
+  }
+
   public void testEmpty() throws IOException {
     BSPMessageBundle bundle = new BSPMessageBundle();
     // Serialize it.