Index: examples/src/main/java/org/apache/hama/examples/ShortestPathVertex.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/ShortestPathVertex.java (revision 1239446) +++ examples/src/main/java/org/apache/hama/examples/ShortestPathVertex.java (working copy) @@ -20,7 +20,11 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.List; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hama.bsp.BSPPeer; import org.apache.hama.graph.VertexWritable; public final class ShortestPathVertex extends VertexWritable { @@ -74,9 +78,49 @@ super.write(out); out.writeInt(weight); } - + public int compareTo(ShortestPathVertex o) { ShortestPathVertex that = (ShortestPathVertex) o; return this.name.compareTo(that.name); } + + public void compute(List value) throws IOException { + int minDist = this.getCost(); + + for (ShortestPathVertexMessage msg : value) { + if (msg.getData() < minDist) + minDist = msg.getData(); + } + + if (minDist < this.getCost()) { + this.setCost(minDist); + for (ShortestPathVertex e : this.getEdges()) { + sendMessage(e, minDist + e.getWeight()); + } + } + } + + private void sendMessage(ShortestPathVertex e, int minDist) + throws IOException { + String target = peer.getPeerName(Math.abs((e.hashCode() % peer + .getAllPeerNames().length))); + + peer.send(target, new ShortestPathVertexMessage(e, minDist)); + } + + ShortestPathVertex[] edges; + BSPPeer peer; + + public void setEdges(ShortestPathVertex[] shortestPathVertexs) { + edges = shortestPathVertexs; + } + + public ShortestPathVertex[] getEdges() { + return edges; + } + + public void setPeerInstance( + BSPPeer peer) { + this.peer = peer; + } } Index: examples/src/main/java/org/apache/hama/examples/ShortestPaths.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/ShortestPaths.java (revision 1239446) +++ examples/src/main/java/org/apache/hama/examples/ShortestPaths.java (working copy) @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import org.apache.commons.logging.Log; @@ -64,7 +65,9 @@ int updatesMade = 0; int globalUpdateCounts = 0; ShortestPathVertexMessage msg = null; - Deque updatedQueue = new LinkedList(); + + Map> msgMap = new HashMap>(); + while ((msg = (ShortestPathVertexMessage) peer.getCurrentMessage()) != null) { if (msg.getTag().getName().startsWith(FLAG_MESSAGE)) { if (msg.getData() == Integer.MIN_VALUE) { @@ -76,15 +79,25 @@ int index = Collections.binarySearch(vertexLookup, msg.getTag()); ShortestPathVertex vertex = vertexLookup.get(index); - // check if we need an distance update - if (vertex.getCost() > msg.getData()) { - updatesMade++; - updatedQueue.add(vertex); - vertex.setCost(msg.getData()); + if(msgMap.containsKey(vertex)) { + List msgs = msgMap.get(vertex); + msgs.add(msg); + msgMap.put(vertex, msgs); + } else { + List msgs = new ArrayList(); + msgs.add(msg); + msgMap.put(vertex, msgs); } } } + for(Map.Entry> e : msgMap.entrySet()) { + ShortestPathVertex v = e.getKey(); + v.setEdges(adjacencyList.get(v)); + v.setPeerInstance(peer); + v.compute(e.getValue()); + } + if (globalUpdateCounts == 0 && peer.getPeerName().equals(masterTask) && peer.getSuperstepCount() > 1) { for (String peerName : peer.getAllPeerNames()) { @@ -99,10 +112,6 @@ new ShortestPathVertex((int) peer.getSuperstepCount(), FLAG_MESSAGE + peer.getPeerName()), updatesMade)); - for (ShortestPathVertex vertex : updatedQueue) { - sendMessageToNeighbors(peer, vertex); - } - peer.sync(); } }