Index: examples/src/test/java/org/apache/hama/examples/PageRankTest.java =================================================================== --- examples/src/test/java/org/apache/hama/examples/PageRankTest.java (revision 1442838) +++ examples/src/test/java/org/apache/hama/examples/PageRankTest.java (working copy) @@ -75,9 +75,13 @@ GraphJob pageJob = PageRank.createJob( new String[] { INPUT, OUTPUT, "7" }, conf); + long startTime = System.currentTimeMillis(); if (!pageJob.waitForCompletion(true)) { fail("Job did not complete normally!"); } + // 7 secs + System.out.println("Job Finished in " + + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); verifyResult(); } finally { deleteTempDirs(); @@ -86,7 +90,7 @@ private void generateTestData() throws ClassNotFoundException, InterruptedException, IOException { - SymmetricMatrixGen.main(new String[] { "40", "10", INPUT, "20" }); + SymmetricMatrixGen.main(new String[] { "400", "10", INPUT, "2" }); } private void deleteTempDirs() { Index: graph/src/main/java/org/apache/hama/graph/AggregationRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (revision 1442838) +++ graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (working copy) @@ -177,6 +177,15 @@ // if we have an aggregator defined, we must make an additional sync // to have the updated values available on all our peers. if (isEnabled() && iteration > 1) { + + // send vertex messages to n+1 superstep + GraphJobMessage msg = null; + while ((msg = peer.getCurrentMessage()) != null) { + if (msg.isVertexMessage()) { + peer.send(peer.getPeerName(), msg); + } + } + peer.sync(); MapWritable updatedValues = peer.getCurrentMessage().getMap(); Index: graph/src/main/java/org/apache/hama/graph/GraphJob.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJob.java (revision 1442838) +++ graph/src/main/java/org/apache/hama/graph/GraphJob.java (working copy) @@ -30,6 +30,8 @@ import org.apache.hama.bsp.HashPartitioner; import org.apache.hama.bsp.Partitioner; import org.apache.hama.bsp.PartitioningRunner.RecordConverter; +import org.apache.hama.bsp.message.queue.MessageQueue; +import org.apache.hama.bsp.message.queue.SortedMessageQueue; import com.google.common.base.Preconditions; @@ -42,6 +44,7 @@ public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class"; public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class"; + /** * Creates a new Graph Job with the given configuration and an exampleClass. * The exampleClass is used to determine the user's jar to distribute in the @@ -58,6 +61,13 @@ this.setVertexValueClass(IntWritable.class); this.setEdgeValueClass(IntWritable.class); this.setPartitioner(HashPartitioner.class); + // TODO Once HAMA-723 is done, use spilling sorted queue. + this.setQueueClass(SortedMessageQueue.class); + } + + @SuppressWarnings({ "rawtypes" }) + private void setQueueClass(Class cls) { + conf.setClass("hama.messenger.queue.class", cls, MessageQueue.class); } /** @@ -133,8 +143,8 @@ } @Override - public void setPartitioner(@SuppressWarnings("rawtypes") - Class theClass) { + public void setPartitioner( + @SuppressWarnings("rawtypes") Class theClass) { super.setPartitioner(theClass); conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true); } Index: graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (revision 1442838) +++ graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (working copy) @@ -26,6 +26,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.ReflectionUtils; /** @@ -33,7 +34,8 @@ * real message (vertex ID and value). It can be extended by adding flags, for * example for a graph repair call. */ -public final class GraphJobMessage implements Writable { +public final class GraphJobMessage implements Writable, + WritableComparable { public static final int MAP_FLAG = 0x01; public static final int VERTEX_FLAG = 0x02; @@ -217,4 +219,18 @@ + "]"; } + @Override + public int compareTo(GraphJobMessage that) { + if (this.flag != that.flag) { + return (this.flag < that.flag) ? -1 : 1; + } else { + if (this.isVertexMessage()) { + return this.getVertexId().toString() + .compareTo(that.getVertexId().toString()); + } else { + return 0; + } + } + } + } Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1442838) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -28,8 +28,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; @@ -86,21 +89,22 @@ private BSPPeer peer; + private FileSystem local; + @Override public final void setup( BSPPeer peer) throws IOException, SyncException, InterruptedException { setupFields(peer); - + this.local = FileSystem.getLocal(conf); loadVertices(peer); - countGlobalVertexCount(peer); - doInitialSuperstep(peer); } + @SuppressWarnings("unchecked") @Override public final void bsp( BSPPeer peer) @@ -113,16 +117,60 @@ globalUpdateCounts = 0; peer.sync(); - // note that the messages must be parsed here - final Map> messages = parseMessages(peer); + GraphJobMessage lastMsg = processMapMessages(peer); + + Path vertexMessageFile = new Path(conf.get("bsp.local.dir", "/tmp") + + "/messages-" + peer.getPeerIndex() + "_" + peer.getSuperstepCount()); + + // //////////// + // write v messages to local disk + writeVertexMessagesToLocal(peer, vertexMessageFile, lastMsg); + // master needs to update doMasterUpdates(peer); // if aggregators say we don't have updates anymore, break if (!aggregationRunner.receiveAggregatedValues(peer, iteration)) { break; } - // loop over vertices and do their computation - doSuperstep(messages, peer); + + // read v messages and compute sequentially + SequenceFile.Reader reader = new SequenceFile.Reader(local, + vertexMessageFile, conf); + IntWritable key = new IntWritable(); + GraphJobMessage value = new GraphJobMessage(); + + V lastID = null; + List messageList = new ArrayList(); + int activeVertices = 0; + + while (reader.next(key, value)) { + V vertexID = (V) value.getVertexId(); + M vertexValue = (M) value.getVertexValue(); + + if (lastID == null) { + lastID = vertexID; + } + + if (lastID.equals(vertexID)) { + messageList.add(vertexValue); + } else { + activeVertices += computeVertex(lastID, messageList); + + lastID = vertexID; + messageList = new ArrayList(); + messageList.add(vertexValue); + } + + } + reader.close(); + + activeVertices += computeVertex(lastID, messageList); + activeVertices += computeRestVertices(); + + local.delete(vertexMessageFile, true); + + aggregationRunner.sendAggregatorValues(peer, activeVertices); + iteration++; if (isMasterTask(peer)) { peer.getCounter(GraphJobCounter.ITERATIONS).increment(1); @@ -131,6 +179,29 @@ } + private void writeVertexMessagesToLocal( + BSPPeer peer, + Path messagePath, GraphJobMessage lastMsg) throws IOException { + SequenceFile.Writer writer1 = SequenceFile.createWriter(local, conf, + messagePath, IntWritable.class, GraphJobMessage.class); + + int cnt = 0; + if (lastMsg != null) { + writer1.append(new IntWritable(1), lastMsg); + cnt++; + } + + GraphJobMessage msg = null; + while ((msg = peer.getCurrentMessage()) != null) { + if (msg.isVertexMessage()) { + writer1.append(new IntWritable(cnt), msg); + cnt++; + } + } + + writer1.close(); + } + /** * Just write pair as a result. Note that * this will also be executed when failure happened. @@ -144,55 +215,78 @@ } } - /** - * The master task is going to check the number of updated vertices and do - * master aggregation. In case of no aggregators defined, we save a sync by - * reading multiple typed messages. - */ - private void doMasterUpdates( + @SuppressWarnings("unchecked") + private GraphJobMessage processMapMessages( BSPPeer peer) throws IOException { - if (isMasterTask(peer) && iteration > 1) { - MapWritable updatedCnt = new MapWritable(); - // exit if there's no update made - if (globalUpdateCounts == 0) { - updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE)); + GraphJobMessage msg = null; + GraphJobMessage lastMsg = null; + while ((msg = peer.getCurrentMessage()) != null) { + // either this is a vertex message or a directive that must be read + // as map + if (msg.isMapMessage()) { + for (Entry e : msg.getMap().entrySet()) { + Text vertexID = (Text) e.getKey(); + if (FLAG_MESSAGE_COUNTS.equals(vertexID)) { + if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) { + updated = false; + } else { + globalUpdateCounts += ((IntWritable) e.getValue()).get(); + } + } else if (aggregationRunner.isEnabled() + && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) { + aggregationRunner.masterReadAggregatedValue(vertexID, + (M) e.getValue()); + } else if (aggregationRunner.isEnabled() + && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) { + aggregationRunner.masterReadAggregatedIncrementalValue(vertexID, + (M) e.getValue()); + } + } + } else if (msg.isVertexMessage()) { + lastMsg = msg; + break; } else { - aggregationRunner.doMasterAggregation(updatedCnt); - } - // send the updates from the mater tasks back to the slaves - for (String peerName : peer.getAllPeerNames()) { - peer.send(peerName, new GraphJobMessage(updatedCnt)); + throw new UnsupportedOperationException("Unknown message type: " + msg); } } + return lastMsg; } - /** - * Do the main logic of a superstep, namely checking if vertices are active, - * feeding compute with messages and controlling combiners/aggregators. - */ - private void doSuperstep(Map> messages, - BSPPeer peer) - throws IOException { + private int computeRestVertices() throws IOException { int activeVertices = 0; for (Vertex vertex : vertices) { - List msgs = messages.get(vertex.getVertexID()); - // If there are newly received messages, restart. - if (vertex.isHalted() && msgs != null) { - vertex.setActive(); + if (!vertex.isComputed && !vertex.isHalted()) { + List msgs = Collections.emptyList(); + M lastValue = vertex.getValue(); + vertex.compute(msgs.iterator()); + aggregationRunner.aggregateVertex(lastValue, vertex); + if (!vertex.isHalted()) { + activeVertices++; + } } - if (msgs == null) { - msgs = Collections.emptyList(); + vertex.isComputed = false; + } + return activeVertices; + } + + private int computeVertex(V vertexID, List messageList) throws IOException { + int activeVertices = 0; + Vertex vertex = vertices.getVertex(vertexID); + if (vertex != null) { + if (vertex.isHalted()) { + vertex.setActive(); } if (!vertex.isHalted()) { if (combiner != null) { - M combined = combiner.combine(msgs); - msgs = new ArrayList(); - msgs.add(combined); + M combined = combiner.combine(messageList); + messageList = new ArrayList(); + messageList.add(combined); } M lastValue = vertex.getValue(); - vertex.compute(msgs.iterator()); + vertex.compute(messageList.iterator()); + vertex.isComputed = true; aggregationRunner.aggregateVertex(lastValue, vertex); if (!vertex.isHalted()) { activeVertices++; @@ -200,8 +294,30 @@ } } - aggregationRunner.sendAggregatorValues(peer, activeVertices); - iteration++; + return activeVertices; + } + + /** + * The master task is going to check the number of updated vertices and do + * master aggregation. In case of no aggregators defined, we save a sync by + * reading multiple typed messages. + */ + private void doMasterUpdates( + BSPPeer peer) + throws IOException { + if (isMasterTask(peer) && iteration > 1) { + MapWritable updatedCnt = new MapWritable(); + // exit if there's no update made + if (globalUpdateCounts == 0) { + updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE)); + } else { + aggregationRunner.doMasterAggregation(updatedCnt); + } + // send the updates from the mater tasks back to the slaves + for (String peerName : peer.getAllPeerNames()) { + peer.send(peerName, new GraphJobMessage(updatedCnt)); + } + } } /** @@ -229,13 +345,13 @@ maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration", -1); - Class vertexIdClass = (Class) conf.getClass(GraphJob.VERTEX_ID_CLASS_ATTR, - Text.class, Writable.class); - Class vertexValueClass = (Class) conf.getClass( - GraphJob.VERTEX_VALUE_CLASS_ATTR, IntWritable.class, Writable.class); - Class edgeValueClass = (Class) conf.getClass( - GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, IntWritable.class, - Writable.class); + Class vertexIdClass = (Class) conf.getClass( + GraphJob.VERTEX_ID_CLASS_ATTR, Text.class, Writable.class); + Class vertexValueClass = (Class) conf.getClass( + GraphJob.VERTEX_VALUE_CLASS_ATTR, IntWritable.class, Writable.class); + Class edgeValueClass = (Class) conf.getClass( + GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, IntWritable.class, + Writable.class); vertexClass = (Class>) conf.getClass( "hama.graph.vertex.class", Vertex.class); @@ -289,7 +405,8 @@ } } - LOG.info(vertices.size() + " vertices are loaded into " + peer.getPeerName()); + LOG.info(vertices.size() + " vertices are loaded into " + + peer.getPeerName()); /* * If the user want to repair the graph, it should traverse through that @@ -379,57 +496,6 @@ } /** - * Parses the messages in every superstep and does actions according to flags - * in the messages. - * - * @return a map that contains messages pro vertex. - */ - @SuppressWarnings("unchecked") - private Map> parseMessages( - BSPPeer peer) - throws IOException { - GraphJobMessage msg; - final Map> msgMap = new HashMap>(); - while ((msg = peer.getCurrentMessage()) != null) { - // either this is a vertex message or a directive that must be read - // as map - if (msg.isVertexMessage()) { - final V vertexID = (V) msg.getVertexId(); - final M value = (M) msg.getVertexValue(); - List msgs = msgMap.get(vertexID); - if (msgs == null) { - msgs = new ArrayList(); - msgMap.put(vertexID, msgs); - } - msgs.add(value); - } else if (msg.isMapMessage()) { - for (Entry e : msg.getMap().entrySet()) { - Text vertexID = (Text) e.getKey(); - if (FLAG_MESSAGE_COUNTS.equals(vertexID)) { - if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) { - updated = false; - } else { - globalUpdateCounts += ((IntWritable) e.getValue()).get(); - } - } else if (aggregationRunner.isEnabled() - && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) { - aggregationRunner.masterReadAggregatedValue(vertexID, - (M) e.getValue()); - } else if (aggregationRunner.isEnabled() - && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) { - aggregationRunner.masterReadAggregatedIncrementalValue(vertexID, - (M) e.getValue()); - } - } - } else { - throw new UnsupportedOperationException("Unknown message type: " + msg); - } - - } - return msgMap; - } - - /** * @return the number of vertices, globally accumulated. */ public final long getNumberVertices() { @@ -505,10 +571,10 @@ /** * @return a new vertex instance */ + @SuppressWarnings("unchecked") public static Vertex newVertexInstance( Class vertexClass, Configuration conf) { - return (Vertex) ReflectionUtils.newInstance( - vertexClass, conf); + return (Vertex) ReflectionUtils.newInstance(vertexClass, conf); } } Index: graph/src/main/java/org/apache/hama/graph/Vertex.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/Vertex.java (revision 1442838) +++ graph/src/main/java/org/apache/hama/graph/Vertex.java (working copy) @@ -355,4 +355,10 @@ */ public abstract void writeState(DataOutput out) throws IOException; + public boolean isComputed; + + public boolean isComputed() { + return isComputed; + } + } Index: graph/src/main/java/org/apache/hama/graph/VerticesInfo.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (revision 1442838) +++ graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (working copy) @@ -19,9 +19,9 @@ import java.io.DataInput; import java.io.DataOutput; -import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; -import java.util.List; +import java.util.Map; import org.apache.hadoop.io.Writable; @@ -35,36 +35,18 @@ public class VerticesInfo implements Iterable> { - private final List> vertices = new ArrayList>(100); + private final Map> vertices = new HashMap>(); public void addVertex(Vertex vertex) { - int i = 0; - for (Vertex check : this) { - if (check.getVertexID().equals(vertex.getVertexID())) { - this.vertices.set(i, vertex); - return; - } - ++i; - } - vertices.add(vertex); + vertices.put(vertex.getVertexID(), vertex); } public Vertex getVertex(V vertexId) { - for (Vertex vertex : this) { - if (vertex.getVertexID().equals(vertexId)) { - return vertex; - } - } - return null; + return vertices.get(vertexId); } public boolean containsVertex(V vertexId) { - for (Vertex vertex : this) { - if (vertex.getVertexID().equals(vertexId)) { - return true; - } - } - return false; + return vertices.containsKey(vertexId); } public void clear() { @@ -77,7 +59,7 @@ @Override public Iterator> iterator() { - return vertices.iterator(); + return vertices.values().iterator(); } public void recoverState(DataInput in) { Index: graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java =================================================================== --- graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java (revision 0) +++ graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java (working copy) @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.util.PriorityQueue; + +import junit.framework.TestCase; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hama.bsp.message.queue.SortedMessageQueue; + +public class TestGraphJobMessage extends TestCase { + + public void testCompare() throws Exception { + GraphJobMessage msg1 = new GraphJobMessage(new Text("16"), new Text( + "message")); + GraphJobMessage msg2 = new GraphJobMessage(new Text("5"), new Text( + "message")); + + msg1 = new GraphJobMessage(new IntWritable(15), new Text("message1")); + msg2 = new GraphJobMessage(new IntWritable(6), new Text("message1")); + GraphJobMessage msg3 = new GraphJobMessage(new IntWritable(15), new Text( + "message2")); + GraphJobMessage msg4 = new GraphJobMessage(new IntWritable(6), new Text( + "message2")); + + PriorityQueue queue = new PriorityQueue(); + queue.add(msg1); + queue.add(msg2); + queue.add(msg3); + queue.add(msg4); + + while (queue.size() != 0) { + System.out.println(queue.remove()); + } + System.out.println("--------"); + SortedMessageQueue x = new SortedMessageQueue(); + x.add(msg1); + x.add(msg2); + x.add(msg3); + x.add(msg4); + + System.out.println("--------"); + while(x.size() != 0) { + System.out.println(x.poll()); + } + + } + +}