Index: graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (revision 1632265) +++ graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (working copy) @@ -20,6 +20,8 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.IntWritable; @@ -46,7 +48,7 @@ private MapWritable map; @SuppressWarnings("rawtypes") private WritableComparable vertexId; - private Writable vertexValue; + private List values; private IntWritable verticesSize; private static GraphJobMessageComparator comparator; @@ -69,9 +71,18 @@ public GraphJobMessage(WritableComparable vertexId, Writable vertexValue) { this.flag = VERTEX_FLAG; this.vertexId = vertexId; - this.vertexValue = vertexValue; + if (values == null) + values = new ArrayList(); + + this.values.add(vertexValue); } + public GraphJobMessage(WritableComparable vertexId, List values) { + this.flag = VERTEX_FLAG; + this.vertexId = vertexId; + this.values = values; + } + public GraphJobMessage(IntWritable size) { this.flag = VERTICES_SIZE_FLAG; this.verticesSize = size; @@ -84,7 +95,10 @@ // we don't need to write the classes because the other side has the same // classes for the two entities. vertexId.write(out); - vertexValue.write(out); + + out.writeInt(values.size()); + for (Writable v : values) + v.write(out); } else if (isMapMessage()) { map.write(out); } else if (isVerticesSizeMessage()) { @@ -122,8 +136,14 @@ if (isVertexMessage()) { vertexId = GraphJobRunner.createVertexIDObject(); vertexId.readFields(in); - vertexValue = GraphJobRunner.createVertexValue(); - vertexValue.readFields(in); + + values = new ArrayList(); + int numOfValues = in.readInt(); + for (int i = 0; i < numOfValues; i++) { + Writable v = GraphJobRunner.createVertexValue(); + v.readFields(in); + values.add(v); + } } else if (isMapMessage()) { map = new MapWritable(); map.readFields(in); @@ -161,8 +181,8 @@ return vertexId; } - public Writable getVertexValue() { - return vertexValue; + public List getVertexValue() { + return values; } public IntWritable getVerticesSize() { @@ -184,7 +204,7 @@ @Override public String toString() { if (isVertexMessage()) { - return "ID: " + vertexId + " Val: " + vertexValue; + return "ID: " + vertexId + " Val: " + values; } else if (isMapMessage()) { return "Map: " + map; } else if (isVerticesSizeMessage()) { @@ -191,7 +211,7 @@ return "#Vertices: " + verticesSize; } else { return "GraphJobMessage [flag=" + flag + ", map=" + map + ", vertexId=" - + vertexId + ", vertexValue=" + vertexValue + "]"; + + vertexId + ", vertexValue=" + values + "]"; } } Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1632265) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -19,7 +19,12 @@ import java.io.IOException; import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -39,12 +44,11 @@ import org.apache.hama.bsp.PartitioningRunner.RecordConverter; import org.apache.hama.bsp.sync.SyncException; import org.apache.hama.commons.util.KeyValuePair; -import org.apache.hama.graph.IDSkippingIterator.Strategy; import org.apache.hama.util.ReflectionUtils; /** * Fully generic graph job runner. - * + * * @param the id type of a vertex. * @param the value type of an edge. * @param the value type of a vertex. @@ -88,7 +92,8 @@ public static Class EDGE_VALUE_CLASS; public static Class> vertexClass; - private VerticesInfo vertices; + private Map> vertices; + private boolean updated = true; private int globalUpdateCounts = 0; private int changedVertexCnt = 0; @@ -153,6 +158,7 @@ /** * Just write pair as a result. Note that * this will also be executed when failure happened. + * * @param peer * @throws java.io.IOException */ @@ -161,11 +167,11 @@ BSPPeer peer) throws IOException { vertexOutputWriter.setup(conf); - IDSkippingIterator skippingIterator = vertices.skippingIterator(); - while (skippingIterator.hasNext()) { - vertexOutputWriter.write(skippingIterator.next(), peer); + Iterator> iterator = vertices.values().iterator(); + while (iterator.hasNext()) { + vertexOutputWriter.write(iterator.next(), peer); } - vertices.cleanup(conf, peer.getTaskId()); + vertices.clear(); } /** @@ -219,50 +225,49 @@ return firstVertexMessage; } + private Map computed = new HashMap(); + /** * Do the main logic of a superstep, namely checking if vertices are active, - * feeding compute with messages and controlling combiners/aggregators. - * We iterate over our messages and vertices in sorted order. That means - * that we need to seek the first vertex that has the same ID as the - * iterated message. + * feeding compute with messages and controlling combiners/aggregators. We + * iterate over our messages and vertices in sorted order. That means that we + * need to seek the first vertex that has the same ID as the iterated message. */ + @SuppressWarnings("unchecked") private void doSuperstep(GraphJobMessage currentMessage, BSPPeer peer) throws IOException { int activeVertices = 0; this.changedVertexCnt = 0; - this.vertices.startSuperstep(); - IDSkippingIterator iterator = this.vertices.skippingIterator(); - VertexMessages queueMessages = new VertexMessages(peer); - queueMessages.prependMessage(currentMessage); + initComputedMap(); - // note that can't skip inactive vertices because we have to rewrite the - // complete vertex file in each iteration - V firstVID = currentMessage == null ? null : (V) currentMessage.getVertexId(); - while (iterator.hasNext(firstVID, Strategy.ALL)) { - Vertex vertex = iterator.next(); - boolean msgsExist = queueMessages.continueWith(vertex.getVertexID()); + List msgs = null; + Vertex vertex = null; - if (!msgsExist) checkMsgOrder(vertex.getVertexID(), queueMessages); - - if (msgsExist && vertex.isHalted()) { + while (currentMessage != null) { + vertex = vertices.get(currentMessage.getVertexId()); + msgs = (List) currentMessage.getVertexValue(); + if (vertex.isHalted()) { vertex.setActive(); } if (!vertex.isHalted()) { - vertex.compute(queueMessages); + vertex.compute(msgs); + computed.put(vertex.getVertexID(), true); activeVertices++; } - // Dump remaining messages - queueMessages.dumpRest(); + currentMessage = peer.getCurrentMessage(); + } - // note that we even need to rewrite the vertex if it is halted for - // consistency reasons - this.vertices.finishVertexComputation(vertex); + for (Map.Entry e : computed.entrySet()) { + if (!e.getValue()) { + vertex = vertices.get(e.getKey()); + if (!vertex.isHalted()) + vertex.compute(Collections. emptyList()); + } } - this.vertices.finishSuperstep(); getAggregationRunner().sendAggregatorValues(peer, activeVertices, this.changedVertexCnt); @@ -269,22 +274,9 @@ this.iteration++; } - /** - * Utility that ensures that the incoming messages have a target vertex. - */ - private void checkMsgOrder(V vid, VertexMessages vm) { - // When the vid is greater than the current message, it means that a vertex - // has sent a message to an other vertex that doesn't exist - if (vm.getMessageVID() != null && vm.getMessageVID().compareTo(vid) < 0) { - if (conf.getBoolean("hama.check.missing.vertex", true)) { - throw new IllegalArgumentException( - "A message has recieved with a destination ID: " + vm.getMessageVID() - + " that does not exist! (Vertex iterator is at" + vid + " ID)"); - } else { - // Skip all unrecognized messages until we find a match - vm.continueUntil(vid); - } - } + private void initComputedMap() { + for (V v : (computed.size() == 0) ? vertices.keySet() : computed.keySet()) + computed.put(v, false); } /** @@ -294,18 +286,16 @@ private void doInitialSuperstep( BSPPeer peer) throws IOException { - vertices.startSuperstep(); this.changedVertexCnt = 0; - IDSkippingIterator skippingIterator = vertices.skippingIterator(); - while (skippingIterator.hasNext()) { - Vertex vertex = skippingIterator.next(); + Iterator> iterator = vertices.values().iterator(); + while (iterator.hasNext()) { + Vertex vertex = iterator.next(); + // Calls setup method. vertex.setup(conf); vertex.compute(Collections.singleton(vertex.getValue())); - vertices.finishVertexComputation(vertex); } - vertices.finishSuperstep(); getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt); iteration++; } @@ -334,11 +324,7 @@ setAggregationRunner(new AggregationRunner()); getAggregationRunner().setupAggregators(peer); - Class> verticesInfoClass = (Class>) conf - .getClass("hama.graph.vertices.info", ListVerticesInfo.class, - VerticesInfo.class); - vertices = ReflectionUtils.newInstance(verticesInfoClass); - vertices.init(this, conf, peer.getTaskId()); + vertices = new HashMap>(); } @SuppressWarnings("unchecked") @@ -396,7 +382,8 @@ } } else { if (vertex.compareTo(currentVertex) > 0) { - throw new IOException("The records of split aren't in order by vertex ID."); + throw new IOException( + "The records of split aren't in order by vertex ID."); } if (selfReference) { @@ -403,7 +390,7 @@ vertex.addEdge(new Edge(vertex.getVertexID(), null)); } - vertices.addVertex(vertex); + addVertex(vertex); vertex = currentVertex; } } @@ -412,12 +399,8 @@ if (selfReference) { vertex.addEdge(new Edge(vertex.getVertexID(), null)); } - vertices.addVertex(vertex); + addVertex(vertex); - vertices.finishAdditions(); - // finish the "superstep" because we have written a new file here - vertices.finishSuperstep(); - LOG.info(vertices.size() + " vertices are loaded into " + peer.getPeerName()); LOG.debug("Starting Vertex processing!"); @@ -425,7 +408,7 @@ /** * Add new vertex into memory of each peer. - * + * * @throws IOException */ private void addVertex(Vertex vertex) throws IOException { @@ -438,43 +421,21 @@ LOG.debug("Added VertexID: " + vertex.getVertexID() + " in peer " + peer.getPeerName()); - vertices.addVertex(vertex); + vertices.put(vertex.getVertexID(), vertex); } /** * Remove vertex from this peer. - * + * * @throws IOException */ private void removeVertex(V vertexID) { - vertices.removeVertex(vertexID); + vertices.remove(vertexID); LOG.debug("Removed VertexID: " + vertexID + " in peer " + peer.getPeerName()); } /** - * After all inserts are done, we must finalize the VertexInfo data structure. - * - * @throws IOException - */ - private void finishAdditions() throws IOException { - vertices.finishAdditions(); - // finish the "superstep" because we have written a new file here - vertices.finishSuperstep(); - } - - /** - * After all inserts are done, we must finalize the VertexInfo data structure. - * - * @throws IOException - */ - private void finishRemovals() throws IOException { - vertices.finishRemovals(); - // finish the "superstep" because we have written a new file here - vertices.finishSuperstep(); - } - - /** * Counts vertices globally by sending the count of vertices in the map to the * other peers. */ @@ -502,7 +463,7 @@ /** * Parses the messages in every superstep and does actions according to flags * in the messages. - * + * * @return the first vertex message, null if none received. */ @SuppressWarnings("unchecked") @@ -510,8 +471,6 @@ BSPPeer peer) throws IOException, SyncException, InterruptedException { GraphJobMessage msg = null; - boolean dynamicAdditions = false; - boolean dynamicRemovals = false; while ((msg = peer.getCurrentMessage()) != null) { // either this is a vertex message or a directive that must be read @@ -539,10 +498,8 @@ getAggregationRunner().masterReadAggregatedIncrementalValue( vertexID, (M) e.getValue()); } else if (FLAG_VERTEX_INCREASE.equals(vertexID)) { - dynamicAdditions = true; addVertex((Vertex) e.getValue()); } else if (FLAG_VERTEX_DECREASE.equals(vertexID)) { - dynamicRemovals = true; removeVertex((V) e.getValue()); } else if (FLAG_VERTEX_TOTAL_VERTICES.equals(vertexID)) { this.numberVertices = ((LongWritable) e.getValue()).get(); @@ -564,15 +521,6 @@ } - // If we applied any changes to vertices, we need to call finishAdditions - // and finishRemovals in the end. - if (dynamicAdditions) { - finishAdditions(); - } - if (dynamicRemovals) { - finishRemovals(); - } - return msg; } @@ -607,7 +555,7 @@ /** * Gets the last aggregated value at the given index. The index is dependend * on how the aggregators were configured during job setup phase. - * + * * @return the value of the aggregator, or null if none was defined. */ public final Writable getLastAggregatedValue(int index) { @@ -617,7 +565,7 @@ /** * Gets the last aggregated number of vertices at the given index. The index * is dependend on how the aggregators were configured during job setup phase. - * + * * @return the value of the aggregator, or null if none was defined. */ public final IntWritable getNumLastAggregatedVertices(int index) { Index: graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java (revision 1632265) +++ graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java (working copy) @@ -49,8 +49,7 @@ private HashMap> outgoingBundles = new HashMap>(); @SuppressWarnings("rawtypes") - private HashMap> vertexMessageMap = new HashMap>(); - private List tmp; + private HashMap>> messagesPerVertex = new HashMap>>(); @SuppressWarnings("unchecked") @Override @@ -73,35 +72,30 @@ public void addMessage(String peerName, GraphJobMessage msg) { InetSocketAddress targetPeerAddress = getSocketAddress(peerName); - if (msg.isVertexMessage() && combiner != null) { + if (msg.isVertexMessage()) { WritableComparable vertexID = msg.getVertexId(); - Writable vertexValue = msg.getVertexValue(); + List vertexValue = msg.getVertexValue(); - if (!vertexMessageMap.containsKey(targetPeerAddress)) { - vertexMessageMap.put(targetPeerAddress, - new HashMap()); + if (!messagesPerVertex.containsKey(targetPeerAddress)) { + messagesPerVertex.put(targetPeerAddress, + new HashMap>()); } - Map combinedMessage = vertexMessageMap + Map> messages = messagesPerVertex .get(targetPeerAddress); - if (combinedMessage.containsKey(vertexID)) { - tmp = new ArrayList(); - tmp.add(combinedMessage.get(vertexID)); - tmp.add(vertexValue); - - Iterable iterable = new Iterable() { - @Override - public Iterator iterator() { - return tmp.iterator(); - } - }; - - combinedMessage.put(vertexID, combiner.combine(iterable)); + if (messages.containsKey(vertexID)) { + messages.get(vertexID).addAll(vertexValue); } else { - combinedMessage.put(vertexID, vertexValue); + messages.put(vertexID, new ArrayList(vertexValue)); } + if (combiner != null && messages.get(vertexID).size() > 1) { + List singleMessage = new ArrayList(); + singleMessage.add(combiner.combine(messages.get(vertexID))); + messages.put(vertexID, singleMessage); + } + } else { outgoingBundles.get(targetPeerAddress).addMessage(msg); } @@ -131,24 +125,30 @@ @Override public void clear() { outgoingBundles.clear(); - vertexMessageMap.clear(); + messagesPerVertex.clear(); } @SuppressWarnings("rawtypes") @Override public Iterator>> getBundleIterator() { - if (combiner != null) { - for (Map.Entry> e : vertexMessageMap + + Iterator>>> entries = messagesPerVertex + .entrySet().iterator(); + + while (entries.hasNext()) { + Map.Entry>> entry = entries + .next(); + + for (Map.Entry> v : entry.getValue() .entrySet()) { - for (Map.Entry v : e.getValue() - .entrySet()) { - outgoingBundles.get(e.getKey()).addMessage( - new GraphJobMessage(v.getKey(), v.getValue())); - } + outgoingBundles.get(entry.getKey()).addMessage( + new GraphJobMessage(v.getKey(), v.getValue())); } + + entries.remove(); } - vertexMessageMap.clear(); + messagesPerVertex.clear(); return outgoingBundles.entrySet().iterator(); }