Index: src/main/java/org/apache/hama/bsp/BSPPeer.java =================================================================== --- src/main/java/org/apache/hama/bsp/BSPPeer.java (revision 1442827) +++ src/main/java/org/apache/hama/bsp/BSPPeer.java (working copy) @@ -56,7 +56,7 @@ public int getNumCurrentMessages(); /** - * Barrier Synchronization. + * Standard Barrier Synchronization. * * Sends all the messages in the outgoing message queues to the corresponding * remote peers. @@ -68,6 +68,16 @@ public void sync() throws IOException, SyncException, InterruptedException; /** + * if true, synchronizes without clearing the unconsumed messages in the queues. + * + * @throws IOException + * @throws SyncException + * @throws InterruptedException + */ + public void sync(boolean append) throws IOException, SyncException, + InterruptedException; + + /** * @return the count of current super-step */ public long getSuperstepCount(); Index: src/main/java/org/apache/hama/bsp/BSPPeerImpl.java =================================================================== --- src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (revision 1442827) +++ src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (working copy) @@ -58,7 +58,10 @@ private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class); public static enum PeerCounter { - COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS + COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS, + TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, + MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, + COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS } private final Configuration conf; @@ -392,6 +395,19 @@ @Override public final void sync() throws IOException, SyncException, InterruptedException { + sync(false); + } + + /** + * if true, synchronizes without clearing the unconsumed messages in the + * queues. + * + * @throws IOException + * @throws SyncException + * @throws InterruptedException + */ + public final void sync(boolean append) throws IOException, SyncException, + InterruptedException { // normally all messages should been send now, finalizing the send phase messenger.finishSendPhase(); @@ -432,8 +448,11 @@ } } - // Clear outgoing queues. - messenger.clearOutgoingQueues(); + if (append) { + messenger.appendOutgoingQueues(); + } else { + messenger.clearOutgoingQueues(); + } leaveBarrier(); Index: src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java =================================================================== --- src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (revision 1442827) +++ src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (working copy) @@ -161,6 +161,12 @@ localQueueForNextIteration = getSynchronizedQueue(); notifyInit(); } + + public final void appendOutgoingQueues() { + localQueue.addAll(localQueueForNextIteration.getMessageQueue()); + localQueueForNextIteration = getSynchronizedQueue(); + notifyInit(); + } /* * (non-Javadoc) Index: src/main/java/org/apache/hama/bsp/message/MessageManager.java =================================================================== --- src/main/java/org/apache/hama/bsp/message/MessageManager.java (revision 1442827) +++ src/main/java/org/apache/hama/bsp/message/MessageManager.java (working copy) @@ -93,6 +93,11 @@ public void clearOutgoingQueues(); /** + * Appends the outgoing queue. + */ + public void appendOutgoingQueues(); + + /** * Gets the number of messages in the current queue. * */ Index: src/test/java/org/apache/hama/bsp/TestAppendSync.java =================================================================== --- src/test/java/org/apache/hama/bsp/TestAppendSync.java (revision 0) +++ src/test/java/org/apache/hama/bsp/TestAppendSync.java (working copy) @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.IOException; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.sync.SyncException; + +public class TestAppendSync extends TestCase { + + public static final Log LOG = LogFactory.getLog(TestPartitioning.class); + + public void testPartitioner() throws Exception { + + Configuration conf = new Configuration(); + BSPJob bsp = new BSPJob(new HamaConfiguration(conf)); + bsp.setJobName("Test append sync"); + bsp.setBspClass(AppendBSP.class); + bsp.setNumBspTask(2); + bsp.setInputFormat(NullInputFormat.class); + bsp.setOutputFormat(NullOutputFormat.class); + assertTrue(bsp.waitForCompletion(true)); + } + + public static class AppendBSP extends + BSP { + + @Override + public void bsp( + BSPPeer peer) + throws IOException, SyncException, InterruptedException { + + for (int i = 0; i < 10; i++) { + peer.send(peer.getPeerName(0), new IntWritable(i)); + peer.send(peer.getPeerName(1), new IntWritable(i)); + peer.sync(true); + } + + int cnt = 0; + while ((peer.getCurrentMessage()) != null) { + cnt++; + } + assertTrue(cnt == 20); + } + } + +} Index: src/test/java/org/apache/hama/bsp/TestCheckpoint.java =================================================================== --- src/test/java/org/apache/hama/bsp/TestCheckpoint.java (revision 1442827) +++ src/test/java/org/apache/hama/bsp/TestCheckpoint.java (working copy) @@ -150,6 +150,11 @@ this.listener = listener; } + @Override + public void appendOutgoingQueues() { + // TODO Auto-generated method stub + } + } public static class TestBSPPeer implements @@ -300,6 +305,13 @@ return null; } + @Override + public void sync(boolean append) throws IOException, SyncException, + InterruptedException { + // TODO Auto-generated method stub + + } + } public static class TempSyncClient extends BSPPeerSyncClient { @@ -442,8 +454,8 @@ } private static void checkSuperstepMsgCount(PeerSyncClient syncClient, - @SuppressWarnings("rawtypes") - BSPPeer bspTask, BSPJob job, long step, long count) { + @SuppressWarnings("rawtypes") BSPPeer bspTask, BSPJob job, long step, + long count) { ArrayWritable writableVal = new ArrayWritable(LongWritable.class);