Index: examples/src/main/java/org/apache/hama/examples/DynamicGraph.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/DynamicGraph.java (revision 1519376) +++ examples/src/main/java/org/apache/hama/examples/DynamicGraph.java (working copy) @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.ArrayList; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; @@ -50,10 +51,12 @@ * 4 * * Output example: - * sum 10 + * sum 12 + * (we also add the number of vertices that exist + * in the last superstep from two different methods) */ public class DynamicGraph { - + public static class GraphTextReader extends VertexInputReader { @@ -93,6 +96,7 @@ s += i.get(); } s += this.getPeer().getCounter(GraphJobCounter.INPUT_VERTICES).getCounter(); + s += this.getNumVertices(); this.setValue(new IntWritable(this.getValue().get() +s)); } else { throw new UnsupportedOperationException("We have more vertecies than we expected: " + this.getVertexID() + " " + this.getValue()); Index: examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java =================================================================== --- examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java (revision 1519376) +++ examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java (working copy) @@ -56,7 +56,7 @@ while ((line = reader.readLine()) != null) { String[] split = line.split("\t"); assertTrue(split[0].equals("sum")); - assertTrue(split[1].equals("11")); + assertTrue(split[1].equals("12")); System.out.println(split[0] + " : " + split[1]); } } Index: graph/src/main/java/org/apache/hama/graph/AggregationRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (revision 1519376) +++ graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (working copy) @@ -21,7 +21,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -40,6 +39,11 @@ @SuppressWarnings("rawtypes") public final class AggregationRunner { + //make sure that these values don't collide with the vertex names + private static final String S_FLAG_PEER_AGGREGATOR_VALUE = "hama.2"; + private static final String S_FLAG_AGGREGATOR_VALUE = "hama.3"; + private static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.4"; + // multiple aggregator arrays private Aggregator>[] aggregators; private Writable[] globalAggregatorResult; @@ -51,12 +55,18 @@ // aggregator on the master side private Aggregator>[] masterAggregator; + private PeerAggregator[] peerAggregators; + private Text[] peerAggregatorValueFlag; + + private GraphJobRunner graphJobRunner; + private boolean enabled = false; private Configuration conf; @SuppressWarnings("unchecked") public void setupAggregators( - BSPPeer peer) { + BSPPeer peer, + GraphJobRunner graphJobRunner) { this.conf = peer.getConfiguration(); String aggregatorClasses = peer.getConfiguration().get( GraphJob.AGGREGATOR_CLASS_ATTR); @@ -75,10 +85,8 @@ } for (int i = 0; i < aggregatorClassNames.length; i++) { aggregators[i] = getNewAggregator(aggregatorClassNames[i]); - aggregatorValueFlag[i] = new Text( - GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + i); - aggregatorIncrementFlag[i] = new Text( - GraphJobRunner.S_FLAG_AGGREGATOR_INCREMENT + ";" + i); + aggregatorValueFlag[i] = new Text(S_FLAG_AGGREGATOR_VALUE + ";" + i); + aggregatorIncrementFlag[i] = new Text(S_FLAG_AGGREGATOR_INCREMENT + ";" + i); if (aggregators[i] instanceof AbstractAggregator) { isAbstractAggregator[i] = true; } @@ -87,23 +95,28 @@ } } } + + this.graphJobRunner = graphJobRunner; + + peerAggregatorValueFlag = new Text[2]; + peerAggregatorValueFlag[0] = new Text(S_FLAG_PEER_AGGREGATOR_VALUE + ";0"); + peerAggregatorValueFlag[1] = new Text(S_FLAG_PEER_AGGREGATOR_VALUE + ";1"); + + peerAggregators = new PeerAggregator[2]; + peerAggregators[0] = new FinishPeerAggregator(); + peerAggregators[1] = new CountVerticesPeerAggregator(); + } /** * Runs the aggregators by sending their values to the master task. - * @param changedVertexCnt */ public void sendAggregatorValues( - BSPPeer peer, - int activeVertices, int changedVertexCnt) throws IOException { - // send msgCounts to the master task + BSPPeer peer) throws IOException { + MapWritable updatedCnt = new MapWritable(); - updatedCnt.put(GraphJobRunner.FLAG_MESSAGE_COUNTS, new IntWritable( - activeVertices)); - // send total number of vertices changes - updatedCnt.put(GraphJobRunner.FLAG_VERTEX_ALTER_COUNTER, new LongWritable( - changedVertexCnt)); - // also send aggregated values to the master + + // send aggregated values to the master if (aggregators != null) { for (int i = 0; i < this.aggregators.length; i++) { updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue()); @@ -121,6 +134,15 @@ } } } + // send peer values to master + if (peerAggregators != null) { + for (int i = 0; i < peerAggregators.length; i++) { + updatedCnt.put(peerAggregatorValueFlag[i], peerAggregators[i].getPeerValue(this.graphJobRunner)); + // we reset peer aggregators from previous content + peerAggregators[i].reset(); + } + } + peer.send(GraphJobRunner.getMasterTask(peer), new GraphJobMessage( updatedCnt)); } @@ -147,10 +169,11 @@ } /** - * The method the master task does, it globally aggregates the values of each - * peer and updates the given map accordingly. + * Get globally aggregated values of each peer or vertex. */ - public void doMasterAggregation(MapWritable updatedCnt) { + public MapWritable getMasterValues() { + MapWritable masterValues = new MapWritable(); + if (isEnabled()) { // work through the master aggregators for (int i = 0; i < masterAggregator.length; i++) { @@ -163,34 +186,39 @@ } // this count is usually the times of active // vertices in the graph - updatedCnt.put(aggregatorIncrementFlag[i], + masterValues.put(aggregatorIncrementFlag[i], intern.getTimesAggregated()); } - updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue); + masterValues.put(aggregatorValueFlag[i], lastAggregatedValue); } } + + for (int i = 0; i < peerAggregators.length; i++) { + masterValues.put(peerAggregatorValueFlag[i], peerAggregators[i].getValue()); + } + + return masterValues; } /** * Receives aggregated values from a master task. - * - * @return always true if no aggregators are defined, false if aggregators say - * we haven't seen any messages anymore. */ - public boolean receiveAggregatedValues(MapWritable updatedValues, - long iteration) throws IOException, SyncException, InterruptedException { + public void receiveAggregatedValues(MapWritable updatedValues) + throws IOException, SyncException, InterruptedException { // map is the first value that is in the queue - for (int i = 0; i < aggregators.length; i++) { - globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]); - globalAggregatorIncrement[i] = (IntWritable) updatedValues - .get(aggregatorIncrementFlag[i]); + if (isEnabled()) { + for (int i = 0; i < aggregators.length; i++) { + globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]); + globalAggregatorIncrement[i] = (IntWritable) updatedValues + .get(aggregatorIncrementFlag[i]); + } } - IntWritable count = (IntWritable) updatedValues - .get(GraphJobRunner.FLAG_MESSAGE_COUNTS); - if (count != null && count.get() == Integer.MIN_VALUE) { - return false; + + for (int i = 0; i < peerAggregators.length; i++) { + PeerAggregator peerAggregator = this.peerAggregators[i]; + peerAggregator.setPeer(this.graphJobRunner, updatedValues.get(peerAggregatorValueFlag[i])); } - return true; + } /** @@ -204,20 +232,21 @@ /** * Method to let the master read messages from peers and aggregate a value. */ - public void masterReadAggregatedValue(Text textIndex, M value) { +// public void masterReadAggregatedValue(Text textIndex, M value) { + @SuppressWarnings("unchecked") + public void masterAggregate(Text command, Writable value) { + String textIndex = command.toString(); int index = Integer.parseInt(textIndex.toString().split(";")[1]); - masterAggregator[index].aggregate(null, value); - } - /** - * Method to let the master read messages from peers and aggregate the - * incremental value. - */ - public void masterReadAggregatedIncrementalValue(Text textIndex, M value) { - int index = Integer.parseInt(textIndex.toString().split(";")[1]); - if (isAbstractAggregator[index]) { - ((AbstractAggregator>) masterAggregator[index]) + if (textIndex.startsWith(S_FLAG_PEER_AGGREGATOR_VALUE)) { + peerAggregators[index].aggregate(value); + } else if (isEnabled() && textIndex.startsWith(S_FLAG_AGGREGATOR_VALUE)) { + masterAggregator[index].aggregate(null, (M) value); + } else if (isEnabled() && textIndex.startsWith(S_FLAG_AGGREGATOR_INCREMENT)) { + if (isAbstractAggregator[index]) { + ((AbstractAggregator>) masterAggregator[index]) .addTimesAggregated(((IntWritable) value).get()); + } } } Index: graph/src/main/java/org/apache/hama/graph/CountVerticesPeerAggregator.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/CountVerticesPeerAggregator.java (revision 0) +++ graph/src/main/java/org/apache/hama/graph/CountVerticesPeerAggregator.java (working copy) @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hama.graph.GraphJobRunner.GraphJobCounter; + +@SuppressWarnings("rawtypes") +public class CountVerticesPeerAggregator implements PeerAggregator { + + private long nVertices = 0; + + @Override + public Writable getPeerValue(GraphJobRunner graphJobRunner) { + return new LongWritable(graphJobRunner.getChangedVertexCnt()); + } + + @Override + public void aggregate(Writable v) { + this.nVertices += ((LongWritable) v).get(); + } + + @Override + public void setPeer(GraphJobRunner graphJobRunner, Writable v) { + graphJobRunner.setNumberVertices(graphJobRunner.getNumberVertices() + ((LongWritable) v).get()); + + if (GraphJobRunner.isMasterTask(graphJobRunner.getPeer())) { + graphJobRunner.getPeer().getCounter(GraphJobCounter.INPUT_VERTICES).increment(((LongWritable) v).get()); + } + } + + @Override + public Writable getValue() { + return new LongWritable(this.nVertices); + } + + @Override + public void reset() { + this.nVertices = 0; + } + +} Index: graph/src/main/java/org/apache/hama/graph/FinishPeerAggregator.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/FinishPeerAggregator.java (revision 0) +++ graph/src/main/java/org/apache/hama/graph/FinishPeerAggregator.java (working copy) @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +@SuppressWarnings("rawtypes") +public class FinishPeerAggregator implements PeerAggregator { + + private int globalUpdates = 0; + + @Override + public Writable getPeerValue(GraphJobRunner graphJobRunner) { + return new IntWritable(graphJobRunner.getActiveVertices()); + } + + @Override + public void aggregate(Writable v) { + this.globalUpdates += ((IntWritable) v).get(); + } + + @Override + public void setPeer(GraphJobRunner graphJobRunner, Writable v) { + if (((IntWritable) v).get() == 0) { + graphJobRunner.setStopCondition(false); + } + } + + @Override + public Writable getValue() { + return new IntWritable(this.globalUpdates); + } + + @Override + public void reset() { + this.globalUpdates = 0; + } + +} Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1519376) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -61,18 +60,10 @@ private static final Log LOG = LogFactory.getLog(GraphJobRunner.class); // make sure that these values don't collide with the vertex names - public static final String S_FLAG_MESSAGE_COUNTS = "hama.0"; - public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1"; - public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2"; - public static final String S_FLAG_VERTEX_INCREASE = "hama.3"; - public static final String S_FLAG_VERTEX_DECREASE = "hama.4"; - public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5"; - public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6"; - public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS); + public static final String S_FLAG_VERTEX_INCREASE = "hama.0"; + public static final String S_FLAG_VERTEX_DECREASE = "hama.1"; public static final Text FLAG_VERTEX_INCREASE = new Text(S_FLAG_VERTEX_INCREASE); public static final Text FLAG_VERTEX_DECREASE = new Text(S_FLAG_VERTEX_DECREASE); - public static final Text FLAG_VERTEX_ALTER_COUNTER = new Text(S_FLAG_VERTEX_ALTER_COUNTER); - public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text(S_FLAG_VERTEX_TOTAL_VERTICES); public static final String MESSAGE_COMBINER_CLASS_KEY = "hama.vertex.message.combiner.class"; public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class"; @@ -89,7 +80,7 @@ private VerticesInfo vertices; private boolean updated = true; - private int globalUpdateCounts = 0; + private int activeVertices = 0; private int changedVertexCnt = 0; private long numberVertices = 0; @@ -125,9 +116,7 @@ // we do supersteps while we still have updates and have not reached our // maximum iterations yet while (updated && !((maxIteration > 0) && iteration > maxIteration)) { - // reset the global update counter from our master in every - // superstep - globalUpdateCounts = 0; + peer.sync(); // note that the messages must be parsed here @@ -160,7 +149,9 @@ vertexOutputWriter.setup(conf); IDSkippingIterator skippingIterator = vertices.skippingIterator(); while (skippingIterator.hasNext()) { - vertexOutputWriter.write(skippingIterator.next(), peer); + Vertex v = skippingIterator.next(); + LOG.warn("OUT: " + "K: " + v.getVertexID() + " V: " + v.getValue()); + vertexOutputWriter.write(v, peer); } vertices.cleanup(conf, peer.getTaskId()); } @@ -175,51 +166,41 @@ BSPPeer peer) throws IOException, SyncException, InterruptedException { - if (isMasterTask(peer) && iteration == 1) { - MapWritable updatedCnt = new MapWritable(); - updatedCnt.put(FLAG_VERTEX_TOTAL_VERTICES, new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES).getCounter()))); - // send the updates from the master tasks back to the slaves - for (String peerName : peer.getAllPeerNames()) { - peer.send(peerName, new GraphJobMessage(updatedCnt)); + if (isMasterTask(peer)) { + MapWritable masterValues = new MapWritable(); + + masterValues = this.aggregationRunner.getMasterValues(); + + if (masterValues.size() > 0) { + // send the updates from the master tasks back to the slaves + for (String peerName : peer.getAllPeerNames()) { + peer.send(peerName, new GraphJobMessage(masterValues)); + } } } - // this is only done in every second iteration - if (isMasterTask(peer) && iteration > 1) { - MapWritable updatedCnt = new MapWritable(); - // send total number of vertices. - updatedCnt.put(FLAG_VERTEX_TOTAL_VERTICES, new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES).getCounter()))); - // exit if there's no update made - if (globalUpdateCounts == 0) { - updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE)); - } else { - aggregationRunner.doMasterAggregation(updatedCnt); - } - // send the updates from the master tasks back to the slaves - for (String peerName : peer.getAllPeerNames()) { - peer.send(peerName, new GraphJobMessage(updatedCnt)); - } + // in every slave we need to sync, so we replay the messages that already + // are added to the queue. This prevents loosing messages when using + // aggregators. + if (firstVertexMessage != null) { + peer.send(peer.getPeerName(), firstVertexMessage); } - if (aggregationRunner.isEnabled() && iteration > 1) { - // in case we need to sync, we need to replay the messages that already - // are added to the queue. This prevents loosing messages when using - // aggregators. - if (firstVertexMessage != null) { - peer.send(peer.getPeerName(), firstVertexMessage); - } - GraphJobMessage msg = null; - while ((msg = peer.getCurrentMessage()) != null) { - peer.send(peer.getPeerName(), msg); - } - // now sync - peer.sync(); - // now the map message must be read that might be send from the master - updated = aggregationRunner.receiveAggregatedValues(peer - .getCurrentMessage().getMap(), iteration); - // set the first vertex message back to the message it had before sync - firstVertexMessage = peer.getCurrentMessage(); + GraphJobMessage msg = null; + while ((msg = peer.getCurrentMessage()) != null) { + peer.send(peer.getPeerName(), msg); } - return firstVertexMessage; + + // now sync + peer.sync(); + + msg = peer.getCurrentMessage(); + + if (msg != null && msg.isMapMessage()) { + aggregationRunner.receiveAggregatedValues(msg.getMap()); + msg = peer.getCurrentMessage(); + } + + return msg; } /** @@ -230,7 +211,7 @@ private void doSuperstep(GraphJobMessage currentMessage, BSPPeer peer) throws IOException { - int activeVertices = 0; + this.activeVertices = 0; this.changedVertexCnt = 0; vertices.startSuperstep(); /* @@ -268,7 +249,7 @@ currentMessage = iterable.getOverflowMessage(); } aggregationRunner.aggregateVertex(lastValue, vertex); - activeVertices++; + this.activeVertices++; } // note that we even need to rewrite the vertex if it is halted for @@ -277,7 +258,7 @@ } vertices.finishSuperstep(); - aggregationRunner.sendAggregatorValues(peer, activeVertices, this.changedVertexCnt); + aggregationRunner.sendAggregatorValues(peer); iteration++; } @@ -339,10 +320,11 @@ M lastValue = vertex.getValue(); vertex.compute(Collections.singleton(vertex.getValue())); aggregationRunner.aggregateVertex(lastValue, vertex); + this.activeVertices++; vertices.finishVertexComputation(vertex); } vertices.finishSuperstep(); - aggregationRunner.sendAggregatorValues(peer, 1, this.changedVertexCnt); + aggregationRunner.sendAggregatorValues(peer); iteration++; } @@ -377,7 +359,7 @@ .newInstance(outputWriter); aggregationRunner = new AggregationRunner(); - aggregationRunner.setupAggregators(peer); + aggregationRunner.setupAggregators(peer, this); Class> verticesInfoClass = (Class>) conf.getClass("hama.graph.vertices.info", ListVerticesInfo.class, VerticesInfo.class); vertices = ReflectionUtils.newInstance(verticesInfoClass); @@ -542,35 +524,15 @@ break; } else if (msg.isMapMessage()) { for (Entry e : msg.getMap().entrySet()) { - Text vertexID = (Text) e.getKey(); - if (FLAG_MESSAGE_COUNTS.equals(vertexID)) { - if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) { - updated = false; - } else { - globalUpdateCounts += ((IntWritable) e.getValue()).get(); - } - } else if (aggregationRunner.isEnabled() - && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) { - aggregationRunner.masterReadAggregatedValue(vertexID, - (M) e.getValue()); - } else if (aggregationRunner.isEnabled() - && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) { - aggregationRunner.masterReadAggregatedIncrementalValue(vertexID, - (M) e.getValue()); - } else if (FLAG_VERTEX_INCREASE.equals(vertexID)) { + Text command = (Text) e.getKey(); + if (FLAG_VERTEX_INCREASE.equals(command)) { dynamicAdditions = true; addVertex((Vertex) e.getValue()); - } else if (FLAG_VERTEX_DECREASE.equals(vertexID)) { + } else if (FLAG_VERTEX_DECREASE.equals(command)) { dynamicRemovals = true; removeVertex((V) e.getValue()); - } else if (FLAG_VERTEX_TOTAL_VERTICES.equals(vertexID)) { - this.numberVertices = ((LongWritable) e.getValue()).get(); - } else if (FLAG_VERTEX_ALTER_COUNTER.equals(vertexID)) { - if (isMasterTask(peer)) { - peer.getCounter(GraphJobCounter.INPUT_VERTICES).increment(((LongWritable) e.getValue()).get()); - } else { - throw new UnsupportedOperationException("A message to increase vertex count is in a wrong place: " + peer); - } + } else { + this.aggregationRunner.masterAggregate(command, e.getValue()); } } @@ -599,6 +561,13 @@ } /** + * @return the number of added or deleted vertices in the current superstep for this peer. + */ + public int getChangedVertexCnt() { + return changedVertexCnt; + } + + /** * @return the current number of iterations. */ public final long getNumberIterations() { @@ -613,6 +582,13 @@ } /** + * @return the number of vertices on this peer only. + */ + public final int getPeerVerteces() { + return this.vertices.size(); + } + + /** * @return the defined partitioner instance. */ public final Partitioner getPartitioner() { @@ -647,6 +623,34 @@ } /** + * @return the peer instance. + */ + public final int getActiveVertices() { + return this.activeVertices; + } + + /** + * Set the number of added or deleted vertices. + */ + public final void setChangedVertexCnt(int changedVertexCnt) { + this.changedVertexCnt = changedVertexCnt; + } + + /** + * Set the number of vertices, globally aggregated. + */ + public final void setNumberVertices(long nv) { + this.numberVertices = nv; + } + + /** + * Set to false to stop the running job. + */ + public final void setStopCondition(boolean s) { + this.updated = s; + } + + /** * Checks if this is a master task. The master task is the first peer in the * peer array. */ @@ -699,12 +703,4 @@ return (X) ReflectionUtils.newInstance(EDGE_VALUE_CLASS); } - public int getChangedVertexCnt() { - return changedVertexCnt; - } - - public void setChangedVertexCnt(int changedVertexCnt) { - this.changedVertexCnt = changedVertexCnt; - } - } Index: graph/src/main/java/org/apache/hama/graph/PeerAggregator.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/PeerAggregator.java (revision 0) +++ graph/src/main/java/org/apache/hama/graph/PeerAggregator.java (working copy) @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +@SuppressWarnings("rawtypes") +public interface PeerAggregator { + + public Writable getPeerValue(GraphJobRunner graphJobRunner); + + public void aggregate(Writable v); + + public Writable getValue(); + + public void setPeer(GraphJobRunner graphJobRunner, Writable v); + + public void reset(); +} Index: graph/src/main/java/org/apache/hama/graph/Vertex.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/Vertex.java (revision 1519376) +++ graph/src/main/java/org/apache/hama/graph/Vertex.java (working copy) @@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable;