Index: core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (revision 1413154) +++ core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (working copy) @@ -54,7 +54,7 @@ * * @param message BSPMessage to add. */ - public void addMessage(M message) { + public BSPMessageBundle addMessage(M message) { String className = message.getClass().getName(); if (!messages.containsKey(className)) { // use linked list because we're just iterating over them @@ -65,6 +65,7 @@ } else { messages.get(className).add(message); } + return this; } public List getMessages() { Index: core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (revision 1413154) +++ core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (working copy) @@ -50,7 +50,7 @@ * managed and it maintains a cache for socket addresses. */ public abstract class AbstractMessageManager implements - MessageManager, Configurable { +MessageManager, Configurable { protected static final Log LOG = LogFactory .getLog(AbstractMessageManager.class); @@ -84,14 +84,14 @@ @Override public void init(TaskAttemptID attemptId, BSPPeer peer, Configuration conf, InetSocketAddress peerAddress) { - this.messageListenerQueue = new LinkedList>(); - this.attemptId = attemptId; - this.peer = peer; - this.conf = conf; - this.peerAddress = peerAddress; - this.localQueue = getQueue(); + this.messageListenerQueue = new LinkedList>(); + this.attemptId = attemptId; + this.peer = peer; + this.conf = conf; + this.peerAddress = peerAddress; + this.localQueue = getQueue(); this.localQueueForNextIteration = getSynchronizedQueue(); - this.maxCachedConnections = conf.getInt(MAX_CACHED_CONNECTIONS_KEY, 100); + this.maxCachedConnections = conf.getInt(MAX_CACHED_CONNECTIONS_KEY, 100); } /* @@ -161,6 +161,18 @@ localQueueForNextIteration = getSynchronizedQueue(); notifyInit(); } + + protected InetSocketAddress getInetSocketAddress(String peerName){ + InetSocketAddress targetPeerAddress = null; + // Get socket for target peer. + if (peerSocketCache.containsKey(peerName)) { + targetPeerAddress = peerSocketCache.get(peerName); + } else { + targetPeerAddress = BSPNetUtils.getAddress(peerName); + peerSocketCache.put(peerName, targetPeerAddress); + } + return targetPeerAddress; + } /* * (non-Javadoc) @@ -169,14 +181,7 @@ */ @Override public void send(String peerName, M msg) throws IOException { - InetSocketAddress targetPeerAddress = null; - // Get socket for target peer. - if (peerSocketCache.containsKey(peerName)) { - targetPeerAddress = peerSocketCache.get(peerName); - } else { - targetPeerAddress = BSPNetUtils.getAddress(peerName); - peerSocketCache.put(peerName, targetPeerAddress); - } + InetSocketAddress targetPeerAddress = getInetSocketAddress(peerName); MessageQueue queue = outgoingQueues.get(targetPeerAddress); if (queue == null) { queue = getQueue(); @@ -209,7 +214,7 @@ LOG.debug("Creating new " + queueClass); @SuppressWarnings("unchecked") MessageQueue newInstance = (MessageQueue) ReflectionUtils - .newInstance(queueClass, conf); + .newInstance(queueClass, conf); newInstance.init(conf, attemptId); return newInstance; } @@ -228,7 +233,7 @@ this.conf = conf; } - private void notifySentMessage(String peerName, M message) { + protected void notifySentMessage(String peerName, M message) { Iterator> iterator = this.messageListenerQueue .iterator(); while (iterator.hasNext()) { Index: core/src/main/java/org/apache/hama/bsp/message/AsyncAvroMessageManagerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/AsyncAvroMessageManagerImpl.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/AsyncAvroMessageManagerImpl.java (working copy) @@ -0,0 +1,8 @@ +package org.apache.hama.bsp.message; + +import org.apache.hadoop.io.Writable; + +public class AsyncAvroMessageManagerImpl extends AvroMessageManagerImpl{ + + //TODO +} Index: core/src/main/java/org/apache/hama/bsp/message/AsyncHadoopMessageManagerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/AsyncHadoopMessageManagerImpl.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/AsyncHadoopMessageManagerImpl.java (working copy) @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map.Entry; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hama.bsp.BSPMessageBundle; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.Combiner; +import org.apache.hama.bsp.TaskAttemptID; + +import com.google.common.collect.Maps; + +public class AsyncHadoopMessageManagerImpl extends HadoopMessageManagerImpl{ + + private static final Log LOG = LogFactory + .getLog(AsyncHadoopMessageManagerImpl.class); + + private final AtomicInteger size = new AtomicInteger(0); + private ExecutorService service; + private final ConcurrentMap> outgoingQueues = Maps.newConcurrentMap(); + + private int capacity; + private int batchSize; + private int threadCount; + private Combiner combiner; + + @SuppressWarnings("unchecked") + @Override + public final void init(TaskAttemptID attemptId, BSPPeer peer, + Configuration conf, InetSocketAddress peerAddress) { + super.init(attemptId, peer, conf, peerAddress); + this.capacity = conf.getInt(ASYNC_MESSAGING_POOL_SIZE_KEY, 100); + this.batchSize = conf.getInt(ASYNC_MESSAGING_BATCH_SIZE_KEY, 10); + this.threadCount = conf.getInt(ASYNC_MESSAGING_THREAD_COUNT_KEY, 10); + + final String combinerName = conf.get("bsp.combiner.class"); + try{ + if (combinerName != null) { + combiner = (Combiner) ReflectionUtils.newInstance( + conf.getClassByName(combinerName), conf); + } + } + catch(Exception e){ + LOG.warn("Cannot get combiner", e); + } + + service = Executors.newFixedThreadPool(threadCount); + } + + + /* + * Attempts to speed up message sending by doing it asynchronously + * during the superstep. + */ + @Override + public void send(final String peerName, M msg) throws IOException { + if(size.get() == capacity){ + super.send(peerName, msg); + return; + } + size.incrementAndGet(); + final InetSocketAddress addr = getInetSocketAddress(peerName); + ArrayBlockingQueue queue = outgoingQueues.get(addr); + + if(queue == null){ + queue = new ArrayBlockingQueue(batchSize); + outgoingQueues.put(addr, queue); + } + + try { + queue.put(msg); + } catch (InterruptedException e) { + e.printStackTrace(); + LOG.error("This exception should never come", e); + } + + if(queue.size() == batchSize){ + final ArrayBlockingQueue quarantineMessages = queue; + outgoingQueues.put(addr, new ArrayBlockingQueue(batchSize)); + service.submit(new Runnable(){ + + @Override + public void run() { + transfer(quarantineMessages, addr); + size.getAndAdd(-1*quarantineMessages.size()); + for(M msg:quarantineMessages){ + notifySentMessage(peerName, msg); + } + } + }); + } + } + + private final void transfer(ArrayBlockingQueue quarantineMessages, InetSocketAddress addr){ + try{ + HadoopMessageManager bspPeerConnection = getBSPPeerConnection(addr); + if (bspPeerConnection == null) { + throw new IllegalArgumentException("Can not find " + addr.toString() + + " to transfer messages to!"); + } else { + BSPMessageBundle bundle = new BSPMessageBundle(); + if(combiner != null){ + bundle.addMessage(combiner.combine(quarantineMessages)); + } else{ + for(M msg:quarantineMessages){ + bundle.addMessage(msg); + } + } + bspPeerConnection.put(bundle); + } + } + catch(IOException e){ + LOG.error("Error in sending messages", e); + } + } + + + @Override + public void finishSendPhase() throws IOException { + for(Entry> entry:outgoingQueues.entrySet()){ + transfer(entry.getValue(), entry.getKey()); + } + super.finishSendPhase(); + } +} Index: core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java (revision 1413154) +++ core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java (working copy) @@ -41,7 +41,7 @@ import org.apache.hama.bsp.message.compress.BSPCompressedBundle; import org.apache.hama.util.LRUCache; -public final class AvroMessageManagerImpl extends +public class AvroMessageManagerImpl extends CompressableMessageManager implements Sender { private NettyServer server = null; Index: core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (revision 1413154) +++ core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (working copy) @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,8 +40,8 @@ * Implementation of the {@link HadoopMessageManager}. * */ -public final class HadoopMessageManagerImpl extends - CompressableMessageManager implements HadoopMessageManager { +public class HadoopMessageManagerImpl extends +CompressableMessageManager implements HadoopMessageManager { private static final Log LOG = LogFactory .getLog(HadoopMessageManagerImpl.class); @@ -48,14 +49,19 @@ private Server server = null; private LRUCache> peersLRUCache = null; + private ReentrantLock locks[]; @SuppressWarnings("serial") @Override - public final void init(TaskAttemptID attemptId, BSPPeer peer, + public void init(TaskAttemptID attemptId, BSPPeer peer, Configuration conf, InetSocketAddress peerAddress) { super.init(attemptId, peer, conf, peerAddress); super.initCompression(conf); startRPCServer(conf, peerAddress); + locks = new ReentrantLock[conf.getInt(ASYNC_MESSAGING_THREAD_COUNT_KEY, 10)]; + for(int i = 0; i < locks.length; i++){ + locks[i] = new ReentrantLock(); + } peersLRUCache = new LRUCache>( maxCachedConnections) { @Override @@ -123,16 +129,22 @@ @SuppressWarnings("unchecked") protected final HadoopMessageManager getBSPPeerConnection( InetSocketAddress addr) throws IOException { - HadoopMessageManager bspPeerConnection; - if (!peersLRUCache.containsKey(addr)) { - bspPeerConnection = (HadoopMessageManager) RPC.getProxy( - HadoopMessageManager.class, HamaRPCProtocolVersion.versionID, addr, - this.conf); - peersLRUCache.put(addr, bspPeerConnection); - } else { - bspPeerConnection = peersLRUCache.get(addr); + locks[getIndex(addr)].lock(); + try{ + HadoopMessageManager bspPeerConnection; + if (!peersLRUCache.containsKey(addr)) { + bspPeerConnection = (HadoopMessageManager) RPC.getProxy( + HadoopMessageManager.class, HamaRPCProtocolVersion.versionID, addr, + this.conf); + peersLRUCache.put(addr, bspPeerConnection); + } else { + bspPeerConnection = peersLRUCache.get(addr); + } + return bspPeerConnection; } - return bspPeerConnection; + finally{ + locks[getIndex(addr)].unlock(); + } } @Override @@ -157,4 +169,8 @@ return versionID; } + private int getIndex(Object obj){ + return Math.abs(obj.hashCode())%locks.length; + } + } Index: core/src/main/java/org/apache/hama/bsp/message/MessageManager.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (revision 1413154) +++ core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (working copy) @@ -38,6 +38,9 @@ public static final String QUEUE_TYPE_CLASS = "hama.messenger.queue.class"; public static final String MAX_CACHED_CONNECTIONS_KEY = "hama.messenger.max.cached.connections"; + public final static String ASYNC_MESSAGING_POOL_SIZE_KEY = "hama.messenger.async.pool.size"; + public final static String ASYNC_MESSAGING_THREAD_COUNT_KEY = "hama.messenger.async.thread.count"; + public final static String ASYNC_MESSAGING_BATCH_SIZE_KEY = "hama.messenger.async.batch.size"; /** * Init can be used to start servers and initialize internal state. If you are