Index: src/java/org/apache/hama/bsp/BSPJob.java =================================================================== --- src/java/org/apache/hama/bsp/BSPJob.java (리비전 957790) +++ src/java/org/apache/hama/bsp/BSPJob.java (작업 사본) @@ -192,4 +192,8 @@ public void set(String name, String value) { conf.set(name, value); } + + public void setNumBspTask(int tasks) { + conf.setInt("bsp.peers.num", tasks); + } } Index: src/java/org/apache/hama/bsp/BSPJobClient.java =================================================================== --- src/java/org/apache/hama/bsp/BSPJobClient.java (리비전 957790) +++ src/java/org/apache/hama/bsp/BSPJobClient.java (작업 사본) @@ -172,9 +172,6 @@ final static FsPermission JOB_DIR_PERMISSION = FsPermission.createImmutable((short) 0777); // rwx-rwx-rwx - public BSPJobClient() { - } - public BSPJobClient(Configuration conf) throws IOException { setConf(conf); init(conf); @@ -182,8 +179,12 @@ public void init(Configuration conf) throws IOException { // it will be used to determine if the bspmaster is running on local or not. - //String tracker = conf.get("bsp.master.address", "local"); - this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf); + String master = conf.get("bsp.master.address", "local"); + if ("local".equals(master)) { + this.jobSubmitClient = new LocalJobRunner(conf); + } else { + this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf); + } } private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr, @@ -305,7 +306,7 @@ public RunningJob submitJobInternal(BSPJob job) throws IOException { - BSPJobID jobId = jobSubmitClient.getNewJobId(); + BSPJobID jobId = jobSubmitClient.getNewJobId(); Path submitJobDir = new Path(getSystemDir(), jobId.toString()); Path submitJarFile = new Path(submitJobDir, "job.jar"); Path submitJobFile = new Path(submitJobDir, "job.xml"); @@ -416,9 +417,10 @@ return sysDir; } - public RunningJob runJob(BSPJob job) throws FileNotFoundException, + public static RunningJob runJob(BSPJob job) throws FileNotFoundException, IOException { - return submitJobInternal(job); + BSPJobClient jc = new BSPJobClient(job.getConf()); + return jc.submitJobInternal(job); } /** Index: src/java/org/apache/hama/bsp/BSPPeer.java =================================================================== --- src/java/org/apache/hama/bsp/BSPPeer.java (리비전 957790) +++ src/java/org/apache/hama/bsp/BSPPeer.java (작업 사본) @@ -167,7 +167,7 @@ while (true) { synchronized (mutex) { List list = zk.getChildren(bspRoot, true); - if (list.size() < Integer.valueOf(conf.get("bsp.peers.num"))) { + if (list.size() < conf.getInt("bsp.peers.num", 0)) { mutex.wait(); } else { return true; Index: src/java/org/apache/hama/bsp/LocalJobRunner.java =================================================================== --- src/java/org/apache/hama/bsp/LocalJobRunner.java (리비전 0) +++ src/java/org/apache/hama/bsp/LocalJobRunner.java (리비전 0) @@ -0,0 +1,163 @@ +package org.apache.hama.bsp; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hama.Constants; +import org.apache.hama.bsp.UserInterface.PiEstimator; +import org.apache.hama.ipc.InterTrackerProtocol; +import org.apache.hama.ipc.JobSubmissionProtocol; + +public class LocalJobRunner implements JobSubmissionProtocol { + private static final Log LOG = LogFactory.getLog(BSPJobClient.class); + private FileSystem fs; + private Configuration conf; + private int nextJobId = 1; + private HashMap jobs = new HashMap(); + + public LocalJobRunner(Configuration conf) throws IOException { + // TODO Auto-generated constructor stub + this.fs = FileSystem.get(conf); + this.conf = conf; + } + + @Override + public JobStatus[] getAllJobs() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public ClusterStatus getClusterStatus(boolean detailed) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getFilesystemName() throws IOException { + // TODO Auto-generated method stub + return fs.getUri().toString(); + } + + @Override + public JobProfile getJobProfile(BSPJobID jobid) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public JobStatus getJobStatus(BSPJobID jobid) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public BSPJobID getNewJobId() throws IOException { + return new BSPJobID("local", nextJobId++); + } + + @Override + public String getSystemDir() { + // TODO Auto-generated method stub + Path sysDir = new Path(conf.get("bsp.system.dir", "/tmp/hadoop/bsp/system")); + return fs.makeQualified(sysDir).toString(); + } + + @Override + public void killJob(BSPJobID jobid) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public boolean killTask(TaskAttemptID taskId, boolean shouldFail) + throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public JobStatus submitJob(BSPJobID jobID) throws IOException { + return new Job(jobID, this.conf).status; + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + if (protocol.equals(InterTrackerProtocol.class.getName())) { + return InterTrackerProtocol.versionID; + } else if (protocol.equals(JobSubmissionProtocol.class.getName())) { + return JobSubmissionProtocol.versionID; + } else { + throw new IOException("Unknown protocol to job tracker: " + protocol); + } + } + + /** + * Local Job + */ + private class Job extends Thread { + private JobStatus status = new JobStatus(); + private String id; + private Configuration conf; + private int tasks; + List list; + + public Job(BSPJobID jobID, Configuration conf) throws IOException { + this.conf = conf; + tasks = conf.getInt("bsp.peers.num", 0); + list = new ArrayList(tasks); + jobs.put(id, this); + this.start(); + } + + public void run() { + // TODO Auto-generated method stub + /* + PiEstimator thread; + for (int i = 0; i < NUM_PEER; i++) { + conf.set("bsp.peers.num", String.valueOf(NUM_PEER)); + conf.set(Constants.PEER_HOST, "localhost"); + conf.set(Constants.PEER_PORT, String.valueOf(30000 + i)); + conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810"); + thread = new PiEstimator(conf); + list.add(thread); + } + + for (int i = 0; i < NUM_PEER; i++) { + list.get(i).start(); + } + + for (int i = 0; i < NUM_PEER; i++) { + list.get(i).join(); + } + */ + + try { + System.out.println("Creates " + tasks + " tasks."); + + //PiEstimator thread; + for (int i = 0; i < tasks; i++) { + System.out.println("// TODO: Runs a BSP task - " + i); + } + + for (int i = 0; i < tasks; i++) { + list.get(i).start(); + } + + for (int i = 0; i < tasks; i++) { + list.get(i).join(); + } + } catch (Exception e) { + + } + } + } +} Index: src/test/org/apache/hama/bsp/BSPPeerTest.java =================================================================== --- src/test/org/apache/hama/bsp/BSPPeerTest.java (리비전 957790) +++ src/test/org/apache/hama/bsp/BSPPeerTest.java (작업 사본) @@ -155,7 +155,7 @@ BSPPeerThread thread; for (int i = 0; i < NUM_PEER; i++) { - conf.set("bsp.peers.num", String.valueOf(NUM_PEER)); + conf.setInt("bsp.peers.num", NUM_PEER); conf.set(Constants.PEER_HOST, "localhost"); conf.set(Constants.PEER_PORT, String.valueOf(30000 + i)); conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810"); Index: src/test/org/apache/hama/bsp/SerializePrinting.java =================================================================== --- src/test/org/apache/hama/bsp/SerializePrinting.java (리비전 957790) +++ src/test/org/apache/hama/bsp/SerializePrinting.java (작업 사본) @@ -60,7 +60,7 @@ BSPPeerThread thread; int[] randomSequence = new int[] { 2, 3, 4, 5, 0, 1, 6, 7, 8, 9 }; for (int i = 0; i < NUM_PEER; i++) { - conf.set("bsp.peers.num", String.valueOf(NUM_PEER)); + conf.setInt("bsp.peers.num", NUM_PEER); conf.set(Constants.PEER_HOST, "localhost"); conf.set(Constants.PEER_PORT, String .valueOf(30000 + randomSequence[i])); Index: src/test/org/apache/hama/bsp/UserInterface.java =================================================================== --- src/test/org/apache/hama/bsp/UserInterface.java (리비전 957790) +++ src/test/org/apache/hama/bsp/UserInterface.java (작업 사본) @@ -103,32 +103,14 @@ } public void testBSPMain() throws InterruptedException, IOException { - PiEstimator thread; - for (int i = 0; i < NUM_PEER; i++) { - conf.set("bsp.peers.num", String.valueOf(NUM_PEER)); - conf.set(Constants.PEER_HOST, "localhost"); - conf.set(Constants.PEER_PORT, String.valueOf(30000 + i)); - conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810"); - thread = new PiEstimator(conf); - list.add(thread); - } - - for (int i = 0; i < NUM_PEER; i++) { - list.get(i).start(); - } - - for (int i = 0; i < NUM_PEER; i++) { - list.get(i).join(); - } - - /* // BSP job configuration BSPJob bsp = new BSPJob(this.conf); // Set the job name + conf.set("bsp.master.address", "local"); bsp.setJobName("bsp test job"); bsp.setBspClass(PiEstimator.class); - bsp.submit(); - */ + bsp.setNumBspTask(5); + BSPJobClient.runJob(bsp); } @Override