Index: src/examples/org/apache/hama/examples/PiEstimator.java =================================================================== --- src/examples/org/apache/hama/examples/PiEstimator.java (revision 1069189) +++ src/examples/org/apache/hama/examples/PiEstimator.java (working copy) @@ -64,12 +64,14 @@ BSPMessage estimate = new BSPMessage(tagName, myData); bspPeer.send(masterTask, estimate); + LOG.info("Send message:" + System.currentTimeMillis()); bspPeer.sync(); double pi = 0.0; BSPMessage received; while ((received = bspPeer.getCurrentMessage()) != null) { - LOG.info("Receives messages:" + Bytes.toDouble(received.getData())); + LOG.info("Receive messages:" + Bytes.toDouble(received.getData()) + + " from " + Bytes.toString(received.getTag())); if (pi == 0.0) { pi = Bytes.toDouble(received.getData()); } else { Index: src/java/org/apache/hama/bsp/BSPPeer.java =================================================================== --- src/java/org/apache/hama/bsp/BSPPeer.java (revision 1069189) +++ src/java/org/apache/hama/bsp/BSPPeer.java (working copy) @@ -140,12 +140,12 @@ @Override public void send(String peerName, BSPMessage msg) throws IOException { LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName); - ConcurrentLinkedQueue queue = outgoingQueues.get(peerName); + ConcurrentLinkedQueue queue = outgoingQueues.get(getAddress(peerName)); if (queue == null) { queue = new ConcurrentLinkedQueue(); - outgoingQueues.put(getAddress(peerName), queue); } queue.add(msg); + outgoingQueues.put(getAddress(peerName), queue); } /*