Index: src/examples/org/apache/hama/examples/PiEstimator.java
===================================================================
--- src/examples/org/apache/hama/examples/PiEstimator.java (리비전 958440)
+++ src/examples/org/apache/hama/examples/PiEstimator.java (작업 사본)
@@ -87,8 +87,8 @@
// BSP job configuration
HamaConfiguration conf = new HamaConfiguration();
// Execute locally
- conf.set("bsp.master.address", "local");
-
+ conf.set("bsp.master.address", "localhost:40000");
+
BSPJob bsp = new BSPJob(conf, PiEstimator.class);
// Set the job name
bsp.setJobName("pi estimation example");
Index: src/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPJobClient.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/BSPJobClient.java (작업 사본)
@@ -19,7 +19,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.net.InetSocketAddress;
import javax.security.auth.login.LoginException;
@@ -35,7 +34,6 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hama.bsp.BSPMaster;
import org.apache.hama.ipc.JobSubmissionProtocol;
public class BSPJobClient extends Configured {
@@ -183,17 +181,12 @@
if ("local".equals(master)) {
this.jobSubmitClient = new LocalJobRunner(conf);
} else {
- this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf);
+ this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID, BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory(
+ conf, JobSubmissionProtocol.class));
}
}
- private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
- Configuration conf) throws IOException {
- return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID, addr, conf, NetUtils.getSocketFactory(
- conf, JobSubmissionProtocol.class));
- }
-
/**
* Close the JobClient.
*/
@@ -366,7 +359,11 @@
//
// Now, actually submit the job (using the submit name)
//
+ System.out.println(jobSubmitClient.getFilesystemName());
+ System.out.println(jobSubmitClient.getClusterStatus(true));
JobStatus status = jobSubmitClient.submitJob(jobId, submitJobFile.toString());
+ System.out.println(jobId.toString() + ", " + submitJobFile.toString());
+ System.out.println(status);
if (status != null) {
return new NetworkedJob(status);
} else {
Index: src/java/org/apache/hama/bsp/BSPMaster.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPMaster.java (리비전 958454)
+++ src/java/org/apache/hama/bsp/BSPMaster.java (작업 사본)
@@ -49,7 +49,7 @@
* BSPMaster is responsible to control all the groom servers and to manage bsp
* jobs.
*/
-public class BSPMaster extends Thread implements JobSubmissionProtocol, InterTrackerProtocol,
+public class BSPMaster implements JobSubmissionProtocol, InterTrackerProtocol,
GroomServerManager {
static {
@@ -228,7 +228,7 @@
return conf.getLocalPath("bsp.local.dir", pathString);
}
- public BSPMaster startMaster() throws IOException,
+ public static BSPMaster startMaster(HamaConfiguration conf) throws IOException,
InterruptedException {
return startTracker(conf, generateNewIdentifier());
}
@@ -238,7 +238,8 @@
BSPMaster result = null;
result = new BSPMaster(conf, identifier);
-
+ result.taskScheduler.setGroomServerManager(result);
+
return result;
}
@@ -279,37 +280,6 @@
LOG.info("Stopped interTrackerServer");
}
- public static void main(String[] args) {
- StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG);
- if (args.length != 0) {
- System.out.println("usage: HamaMaster");
- System.exit(-1);
- }
-
- try {
- HamaConfiguration conf = new HamaConfiguration();
- conf.set("bsp.local.dir", conf.get("hama.tmp.dir") + "/bsp/local");
-
- BSPMaster master = BSPMaster.constructMaster(BSPMaster.class, conf);
- master.start();
- } catch (Throwable e) {
- LOG.fatal(StringUtils.stringifyException(e));
- System.exit(-1);
- }
- }
-
- public void run() {
- try {
- offerService();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
// //////////////////////////////////////////////////
// GroomServerManager
// //////////////////////////////////////////////////
@@ -330,8 +300,7 @@
@Override
public JobInProgress getJob(BSPJobID jobid) {
- // TODO Auto-generated method stub
- return null;
+ return jobs.get(jobid);
}
@Override
@@ -343,13 +312,12 @@
@Override
public int getNumberOfUniqueHosts() {
// TODO Auto-generated method stub
- return 0;
+ return 1;
}
@Override
public Collection grooms() {
- // TODO Auto-generated method stub
- return null;
+ return groomServers.values();
}
@Override
@@ -412,7 +380,7 @@
}
HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
- // List actions = new ArrayList();
+ List actions = new ArrayList();
// Check for new tasks to be executed on the groom server
if (acceptNewTasks) {
@@ -420,13 +388,15 @@
if (groomStatus == null) {
LOG.warn("Unknown task tracker polling; ignoring: " + groomName);
} else {
- // TODO - assignTasks should be implemented
- /*
- * List tasks = taskScheduler.assignTasks(groomStatus); for(Task
- * task : tasks) { if(tasks != null) { LOG.debug(groomName +
- * "-> LaunchTask: " + task.getTaskID()); actions.add(new
- * LaunchTaskAction(task)); } }
- */
+ List tasks = taskScheduler.assignTasks(groomStatus);
+ LOG.info("heartbeat(), tasks.size(): " + tasks.size());
+ for(Task task : tasks) {
+ if(tasks != null) {
+ LOG.debug(groomName + "-> LaunchTask: " + task.getTaskID());
+ actions.add(new LaunchTaskAction(task));
+ LOG.info("heartbeat(), actions.size(): " + actions.size());
+ }
+ }
}
}
@@ -463,19 +433,17 @@
}
@Override
- public JobStatus submitJob(BSPJobID jobId) throws IOException {
- LOG.info("Submitted a job (" + jobId + ")");
- if (jobs.containsKey(jobId)) {
+ public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
+ if (jobs.containsKey(jobID)) {
// job already running, don't start twice
- LOG.info("The job (" + jobId + ") is already subbmitted");
- return jobs.get(jobId).getStatus();
+ LOG.info("The job (" + jobID + ") is already subbmitted");
+ return jobs.get(jobID).getStatus();
}
-
- JobInProgress job = new JobInProgress(jobId, this, this.conf);
-
- return addJob(jobId, job);
+
+ JobInProgress job = new JobInProgress(jobID, this, this.conf);
+ return addJob(jobID, job);
}
-
+
@Override
public ClusterStatus getClusterStatus(boolean detailed) {
synchronized (groomServers) {
@@ -502,7 +470,6 @@
jobs.put(job.getProfile().getJobID(), job);
taskScheduler.addJob(job);
}
-
return job.getStatus();
}
@@ -589,9 +556,31 @@
this.interTrackerServer.stop();
}
- @Override
- public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
- // TODO Auto-generated method stub
- return null;
+ public void createTaskEntry(TaskAttemptID taskAttemptID, GroomServerStatus status,
+ TaskInProgress taskInProgress) {
+ LOG.info("Adding task '" + taskAttemptID.getId() + "', for tracker '" + status.getGroomName() + "'");
+
+
+
}
+
+ public static void main(String[] args) {
+ StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG);
+ if (args.length != 0) {
+ System.out.println("usage: HamaMaster");
+ System.exit(-1);
+ }
+
+ try {
+ HamaConfiguration conf = new HamaConfiguration();
+ conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-default.xml"));
+ conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-site.xml"));
+
+ BSPMaster master = startMaster(conf);
+ master.offerService();
+ } catch (Throwable e) {
+ LOG.fatal(StringUtils.stringifyException(e));
+ System.exit(-1);
+ }
+ }
}
Index: src/java/org/apache/hama/bsp/BSPTask.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPTask.java (리비전 0)
+++ src/java/org/apache/hama/bsp/BSPTask.java (리비전 0)
@@ -0,0 +1,14 @@
+package org.apache.hama.bsp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class BSPTask extends Task {
+
+ public BSPTask(Configuration conf) {
+ this.runner = (BSPRunner) ReflectionUtils.newInstance(
+ BSPRunner.class, conf);
+
+ }
+
+}
Index: src/java/org/apache/hama/bsp/GroomServer.java
===================================================================
--- src/java/org/apache/hama/bsp/GroomServer.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/GroomServer.java (작업 사본)
@@ -335,10 +335,9 @@
try {
Configuration conf = new HamaConfiguration();
- conf.set("bsp.master.port", "40000");
- conf.set("bsp.groom.port", "40020");
- conf.set("bsp.local.dir", conf.get("hadoop.tmp.dir") + "/bsp/local");
- conf.set("bsp.system.dir", conf.get("hadoop.tmp.dir") + "/bsp/system");
+ conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-default.xml"));
+ conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-site.xml"));
+
GroomServer groom = GroomServer.constructGroomServer(GroomServer.class, conf);
startGroomServer(groom);
} catch (Throwable e) {
Index: src/java/org/apache/hama/bsp/GroomServerStatus.java
===================================================================
--- src/java/org/apache/hama/bsp/GroomServerStatus.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/GroomServerStatus.java (작업 사본)
@@ -144,7 +144,7 @@
Text.writeString(out, host);
out.writeInt(failures);
out.writeInt(maxTasks);
- out.writeInt(taskReports.size());
+ out.writeInt(taskReports.size());
for(TaskStatus taskStatus : taskReports) {
taskStatus.write(out);
}
Index: src/java/org/apache/hama/bsp/JobInProgress.java
===================================================================
--- src/java/org/apache/hama/bsp/JobInProgress.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/JobInProgress.java (작업 사본)
@@ -45,6 +45,7 @@
static final Log LOG = LogFactory.getLog(JobInProgress.class);
+ Configuration conf;
JobProfile profile;
JobStatus status;
Path jobFile = null;
@@ -62,6 +63,7 @@
public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf)
throws IOException {
+ this.conf = conf;
this.jobId = jobId;
this.master = master;
@@ -80,10 +82,10 @@
fs.copyToLocalFile(jobFile, localJobFile);
BSPJobContext job = new BSPJobContext(localJobFile, jobId);
- System.out.println("user:" + job.getUser());
- System.out.println("jobId:" + jobId);
- System.out.println("jobFile:" + jobFile.toString());
- System.out.println("jobName:" + job.getJobName());
+ LOG.info("user:" + job.getUser());
+ LOG.info("jobId:" + jobId);
+ LOG.info("jobFile:" + jobFile.toString());
+ LOG.info("jobName:" + job.getJobName());
this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
.getJobName());
@@ -135,7 +137,14 @@
// ///////////////////////////////////////////////////
public synchronized Task obtainNewTask(GroomServerStatus status,
int clusterSize, int numUniqueHosts) {
+ Task result = null;
+ try {
+ result = new TaskInProgress(getJobID(), this.jobFile.toString(), this.master, null, this,
+ numUniqueHosts).getTaskToRun(status);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
- return null;
+ return result;
}
}
Index: src/java/org/apache/hama/bsp/LocalJobRunner.java
===================================================================
--- src/java/org/apache/hama/bsp/LocalJobRunner.java (리비전 958454)
+++ src/java/org/apache/hama/bsp/LocalJobRunner.java (작업 사본)
@@ -9,7 +9,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.ipc.InterTrackerProtocol;
import org.apache.hama.ipc.JobSubmissionProtocol;
@@ -42,6 +41,7 @@
@Override
public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
// TODO Auto-generated method stub
+ System.out.println("LocalJobRunner.getClusterStatus(): Executed");
return null;
}
@@ -102,12 +102,6 @@
}
}
- @Override
- public JobStatus submitJob(BSPJobID jobName) throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
/**
* Local Job
*/
@@ -116,12 +110,14 @@
private Configuration conf;
private int NUM_PEER;
private BSPJob job;
+ private String jobFile;
private boolean threadDone = false;
- private HashMap tasks = new HashMap();
+ private HashMap tasks = new HashMap();
public Job(BSPJobID jobID, String jobFile, Configuration conf)
throws IOException {
this.conf = conf;
+ this.jobFile = jobFile;
this.NUM_PEER = conf.getInt("bsp.peers.num", 0);
LOG.info("LocalJobRunner: " + jobID + ", " + jobFile);
this.job = new BSPJob(jobID, jobFile);
@@ -158,17 +154,17 @@
TaskID tID;
for (int i = 0; i < NUM_PEER; i++) {
this.conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
- BSPRunner runner = (BSPRunner) ReflectionUtils.newInstance(
- BSPRunner.class, this.conf);
tID = new TaskID(job.getJobID(), false, i);
- tasks.put(tID.toString(), runner);
+
+ Task bspRunner = new BSPTask(this.conf);
+ tasks.put(tID.toString(), bspRunner);
}
- for (Map.Entry e : tasks.entrySet()) {
- e.getValue().start();
+ for (Map.Entry e : tasks.entrySet()) {
+ e.getValue().runner.start();
}
- for (Map.Entry e : tasks.entrySet()) {
+ for (Map.Entry e : tasks.entrySet()) {
try {
e.getValue().join();
} catch (InterruptedException e1) {
Index: src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
===================================================================
--- src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (작업 사본)
@@ -54,13 +54,11 @@
public List assignTasks(GroomServerStatus groomStatus)
throws IOException {
ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false);
-
+
final int numGroomServers = clusterStatus.getGroomServers();
// final int clusterTaskCapacity = clusterStatus.getMaxTasks();
- //
// Get task counts for the current groom.
- //
// final int groomTaskCapacity = groom.getMaxTasks();
final int groomRunningTasks = groomStatus.countTasks();
@@ -73,9 +71,11 @@
// instance to the scheduler.
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
+ /*
if (job.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
+ */
Task t = null;
Index: src/java/org/apache/hama/bsp/Task.java
===================================================================
--- src/java/org/apache/hama/bsp/Task.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/Task.java (작업 사본)
@@ -28,14 +28,16 @@
/**
*
*/
-public class Task implements Writable {
+public class Task extends Thread implements Writable {
////////////////////////////////////////////
// Fields
////////////////////////////////////////////
+
private String jobFile;
private TaskAttemptID taskId;
private int partition;
+ protected BSPRunner runner;
protected LocalDirAllocator lDirAlloc;
/**
*
@@ -103,4 +105,5 @@
taskId.readFields(in);
partition = in.readInt();
}
+
}
Index: src/java/org/apache/hama/bsp/TaskInProgress.java
===================================================================
--- src/java/org/apache/hama/bsp/TaskInProgress.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/TaskInProgress.java (작업 사본)
@@ -17,10 +17,12 @@
*/
package org.apache.hama.bsp;
+import java.io.IOException;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
/**
*
@@ -28,7 +30,7 @@
class TaskInProgress {
public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
- private BSPJobContext context;
+ private Configuration conf;
// Constants
static final int MAX_TASK_EXECS = 1;
@@ -67,14 +69,28 @@
private TreeMap taskStatuses = new TreeMap();
public TaskInProgress(BSPJobID jobId, String jobFile, BSPMaster master,
- BSPJobContext context, JobInProgress job, int partition) {
+ Configuration conf, JobInProgress job, int partition) {
this.jobFile = jobFile;
this.bspMaster = master;
this.job = job;
- this.context = context;
+ this.conf = conf;
this.partition = partition;
}
+ /**
+ * Return a Task that can be sent to a GroomServer for execution.
+ */
+ public Task getTaskToRun(GroomServerStatus status) throws IOException {
+ Task t = null;
+ t = new BSPTask(conf);
+
+ activeTasks.put(t.getTaskID(), status.getGroomName());
+
+ // Ask JobTracker to note that the task exists
+ bspMaster.createTaskEntry(t.getTaskID(), status, this);
+ return t;
+ }
+
// //////////////////////////////////
// Accessors
// //////////////////////////////////
Index: src/java/org/apache/hama/bsp/TaskScheduler.java
===================================================================
--- src/java/org/apache/hama/bsp/TaskScheduler.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/TaskScheduler.java (작업 사본)
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.mortbay.log.Log;
/**
* Used by a {@link BSPMaster} to schedule {@link Task}s on {@link GroomServer}
@@ -43,6 +44,7 @@
public synchronized void setGroomServerManager(
GroomServerManager groomServerManager) {
+ Log.info("TaskScheduler.setGroomServermanager()");
this.groomServerManager = groomServerManager;
}
Index: src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
===================================================================
--- src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (리비전 958440)
+++ src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (작업 사본)
@@ -44,11 +44,14 @@
* that job.
* The job files should be submitted in system-dir/jobName.
*
- * @param jobName
+ * @param jobID
+ * @param jobFile
* @return jobStatus
* @throws IOException
*/
- public JobStatus submitJob(BSPJobID jobName) throws IOException;
+ //public JobStatus submitJob(BSPJobID jobName) throws IOException;
+
+ public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException;
/**
* Get the current status of the cluster
@@ -104,7 +107,5 @@
*/
public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException;
-
- JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException;
}
Index: src/java/org/apache/hama/util/ClusterUtil.java
===================================================================
--- src/java/org/apache/hama/util/ClusterUtil.java (리비전 958440)
+++ src/java/org/apache/hama/util/ClusterUtil.java (작업 사본)
@@ -4,6 +4,7 @@
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMaster;
import org.apache.hama.bsp.GroomServer;
import org.apache.log4j.Logger;
@@ -76,7 +77,7 @@
public static String startup(final BSPMaster m,
final List groomservers, Configuration conf) throws IOException, InterruptedException {
if (m != null) {
- m.start();
+ m.startMaster((HamaConfiguration) conf);
}
if (groomservers != null) {