Index: core/src/main/java/org/apache/hama/bsp/Combiner.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/Combiner.java (revision 1298258) +++ core/src/main/java/org/apache/hama/bsp/Combiner.java (working copy) @@ -27,6 +27,6 @@ * @param messages * @return the combined message */ - public abstract BSPMessageBundle combine(Iterable messages); + public abstract M combine(Iterable messages); } Index: core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (revision 1298258) +++ core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (working copy) @@ -311,7 +311,10 @@ Combiner combiner = (Combiner) ReflectionUtils.newInstance( conf.getClass("bsp.combiner.class", Combiner.class), conf); - return combiner.combine(messages); + BSPMessageBundle bundle = new BSPMessageBundle(); + bundle.addMessage(combiner.combine(messages)); + + return bundle; } else { BSPMessageBundle bundle = new BSPMessageBundle(); for (M message : messages) { Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1298258) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -35,6 +35,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.bsp.BSP; import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.Combiner; import org.apache.hama.bsp.sync.SyncException; import org.apache.hama.util.KeyValuePair; @@ -44,6 +45,7 @@ private Map vertices = new HashMap(); private String masterTask; private String FLAG_MESSAGE = "hama.graph.msg.counts"; + private Configuration conf; @SuppressWarnings("unchecked") @Override @@ -105,9 +107,25 @@ peer.send(masterTask, updatedCnt); for (Map.Entry> e : msgMap.entrySet()) { - if (e.getValue().size() > 0) { - vertices.get(e.getKey()).compute(e.getValue().iterator()); + LinkedList messages = e.getValue(); + + if (!conf + .getClass("hama.vertex.message.combiner.class", Combiner.class) + .equals(Combiner.class)) { + LOG.debug("vertex message combiner class: " + + conf.get("hama.vertex.message.combiner.class")); + + Combiner combiner = (Combiner) ReflectionUtils + .newInstance(conf.getClass("hama.vertex.message.combiner.class", + Combiner.class), conf); + + Writable combined = combiner.combine(messages); + + messages = new LinkedList(); + messages.add(combined); } + + vertices.get(e.getKey()).compute(messages.iterator()); } iteration++; } @@ -116,7 +134,8 @@ @SuppressWarnings("unchecked") public void setup(BSPPeer peer) throws IOException, SyncException, InterruptedException { - Configuration conf = peer.getConfiguration(); + this.conf = peer.getConfiguration(); + // Choose one as a master to collect global updates masterTask = peer.getPeerName(0); LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class")); @@ -124,9 +143,8 @@ KeyValuePair next = null; while ((next = peer.readNext()) != null) { Vertex vertex = (Vertex) ReflectionUtils - .newInstance( - peer.getConfiguration().getClass("hama.graph.vertex.class", - Vertex.class), peer.getConfiguration()); + .newInstance(conf.getClass("hama.graph.vertex.class", Vertex.class), + conf); vertex.setVertexID(next.getKey().getName()); vertex.peer = peer; Index: graph/src/main/java/org/apache/hama/graph/GraphJob.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJob.java (revision 1298258) +++ graph/src/main/java/org/apache/hama/graph/GraphJob.java (working copy) @@ -22,11 +22,14 @@ import org.apache.hadoop.io.Writable; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.BSPJob; +import org.apache.hama.bsp.Combiner; public class GraphJob extends BSPJob { public final static String VERTEX_CLASS_ATTR = "hama.graph.vertex.class"; + public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class"; - public GraphJob(HamaConfiguration conf, Class exampleClass) throws IOException { + public GraphJob(HamaConfiguration conf, Class exampleClass) + throws IOException { super(conf); this.setBspClass(GraphJobRunner.class); this.setJarByClass(exampleClass); @@ -48,4 +51,11 @@ return (Class>) conf.getClass( VERTEX_CLASS_ATTR, Vertex.class); } + + @Override + public void setCombinerClass(Class> cls) { + ensureState(JobState.DEFINE); + conf.setClass(VERTEX_MESSAGE_COMBINER_CLASS_ATTR, cls, Combiner.class); + } + } Index: graph/src/main/java/org/apache/hama/graph/VertexInterface.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/VertexInterface.java (revision 1298258) +++ graph/src/main/java/org/apache/hama/graph/VertexInterface.java (working copy) @@ -21,8 +21,10 @@ import java.util.Iterator; import java.util.List; -public interface VertexInterface { +import org.apache.hadoop.io.Writable; +public interface VertexInterface { + /** * @return the vertex ID. */ Index: examples/src/main/java/org/apache/hama/examples/CombineExample.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/CombineExample.java (revision 1298258) +++ examples/src/main/java/org/apache/hama/examples/CombineExample.java (working copy) @@ -33,7 +33,6 @@ import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.BSP; import org.apache.hama.bsp.BSPJob; -import org.apache.hama.bsp.BSPMessageBundle; import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.Combiner; import org.apache.hama.bsp.FileOutputFormat; @@ -71,8 +70,7 @@ public static class SumCombiner extends Combiner { @Override - public BSPMessageBundle combine(Iterable messages) { - BSPMessageBundle bundle = new BSPMessageBundle(); + public IntWritable combine(Iterable messages) { int sum = 0; Iterator it = messages.iterator(); @@ -80,8 +78,7 @@ sum += it.next().get(); } - bundle.addMessage(new IntWritable(sum)); - return bundle; + return new IntWritable(sum); } } Index: examples/src/main/java/org/apache/hama/examples/SSSP.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/SSSP.java (revision 1298258) +++ examples/src/main/java/org/apache/hama/examples/SSSP.java (working copy) @@ -24,6 +24,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.Combiner; import org.apache.hama.bsp.HashPartitioner; import org.apache.hama.bsp.SequenceFileInputFormat; import org.apache.hama.bsp.SequenceFileOutputFormat; @@ -67,6 +68,23 @@ } } + public static class MinIntCombiner extends Combiner { + + @Override + public IntWritable combine(Iterable messages) { + int minDist = Integer.MAX_VALUE; + + Iterator it = messages.iterator(); + while (it.hasNext()) { + int msgValue = it.next().get(); + if (minDist > msgValue) + minDist = msgValue; + } + + return new IntWritable(minDist); + } + } + private static void printUsage() { System.out.println("Usage: [tasks]"); System.exit(-1); @@ -92,6 +110,7 @@ } ssspJob.setVertexClass(ShortestPathVertex.class); + ssspJob.setCombinerClass(MinIntCombiner.class); ssspJob.setInputFormat(SequenceFileInputFormat.class); ssspJob.setInputKeyClass(VertexWritable.class); ssspJob.setInputValueClass(VertexArrayWritable.class);