Index: core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (revision 1454689) +++ core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (working copy) @@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.bsp.BSPMessageBundle; import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.BSPPeerImpl; @@ -40,7 +39,6 @@ import org.apache.hama.bsp.message.queue.DiskQueue; import org.apache.hama.bsp.message.queue.MemoryQueue; import org.apache.hama.bsp.message.queue.MessageQueue; -import org.apache.hama.bsp.message.queue.MessageTransferQueue; import org.apache.hama.bsp.message.queue.SingleLockQueue; import org.apache.hama.bsp.message.queue.SynchronizedQueue; import org.apache.hama.util.BSPNetUtils; @@ -208,12 +206,9 @@ * @return a new queue implementation. */ protected MessageQueue getSenderQueue() { - Class queueClass = conf.getClass(QUEUE_TYPE_CLASS, MemoryQueue.class); - LOG.debug("Creating new " + queueClass); @SuppressWarnings("unchecked") - MessageTransferQueue newInstance = (MessageTransferQueue) ReflectionUtils - .newInstance(queueClass, conf); - MessageQueue queue = newInstance.getSenderQueue(); + MessageQueue queue = MessageTransferQueueFactory + .getMessageTransferQueue(conf).getSenderQueue(conf); queue.init(conf, attemptId); return queue; } @@ -227,12 +222,9 @@ * @return a new queue implementation. */ protected MessageQueue getReceiverQueue() { - Class queueClass = conf.getClass(QUEUE_TYPE_CLASS, MemoryQueue.class); - LOG.debug("Creating new " + queueClass); @SuppressWarnings("unchecked") - MessageTransferQueue newInstance = (MessageTransferQueue) ReflectionUtils - .newInstance(queueClass, conf); - MessageQueue queue = newInstance.getReceiverQueue(); + MessageQueue queue = MessageTransferQueueFactory + .getMessageTransferQueue(conf).getReceiverQueue(conf); queue.init(conf, attemptId); return queue; } Index: core/src/main/java/org/apache/hama/bsp/message/MessageManager.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (revision 1454689) +++ core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (working copy) @@ -36,7 +36,11 @@ */ public interface MessageManager { + @Deprecated public static final String QUEUE_TYPE_CLASS = "hama.messenger.queue.class"; + public static final String RECEIVE_QUEUE_TYPE_CLASS = "hama.messenger.receive.queue.class"; + public static final String SENDER_QUEUE_TYPE_CLASS = "hama.messenger.sender.queue.class"; + public static final String TRANSFER_QUEUE_TYPE_CLASS = "hama.messenger.xfer.queue.class"; public static final String MAX_CACHED_CONNECTIONS_KEY = "hama.messenger.max.cached.connections"; /** Index: core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java (working copy) @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hama.bsp.message.queue.MessageQueue; +import org.apache.hama.bsp.message.queue.MessageTransferQueue; +import org.apache.hama.bsp.message.queue.SpillingQueue; +import org.apache.hama.util.ReflectionUtils; + +/** + * Factory class to define protocols between the sender and receiver queues. + * + * @param The message type. + */ +public class MessageTransferQueueFactory { + + private static final Log LOG = LogFactory.getLog(MessageTransferQueue.class); + + private static class BackwardCompatibleTransferQueue implements + MessageTransferQueue { + + @Override + public MessageQueue getSenderQueue(Configuration conf) { + return getMessageQueue(conf); + } + + @Override + public MessageQueue getReceiverQueue(Configuration conf) { + return getMessageQueue(conf); + } + + @SuppressWarnings({ "unchecked", "deprecation" }) + private MessageQueue getMessageQueue(Configuration conf) { + return ReflectionUtils.newInstance(conf.getClass( + MessageManager.QUEUE_TYPE_CLASS, SpillingQueue.class, + MessageQueue.class)); + } + + } + + private static class DefaultMessageTransferQueue implements + MessageTransferQueue { + + @SuppressWarnings("unchecked") + @Override + public MessageQueue getSenderQueue(Configuration conf) { + return ReflectionUtils.newInstance(conf.getClass( + MessageManager.SENDER_QUEUE_TYPE_CLASS, SpillingQueue.class, + MessageQueue.class)); + } + + @SuppressWarnings("unchecked") + @Override + public MessageQueue getReceiverQueue(Configuration conf) { + return ReflectionUtils.newInstance(conf.getClass( + MessageManager.RECEIVE_QUEUE_TYPE_CLASS, SpillingQueue.class, + MessageQueue.class)); + } + + } + + @SuppressWarnings({ "rawtypes", "deprecation" }) + public static MessageTransferQueue getMessageTransferQueue(Configuration conf) { + + if (conf.getClass(MessageManager.QUEUE_TYPE_CLASS, null) != null) { + LOG.warn("Message queue is configured on deprecated parameter:" + + MessageManager.QUEUE_TYPE_CLASS); + return new BackwardCompatibleTransferQueue(); + } + return (MessageTransferQueue) ReflectionUtils.newInstance(conf.getClass( + MessageManager.TRANSFER_QUEUE_TYPE_CLASS, + DefaultMessageTransferQueue.class, MessageTransferQueue.class)); + + } +} Index: core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java (working copy) @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.bundle; + +import org.apache.hadoop.io.Writable; + +/** + * BSPMessageBundle stores a group of BSPMessages so that they can be sent in + * batch rather than individually. + * + */ +public interface BSPMessageBundle { + + /** + * Returns the size of the message. + * + * @return Size of serialized message bundle. -1 if the size is not known. + */ + public long getSize(); + + /** + * Returns the number of elements. + * + * @return Number of elements. -1 if the number of elements is not known. + */ + public int getNumElements(); + +} Index: core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java (working copy) @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.bundle; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.io.Writable; + +/** + * BSP Message Bundle that encapsulates a ByteBuffer. + * + * @param Message type. + */ +public class ByteBufferBSPMessageBundle implements + BSPMessageBundle { + + private ByteBuffer[] byteArr; + private int count; + + public ByteBufferBSPMessageBundle(ByteBuffer[] buffer, int count) { + byteArr = buffer; + this.count = count; + } + + public ByteBufferBSPMessageBundle(ByteBuffer[] buffer) { + this(buffer, -1); + } + + public ByteBuffer[] getBuffers() { + return byteArr; + } + + @Override + public long getSize() { + return byteArr.length; + } + + @Override + public int getNumElements() { + return count; + } +} Index: core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java (working copy) @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.bundle; + +import org.apache.hadoop.io.Writable; + +/** + * BSP Message Bundle that stores the messages as heap byte arrays. + * + * @param Message type. + */ +public class HeapByteArrayBSPMessageBundle implements + BSPMessageBundle { + + byte[] byteArr; + int count; + + public HeapByteArrayBSPMessageBundle(byte[] buffer) { + this(buffer, -1); + } + + public HeapByteArrayBSPMessageBundle(byte[] buffer, int count) { + byteArr = buffer; + this.count = count; + } + + public byte[] getBuffer() { + return byteArr; + } + + @Override + public long getSize() { + return byteArr.length; + } + + @Override + public int getNumElements() { + return byteArr.length; + } +} Index: core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java (working copy) @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.bundle; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Writable; + +public class POJOMessageBundle implements + BSPMessageBundle, Iterable { + + protected static final Log LOG = LogFactory.getLog(POJOMessageBundle.class); + + protected HashMap> messages = new HashMap>(); + protected HashMap> classCache = new HashMap>(); + + protected int numElements; + + private static class BundleIterator implements + Iterator { + + private Iterator> listIterator; + private Iterator messageIterator; + + public BundleIterator(Iterator> listIterator) { + this.listIterator = listIterator; + } + + @Override + public boolean hasNext() { + return listIterator.hasNext() || messageIterator.hasNext(); + } + + @Override + public M next() { + while (true) { + if (messageIterator != null && messageIterator.hasNext()) { + return messageIterator.next(); + } else { + if (listIterator.hasNext()) { + messageIterator = listIterator.next().iterator(); + } else { + return null; + } + } + } + } + + @Override + public void remove() { + } + + } + + public POJOMessageBundle() { + } + + /** + * Add message to this bundle. + * + * @param message BSPMessage to add. + */ + public void addMessage(M message) { + String className = message.getClass().getName(); + List list = messages.get(className); + ++numElements; + if (list == null) { + list = new ArrayList(); + messages.put(className, list); + } + + list.add(message); + } + + public List getMessages() { + // here we use an arraylist, because we know the size and outside may need + // random access + List mergeList = new ArrayList(messages.size()); + for (List c : messages.values()) { + mergeList.addAll(c); + } + return mergeList; + } + + @Override + public Iterator iterator() { + return new BundleIterator(this.messages.values().iterator()); + } + + @Override + public long getSize() { + return numElements; + } + + @Override + public int getNumElements() { + return numElements; + } +} Index: core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java (working copy) @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.bundle; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; + +public class WritableMessageBundle extends + POJOMessageBundle implements Writable { + + @Override + public void write(DataOutput out) throws IOException { + // writes the k/v mapping size + out.writeInt(messages.size()); + if (messages.size() > 0) { + for (Entry> entry : messages.entrySet()) { + out.writeUTF(entry.getKey()); + List messageList = entry.getValue(); + out.writeInt(messageList.size()); + for (M msg : messageList) { + msg.write(out); + } + } + } + } + + @Override + @SuppressWarnings("unchecked") + public void readFields(DataInput in) throws IOException { + if (messages == null) { + messages = new HashMap>(); + } + int numMessages = in.readInt(); + if (numMessages > 0) { + for (int entries = 0; entries < numMessages; entries++) { + String className = in.readUTF(); + int size = in.readInt(); + List msgList = new ArrayList(size); + messages.put(className, msgList); + + Class clazz = null; + if ((clazz = classCache.get(className)) == null) { + try { + clazz = (Class) Class.forName(className); + classCache.put(className, clazz); + } catch (ClassNotFoundException e) { + LOG.error("Class was not found.", e); + } + } + + for (int i = 0; i < size; i++) { + M msg = ReflectionUtils.newInstance(clazz, null); + msg.readFields(in); + msgList.add(msg); + } + + } + } + } + +} Index: core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java (revision 1454689) +++ core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java (working copy) @@ -441,6 +441,7 @@ } } + closed_ = true; } } Index: core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java (working copy) @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.queue; + +import org.apache.hadoop.io.Writable; +import org.apache.hama.bsp.message.bundle.BSPMessageBundle; + +public interface BSPMessageInterface { + + public void add(BSPMessageBundle bundle); + +} Index: core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java (working copy) @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.queue; + +import org.apache.hadoop.io.Writable; +import org.apache.hama.bsp.message.bundle.BSPMessageBundle; + +public abstract class ByteArrayMessageQueue implements + BSPMessageInterface, MessageQueue { + + @Override + public void add(BSPMessageBundle bundle) { + // TODO Auto-generated method stub + + } + +} Index: core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (revision 1454689) +++ core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (working copy) @@ -18,7 +18,6 @@ package org.apache.hama.bsp.message.queue; import java.io.IOException; -import java.util.Collection; import java.util.Iterator; import org.apache.commons.logging.Log; @@ -46,8 +45,7 @@ * configuration.
* It is experimental to use. */ -public final class DiskQueue implements MessageQueue, - MessageTransferQueue { +public final class DiskQueue extends POJOMessageQueue { public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir"; @@ -172,7 +170,7 @@ } @Override - public final void addAll(Collection col) { + public final void addAll(Iterable col) { for (M item : col) { add(item); } @@ -310,15 +308,4 @@ public boolean isMessageSerialized() { return false; } - - @Override - public MessageQueue getSenderQueue() { - return this; - } - - @Override - public MessageQueue getReceiverQueue() { - return this; - } - } Index: core/src/main/java/org/apache/hama/bsp/message/queue/DiskTransferProtocolQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/DiskTransferProtocolQueue.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/queue/DiskTransferProtocolQueue.java (working copy) @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.queue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; + +/** + * The disk transfer queue protocol. + * + * @param + */ +public class DiskTransferProtocolQueue implements + MessageTransferQueue { + + @Override + public MessageQueue getSenderQueue(Configuration conf) { + return new DiskQueue(); + } + + @Override + public MessageQueue getReceiverQueue(Configuration conf) { + return new DiskQueue(); + } + +} Index: core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (revision 1454689) +++ core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (working copy) @@ -18,7 +18,6 @@ package org.apache.hama.bsp.message.queue; import java.util.ArrayDeque; -import java.util.Collection; import java.util.Deque; import java.util.Iterator; @@ -29,15 +28,15 @@ /** * LinkedList backed queue structure for bookkeeping messages. */ -public final class MemoryQueue implements MessageQueue, - MessageTransferQueue { +public final class MemoryQueue extends POJOMessageQueue { private final Deque deque = new ArrayDeque(); private Configuration conf; @Override - public final void addAll(Collection col) { - deque.addAll(col); + public final void addAll(Iterable col) { + for (M m : col) + deque.add(m); } @Override @@ -108,15 +107,4 @@ public boolean isMessageSerialized() { return false; } - - @Override - public MessageQueue getSenderQueue() { - return this; - } - - @Override - public MessageQueue getReceiverQueue() { - return this; - } - } Index: core/src/main/java/org/apache/hama/bsp/message/queue/MemoryTransferProtocol.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/MemoryTransferProtocol.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/queue/MemoryTransferProtocol.java (working copy) @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.queue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; + +/** + * Queue transfer protocol for memory queue. + * + * @param + */ +public class MemoryTransferProtocol implements + MessageTransferQueue { + + @Override + public MessageQueue getSenderQueue(Configuration conf) { + return new MemoryQueue(); + } + + @Override + public MessageQueue getReceiverQueue(Configuration conf) { + return new MemoryQueue(); + } + +} Index: core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (revision 1454689) +++ core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (working copy) @@ -17,8 +17,6 @@ */ package org.apache.hama.bsp.message.queue; -import java.util.Collection; - import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hama.bsp.TaskAttemptID; @@ -51,7 +49,7 @@ /** * Adds a whole Java Collection to the implementing queue. */ - public void addAll(Collection col); + public void addAll(Iterable col); /** * Adds the other queue to this queue. Index: core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java (revision 1454689) +++ core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java (working copy) @@ -17,21 +17,23 @@ */ package org.apache.hama.bsp.message.queue; +import org.apache.hadoop.conf.Configuration; + /** + * Interface to define the sender queue and receiver queue protocol. * - * * @param */ public interface MessageTransferQueue { - + /** - * + * Instantiate a sender queue. */ - public MessageQueue getSenderQueue(); + public MessageQueue getSenderQueue(Configuration conf); /** - * + * Instantiate a receiver queue. */ - public MessageQueue getReceiverQueue(); + public MessageQueue getReceiverQueue(Configuration conf); } Index: core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java (working copy) @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.queue; + +import org.apache.hadoop.io.Writable; +import org.apache.hama.bsp.message.bundle.BSPMessageBundle; +import org.apache.hama.bsp.message.bundle.POJOMessageBundle; + +/** + * Java object message queue. + * + * @param Message type. + */ +public abstract class POJOMessageQueue implements + BSPMessageInterface, Iterable, MessageQueue { + + @Override + public void add(BSPMessageBundle bundle){ + this.addAll((POJOMessageBundle)bundle); + } + +} Index: core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (revision 1454689) +++ core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (working copy) @@ -17,7 +17,6 @@ */ package org.apache.hama.bsp.message.queue; -import java.util.Collection; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; @@ -117,9 +116,10 @@ * org.apache.hama.bsp.message.SynchronizedQueue#addAll(java.util.Collection) */ @Override - public void addAll(Collection col) { + public void addAll(Iterable col) { synchronized (mutex) { - queue.addAll(col); + for (T m : col) + queue.add(m); } } Index: core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java (revision 1454689) +++ core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java (working copy) @@ -17,20 +17,21 @@ */ package org.apache.hama.bsp.message.queue; -import java.util.Collection; import java.util.Iterator; import java.util.PriorityQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.WritableComparable; import org.apache.hama.bsp.TaskAttemptID; +import org.apache.hama.bsp.message.bundle.BSPMessageBundle; +import org.apache.hama.bsp.message.bundle.POJOMessageBundle; /** * Heap (Java's priority queue) based message queue implementation that supports * sorted receive and send. */ public final class SortedMessageQueue> - implements MessageQueue, MessageTransferQueue { + implements MessageQueue, BSPMessageInterface { private final PriorityQueue queue = new PriorityQueue(); private Configuration conf; @@ -51,8 +52,9 @@ } @Override - public void addAll(Collection col) { - queue.addAll(col); + public void addAll(Iterable col) { + for (M m : col) + queue.add(m); } @Override @@ -111,13 +113,8 @@ } @Override - public MessageQueue getSenderQueue() { - return this; + public void add(BSPMessageBundle bundle) { + addAll((POJOMessageBundle) bundle); } - @Override - public MessageQueue getReceiverQueue() { - return this; - } - } Index: core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (revision 1454689) +++ core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (working copy) @@ -22,7 +22,6 @@ import java.io.IOException; import java.math.BigInteger; import java.security.SecureRandom; -import java.util.Collection; import java.util.Iterator; import org.apache.commons.logging.Log; @@ -33,6 +32,8 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.Constants; import org.apache.hama.bsp.TaskAttemptID; +import org.apache.hama.bsp.message.bundle.BSPMessageBundle; +import org.apache.hama.bsp.message.bundle.HeapByteArrayBSPMessageBundle; import org.apache.hama.bsp.message.io.CombineSpilledDataProcessor; import org.apache.hama.bsp.message.io.PreFetchCache; import org.apache.hama.bsp.message.io.SpilledDataInputBuffer; @@ -44,8 +45,8 @@ * * @param */ -public class SpillingQueue implements MessageQueue, - MessageTransferQueue { +public class SpillingQueue extends ByteArrayMessageQueue + implements MessageTransferQueue { private static final Log LOG = LogFactory.getLog(SpillingQueue.class); @@ -144,7 +145,7 @@ } @Override - public void addAll(Collection msgs) { + public void addAll(Iterable msgs) { for (M msg : msgs) { add(msg); } @@ -342,13 +343,23 @@ } @Override - public MessageQueue getSenderQueue() { + public MessageQueue getSenderQueue(Configuration conf) { return this; } @Override - public MessageQueue getReceiverQueue() { + public MessageQueue getReceiverQueue(Configuration conf) { return this; } + @Override + public void add(BSPMessageBundle bundle) { + try { + this.spillOutputBuffer.write(((HeapByteArrayBSPMessageBundle) bundle) + .getBuffer()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } Index: core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueueTransferProtocol.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueueTransferProtocol.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueueTransferProtocol.java (working copy) @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.queue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; + +/** + * Queue transfer protocol for spilling queue. + * + * @param The message type. + */ +public class SpillingQueueTransferProtocol implements + MessageTransferQueue { + + @Override + public MessageQueue getSenderQueue(Configuration conf) { + return new SpillingQueue(); + } + + @Override + public MessageQueue getReceiverQueue(Configuration conf) { + return new SpillingQueue(); + } +} Index: core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java =================================================================== --- core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (revision 1454689) +++ core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (working copy) @@ -33,8 +33,10 @@ import org.apache.hama.bsp.Counters; import org.apache.hama.bsp.TaskAttemptID; import org.apache.hama.bsp.message.queue.DiskQueue; -import org.apache.hama.bsp.message.queue.MemoryQueue; +import org.apache.hama.bsp.message.queue.DiskTransferProtocolQueue; +import org.apache.hama.bsp.message.queue.MemoryTransferProtocol; import org.apache.hama.bsp.message.queue.MessageQueue; +import org.apache.hama.bsp.message.queue.MessageTransferQueue; import org.apache.hama.util.BSPNetUtils; public class TestHadoopMessageManager extends TestCase { @@ -46,8 +48,8 @@ public void testMemoryMessaging() throws Exception { Configuration conf = new Configuration(); - conf.set(MessageManager.QUEUE_TYPE_CLASS, - MemoryQueue.class.getCanonicalName()); + conf.setClass(MessageManager.TRANSFER_QUEUE_TYPE_CLASS, + MemoryTransferProtocol.class, MessageTransferQueue.class); conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH); messagingInternal(conf); } @@ -55,6 +57,8 @@ public void testDiskMessaging() throws Exception { Configuration conf = new Configuration(); conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH); + conf.setClass(MessageManager.TRANSFER_QUEUE_TYPE_CLASS, + DiskTransferProtocolQueue.class, MessageTransferQueue.class); messagingInternal(conf); } Index: core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java =================================================================== --- core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java (revision 0) +++ core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java (working copy) @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashSet; + +import junit.framework.TestCase; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hama.bsp.message.bundle.POJOMessageBundle; +import org.apache.hama.bsp.message.bundle.WritableMessageBundle; + +public class TestMessageBundle extends TestCase { + + public void testPOJOWritableMessageBundle() { + + POJOMessageBundle messageBundle = new POJOMessageBundle(); + for (int i = 0; i < 100; ++i) { + messageBundle.addMessage(new IntWritable(i)); + } + assertEquals(100, messageBundle.getSize()); + assertEquals(100, messageBundle.getNumElements()); + + int i = 0; + for (IntWritable writable : messageBundle) { + assertEquals(i++, writable.get()); + } + + } + + public void testDifferentWritableMessageBundle() { + WritableMessageBundle messageBundle = new WritableMessageBundle(); + int numElements = 5; + + HashSet set = new HashSet(); + + for (int i = 0; i < numElements; ++i) { + Writable w = new IntWritable(i); + set.add(w); + messageBundle.addMessage(w); + } + String msg; + for (int i = 0; i < numElements; ++i) { + msg = "" + i; + Writable w = new Text(msg); + set.add(w); + messageBundle.addMessage(w); + } + + assertEquals(2 * numElements, messageBundle.getSize()); + assertEquals(2 * numElements, messageBundle.getNumElements()); + + for (Writable writable : messageBundle) { + set.remove(writable); + } + assertTrue(set.isEmpty()); + + } + + public void testReadWriteWritableMessageBundle() throws IOException { + WritableMessageBundle messageBundle = new WritableMessageBundle(); + int numElements = 5; + + HashSet set = new HashSet(); + + for (int i = 0; i < numElements; ++i) { + Writable w = new IntWritable(i); + set.add(w); + messageBundle.addMessage(w); + } + String msg; + for (int i = 0; i < numElements; ++i) { + msg = "" + i; + Writable w = new Text(msg); + set.add(w); + messageBundle.addMessage(w); + } + + assertEquals(2 * numElements, messageBundle.getSize()); + assertEquals(2 * numElements, messageBundle.getNumElements()); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(1024); + DataOutput output = new DataOutputStream(outputStream); + messageBundle.write(output); + + ByteArrayInputStream inStream = new ByteArrayInputStream( + outputStream.toByteArray()); + DataInput in = new DataInputStream(inStream); + WritableMessageBundle newBundle = new WritableMessageBundle(); + newBundle.readFields(in); + + for (Writable writable : newBundle) { + set.remove(writable); + } + assertTrue(set.isEmpty()); + + } + +}