Index: src/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPJobClient.java	(리비전 957790)
+++ src/java/org/apache/hama/bsp/BSPJobClient.java	(작업 사본)
@@ -172,9 +172,6 @@
   final static FsPermission JOB_DIR_PERMISSION =
     FsPermission.createImmutable((short) 0777); // rwx-rwx-rwx
 
-  public BSPJobClient() {
-  }
-
   public BSPJobClient(Configuration conf) throws IOException {
     setConf(conf);
     init(conf);
@@ -182,8 +179,12 @@
 
   public void init(Configuration conf) throws IOException {
     // it will be used to determine if the bspmaster is running on local or not.
-    //String tracker = conf.get("bsp.master.address", "local");
-    this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf);
+    String master = conf.get("bsp.master.address", "local");
+    if ("local".equals(master)) {
+      this.jobSubmitClient = new LocalJobRunner(conf);
+    } else {
+      this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf);
+    }
   }
 
   private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
@@ -305,7 +306,7 @@
 
 
   public RunningJob submitJobInternal(BSPJob job) throws IOException {
-    BSPJobID jobId = jobSubmitClient.getNewJobId();
+    BSPJobID jobId = jobSubmitClient.getNewJobId();
     Path submitJobDir = new Path(getSystemDir(), jobId.toString());
     Path submitJarFile = new Path(submitJobDir, "job.jar");
     Path submitJobFile = new Path(submitJobDir, "job.xml");
@@ -416,9 +417,10 @@
     return sysDir;
   }
 
-  public RunningJob runJob(BSPJob job) throws FileNotFoundException,
+  public static RunningJob runJob(BSPJob job) throws FileNotFoundException,
       IOException {
-    return submitJobInternal(job);
+    BSPJobClient jc = new BSPJobClient(job.getConf());
+    return jc.submitJobInternal(job);
   }
 
   /**
Index: src/java/org/apache/hama/bsp/LocalJobRunner.java
===================================================================
--- src/java/org/apache/hama/bsp/LocalJobRunner.java	(리비전 0)
+++ src/java/org/apache/hama/bsp/LocalJobRunner.java	(리비전 0)
@@ -0,0 +1,121 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.ipc.InterTrackerProtocol;
+import org.apache.hama.ipc.JobSubmissionProtocol;
+
+/**
+ * A {@link JobSubmissionProtocol} implementation that lives in the client
+ * process. BSPJobClient selects it when "bsp.master.address" is "local".
+ * Several operations are still unimplemented stubs.
+ */
+public class LocalJobRunner implements JobSubmissionProtocol {
+  private static final Log LOG = LogFactory.getLog(LocalJobRunner.class);
+  private final FileSystem fs;
+  private final Configuration conf;
+  private int nextJobId = 1;
+  /** Jobs submitted through this runner, keyed by job id string. */
+  private final HashMap<String, Job> jobs = new HashMap<String, Job>();
+
+  public LocalJobRunner(Configuration conf) throws IOException {
+    this.fs = FileSystem.get(conf);
+    this.conf = conf;
+  }
+
+  @Override
+  public JobStatus[] getAllJobs() throws IOException {
+    // TODO not implemented yet
+    return null;
+  }
+
+  @Override
+  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
+    // TODO not implemented yet
+    return null;
+  }
+
+  @Override
+  public String getFilesystemName() throws IOException {
+    return fs.getUri().toString();
+  }
+
+  @Override
+  public JobProfile getJobProfile(BSPJobID jobid) throws IOException {
+    // TODO not implemented yet
+    return null;
+  }
+
+  @Override
+  public JobStatus getJobStatus(BSPJobID jobid) throws IOException {
+    // TODO not implemented yet
+    return null;
+  }
+
+  @Override
+  public BSPJobID getNewJobId() throws IOException {
+    return new BSPJobID("local", nextJobId++);
+  }
+
+  @Override
+  public String getSystemDir() {
+    Path sysDir = new Path(conf.get("bsp.system.dir", "/tmp/hadoop/bsp/system"));
+    return fs.makeQualified(sysDir).toString();
+  }
+
+  @Override
+  public void killJob(BSPJobID jobid) throws IOException {
+    // TODO not implemented yet
+  }
+
+  @Override
+  public boolean killTask(TaskAttemptID taskId, boolean shouldFail)
+      throws IOException {
+    // TODO not implemented yet
+    return false;
+  }
+
+  @Override
+  public JobStatus submitJob(BSPJobID jobID) throws IOException {
+    return new Job(jobID, this.conf).status;
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+    if (protocol.equals(InterTrackerProtocol.class.getName())) {
+      return InterTrackerProtocol.versionID;
+    } else if (protocol.equals(JobSubmissionProtocol.class.getName())) {
+      return JobSubmissionProtocol.versionID;
+    } else {
+      throw new IOException("Unknown protocol to job tracker: " + protocol);
+    }
+  }
+
+  /**
+   * Local job. Registers itself in the enclosing runner's job table and
+   * starts running as soon as it is constructed.
+   */
+  private class Job extends Thread {
+    private JobStatus status = new JobStatus();
+    private String id;
+
+    public Job(BSPJobID jobID, Configuration conf) throws IOException {
+      // Set the id before registering: with the field left unset every job
+      // would be stored under the null key and replace the previous one.
+      this.id = jobID.toString();
+      jobs.put(id, this);
+      this.start();
+    }
+
+    public void run() {
+      // TODO placeholder: only prints the thread name for now
+      System.out.println(getName());
+    }
+  }
+}
Index: src/test/org/apache/hama/bsp/UserInterface.java
===================================================================
--- src/test/org/apache/hama/bsp/UserInterface.java	(리비전 957790)
+++ src/test/org/apache/hama/bsp/UserInterface.java	(작업 사본)
@@ -103,6 +103,7 @@
   }
 
   public void testBSPMain() throws InterruptedException, IOException {
+    /*
     PiEstimator thread;
     for (int i = 0; i < NUM_PEER; i++) {
       conf.set("bsp.peers.num", String.valueOf(NUM_PEER));
@@ -120,15 +121,16 @@
     for (int i = 0; i < NUM_PEER; i++) {
       list.get(i).join();
     }
-
-    /*
+    */
+
     // BSP job configuration
+    // Set before creating the job in case BSPJob copies the configuration.
+    conf.set("bsp.master.address", "local");
     BSPJob bsp = new BSPJob(this.conf);
     // Set the job name
     bsp.setJobName("bsp test job");
     bsp.setBspClass(PiEstimator.class);
-    bsp.submit();
-    */
+    BSPJobClient.runJob(bsp);
   }
 
   @Override