Index: src/java/org/apache/hama/bsp/BSPPeer.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPPeer.java	(revision 1102151)
+++ src/java/org/apache/hama/bsp/BSPPeer.java	(working copy)
@@ -35,6 +35,7 @@
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hama.Constants;
+import org.apache.hama.util.Bytes;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
Index: src/java/org/apache/hama/bsp/ByteMessage.java
===================================================================
--- src/java/org/apache/hama/bsp/ByteMessage.java	(revision 1102151)
+++ src/java/org/apache/hama/bsp/ByteMessage.java	(working copy)
@@ -32,22 +32,18 @@
 
   public ByteMessage(byte[] tag, byte[] data) {
     super();
-    this.tag = new byte[tag.length];
-    this.data = new byte[data.length];
-    System.arraycopy(tag, 0, this.tag, 0, tag.length);
-    System.arraycopy(data, 0, this.data, 0, data.length);
+    this.tag = tag;
+    this.data = data;
   }
 
   @Override
   public byte[] getTag() {
-    byte[] result = this.tag;
-    return result;
+    return this.tag;
   }
 
   @Override
   public byte[] getData() {
-    byte[] result = this.data;
-    return result;
+    return this.data;
   }
 
   @Override
Index: src/test/org/apache/hama/bsp/TestBSPPeer.java
===================================================================
--- src/test/org/apache/hama/bsp/TestBSPPeer.java	(revision 1102151)
+++ src/test/org/apache/hama/bsp/TestBSPPeer.java	(working copy)
@@ -47,7 +47,7 @@
 public class TestBSPPeer extends HamaCluster implements Watcher {
   private Log LOG = LogFactory.getLog(TestBSPPeer.class);
 
-  private static final int NUM_PEER = 10;
+  private static final int NUM_PEER = 3;
   private static final int ROUND = 3;
   private static final int PAYLOAD = 1024; // 1kb in default
   List list = new ArrayList(NUM_PEER);
@@ -106,15 +106,14 @@
     public void runTest() throws AssertionFailedError {
       int randomTime;
       byte[] dummyData = new byte[PAYLOAD];
-      BSPMessage msg = null;
 
       for (int i = 0; i < ROUND; i++) {
         randomTime = r.nextInt(MAXIMUM_DURATION) + 5;
 
-        for (int j = 0; j < 10; j++) {
-          r.nextBytes(dummyData);
-          msg = new ByteMessage(Bytes.tail(dummyData, 128), dummyData);
+        for (int j = 0; j < 3; j++) {
           String peerName = "localhost:" + (30000 + j);
+          ByteMessage msg = new ByteMessage(Bytes.tail(dummyData, 128),
+              dummyData);
           try {
             peer.send(peerName, msg);
           } catch (IOException e) {
@@ -149,8 +148,8 @@
       LOG.info("[" + peer.getPeerName() + "] verifying " + numMessages
           + " messages at " + round + " round");
 
-      if (lastTwoDigitsOfPort < 10) {
-        assertEquals(10, numMessages);
+      if (lastTwoDigitsOfPort < 3) {
+        assertEquals(3, numMessages);
       } else {
         assertEquals(0, numMessages);
       }
Index: src/test/org/apache/hama/bsp/TestMessages.java
===================================================================
--- src/test/org/apache/hama/bsp/TestMessages.java	(revision 0)
+++ src/test/org/apache/hama/bsp/TestMessages.java	(revision 0)
@@ -0,0 +1,45 @@
+package org.apache.hama.bsp;
+
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hama.util.Bytes;
+
+/**
+ * Unit tests for {@link ByteMessage}.
+ */
+public class TestMessages extends TestCase {
+
+  /** Upper bound for the requested array length on very large heaps. */
+  private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+  /**
+   * Verifies that a ByteMessage returns the very arrays it was built from and
+   * that a tag produced by Bytes.tail equals the last bytes of the payload.
+   */
+  public void testByteMessage() {
+    // Without the cap, the (int) cast saturates at Integer.MAX_VALUE once
+    // 60% of the heap exceeds the int range, and the allocation fails.
+    long wanted = (long) (Runtime.getRuntime().maxMemory() * 0.60);
+    int dataSize = (int) Math.min(wanted, MAX_ARRAY_SIZE);
+    byte[] data = new byte[dataSize];
+    ByteMessage msg = new ByteMessage(Bytes.toBytes("tag"), data);
+    // Two arrays of this size do not fit in the heap on small heaps; the
+    // identity check pins the no-copy behaviour on large ones as well.
+    assertSame(data, msg.getData());
+    assertEquals(dataSize, msg.getData().length);
+    // Release the large array before the second half of the test.
+    msg = null;
+    data = null;
+
+    // Non-constant content: in an all-zero buffer every 128-byte slice is
+    // equal, so the tail comparison below could never fail.
+    byte[] dummyData = new byte[1024];
+    new Random(0L).nextBytes(dummyData);
+    ByteMessage msg2 = new ByteMessage(Bytes.tail(dummyData, 128), dummyData);
+    assertEquals(0,
+        Bytes.compareTo(msg2.getTag(), 0, 128, msg2.getData(),
+            msg2.getData().length - 128, 128));
+  }
+}