Index: src/examples/org/apache/hama/examples/PiEstimator.java
===================================================================
--- src/examples/org/apache/hama/examples/PiEstimator.java (리비전 958440)
+++ src/examples/org/apache/hama/examples/PiEstimator.java (작업 사본)
@@ -87,8 +87,8 @@
// BSP job configuration
HamaConfiguration conf = new HamaConfiguration();
// Execute locally
- conf.set("bsp.master.address", "local");
-
+ conf.set("bsp.master.address", "localhost:40000");
+
BSPJob bsp = new BSPJob(conf, PiEstimator.class);
// Set the job name
bsp.setJobName("pi estimation example");
Index: src/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPJobClient.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/BSPJobClient.java (작업 사본)
@@ -19,7 +19,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.net.InetSocketAddress;
import javax.security.auth.login.LoginException;
@@ -35,7 +34,6 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hama.bsp.BSPMaster;
import org.apache.hama.ipc.JobSubmissionProtocol;
public class BSPJobClient extends Configured {
@@ -183,17 +181,12 @@
if ("local".equals(master)) {
this.jobSubmitClient = new LocalJobRunner(conf);
} else {
- this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf);
+ this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID, BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory(
+ conf, JobSubmissionProtocol.class));
}
}
- private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
- Configuration conf) throws IOException {
- return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID, addr, conf, NetUtils.getSocketFactory(
- conf, JobSubmissionProtocol.class));
- }
-
/**
* Close the JobClient.
*/
@@ -366,7 +359,11 @@
//
// Now, actually submit the job (using the submit name)
//
+ System.out.println(jobSubmitClient.getFilesystemName());
+ System.out.println(jobSubmitClient.getClusterStatus(true));
JobStatus status = jobSubmitClient.submitJob(jobId, submitJobFile.toString());
+ System.out.println(jobId.toString() + ", " + submitJobFile.toString());
+ System.out.println(status);
if (status != null) {
return new NetworkedJob(status);
} else {
Index: src/java/org/apache/hama/bsp/BSPMaster.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPMaster.java (리비전 958454)
+++ src/java/org/apache/hama/bsp/BSPMaster.java (작업 사본)
@@ -288,8 +288,6 @@
try {
HamaConfiguration conf = new HamaConfiguration();
- conf.set("bsp.local.dir", conf.get("hama.tmp.dir") + "/bsp/local");
-
BSPMaster master = BSPMaster.constructMaster(BSPMaster.class, conf);
master.start();
} catch (Throwable e) {
@@ -462,6 +460,7 @@
return new BSPJobID(this.masterIdentifier, nextJobId++);
}
+ /*
@Override
public JobStatus submitJob(BSPJobID jobId) throws IOException {
LOG.info("Submitted a job (" + jobId + ")");
@@ -475,9 +474,25 @@
return addJob(jobId, job);
}
-
+ */
+
@Override
+ public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
+ System.out.println("BSPMaster.submitJob(): " + jobID.toString());
+ if (jobs.containsKey(jobID)) {
+ // job already running, don't start twice
+ LOG.info("The job (" + jobID + ") is already subbmitted");
+ return jobs.get(jobID).getStatus();
+ }
+
+ JobInProgress job = new JobInProgress(jobID, this, this.conf);
+
+ return addJob(jobID, job);
+ }
+
+ @Override
public ClusterStatus getClusterStatus(boolean detailed) {
+ System.out.println("BSPMaster.getClusterStatus(): Executed");
synchronized (groomServers) {
if (detailed) {
List groomNames = groomServerNames();
@@ -513,6 +528,7 @@
@Override
public synchronized String getFilesystemName() throws IOException {
+ System.out.println("BSPMaster.getFilesystemName(): Executed");
if (fs == null) {
throw new IllegalStateException("FileSystem object not available yet");
}
@@ -589,9 +605,4 @@
this.interTrackerServer.stop();
}
- @Override
- public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
}
Index: src/java/org/apache/hama/bsp/JobInProgress.java
===================================================================
--- src/java/org/apache/hama/bsp/JobInProgress.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/JobInProgress.java (작업 사본)
@@ -80,10 +80,10 @@
fs.copyToLocalFile(jobFile, localJobFile);
BSPJobContext job = new BSPJobContext(localJobFile, jobId);
- System.out.println("user:" + job.getUser());
- System.out.println("jobId:" + jobId);
- System.out.println("jobFile:" + jobFile.toString());
- System.out.println("jobName:" + job.getJobName());
+ LOG.info("user:" + job.getUser());
+ LOG.info("jobId:" + jobId);
+ LOG.info("jobFile:" + jobFile.toString());
+ LOG.info("jobName:" + job.getJobName());
this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
.getJobName());
Index: src/java/org/apache/hama/bsp/LocalJobRunner.java
===================================================================
--- src/java/org/apache/hama/bsp/LocalJobRunner.java (리비전 958454)
+++ src/java/org/apache/hama/bsp/LocalJobRunner.java (작업 사본)
@@ -42,6 +42,7 @@
@Override
public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
// TODO Auto-generated method stub
+ System.out.println("LocalJobRunner.getClusterStatus(): Executed");
return null;
}
@@ -102,12 +103,6 @@
}
}
- @Override
- public JobStatus submitJob(BSPJobID jobName) throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
/**
* Local Job
*/
Index: src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
===================================================================
--- src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (리비전 958440)
+++ src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (작업 사본)
@@ -48,7 +48,9 @@
* @return jobStatus
* @throws IOException
*/
- public JobStatus submitJob(BSPJobID jobName) throws IOException;
+ //public JobStatus submitJob(BSPJobID jobName) throws IOException;
+
+ public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException;
/**
* Get the current status of the cluster
@@ -104,7 +106,5 @@
*/
public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException;
-
- JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException;
}