Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1629503) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.Collections; import java.util.Map.Entry; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -154,6 +153,8 @@ /** * Just write pair as a result. Note that * this will also be executed when failure happened. + * @param peer + * @throws java.io.IOException */ @Override public final void cleanup( @@ -203,7 +204,7 @@ if (firstVertexMessage != null) { peer.send(peer.getPeerName(), firstVertexMessage); } - GraphJobMessage msg = null; + GraphJobMessage msg; while ((msg = peer.getCurrentMessage()) != null) { peer.send(peer.getPeerName(), msg); } @@ -221,105 +222,69 @@ /** * Do the main logic of a superstep, namely checking if vertices are active, * feeding compute with messages and controlling combiners/aggregators. + * We iterate over our messages and vertices in sorted order. That means + * that we need to seek the first vertex that has the same ID as the + * iterated message. */ - @SuppressWarnings("unchecked") private void doSuperstep(GraphJobMessage currentMessage, BSPPeer peer) throws IOException { int activeVertices = 0; this.changedVertexCnt = 0; - vertices.startSuperstep(); + this.vertices.startSuperstep(); - /* - * We iterate over our messages and vertices in sorted order. That means - * that we need to seek the first vertex that has the same ID as the - * currentMessage or the first vertex that is active. - */ - IDSkippingIterator iterator = vertices.skippingIterator(); - VertexMessageIterable iterable = null; - Vertex vertex = null; + IDSkippingIterator iterator = this.vertices.skippingIterator(); + VertexMessages queueMessages = new VertexMessages(peer); + queueMessages.prependMessage(currentMessage); // note that can't skip inactive vertices because we have to rewrite the // complete vertex file in each iteration - while (iterator.hasNext( - currentMessage == null ? null : (V) currentMessage.getVertexId(), - Strategy.ALL)) { + V firstVID = currentMessage == null ? null : (V) currentMessage.getVertexId(); + while (iterator.hasNext(firstVID, Strategy.ALL)) { + Vertex vertex = iterator.next(); + boolean msgsExist = queueMessages.continueWith(vertex.getVertexID()); - vertex = iterator.next(); - if (currentMessage != null) { - iterable = iterate(currentMessage, (V) currentMessage.getVertexId(), - vertex, peer); - } else { - iterable = null; - } + if (!msgsExist) checkMsgOrder(vertex.getVertexID(), queueMessages); - if (iterable != null && vertex.isHalted()) { + if (msgsExist && vertex.isHalted()) { vertex.setActive(); } if (!vertex.isHalted()) { - if (iterable == null) { - vertex.compute(Collections. emptyList()); - } else { - vertex.compute(iterable); - currentMessage = iterable.getOverflowMessage(); - } + vertex.compute(queueMessages); activeVertices++; } + // Dump remaining messages + queueMessages.dumpRest(); + // note that we even need to rewrite the vertex if it is halted for // consistency reasons - vertices.finishVertexComputation(vertex); + this.vertices.finishVertexComputation(vertex); } - vertices.finishSuperstep(); + this.vertices.finishSuperstep(); getAggregationRunner().sendAggregatorValues(peer, activeVertices, this.changedVertexCnt); - iteration++; + this.iteration++; } /** - * Iterating utility that ensures following things:
- * - if vertex is active, but the given message does not match the vertexID, - * return null.
- * - if vertex is inactive, but received a message that matches the ID, build - * an iterator that can be iterated until the next vertex has been reached - * (not buffer in memory) and set the vertex active
- * - if vertex is active, and the given message does match the vertexID, - * return an iterator that can be iterated until the next vertex has been - * reached.
- * - if vertex is inactive, and received no message, return null. + * Utility that ensures that the incoming messages have a target vertex. */ - @SuppressWarnings("unchecked") - private VertexMessageIterable iterate(GraphJobMessage currentMessage, - V firstMessageId, Vertex vertex, - BSPPeer peer) { - int comparision = firstMessageId.compareTo(vertex.getVertexID()); - if (conf.getBoolean("hama.check.missing.vertex", true)) { - if (comparision < 0) { + private void checkMsgOrder(V vid, VertexMessages vm) { + // When the vid is greater than the current message, it means that a vertex + // has sent a message to an other vertex that doesn't exist + if (vm.getMessageVID() != null && vm.getMessageVID().compareTo(vid) < 0) { + if (conf.getBoolean("hama.check.missing.vertex", true)) { throw new IllegalArgumentException( - "A message has recieved with a destination ID: " + firstMessageId + - " that does not exist! (Vertex iterator is at" + vertex.getVertexID() - + " ID)"); + "A message has recieved with a destination ID: " + vm.getMessageVID() + + " that does not exist! (Vertex iterator is at" + vid + " ID)"); + } else { + // Skip all unrecognized messages until we find a match + vm.continueUntil(vid); } - } else { - while (comparision < 0) { - VertexMessageIterable messageIterable = new VertexMessageIterable( - currentMessage, firstMessageId, peer); - currentMessage = messageIterable.getOverflowMessage(); - firstMessageId = (V) currentMessage.getVertexId(); - comparision = firstMessageId.compareTo(vertex.getVertexID()); - } } - if (comparision == 0) { - // vertex id matches with the vertex, return an iterator with newest - // message - return new VertexMessageIterable(currentMessage, - vertex.getVertexID(), peer); - } else { - // return null - return null; - } } /** @@ -431,9 +396,9 @@ } } else { if (vertex.compareTo(currentVertex) > 0) { - throw new IOException("The records of split aren't in order by vertex ID."); + throw new IOException("The records of split aren't in order by vertex ID."); } - + if (selfReference) { vertex.addEdge(new Edge(vertex.getVertexID(), null)); } Index: graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java (revision 1629503) +++ graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java (working copy) @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.graph; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.hadoop.io.Writable; -import org.apache.hama.bsp.BSPPeer; - -import com.google.common.collect.AbstractIterator; - -/** - * The rationale behind this class is that it polls messages if they are - * requested and once it finds a message that is not dedicated for this vertex, - * it breaks the iteration. The message that was polled and doesn't belong to - * the vertex is returned by {@link #getOverflowMessage()}. - */ -public final class VertexMessageIterable implements Iterable { - - private final V vertexID; - private final BSPPeer peer; - - private GraphJobMessage overflow; - private GraphJobMessage currentMessage; - - private Iterator currentIterator; - - public VertexMessageIterable(GraphJobMessage currentMessage, V vertexID, - BSPPeer peer) { - this.currentMessage = currentMessage; - this.vertexID = vertexID; - this.peer = peer; - setupIterator(); - } - - private void setupIterator() { - currentIterator = new AbstractIterator() { - @SuppressWarnings("unchecked") - @Override - protected T computeNext() { - // spool back the current message - if (currentMessage != null) { - GraphJobMessage tmp = currentMessage; - // set it to null, so we don't send it over and over again - currentMessage = null; - return (T) tmp.getVertexValue(); - } - - try { - GraphJobMessage msg = peer.getCurrentMessage(); - if (msg != null) { - if (msg.getVertexId().equals(vertexID)) { - return (T) msg.getVertexValue(); - } else { - overflow = msg; - } - } - return endOfData(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - }; - } - - public GraphJobMessage getOverflowMessage() { - // check if iterable was completely consumed - while (currentIterator.hasNext()) { - currentIterator.next(); - } - return overflow; - } - - @Override - public Iterator iterator() { - return currentIterator; - } - -} Index: graph/src/main/java/org/apache/hama/graph/VertexMessages.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/VertexMessages.java (revision 0) +++ graph/src/main/java/org/apache/hama/graph/VertexMessages.java (working copy) @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hama.graph; + +import java.io.IOException; +import java.util.Iterator; +import org.apache.hama.bsp.BSPPeer; + +/** + * This class has as a target to iterate the whole sorted queue of the incoming + * messages. Each vertex will be able to call the hasNext() and + * next() methods to consume the messages. The iterator is + * responsible to understand when the messages of a specific Vertex ID have been + * consumed, and then unlock the messages of the next Vertex ID through the + * continueWith() method. + * + * @param + * @param + */ +public class VertexMessages implements Iterator, Iterable { + private final BSPPeer peer; + private V vid = null; + private GraphJobMessage currentMessage = null; + private boolean locked = true; + + public VertexMessages(BSPPeer peer) { + this.peer = peer; + } + + @Override + public boolean hasNext() { + if (locked) { + return false; + } + + try { + if (this.currentMessage == null) { + this.currentMessage = this.peer.getCurrentMessage(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + if (this.currentMessage != null && this.currentMessage.getVertexId().equals(this.vid)) { + return true; + } + // When a new ID has shown up or the messages are finished, + // we lock the iterator + this.locked = true; + return false; + } + + @Override + public T next() { + if (this.currentMessage == null || this.locked) { + return null; + } + + // Despose the current message and prepare for next hasNext() call + try { + return (T) this.currentMessage.getVertexValue(); + } finally { + this.currentMessage = null; + } + } + + /** + * By implementing both Iterator and Iterable + * interfaces, this class will not be able to re-iterated and the messages + * will be accessed only once. In our case this is fine. + * + * @return an one-time iterator + */ + @Override + public Iterator iterator() { + return this; + } + + /** + * This method should be used only after initialization. If an other message + * exists in the memory of the iterator, the new prepended message + * will be ignored. + * + * @param msg The message to be prepended just after initialization + */ + public void prependMessage(GraphJobMessage msg) { + if (this.currentMessage == null && msg != null) { + this.currentMessage = msg; + } + } + + /** + * Check the vertexID target of the current message that is loaded in the + * iterator and unlock the iterator only if the vid argument is + * matching. + * + * @param vid + * @return return true if the vid is equal to the next message's ID + */ + public boolean continueWith(V vid) { + // Normally when we call this method this.locked == true + this.vid = vid; + this.locked = false; + + if (this.currentMessage == null) { + // Get next message (if there is) and decide based on the new vid + return this.hasNext(); + } + + // If we have a message already loaded + if (!this.currentMessage.getVertexId().equals(vid)) { + this.locked = true; + return false; + } + return true; + } + + /** + * Consume the incoming messages until we find a message that has a target + * equal to the vid argument. + * + * @param vid + * @return + */ + public boolean continueUntil(V vid) { + do { + try { + if (this.currentMessage == null) { + this.currentMessage = this.peer.getCurrentMessage(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + if (this.currentMessage == null) { + this.locked = true; + return false; + } + } while (!this.currentMessage.getVertexId().equals(this.vid)); + + this.locked = false; + return true; + } + + public void dumpRest() { + while(this.hasNext()) { + this.next(); + } + } + + /** + * Return the target Vertex ID of the current loaded message. + * + * @return + */ + public V getMessageVID() { + return this.currentMessage == null ? null : (V) this.currentMessage.getVertexId(); + } +}