Index: src/examples/org/apache/hama/examples/PiEstimator.java
===================================================================
--- src/examples/org/apache/hama/examples/PiEstimator.java (리비전 958440)
+++ src/examples/org/apache/hama/examples/PiEstimator.java (작업 사본)
@@ -87,8 +87,8 @@
// BSP job configuration
HamaConfiguration conf = new HamaConfiguration();
// Execute locally
- conf.set("bsp.master.address", "local");
-
+ conf.set("bsp.master.address", "localhost:40000");
+
BSPJob bsp = new BSPJob(conf, PiEstimator.class);
// Set the job name
bsp.setJobName("pi estimation example");
Index: src/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPJobClient.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/BSPJobClient.java (작업 사본)
@@ -19,7 +19,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.net.InetSocketAddress;
import javax.security.auth.login.LoginException;
@@ -35,7 +34,6 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hama.bsp.BSPMaster;
import org.apache.hama.ipc.JobSubmissionProtocol;
public class BSPJobClient extends Configured {
@@ -183,17 +181,12 @@
if ("local".equals(master)) {
this.jobSubmitClient = new LocalJobRunner(conf);
} else {
- this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf);
+ this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID, BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory(
+ conf, JobSubmissionProtocol.class));
}
}
- private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
- Configuration conf) throws IOException {
- return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID, addr, conf, NetUtils.getSocketFactory(
- conf, JobSubmissionProtocol.class));
- }
-
/**
* Close the JobClient.
*/
@@ -366,7 +359,11 @@
//
// Now, actually submit the job (using the submit name)
//
+ System.out.println(jobSubmitClient.getFilesystemName());
+ System.out.println(jobSubmitClient.getClusterStatus(true));
JobStatus status = jobSubmitClient.submitJob(jobId, submitJobFile.toString());
+ System.out.println(jobId.toString() + ", " + submitJobFile.toString());
+ System.out.println(status);
if (status != null) {
return new NetworkedJob(status);
} else {
Index: src/java/org/apache/hama/bsp/BSPMaster.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPMaster.java (리비전 958454)
+++ src/java/org/apache/hama/bsp/BSPMaster.java (작업 사본)
@@ -25,9 +25,14 @@
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,7 +54,7 @@
* BSPMaster is responsible to control all the groom servers and to manage bsp
* jobs.
*/
-public class BSPMaster extends Thread implements JobSubmissionProtocol, InterTrackerProtocol,
+public class BSPMaster implements JobSubmissionProtocol, InterTrackerProtocol,
GroomServerManager {
static {
@@ -67,14 +72,15 @@
}
private static final int FS_ACCESS_RETRY_PERIOD = 10000;
-
+ public static final long GROOMSERVER_EXPIRY_INTERVAL = 10 * 60 * 1000;
+
// States
State state = State.INITIALIZING;
// Attributes
String masterIdentifier;
private Server interTrackerServer;
-
+
// Filesystem
static final String SUBDIR = "bspMaster";
FileSystem fs = null;
@@ -100,11 +106,82 @@
private Map<BSPJobID, JobInProgress> jobs = new TreeMap<BSPJobID, JobInProgress>();
private TaskScheduler taskScheduler;
- /*
- * private final List jobInProgressListeners = new
- * CopyOnWriteArrayList();
- */
+ ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
+ Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
+ "expireLaunchingTasks");
+ private class ExpireLaunchingTasks implements Runnable {
+ private volatile boolean shouldRun = true;
+ private Map<String, Long> launchingTasks = new LinkedHashMap<String, Long>();
+
+ @Override
+ public void run() {
+ while (shouldRun) {
+ long now = System.currentTimeMillis();
+
+ synchronized (BSPMaster.this) {
+ synchronized (launchingTasks) {
+ Iterator<Map.Entry<String, Long>> itr = launchingTasks
+ .entrySet().iterator();
+ while (itr.hasNext()) {
+ Map.Entry<String, Long> pair = itr.next();
+ String taskId = pair.getKey();
+ long age = now - ((Long) pair.getValue()).longValue();
+ LOG.debug(taskId + " is " + age + " ms debug.");
+
+ LOG.info(taskId);
+ if (age > GROOMSERVER_EXPIRY_INTERVAL) {
+ LOG.info("Launching task " + taskId + " timed out.");
+ TaskInProgress tip = null;
+ tip = (TaskInProgress) taskidToTIPMap.get(taskId);
+ if (tip != null) {
+ JobInProgress job = tip.getJob();
+ String groomName = getAssignedTracker(taskId);
+ GroomServerStatus trackerStatus =
+ getGroomServer(groomName);
+ // This might happen when the tasktracker has already
+ // expired and this thread tries to call failedtask
+ // again. expire tasktracker should have called failed
+ // task!
+ if (trackerStatus != null) {
+ /*
+ job.failedTask(tip, taskId, "Error launching task",
+ tip.isMapTask()? TaskStatus.Phase.MAP:
+ TaskStatus.Phase.STARTING,
+ trackerStatus.getHost(), trackerName,
+ myMetrics);
+ */
+ }
+ }
+ itr.remove();
+ } else {
+ // the tasks are sorted by start time, so once we find
+ // one that we want to keep, we are done for this cycle.
+ break;
+ }
+
+ }
+ }
+ }
+ }
+ }
+
+ private String getAssignedTracker(String taskId) {
+ return taskidToTrackerMap.get(taskId);
+ }
+
+ public void addNewTask(String string) {
+ synchronized (launchingTasks) {
+ launchingTasks.put(string, new Long(System.currentTimeMillis()));
+ }
+ }
+
+ public void stop() {
+ shouldRun = false;
+ }
+
+ }
+
/**
* Start the BSPMaster process, listen on the indicated hostname/port
*/
@@ -116,6 +193,7 @@
InterruptedException {
this.conf = conf;
this.masterIdentifier = identifier;
+ //expireLaunchingTaskThread.start();
// Create the scheduler
Class<? extends TaskScheduler> schedulerClass = conf.getClass(
@@ -228,8 +306,8 @@
return conf.getLocalPath("bsp.local.dir", pathString);
}
- public BSPMaster startMaster() throws IOException,
- InterruptedException {
+ public static BSPMaster startMaster(HamaConfiguration conf)
+ throws IOException, InterruptedException {
return startTracker(conf, generateNewIdentifier());
}
@@ -238,6 +316,7 @@
BSPMaster result = null;
result = new BSPMaster(conf, identifier);
+ result.taskScheduler.setGroomServerManager(result);
return result;
}
@@ -279,37 +358,6 @@
LOG.info("Stopped interTrackerServer");
}
- public static void main(String[] args) {
- StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG);
- if (args.length != 0) {
- System.out.println("usage: HamaMaster");
- System.exit(-1);
- }
-
- try {
- HamaConfiguration conf = new HamaConfiguration();
- conf.set("bsp.local.dir", conf.get("hama.tmp.dir") + "/bsp/local");
-
- BSPMaster master = BSPMaster.constructMaster(BSPMaster.class, conf);
- master.start();
- } catch (Throwable e) {
- LOG.fatal(StringUtils.stringifyException(e));
- System.exit(-1);
- }
- }
-
- public void run() {
- try {
- offerService();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
// //////////////////////////////////////////////////
// GroomServerManager
// //////////////////////////////////////////////////
@@ -330,8 +378,7 @@
@Override
public JobInProgress getJob(BSPJobID jobid) {
- // TODO Auto-generated method stub
- return null;
+ return jobs.get(jobid);
}
@Override
@@ -343,13 +390,12 @@
@Override
public int getNumberOfUniqueHosts() {
// TODO Auto-generated method stub
- return 0;
+ return 1;
}
@Override
public Collection grooms() {
- // TODO Auto-generated method stub
- return null;
+ return groomServers.values();
}
@Override
@@ -387,11 +433,6 @@
public HeartbeatResponse heartbeat(GroomServerStatus status,
boolean restarted, boolean initialContact, boolean acceptNewTasks,
short responseId) throws IOException {
- LOG.debug(">>> Received the heartbeat message from ");
- LOG.debug(">>> " + status.groomName + "(" + status.getHost() + ")");
- LOG.debug(">>> restarted:" + restarted + ",first:" + initialContact);
- LOG.debug(">>> maxTaskCapacity:" + status.getMaxTasks() + ",taskCapacity:"
- + status.getTaskReports().size());
// First check if the last heartbeat response got through
String groomName = status.getGroomName();
@@ -412,7 +453,7 @@
}
HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
- // List actions = new ArrayList();
+ List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
// Check for new tasks to be executed on the groom server
if (acceptNewTasks) {
@@ -420,19 +461,101 @@
if (groomStatus == null) {
LOG.warn("Unknown task tracker polling; ignoring: " + groomName);
} else {
- // TODO - assignTasks should be implemented
- /*
- * List tasks = taskScheduler.assignTasks(groomStatus); for(Task
- * task : tasks) { if(tasks != null) { LOG.debug(groomName +
- * "-> LaunchTask: " + task.getTaskID()); actions.add(new
- * LaunchTaskAction(task)); } }
- */
+ List<Task> tasks = taskScheduler.assignTasks(groomStatus);
+ for (Task task : tasks) {
+ if (task != null) {
+ expireLaunchingTasks.addNewTask(task.getTaskID());
+ actions.add(new LaunchTaskAction(task));
+ }
+ }
}
}
+ // Check for tasks to be killed
+ List<GroomServerAction> killTasksList = getTasksToKill(groomName);
+ if (killTasksList != null) {
+ actions.addAll(killTasksList);
+ }
+
+ response.setActions(actions.toArray(new GroomServerAction[actions.size()]));
+
+ groomToHeartbeatResponseMap.put(groomName, response);
+ removeMarkedTasks(groomName);
+
return response;
}
+
+ // (trackerID -> TreeSet of completed taskids running at that tracker)
+ TreeMap<String, TreeSet<String>> trackerToMarkedTasksMap = new TreeMap<String, TreeSet<String>>();
+
+ private void removeMarkedTasks(String groomName) {
+ // Purge all the 'marked' tasks which were running at taskTracker
+ TreeSet<String> markedTaskSet =
+ (TreeSet<String>) trackerToMarkedTasksMap.get(groomName);
+ if (markedTaskSet != null) {
+ for (String taskid : markedTaskSet) {
+ removeTaskEntry(taskid);
+ LOG.info("Removed completed task '" + taskid + "' from '" +
+ groomName + "'");
+ }
+ // Clear
+ trackerToMarkedTasksMap.remove(groomName);
+ }
+ }
+
+ private void removeTaskEntry(String taskid) {
+ // taskid --> tracker
+ String tracker = taskidToTrackerMap.remove(taskid);
+
+ // tracker --> taskid
+ if (tracker != null) {
+ TreeSet<String> trackerSet = (TreeSet<String>) trackerToTaskMap.get(tracker);
+ if (trackerSet != null) {
+ trackerSet.remove(taskid);
+ }
+ }
+
+ // taskid --> TIP
+ taskidToTIPMap.remove(taskid);
+
+ LOG.debug("Removing task '" + taskid + "'");
+ }
+
+ private List<GroomServerAction> getTasksToKill(String groomName) {
+ Set<String> taskIds = (TreeSet<String>) trackerToTaskMap.get(groomName);
+ if (taskIds != null) {
+ List<GroomServerAction> killList = new ArrayList<GroomServerAction>();
+ Set<String> killJobIds = new TreeSet<String>();
+ for (String killTaskId : taskIds ) {
+ TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(killTaskId);
+ if (tip.shouldCloseForClosedJob(killTaskId)) {
+ //
+ // This is how the JobTracker ends a task at the TaskTracker.
+ // It may be successfully completed, or may be killed in
+ // mid-execution.
+ //
+ if (tip.getJob().getStatus().getRunState() == JobStatus.RUNNING) {
+ killList.add(new KillTaskAction(killTaskId));
+ LOG.debug(groomName + " -> KillTaskAction: " + killTaskId);
+ } else {
+ String killJobId = tip.getJob().getStatus().getJobID().getJtIdentifier();
+ killJobIds.add(killJobId);
+ }
+ }
+ }
+
+ for (String killJobId : killJobIds) {
+ killList.add(new KillJobAction(killJobId));
+ LOG.debug(groomName + " -> KillJobAction: " + killJobId);
+ }
+
+ return killList;
+ }
+ return null;
+
+ }
+
/**
* Process incoming heartbeat messages from the groom.
*/
@@ -463,17 +586,15 @@
}
@Override
- public JobStatus submitJob(BSPJobID jobId) throws IOException {
- LOG.info("Submitted a job (" + jobId + ")");
- if (jobs.containsKey(jobId)) {
+ public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
+ if (jobs.containsKey(jobID)) {
// job already running, don't start twice
- LOG.info("The job (" + jobId + ") is already subbmitted");
- return jobs.get(jobId).getStatus();
+ LOG.info("The job (" + jobID + ") is already submitted");
+ return jobs.get(jobID).getStatus();
}
- JobInProgress job = new JobInProgress(jobId, this, this.conf);
-
- return addJob(jobId, job);
+ JobInProgress job = new JobInProgress(jobID, this, this.conf);
+ return addJob(jobID, job);
}
@Override
@@ -502,7 +623,6 @@
jobs.put(job.getProfile().getJobID(), job);
taskScheduler.addJob(job);
}
-
return job.getStatus();
}
@@ -589,9 +709,56 @@
this.interTrackerServer.stop();
}
- @Override
- public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
- // TODO Auto-generated method stub
- return null;
+ TreeMap<String, String> taskidToTrackerMap = new TreeMap<String, String>();
+ TreeMap<String, TreeSet<String>> trackerToTaskMap = new TreeMap<String, TreeSet<String>>();
+ Map<String, TaskInProgress> taskidToTIPMap = new TreeMap<String, TaskInProgress>();
+
+ public void createTaskEntry(String taskid, String groomServer,
+ TaskInProgress taskInProgress) {
+ LOG.info("Adding task '" + taskid + "' to tip " + taskInProgress.getTIPId() + ", for tracker '" + groomServer + "'");
+ /*
+ // taskid --> groom
+ taskidToTrackerMap.put(taskid, groomServer);
+ // groom --> taskid
+ TreeSet taskset = null;
+ if(trackerToTaskMap.entrySet().size() > 0) {
+ taskset = trackerToTaskMap.get(groomServer);
+ LOG.info(taskset.size());
+ LOG.info(taskset.size());
+ LOG.info(taskset.size());
+ }
+
+ if (taskset == null) {
+ taskset = new TreeSet();
+ trackerToTaskMap.put(groomServer, taskset);
+ }
+ taskset.add(taskid);
+ taskidToTIPMap.put(taskid, taskInProgress);
+
+ LOG.info("" + taskidToTrackerMap);
+ LOG.info("" + taskidToTIPMap);
+ */
}
+
+ public static void main(String[] args) {
+ StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG);
+ if (args.length != 0) {
+ System.out.println("usage: HamaMaster");
+ System.exit(-1);
+ }
+
+ try {
+ HamaConfiguration conf = new HamaConfiguration();
+ conf.addResource(new Path(
+ "/home/edward/workspace/hama-trunk/conf/hama-default.xml"));
+ conf.addResource(new Path(
+ "/home/edward/workspace/hama-trunk/conf/hama-site.xml"));
+
+ BSPMaster master = startMaster(conf);
+ master.offerService();
+ } catch (Throwable e) {
+ LOG.fatal(StringUtils.stringifyException(e));
+ System.exit(-1);
+ }
+ }
}
Index: src/java/org/apache/hama/bsp/BSPRunner.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPRunner.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/BSPRunner.java (작업 사본)
@@ -13,10 +13,13 @@
private BSPPeer bspPeer;
private Configuration conf;
private BSP bsp;
-
+ private boolean isDone;
+
public void run() {
try {
+ isDone = bspPeer.leaveBarrier();
bsp.bsp(bspPeer);
+ isDone = bspPeer.enterBarrier();
} catch (Exception e) {
LOG.error(e);
}
@@ -39,4 +42,8 @@
bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
BSP.class), conf);
}
+
+ public boolean isDone() {
+ return this.isDone;
+ }
}
Index: src/java/org/apache/hama/bsp/BSPTask.java
===================================================================
--- src/java/org/apache/hama/bsp/BSPTask.java (리비전 0)
+++ src/java/org/apache/hama/bsp/BSPTask.java (리비전 0)
@@ -0,0 +1,18 @@
+package org.apache.hama.bsp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class BSPTask extends Task {
+
+ public BSPTask(String jobId, String jobFile, String taskid, int partition, Configuration conf) {
+ this.jobId = jobId;
+ this.jobFile = jobFile;
+ this.taskId = taskid;
+ this.partition = partition;
+ this.runner = (BSPRunner) ReflectionUtils.newInstance(
+ BSPRunner.class, conf);
+
+ }
+
+}
Index: src/java/org/apache/hama/bsp/GroomServer.java
===================================================================
--- src/java/org/apache/hama/bsp/GroomServer.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/GroomServer.java (작업 사본)
@@ -27,6 +27,8 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -86,6 +88,9 @@
Map runningTasks = null;
Map runningJobs = null;
+ private BlockingQueue<GroomServerAction> tasksToCleanup =
+ new LinkedBlockingQueue<GroomServerAction>();
+
public GroomServer(Configuration conf) throws IOException {
this.conf = conf;
bspMasterAddr = BSPMaster.getAddress(conf);
@@ -203,6 +208,23 @@
// Send the heartbeat and process the bspmaster's directives
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
+
+ GroomServerAction[] actions = heartbeatResponse.getActions();
+ LOG.info("Got heartbeatResponse from BSPMaster with responseId: " +
+ heartbeatResponse.getResponseId() + " and " +
+ ((actions != null) ? actions.length : 0) + " actions");
+
+
+ if (actions != null){
+ for(GroomServerAction action: actions) {
+ if (action instanceof LaunchTaskAction) {
+ startNewTask((LaunchTaskAction) action);
+ } else {
+ tasksToCleanup.put(action);
+ }
+ }
+ }
+
//
// The heartbeat got through successfully!
//
@@ -236,8 +258,16 @@
return State.NORMAL;
}
+ private void startNewTask(LaunchTaskAction action) {
+ // TODO Auto-generated method stub
+ Task t = action.getTask();
+ LOG.info("GroomServer: " + t.getJobID() + ", " + t.getJobFile() + ", " + t.getId() + ", " + t.getPartition());
+
+ // TODO: execute task
+
+ }
+
private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
-
//
// Check if the last heartbeat got through...
// if so then build the heartbeat information for the BSPMaster;
@@ -273,7 +303,9 @@
initialize();
startCleanupThreads();
boolean denied = false;
+ LOG.info("Why? " + running + ", " + shuttingDown + ", " + denied);
while (running && !shuttingDown && !denied) {
+
boolean staleState = false;
try {
while (running && !staleState && !shuttingDown && !denied) {
@@ -335,10 +367,9 @@
try {
Configuration conf = new HamaConfiguration();
- conf.set("bsp.master.port", "40000");
- conf.set("bsp.groom.port", "40020");
- conf.set("bsp.local.dir", conf.get("hadoop.tmp.dir") + "/bsp/local");
- conf.set("bsp.system.dir", conf.get("hadoop.tmp.dir") + "/bsp/system");
+ conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-default.xml"));
+ conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-site.xml"));
+
GroomServer groom = GroomServer.constructGroomServer(GroomServer.class, conf);
startGroomServer(groom);
} catch (Throwable e) {
Index: src/java/org/apache/hama/bsp/GroomServerStatus.java
===================================================================
--- src/java/org/apache/hama/bsp/GroomServerStatus.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/GroomServerStatus.java (작업 사본)
@@ -144,7 +144,7 @@
Text.writeString(out, host);
out.writeInt(failures);
out.writeInt(maxTasks);
- out.writeInt(taskReports.size());
+ out.writeInt(taskReports.size());
for(TaskStatus taskStatus : taskReports) {
taskStatus.write(out);
}
Index: src/java/org/apache/hama/bsp/JobInProgress.java
===================================================================
--- src/java/org/apache/hama/bsp/JobInProgress.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/JobInProgress.java (작업 사본)
@@ -45,6 +45,7 @@
static final Log LOG = LogFactory.getLog(JobInProgress.class);
+ Configuration conf;
JobProfile profile;
JobStatus status;
Path jobFile = null;
@@ -62,6 +63,7 @@
public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf)
throws IOException {
+ this.conf = conf;
this.jobId = jobId;
this.master = master;
@@ -80,10 +82,10 @@
fs.copyToLocalFile(jobFile, localJobFile);
BSPJobContext job = new BSPJobContext(localJobFile, jobId);
- System.out.println("user:" + job.getUser());
- System.out.println("jobId:" + jobId);
- System.out.println("jobFile:" + jobFile.toString());
- System.out.println("jobName:" + job.getJobName());
+ LOG.info("user:" + job.getUser());
+ LOG.info("jobId:" + jobId);
+ LOG.info("jobFile:" + jobFile.toString());
+ LOG.info("jobName:" + job.getJobName());
this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
.getJobName());
@@ -135,7 +137,16 @@
// ///////////////////////////////////////////////////
public synchronized Task obtainNewTask(GroomServerStatus status,
int clusterSize, int numUniqueHosts) {
+ Task result = null;
+ try {
+ result = new TaskInProgress(getJobID(), this.jobFile.toString(), this.master, null, this,
+ numUniqueHosts).getTaskToRun(status);
+ LOG.info("JobInProgress: " + result.getJobID() + ", " + result.getJobFile() + ", " + result.getId() + ", " + result.getPartition());
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
- return null;
+ return result;
}
}
Index: src/java/org/apache/hama/bsp/KillJobAction.java
===================================================================
--- src/java/org/apache/hama/bsp/KillJobAction.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/KillJobAction.java (작업 사본)
@@ -21,6 +21,8 @@
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.hadoop.io.Text;
+
/**
* Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
* {@link org.apache.hama.bsp.GroomServer} to kill the task of a job and cleanup
@@ -28,30 +30,30 @@
*
*/
class KillJobAction extends GroomServerAction {
- final BSPJobID jobId;
+ String jobId;
public KillJobAction() {
super(ActionType.KILL_JOB);
- jobId = new BSPJobID();
+ jobId = "";
}
- public KillJobAction(BSPJobID jobId) {
+ public KillJobAction(String killJobId) {
super(ActionType.KILL_JOB);
- this.jobId = jobId;
+ this.jobId = killJobId;
}
- public BSPJobID getJobID() {
+ public String getJobID() {
return jobId;
}
@Override
public void write(DataOutput out) throws IOException {
- jobId.write(out);
+ Text.writeString(out, jobId);
}
@Override
public void readFields(DataInput in) throws IOException {
- jobId.readFields(in);
+ jobId = Text.readString(in);
}
}
Index: src/java/org/apache/hama/bsp/KillTaskAction.java
===================================================================
--- src/java/org/apache/hama/bsp/KillTaskAction.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/KillTaskAction.java (작업 사본)
@@ -21,36 +21,38 @@
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.hadoop.io.Text;
+
/**
* Represents a directive from the {@link org.apache.hama.bsp.BSPMaster}
* to the {@link org.apache.hama.bsp.GroomServer} to kill a task.
*
*/
class KillTaskAction extends GroomServerAction {
- final TaskAttemptID taskId;
+ String taskId;
public KillTaskAction() {
super(ActionType.KILL_TASK);
- taskId = new TaskAttemptID();
+ taskId = "";
}
- public KillTaskAction(TaskAttemptID taskId) {
+ public KillTaskAction(String killTaskId) {
super(ActionType.KILL_TASK);
- this.taskId = taskId;
+ this.taskId = killTaskId;
}
- public TaskAttemptID getTaskID() {
+ public String getTaskID() {
return taskId;
}
@Override
public void write(DataOutput out) throws IOException {
- taskId.write(out);
+ Text.writeString(out, taskId);
}
@Override
public void readFields(DataInput in) throws IOException {
- taskId.readFields(in);
+ taskId = Text.readString(in);
}
}
Index: src/java/org/apache/hama/bsp/LocalJobRunner.java
===================================================================
--- src/java/org/apache/hama/bsp/LocalJobRunner.java (리비전 958454)
+++ src/java/org/apache/hama/bsp/LocalJobRunner.java (작업 사본)
@@ -9,7 +9,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.ipc.InterTrackerProtocol;
import org.apache.hama.ipc.JobSubmissionProtocol;
@@ -42,6 +41,7 @@
@Override
public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
// TODO Auto-generated method stub
+ System.out.println("LocalJobRunner.getClusterStatus(): Executed");
return null;
}
@@ -102,12 +102,6 @@
}
}
- @Override
- public JobStatus submitJob(BSPJobID jobName) throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
/**
* Local Job
*/
@@ -116,12 +110,14 @@
private Configuration conf;
private int NUM_PEER;
private BSPJob job;
+ private String jobFile;
private boolean threadDone = false;
- private HashMap tasks = new HashMap();
+ private HashMap<String, Task> tasks = new HashMap<String, Task>();
public Job(BSPJobID jobID, String jobFile, Configuration conf)
throws IOException {
this.conf = conf;
+ this.jobFile = jobFile;
this.NUM_PEER = conf.getInt("bsp.peers.num", 0);
LOG.info("LocalJobRunner: " + jobID + ", " + jobFile);
this.job = new BSPJob(jobID, jobFile);
@@ -158,17 +154,17 @@
TaskID tID;
for (int i = 0; i < NUM_PEER; i++) {
this.conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
- BSPRunner runner = (BSPRunner) ReflectionUtils.newInstance(
- BSPRunner.class, this.conf);
tID = new TaskID(job.getJobID(), false, i);
- tasks.put(tID.toString(), runner);
+
+ Task bspRunner = new BSPTask(jobFile, jobFile, jobFile, i, this.conf);
+ tasks.put(tID.toString(), bspRunner);
}
- for (Map.Entry e : tasks.entrySet()) {
- e.getValue().start();
+ for (Map.Entry<String, Task> e : tasks.entrySet()) {
+ e.getValue().runner.start();
}
- for (Map.Entry e : tasks.entrySet()) {
+ for (Map.Entry<String, Task> e : tasks.entrySet()) {
try {
e.getValue().join();
} catch (InterruptedException e1) {
Index: src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
===================================================================
--- src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (작업 사본)
@@ -54,13 +54,11 @@
public List<Task> assignTasks(GroomServerStatus groomStatus)
throws IOException {
ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false);
-
+
final int numGroomServers = clusterStatus.getGroomServers();
// final int clusterTaskCapacity = clusterStatus.getMaxTasks();
- //
// Get task counts for the current groom.
- //
// final int groomTaskCapacity = groom.getMaxTasks();
final int groomRunningTasks = groomStatus.countTasks();
@@ -73,14 +71,19 @@
// instance to the scheduler.
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
+ /*
if (job.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
+ */
Task t = null;
t = job.obtainNewTask(groomStatus, numGroomServers,
groomServerManager.getNumberOfUniqueHosts());
+
+ LOG.info("SimpleTaskScheduler: " + t.getJobID() + ", " + t.getJobFile() + ", " + t.getId() + ", " + t.getPartition());
+
if (t != null) {
assignedTasks.add(t);
break; // TODO - Now, simple scheduler assigns only one task to
Index: src/java/org/apache/hama/bsp/Task.java
===================================================================
--- src/java/org/apache/hama/bsp/Task.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/Task.java (작업 사본)
@@ -21,6 +21,8 @@
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -28,23 +30,28 @@
/**
*
*/
-public class Task implements Writable {
+public class Task extends Thread implements Writable {
+ public static final Log LOG = LogFactory.getLog(Task.class);
////////////////////////////////////////////
// Fields
////////////////////////////////////////////
- private String jobFile;
- private TaskAttemptID taskId;
- private int partition;
+ protected String jobId;
+ protected String jobFile;
+ protected String taskId;
+ protected int partition;
+
+ protected BSPRunner runner;
protected LocalDirAllocator lDirAlloc;
/**
*
*/
public Task() {
- taskId = new TaskAttemptID();
+ taskId = "";
}
- public Task(String jobFile, TaskAttemptID taskId, int partition) {
+ public Task(String jobId, String jobFile, String taskId, int partition) {
+ this.jobId = jobId;
this.jobFile = jobFile;
this.taskId = taskId;
@@ -62,7 +69,7 @@
return jobFile;
}
- public TaskAttemptID getTaskID() {
+ public String getTaskID() {
return taskId;
}
@@ -70,8 +77,8 @@
* Get the job name for this task.
* @return the job name
*/
- public BSPJobID getJobID() {
- return taskId.getJobID();
+ public String getJobID() {
+ return jobId;
}
/**
@@ -92,15 +99,20 @@
////////////////////////////////////////////
@Override
public void write(DataOutput out) throws IOException {
+ LOG.info(jobId +", "+ jobFile +", "+ taskId +", "+ partition);
+
+ Text.writeString(out, jobId);
Text.writeString(out, jobFile);
- taskId.write(out);
+ Text.writeString(out, taskId);
out.writeInt(partition);
}
@Override
public void readFields(DataInput in) throws IOException {
+ jobId = Text.readString(in);
jobFile = Text.readString(in);
- taskId.readFields(in);
+ taskId = Text.readString(in);
partition = in.readInt();
}
+
}
Index: src/java/org/apache/hama/bsp/TaskInProgress.java
===================================================================
--- src/java/org/apache/hama/bsp/TaskInProgress.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/TaskInProgress.java (작업 사본)
@@ -17,10 +17,14 @@
*/
package org.apache.hama.bsp;
+import java.io.IOException;
import java.util.TreeMap;
+import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobStatus;
/**
*
@@ -28,7 +32,7 @@
class TaskInProgress {
public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
- private BSPJobContext context;
+ private Configuration conf;
// Constants
static final int MAX_TASK_EXECS = 1;
@@ -58,7 +62,7 @@
// Map from task Id -> GroomServer Id, contains tasks that are
// currently runnings
- private TreeMap activeTasks = new TreeMap();
+ private TreeMap<String, String> activeTasks = new TreeMap<String, String>();
// All attempt Ids of this TIP
// private TreeSet tasks = new TreeSet();
/**
@@ -66,15 +70,44 @@
*/
private TreeMap taskStatuses = new TreeMap();
+ private BSPJobID jobId;
+
public TaskInProgress(BSPJobID jobId, String jobFile, BSPMaster master,
- BSPJobContext context, JobInProgress job, int partition) {
+ Configuration conf, JobInProgress job, int partition) {
+ this.jobId = jobId;
this.jobFile = jobFile;
this.bspMaster = master;
this.job = job;
- this.context = context;
+ this.conf = conf;
this.partition = partition;
+
+ this.id = new TaskID(jobId, true, partition);
}
+ /**
+ * Return a Task that can be sent to a GroomServer for execution.
+ */
+ public Task getTaskToRun(GroomServerStatus status) throws IOException {
+ Task t = null;
+
+ String taskid = null;
+ if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) {
+ taskid = "task_" + nextTaskId;
+ ++nextTaskId;
+ } else {
+ LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) +
+ " attempts for the tip '" + getTIPId() + "'");
+ return null;
+ }
+
+ t = new BSPTask(jobId.getJtIdentifier(), jobFile, taskid, partition, conf);
+ activeTasks.put(taskid, status.getGroomName());
+
+ // Ask JobTracker to note that the task exists
+ bspMaster.createTaskEntry(taskid, status.getGroomName(), this);
+ return t;
+ }
+
// //////////////////////////////////
// Accessors
// //////////////////////////////////
@@ -123,4 +156,18 @@
public synchronized boolean isComplete() {
return (completes > 0);
}
+
+ private TreeSet<String> tasksReportedClosed = new TreeSet<String>();
+
+ public boolean shouldCloseForClosedJob(String taskid) {
+ TaskStatus ts = (TaskStatus) taskStatuses.get(taskid);
+ if ((ts != null) &&
+ (! tasksReportedClosed.contains(taskid)) &&
+ (job.getStatus().getRunState() != JobStatus.RUNNING)) {
+ tasksReportedClosed.add(taskid);
+ return true;
+ } else {
+ return false;
+ }
+ }
}
Index: src/java/org/apache/hama/bsp/TaskScheduler.java
===================================================================
--- src/java/org/apache/hama/bsp/TaskScheduler.java (리비전 958440)
+++ src/java/org/apache/hama/bsp/TaskScheduler.java (작업 사본)
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.mortbay.log.Log;
/**
* Used by a {@link BSPMaster} to schedule {@link Task}s on {@link GroomServer}
@@ -43,6 +44,7 @@
public synchronized void setGroomServerManager(
GroomServerManager groomServerManager) {
+ Log.info("TaskScheduler.setGroomServermanager()");
this.groomServerManager = groomServerManager;
}
Index: src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
===================================================================
--- src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (리비전 958440)
+++ src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (작업 사본)
@@ -44,11 +44,14 @@
* that job.
* The job files should be submitted in system-dir/jobName.
*
- * @param jobName
+ * @param jobID
+ * @param jobFile
* @return jobStatus
* @throws IOException
*/
- public JobStatus submitJob(BSPJobID jobName) throws IOException;
+ //public JobStatus submitJob(BSPJobID jobName) throws IOException;
+
+ public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException;
/**
* Get the current status of the cluster
@@ -104,7 +107,5 @@
*/
public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException;
-
- JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException;
}
Index: src/java/org/apache/hama/util/ClusterUtil.java
===================================================================
--- src/java/org/apache/hama/util/ClusterUtil.java (리비전 958440)
+++ src/java/org/apache/hama/util/ClusterUtil.java (작업 사본)
@@ -4,6 +4,7 @@
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMaster;
import org.apache.hama.bsp.GroomServer;
import org.apache.log4j.Logger;
@@ -76,7 +77,7 @@
public static String startup(final BSPMaster m,
final List groomservers, Configuration conf) throws IOException, InterruptedException {
if (m != null) {
- m.start();
+ m.startMaster((HamaConfiguration) conf);
}
if (groomservers != null) {