Index: src/java/org/apache/hama/bsp/BSP.java =================================================================== --- src/java/org/apache/hama/bsp/BSP.java (revision 0) +++ src/java/org/apache/hama/bsp/BSP.java (revision 0) @@ -0,0 +1,38 @@ +package org.apache.hama.bsp; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.zookeeper.KeeperException; + +public abstract class BSP extends Thread implements BSPInterface { + protected BSPPeer peer; + + /** + * Constructor for abstract class + * + * @param conf + * @throws IOException + */ + public BSP(Configuration conf) throws IOException { + this.peer = new BSPPeer(conf); + } + + public void run() { + bsp(); + } + + public void send(InetSocketAddress hostname, BSPMessage msg) + throws IOException { + peer.send(hostname, msg); + } + + public void sync() throws IOException, KeeperException, InterruptedException { + peer.sync(); + } + + public BSPMessage getCurrentMessage() throws IOException { + return peer.getCurrentMessage(); + } +} Index: src/java/org/apache/hama/bsp/BSPInterface.java =================================================================== --- src/java/org/apache/hama/bsp/BSPInterface.java (revision 949686) +++ src/java/org/apache/hama/bsp/BSPInterface.java (working copy) @@ -2,4 +2,9 @@ public interface BSPInterface { + /** + * + */ + public void bsp(); + } Index: src/java/org/apache/hama/bsp/BSPJob.java =================================================================== --- src/java/org/apache/hama/bsp/BSPJob.java (revision 949686) +++ src/java/org/apache/hama/bsp/BSPJob.java (working copy) @@ -67,10 +67,16 @@ conf.set("bsp.working.dir", dir.toString()); } - public void setWorkClass(Class cls) + /** + * Set the BSP algorithm class for the job. + * + * @param cls + * @throws IllegalStateException + */ + public void setBspClass(Class cls) throws IllegalStateException { ensureState(JobState.DEFINE); - conf.setClass(WORK_CLASS_ATTR, cls, Work.class); + conf.setClass(WORK_CLASS_ATTR, cls, BSP.class); } public void setJar(String jar) { @@ -126,10 +132,6 @@ conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class); } - public void setBSPCode(Class class1) { - // TODO Auto-generated method stub - } - public void setUser(String user) { conf.set("user.name", user); } Index: src/java/org/apache/hama/bsp/Work.java =================================================================== --- src/java/org/apache/hama/bsp/Work.java (revision 949686) +++ src/java/org/apache/hama/bsp/Work.java (working copy) @@ -1,5 +0,0 @@ -package org.apache.hama.bsp; - -public class Work { - -} Index: src/test/org/apache/hama/bsp/BSPTestDriver.java =================================================================== --- src/test/org/apache/hama/bsp/BSPTestDriver.java (revision 949686) +++ src/test/org/apache/hama/bsp/BSPTestDriver.java (working copy) @@ -13,8 +13,8 @@ */ public static void main(String[] args) throws IOException, InterruptedException { BSPJob job = new BSPJob(new HamaConfiguration()); - job.setJarByClass(Work.class); - job.setWorkClass(Work.class); + job.setJarByClass(BSP.class); + job.setBspClass(BSP.class); job.submit(); Thread.sleep(3000); Index: src/test/org/apache/hama/bsp/UserInterface.java =================================================================== --- src/test/org/apache/hama/bsp/UserInterface.java (revision 949686) +++ src/test/org/apache/hama/bsp/UserInterface.java (working copy) @@ -1,105 +1,138 @@ package org.apache.hama.bsp; import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hama.Constants; import org.apache.hama.HamaCluster; import org.apache.hama.HamaConfiguration; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.Stat; public class UserInterface extends HamaCluster implements Watcher { private HamaConfiguration conf; - private String JOBNAME = "hama.test.bsp"; - private Path INPUTPATH = new Path("/tmp/input"); - private Path OUTPUTPATH = new Path("/tmp/output"); - - public UserInterface() { - this.conf = getConf(); - } + private int NUM_PEER = 10; + List list = new ArrayList(NUM_PEER); - public void testScenario() throws InterruptedException, IOException { - // BSP job configuration - BSPJob bsp = new BSPJob(this.conf); - // Set the job name - bsp.setJobName(JOBNAME); + class PiEstimator extends BSP { + private static final int iterations = 10000; - // Set in/output path and formatter -// bsp.setInputPath(conf, INPUTPATH); -// bsp.setOutputPath(conf, OUTPUTPATH); - bsp.setInputFormat(MyInputFormat.class); - bsp.setOutputFormat(MyOutputFormat.class); + public PiEstimator(Configuration conf) throws IOException { + super(conf); + } - // Set the BSP code - bsp.setBSPCode(MyBSP.class); - bsp.submit(); + public void bsp() { + int in = 0, out = 0; + for (int i = 0; i < iterations; i++) { + double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0; + if ((Math.sqrt(x * x + y * y) < 1.0)) { + in++; + } else { + out++; + } + } + double estimate = 4.0 * (double) in / (double) iterations; - //******************* - // assertion checking - assertEquals(bsp.getJobName(), JOBNAME); - //assertEquals(bsp.getInputPath(), INPUTPATH); - //assertEquals(bsp.getOutputPath(), OUTPUTPATH); - } + try { + InetSocketAddress addr = new InetSocketAddress("localhost", 30000); + BSPMessage msg = new BSPMessage(Bytes.toBytes(getName().toString()), + Bytes.toBytes(estimate)); - class MyBSP implements BSPInterface { - // TODO: implement some BSP example - } + send(addr, msg); + sync(); - class MyInputFormat extends InputFormat { + if (getCurrentMessage() != null) { + printPi(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } - @Override - public RecordReader createRecordReader(InputSplit arg0, - TaskAttemptContext arg1) throws IOException, InterruptedException { - // TODO Auto-generated method stub - return null; + private void printPi() throws IOException { + BSPMessage msg2; + double pi = 0.0; + while ((msg2 = getCurrentMessage()) != null) { + pi = (pi + Bytes.toDouble(msg2.getData())) / 2; + } + + System.out.println(pi); } + } - @Override - public List getSplits(JobContext arg0) throws IOException, - InterruptedException { - // TODO Auto-generated method stub - return null; + /** + * TODO: Should be deleted + */ + public void setUp() throws Exception { + super.setUp(); + this.conf = getConf(); + ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this); + Stat s = null; + if (zk != null) { + try { + s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false); + } catch (Exception e) { + LOG.error(s); + } + + if (s == null) { + try { + zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0], + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } catch (KeeperException e) { + LOG.error(e); + } catch (InterruptedException e) { + LOG.error(e); + } + } } - // TODO: implement some input Formatter } - class MyOutputFormat extends OutputFormat { + public void testBSPMain() throws InterruptedException, IOException { + PiEstimator thread; - @Override - public void checkOutputSpecs(JobContext arg0) throws IOException, - InterruptedException { - // TODO Auto-generated method stub - + for (int i = 0; i < NUM_PEER; i++) { + conf.set("bsp.peers.num", String.valueOf(NUM_PEER)); + conf.set(Constants.PEER_HOST, "localhost"); + conf.set(Constants.PEER_PORT, String.valueOf(30000 + i)); + conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810"); + thread = new PiEstimator(conf); + list.add(thread); } - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext arg0) - throws IOException, InterruptedException { - // TODO Auto-generated method stub - return null; + for (int i = 0; i < NUM_PEER; i++) { + list.get(i).start(); } - @Override - public RecordWriter getRecordWriter(TaskAttemptContext arg0) - throws IOException, InterruptedException { - // TODO Auto-generated method stub - return null; + for (int i = 0; i < NUM_PEER; i++) { + list.get(i).join(); } - // TODO: implement some input Formatter + + // BSP job configuration + BSPJob bsp = new BSPJob(this.conf); + // Set the job name + bsp.setJobName("bsp test job"); + // Set in/output path and formatter + // bsp.setInputPath(conf, INPUTPATH); + // bsp.setOutputPath(conf, OUTPUTPATH); + // bsp.setInputFormat(MyInputFormat.class); + // bsp.setOutputFormat(MyOutputFormat.class); // Set the BSP code + bsp.setBspClass(PiEstimator.class); + bsp.submit(); } @Override public void process(WatchedEvent event) { // TODO Auto-generated method stub - + } }