Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1561506) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -45,7 +45,7 @@ /** * Fully generic graph job runner. - * + * * @param the id type of a vertex. * @param the value type of an edge. * @param the value type of a vertex. @@ -177,20 +177,8 @@ BSPPeer peer) throws IOException, SyncException, InterruptedException { - if (isMasterTask(peer) && iteration == 1) { - MapWritable updatedCnt = new MapWritable(); - updatedCnt.put( - FLAG_VERTEX_TOTAL_VERTICES, - new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES) - .getCounter()))); - // send the updates from the master tasks back to the slaves - for (String peerName : peer.getAllPeerNames()) { - peer.send(peerName, new GraphJobMessage(updatedCnt)); - } - } - // this is only done in every second iteration - if (isMasterTask(peer) && iteration > 1) { + if (isMasterTask(peer)) { MapWritable updatedCnt = new MapWritable(); // send total number of vertices. updatedCnt.put( @@ -208,7 +196,7 @@ peer.send(peerName, new GraphJobMessage(updatedCnt)); } } - if (getAggregationRunner().isEnabled() && iteration > 1) { + if (getAggregationRunner().isEnabled()) { // in case we need to sync, we need to replay the messages that already // are added to the queue. This prevents loosing messages when using // aggregators. @@ -465,7 +453,7 @@ /** * Add new vertex into memory of each peer. - * + * * @throws IOException */ private void addVertex(Vertex vertex) throws IOException { @@ -483,7 +471,7 @@ /** * Remove vertex from this peer. - * + * * @throws IOException */ private void removeVertex(V vertexID) { @@ -494,7 +482,7 @@ /** * After all inserts are done, we must finalize the VertexInfo data structure. - * + * * @throws IOException */ private void finishAdditions() throws IOException { @@ -505,7 +493,7 @@ /** * After all inserts are done, we must finalize the VertexInfo data structure. - * + * * @throws IOException */ private void finishRemovals() throws IOException { @@ -542,7 +530,7 @@ /** * Parses the messages in every superstep and does actions according to flags * in the messages. - * + * * @return the first vertex message, null if none received. */ @SuppressWarnings("unchecked") @@ -647,7 +635,7 @@ /** * Gets the last aggregated value at the given index. The index is dependend * on how the aggregators were configured during job setup phase. - * + * * @return the value of the aggregator, or null if none was defined. */ public final Writable getLastAggregatedValue(int index) { @@ -657,7 +645,7 @@ /** * Gets the last aggregated number of vertices at the given index. The index * is dependend on how the aggregators were configured during job setup phase. - * + * * @return the value of the aggregator, or null if none was defined. */ public final IntWritable getNumLastAggregatedVertices(int index) {