Index: core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (revision 1597931) +++ core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (working copy) @@ -38,7 +38,7 @@ * */ public class BSPMessageBundle implements Writable, - Iterable { + Iterable, BSPMessageBundleInterface { public static final Log LOG = LogFactory.getLog(BSPMessageBundle.class); @@ -111,6 +111,10 @@ bundleSize++; } + public byte[] getBuffer() { + return byteBuffer.toByteArray(); + } + public Iterator iterator() { bis = new ByteArrayInputStream(byteBuffer.toByteArray()); dis = new DataInputStream(bis); @@ -193,7 +197,7 @@ * @return the byte length of messages * @throws IOException */ - public long getLength() throws IOException { + public long getLength() { return bundleLength; } @@ -219,4 +223,5 @@ bufferDos.write(temp); } } + } Index: core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java (working copy) @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.io.Writable; + +public interface BSPMessageBundleInterface { + + /** + * @return the number of the messages. + */ + public int size(); + + /** + * Add message to this bundle. + * + * @param message BSPMessage to add. + */ + public void addMessage(M message); + + /** + * @return the iterator. + */ + public Iterator iterator(); + + /** + * @return the message buffer. + */ + public byte[] getBuffer(); + + /** + * @return the total byte length of messages + * @throws IOException + */ + public long getLength(); + +} Index: core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java (revision 1597931) +++ core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java (working copy) @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp.message.bundle; - -import org.apache.hadoop.io.Writable; - -/** - * BSPMessageBundle stores a group of BSPMessages so that they can be sent in - * batch rather than individually. - * - */ -public interface BSPMessageBundle { - - /** - * Returns the size of the message. - * - * @return Size of serialized message bundle. -1 if the size is not known. - */ - public long getSize(); - - /** - * Returns the number of elements. - * - * @return Number of elements. -1 if the number of elements is not known. - */ - public int getNumElements(); - -} Index: core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java (revision 1597931) +++ core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java (working copy) @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp.message.bundle; - -import java.nio.ByteBuffer; - -import org.apache.hadoop.io.Writable; - -/** - * BSP Message Bundle that encapsulates a ByteBuffer. - * - * @param Message type. - */ -public class ByteBufferBSPMessageBundle implements - BSPMessageBundle { - - private ByteBuffer[] byteArr; - private int count; - - public ByteBufferBSPMessageBundle(ByteBuffer[] buffer, int count) { - byteArr = buffer; - this.count = count; - } - - public ByteBufferBSPMessageBundle(ByteBuffer[] buffer) { - this(buffer, -1); - } - - public ByteBuffer[] getBuffers() { - return byteArr; - } - - @Override - public long getSize() { - return byteArr.length; - } - - @Override - public int getNumElements() { - return count; - } -} Index: core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java (revision 1597931) +++ core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java (working copy) @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp.message.bundle; - -import org.apache.hadoop.io.Writable; - -/** - * BSP Message Bundle that stores the messages as heap byte arrays. - * - * @param Message type. - */ -public class HeapByteArrayBSPMessageBundle implements - BSPMessageBundle { - - byte[] byteArr; - int count; - - public HeapByteArrayBSPMessageBundle(byte[] buffer) { - this(buffer, -1); - } - - public HeapByteArrayBSPMessageBundle(byte[] buffer, int count) { - byteArr = buffer; - this.count = count; - } - - public byte[] getBuffer() { - return byteArr; - } - - @Override - public long getSize() { - return byteArr.length; - } - - @Override - public int getNumElements() { - return byteArr.length; - } -} Index: core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java (revision 1597931) +++ core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java (working copy) @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp.message.bundle; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.Writable; - -public class POJOMessageBundle implements - BSPMessageBundle, Iterable { - - protected static final Log LOG = LogFactory.getLog(POJOMessageBundle.class); - - protected HashMap> messages = new HashMap>(); - protected HashMap> classCache = new HashMap>(); - - protected int numElements; - - private static class BundleIterator implements - Iterator { - - private Iterator> listIterator; - private Iterator messageIterator; - - public BundleIterator(Iterator> listIterator) { - this.listIterator = listIterator; - } - - @Override - public boolean hasNext() { - return listIterator.hasNext() || messageIterator.hasNext(); - } - - @Override - public M next() { - while (true) { - if (messageIterator != null && messageIterator.hasNext()) { - return messageIterator.next(); - } else { - if (listIterator.hasNext()) { - messageIterator = listIterator.next().iterator(); - } else { - return null; - } - } - } - } - - @Override - public void remove() { - } - - } - - public POJOMessageBundle() { - } - - /** - * Add message to this bundle. - * - * @param message BSPMessage to add. - */ - public void addMessage(M message) { - String className = message.getClass().getName(); - List list = messages.get(className); - ++numElements; - if (list == null) { - list = new ArrayList(); - messages.put(className, list); - } - - list.add(message); - } - - public List getMessages() { - // here we use an arraylist, because we know the size and outside may need - // random access - List mergeList = new ArrayList(messages.size()); - for (List c : messages.values()) { - mergeList.addAll(c); - } - return mergeList; - } - - @Override - public Iterator iterator() { - return new BundleIterator(this.messages.values().iterator()); - } - - @Override - public long getSize() { - return numElements; - } - - @Override - public int getNumElements() { - return numElements; - } -} Index: core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java (revision 1597931) +++ core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java (working copy) @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp.message.bundle; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map.Entry; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.ReflectionUtils; - -public class WritableMessageBundle extends - POJOMessageBundle implements Writable { - - @Override - public void write(DataOutput out) throws IOException { - // writes the k/v mapping size - out.writeInt(messages.size()); - if (messages.size() > 0) { - for (Entry> entry : messages.entrySet()) { - out.writeUTF(entry.getKey()); - List messageList = entry.getValue(); - out.writeInt(messageList.size()); - for (M msg : messageList) { - msg.write(out); - } - } - } - } - - @Override - @SuppressWarnings("unchecked") - public void readFields(DataInput in) throws IOException { - if (messages == null) { - messages = new HashMap>(); - } - int numMessages = in.readInt(); - if (numMessages > 0) { - for (int entries = 0; entries < numMessages; entries++) { - String className = in.readUTF(); - int size = in.readInt(); - List msgList = new ArrayList(size); - messages.put(className, msgList); - - Class clazz = null; - if ((clazz = classCache.get(className)) == null) { - try { - clazz = (Class) Class.forName(className); - classCache.put(className, clazz); - } catch (ClassNotFoundException e) { - LOG.error("Class was not found.", e); - } - } - - for (int i = 0; i < size; i++) { - M msg = ReflectionUtils.newInstance(clazz, null); - msg.readFields(in); - msgList.add(msg); - } - - } - } - } - -} Index: core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java (revision 1597931) +++ core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java (working copy) @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp.message.bundle; - -import org.apache.hadoop.io.Writable; - -/** - * BSPMessageBundle stores a group of BSPMessages so that they can be sent in - * batch rather than individually. - * - */ -public interface BSPMessageBundle { - - /** - * Returns the size of the message. - * - * @return Size of serialized message bundle. -1 if the size is not known. - */ - public long getSize(); - - /** - * Returns the number of elements. - * - * @return Number of elements. -1 if the number of elements is not known. - */ - public int getNumElements(); - -} Index: core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java (revision 1597931) +++ core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java (working copy) @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp.message.bundle; - -import java.nio.ByteBuffer; - -import org.apache.hadoop.io.Writable; - -/** - * BSP Message Bundle that encapsulates a ByteBuffer. - * - * @param Message type. - */ -public class ByteBufferBSPMessageBundle implements - BSPMessageBundle { - - private ByteBuffer[] byteArr; - private int count; - - public ByteBufferBSPMessageBundle(ByteBuffer[] buffer, int count) { - byteArr = buffer; - this.count = count; - } - - public ByteBufferBSPMessageBundle(ByteBuffer[] buffer) { - this(buffer, -1); - } - - public ByteBuffer[] getBuffers() { - return byteArr; - } - - @Override - public long getSize() { - return byteArr.length; - } - - @Override - public int getNumElements() { - return count; - } -} Index: core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java (revision 1597931) +++ core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java (working copy) @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp.message.bundle; - -import org.apache.hadoop.io.Writable; - -/** - * BSP Message Bundle that stores the messages as heap byte arrays. - * - * @param Message type. - */ -public class HeapByteArrayBSPMessageBundle implements - BSPMessageBundle { - - byte[] byteArr; - int count; - - public HeapByteArrayBSPMessageBundle(byte[] buffer) { - this(buffer, -1); - } - - public HeapByteArrayBSPMessageBundle(byte[] buffer, int count) { - byteArr = buffer; - this.count = count; - } - - public byte[] getBuffer() { - return byteArr; - } - - @Override - public long getSize() { - return byteArr.length; - } - - @Override - public int getNumElements() { - return byteArr.length; - } -} Index: core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java (revision 1597931) +++ core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java (working copy) @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp.message.bundle; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.Writable; - -public class POJOMessageBundle implements - BSPMessageBundle, Iterable { - - protected static final Log LOG = LogFactory.getLog(POJOMessageBundle.class); - - protected HashMap> messages = new HashMap>(); - protected HashMap> classCache = new HashMap>(); - - protected int numElements; - - private static class BundleIterator implements - Iterator { - - private Iterator> listIterator; - private Iterator messageIterator; - - public BundleIterator(Iterator> listIterator) { - this.listIterator = listIterator; - } - - @Override - public boolean hasNext() { - return listIterator.hasNext() || messageIterator.hasNext(); - } - - @Override - public M next() { - while (true) { - if (messageIterator != null && messageIterator.hasNext()) { - return messageIterator.next(); - } else { - if (listIterator.hasNext()) { - messageIterator = listIterator.next().iterator(); - } else { - return null; - } - } - } - } - - @Override - public void remove() { - } - - } - - public POJOMessageBundle() { - } - - /** - * Add message to this bundle. - * - * @param message BSPMessage to add. - */ - public void addMessage(M message) { - String className = message.getClass().getName(); - List list = messages.get(className); - ++numElements; - if (list == null) { - list = new ArrayList(); - messages.put(className, list); - } - - list.add(message); - } - - public List getMessages() { - // here we use an arraylist, because we know the size and outside may need - // random access - List mergeList = new ArrayList(messages.size()); - for (List c : messages.values()) { - mergeList.addAll(c); - } - return mergeList; - } - - @Override - public Iterator iterator() { - return new BundleIterator(this.messages.values().iterator()); - } - - @Override - public long getSize() { - return numElements; - } - - @Override - public int getNumElements() { - return numElements; - } -} Index: core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java (revision 1597931) +++ core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java (working copy) @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp.message.bundle; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map.Entry; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.ReflectionUtils; - -public class WritableMessageBundle extends - POJOMessageBundle implements Writable { - - @Override - public void write(DataOutput out) throws IOException { - // writes the k/v mapping size - out.writeInt(messages.size()); - if (messages.size() > 0) { - for (Entry> entry : messages.entrySet()) { - out.writeUTF(entry.getKey()); - List messageList = entry.getValue(); - out.writeInt(messageList.size()); - for (M msg : messageList) { - msg.write(out); - } - } - } - } - - @Override - @SuppressWarnings("unchecked") - public void readFields(DataInput in) throws IOException { - if (messages == null) { - messages = new HashMap>(); - } - int numMessages = in.readInt(); - if (numMessages > 0) { - for (int entries = 0; entries < numMessages; entries++) { - String className = in.readUTF(); - int size = in.readInt(); - List msgList = new ArrayList(size); - messages.put(className, msgList); - - Class clazz = null; - if ((clazz = classCache.get(className)) == null) { - try { - clazz = (Class) Class.forName(className); - classCache.put(className, clazz); - } catch (ClassNotFoundException e) { - LOG.error("Class was not found.", e); - } - } - - for (int i = 0; i < size; i++) { - M msg = ReflectionUtils.newInstance(clazz, null); - msg.readFields(in); - msgList.add(msg); - } - - } - } - } - -} Index: core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java (revision 1597931) +++ core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java (working copy) @@ -18,7 +18,7 @@ package org.apache.hama.bsp.message.queue; import org.apache.hadoop.io.Writable; -import org.apache.hama.bsp.message.bundle.BSPMessageBundle; +import org.apache.hama.bsp.BSPMessageBundle; public interface BSPMessageInterface { Index: core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java (revision 1597931) +++ core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java (working copy) @@ -18,7 +18,7 @@ package org.apache.hama.bsp.message.queue; import org.apache.hadoop.io.Writable; -import org.apache.hama.bsp.message.bundle.BSPMessageBundle; +import org.apache.hama.bsp.BSPMessageBundle; public abstract class ByteArrayMessageQueue implements BSPMessageInterface, MessageQueue { Index: core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java (working copy) @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.queue; + +import org.apache.hadoop.io.Writable; +import org.apache.hama.bsp.BSPMessageBundle; + +/** + * Java object message queue. + * + * @param Message type. + */ +public abstract class DefaultMessageQueue implements + BSPMessageInterface, Iterable, MessageQueue { + + @Override + public void add(BSPMessageBundle bundle){ + addAll(bundle); + } + +} Index: core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (revision 1597931) +++ core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (working copy) @@ -45,7 +45,7 @@ * configuration.
* It is experimental to use. */ -public final class DiskQueue extends POJOMessageQueue { +public final class DiskQueue extends DefaultMessageQueue { public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir"; Index: core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java (revision 1597931) +++ core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java (working copy) @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp.message.queue; - -import org.apache.hadoop.io.Writable; -import org.apache.hama.bsp.message.bundle.BSPMessageBundle; -import org.apache.hama.bsp.message.bundle.POJOMessageBundle; - -/** - * Java object message queue. - * - * @param Message type. - */ -public abstract class POJOMessageQueue implements - BSPMessageInterface, Iterable, MessageQueue { - - @Override - public void add(BSPMessageBundle bundle){ - this.addAll((POJOMessageBundle)bundle); - } - -} Index: core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java (revision 1597931) +++ core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java (working copy) @@ -23,9 +23,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.WritableComparable; +import org.apache.hama.bsp.BSPMessageBundle; import org.apache.hama.bsp.TaskAttemptID; -import org.apache.hama.bsp.message.bundle.BSPMessageBundle; -import org.apache.hama.bsp.message.bundle.POJOMessageBundle; /** * Heap (Java's priority queue) based message queue implementation that supports @@ -115,7 +114,7 @@ @Override public void add(BSPMessageBundle bundle) { - addAll((POJOMessageBundle) bundle); + addAll(bundle); } @Override Index: core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (revision 1597931) +++ core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (working copy) @@ -32,8 +32,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.Constants; import org.apache.hama.bsp.TaskAttemptID; -import org.apache.hama.bsp.message.bundle.BSPMessageBundle; -import org.apache.hama.bsp.message.bundle.HeapByteArrayBSPMessageBundle; +import org.apache.hama.bsp.BSPMessageBundle; import org.apache.hama.bsp.message.io.CombineSpilledDataProcessor; import org.apache.hama.bsp.message.io.PreFetchCache; import org.apache.hama.bsp.message.io.SpilledDataInputBuffer; @@ -346,8 +345,7 @@ @Override public void add(BSPMessageBundle bundle) { try { - this.spillOutputBuffer.write(((HeapByteArrayBSPMessageBundle) bundle) - .getBuffer()); + this.spillOutputBuffer.write(bundle.getBuffer()); } catch (IOException e) { throw new RuntimeException(e); } Index: core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java =================================================================== --- core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java (revision 1597931) +++ core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java (working copy) @@ -1,123 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.bsp.message; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.HashSet; - -import junit.framework.TestCase; - -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hama.bsp.message.bundle.POJOMessageBundle; -import org.apache.hama.bsp.message.bundle.WritableMessageBundle; - -public class TestMessageBundle extends TestCase { - - public void testPOJOWritableMessageBundle() { - - POJOMessageBundle messageBundle = new POJOMessageBundle(); - for (int i = 0; i < 100; ++i) { - messageBundle.addMessage(new IntWritable(i)); - } - assertEquals(100, messageBundle.getSize()); - assertEquals(100, messageBundle.getNumElements()); - - int i = 0; - for (IntWritable writable : messageBundle) { - assertEquals(i++, writable.get()); - } - - } - - public void testDifferentWritableMessageBundle() { - WritableMessageBundle messageBundle = new WritableMessageBundle(); - int numElements = 5; - - HashSet set = new HashSet(); - - for (int i = 0; i < numElements; ++i) { - Writable w = new IntWritable(i); - set.add(w); - messageBundle.addMessage(w); - } - String msg; - for (int i = 0; i < numElements; ++i) { - msg = "" + i; - Writable w = new Text(msg); - set.add(w); - messageBundle.addMessage(w); - } - - assertEquals(2 * numElements, messageBundle.getSize()); - assertEquals(2 * numElements, messageBundle.getNumElements()); - - for (Writable writable : messageBundle) { - set.remove(writable); - } - assertTrue(set.isEmpty()); - - } - - public void testReadWriteWritableMessageBundle() throws IOException { - WritableMessageBundle messageBundle = new WritableMessageBundle(); - int numElements = 5; - - HashSet set = new HashSet(); - - for (int i = 0; i < numElements; ++i) { - Writable w = new IntWritable(i); - set.add(w); - messageBundle.addMessage(w); - } - String msg; - for (int i = 0; i < numElements; ++i) { - msg = "" + i; - Writable w = new Text(msg); - set.add(w); - messageBundle.addMessage(w); - } - - assertEquals(2 * numElements, messageBundle.getSize()); - assertEquals(2 * numElements, messageBundle.getNumElements()); - - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(1024); - DataOutput output = new DataOutputStream(outputStream); - messageBundle.write(output); - - ByteArrayInputStream inStream = new ByteArrayInputStream( - outputStream.toByteArray()); - DataInput in = new DataInputStream(inStream); - WritableMessageBundle newBundle = new WritableMessageBundle(); - newBundle.readFields(in); - - for (Writable writable : newBundle) { - set.remove(writable); - } - assertTrue(set.isEmpty()); - - } - -}