Adds vote-to-halt support to the graph API.

GraphJobRunner now iterates over all vertices rather than over the entries of
the received-message map. A vertex that voted to halt is skipped unless
messages arrived for it, in which case its halt flag is cleared before it is
computed. Vertex gains voteToHalt() and isHalted(); VertexInterface declares
voteToHalt().

NOTE(review): generic type arguments look stripped from this copy of the patch
(e.g. "Iterator>> iterator", "List> edges"); restore them from the original
attachment before applying. Indentation was reconstructed assuming 2-space
indent; apply with whitespace-insensitive matching if it does not match.

Index: src/main/java/org/apache/hama/graph/GraphJobRunner.java
===================================================================
--- src/main/java/org/apache/hama/graph/GraphJobRunner.java	(revision 1348716)
+++ src/main/java/org/apache/hama/graph/GraphJobRunner.java	(working copy)
@@ -22,7 +22,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -259,33 +258,42 @@
     }
 
     int messagesSize = messages.size();
-    Iterator>> iterator = messages.entrySet()
-        .iterator();
-    while (iterator.hasNext()) {
-      Entry> e = iterator.next();
-      LinkedList msgs = e.getValue();
-      if (combiner != null) {
-        M combined = combiner.combine(msgs);
+
+    for (Vertex vertex : vertices.values()) {
+      LinkedList msgs = messages.get(vertex.getVertexID());
+      // Newly received messages reactivate a vertex that voted to halt.
+      if (vertex.isHalted() && msgs != null) {
+        vertex.votedToHalt = false;
+      }
+      if (msgs == null) {
         msgs = new LinkedList();
-        msgs.add(combined);
       }
-      Vertex vertex = vertices.get(e.getKey());
-      M lastValue = vertex.getValue();
-      vertex.compute(msgs.iterator());
-      if (aggregators != null) {
-        if (this.aggregators != null) {
-          for (int i = 0; i < this.aggregators.length; i++) {
-            Aggregator> aggregator = this.aggregators[i];
-            aggregator.aggregate(vertex, vertex.getValue());
-            if (isAbstractAggregator[i]) {
-              AbstractAggregator> intern = ((AbstractAggregator>) aggregator);
-              intern.aggregate(vertex, lastValue, vertex.getValue());
-              intern.aggregateInternal();
+
+      if (!vertex.isHalted()) {
+        // Only combine real messages: combining an empty list would hand
+        // the vertex a message that nobody sent.
+        if (combiner != null && !msgs.isEmpty()) {
+          M combined = combiner.combine(msgs);
+          msgs = new LinkedList();
+          msgs.add(combined);
+        }
+        M lastValue = vertex.getValue();
+        vertex.compute(msgs.iterator());
+
+        if (aggregators != null) {
+          if (this.aggregators != null) {
+            for (int i = 0; i < this.aggregators.length; i++) {
+              Aggregator> aggregator = this.aggregators[i];
+              aggregator.aggregate(vertex, vertex.getValue());
+              if (isAbstractAggregator[i]) {
+                AbstractAggregator> intern = ((AbstractAggregator>) aggregator);
+                intern.aggregate(vertex, lastValue, vertex.getValue());
+                intern.aggregateInternal();
+              }
             }
           }
         }
       }
-      iterator.remove();
     }
 
     runAggregators(peer, messagesSize);
@@ -476,11 +484,11 @@
   /**
    * @return a new vertex instance
    */
-  public static Vertex newVertexInstance(
+  public static Vertex newVertexInstance(
       Class vertexClass, Configuration conf) {
     @SuppressWarnings("unchecked")
-    Vertex vertex = (Vertex) ReflectionUtils
-        .newInstance(vertexClass, conf);
+    Vertex vertex = (Vertex) ReflectionUtils.newInstance(
+        vertexClass, conf);
     return vertex;
   }
 
Index: src/main/java/org/apache/hama/graph/Vertex.java
===================================================================
--- src/main/java/org/apache/hama/graph/Vertex.java	(revision 1348716)
+++ src/main/java/org/apache/hama/graph/Vertex.java	(working copy)
@@ -35,6 +35,9 @@
   protected GraphJobRunner runner;
   private BSPPeer peer;
   private List> edges;
+
+  /** Set by {@link #voteToHalt()}; reset by GraphJobRunner when messages arrive. */
+  protected boolean votedToHalt = false;
 
   public Configuration getConf() {
     return peer.getConfiguration();
@@ -163,6 +166,16 @@
   }
 
   @Override
+  public void voteToHalt() {
+    this.votedToHalt = true;
+  }
+
+  /** @return {@code true} once {@link #voteToHalt()} was called, until the flag is reset */
+  public boolean isHalted() {
+    return votedToHalt;
+  }
+
+  @Override
   public int hashCode() {
     final int prime = 31;
     int result = 1;
Index: src/main/java/org/apache/hama/graph/VertexInterface.java
===================================================================
--- src/main/java/org/apache/hama/graph/VertexInterface.java	(revision 1348716)
+++ src/main/java/org/apache/hama/graph/VertexInterface.java	(working copy)
@@ -81,6 +81,12 @@
   public long getSuperstepCount();
 
   /**
+   * Votes to halt. A halted vertex is not computed again until it receives
+   * a new message.
+   */
+  public void voteToHalt();
+
+  /**
    * Sets the vertex value
    */
   public void setValue(M value);