Index: yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (revision 1671493) +++ yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (working copy) @@ -21,33 +21,39 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; -import org.apache.hama.Constants; +import org.apache.hadoop.io.Text; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.sync.SyncException; public class YarnSerializePrinting { + public static Path OUTPUT_PATH = new Path("/tmp/serialout"); + public static class HelloBSP extends - BSP { + BSP { public static final Log LOG = LogFactory.getLog(HelloBSP.class); - private final static int PRINT_INTERVAL = 1000; + //private final static int PRINT_INTERVAL = 1000; private int num; @Override public void bsp( - BSPPeer bspPeer) + BSPPeer bspPeer) throws IOException, SyncException, InterruptedException { num = bspPeer.getConfiguration().getInt("bsp.peers.num", 1); - LOG.info(bspPeer.getAllPeerNames()); + IntWritable peerNum = new IntWritable(); + Text txt = new Text(); int i = 0; for (String otherPeer : bspPeer.getAllPeerNames()) { String peerName = bspPeer.getPeerName(); if (peerName.equals(otherPeer)) { - LOG.info("Hello BSP from " + (i + 1) + " of " + num + ": " + peerName); + peerNum.set(i); + txt.set("Hello BSP from " + (i + 1) + " of " + num + ": " + peerName); + bspPeer.write(peerNum, txt); } - Thread.sleep(PRINT_INTERVAL); bspPeer.sync(); i++; } @@ -63,7 +69,10 @@ job.setJarByClass(HelloBSP.class); job.setJobName("Serialize Printing"); job.setInputFormat(NullInputFormat.class); - job.setOutputFormat(NullOutputFormat.class); + job.setOutputFormat(TextOutputFormat.class); + job.setOutputKeyClass(IntWritable.class); + job.setOutputValueClass(Text.class); + job.setOutputPath(OUTPUT_PATH); job.setMemoryUsedPerTaskInMb(100); job.setNumBspTask(4); job.waitForCompletion(true);