Index: core/src/main/java/org/apache/hama/bsp/BSPPeer.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPPeer.java (revision 1672580) +++ core/src/main/java/org/apache/hama/bsp/BSPPeer.java (working copy) @@ -18,6 +18,8 @@ package org.apache.hama.bsp; import java.io.IOException; +import java.util.Iterator; +import java.util.List; import org.apache.hadoop.io.Writable; import org.apache.hama.Constants; @@ -202,4 +204,6 @@ * @return the task id of this task. */ public TaskAttemptID getTaskId(); + + public List> getMsgIterators(); } Index: core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (revision 1672580) +++ core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (working copy) @@ -668,4 +668,9 @@ return taskId; } + @Override + public List> getMsgIterators() { + return messenger.getMsgIterators(); + } + } Index: core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (revision 1672580) +++ core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (working copy) @@ -21,6 +21,7 @@ import java.net.InetSocketAddress; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import java.util.Map.Entry; import java.util.Queue; @@ -121,6 +122,11 @@ return localQueue.poll(); } + @Override + public List> getMsgIterators() { + return localQueue.getMsgIterators(); + } + /* * (non-Javadoc) * @see org.apache.hama.bsp.message.MessageManager#getNumCurrentMessages() Index: core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java (revision 1672580) +++ core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java (working copy) @@ -24,6 +24,8 @@ import java.io.IOException; import java.net.BindException; import java.net.InetSocketAddress; +import java.util.Iterator; +import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; Index: core/src/main/java/org/apache/hama/bsp/message/MessageManager.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (revision 1672580) +++ core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (working copy) @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.Iterator; +import java.util.List; import java.util.Map.Entry; import org.apache.hadoop.io.Writable; @@ -122,4 +123,6 @@ * on. */ public InetSocketAddress getListenerAddress(); + + public List> getMsgIterators(); } Index: core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (revision 1672580) +++ core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (working copy) @@ -17,7 +17,9 @@ */ package org.apache.hama.bsp.message.queue; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.hadoop.conf.Configuration; @@ -87,11 +89,6 @@ } @Override - public final Iterator iterator() { - return deque.iterator(); - } - - @Override public void setConf(Configuration conf) { this.conf = conf; } @@ -117,4 +114,15 @@ return this; } + @Override + public List> getMsgIterators() { + List> iterators = new ArrayList>(); + iterators.add(deque.iterator()); + Iterator> it = bundles.iterator(); + while(it.hasNext()) { + iterators.add(it.next().iterator()); + } + return iterators; + } + } Index: core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (revision 1672580) +++ core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (working copy) @@ -17,6 +17,9 @@ */ package org.apache.hama.bsp.message.queue; +import java.util.Iterator; +import java.util.List; + import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; @@ -26,8 +29,7 @@ /** * Simple queue interface. */ -public interface MessageQueue extends Iterable, - Configurable { +public interface MessageQueue extends Configurable { public static final String PERSISTENT_QUEUE = "hama.queue.behaviour.persistent"; @@ -63,6 +65,8 @@ */ public void add(M item); + public List> getMsgIterators(); + /** * Clears all entries in the given queue. */ Index: core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (revision 1672580) +++ core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (working copy) @@ -18,6 +18,7 @@ package org.apache.hama.bsp.message.queue; import java.util.Iterator; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; @@ -45,17 +46,6 @@ /* * (non-Javadoc) - * @see org.apache.hama.bsp.message.SynchronizedQueue#iterator() - */ - @Override - public Iterator iterator() { - synchronized (mutex) { - return queue.iterator(); - } - } - - /* - * (non-Javadoc) * @see * org.apache.hama.bsp.message.SynchronizedQueue#setConf(org.apache.hadoop * .conf.Configuration) @@ -196,4 +186,9 @@ queue.addAll(otherqueue); } } + + @Override + public List> getMsgIterators() { + return queue.getMsgIterators(); + } } Index: core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java (revision 1672580) +++ core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java (working copy) @@ -18,6 +18,7 @@ package org.apache.hama.bsp.message.queue; import java.util.Iterator; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; @@ -37,11 +38,6 @@ private Configuration conf; @Override - public Iterator iterator() { - return queue.iterator(); - } - - @Override public void setConf(Configuration conf) { this.conf = conf; } @@ -106,4 +102,9 @@ return this; } + @Override + public List> getMsgIterators() { + throw new RuntimeException("Not support yet."); + } + } Index: core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java =================================================================== --- core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (revision 1672580) +++ core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (working copy) @@ -151,6 +151,12 @@ return null; } + @Override + public List> getMsgIterators() { + // TODO Auto-generated method stub + return null; + } + } public static class TestBSPPeer implements @@ -307,6 +313,12 @@ return null; } + @Override + public List> getMsgIterators() { + // TODO Auto-generated method stub + return null; + } + } public static class TempSyncClient extends BSPPeerSyncClient { Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1672580) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -231,6 +231,8 @@ Vertex vertex = null; + // TODO use threads + while (currentMessage != null) { vertex = vertices.get((V) currentMessage.getVertexId()); Index: graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java (revision 1672580) +++ graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java (working copy) @@ -17,7 +17,9 @@ */ package org.apache.hama.graph; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.hadoop.conf.Configuration; @@ -27,6 +29,8 @@ import org.apache.hama.bsp.message.queue.MessageQueue; import org.apache.hama.bsp.message.queue.SynchronizedQueue; +import com.google.common.collect.Lists; + public class IncomingVertexMessageManager> implements SynchronizedQueue { @@ -36,11 +40,6 @@ private final ConcurrentLinkedQueue mapMessages = new ConcurrentLinkedQueue(); @Override - public Iterator iterator() { - return msgPerVertex.iterator(); - } - - @Override public void setConf(Configuration conf) { this.conf = conf; } @@ -54,7 +53,7 @@ public void addBundle(BSPMessageBundle bundle) { addAll(bundle); } - + @Override public void addAll(Iterable col) { for (GraphJobMessage m : col) @@ -112,5 +111,9 @@ return this; } + @Override + public List> getMsgIterators() { + return msgPerVertex.getMsgIterator(); + } } Index: graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java (revision 1672580) +++ graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java (working copy) @@ -17,12 +17,16 @@ */ package org.apache.hama.graph; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.hadoop.io.WritableComparable; +import com.google.common.collect.Lists; + public class MessagePerVertex { @SuppressWarnings("rawtypes") @@ -67,4 +71,13 @@ return (storage.size() > 0) ? storage.pollFirstEntry().getValue() : null; } + public List> getMsgIterator() { + List> iterators = new ArrayList>(); + for (List l : Lists.partition( + new ArrayList(storage.values()), 10)) { + iterators.add(l.iterator()); + } + return iterators; + } + }