Index: graph/src/main/java/org/apache/hama/graph/AggregationRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (revision 1527474) +++ graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (working copy) @@ -18,6 +18,8 @@ package org.apache.hama.graph; import java.io.IOException; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; @@ -42,6 +44,7 @@ // multiple aggregator arrays private Aggregator>[] aggregators; + private Set skipAggregators; private Writable[] globalAggregatorResult; private IntWritable[] globalAggregatorIncrement; private boolean[] isAbstractAggregator; @@ -60,6 +63,7 @@ this.conf = peer.getConfiguration(); String aggregatorClasses = peer.getConfiguration().get( GraphJob.AGGREGATOR_CLASS_ATTR); + this.skipAggregators = new HashSet(); if (aggregatorClasses != null) { enabled = true; aggregatorClassNames = aggregatorClasses.split(";"); @@ -106,18 +110,22 @@ // also send aggregated values to the master if (aggregators != null) { for (int i = 0; i < this.aggregators.length; i++) { - updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue()); - if (isAbstractAggregator[i]) { - updatedCnt.put(aggregatorIncrementFlag[i], - ((AbstractAggregator>) aggregators[i]) - .getTimesAggregated()); + if (!this.skipAggregators.contains(i)) { + updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue()); + if (isAbstractAggregator[i]) { + updatedCnt.put(aggregatorIncrementFlag[i], + ((AbstractAggregator>) aggregators[i]) + .getTimesAggregated()); + } } } for (int i = 0; i < aggregators.length; i++) { - // now create new aggregators for the next iteration - aggregators[i] = getNewAggregator(aggregatorClassNames[i]); - if (GraphJobRunner.isMasterTask(peer)) { - masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]); + if (!this.skipAggregators.contains(i)) { + // now create new aggregators for the next iteration + aggregators[i] = getNewAggregator(aggregatorClassNames[i]); + if (GraphJobRunner.isMasterTask(peer)) { + masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]); + } } } } @@ -135,12 +143,14 @@ public void aggregateVertex(M lastValue, Vertex v) { if (isEnabled()) { for (int i = 0; i < this.aggregators.length; i++) { - Aggregator> aggregator = this.aggregators[i]; - aggregator.aggregate(v, v.getValue()); - if (isAbstractAggregator[i]) { - AbstractAggregator> intern = (AbstractAggregator>) aggregator; - intern.aggregate(v, lastValue, v.getValue()); - intern.aggregateInternal(); + if (!this.skipAggregators.contains(i)) { + Aggregator> aggregator = this.aggregators[i]; + aggregator.aggregate(v, v.getValue()); + if (isAbstractAggregator[i]) { + AbstractAggregator> intern = (AbstractAggregator>) aggregator; + intern.aggregate(v, lastValue, v.getValue()); + intern.aggregateInternal(); + } } } } @@ -154,19 +164,21 @@ if (isEnabled()) { // work through the master aggregators for (int i = 0; i < masterAggregator.length; i++) { - Writable lastAggregatedValue = masterAggregator[i].getValue(); - if (isAbstractAggregator[i]) { - final AbstractAggregator> intern = ((AbstractAggregator>) masterAggregator[i]); - final Writable finalizeAggregation = intern.finalizeAggregation(); - if (intern.finalizeAggregation() != null) { - lastAggregatedValue = finalizeAggregation; + if (!this.skipAggregators.contains(i)) { + Writable lastAggregatedValue = masterAggregator[i].getValue(); + if (isAbstractAggregator[i]) { + final AbstractAggregator> intern = ((AbstractAggregator>) masterAggregator[i]); + final Writable finalizeAggregation = intern.finalizeAggregation(); + if (intern.finalizeAggregation() != null) { + lastAggregatedValue = finalizeAggregation; + } + // this count is usually the times of active + // vertices in the graph + updatedCnt.put(aggregatorIncrementFlag[i], + intern.getTimesAggregated()); } - // this count is usually the times of active - // vertices in the graph - updatedCnt.put(aggregatorIncrementFlag[i], - intern.getTimesAggregated()); + updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue); } - updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue); } } } @@ -181,9 +193,9 @@ long iteration) throws IOException, SyncException, InterruptedException { // map is the first value that is in the queue for (int i = 0; i < aggregators.length; i++) { - globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]); - globalAggregatorIncrement[i] = (IntWritable) updatedValues - .get(aggregatorIncrementFlag[i]); + globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]); + globalAggregatorIncrement[i] = (IntWritable) updatedValues + .get(aggregatorIncrementFlag[i]); } IntWritable count = (IntWritable) updatedValues .get(GraphJobRunner.FLAG_MESSAGE_COUNTS); @@ -221,6 +233,22 @@ } } + /** + * This method adds an id of an aggregator that will be skipped in the current + * superstep. + */ + public void addSkipAggregator(int index) { + this.skipAggregators.add(index); + } + + /** + * This method adds an id of an aggregator that will be skipped in the current + * superstep. + */ + void resetSkipAggregators() { + this.skipAggregators.clear(); + } + @SuppressWarnings("unchecked") private Aggregator> getNewAggregator(String clsName) { try { Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1527474) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -68,11 +68,13 @@ public static final String S_FLAG_VERTEX_DECREASE = "hama.4"; public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5"; public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6"; + public static final String S_FLAG_AGGREGATOR_SKIP = "hama.7"; public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS); public static final Text FLAG_VERTEX_INCREASE = new Text(S_FLAG_VERTEX_INCREASE); public static final Text FLAG_VERTEX_DECREASE = new Text(S_FLAG_VERTEX_DECREASE); public static final Text FLAG_VERTEX_ALTER_COUNTER = new Text(S_FLAG_VERTEX_ALTER_COUNTER); public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text(S_FLAG_VERTEX_TOTAL_VERTICES); + public static final Text FLAG_AGGREGATOR_SKIP = new Text(S_FLAG_AGGREGATOR_SKIP); public static final String MESSAGE_COMBINER_CLASS_KEY = "hama.vertex.message.combiner.class"; public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class"; @@ -193,7 +195,7 @@ if (globalUpdateCounts == 0) { updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE)); } else { - aggregationRunner.doMasterAggregation(updatedCnt); + getAggregationRunner().doMasterAggregation(updatedCnt); } // send the updates from the master tasks back to the slaves for (String peerName : peer.getAllPeerNames()) { @@ -200,7 +202,7 @@ peer.send(peerName, new GraphJobMessage(updatedCnt)); } } - if (aggregationRunner.isEnabled() && iteration > 1) { + if (getAggregationRunner().isEnabled() && iteration > 1) { // in case we need to sync, we need to replay the messages that already // are added to the queue. This prevents loosing messages when using // aggregators. @@ -214,11 +216,12 @@ // now sync peer.sync(); // now the map message must be read that might be send from the master - updated = aggregationRunner.receiveAggregatedValues(peer + updated = getAggregationRunner().receiveAggregatedValues(peer .getCurrentMessage().getMap(), iteration); // set the first vertex message back to the message it had before sync firstVertexMessage = peer.getCurrentMessage(); } + this.aggregationRunner.resetSkipAggregators(); return firstVertexMessage; } @@ -267,7 +270,7 @@ } currentMessage = iterable.getOverflowMessage(); } - aggregationRunner.aggregateVertex(lastValue, vertex); + getAggregationRunner().aggregateVertex(lastValue, vertex); activeVertices++; } @@ -277,7 +280,7 @@ } vertices.finishSuperstep(); - aggregationRunner.sendAggregatorValues(peer, activeVertices, this.changedVertexCnt); + getAggregationRunner().sendAggregatorValues(peer, activeVertices, this.changedVertexCnt); iteration++; } @@ -338,11 +341,11 @@ Vertex vertex = skippingIterator.next(); M lastValue = vertex.getValue(); vertex.compute(Collections.singleton(vertex.getValue())); - aggregationRunner.aggregateVertex(lastValue, vertex); + getAggregationRunner().aggregateVertex(lastValue, vertex); vertices.finishVertexComputation(vertex); } vertices.finishSuperstep(); - aggregationRunner.sendAggregatorValues(peer, 1, this.changedVertexCnt); + getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt); iteration++; } @@ -376,8 +379,8 @@ vertexOutputWriter = (VertexOutputWriter) ReflectionUtils .newInstance(outputWriter); - aggregationRunner = new AggregationRunner(); - aggregationRunner.setupAggregators(peer); + setAggregationRunner(new AggregationRunner()); + getAggregationRunner().setupAggregators(peer); Class> verticesInfoClass = (Class>) conf.getClass("hama.graph.vertices.info", ListVerticesInfo.class, VerticesInfo.class); vertices = ReflectionUtils.newInstance(verticesInfoClass); @@ -549,13 +552,13 @@ } else { globalUpdateCounts += ((IntWritable) e.getValue()).get(); } - } else if (aggregationRunner.isEnabled() + } else if (getAggregationRunner().isEnabled() && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) { - aggregationRunner.masterReadAggregatedValue(vertexID, + getAggregationRunner().masterReadAggregatedValue(vertexID, (M) e.getValue()); - } else if (aggregationRunner.isEnabled() + } else if (getAggregationRunner().isEnabled() && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) { - aggregationRunner.masterReadAggregatedIncrementalValue(vertexID, + getAggregationRunner().masterReadAggregatedIncrementalValue(vertexID, (M) e.getValue()); } else if (FLAG_VERTEX_INCREASE.equals(vertexID)) { dynamicAdditions = true; @@ -571,6 +574,12 @@ } else { throw new UnsupportedOperationException("A message to increase vertex count is in a wrong place: " + peer); } + } else if (FLAG_AGGREGATOR_SKIP.equals(vertexID)) { + if (isMasterTask(peer)) { + this.getAggregationRunner().addSkipAggregator(((IntWritable) e.getValue()).get()); + } else { + throw new UnsupportedOperationException("A message to skip aggregators is in a wrong peer: " + peer); + } } } @@ -626,7 +635,7 @@ * @return the value of the aggregator, or null if none was defined. */ public final Writable getLastAggregatedValue(int index) { - return aggregationRunner.getLastAggregatedValue(index); + return getAggregationRunner().getLastAggregatedValue(index); } /** @@ -636,7 +645,7 @@ * @return the value of the aggregator, or null if none was defined. */ public final IntWritable getNumLastAggregatedVertices(int index) { - return aggregationRunner.getNumLastAggregatedVertices(index); + return getAggregationRunner().getNumLastAggregatedVertices(index); } /** @@ -707,4 +716,18 @@ this.changedVertexCnt = changedVertexCnt; } + /** + * @return the aggregationRunner + */ + AggregationRunner getAggregationRunner() { + return aggregationRunner; + } + + /** + * @param aggregationRunner the aggregationRunner to set + */ + void setAggregationRunner(AggregationRunner aggregationRunner) { + this.aggregationRunner = aggregationRunner; + } + } Index: graph/src/main/java/org/apache/hama/graph/Vertex.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/Vertex.java (revision 1527474) +++ graph/src/main/java/org/apache/hama/graph/Vertex.java (working copy) @@ -228,7 +228,7 @@ * @return the configured partitioner instance to message vertices. */ public Partitioner getPartitioner() { - return (Partitioner) runner.getPartitioner(); + return runner.getPartitioner(); } @Override @@ -241,6 +241,21 @@ this.votedToHalt = true; } + /** + * Disable an aggregator for the next superstep. The returning value of + * the aggregator will be null. + */ + public void skipAggregator(int index) throws IOException { + MapWritable msg = new MapWritable(); + msg.put(GraphJobRunner.FLAG_AGGREGATOR_SKIP, new IntWritable(index)); + + this.runner.getAggregationRunner().addSkipAggregator(index); + + // Get master task peer. + String destPeer = GraphJobRunner.getMasterTask(this.getPeer()); + runner.getPeer().send(destPeer, new GraphJobMessage(msg)); + } + void setActive() { this.votedToHalt = false; }