Index: yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (revision 1672226) +++ yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (working copy) @@ -424,7 +424,6 @@ return files; } - private void addToLocalResources(FileSystem fs, String fileSrcPath, String fileDstPath, String fileName, Map localResources) throws IOException { Index: yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java =================================================================== --- yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (revision 1672226) +++ yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (working copy) @@ -21,33 +21,42 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; -import org.apache.hama.Constants; +import org.apache.hadoop.io.Text; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.sync.SyncException; public class YarnSerializePrinting { + public static Path OUTPUT_PATH = new Path("/tmp/serialout"); + public static class HelloBSP extends - BSP { + BSP { public static final Log LOG = LogFactory.getLog(HelloBSP.class); - private final static int PRINT_INTERVAL = 1000; private int num; @Override public void bsp( - BSPPeer bspPeer) + BSPPeer bspPeer) throws IOException, SyncException, InterruptedException { num = bspPeer.getConfiguration().getInt("bsp.peers.num", 1); - LOG.info(bspPeer.getAllPeerNames()); + IntWritable peerNum = new IntWritable(); + Text txt = new Text(); int i = 0; for (String otherPeer : bspPeer.getAllPeerNames()) { String peerName = bspPeer.getPeerName(); if (peerName.equals(otherPeer)) { - LOG.info("Hello BSP from " + (i + 1) + " of " + num + ": " + peerName); + peerNum.set(i); + txt.set("Hello BSP from " + (i + 1) + " of " + num + ": " + peerName); + bspPeer.write(null, txt); } - Thread.sleep(PRINT_INTERVAL); bspPeer.sync(); i++; } @@ -54,6 +63,20 @@ } } + static void printOutput(HamaConfiguration conf) throws IOException { + FileSystem fs = FileSystem.get(conf); + FileStatus[] files = fs.listStatus(OUTPUT_PATH); + for (FileStatus file : files) { + if (file.getLen() > 0) { + FSDataInputStream in = fs.open(file.getPath()); + IOUtils.copyBytes(in, System.out, conf, false); + in.close(); + } + } + + //fs.delete(OUTPUT_PATH, true); + } + public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { HamaConfiguration conf = new HamaConfiguration(); @@ -63,9 +86,17 @@ job.setJarByClass(HelloBSP.class); job.setJobName("Serialize Printing"); job.setInputFormat(NullInputFormat.class); - job.setOutputFormat(NullOutputFormat.class); + job.setOutputFormat(TextOutputFormat.class); + job.setOutputKeyClass(IntWritable.class); + job.setOutputValueClass(Text.class); + job.setOutputPath(OUTPUT_PATH); job.setMemoryUsedPerTaskInMb(100); job.setNumBspTask(4); + + long startTime = System.currentTimeMillis(); job.waitForCompletion(true); + printOutput(conf); + System.out.println("Job Finished in " + + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); } }