Index: examples/src/main/java/org/apache/hama/examples/CustomAggregators.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/CustomAggregators.java	(revision 0)
+++ examples/src/main/java/org/apache/hama/examples/CustomAggregators.java	(working copy)
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.graph.GraphJob;
+import org.apache.hama.graph.SumAggregator;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
+
+/**
+ * This is a simple example of how to use custom aggregators.
+ */
+public class CustomAggregators {
+
+  public static class GraphTextReader extends
+      VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
+
+    @Override
+    public boolean parseVertex(LongWritable key, Text value,
+        Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
+
+      vertex.setVertexID(value);
+      vertex.setValue(new DoubleWritable(Double.parseDouble(value.toString())));
+
+      return true;
+    }
+  }
+
+  public static class GraphVertex extends
+      Vertex<Text, NullWritable, DoubleWritable> {
+
+    @Override
+    public void compute(Iterable<DoubleWritable> msgs) throws IOException {
+
+      // We send two custom aggregation messages, on supersteps 2 and 4 only.
+      if (this.getSuperstepCount() == 2) {
+        this.aggregate("mySum", new DoubleWritable(1.0));
+      }
+
+      if (this.getSuperstepCount() == 4) {
+        this.aggregate("mySum", new DoubleWritable(2.0));
+      }
+
+      // We get the first aggregation result from our custom aggregator on
+      // superstep 3, and we store the result only in vertex 4.
+      // This vertex should have value = 4.
+      if (this.getSuperstepCount() == 3 && this.getVertexID().toString().equals("4")) {
+        this.setValue((DoubleWritable) this.getAggregatedValue("mySum"));
+      }
+
+      // By setting vertex number 3 to halt, we will see a change in the
+      // aggregation results of both the custom and the global aggregators.
+      // This vertex should have value = 3.
+ if (this.getSuperstepCount() == 3 && this.getVertexID().toString().equals("3")) { + this.voteToHalt(); + } + + // This vertex should have value = 6 (3 vertices are working x 2 the custom value) + if (this.getSuperstepCount() == 5 && this.getVertexID().toString().equals("1")) { + this.setValue((DoubleWritable) this.getAggregatedValue("mySum")); + } + + if (this.getSuperstepCount() == 6) { + this.voteToHalt(); + } + } + } + + public static void main(String[] args) throws IOException, + InterruptedException, ClassNotFoundException { + if (args.length != 2) { + printUsage(); + } + HamaConfiguration conf = new HamaConfiguration(new Configuration()); + GraphJob graphJob = createJob(args, conf); + long startTime = System.currentTimeMillis(); + if (graphJob.waitForCompletion(true)) { + System.out.println("Job Finished in " + + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); + } + } + + private static void printUsage() { + System.out.println("Usage: "); + System.exit(-1); + } + + private static GraphJob createJob(String[] args, HamaConfiguration conf) throws IOException { + GraphJob graphJob = new GraphJob(conf, CustomAggregators.class); + graphJob.setJobName("Custom Aggregators"); + graphJob.setVertexClass(GraphVertex.class); + + graphJob.registerAggregator("mySum", SumAggregator.class); + + graphJob.setInputPath(new Path(args[0])); + graphJob.setOutputPath(new Path(args[1])); + + graphJob.setVertexIDClass(Text.class); + graphJob.setVertexValueClass(DoubleWritable.class); + graphJob.setEdgeValueClass(NullWritable.class); + + graphJob.setInputFormat(TextInputFormat.class); + + graphJob.setVertexInputReaderClass(GraphTextReader.class); + graphJob.setPartitioner(HashPartitioner.class); + + graphJob.setOutputFormat(TextOutputFormat.class); + graphJob.setOutputKeyClass(Text.class); + graphJob.setOutputValueClass(DoubleWritable.class); + + return graphJob; + } +} \ No newline at end of file Index: examples/src/main/java/org/apache/hama/examples/PageRank.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/PageRank.java (revision 1555875) +++ examples/src/main/java/org/apache/hama/examples/PageRank.java (working copy) @@ -74,7 +74,7 @@ } // if we have not reached our global error yet, then proceed. - DoubleWritable globalError = getLastAggregatedValue(0); + DoubleWritable globalError = (DoubleWritable) getAggregatedValue("avg"); if (globalError != null && this.getSuperstepCount() > 2 && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) { voteToHalt(); @@ -84,6 +84,8 @@ // in each superstep we are going to send a new rank to our neighbours sendMessageToNeighbors(new DoubleWritable(this.getValue().get() / this.getEdges().size())); + + this.aggregate("avg", this.getValue()); } } @@ -126,7 +128,7 @@ } // error - pageJob.setAggregatorClass(AverageAggregator.class); + pageJob.registerAggregator("avg", AverageAggregator.class); // Vertex reader pageJob.setVertexInputReaderClass(PagerankSeqReader.class); Index: examples/src/main/java/org/apache/hama/examples/SpMV.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/SpMV.java (revision 1555875) +++ examples/src/main/java/org/apache/hama/examples/SpMV.java (working copy) @@ -51,7 +51,7 @@ * operations. 2) row-wise implementation is good because we don't need to care * about communication 3) the main way to improve performance - create custom * Partitioner - * + * * TODO need to be simplified. 
*/ public class SpMV { @@ -103,7 +103,7 @@ BSPPeer peer) throws IOException, SyncException, InterruptedException { // reading input vector, which represented as matrix row - HamaConfiguration conf = (HamaConfiguration) peer.getConfiguration(); + HamaConfiguration conf = peer.getConfiguration(); v = new DenseVectorWritable(); readFromFile(conf.get(inputVectorPathString), v, conf); peer.sync(); @@ -225,7 +225,7 @@ * format usable in subsequent computation - dense vector. It can be usable * for iterative solvers. IMPORTANT: currently it is used in SpMV. It can be a * bottle neck, because all input needs to be stored in memory. - * + * * @param SpMVoutputPathString output path, which represents directory with * part files. * @param conf configuration @@ -299,7 +299,7 @@ /** * This method is used to write vector from memory to specified path. - * + * * @param pathString output path * @param result instance of vector to be writed * @param conf configuration Index: examples/src/test/java/org/apache/hama/examples/CustomAggregatorsTest.java =================================================================== --- examples/src/test/java/org/apache/hama/examples/CustomAggregatorsTest.java (revision 0) +++ examples/src/test/java/org/apache/hama/examples/CustomAggregatorsTest.java (working copy) @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.examples; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hama.HamaConfiguration; +import org.junit.Test; + +/** + * Testcase for {@link org.apache.hama.examples.CustomAggregators} + */ +public class CustomAggregatorsTest extends TestCase { + private static String OUTPUT = "/tmp/page-out"; + private Configuration conf = new HamaConfiguration(); + private FileSystem fs; + + private void deleteTempDirs() { + try { + if (fs.exists(new Path(OUTPUT))) + fs.delete(new Path(OUTPUT), true); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void verifyResult() throws IOException { + FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*")); + for (FileStatus fts : globStatus) { + BufferedReader reader = new BufferedReader(new InputStreamReader( + fs.open(fts.getPath()))); + String line = null; + + String[] results = { "6.0", "2.0", "3.0", "4.0" }; + + for (int i=1;i<5;i++) { + line = reader.readLine(); + String[] split = line.split("\t"); + assertTrue(split[0].equals(String.valueOf(i))); + assertTrue(split[1].equals(results[i-1])); + System.out.println(split[0] + " : " + split[1]); + } + } + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + fs = FileSystem.get(conf); + } + + @Test + public void test() throws IOException, InterruptedException, ClassNotFoundException { + try { + CustomAggregators.main(new String[] {"src/test/resources/dg.txt", OUTPUT }); + verifyResult(); + } finally { + deleteTempDirs(); + } + } + +} Index: graph/src/main/java/org/apache/hama/graph/AggregationRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (revision 1555875) +++ graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (working copy) @@ -17,13 +17,14 @@ */ package org.apache.hama.graph; -import java.io.IOException; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -30,128 +31,38 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.bsp.BSPPeer; -import org.apache.hama.bsp.sync.SyncException; -import com.google.common.base.Preconditions; - /** * Runner class to do the tasks that need to be done if aggregation was * configured. 
- * + * */ @SuppressWarnings("rawtypes") public final class AggregationRunner { + private Map Aggregators; + private Map aggregatorResults; + private Set aggregatorsUsed; - // multiple aggregator arrays - private Aggregator>[] aggregators; - private Set skipAggregators; - private Writable[] globalAggregatorResult; - private IntWritable[] globalAggregatorIncrement; - private boolean[] isAbstractAggregator; - private String[] aggregatorClassNames; - private Text[] aggregatorValueFlag; - private Text[] aggregatorIncrementFlag; - // aggregator on the master side - private Aggregator>[] masterAggregator; - - private boolean enabled = false; private Configuration conf; + private Text textWrap = new Text(); - @SuppressWarnings("unchecked") public void setupAggregators( BSPPeer peer) { this.conf = peer.getConfiguration(); - String aggregatorClasses = peer.getConfiguration().get( + + this.aggregatorResults = new HashMap(4); + this.Aggregators = new HashMap(4); + this.aggregatorsUsed = new HashSet(4); + + String customAggregatorClasses = peer.getConfiguration().get( GraphJob.AGGREGATOR_CLASS_ATTR); - this.skipAggregators = new HashSet(); - if (aggregatorClasses != null) { - enabled = true; - aggregatorClassNames = aggregatorClasses.split(";"); - // init to the split size - aggregators = new Aggregator[aggregatorClassNames.length]; - globalAggregatorResult = new Writable[aggregatorClassNames.length]; - globalAggregatorIncrement = new IntWritable[aggregatorClassNames.length]; - isAbstractAggregator = new boolean[aggregatorClassNames.length]; - aggregatorValueFlag = new Text[aggregatorClassNames.length]; - aggregatorIncrementFlag = new Text[aggregatorClassNames.length]; - if (GraphJobRunner.isMasterTask(peer)) { - masterAggregator = new Aggregator[aggregatorClassNames.length]; - } - for (int i = 0; i < aggregatorClassNames.length; i++) { - aggregators[i] = getNewAggregator(aggregatorClassNames[i]); - aggregatorValueFlag[i] = new Text( - GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + i); - aggregatorIncrementFlag[i] = new Text( - GraphJobRunner.S_FLAG_AGGREGATOR_INCREMENT + ";" + i); - if (aggregators[i] instanceof AbstractAggregator) { - isAbstractAggregator[i] = true; - } - if (GraphJobRunner.isMasterTask(peer)) { - masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]); - } - } - } - } - /** - * Runs the aggregators by sending their values to the master task. 
- * @param changedVertexCnt - */ - public void sendAggregatorValues( - BSPPeer peer, - int activeVertices, int changedVertexCnt) throws IOException { - // send msgCounts to the master task - MapWritable updatedCnt = new MapWritable(); - updatedCnt.put(GraphJobRunner.FLAG_MESSAGE_COUNTS, new IntWritable( - activeVertices)); - // send total number of vertices changes - updatedCnt.put(GraphJobRunner.FLAG_VERTEX_ALTER_COUNTER, new LongWritable( - changedVertexCnt)); - // also send aggregated values to the master - if (aggregators != null) { - for (int i = 0; i < this.aggregators.length; i++) { - if (!this.skipAggregators.contains(i)) { - updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue()); - if (isAbstractAggregator[i]) { - updatedCnt.put(aggregatorIncrementFlag[i], - ((AbstractAggregator>) aggregators[i]) - .getTimesAggregated()); - } - } - } - for (int i = 0; i < aggregators.length; i++) { - if (!this.skipAggregators.contains(i)) { - // now create new aggregators for the next iteration - aggregators[i] = getNewAggregator(aggregatorClassNames[i]); - if (GraphJobRunner.isMasterTask(peer)) { - masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]); - } - } - } - } - peer.send(GraphJobRunner.getMasterTask(peer), new GraphJobMessage( - updatedCnt)); - } + if (customAggregatorClasses != null) { + String[] custAggrs = customAggregatorClasses.split(";"); - /** - * Aggregates the last value before computation and the value after the - * computation. - * - * @param lastValue the value before compute(). - * @param v the vertex. - */ - public void aggregateVertex(M lastValue, Vertex v) { - if (isEnabled()) { - for (int i = 0; i < this.aggregators.length; i++) { - if (!this.skipAggregators.contains(i)) { - Aggregator> aggregator = this.aggregators[i]; - aggregator.aggregate(v, v.getValue()); - if (isAbstractAggregator[i]) { - AbstractAggregator> intern = (AbstractAggregator>) aggregator; - intern.aggregate(v, lastValue, v.getValue()); - intern.aggregateInternal(); - } - } + for (String aggr : custAggrs) { + String[] Name_AggrClass = aggr.split("@",2); + this.Aggregators.put(Name_AggrClass[0], getNewAggregator(Name_AggrClass[1])); } } } @@ -161,42 +72,39 @@ * peer and updates the given map accordingly. */ public void doMasterAggregation(MapWritable updatedCnt) { - if (isEnabled()) { - // work through the master aggregators - for (int i = 0; i < masterAggregator.length; i++) { - if (!this.skipAggregators.contains(i)) { - Writable lastAggregatedValue = masterAggregator[i].getValue(); - if (isAbstractAggregator[i]) { - final AbstractAggregator> intern = ((AbstractAggregator>) masterAggregator[i]); - final Writable finalizeAggregation = intern.finalizeAggregation(); - if (intern.finalizeAggregation() != null) { - lastAggregatedValue = finalizeAggregation; - } - // this count is usually the times of active - // vertices in the graph - updatedCnt.put(aggregatorIncrementFlag[i], - intern.getTimesAggregated()); - } - updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue); - } - } + // Get results only from used aggregators. + for (String name : this.aggregatorsUsed) { + updatedCnt.put(new Text(name), this.Aggregators.get(name).getValue()); } + this.aggregatorsUsed.clear(); + + // Reset all custom aggregators. TODO: Change the aggregation interface to include clean() method. 
+ Map tmp = new HashMap(4); + for (Entry e : this.Aggregators.entrySet()) { + String aggClass = e.getValue().getClass().getName(); + tmp.put(e.getKey(), getNewAggregator(aggClass)); + } + this.Aggregators = tmp; } /** * Receives aggregated values from a master task. - * + * * @return always true if no aggregators are defined, false if aggregators say * we haven't seen any messages anymore. */ public boolean receiveAggregatedValues(MapWritable updatedValues, - long iteration) throws IOException, SyncException, InterruptedException { - // map is the first value that is in the queue - for (int i = 0; i < aggregators.length; i++) { - globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]); - globalAggregatorIncrement[i] = (IntWritable) updatedValues - .get(aggregatorIncrementFlag[i]); + long iteration) { + // In every superstep, we create a new result collection as we don't save history. + // If a value is missing, the user will take a null result. By creating a new collection + // every time, we can reduce the network cost (because we send less information by skipping null values) + // But we are losing in GC. + this.aggregatorResults = new HashMap(4); + for (String name : this.Aggregators.keySet()) { + this.textWrap.set(name); + this.aggregatorResults.put(name, updatedValues.get(textWrap)); } + IntWritable count = (IntWritable) updatedValues .get(GraphJobRunner.FLAG_MESSAGE_COUNTS); if (count != null && count.get() == Integer.MIN_VALUE) { @@ -206,49 +114,18 @@ } /** - * @return true if aggregators were defined. Normally used by the internal - * stateful methods, outside shouldn't use it too extensively. + * Method to let the custom master aggregator read messages from peers + * and aggregate a value. */ - public boolean isEnabled() { - return enabled; - } + @SuppressWarnings("unchecked") + public void masterAggregation(Text name, Writable value) { + String nameIdx = name.toString().split(";",2)[1]; + this.Aggregators.get(nameIdx).aggregate(null, value); - /** - * Method to let the master read messages from peers and aggregate a value. - */ - public void masterReadAggregatedValue(Text textIndex, M value) { - int index = Integer.parseInt(textIndex.toString().split(";")[1]); - masterAggregator[index].aggregate(null, value); + // When it's time to send the values, we can see which aggregators are used. + this.aggregatorsUsed.add(nameIdx); } - /** - * Method to let the master read messages from peers and aggregate the - * incremental value. - */ - public void masterReadAggregatedIncrementalValue(Text textIndex, M value) { - int index = Integer.parseInt(textIndex.toString().split(";")[1]); - if (isAbstractAggregator[index]) { - ((AbstractAggregator>) masterAggregator[index]) - .addTimesAggregated(((IntWritable) value).get()); - } - } - - /** - * This method adds an id of an aggregator that will be skipped in the current - * superstep. - */ - public void addSkipAggregator(int index) { - this.skipAggregators.add(index); - } - - /** - * This method adds an id of an aggregator that will be skipped in the current - * superstep. 
- */ - void resetSkipAggregators() { - this.skipAggregators.clear(); - } - @SuppressWarnings("unchecked") private Aggregator> getNewAggregator(String clsName) { try { @@ -261,13 +138,7 @@ + " could not be found or instantiated!"); } - public final Writable getLastAggregatedValue(int index) { - return globalAggregatorResult[Preconditions.checkPositionIndex(index, - globalAggregatorResult.length)]; + public final Writable getAggregatedValue(String name) { + return this.aggregatorResults.get(name); } - - public final IntWritable getNumLastAggregatedVertices(int index) { - return globalAggregatorIncrement[Preconditions.checkPositionIndex(index, - globalAggregatorIncrement.length)]; - } } Index: graph/src/main/java/org/apache/hama/graph/GraphJob.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJob.java (revision 1555875) +++ graph/src/main/java/org/apache/hama/graph/GraphJob.java (working copy) @@ -101,23 +101,20 @@ } /** - * Set the aggregator for the job. - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - public void setAggregatorClass(Class cls) { - this.setAggregatorClass(new Class[] { cls }); - } + * Custom aggregator registration. Add a custom aggregator + * that will aggregate massages sent from the user. + * + * @param name identifies an aggregator + * @param aggregatorClass the aggregator class + */ + @SuppressWarnings("rawtypes") + public void registerAggregator(String name, Class aggregatorClass) { + String prevAggrs = this.conf.get(AGGREGATOR_CLASS_ATTR, ""); - /** - * Sets multiple aggregators for the job. - */ - @SuppressWarnings("rawtypes") - public void setAggregatorClass(Class... cls) { - String classNames = ""; - for (Class cl : cls) { - classNames += cl.getName() + ";"; - } - conf.set(AGGREGATOR_CLASS_ATTR, classNames); + prevAggrs += name + "@" + aggregatorClass.getName() + ";"; + + this.conf.set(AGGREGATOR_CLASS_ATTR, prevAggrs); } /** Index: graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (revision 1555875) +++ graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (working copy) @@ -217,6 +217,7 @@ buffer = new DataInputBuffer(); } + @Override public synchronized int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1555875) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -46,7 +46,7 @@ /** * Fully generic graph job runner. - * + * * @param the id type of a vertex. * @param the value type of an edge. * @param the value type of a vertex. 
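As an illustration (not part of the patch itself), the new registerAggregator()/setupAggregators() pair communicates through a single configuration attribute: each registration appends a name@className; entry, and the worker side splits on ';' and then on the first '@'. A minimal standalone sketch of that round trip, using hypothetical helper names, might look like this:

import java.util.HashMap;
import java.util.Map;

public class AggregatorConfEncoding {

  // Mirrors GraphJob.registerAggregator(): each call appends "name@className;"
  // to the accumulated attribute value.
  static String register(String attr, String name, String className) {
    return attr + name + "@" + className + ";";
  }

  // Mirrors AggregationRunner.setupAggregators(): split entries on ';', then
  // split each entry into name and class name on the first '@'.
  static Map<String, String> parse(String attr) {
    Map<String, String> aggregators = new HashMap<String, String>();
    if (attr != null && !attr.isEmpty()) {
      for (String entry : attr.split(";")) {
        String[] nameAndClass = entry.split("@", 2);
        aggregators.put(nameAndClass[0], nameAndClass[1]);
      }
    }
    return aggregators;
  }

  public static void main(String[] args) {
    String attr = "";
    attr = register(attr, "mySum", "org.apache.hama.graph.SumAggregator");
    attr = register(attr, "avg", "org.apache.hama.graph.AverageAggregator");
    // Prints both registered aggregators mapped to their class names.
    System.out.println(parse(attr));
  }
}

Because the parsed entries are keyed by name, registering two aggregators under the same name would silently overwrite the first, so callers should keep aggregator names unique.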
@@ -63,13 +63,11 @@ // make sure that these values don't collide with the vertex names public static final String S_FLAG_MESSAGE_COUNTS = "hama.0"; - public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1"; - public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2"; - public static final String S_FLAG_VERTEX_INCREASE = "hama.3"; - public static final String S_FLAG_VERTEX_DECREASE = "hama.4"; - public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5"; - public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6"; - public static final String S_FLAG_AGGREGATOR_SKIP = "hama.7"; + public static final String S_FLAG_AGGREGATOR_VALUE = "hama.3"; + public static final String S_FLAG_VERTEX_INCREASE = "hama.4"; + public static final String S_FLAG_VERTEX_DECREASE = "hama.5"; + public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.6"; + public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.7"; public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS); public static final Text FLAG_VERTEX_INCREASE = new Text( S_FLAG_VERTEX_INCREASE); @@ -79,8 +77,6 @@ S_FLAG_VERTEX_ALTER_COUNTER); public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text( S_FLAG_VERTEX_TOTAL_VERTICES); - public static final Text FLAG_AGGREGATOR_SKIP = new Text( - S_FLAG_AGGREGATOR_SKIP); public static final String MESSAGE_COMBINER_CLASS_KEY = "hama.vertex.message.combiner.class"; public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class"; @@ -182,27 +178,14 @@ GraphJobMessage firstVertexMessage, BSPPeer peer) throws IOException, SyncException, InterruptedException { - - if (isMasterTask(peer) && iteration == 1) { + // This run only on master + if (isMasterTask(peer)) { MapWritable updatedCnt = new MapWritable(); + // send total number of vertices updatedCnt.put( FLAG_VERTEX_TOTAL_VERTICES, new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES) .getCounter()))); - // send the updates from the master tasks back to the slaves - for (String peerName : peer.getAllPeerNames()) { - peer.send(peerName, new GraphJobMessage(updatedCnt)); - } - } - - // this is only done in every second iteration - if (isMasterTask(peer) && iteration > 1) { - MapWritable updatedCnt = new MapWritable(); - // send total number of vertices. - updatedCnt.put( - FLAG_VERTEX_TOTAL_VERTICES, - new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES) - .getCounter()))); // exit if there's no update made if (globalUpdateCounts == 0) { updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE)); @@ -214,26 +197,25 @@ peer.send(peerName, new GraphJobMessage(updatedCnt)); } } - if (getAggregationRunner().isEnabled() && iteration > 1) { - // in case we need to sync, we need to replay the messages that already - // are added to the queue. This prevents loosing messages when using - // aggregators. - if (firstVertexMessage != null) { - peer.send(peer.getPeerName(), firstVertexMessage); - } - GraphJobMessage msg = null; - while ((msg = peer.getCurrentMessage()) != null) { - peer.send(peer.getPeerName(), msg); - } - // now sync - peer.sync(); - // now the map message must be read that might be send from the master - updated = getAggregationRunner().receiveAggregatedValues( - peer.getCurrentMessage().getMap(), iteration); - // set the first vertex message back to the message it had before sync - firstVertexMessage = peer.getCurrentMessage(); + + // in case we need to sync, we need to replay the messages that already + // are added to the queue. 
This prevents loosing messages when using + // aggregators. + if (firstVertexMessage != null) { + peer.send(peer.getPeerName(), firstVertexMessage); } - this.aggregationRunner.resetSkipAggregators(); + GraphJobMessage msg = null; + while ((msg = peer.getCurrentMessage()) != null) { + peer.send(peer.getPeerName(), msg); + } + // now sync + peer.sync(); + // now the map message must be read that might be send from the master + updated = getAggregationRunner().receiveAggregatedValues( + peer.getCurrentMessage().getMap(), iteration); + // set the first vertex message back to the message it had before sync + firstVertexMessage = peer.getCurrentMessage(); + return firstVertexMessage; } @@ -255,7 +237,7 @@ * currentMessage or the first vertex that is active. */ IDSkippingIterator iterator = vertices.skippingIterator(); - + // note that can't skip inactive vertices because we have to rewrite the // complete vertex file in each iteration while (iterator.hasNext( @@ -268,13 +250,12 @@ iterable = iterate(currentMessage, (V) currentMessage.getVertexId(), vertex, peer); } - + if (iterable != null && vertex.isHalted()) { vertex.setActive(); } - + if (!vertex.isHalted()) { - M lastValue = vertex.getValue(); if (iterable == null) { vertex.compute(Collections. emptyList()); } else { @@ -286,10 +267,9 @@ } currentMessage = iterable.getOverflowMessage(); } - getAggregationRunner().aggregateVertex(lastValue, vertex); activeVertices++; } - + // note that we even need to rewrite the vertex if it is halted for // consistency reasons vertices.finishVertexComputation(vertex); @@ -296,8 +276,7 @@ } vertices.finishSuperstep(); - getAggregationRunner().sendAggregatorValues(peer, activeVertices, - this.changedVertexCnt); + sendControllValues(activeVertices,this.changedVertexCnt); iteration++; } @@ -356,16 +335,14 @@ IDSkippingIterator skippingIterator = vertices.skippingIterator(); while (skippingIterator.hasNext()) { Vertex vertex = skippingIterator.next(); - - M lastValue = vertex.getValue(); + // Calls setup method. vertex.setup(conf); vertex.compute(Collections.singleton(vertex.getValue())); - getAggregationRunner().aggregateVertex(lastValue, vertex); vertices.finishVertexComputation(vertex); } vertices.finishSuperstep(); - getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt); + sendControllValues(1, this.changedVertexCnt); iteration++; } @@ -489,7 +466,7 @@ /** * Add new vertex into memory of each peer. - * + * * @throws IOException */ private void addVertex(Vertex vertex) throws IOException { @@ -507,7 +484,7 @@ /** * Remove vertex from this peer. - * + * * @throws IOException */ private void removeVertex(V vertexID) { @@ -518,7 +495,7 @@ /** * After all inserts are done, we must finalize the VertexInfo data structure. - * + * * @throws IOException */ private void finishAdditions() throws IOException { @@ -529,7 +506,7 @@ /** * After all inserts are done, we must finalize the VertexInfo data structure. - * + * * @throws IOException */ private void finishRemovals() throws IOException { @@ -566,7 +543,7 @@ /** * Parses the messages in every superstep and does actions according to flags * in the messages. - * + * * @return the first vertex message, null if none received. 
*/ @SuppressWarnings("unchecked") @@ -594,14 +571,8 @@ } else { globalUpdateCounts += ((IntWritable) e.getValue()).get(); } - } else if (getAggregationRunner().isEnabled() - && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) { - getAggregationRunner().masterReadAggregatedValue(vertexID, - (M) e.getValue()); - } else if (getAggregationRunner().isEnabled() - && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) { - getAggregationRunner().masterReadAggregatedIncrementalValue( - vertexID, (M) e.getValue()); + } else if (vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) { + this.getAggregationRunner().masterAggregation(vertexID, e.getValue()); } else if (FLAG_VERTEX_INCREASE.equals(vertexID)) { dynamicAdditions = true; addVertex((Vertex) e.getValue()); @@ -619,17 +590,8 @@ "A message to increase vertex count is in a wrong place: " + peer); } - } else if (FLAG_AGGREGATOR_SKIP.equals(vertexID)) { - if (isMasterTask(peer)) { - this.getAggregationRunner().addSkipAggregator( - ((IntWritable) e.getValue()).get()); - } else { - throw new UnsupportedOperationException( - "A message to skip aggregators is in a wrong peer: " + peer); - } } } - } else { throw new UnsupportedOperationException("Unknown message type: " + msg); } @@ -649,6 +611,21 @@ } /** + * Runs internal aggregators and send their values to the master task. + * @param activeVertices number of active vertices in this peer + * @param changedVertexCnt number of added/removed vertices in a superstep + */ + private void sendControllValues(int activeVertices, int changedVertexCnt) + throws IOException { + // send msgCounts to the master task + MapWritable updatedCnt = new MapWritable(); + updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(activeVertices)); + // send total number of vertices changes + updatedCnt.put(FLAG_VERTEX_ALTER_COUNTER, new LongWritable(changedVertexCnt)); + peer.send(getMasterTask(peer), new GraphJobMessage(updatedCnt)); + } + + /** * @return the number of vertices, globally accumulated. */ public final long getNumberVertices() { @@ -677,26 +654,6 @@ } /** - * Gets the last aggregated value at the given index. The index is dependend - * on how the aggregators were configured during job setup phase. - * - * @return the value of the aggregator, or null if none was defined. - */ - public final Writable getLastAggregatedValue(int index) { - return getAggregationRunner().getLastAggregatedValue(index); - } - - /** - * Gets the last aggregated number of vertices at the given index. The index - * is dependend on how the aggregators were configured during job setup phase. - * - * @return the value of the aggregator, or null if none was defined. - */ - public final IntWritable getNumLastAggregatedVertices(int index) { - return getAggregationRunner().getNumLastAggregatedVertices(index); - } - - /** * @return the peer instance. 
*/ public final BSPPeer getPeer() { Index: graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java (revision 1555875) +++ graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java (working copy) @@ -82,6 +82,7 @@ vertices.dump(); } + @Override public void addVertex(Vertex vertex) { vertices.put(vertex.getVertexID(), vertex); } @@ -108,6 +109,7 @@ vertices.clear(); } + @Override public int size() { return (int) this.vertices.entries(); } Index: graph/src/main/java/org/apache/hama/graph/Vertex.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/Vertex.java (revision 1555875) +++ graph/src/main/java/org/apache/hama/graph/Vertex.java (working copy) @@ -23,8 +23,8 @@ import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hama.HamaConfiguration; @@ -35,14 +35,14 @@ * Vertex is a abstract definition of Google Pregel Vertex. For implementing a * graph application, one must implement a sub-class of Vertex and define, the * message passing and message processing for each vertex. - * + * * Every vertex should be assigned an ID. This ID object should obey the * equals-hashcode contract and would be used for partitioning. - * + * * The edges for a vertex could be accessed and modified using the * {@link Vertex#getEdges()} call. The self value of the vertex could be changed * by {@link Vertex#setValue(Writable)}. - * + * * @param Vertex ID object type * @param Edge cost object type * @param Vertex value object type @@ -71,7 +71,7 @@ @Override public void setup(HamaConfiguration conf) { } - + @Override public void sendMessage(Edge e, M msg) throws IOException { runner.getPeer().send(getDestinationPeerName(e), @@ -116,7 +116,7 @@ private void alterVertexCounter(int i) throws IOException { this.runner.setChangedVertexCnt(this.runner.getChangedVertexCnt() + i); } - + @Override public void addVertex(V vertexID, List> edges, M value) throws IOException { MapWritable msg = new MapWritable(); @@ -125,15 +125,15 @@ vertex.setEdges(edges); vertex.setValue(value); vertex.setVertexID(vertexID); - + msg.put(GraphJobRunner.FLAG_VERTEX_INCREASE, vertex); // Find the proper partition to host the new vertex. - int partition = getPartitioner().getPartition(vertexID, value, + int partition = getPartitioner().getPartition(vertexID, value, runner.getPeer().getNumPeers()); String destPeer = runner.getPeer().getAllPeerNames()[partition]; - + runner.getPeer().send(destPeer, new GraphJobMessage(msg)); - + alterVertexCounter(1); } @@ -141,11 +141,11 @@ public void remove() throws IOException { MapWritable msg = new MapWritable(); msg.put(GraphJobRunner.FLAG_VERTEX_DECREASE, this.vertexID); - + // Get master task peer. String destPeer = GraphJobRunner.getMasterTask(this.getPeer()); runner.getPeer().send(destPeer, new GraphJobMessage(msg)); - + alterVertexCounter(-1); } @@ -188,31 +188,6 @@ return runner.getMaxIteration(); } - /** - * Get the last aggregated value of the defined aggregator, null if nothing - * was configured or not returned a result. 
You have to supply an index, the - * index is defined by the order you set the aggregator classes in - * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero, - * so if you have a single aggregator you can retrieve it via - * {@link #getLastAggregatedValue}(0). - */ - @SuppressWarnings("unchecked") - public M getLastAggregatedValue(int index) { - return (M) runner.getLastAggregatedValue(index); - } - - /** - * Get the number of aggregated vertices in the last superstep. Or null if no - * aggregator is available.You have to supply an index, the index is defined - * by the order you set the aggregator classes in - * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero, - * so if you have a single aggregator you can retrieve it via - * {@link #getNumLastAggregatedVertices}(0). - */ - public IntWritable getNumLastAggregatedVertices(int index) { - return runner.getNumLastAggregatedVertices(index); - } - public int getNumPeers() { return runner.getPeer().getNumPeers(); } @@ -241,21 +216,6 @@ this.votedToHalt = true; } - /** - * Disable an aggregator for the next superstep. The returning value of - * the aggregator will be null. - */ - public void skipAggregator(int index) throws IOException { - MapWritable msg = new MapWritable(); - msg.put(GraphJobRunner.FLAG_AGGREGATOR_SKIP, new IntWritable(index)); - - this.runner.getAggregationRunner().addSkipAggregator(index); - - // Get master task peer. - String destPeer = GraphJobRunner.getMasterTask(this.getPeer()); - runner.getPeer().send(destPeer, new GraphJobMessage(msg)); - } - void setActive() { this.votedToHalt = false; } @@ -310,7 +270,7 @@ } this.value.readFields(in); } - + this.edges = new ArrayList>(); if (in.readBoolean()) { int num = in.readInt(); @@ -341,7 +301,7 @@ out.writeBoolean(true); vertexID.write(out); } - + if (value == null) { out.writeBoolean(false); } else { @@ -395,6 +355,28 @@ } + /** + * Provides a value to the specified aggregator. + * @throws IOException + * + * @param name identifies an aggregator + * @param value value to be aggregated + */ + @Override + public void aggregate(String name, Writable value) throws IOException { + MapWritable msg = new MapWritable(); + msg.put(new Text(GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + name), value); + + // Get master task peer. + String destPeer = GraphJobRunner.getMasterTask(this.getPeer()); + runner.getPeer().send(destPeer, new GraphJobMessage(msg)); + } + + @Override + public Writable getAggregatedValue(String name) { + return this.runner.getAggregationRunner().getAggregatedValue(name); + } + protected void setRunner(GraphJobRunner runner) { this.runner = runner; } Index: graph/src/main/java/org/apache/hama/graph/VertexInterface.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/VertexInterface.java (revision 1555875) +++ graph/src/main/java/org/apache/hama/graph/VertexInterface.java (working copy) @@ -26,7 +26,7 @@ /** * The vertex interface. - * + * * @param this type must be writable and should also implement equals and * hashcode. * @param the type used for storing edge values, usually the value of an @@ -41,7 +41,7 @@ * This method is called once before the Vertex computation begins. Since the * Vertex object is serializable, variables in your Vertex program always * should be declared a s static. - * + * */ public void setup(HamaConfiguration conf); @@ -112,4 +112,18 @@ */ public M getValue(); + /** + * Provides a value to the specified aggregator. 
+ * @throws IOException + * + * @param name identifies a aggregator + * @param value value to be aggregated + */ + public void aggregate(String name, Writable value) throws IOException; + + /** + * Returns the value of the specified aggregator. + */ + public Writable getAggregatedValue(String name); + } Index: graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java =================================================================== --- graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (revision 1555875) +++ graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (working copy) @@ -44,7 +44,7 @@ public class TestSubmitGraphJob extends TestBSPMasterGroomServer { String[] input = new String[] { "stackoverflow.com\tyahoo.com", - "facebook.com\ttwitter.com", + "facebook.com\ttwitter.com", "facebook.com\tgoogle.com\tnasa.gov", "yahoo.com\tnasa.gov\tstackoverflow.com", "twitter.com\tgoogle.com\tfacebook.com", @@ -56,6 +56,7 @@ @SuppressWarnings("rawtypes") private static final List> vi = new ArrayList>(); + @Override @Before public void setUp() throws Exception { super.setUp(); @@ -84,7 +85,7 @@ // set the defaults bsp.setMaxIteration(30); - bsp.setAggregatorClass(AverageAggregator.class); + bsp.registerAggregator("avg", AverageAggregator.class); bsp.setInputFormat(SequenceFileInputFormat.class); bsp.setInputKeyClass(Text.class); Index: graph/src/test/java/org/apache/hama/graph/example/PageRank.java =================================================================== --- graph/src/test/java/org/apache/hama/graph/example/PageRank.java (revision 1555875) +++ graph/src/test/java/org/apache/hama/graph/example/PageRank.java (working copy) @@ -43,7 +43,7 @@ public static class PageRankVertex extends Vertex { - + static double DAMPING_FACTOR = 0.85; static double MAXIMUM_CONVERGENCE_ERROR = 0.001; @@ -74,7 +74,7 @@ } // if we have not reached our global error yet, then proceed. - DoubleWritable globalError = getLastAggregatedValue(0); + DoubleWritable globalError = (DoubleWritable) getAggregatedValue("avg"); if (globalError != null && this.getSuperstepCount() > 2 && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) { voteToHalt(); @@ -84,6 +84,8 @@ // in each superstep we are going to send a new rank to our neighbours sendMessageToNeighbors(new DoubleWritable(this.getValue().get() / this.getEdges().size())); + + this.aggregate("avg", this.getValue()); } } @@ -126,7 +128,7 @@ } // error - pageJob.setAggregatorClass(AverageAggregator.class); + pageJob.registerAggregator("avg", AverageAggregator.class); // Vertex reader pageJob.setVertexInputReaderClass(PagerankSeqReader.class);
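Taken together, the user-facing flow this patch enables is: register a named aggregator on the job, call aggregate(name, value) from vertices, and read the master's result in a later superstep via getAggregatedValue(name). The sketch below is illustrative only; the vertex logic and class names are hypothetical, and a real job would still need input/output paths and vertex ID/value/edge classes configured as in the examples above.

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.SumAggregator;
import org.apache.hama.graph.Vertex;

public class NamedAggregatorSketch {

  public static class SumVertex extends Vertex<Text, NullWritable, DoubleWritable> {

    @Override
    public void compute(Iterable<DoubleWritable> msgs) throws IOException {
      // Contribute this vertex's value; the master-side SumAggregator combines
      // all contributions between supersteps.
      aggregate("sum", getValue());

      // A value aggregated in one superstep becomes readable in the next one;
      // since nothing is aggregated before superstep 0 finishes, check for null.
      DoubleWritable sum = (DoubleWritable) getAggregatedValue("sum");
      if (sum != null && getSuperstepCount() > 3) {
        voteToHalt();
      } else {
        sendMessageToNeighbors(getValue());
      }
    }
  }

  public static GraphJob createJob(HamaConfiguration conf) throws IOException {
    GraphJob job = new GraphJob(conf, NamedAggregatorSketch.class);
    job.setVertexClass(SumVertex.class);
    // Any number of named aggregators can be registered; each name maps to its
    // own aggregator instance on the master task.
    job.registerAggregator("sum", SumAggregator.class);
    return job;
  }
}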