Index: core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java	(revision 1701164)
+++ core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java	(working copy)
@@ -390,15 +390,29 @@
       Path path = new Path(checkpointPath(superstepProgress));
       FSDataInputStream in = this.fs.open(path);
       BSPMessageBundle bundle = new BSPMessageBundle();
       try {
-        for (int i = 0; i < numMessages; ++i) {
-          String className = in.readUTF();
-          @SuppressWarnings("unchecked")
-          M message = (M) ReflectionUtils.newInstance(
-              Class.forName(className), conf);
-          message.readFields(in);
-          bundle.addMessage(message);
+        // The record type is written once per checkpoint file, as a header.
+        // Reading it inside the try keeps a truncated file on the
+        // EOFException path below.
+        String className = in.readUTF();
+        if (className.equals(BSPMessageBundle.class.getCanonicalName())) {
+          // onBundleReceived() writes one bundle per call and increments
+          // checkpointMessageCount once per bundle; assumes numMessages was
+          // restored from that counter, i.e. it is a bundle count here.
+          for (int i = 0; i < numMessages; ++i) {
+            BSPMessageBundle restored = new BSPMessageBundle();
+            restored.readFields(in);
+            messenger.loopBackBundle(restored);
+          }
+        } else {
+          for (int i = 0; i < numMessages; ++i) {
+            @SuppressWarnings("unchecked")
+            M message = (M) ReflectionUtils.newInstance(
+                Class.forName(className), conf);
+            message.readFields(in);
+            bundle.addMessage(message);
+          }
+          messenger.loopBackBundle(bundle);
         }
-        messenger.loopBackBundle(bundle);
       } catch (EOFException e) {
         LOG.error("Error recovering from checkpointing", e);
@@ -507,6 +521,9 @@
               LOG.debug("Creating path " + checkpointedPath);
             }
             checkpointStream = this.fs.create(new Path(checkpointedPath));
+            // The record type is written once, as a file header; recovery
+            // reads it back once and assumes every record has this type.
+            checkpointStream.writeUTF(message.getClass().getCanonicalName());
           } catch (IOException ioe) {
             LOG.error("Fail checkpointing messages to " + checkpointedPath,
                 ioe);
@@ -516,7 +533,6 @@
         }
         try {
           ++checkpointMessageCount;
-          checkpointStream.writeUTF(message.getClass().getCanonicalName());
           message.write(checkpointStream);
         } catch (IOException ioe) {
           LOG.error("Fail checkpointing messages to " + checkpointedPath, ioe);
@@ -537,3 +553,66 @@
   }
 
+  // NOTE(review): onMessageReceived() appends raw messages under a
+  // message-class header to the same checkpointStream. If both callbacks
+  // fire within one superstep the file mixes record formats and recovery
+  // cannot parse it; confirm only one delivery path is active per job.
+  /**
+   * Checkpoints a received message bundle as a single record.
+   * <p>
+   * The checkpoint file for the next superstep is created on first use and
+   * starts with the {@link BSPMessageBundle} class name as its type header.
+   * Each call appends one serialized bundle and increments
+   * checkpointMessageCount by one, so that counter holds a number of
+   * bundles, not messages.
+   *
+   * @param bundle the received bundle; a null bundle is logged and ignored
+   */
+  @Override
+  public void onBundleReceived(BSPMessageBundle bundle) {
+    if (bundle == null) {
+      // Nothing to checkpoint; continuing would throw a NullPointerException
+      // when the bundle is written.
+      LOG.error("bundle is found to be null");
+      return;
+    }
+
+    synchronized (this) {
+      if (!checkpointState) {
+        return;
+      }
+
+      // Computed up front so that a write failure on an already-open stream
+      // still names the file instead of logging "null".
+      String checkpointedPath = checkpointPath(peer.getSuperstepCount() + 1);
+      if (this.checkpointStream == null) {
+        try {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Creating path " + checkpointedPath);
+          }
+          checkpointStream = this.fs.create(new Path(checkpointedPath));
+          // Recovery matches this header against
+          // BSPMessageBundle.class.getCanonicalName(), so write exactly that
+          // name even when bundle is an instance of a subclass.
+          checkpointStream.writeUTF(BSPMessageBundle.class.getCanonicalName());
+        } catch (IOException ioe) {
+          LOG.error("Fail checkpointing messages to " + checkpointedPath, ioe);
+          throw new RuntimeException("Failed opening HDFS file "
+              + checkpointedPath, ioe);
+        }
+      }
+
+      try {
+        ++checkpointMessageCount;
+        bundle.write(checkpointStream);
+      } catch (IOException ioe) {
+        LOG.error("Fail checkpointing messages to " + checkpointedPath, ioe);
+        throw new RuntimeException("Failed writing to HDFS file "
+            + checkpointedPath, ioe);
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("message count = " + checkpointMessageCount);
+      }
+    }
+  }
 }
Index: core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java	(revision 1701164)
+++ core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java	(working copy)
@@ -239,6 +239,13 @@
     }
   }
 
+  /** Hands a whole received bundle to every registered listener. */
+  private void notifyReceivedBundle(BSPMessageBundle bundle) {
+    for (MessageEventListener aMessageListenerQueue : this.messageListenerQueue) {
+      aMessageListenerQueue.onBundleReceived(bundle);
+    }
+  }
+
   private void notifyInit() {
     for (MessageEventListener aMessageListenerQueue : this.messageListenerQueue) {
       aMessageListenerQueue.onInitialized();
@@ -261,11 +268,10 @@
 
   @Override
   public void loopBackBundle(BSPMessageBundle bundle) throws IOException {
-    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, bundle.size());
+    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
+        bundle.size());
     this.localQueueForNextIteration.addBundle(bundle);
-
-    // TODO checkpoint bundle itself instead of unpacked messages. -- edwardyoon
-    // notifyReceivedMessage(bundle);
+    notifyReceivedBundle(bundle);
   }
 
   @SuppressWarnings("unchecked")
Index: core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java	(revision 1701164)
+++ core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java	(working copy)
@@ -17,8 +17,11 @@
  */
 package org.apache.hama.bsp.message;
 
-public interface MessageEventListener {
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
 
+public interface MessageEventListener {
+
   /**
    *
    *
@@ -48,6 +51,13 @@
    * @param message The message received.
    */
   void onMessageReceived(final M message);
+
+  /**
+   * The function to handle the event when a message bundle is received.
+   *
+   * @param bundle The bundle received.
+   */
+  void onBundleReceived(final BSPMessageBundle bundle);
 
   /**
    * The function to handle the event when the queue is closed.
Index: examples/src/main/java/org/apache/hama/examples/RandBench.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/RandBench.java	(revision 1701164)
+++ examples/src/main/java/org/apache/hama/examples/RandBench.java	(working copy)
@@ -55,7 +55,7 @@
       byte[] dummyData = new byte[sizeOfMsg];
       String[] peers = peer.getAllPeerNames();
 
-      for (int i = 0; i < nSupersteps; i++) {
+      for (int i = (int) peer.getSuperstepCount(); i < nSupersteps; i++) {
 
         for (int j = 0; j < nCommunications; j++) {
           String tPeer = peers[r.nextInt(peers.length)];