Index: core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java =================================================================== --- core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (revision 1201689) +++ core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (working copy) @@ -55,9 +55,11 @@ } public void testSubmitJob() throws Exception { - BSPJob bsp = new BSPJob(configuration, testjar.ClassSerializePrinting.HelloBSP.class); + BSPJob bsp = new BSPJob(configuration, + org.apache.hama.examples.ClassSerializePrinting.HelloBSP.class); bsp.setJobName("Test Serialize Printing"); - bsp.setBspClass(testjar.ClassSerializePrinting.HelloBSP.class); + bsp + .setBspClass(org.apache.hama.examples.ClassSerializePrinting.HelloBSP.class); // Set the task size as a number of GroomServer BSPJobClient jobClient = new BSPJobClient(configuration); @@ -65,7 +67,7 @@ ClusterStatus cluster = jobClient.getClusterStatus(false); assertEquals(this.numOfGroom, cluster.getGroomServers()); bsp.setNumBspTask(2); - + FileSystem fileSys = FileSystem.get(conf); if (bsp.waitForCompletion(true)) { @@ -78,7 +80,7 @@ int tasks) throws Exception { for (int i = 0; i < tasks; i++) { SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path( - TMP_OUTPUT + "part-0000" +i), conf); + TMP_OUTPUT + "part-0000" + i), conf); LongWritable timestamp = new LongWritable(); Text message = new Text(); reader.next(timestamp, message); Index: core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java =================================================================== --- core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java (revision 0) +++ core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java (working copy) @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package testjar; +package org.apache.hama.examples; import java.io.IOException; Index: core/src/test/java/org/apache/hama/examples/CombineExample.java =================================================================== --- core/src/test/java/org/apache/hama/examples/CombineExample.java (revision 0) +++ core/src/test/java/org/apache/hama/examples/CombineExample.java (working copy) @@ -82,7 +82,7 @@ // BSP job configuration HamaConfiguration conf = new HamaConfiguration(); - BSPJob bsp = new BSPJob(conf, PiEstimator.class); + BSPJob bsp = new BSPJob(conf, CombineExample.class); // Set the job name bsp.setJobName("Combine Example"); bsp.setBspClass(MyBSP.class); Index: core/src/test/java/testjar/ClassSerializePrinting.java =================================================================== --- core/src/test/java/testjar/ClassSerializePrinting.java (revision 1201689) +++ core/src/test/java/testjar/ClassSerializePrinting.java (working copy) @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package testjar; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.io.Text; -import org.apache.hama.bsp.BSP; -import org.apache.hama.bsp.BSPPeer; -import org.apache.zookeeper.KeeperException; - -public class ClassSerializePrinting { - private static String TMP_OUTPUT = "/tmp/test-example/"; - - public static class HelloBSP extends - BSP { - public static final Log LOG = LogFactory.getLog(HelloBSP.class); - private Configuration conf; - private final static int PRINT_INTERVAL = 1000; - private FileSystem fileSys; - private int num; - - public void bsp( - BSPPeer bspPeer) - throws IOException, KeeperException, InterruptedException { - - int i = 0; - for (String otherPeer : bspPeer.getAllPeerNames()) { - String peerName = bspPeer.getPeerName(); - if (peerName.equals(otherPeer)) { - writeLogToFile(peerName, i); - } - - Thread.sleep(PRINT_INTERVAL); - bspPeer.sync(); - i++; - } - } - - private void writeLogToFile(String string, int i) throws IOException { - SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, - new Path(TMP_OUTPUT, "part-0000" + i), LongWritable.class, - Text.class, CompressionType.NONE); - writer.append(new LongWritable(System.currentTimeMillis()), new Text( - "Hello BSP from " + (i + 1) + " of " + num + ": " + string)); - writer.close(); - } - - public Configuration getConf() { - return conf; - } - - public void setConf(Configuration conf) { - this.conf = conf; - num = Integer.parseInt(conf.get("bsp.peers.num")); - try { - fileSys = FileSystem.get(conf); - } catch (IOException e) { - e.printStackTrace(); - } - } - } -} Index: examples/src/main/java/org/apache/hama/examples/CombineExample.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/CombineExample.java (revision 1201689) +++ examples/src/main/java/org/apache/hama/examples/CombineExample.java (working copy) @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.examples; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.NullWritable; -import org.apache.hama.HamaConfiguration; -import org.apache.hama.bsp.BSP; -import org.apache.hama.bsp.BSPJob; -import org.apache.hama.bsp.BSPMessage; -import org.apache.hama.bsp.BSPMessageBundle; -import org.apache.hama.bsp.BSPPeer; -import org.apache.hama.bsp.Combiner; -import org.apache.hama.bsp.IntegerMessage; -import org.apache.hama.bsp.NullInputFormat; -import org.apache.hama.bsp.NullOutputFormat; -import org.apache.zookeeper.KeeperException; - -public class CombineExample { - - public static class MyBSP extends - BSP { - public static final Log LOG = LogFactory.getLog(MyBSP.class); - - @Override - public void bsp(BSPPeer peer) throws IOException, - KeeperException, InterruptedException { - for (String peerName : peer.getAllPeerNames()) { - peer.send(peerName, new IntegerMessage(peer.getPeerName(), 1)); - peer.send(peerName, new IntegerMessage(peer.getPeerName(), 2)); - peer.send(peerName, new IntegerMessage(peer.getPeerName(), 3)); - } - peer.sync(); - - IntegerMessage received; - while ((received = (IntegerMessage) peer.getCurrentMessage()) != null) { - LOG.info(received.getTag() + ": " + received.getData()); - } - } - - } - - public static class SumCombiner extends Combiner { - - @Override - public BSPMessageBundle combine(Iterable messages) { - BSPMessageBundle bundle = new BSPMessageBundle(); - int sum = 0; - - Iterator it = messages.iterator(); - while (it.hasNext()) { - sum += ((IntegerMessage) it.next()).getData(); - } - - bundle.addMessage(new IntegerMessage("Sum", sum)); - return bundle; - } - - } - - public static void main(String[] args) throws InterruptedException, - IOException, ClassNotFoundException { - // BSP job configuration - HamaConfiguration conf = new HamaConfiguration(); - - BSPJob bsp = new BSPJob(conf, PiEstimator.class); - // Set the job name - bsp.setJobName("Combine Example"); - bsp.setBspClass(MyBSP.class); - bsp.setCombinerClass(SumCombiner.class); - bsp.setInputFormat(NullInputFormat.class); - bsp.setOutputFormat(NullOutputFormat.class); - bsp.setNumBspTask(2); - - bsp.waitForCompletion(true); - } -} Index: examples/src/main/java/org/apache/hama/examples/ExampleDriver.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (revision 1201689) +++ examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (working copy) @@ -27,9 +27,6 @@ ProgramDriver pgd = new ProgramDriver(); try { pgd.addClass("pi", PiEstimator.class, "Pi Estimator"); - pgd.addClass("bench", RandBench.class, "Random Communication Benchmark"); - pgd.addClass("test", SerializePrinting.class, "Serialize Printing Test"); - pgd.addClass("combine", CombineExample.class, "Combiner Example"); pgd.driver(args); } catch (Throwable e) { Index: examples/src/main/java/org/apache/hama/examples/RandBench.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/RandBench.java (revision 1201689) +++ examples/src/main/java/org/apache/hama/examples/RandBench.java (working copy) @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.examples; - -import java.io.IOException; -import java.util.Random; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.NullWritable; -import org.apache.hama.HamaConfiguration; -import org.apache.hama.bsp.BSP; -import org.apache.hama.bsp.BSPJob; -import org.apache.hama.bsp.BSPJobClient; -import org.apache.hama.bsp.BSPMessage; -import org.apache.hama.bsp.BSPPeer; -import org.apache.hama.bsp.ByteMessage; -import org.apache.hama.bsp.ClusterStatus; -import org.apache.hama.bsp.NullInputFormat; -import org.apache.hama.bsp.NullOutputFormat; -import org.apache.hama.util.Bytes; -import org.apache.zookeeper.KeeperException; - -public class RandBench { - private static final String SIZEOFMSG = "msg.size"; - private static final String N_COMMUNICATIONS = "communications.num"; - private static final String N_SUPERSTEPS = "supersteps.num"; - - public static class RandBSP extends - BSP { - public static final Log LOG = LogFactory.getLog(RandBSP.class); - private Random r = new Random(); - private int sizeOfMsg; - private int nCommunications; - private int nSupersteps; - - @Override - public void bsp( - BSPPeer peer) - throws IOException, KeeperException, InterruptedException { - byte[] dummyData = new byte[sizeOfMsg]; - BSPMessage msg = null; - String[] peers = peer.getAllPeerNames(); - String peerName = peer.getPeerName(); - - for (int i = 0; i < nSupersteps; i++) { - - for (int j = 0; j < nCommunications; j++) { - String tPeer = peers[r.nextInt(peers.length)]; - String tag = peerName + " to " + tPeer; - msg = new ByteMessage(Bytes.toBytes(tag), dummyData); - peer.send(tPeer, msg); - } - - peer.sync(); - - ByteMessage received; - while ((received = (ByteMessage) peer.getCurrentMessage()) != null) { - LOG.info(Bytes.toString(received.getTag()) + " : " - + received.getData().length); - } - - } - } - - @Override - public void setup( - BSPPeer peer) { - this.sizeOfMsg = conf.getInt(SIZEOFMSG, 1); - this.nCommunications = conf.getInt(N_COMMUNICATIONS, 1); - this.nSupersteps = conf.getInt(N_SUPERSTEPS, 1); - } - } - - public static void main(String[] args) throws Exception { - if (args.length < 3) { - System.out.println("Usage: "); - System.exit(-1); - } - - // BSP job configuration - HamaConfiguration conf = new HamaConfiguration(); - - conf.setInt(SIZEOFMSG, Integer.parseInt(args[0])); - conf.setInt(N_COMMUNICATIONS, Integer.parseInt(args[1])); - conf.setInt(N_SUPERSTEPS, Integer.parseInt(args[2])); - - BSPJob bsp = new BSPJob(conf, RandBench.class); - // Set the job name - bsp.setJobName("Random Communication Benchmark"); - bsp.setBspClass(RandBSP.class); - bsp.setInputFormat(NullInputFormat.class); - bsp.setOutputFormat(NullOutputFormat.class); - - // Set the task size as a number of GroomServer - BSPJobClient jobClient = new BSPJobClient(conf); - ClusterStatus cluster = jobClient.getClusterStatus(false); - bsp.setNumBspTask(cluster.getMaxTasks()); - - long startTime = System.currentTimeMillis(); - bsp.waitForCompletion(true); - System.out.println("Job Finished in " - + (double) (System.currentTimeMillis() - startTime) / 1000.0 - + " seconds"); - } -} Index: examples/src/main/java/org/apache/hama/examples/SerializePrinting.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/SerializePrinting.java (revision 1201689) +++ examples/src/main/java/org/apache/hama/examples/SerializePrinting.java (working copy) @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.examples; - -import java.io.IOException; -import java.util.Date; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.io.Text; -import org.apache.hama.HamaConfiguration; -import org.apache.hama.bsp.BSP; -import org.apache.hama.bsp.BSPJob; -import org.apache.hama.bsp.BSPJobClient; -import org.apache.hama.bsp.BSPPeer; -import org.apache.hama.bsp.ClusterStatus; -import org.apache.hama.bsp.NullInputFormat; -import org.apache.hama.bsp.NullOutputFormat; -import org.apache.zookeeper.KeeperException; - -public class SerializePrinting { - private static String TMP_OUTPUT = "/tmp/serialize-example/"; - - public static class HelloBSP extends - BSP { - public static final Log LOG = LogFactory.getLog(HelloBSP.class); - private final static int PRINT_INTERVAL = 1000; - private FileSystem fileSys; - private int num; - - @Override - public void bsp( - BSPPeer peer) - throws IOException, KeeperException, InterruptedException { - - LOG.debug(peer.getAllPeerNames()); - int i = 0; - for (String otherPeer : peer.getAllPeerNames()) { - String peerName = peer.getPeerName(); - if (peerName.equals(otherPeer)) { - writeLogToFile(peerName, i); - } - - Thread.sleep(PRINT_INTERVAL); - peer.sync(); - i++; - } - } - - private void writeLogToFile(String string, int i) throws IOException { - SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, - new Path(TMP_OUTPUT + i), LongWritable.class, Text.class, - CompressionType.NONE); - writer.append(new LongWritable(System.currentTimeMillis()), new Text( - "Hello BSP from " + (i + 1) + " of " + num + ": " + string)); - writer.close(); - } - - @Override - public void setup( - BSPPeer peer) { - num = Integer.parseInt(conf.get("bsp.peers.num")); - try { - fileSys = FileSystem.get(conf); - } catch (IOException e) { - throw new Error("Filesystem could not be initialized! ", e); - } - } - } - - private static void printOutput(FileSystem fileSys, ClusterStatus cluster, - HamaConfiguration conf) throws IOException { - System.out.println("Each task printed the \"Hello World\" as below:"); - for (int i = 0; i < cluster.getGroomServers(); i++) { - SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path( - TMP_OUTPUT + i), conf); - LongWritable timestamp = new LongWritable(); - Text message = new Text(); - reader.next(timestamp, message); - System.out.println(new Date(timestamp.get()) + ": " + message); - reader.close(); - } - } - - private static void initTempDir(FileSystem fileSys) throws IOException { - if (fileSys.exists(new Path(TMP_OUTPUT))) { - fileSys.delete(new Path(TMP_OUTPUT), true); - } - } - - public static void main(String[] args) throws InterruptedException, - IOException, ClassNotFoundException { - // BSP job configuration - HamaConfiguration conf = new HamaConfiguration(); - - BSPJob bsp = new BSPJob(conf, SerializePrinting.class); - // Set the job name - bsp.setJobName("Serialize Printing"); - bsp.setBspClass(HelloBSP.class); - bsp.setInputFormat(NullInputFormat.class); - bsp.setOutputFormat(NullOutputFormat.class); - - // Set the task size as a number of GroomServer - BSPJobClient jobClient = new BSPJobClient(conf); - ClusterStatus cluster = jobClient.getClusterStatus(false); - bsp.setNumBspTask(cluster.getGroomServers()); - - FileSystem fileSys = FileSystem.get(conf); - initTempDir(fileSys); - - if (bsp.waitForCompletion(true)) { - printOutput(fileSys, cluster, conf); - } - } - -} Index: examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java (revision 1201689) +++ examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java (working copy) @@ -201,7 +201,7 @@ if (adjacencyListPath == null) adjacencyList = ShortestPathsGraphLoader.loadGraph(); - BSPJob bsp = new BSPJob(conf, RandBench.class); + BSPJob bsp = new BSPJob(conf, ShortestPaths.class); // Set the job name bsp.setJobName("Single Source Shortest Path"); bsp.setBspClass(ShortestPaths.class);