Index: core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (revision 1445474) +++ core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (working copy) @@ -40,6 +40,7 @@ import org.apache.hama.bsp.message.queue.DiskQueue; import org.apache.hama.bsp.message.queue.MemoryQueue; import org.apache.hama.bsp.message.queue.MessageQueue; +import org.apache.hama.bsp.message.queue.MessageTransferQueue; import org.apache.hama.bsp.message.queue.SingleLockQueue; import org.apache.hama.bsp.message.queue.SynchronizedQueue; import org.apache.hama.util.BSPNetUtils; @@ -59,7 +60,9 @@ protected Configuration conf; protected final HashMap peerSocketCache = new HashMap(); protected final HashMap> outgoingQueues = new HashMap>(); + protected MessageQueue localQueue; + // this must be a synchronized implementation: this is accessed per RPC protected SynchronizedQueue localQueueForNextIteration; // this peer object is just used for counter incrementation @@ -89,8 +92,8 @@ this.peer = peer; this.conf = conf; this.peerAddress = peerAddress; - this.localQueue = getQueue(); - this.localQueueForNextIteration = getSynchronizedQueue(); + this.localQueue = getSenderQueue(); + this.localQueueForNextIteration = getSynchronizedReceiverQueue(); this.maxCachedConnections = conf.getInt(MAX_CACHED_CONNECTIONS_KEY, 100); } @@ -158,7 +161,7 @@ public final void clearOutgoingQueues() { localQueue = localQueueForNextIteration.getMessageQueue(); localQueue.prepareRead(); - localQueueForNextIteration = getSynchronizedQueue(); + localQueueForNextIteration = getSynchronizedReceiverQueue(); notifyInit(); } @@ -179,7 +182,7 @@ } MessageQueue queue = outgoingQueues.get(targetPeerAddress); if (queue == null) { - queue = getQueue(); + queue = getSenderQueue(); } queue.add(msg); peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L); @@ -204,20 +207,44 @@ * * @return a new queue implementation. */ - protected MessageQueue getQueue() { + protected MessageQueue getSenderQueue() { Class queueClass = conf.getClass(QUEUE_TYPE_CLASS, MemoryQueue.class); LOG.debug("Creating new " + queueClass); @SuppressWarnings("unchecked") - MessageQueue newInstance = (MessageQueue) ReflectionUtils + MessageTransferQueue newInstance = (MessageTransferQueue) ReflectionUtils .newInstance(queueClass, conf); - newInstance.init(conf, attemptId); - return newInstance; + MessageQueue queue = newInstance.getSenderQueue(); + queue.init(conf, attemptId); + return queue; } - protected SynchronizedQueue getSynchronizedQueue() { - return SingleLockQueue.synchronize(getQueue()); + /** + * Returns a new queue implementation based on what was configured. If nothing + * has been configured for "hama.messenger.queue.class" then the + * {@link MemoryQueue} is used. If you have scalability issues, then better + * use {@link DiskQueue}. + * + * @return a new queue implementation. + */ + protected MessageQueue getReceiverQueue() { + Class queueClass = conf.getClass(QUEUE_TYPE_CLASS, MemoryQueue.class); + LOG.debug("Creating new " + queueClass); + @SuppressWarnings("unchecked") + MessageTransferQueue newInstance = (MessageTransferQueue) ReflectionUtils + .newInstance(queueClass, conf); + MessageQueue queue = newInstance.getReceiverQueue(); + queue.init(conf, attemptId); + return queue; } + protected SynchronizedQueue getSynchronizedSenderQueue() { + return SingleLockQueue.synchronize(getSenderQueue()); + } + + protected SynchronizedQueue getSynchronizedReceiverQueue() { + return SingleLockQueue.synchronize(getReceiverQueue()); + } + @Override public final Configuration getConf() { return conf; @@ -229,27 +256,27 @@ } private void notifySentMessage(String peerName, M message) { - for (MessageEventListener aMessageListenerQueue : this.messageListenerQueue) { - aMessageListenerQueue.onMessageSent(peerName, message); - } + for (MessageEventListener aMessageListenerQueue : this.messageListenerQueue) { + aMessageListenerQueue.onMessageSent(peerName, message); + } } private void notifyReceivedMessage(M message) throws IOException { - for (MessageEventListener aMessageListenerQueue : this.messageListenerQueue) { - aMessageListenerQueue.onMessageReceived(message); - } + for (MessageEventListener aMessageListenerQueue : this.messageListenerQueue) { + aMessageListenerQueue.onMessageReceived(message); + } } private void notifyInit() { - for (MessageEventListener aMessageListenerQueue : this.messageListenerQueue) { - aMessageListenerQueue.onInitialized(); - } + for (MessageEventListener aMessageListenerQueue : this.messageListenerQueue) { + aMessageListenerQueue.onInitialized(); + } } private void notifyClose() { - for (MessageEventListener aMessageListenerQueue : this.messageListenerQueue) { - aMessageListenerQueue.onClose(); - } + for (MessageEventListener aMessageListenerQueue : this.messageListenerQueue) { + aMessageListenerQueue.onClose(); + } } @Override Index: core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (revision 1445474) +++ core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (working copy) @@ -46,7 +46,7 @@ * configuration.
* It is experimental to use. */ -public final class DiskQueue implements MessageQueue { +public final class DiskQueue implements MessageQueue, MessageTransferQueue { public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir"; @@ -310,4 +310,14 @@ return false; } + @Override + public MessageQueue getSenderQueue() { + return this; + } + + @Override + public MessageQueue getReceiverQueue() { + return this; + } + } Index: core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (revision 1445474) +++ core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (working copy) @@ -150,7 +150,7 @@ * @see org.apache.hama.bsp.message.SynchronizedQueue#poll() */ @Override - public Object poll() { + public T poll() { synchronized (mutex) { return queue.poll(); } @@ -190,4 +190,25 @@ Object mutex) { return new SingleLockQueue(queue, mutex); } + + @Override + public void prepareWrite() { + synchronized (mutex) { + queue.prepareWrite(); + } + } + + @Override + public void addAll(MessageQueue otherqueue) { + synchronized (mutex) { + queue.addAll(otherqueue); + } + } + + @Override + public boolean isMessageSerialized() { + synchronized (mutex) { + return queue.isMessageSerialized(); + } + } } Index: core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java (revision 1445474) +++ core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java (working copy) @@ -17,37 +17,13 @@ */ package org.apache.hama.bsp.message.queue; -import java.util.Collection; -import java.util.Iterator; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hama.bsp.TaskAttemptID; - /** * Synchronized Queue interface. Can be used to implement better synchronized * datastructures. */ -public interface SynchronizedQueue extends Configurable { +public interface SynchronizedQueue extends MessageQueue{ - public abstract Iterator iterator(); - - public abstract void init(Configuration conf, TaskAttemptID id); - - public abstract void close(); - - public abstract void prepareRead(); - - public abstract void addAll(Collection col); - - public abstract void add(T item); - - public abstract void clear(); - - public abstract Object poll(); - - public abstract int size(); - public abstract MessageQueue getMessageQueue(); } Index: core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (revision 1445474) +++ core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (working copy) @@ -44,7 +44,8 @@ * * @param */ -public class SpillingQueue implements MessageQueue { +public class SpillingQueue implements MessageQueue, + MessageTransferQueue { private static final Log LOG = LogFactory.getLog(SpillingQueue.class); @@ -152,9 +153,9 @@ @Override public void addAll(MessageQueue arg0) { - for (M anArg0 : arg0) { - add(anArg0); - } + for (M anArg0 : arg0) { + add(anArg0); + } } @Override @@ -334,12 +335,20 @@ public int size() { return numMessagesWritten; } - @Override public boolean isMessageSerialized() { return true; } + @Override + public MessageQueue getSenderQueue() { + return this; + } + @Override + public MessageQueue getReceiverQueue() { + return this; + } + } Index: core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java (revision 1445474) +++ core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java (working copy) @@ -30,7 +30,7 @@ * sorted receive and send. */ public final class SortedMessageQueue> - implements MessageQueue { + implements MessageQueue, MessageTransferQueue { private final PriorityQueue queue = new PriorityQueue(); private Configuration conf; @@ -110,4 +110,14 @@ return false; } + @Override + public MessageQueue getSenderQueue() { + return this; + } + + @Override + public MessageQueue getReceiverQueue() { + return this; + } + } Index: core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (revision 1445474) +++ core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (working copy) @@ -29,7 +29,7 @@ /** * LinkedList backed queue structure for bookkeeping messages. */ -public final class MemoryQueue implements MessageQueue { +public final class MemoryQueue implements MessageQueue, MessageTransferQueue { private final Deque deque = new ArrayDeque(); private Configuration conf; @@ -108,4 +108,14 @@ return false; } + @Override + public MessageQueue getSenderQueue() { + return this; + } + + @Override + public MessageQueue getReceiverQueue() { + return this; + } + } Index: core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java (revision 0) @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.queue; + +/** + * + * + * @param + */ +public interface MessageTransferQueue { + + /** + * + */ + public MessageQueue getSenderQueue(); + + /** + * + */ + public MessageQueue getReceiverQueue(); + +}