NOTE(review): this patch was re-flowed from a copy whose line breaks and
indentation had been collapsed. Blank-line placement and indentation follow the
hunk line counts and are presumed, not verified. Generic type arguments on the
context/removed lines (MessageManager, MessageTransferQueueFactory, GraphJob)
may also have been stripped -- confirm against revision 1539920 before applying.
The type parameter <M extends WritableComparable<M>> on the new
SortedMessageTransferProtocol class was restored because the stripped form was
not valid Java; verify it against SortedMessageQueue and MessageTransferQueue.

Index: core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/message/MessageManager.java	(revision 1539920)
+++ core/src/main/java/org/apache/hama/bsp/message/MessageManager.java	(working copy)
@@ -36,8 +36,6 @@
  */
 public interface MessageManager {
 
-  @Deprecated
-  public static final String QUEUE_TYPE_CLASS = "hama.messenger.queue.class";
   public static final String RECEIVE_QUEUE_TYPE_CLASS = "hama.messenger.receive.queue.class";
   public static final String SENDER_QUEUE_TYPE_CLASS = "hama.messenger.sender.queue.class";
   public static final String TRANSFER_QUEUE_TYPE_CLASS = "hama.messenger.xfer.queue.class";
Index: core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java	(revision 1539920)
+++ core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java	(working copy)
@@ -17,8 +17,6 @@
  */
 package org.apache.hama.bsp.message;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hama.bsp.message.queue.MemoryQueue;
 import org.apache.hama.bsp.message.queue.MessageQueue;
@@ -32,30 +30,6 @@
  */
 public class MessageTransferQueueFactory {
 
-  private static final Log LOG = LogFactory.getLog(MessageTransferQueueFactory.class);
-
-  private static class BackwardCompatibleTransferQueue implements
-      MessageTransferQueue {
-
-    @Override
-    public MessageQueue getSenderQueue(Configuration conf) {
-      return getMessageQueue(conf);
-    }
-
-    @Override
-    public MessageQueue getReceiverQueue(Configuration conf) {
-      return getMessageQueue(conf);
-    }
-
-    @SuppressWarnings({ "unchecked", "deprecation" })
-    private MessageQueue getMessageQueue(Configuration conf) {
-      return ReflectionUtils.newInstance(conf.getClass(
-          MessageManager.QUEUE_TYPE_CLASS, MemoryQueue.class,
-          MessageQueue.class));
-    }
-
-  }
-
   private static class DefaultMessageTransferQueue implements
       MessageTransferQueue {
 
@@ -77,16 +51,7 @@
 
   }
 
-  @SuppressWarnings({ "rawtypes", "deprecation" })
   public static MessageTransferQueue getMessageTransferQueue(Configuration conf) {
-
-    if (conf.getClass(MessageManager.QUEUE_TYPE_CLASS, null) != null) {
-      // TODO print only once -- Ed
-
-      // LOG.warn("Message queue is configured on deprecated parameter:"
-      // + MessageManager.QUEUE_TYPE_CLASS);
-      return new BackwardCompatibleTransferQueue();
-    }
     return (MessageTransferQueue) ReflectionUtils.newInstance(conf.getClass(
         MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
         DefaultMessageTransferQueue.class, MessageTransferQueue.class));
Index: core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageTransferProtocol.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageTransferProtocol.java	(revision 0)
+++ core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageTransferProtocol.java	(working copy)
@@ -0,0 +1,24 @@
+package org.apache.hama.bsp.message.queue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Queue transfer protocol for sorted message queue.
+ *
+ * @param <M> the message type carried by the sender and receiver queues
+ */
+public class SortedMessageTransferProtocol<M extends WritableComparable<M>> implements
+    MessageTransferQueue<M> {
+
+  @Override
+  public SortedMessageQueue<M> getSenderQueue(Configuration conf) {
+    return new SortedMessageQueue<M>();
+  }
+
+  @Override
+  public SortedMessageQueue<M> getReceiverQueue(Configuration conf) {
+    return new SortedMessageQueue<M>();
+  }
+
+}
Index: graph/src/main/java/org/apache/hama/graph/GraphJob.java
===================================================================
--- graph/src/main/java/org/apache/hama/graph/GraphJob.java	(revision 1539920)
+++ graph/src/main/java/org/apache/hama/graph/GraphJob.java	(working copy)
@@ -31,8 +31,8 @@
 import org.apache.hama.bsp.Partitioner;
 import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
 import org.apache.hama.bsp.message.MessageManager;
-import org.apache.hama.bsp.message.queue.MessageQueue;
-import org.apache.hama.bsp.message.queue.SortedMessageQueue;
+import org.apache.hama.bsp.message.queue.MessageTransferQueue;
+import org.apache.hama.bsp.message.queue.SortedMessageTransferProtocol;
 
 import com.google.common.base.Preconditions;
 
@@ -123,8 +123,8 @@
   /**
    * Sets the input reader for parsing the input to vertices.
    */
-  public void setVertexInputReaderClass(
-      @SuppressWarnings("rawtypes") Class cls) {
+  public void setVertexInputReaderClass(@SuppressWarnings("rawtypes")
+  Class cls) {
     ensureState(JobState.DEFINE);
     conf.setClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER, cls,
         RecordConverter.class);
@@ -135,8 +135,8 @@
    * Sets the output writer for materializing vertices to the output sink. If
    * not set, the default DefaultVertexOutputWriter will be used.
    */
-  public void setVertexOutputWriterClass(
-      @SuppressWarnings("rawtypes") Class cls) {
+  public void setVertexOutputWriterClass(@SuppressWarnings("rawtypes")
+  Class cls) {
     ensureState(JobState.DEFINE);
     conf.setClass(VERTEX_OUTPUT_WRITER_CLASS_ATTR, cls,
         VertexOutputWriter.class);
@@ -149,8 +149,8 @@
   }
 
   @Override
-  public void setPartitioner(
-      @SuppressWarnings("rawtypes") Class theClass) {
+  public void setPartitioner(@SuppressWarnings("rawtypes")
+  Class theClass) {
     super.setPartitioner(theClass);
     conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
   }
@@ -197,8 +197,8 @@
     }
 
     // add the default message queue to the sorted one
-    this.getConfiguration().setClass(MessageManager.QUEUE_TYPE_CLASS,
-        SortedMessageQueue.class, MessageQueue.class);
+    this.getConfiguration().setClass(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
+        SortedMessageTransferProtocol.class, MessageTransferQueue.class);
 
     super.submit();
   }