Index: core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (revision 1187186) +++ core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (working copy) @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.Constants; import org.apache.hama.bsp.sync.SyncClient; import org.apache.hama.bsp.sync.SyncServiceFactory; @@ -110,7 +111,8 @@ // consistent peernames. syncClient.enterBarrier(taskId.getJobID(), taskId, -1); syncClient.leaveBarrier(taskId.getJobID(), taskId, -1); - setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 0, TaskStatus.State.RUNNING, "running", peerAddress.getHostName(), + setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 0, + TaskStatus.State.RUNNING, "running", peerAddress.getHostName(), TaskStatus.Phase.STARTING)); } @@ -129,7 +131,7 @@ syncClient = SyncServiceFactory.getSyncClient(conf); syncClient.init(conf, taskId.getJobID(), taskId); - + } @Override @@ -194,8 +196,17 @@ BSPPeer peer = getBSPPeerConnection(entry.getKey()); Iterable messages = entry.getValue(); BSPMessageBundle bundle = new BSPMessageBundle(); - for (BSPMessage message : messages) { - bundle.addMessage(message); + + Combiner combiner = (Combiner) ReflectionUtils.newInstance( + conf.getClass("bsp.combiner.class", Combiner.class), conf); + + if (combiner != null) { + // In combined messages case, the size of bundle will be always a 1. + bundle = combiner.combine(messages); + } else { + for (BSPMessage message : messages) { + bundle.addMessage(message); + } } // checkpointing Index: core/src/main/java/org/apache/hama/bsp/Combiner.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/Combiner.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/Combiner.java (revision 0) @@ -0,0 +1,13 @@ +package org.apache.hama.bsp; + +public interface Combiner { + + /** + * Combines messages + * + * @param messages + * @return the combined message + */ + public BSPMessageBundle combine(Iterable messages); + +}