Index: examples/src/main/java/org/apache/hama/examples/PageRank.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/PageRank.java (revision 1560617) +++ examples/src/main/java/org/apache/hama/examples/PageRank.java (working copy) @@ -71,13 +71,15 @@ } double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices(); setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR))); + aggregate(0, this.getValue()); } // if we have not reached our global error yet, then proceed. - DoubleWritable globalError = getLastAggregatedValue(0); - + DoubleWritable globalError = getAggregatedValue(0); + if (globalError != null && this.getSuperstepCount() > 2 && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) { + System.out.println(globalError); voteToHalt(); } else { // in each superstep we are going to send a new rank to our neighbours @@ -126,7 +128,7 @@ // error pageJob.setAggregatorClass(AverageAggregator.class); - + // Vertex reader pageJob.setVertexInputReaderClass(PagerankSeqReader.class); Index: graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java (revision 1560617) +++ graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java (working copy) @@ -25,13 +25,12 @@ * (sums them up) them. */ public class AbsDiffAggregator extends - AbstractAggregator> { + AbstractAggregator { double absoluteDifference = 0.0d; @Override - public void aggregate(Vertex v, - DoubleWritable oldValue, DoubleWritable newValue) { + public void aggregate(DoubleWritable oldValue, DoubleWritable newValue) { // make sure it's nullsafe if (oldValue != null) { absoluteDifference += Math.abs(oldValue.get() - newValue.get()); @@ -41,8 +40,7 @@ // when a master aggregates he aggregated values, he calls this, so let's just // sum up here. @Override - public void aggregate(Vertex vertex, - DoubleWritable value) { + public void aggregate(DoubleWritable value) { absoluteDifference += value.get(); } Index: graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java (revision 1560617) +++ graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java (working copy) @@ -26,8 +26,8 @@ * For tracking cases it increments an internal counter on each call of * aggregate. */ -public abstract class AbstractAggregator> - implements Aggregator { +public abstract class AbstractAggregator + implements Aggregator { private int timesAggregated = 0; @@ -52,7 +52,7 @@ * this will always be null. */ @Override - public void aggregate(VERTEX vertex, M value) { + public void aggregate(M value) { } @@ -62,7 +62,7 @@ * implementation in this class.Please make sure that you are null-checking * vertex, since on a master task this will always be null. */ - public void aggregate(VERTEX vertex, M oldValue, M newValue) { + public void aggregate(M oldValue, M newValue) { } Index: graph/src/main/java/org/apache/hama/graph/AggregationRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (revision 1560617) +++ graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (working copy) @@ -41,7 +41,7 @@ public final class AggregationRunner { // multiple aggregator arrays - private Aggregator>[] aggregators; + private Aggregator[] aggregators; private Writable[] globalAggregatorResult; private IntWritable[] globalAggregatorIncrement; private boolean[] isAbstractAggregator; @@ -49,7 +49,7 @@ private Text[] aggregatorValueFlag; private Text[] aggregatorIncrementFlag; // aggregator on the master side - private Aggregator>[] masterAggregator; + private Aggregator[] masterAggregator; private boolean enabled = false; private Configuration conf; @@ -110,8 +110,7 @@ updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue()); if (isAbstractAggregator[i]) { updatedCnt.put(aggregatorIncrementFlag[i], - ((AbstractAggregator>) aggregators[i]) - .getTimesAggregated()); + ((AbstractAggregator) aggregators[i]).getTimesAggregated()); } } for (int i = 0; i < aggregators.length; i++) { @@ -133,16 +132,14 @@ * @param lastValue the value before compute(). * @param v the vertex. */ - public void aggregateVertex(M lastValue, Vertex v) { + public void aggregateVertex(int index, M lastValue, M value) { if (isEnabled()) { - for (int i = 0; i < this.aggregators.length; i++) { - Aggregator> aggregator = this.aggregators[i]; - aggregator.aggregate(v, v.getValue()); - if (isAbstractAggregator[i]) { - AbstractAggregator> intern = (AbstractAggregator>) aggregator; - intern.aggregate(v, lastValue, v.getValue()); - intern.aggregateInternal(); - } + Aggregator aggregator = this.aggregators[index]; + aggregator.aggregate(value); + if (isAbstractAggregator[index]) { + AbstractAggregator intern = (AbstractAggregator) aggregator; + intern.aggregate(lastValue, value); + intern.aggregateInternal(); } } } @@ -157,7 +154,7 @@ for (int i = 0; i < masterAggregator.length; i++) { Writable lastAggregatedValue = masterAggregator[i].getValue(); if (isAbstractAggregator[i]) { - final AbstractAggregator> intern = ((AbstractAggregator>) masterAggregator[i]); + final AbstractAggregator intern = ((AbstractAggregator) masterAggregator[i]); final Writable finalizeAggregation = intern.finalizeAggregation(); if (intern.finalizeAggregation() != null) { lastAggregatedValue = finalizeAggregation; @@ -207,7 +204,7 @@ */ public void masterReadAggregatedValue(Text textIndex, M value) { int index = Integer.parseInt(textIndex.toString().split(";")[1]); - masterAggregator[index].aggregate(null, value); + masterAggregator[index].aggregate(value); } /** @@ -217,15 +214,15 @@ public void masterReadAggregatedIncrementalValue(Text textIndex, M value) { int index = Integer.parseInt(textIndex.toString().split(";")[1]); if (isAbstractAggregator[index]) { - ((AbstractAggregator>) masterAggregator[index]) + ((AbstractAggregator) masterAggregator[index]) .addTimesAggregated(((IntWritable) value).get()); } } @SuppressWarnings("unchecked") - private Aggregator> getNewAggregator(String clsName) { + private Aggregator getNewAggregator(String clsName) { try { - return (Aggregator>) ReflectionUtils.newInstance( + return (Aggregator) ReflectionUtils.newInstance( conf.getClassByName(clsName), conf); } catch (ClassNotFoundException e) { e.printStackTrace(); Index: graph/src/main/java/org/apache/hama/graph/Aggregator.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/Aggregator.java (revision 1560617) +++ graph/src/main/java/org/apache/hama/graph/Aggregator.java (working copy) @@ -27,12 +27,12 @@ * The result of an aggregator from the last superstep can be picked up by the * vertex itself via {@link Vertex}#getLastAggregatedValue(); */ -public interface Aggregator> { +public interface Aggregator { /** * Observes a new vertex value. */ - public void aggregate(VERTEX vertex, M value); + public void aggregate(M value); /** * Gets a vertex value. Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1560617) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -269,14 +269,12 @@ } if (!vertex.isHalted()) { - M lastValue = vertex.getValue(); if (iterable == null) { vertex.compute(Collections. emptyList()); } else { vertex.compute(iterable); currentMessage = iterable.getOverflowMessage(); } - getAggregationRunner().aggregateVertex(lastValue, vertex); activeVertices++; } @@ -338,7 +336,6 @@ * Seed the vertices first with their own values in compute. This is the first * superstep after the vertices have been loaded. */ - @SuppressWarnings("unused") private void doInitialSuperstep( BSPPeer peer) throws IOException { @@ -347,7 +344,6 @@ IDSkippingIterator skippingIterator = vertices.skippingIterator(); while (skippingIterator.hasNext()) { Vertex vertex = skippingIterator.next(); - M lastValue = vertex.getValue(); // Calls setup method. vertex.setup(conf); Index: graph/src/main/java/org/apache/hama/graph/MaxAggregator.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/MaxAggregator.java (revision 1560617) +++ graph/src/main/java/org/apache/hama/graph/MaxAggregator.java (working copy) @@ -20,12 +20,12 @@ import org.apache.hadoop.io.IntWritable; public class MaxAggregator extends - AbstractAggregator> { + AbstractAggregator { int max = Integer.MIN_VALUE; @Override - public void aggregate(Vertex vertex, IntWritable value) { + public void aggregate(IntWritable value) { if (value.get() > max) { max = value.get(); } Index: graph/src/main/java/org/apache/hama/graph/MinAggregator.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/MinAggregator.java (revision 1560617) +++ graph/src/main/java/org/apache/hama/graph/MinAggregator.java (working copy) @@ -20,12 +20,12 @@ import org.apache.hadoop.io.IntWritable; public class MinAggregator extends - AbstractAggregator> { + AbstractAggregator { int min = Integer.MAX_VALUE; @Override - public void aggregate(Vertex vertex, IntWritable value) { + public void aggregate(IntWritable value) { if (value.get() < min) { min = value.get(); } Index: graph/src/main/java/org/apache/hama/graph/SumAggregator.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/SumAggregator.java (revision 1560617) +++ graph/src/main/java/org/apache/hama/graph/SumAggregator.java (working copy) @@ -23,13 +23,12 @@ * Sums all vertex values globally. */ public class SumAggregator extends - AbstractAggregator> { + AbstractAggregator { double sum = 0.0d; @Override - public void aggregate(Vertex vertex, - DoubleWritable value) { + public void aggregate(DoubleWritable value) { sum += value.get(); } Index: graph/src/main/java/org/apache/hama/graph/Vertex.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/Vertex.java (revision 1560617) +++ graph/src/main/java/org/apache/hama/graph/Vertex.java (working copy) @@ -58,6 +58,7 @@ private transient GraphJobRunner runner; private V vertexID; + private M oldValue; private M value; private List> edges; @@ -183,6 +184,7 @@ @Override public void setValue(M value) { + this.oldValue = this.value; this.value = value; } @@ -194,31 +196,6 @@ return runner.getMaxIteration(); } - /** - * Get the last aggregated value of the defined aggregator, null if nothing - * was configured or not returned a result. You have to supply an index, the - * index is defined by the order you set the aggregator classes in - * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero, - * so if you have a single aggregator you can retrieve it via - * {@link #getLastAggregatedValue}(0). - */ - @SuppressWarnings("unchecked") - public M getLastAggregatedValue(int index) { - return (M) runner.getLastAggregatedValue(index); - } - - /** - * Get the number of aggregated vertices in the last superstep. Or null if no - * aggregator is available.You have to supply an index, the index is defined - * by the order you set the aggregator classes in - * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero, - * so if you have a single aggregator you can retrieve it via - * {@link #getNumLastAggregatedVertices}(0). - */ - public IntWritable getNumLastAggregatedVertices(int index) { - return runner.getNumLastAggregatedVertices(index); - } - public int getNumPeers() { return runner.getPeer().getNumPeers(); } @@ -406,4 +383,35 @@ vertex.readFields(dis); return vertex; } + + @Override + public void aggregate(int index, M value) throws IOException { + this.runner.getAggregationRunner().aggregateVertex(index, oldValue, value); + } + + /** + * Get the last aggregated value of the defined aggregator, null if nothing + * was configured or not returned a result. You have to supply an index, the + * index is defined by the order you set the aggregator classes in + * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero, + * so if you have a single aggregator you can retrieve it via + * {@link #getLastAggregatedValue}(0). + */ + @SuppressWarnings("unchecked") + @Override + public M getAggregatedValue(int index) { + return (M) runner.getLastAggregatedValue(index); + } + + /** + * Get the number of aggregated vertices in the last superstep. Or null if no + * aggregator is available.You have to supply an index, the index is defined + * by the order you set the aggregator classes in + * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero, + * so if you have a single aggregator you can retrieve it via + * {@link #getNumLastAggregatedVertices}(0). + */ + public IntWritable getNumLastAggregatedVertices(int index) { + return runner.getNumLastAggregatedVertices(index); + } } Index: graph/src/main/java/org/apache/hama/graph/VertexInterface.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/VertexInterface.java (revision 1560617) +++ graph/src/main/java/org/apache/hama/graph/VertexInterface.java (working copy) @@ -111,5 +111,20 @@ * Gets the vertex value */ public M getValue(); + + /** + * Provides a value to the specified aggregator. + * + * @throws IOException + * + * @param name identifies a aggregator + * @param value value to be aggregated + */ + public void aggregate(int index, M value) throws IOException; + + /** + * Returns the value of the specified aggregator. + */ + public Writable getAggregatedValue(int index); } Index: graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java =================================================================== --- graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java (revision 1560617) +++ graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java (working copy) @@ -27,9 +27,9 @@ @Test public void testAggregator() { AbsDiffAggregator diff = new AbsDiffAggregator(); - diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2)); - diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2)); - diff.aggregate(null, null, new DoubleWritable(5)); + diff.aggregate(new DoubleWritable(5), new DoubleWritable(2)); + diff.aggregate(new DoubleWritable(5), new DoubleWritable(2)); + diff.aggregate(null, new DoubleWritable(5)); // 0, because this is totally worthless for diffs assertEquals(0, diff.getTimesAggregated().get()); Index: graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java =================================================================== --- graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java (revision 1560617) +++ graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java (working copy) @@ -27,11 +27,11 @@ @Test public void testAggregator() { AverageAggregator diff = new AverageAggregator(); - diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2)); + diff.aggregate(new DoubleWritable(5), new DoubleWritable(2)); diff.aggregateInternal(); - diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2)); + diff.aggregate(new DoubleWritable(5), new DoubleWritable(2)); diff.aggregateInternal(); - diff.aggregate(null, null, new DoubleWritable(5)); + diff.aggregate(null, new DoubleWritable(5)); diff.aggregateInternal(); assertEquals(3, diff.getTimesAggregated().get()); Index: graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java =================================================================== --- graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java (revision 1560617) +++ graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java (working copy) @@ -27,8 +27,8 @@ @Test public void testMinAggregator() { MinAggregator diff = new MinAggregator(); - diff.aggregate(null, new IntWritable(5)); - diff.aggregate(null, new IntWritable(25)); + diff.aggregate(new IntWritable(5)); + diff.aggregate(new IntWritable(25)); assertEquals(5, diff.getValue().get()); } @@ -36,8 +36,8 @@ @Test public void testMaxAggregator() { MaxAggregator diff = new MaxAggregator(); - diff.aggregate(null, new IntWritable(5)); - diff.aggregate(null, new IntWritable(25)); + diff.aggregate(new IntWritable(5)); + diff.aggregate(new IntWritable(25)); assertEquals(25, diff.getValue().get()); } Index: graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java =================================================================== --- graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java (revision 1560617) +++ graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java (working copy) @@ -27,8 +27,8 @@ @Test public void testAggregator() { SumAggregator diff = new SumAggregator(); - diff.aggregate(null, new DoubleWritable(5)); - diff.aggregate(null, new DoubleWritable(5)); + diff.aggregate(new DoubleWritable(5)); + diff.aggregate(new DoubleWritable(5)); assertEquals(10, (int) diff.getValue().get()); } Index: graph/src/test/java/org/apache/hama/graph/example/PageRank.java =================================================================== --- graph/src/test/java/org/apache/hama/graph/example/PageRank.java (revision 1560617) +++ graph/src/test/java/org/apache/hama/graph/example/PageRank.java (working copy) @@ -74,7 +74,7 @@ } // if we have not reached our global error yet, then proceed. - DoubleWritable globalError = this.getLastAggregatedValue(0); + DoubleWritable globalError = this.getAggregatedValue(0); if (globalError != null && this.getSuperstepCount() > 2 && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) { voteToHalt();