Index: src/examples/org/apache/hama/examples/PiEstimator.java =================================================================== --- src/examples/org/apache/hama/examples/PiEstimator.java (리비전 958440) +++ src/examples/org/apache/hama/examples/PiEstimator.java (작업 사본) @@ -87,8 +87,8 @@ // BSP job configuration HamaConfiguration conf = new HamaConfiguration(); // Execute locally - conf.set("bsp.master.address", "local"); - + conf.set("bsp.master.address", "localhost:40000"); + BSPJob bsp = new BSPJob(conf, PiEstimator.class); // Set the job name bsp.setJobName("pi estimation example"); Index: src/java/org/apache/hama/bsp/BSPJobClient.java =================================================================== --- src/java/org/apache/hama/bsp/BSPJobClient.java (리비전 958440) +++ src/java/org/apache/hama/bsp/BSPJobClient.java (작업 사본) @@ -19,7 +19,6 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.net.InetSocketAddress; import javax.security.auth.login.LoginException; @@ -35,7 +34,6 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UnixUserGroupInformation; import org.apache.hadoop.util.StringUtils; -import org.apache.hama.bsp.BSPMaster; import org.apache.hama.ipc.JobSubmissionProtocol; public class BSPJobClient extends Configured { @@ -183,17 +181,12 @@ if ("local".equals(master)) { this.jobSubmitClient = new LocalJobRunner(conf); } else { - this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf); + this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, + JobSubmissionProtocol.versionID, BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory( + conf, JobSubmissionProtocol.class)); } } - private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr, - Configuration conf) throws IOException { - return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, - JobSubmissionProtocol.versionID, addr, conf, NetUtils.getSocketFactory( - conf, JobSubmissionProtocol.class)); - } - /** * Close the JobClient. */ @@ -366,7 +359,11 @@ // // Now, actually submit the job (using the submit name) // + System.out.println(jobSubmitClient.getFilesystemName()); + System.out.println(jobSubmitClient.getClusterStatus(true)); JobStatus status = jobSubmitClient.submitJob(jobId, submitJobFile.toString()); + System.out.println(jobId.toString() + ", " + submitJobFile.toString()); + System.out.println(status); if (status != null) { return new NetworkedJob(status); } else { Index: src/java/org/apache/hama/bsp/BSPMaster.java =================================================================== --- src/java/org/apache/hama/bsp/BSPMaster.java (리비전 958454) +++ src/java/org/apache/hama/bsp/BSPMaster.java (작업 사본) @@ -288,8 +288,6 @@ try { HamaConfiguration conf = new HamaConfiguration(); - conf.set("bsp.local.dir", conf.get("hama.tmp.dir") + "/bsp/local"); - BSPMaster master = BSPMaster.constructMaster(BSPMaster.class, conf); master.start(); } catch (Throwable e) { @@ -462,6 +460,7 @@ return new BSPJobID(this.masterIdentifier, nextJobId++); } + /* @Override public JobStatus submitJob(BSPJobID jobId) throws IOException { LOG.info("Submitted a job (" + jobId + ")"); @@ -475,9 +474,25 @@ return addJob(jobId, job); } - + */ + @Override + public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException { + System.out.println("BSPMaster.submitJob(): " + jobID.toString()); + if (jobs.containsKey(jobID)) { + // job already running, don't start twice + LOG.info("The job (" + jobID + ") is already subbmitted"); + return jobs.get(jobID).getStatus(); + } + + JobInProgress job = new JobInProgress(jobID, this, this.conf); + + return addJob(jobID, job); + } + + @Override public ClusterStatus getClusterStatus(boolean detailed) { + System.out.println("BSPMaster.getClusterStatus(): Executed"); synchronized (groomServers) { if (detailed) { List groomNames = groomServerNames(); @@ -513,6 +528,7 @@ @Override public synchronized String getFilesystemName() throws IOException { + System.out.println("BSPMaster.getFilesystemName(): Executed"); if (fs == null) { throw new IllegalStateException("FileSystem object not available yet"); } @@ -589,9 +605,4 @@ this.interTrackerServer.stop(); } - @Override - public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException { - // TODO Auto-generated method stub - return null; - } } Index: src/java/org/apache/hama/bsp/JobInProgress.java =================================================================== --- src/java/org/apache/hama/bsp/JobInProgress.java (리비전 958440) +++ src/java/org/apache/hama/bsp/JobInProgress.java (작업 사본) @@ -80,10 +80,10 @@ fs.copyToLocalFile(jobFile, localJobFile); BSPJobContext job = new BSPJobContext(localJobFile, jobId); - System.out.println("user:" + job.getUser()); - System.out.println("jobId:" + jobId); - System.out.println("jobFile:" + jobFile.toString()); - System.out.println("jobName:" + job.getJobName()); + LOG.info("user:" + job.getUser()); + LOG.info("jobId:" + jobId); + LOG.info("jobFile:" + jobFile.toString()); + LOG.info("jobName:" + job.getJobName()); this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job .getJobName()); Index: src/java/org/apache/hama/bsp/LocalJobRunner.java =================================================================== --- src/java/org/apache/hama/bsp/LocalJobRunner.java (리비전 958454) +++ src/java/org/apache/hama/bsp/LocalJobRunner.java (작업 사본) @@ -42,6 +42,7 @@ @Override public ClusterStatus getClusterStatus(boolean detailed) throws IOException { // TODO Auto-generated method stub + System.out.println("LocalJobRunner.getClusterStatus(): Executed"); return null; } @@ -102,12 +103,6 @@ } } - @Override - public JobStatus submitJob(BSPJobID jobName) throws IOException { - // TODO Auto-generated method stub - return null; - } - /** * Local Job */ Index: src/java/org/apache/hama/ipc/JobSubmissionProtocol.java =================================================================== --- src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (리비전 958440) +++ src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (작업 사본) @@ -48,7 +48,9 @@ * @return jobStatus * @throws IOException */ - public JobStatus submitJob(BSPJobID jobName) throws IOException; + //public JobStatus submitJob(BSPJobID jobName) throws IOException; + + public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException; /** * Get the current status of the cluster @@ -104,7 +106,5 @@ */ public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException; - - JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException; }