Index: examples/src/main/java/org/apache/hama/examples/util/SymmetricMatrixGen.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/util/SymmetricMatrixGen.java (revision 1442838) +++ examples/src/main/java/org/apache/hama/examples/util/SymmetricMatrixGen.java (working copy) @@ -82,13 +82,17 @@ boolean nonZero = new Random().nextInt(density) == 0; if (nonZero && !edges.contains(j) && i != j) { edges.add(j); - - // TODO please refactor this. + int peerIndex = j / interval; - if(peerIndex == peer.getNumPeers()) + if (peerIndex == peer.getNumPeers()) peerIndex = peerIndex - 1; - - peer.send(peer.getPeerName(j / interval), new Text(j + "," + i)); + + // assign remainders last task + int destPeer = j / interval; + if (destPeer == peer.getNumPeers()) + destPeer = destPeer - 1; + + peer.send(peer.getPeerName(destPeer), new Text(j + "," + i)); } } Index: examples/src/test/java/org/apache/hama/examples/PageRankTest.java =================================================================== --- examples/src/test/java/org/apache/hama/examples/PageRankTest.java (revision 1442838) +++ examples/src/test/java/org/apache/hama/examples/PageRankTest.java (working copy) @@ -75,9 +75,13 @@ GraphJob pageJob = PageRank.createJob( new String[] { INPUT, OUTPUT, "7" }, conf); + long startTime = System.currentTimeMillis(); if (!pageJob.waitForCompletion(true)) { fail("Job did not complete normally!"); } + // 7 secs + System.out.println("Job Finished in " + + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); verifyResult(); } finally { deleteTempDirs(); @@ -86,7 +90,7 @@ private void generateTestData() throws ClassNotFoundException, InterruptedException, IOException { - SymmetricMatrixGen.main(new String[] { "40", "10", INPUT, "20" }); + SymmetricMatrixGen.main(new String[] { "40", "10", INPUT, "7" }); } private void deleteTempDirs() { Index: graph/src/main/java/org/apache/hama/graph/AggregationRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (revision 1442838) +++ graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (working copy) @@ -177,8 +177,7 @@ // if we have an aggregator defined, we must make an additional sync // to have the updated values available on all our peers. if (isEnabled() && iteration > 1) { - peer.sync(); - + MapWritable updatedValues = peer.getCurrentMessage().getMap(); for (int i = 0; i < aggregators.length; i++) { globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]); @@ -191,6 +190,7 @@ return false; } } + return true; } Index: graph/src/main/java/org/apache/hama/graph/GraphJob.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJob.java (revision 1442838) +++ graph/src/main/java/org/apache/hama/graph/GraphJob.java (working copy) @@ -30,6 +30,8 @@ import org.apache.hama.bsp.HashPartitioner; import org.apache.hama.bsp.Partitioner; import org.apache.hama.bsp.PartitioningRunner.RecordConverter; +import org.apache.hama.bsp.message.queue.MessageQueue; +import org.apache.hama.bsp.message.queue.SortedMessageQueue; import com.google.common.base.Preconditions; @@ -42,6 +44,7 @@ public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class"; public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class"; + /** * Creates a new Graph Job with the given configuration and an exampleClass. * The exampleClass is used to determine the user's jar to distribute in the @@ -58,6 +61,13 @@ this.setVertexValueClass(IntWritable.class); this.setEdgeValueClass(IntWritable.class); this.setPartitioner(HashPartitioner.class); + // TODO Once HAMA-723 is done, use spilling sorted queue. + this.setQueueClass(SortedMessageQueue.class); + } + + @SuppressWarnings({ "rawtypes" }) + private void setQueueClass(Class cls) { + conf.setClass("hama.messenger.queue.class", cls, MessageQueue.class); } /** @@ -133,8 +143,8 @@ } @Override - public void setPartitioner(@SuppressWarnings("rawtypes") - Class theClass) { + public void setPartitioner( + @SuppressWarnings("rawtypes") Class theClass) { super.setPartitioner(theClass); conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true); } Index: graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (revision 1442838) +++ graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (working copy) @@ -20,12 +20,11 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.ReflectionUtils; /** @@ -33,13 +32,13 @@ * real message (vertex ID and value). It can be extended by adding flags, for * example for a graph repair call. */ -public final class GraphJobMessage implements Writable { +public final class GraphJobMessage implements Writable, + WritableComparable { public static final int MAP_FLAG = 0x01; public static final int VERTEX_FLAG = 0x02; public static final int REPAIR_FLAG = 0x04; - public static final int PARTITION_FLAG = 0x08; - public static final int VERTICES_SIZE_FLAG = 0x10; + public static final int VERTICES_SIZE_FLAG = 0x08; // staticly defined because it is process-wide information, therefore in caps // considered as a constant @@ -75,11 +74,6 @@ this.vertexValue = vertexValue; } - public GraphJobMessage(Vertex vertex) { - this.flag = PARTITION_FLAG; - this.vertex = vertex; - } - public GraphJobMessage(IntWritable size) { this.flag = VERTICES_SIZE_FLAG; this.vertices_size = size; @@ -95,26 +89,6 @@ vertexValue.write(out); } else if (isMapMessage()) { map.write(out); - } else if (isPartitioningMessage()) { - vertex.getVertexID().write(out); - if (vertex.getValue() != null) { - out.writeBoolean(true); - vertex.getValue().write(out); - } else { - out.writeBoolean(false); - } - List outEdges = vertex.getEdges(); - out.writeInt(outEdges.size()); - for (Object e : outEdges) { - Edge edge = (Edge) e; - edge.getDestinationVertexID().write(out); - if (edge.getValue() != null) { - out.writeBoolean(true); - edge.getValue().write(out); - } else { - out.writeBoolean(false); - } - } } else if (isVerticesSizeMessage()) { vertices_size.write(out); } else { @@ -134,33 +108,6 @@ } else if (isMapMessage()) { map = new MapWritable(); map.readFields(in); - } else if (isPartitioningMessage()) { - Vertex vertex = GraphJobRunner - .newVertexInstance(VERTEX_CLASS, null); - Writable vertexId = ReflectionUtils.newInstance(VERTEX_ID_CLASS, null); - vertexId.readFields(in); - vertex.setVertexID(vertexId); - if (in.readBoolean()) { - Writable vertexValue = ReflectionUtils.newInstance(VERTEX_VALUE_CLASS, - null); - vertexValue.readFields(in); - vertex.setValue(vertexValue); - } - int size = in.readInt(); - vertex.setEdges(new ArrayList>(size)); - for (int i = 0; i < size; i++) { - Writable edgeVertexID = ReflectionUtils.newInstance(VERTEX_ID_CLASS, - null); - edgeVertexID.readFields(in); - Writable edgeValue = null; - if (in.readBoolean()) { - edgeValue = ReflectionUtils.newInstance(EDGE_VALUE_CLASS, null); - edgeValue.readFields(in); - } - vertex.getEdges().add( - new Edge(edgeVertexID, edgeValue)); - } - this.vertex = vertex; } else if (isVerticesSizeMessage()) { vertices_size = new IntWritable(); vertices_size.readFields(in); @@ -202,10 +149,6 @@ return flag == REPAIR_FLAG; } - public boolean isPartitioningMessage() { - return flag == PARTITION_FLAG; - } - public boolean isVerticesSizeMessage() { return flag == VERTICES_SIZE_FLAG; } @@ -217,4 +160,18 @@ + "]"; } + @Override + public int compareTo(GraphJobMessage that) { + if (this.flag != that.flag) { + return (this.flag < that.flag) ? -1 : 1; + } else { + if (this.isVertexMessage()) { + return this.getVertexId().toString() + .compareTo(that.getVertexId().toString()); + } else { + return 0; + } + } + } + } Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1442838) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -92,15 +92,13 @@ throws IOException, SyncException, InterruptedException { setupFields(peer); - loadVertices(peer); - countGlobalVertexCount(peer); - doInitialSuperstep(peer); } + @SuppressWarnings("unchecked") @Override public final void bsp( BSPPeer peer) @@ -111,18 +109,53 @@ while (updated && !((maxIteration > 0) && iteration > maxIteration)) { // reset the global update counter from our master in every superstep globalUpdateCounts = 0; - peer.sync(); - - // note that the messages must be parsed here - final Map> messages = parseMessages(peer); + // master needs to update doMasterUpdates(peer); + peer.sync(); + // if aggregators say we don't have updates anymore, break if (!aggregationRunner.receiveAggregatedValues(peer, iteration)) { break; } + + GraphJobMessage msg = null; + GraphJobMessage lastMsg = processMapMessages(peer); + + V lastID = null; + List messageList = new ArrayList(); + if (lastMsg != null) { + lastID = (V) lastMsg.getVertexId(); + messageList.add((M) lastMsg.getVertexValue()); + } + // loop over vertices and do their computation - doSuperstep(messages, peer); + int activeVertices = 0; + while ((msg = peer.getCurrentMessage()) != null) { + if (msg.isVertexMessage()) { + final V vertexID = (V) msg.getVertexId(); + final M value = (M) msg.getVertexValue(); + + if (lastID == null) { + lastID = vertexID; + } + + if (lastID.equals(vertexID)) { + messageList.add(value); + } else { + activeVertices += computeVertex(lastID, messageList); + + lastID = vertexID; + messageList = new ArrayList(); + messageList.add(value); + } + } + } + activeVertices += computeVertex(lastID, messageList); + activeVertices += computeRestVertices(); + + aggregationRunner.sendAggregatorValues(peer, activeVertices); + iteration++; if (isMasterTask(peer)) { peer.getCounter(GraphJobCounter.ITERATIONS).increment(1); @@ -144,55 +177,78 @@ } } - /** - * The master task is going to check the number of updated vertices and do - * master aggregation. In case of no aggregators defined, we save a sync by - * reading multiple typed messages. - */ - private void doMasterUpdates( + @SuppressWarnings("unchecked") + private GraphJobMessage processMapMessages( BSPPeer peer) throws IOException { - if (isMasterTask(peer) && iteration > 1) { - MapWritable updatedCnt = new MapWritable(); - // exit if there's no update made - if (globalUpdateCounts == 0) { - updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE)); + GraphJobMessage msg = null; + GraphJobMessage lastMsg = null; + while ((msg = peer.getCurrentMessage()) != null) { + // either this is a vertex message or a directive that must be read + // as map + if (msg.isMapMessage()) { + for (Entry e : msg.getMap().entrySet()) { + Text vertexID = (Text) e.getKey(); + if (FLAG_MESSAGE_COUNTS.equals(vertexID)) { + if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) { + updated = false; + } else { + globalUpdateCounts += ((IntWritable) e.getValue()).get(); + } + } else if (aggregationRunner.isEnabled() + && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) { + aggregationRunner.masterReadAggregatedValue(vertexID, + (M) e.getValue()); + } else if (aggregationRunner.isEnabled() + && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) { + aggregationRunner.masterReadAggregatedIncrementalValue(vertexID, + (M) e.getValue()); + } + } + } else if (msg.isVertexMessage()) { + lastMsg = msg; + break; } else { - aggregationRunner.doMasterAggregation(updatedCnt); - } - // send the updates from the mater tasks back to the slaves - for (String peerName : peer.getAllPeerNames()) { - peer.send(peerName, new GraphJobMessage(updatedCnt)); + throw new UnsupportedOperationException("Unknown message type: " + msg); } } + return lastMsg; } - /** - * Do the main logic of a superstep, namely checking if vertices are active, - * feeding compute with messages and controlling combiners/aggregators. - */ - private void doSuperstep(Map> messages, - BSPPeer peer) - throws IOException { + private int computeRestVertices() throws IOException { int activeVertices = 0; for (Vertex vertex : vertices) { - List msgs = messages.get(vertex.getVertexID()); - // If there are newly received messages, restart. - if (vertex.isHalted() && msgs != null) { - vertex.setActive(); + if (!vertex.isComputed && !vertex.isHalted()) { + List msgs = Collections.emptyList(); + M lastValue = vertex.getValue(); + vertex.compute(msgs.iterator()); + aggregationRunner.aggregateVertex(lastValue, vertex); + if (!vertex.isHalted()) { + activeVertices++; + } } - if (msgs == null) { - msgs = Collections.emptyList(); + vertex.isComputed = false; + } + return activeVertices; + } + + private int computeVertex(V vertexID, List messageList) throws IOException { + int activeVertices = 0; + Vertex vertex = vertices.getVertex(vertexID); + if (vertex != null) { + if (vertex.isHalted()) { + vertex.setActive(); } if (!vertex.isHalted()) { if (combiner != null) { - M combined = combiner.combine(msgs); - msgs = new ArrayList(); - msgs.add(combined); + M combined = combiner.combine(messageList); + messageList = new ArrayList(); + messageList.add(combined); } M lastValue = vertex.getValue(); - vertex.compute(msgs.iterator()); + vertex.compute(messageList.iterator()); + vertex.isComputed = true; aggregationRunner.aggregateVertex(lastValue, vertex); if (!vertex.isHalted()) { activeVertices++; @@ -200,8 +256,31 @@ } } - aggregationRunner.sendAggregatorValues(peer, activeVertices); - iteration++; + return activeVertices; + } + + /** + * The master task is going to check the number of updated vertices and do + * master aggregation. In case of no aggregators defined, we save a sync by + * reading multiple typed messages. + */ + private void doMasterUpdates( + BSPPeer peer) + throws IOException { + if (isMasterTask(peer) && iteration > 1) { + MapWritable updatedCnt = new MapWritable(); + // exit if there's no update made + if (iteration > 2 && globalUpdateCounts == 0) { + updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE)); + LOG.debug("Sends break message at " + iteration + "th iteration"); + } else { + aggregationRunner.doMasterAggregation(updatedCnt); + } + // send the updates from the mater tasks back to the slaves + for (String peerName : peer.getAllPeerNames()) { + peer.send(peerName, new GraphJobMessage(updatedCnt)); + } + } } /** @@ -229,13 +308,13 @@ maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration", -1); - Class vertexIdClass = (Class) conf.getClass(GraphJob.VERTEX_ID_CLASS_ATTR, - Text.class, Writable.class); - Class vertexValueClass = (Class) conf.getClass( - GraphJob.VERTEX_VALUE_CLASS_ATTR, IntWritable.class, Writable.class); - Class edgeValueClass = (Class) conf.getClass( - GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, IntWritable.class, - Writable.class); + Class vertexIdClass = (Class) conf.getClass( + GraphJob.VERTEX_ID_CLASS_ATTR, Text.class, Writable.class); + Class vertexValueClass = (Class) conf.getClass( + GraphJob.VERTEX_VALUE_CLASS_ATTR, IntWritable.class, Writable.class); + Class edgeValueClass = (Class) conf.getClass( + GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, IntWritable.class, + Writable.class); vertexClass = (Class>) conf.getClass( "hama.graph.vertex.class", Vertex.class); @@ -289,7 +368,8 @@ } } - LOG.info(vertices.size() + " vertices are loaded into " + peer.getPeerName()); + LOG.info(vertices.size() + " vertices are loaded into " + + peer.getPeerName()); /* * If the user want to repair the graph, it should traverse through that @@ -379,57 +459,6 @@ } /** - * Parses the messages in every superstep and does actions according to flags - * in the messages. - * - * @return a map that contains messages pro vertex. - */ - @SuppressWarnings("unchecked") - private Map> parseMessages( - BSPPeer peer) - throws IOException { - GraphJobMessage msg; - final Map> msgMap = new HashMap>(); - while ((msg = peer.getCurrentMessage()) != null) { - // either this is a vertex message or a directive that must be read - // as map - if (msg.isVertexMessage()) { - final V vertexID = (V) msg.getVertexId(); - final M value = (M) msg.getVertexValue(); - List msgs = msgMap.get(vertexID); - if (msgs == null) { - msgs = new ArrayList(); - msgMap.put(vertexID, msgs); - } - msgs.add(value); - } else if (msg.isMapMessage()) { - for (Entry e : msg.getMap().entrySet()) { - Text vertexID = (Text) e.getKey(); - if (FLAG_MESSAGE_COUNTS.equals(vertexID)) { - if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) { - updated = false; - } else { - globalUpdateCounts += ((IntWritable) e.getValue()).get(); - } - } else if (aggregationRunner.isEnabled() - && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) { - aggregationRunner.masterReadAggregatedValue(vertexID, - (M) e.getValue()); - } else if (aggregationRunner.isEnabled() - && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) { - aggregationRunner.masterReadAggregatedIncrementalValue(vertexID, - (M) e.getValue()); - } - } - } else { - throw new UnsupportedOperationException("Unknown message type: " + msg); - } - - } - return msgMap; - } - - /** * @return the number of vertices, globally accumulated. */ public final long getNumberVertices() { @@ -505,10 +534,10 @@ /** * @return a new vertex instance */ + @SuppressWarnings("unchecked") public static Vertex newVertexInstance( Class vertexClass, Configuration conf) { - return (Vertex) ReflectionUtils.newInstance( - vertexClass, conf); + return (Vertex) ReflectionUtils.newInstance(vertexClass, conf); } } Index: graph/src/main/java/org/apache/hama/graph/Vertex.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/Vertex.java (revision 1442838) +++ graph/src/main/java/org/apache/hama/graph/Vertex.java (working copy) @@ -355,4 +355,10 @@ */ public abstract void writeState(DataOutput out) throws IOException; + public boolean isComputed; + + public boolean isComputed() { + return isComputed; + } + } Index: graph/src/main/java/org/apache/hama/graph/VertexInterface.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/VertexInterface.java (revision 1442838) +++ graph/src/main/java/org/apache/hama/graph/VertexInterface.java (working copy) @@ -94,5 +94,5 @@ * Gets the vertex value */ public M getValue(); - + } Index: graph/src/main/java/org/apache/hama/graph/VerticesInfo.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (revision 1442838) +++ graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (working copy) @@ -19,9 +19,9 @@ import java.io.DataInput; import java.io.DataOutput; -import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; -import java.util.List; +import java.util.Map; import org.apache.hadoop.io.Writable; @@ -35,36 +35,18 @@ public class VerticesInfo implements Iterable> { - private final List> vertices = new ArrayList>(100); + private final Map> vertices = new HashMap>(); public void addVertex(Vertex vertex) { - int i = 0; - for (Vertex check : this) { - if (check.getVertexID().equals(vertex.getVertexID())) { - this.vertices.set(i, vertex); - return; - } - ++i; - } - vertices.add(vertex); + vertices.put(vertex.getVertexID(), vertex); } public Vertex getVertex(V vertexId) { - for (Vertex vertex : this) { - if (vertex.getVertexID().equals(vertexId)) { - return vertex; - } - } - return null; + return vertices.get(vertexId); } public boolean containsVertex(V vertexId) { - for (Vertex vertex : this) { - if (vertex.getVertexID().equals(vertexId)) { - return true; - } - } - return false; + return vertices.containsKey(vertexId); } public void clear() { @@ -77,7 +59,7 @@ @Override public Iterator> iterator() { - return vertices.iterator(); + return vertices.values().iterator(); } public void recoverState(DataInput in) { Index: graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java =================================================================== --- graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java (revision 0) +++ graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java (working copy) @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.util.PriorityQueue; + +import junit.framework.TestCase; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hama.bsp.message.queue.SortedMessageQueue; + +public class TestGraphJobMessage extends TestCase { + + public void testCompare() throws Exception { + GraphJobMessage msg1 = new GraphJobMessage(new Text("16"), new Text( + "message")); + GraphJobMessage msg2 = new GraphJobMessage(new Text("5"), new Text( + "message")); + + msg1 = new GraphJobMessage(new IntWritable(15), new Text("message1")); + msg2 = new GraphJobMessage(new IntWritable(6), new Text("message1")); + GraphJobMessage msg3 = new GraphJobMessage(new IntWritable(15), new Text( + "message2")); + GraphJobMessage msg4 = new GraphJobMessage(new IntWritable(6), new Text( + "message2")); + + PriorityQueue queue = new PriorityQueue(); + queue.add(msg1); + queue.add(msg2); + queue.add(msg3); + queue.add(msg4); + + while (queue.size() != 0) { + System.out.println(queue.remove()); + } + System.out.println("--------"); + SortedMessageQueue x = new SortedMessageQueue(); + x.add(msg1); + x.add(msg2); + x.add(msg3); + x.add(msg4); + + System.out.println("--------"); + while(x.size() != 0) { + System.out.println(x.poll()); + } + + } + +}