Index: src/examples/org/apache/hama/examples/PiEstimator.java =================================================================== --- src/examples/org/apache/hama/examples/PiEstimator.java (리비전 958440) +++ src/examples/org/apache/hama/examples/PiEstimator.java (작업 사본) @@ -87,8 +87,8 @@ // BSP job configuration HamaConfiguration conf = new HamaConfiguration(); // Execute locally - conf.set("bsp.master.address", "local"); - + conf.set("bsp.master.address", "localhost:40000"); + BSPJob bsp = new BSPJob(conf, PiEstimator.class); // Set the job name bsp.setJobName("pi estimation example"); Index: src/java/org/apache/hama/bsp/BSPJobClient.java =================================================================== --- src/java/org/apache/hama/bsp/BSPJobClient.java (리비전 958440) +++ src/java/org/apache/hama/bsp/BSPJobClient.java (작업 사본) @@ -19,7 +19,6 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.net.InetSocketAddress; import javax.security.auth.login.LoginException; @@ -35,7 +34,6 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UnixUserGroupInformation; import org.apache.hadoop.util.StringUtils; -import org.apache.hama.bsp.BSPMaster; import org.apache.hama.ipc.JobSubmissionProtocol; public class BSPJobClient extends Configured { @@ -183,17 +181,12 @@ if ("local".equals(master)) { this.jobSubmitClient = new LocalJobRunner(conf); } else { - this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf); + this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, + JobSubmissionProtocol.versionID, BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory( + conf, JobSubmissionProtocol.class)); } } - private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr, - Configuration conf) throws IOException { - return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, - JobSubmissionProtocol.versionID, addr, conf, NetUtils.getSocketFactory( - conf, JobSubmissionProtocol.class)); - } - /** * Close the JobClient. */ @@ -366,7 +359,11 @@ // // Now, actually submit the job (using the submit name) // + System.out.println(jobSubmitClient.getFilesystemName()); + System.out.println(jobSubmitClient.getClusterStatus(true)); JobStatus status = jobSubmitClient.submitJob(jobId, submitJobFile.toString()); + System.out.println(jobId.toString() + ", " + submitJobFile.toString()); + System.out.println(status); if (status != null) { return new NetworkedJob(status); } else { Index: src/java/org/apache/hama/bsp/BSPMaster.java =================================================================== --- src/java/org/apache/hama/bsp/BSPMaster.java (리비전 958454) +++ src/java/org/apache/hama/bsp/BSPMaster.java (작업 사본) @@ -25,9 +25,14 @@ import java.util.Collection; import java.util.Date; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; +import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,7 +54,7 @@ * BSPMaster is responsible to control all the groom servers and to manage bsp * jobs. */ -public class BSPMaster extends Thread implements JobSubmissionProtocol, InterTrackerProtocol, +public class BSPMaster implements JobSubmissionProtocol, InterTrackerProtocol, GroomServerManager { static { @@ -67,14 +72,15 @@ } private static final int FS_ACCESS_RETRY_PERIOD = 10000; - + public static final long GROOMSERVER_EXPIRY_INTERVAL = 10 * 60 * 1000; + // States State state = State.INITIALIZING; // Attributes String masterIdentifier; private Server interTrackerServer; - + // Filesystem static final String SUBDIR = "bspMaster"; FileSystem fs = null; @@ -100,11 +106,82 @@ private Map jobs = new TreeMap(); private TaskScheduler taskScheduler; - /* - * private final List jobInProgressListeners = new - * CopyOnWriteArrayList(); - */ + ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks(); + Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks, + "expireLaunchingTasks"); + private class ExpireLaunchingTasks implements Runnable { + private volatile boolean shouldRun = true; + private Map launchingTasks = new LinkedHashMap(); + + @Override + public void run() { + while (shouldRun) { + long now = System.currentTimeMillis(); + + synchronized (BSPMaster.this) { + synchronized (launchingTasks) { + Iterator> itr = launchingTasks + .entrySet().iterator(); + while (itr.hasNext()) { + Map.Entry pair = itr.next(); + String taskId = pair.getKey(); + long age = now - ((Long) pair.getValue()).longValue(); + LOG.debug(taskId + " is " + age + " ms debug."); + + LOG.info(taskId); + if (age > GROOMSERVER_EXPIRY_INTERVAL) { + LOG.info("Launching task " + taskId + " timed out."); + TaskInProgress tip = null; + tip = (TaskInProgress) taskidToTIPMap.get(taskId); + if (tip != null) { + JobInProgress job = tip.getJob(); + String groomName = getAssignedTracker(taskId); + GroomServerStatus trackerStatus = + getGroomServer(groomName); + // This might happen when the tasktracker has already + // expired and this thread tries to call failedtask + // again. expire tasktracker should have called failed + // task! + if (trackerStatus != null) { + /* + job.failedTask(tip, taskId, "Error launching task", + tip.isMapTask()? TaskStatus.Phase.MAP: + TaskStatus.Phase.STARTING, + trackerStatus.getHost(), trackerName, + myMetrics); + */ + } + } + itr.remove(); + } else { + // the tasks are sorted by start time, so once we find + // one that we want to keep, we are done for this cycle. + break; + } + + } + } + } + } + } + + private String getAssignedTracker(String taskId) { + return taskidToTrackerMap.get(taskId); + } + + public void addNewTask(String string) { + synchronized (launchingTasks) { + launchingTasks.put(string, new Long(System.currentTimeMillis())); + } + } + + public void stop() { + shouldRun = false; + } + + } + /** * Start the BSPMaster process, listen on the indicated hostname/port */ @@ -116,6 +193,7 @@ InterruptedException { this.conf = conf; this.masterIdentifier = identifier; + //expireLaunchingTaskThread.start(); // Create the scheduler Class schedulerClass = conf.getClass( @@ -228,8 +306,8 @@ return conf.getLocalPath("bsp.local.dir", pathString); } - public BSPMaster startMaster() throws IOException, - InterruptedException { + public static BSPMaster startMaster(HamaConfiguration conf) + throws IOException, InterruptedException { return startTracker(conf, generateNewIdentifier()); } @@ -238,6 +316,7 @@ BSPMaster result = null; result = new BSPMaster(conf, identifier); + result.taskScheduler.setGroomServerManager(result); return result; } @@ -279,37 +358,6 @@ LOG.info("Stopped interTrackerServer"); } - public static void main(String[] args) { - StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG); - if (args.length != 0) { - System.out.println("usage: HamaMaster"); - System.exit(-1); - } - - try { - HamaConfiguration conf = new HamaConfiguration(); - conf.set("bsp.local.dir", conf.get("hama.tmp.dir") + "/bsp/local"); - - BSPMaster master = BSPMaster.constructMaster(BSPMaster.class, conf); - master.start(); - } catch (Throwable e) { - LOG.fatal(StringUtils.stringifyException(e)); - System.exit(-1); - } - } - - public void run() { - try { - offerService(); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - // ////////////////////////////////////////////////// // GroomServerManager // ////////////////////////////////////////////////// @@ -330,8 +378,7 @@ @Override public JobInProgress getJob(BSPJobID jobid) { - // TODO Auto-generated method stub - return null; + return jobs.get(jobid); } @Override @@ -343,13 +390,12 @@ @Override public int getNumberOfUniqueHosts() { // TODO Auto-generated method stub - return 0; + return 1; } @Override public Collection grooms() { - // TODO Auto-generated method stub - return null; + return groomServers.values(); } @Override @@ -387,11 +433,6 @@ public HeartbeatResponse heartbeat(GroomServerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) throws IOException { - LOG.debug(">>> Received the heartbeat message from "); - LOG.debug(">>> " + status.groomName + "(" + status.getHost() + ")"); - LOG.debug(">>> restarted:" + restarted + ",first:" + initialContact); - LOG.debug(">>> maxTaskCapacity:" + status.getMaxTasks() + ",taskCapacity:" - + status.getTaskReports().size()); // First check if the last heartbeat response got through String groomName = status.getGroomName(); @@ -412,7 +453,7 @@ } HeartbeatResponse response = new HeartbeatResponse(newResponseId, null); - // List actions = new ArrayList(); + List actions = new ArrayList(); // Check for new tasks to be executed on the groom server if (acceptNewTasks) { @@ -420,19 +461,101 @@ if (groomStatus == null) { LOG.warn("Unknown task tracker polling; ignoring: " + groomName); } else { - // TODO - assignTasks should be implemented - /* - * List tasks = taskScheduler.assignTasks(groomStatus); for(Task - * task : tasks) { if(tasks != null) { LOG.debug(groomName + - * "-> LaunchTask: " + task.getTaskID()); actions.add(new - * LaunchTaskAction(task)); } } - */ + List tasks = taskScheduler.assignTasks(groomStatus); + for (Task task : tasks) { + if (tasks != null) { + expireLaunchingTasks.addNewTask(task.getTaskID()); + actions.add(new LaunchTaskAction(task)); + } + } } } + // Check for tasks to be killed + List killTasksList = getTasksToKill(groomName); + if (killTasksList != null) { + actions.addAll(killTasksList); + } + + response.setActions(actions.toArray(new GroomServerAction[actions.size()])); + + groomToHeartbeatResponseMap.put(groomName, response); + removeMarkedTasks(groomName); + return response; } + + // (trackerID -> TreeSet of completed taskids running at that tracker) + TreeMap> trackerToMarkedTasksMap = new TreeMap(); + + private void removeMarkedTasks(String groomName) { + // Purge all the 'marked' tasks which were running at taskTracker + TreeSet markedTaskSet = + (TreeSet) trackerToMarkedTasksMap.get(groomName); + if (markedTaskSet != null) { + for (String taskid : markedTaskSet) { + removeTaskEntry(taskid); + LOG.info("Removed completed task '" + taskid + "' from '" + + groomName + "'"); + } + // Clear + trackerToMarkedTasksMap.remove(groomName); + } + } + + private void removeTaskEntry(String taskid) { + // taskid --> tracker + String tracker = taskidToTrackerMap.remove(taskid); + + // tracker --> taskid + if (tracker != null) { + TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker); + if (trackerSet != null) { + trackerSet.remove(taskid); + } + } + + // taskid --> TIP + taskidToTIPMap.remove(taskid); + + LOG.debug("Removing task '" + taskid + "'"); + } + + private List getTasksToKill(String groomName) { + Set taskIds = (TreeSet) trackerToTaskMap.get(groomName); + if (taskIds != null) { + List killList = new ArrayList(); + Set killJobIds = new TreeSet(); + for (String killTaskId : taskIds ) { + TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(killTaskId); + if (tip.shouldCloseForClosedJob(killTaskId)) { + // + // This is how the JobTracker ends a task at the TaskTracker. + // It may be successfully completed, or may be killed in + // mid-execution. + // + if (tip.getJob().getStatus().getRunState() == JobStatus.RUNNING) { + killList.add(new KillTaskAction(killTaskId)); + LOG.debug(groomName + " -> KillTaskAction: " + killTaskId); + } else { + String killJobId = tip.getJob().getStatus().getJobID().getJtIdentifier(); + killJobIds.add(killJobId); + } + } + } + + for (String killJobId : killJobIds) { + killList.add(new KillJobAction(killJobId)); + LOG.debug(groomName + " -> KillJobAction: " + killJobId); + } + + return killList; + } + return null; + + } + /** * Process incoming heartbeat messages from the groom. */ @@ -463,17 +586,15 @@ } @Override - public JobStatus submitJob(BSPJobID jobId) throws IOException { - LOG.info("Submitted a job (" + jobId + ")"); - if (jobs.containsKey(jobId)) { + public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException { + if (jobs.containsKey(jobID)) { // job already running, don't start twice - LOG.info("The job (" + jobId + ") is already subbmitted"); - return jobs.get(jobId).getStatus(); + LOG.info("The job (" + jobID + ") is already subbmitted"); + return jobs.get(jobID).getStatus(); } - JobInProgress job = new JobInProgress(jobId, this, this.conf); - - return addJob(jobId, job); + JobInProgress job = new JobInProgress(jobID, this, this.conf); + return addJob(jobID, job); } @Override @@ -502,7 +623,6 @@ jobs.put(job.getProfile().getJobID(), job); taskScheduler.addJob(job); } - return job.getStatus(); } @@ -589,9 +709,56 @@ this.interTrackerServer.stop(); } - @Override - public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException { - // TODO Auto-generated method stub - return null; + TreeMap taskidToTrackerMap = new TreeMap(); + TreeMap> trackerToTaskMap = new TreeMap>(); + Map taskidToTIPMap = new TreeMap(); + + public void createTaskEntry(String taskid, String groomServer, + TaskInProgress taskInProgress) { + LOG.info("Adding task '" + taskid + "' to tip " + taskInProgress.getTIPId() + ", for tracker '" + groomServer + "'"); + /* + // taskid --> groom + taskidToTrackerMap.put(taskid, groomServer); + // groom --> taskid + TreeSet taskset = null; + if(trackerToTaskMap.entrySet().size() > 0) { + taskset = trackerToTaskMap.get(groomServer); + LOG.info(taskset.size()); + LOG.info(taskset.size()); + LOG.info(taskset.size()); + } + + if (taskset == null) { + taskset = new TreeSet(); + trackerToTaskMap.put(groomServer, taskset); + } + taskset.add(taskid); + taskidToTIPMap.put(taskid, taskInProgress); + + LOG.info("" + taskidToTrackerMap); + LOG.info("" + taskidToTIPMap); + */ } + + public static void main(String[] args) { + StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG); + if (args.length != 0) { + System.out.println("usage: HamaMaster"); + System.exit(-1); + } + + try { + HamaConfiguration conf = new HamaConfiguration(); + conf.addResource(new Path( + "/home/edward/workspace/hama-trunk/conf/hama-default.xml")); + conf.addResource(new Path( + "/home/edward/workspace/hama-trunk/conf/hama-site.xml")); + + BSPMaster master = startMaster(conf); + master.offerService(); + } catch (Throwable e) { + LOG.fatal(StringUtils.stringifyException(e)); + System.exit(-1); + } + } } Index: src/java/org/apache/hama/bsp/BSPRunner.java =================================================================== --- src/java/org/apache/hama/bsp/BSPRunner.java (리비전 958440) +++ src/java/org/apache/hama/bsp/BSPRunner.java (작업 사본) @@ -13,10 +13,13 @@ private BSPPeer bspPeer; private Configuration conf; private BSP bsp; - + private boolean isDone; + public void run() { try { + isDone = bspPeer.leaveBarrier(); bsp.bsp(bspPeer); + isDone = bspPeer.enterBarrier(); } catch (Exception e) { LOG.error(e); } @@ -39,4 +42,8 @@ bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class", BSP.class), conf); } + + public boolean isDone() { + return this.isDone; + } } Index: src/java/org/apache/hama/bsp/BSPTask.java =================================================================== --- src/java/org/apache/hama/bsp/BSPTask.java (리비전 0) +++ src/java/org/apache/hama/bsp/BSPTask.java (리비전 0) @@ -0,0 +1,18 @@ +package org.apache.hama.bsp; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; + +public class BSPTask extends Task { + + public BSPTask(String jobId, String jobFile, String taskid, int partition, Configuration conf) { + this.jobId = jobId; + this.jobFile = jobFile; + this.taskId = taskid; + this.partition = partition; + this.runner = (BSPRunner) ReflectionUtils.newInstance( + BSPRunner.class, conf); + + } + +} Index: src/java/org/apache/hama/bsp/GroomServer.java =================================================================== --- src/java/org/apache/hama/bsp/GroomServer.java (리비전 958440) +++ src/java/org/apache/hama/bsp/GroomServer.java (작업 사본) @@ -27,6 +27,8 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -86,6 +88,9 @@ Map runningTasks = null; Map runningJobs = null; + private BlockingQueue tasksToCleanup = + new LinkedBlockingQueue(); + public GroomServer(Configuration conf) throws IOException { this.conf = conf; bspMasterAddr = BSPMaster.getAddress(conf); @@ -203,6 +208,23 @@ // Send the heartbeat and process the bspmaster's directives HeartbeatResponse heartbeatResponse = transmitHeartBeat(now); + + GroomServerAction[] actions = heartbeatResponse.getActions(); + LOG.info("Got heartbeatResponse from BSPMaster with responseId: " + + heartbeatResponse.getResponseId() + " and " + + ((actions != null) ? actions.length : 0) + " actions"); + + + if (actions != null){ + for(GroomServerAction action: actions) { + if (action instanceof LaunchTaskAction) { + startNewTask((LaunchTaskAction) action); + } else { + tasksToCleanup.put(action); + } + } + } + // // The heartbeat got through successfully! // @@ -236,8 +258,16 @@ return State.NORMAL; } + private void startNewTask(LaunchTaskAction action) { + // TODO Auto-generated method stub + Task t = action.getTask(); + LOG.info("GroomServer: " + t.getJobID() + ", " + t.getJobFile() + ", " + t.getId() + ", " + t.getPartition()); + + // TODO: execute task + + } + private HeartbeatResponse transmitHeartBeat(long now) throws IOException { - // // Check if the last heartbeat got through... // if so then build the heartbeat information for the BSPMaster; @@ -273,7 +303,9 @@ initialize(); startCleanupThreads(); boolean denied = false; + LOG.info("Why? " + running + ", " + shuttingDown + ", " + denied); while (running && !shuttingDown && !denied) { + boolean staleState = false; try { while (running && !staleState && !shuttingDown && !denied) { @@ -335,10 +367,9 @@ try { Configuration conf = new HamaConfiguration(); - conf.set("bsp.master.port", "40000"); - conf.set("bsp.groom.port", "40020"); - conf.set("bsp.local.dir", conf.get("hadoop.tmp.dir") + "/bsp/local"); - conf.set("bsp.system.dir", conf.get("hadoop.tmp.dir") + "/bsp/system"); + conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-default.xml")); + conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-site.xml")); + GroomServer groom = GroomServer.constructGroomServer(GroomServer.class, conf); startGroomServer(groom); } catch (Throwable e) { Index: src/java/org/apache/hama/bsp/GroomServerStatus.java =================================================================== --- src/java/org/apache/hama/bsp/GroomServerStatus.java (리비전 958440) +++ src/java/org/apache/hama/bsp/GroomServerStatus.java (작업 사본) @@ -144,7 +144,7 @@ Text.writeString(out, host); out.writeInt(failures); out.writeInt(maxTasks); - out.writeInt(taskReports.size()); + out.writeInt(taskReports.size()); for(TaskStatus taskStatus : taskReports) { taskStatus.write(out); } Index: src/java/org/apache/hama/bsp/JobInProgress.java =================================================================== --- src/java/org/apache/hama/bsp/JobInProgress.java (리비전 958440) +++ src/java/org/apache/hama/bsp/JobInProgress.java (작업 사본) @@ -45,6 +45,7 @@ static final Log LOG = LogFactory.getLog(JobInProgress.class); + Configuration conf; JobProfile profile; JobStatus status; Path jobFile = null; @@ -62,6 +63,7 @@ public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf) throws IOException { + this.conf = conf; this.jobId = jobId; this.master = master; @@ -80,10 +82,10 @@ fs.copyToLocalFile(jobFile, localJobFile); BSPJobContext job = new BSPJobContext(localJobFile, jobId); - System.out.println("user:" + job.getUser()); - System.out.println("jobId:" + jobId); - System.out.println("jobFile:" + jobFile.toString()); - System.out.println("jobName:" + job.getJobName()); + LOG.info("user:" + job.getUser()); + LOG.info("jobId:" + jobId); + LOG.info("jobFile:" + jobFile.toString()); + LOG.info("jobName:" + job.getJobName()); this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job .getJobName()); @@ -135,7 +137,16 @@ // /////////////////////////////////////////////////// public synchronized Task obtainNewTask(GroomServerStatus status, int clusterSize, int numUniqueHosts) { + Task result = null; + try { + result = new TaskInProgress(getJobID(), this.jobFile.toString(), this.master, null, this, + numUniqueHosts).getTaskToRun(status); + LOG.info("JobInProgress: " + result.getJobID() + ", " + result.getJobFile() + ", " + result.getId() + ", " + result.getPartition()); + + } catch (IOException e) { + e.printStackTrace(); + } - return null; + return result; } } Index: src/java/org/apache/hama/bsp/KillJobAction.java =================================================================== --- src/java/org/apache/hama/bsp/KillJobAction.java (리비전 958440) +++ src/java/org/apache/hama/bsp/KillJobAction.java (작업 사본) @@ -21,6 +21,8 @@ import java.io.DataOutput; import java.io.IOException; +import org.apache.hadoop.io.Text; + /** * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the * {@link org.apache.hama.bsp.GroomServer} to kill the task of a job and cleanup @@ -28,30 +30,30 @@ * */ class KillJobAction extends GroomServerAction { - final BSPJobID jobId; + String jobId; public KillJobAction() { super(ActionType.KILL_JOB); - jobId = new BSPJobID(); + jobId = new String(); } - public KillJobAction(BSPJobID jobId) { + public KillJobAction(String killJobId) { super(ActionType.KILL_JOB); - this.jobId = jobId; + this.jobId = killJobId; } - public BSPJobID getJobID() { + public String getJobID() { return jobId; } @Override public void write(DataOutput out) throws IOException { - jobId.write(out); + Text.writeString(out, jobId); } @Override public void readFields(DataInput in) throws IOException { - jobId.readFields(in); + jobId = Text.readString(in); } } Index: src/java/org/apache/hama/bsp/KillTaskAction.java =================================================================== --- src/java/org/apache/hama/bsp/KillTaskAction.java (리비전 958440) +++ src/java/org/apache/hama/bsp/KillTaskAction.java (작업 사본) @@ -21,36 +21,38 @@ import java.io.DataOutput; import java.io.IOException; +import org.apache.hadoop.io.Text; + /** * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} * to the {@link org.apache.hama.bsp.GroomServer} to kill a task. * */ class KillTaskAction extends GroomServerAction { - final TaskAttemptID taskId; + String taskId; public KillTaskAction() { super(ActionType.KILL_TASK); - taskId = new TaskAttemptID(); + taskId = new String(); } - public KillTaskAction(TaskAttemptID taskId) { + public KillTaskAction(String killTaskId) { super(ActionType.KILL_TASK); - this.taskId = taskId; + this.taskId = killTaskId; } - public TaskAttemptID getTaskID() { + public String getTaskID() { return taskId; } @Override public void write(DataOutput out) throws IOException { - taskId.write(out); + Text.writeString(out, taskId); } @Override public void readFields(DataInput in) throws IOException { - taskId.readFields(in); + taskId = Text.readString(in); } } Index: src/java/org/apache/hama/bsp/LocalJobRunner.java =================================================================== --- src/java/org/apache/hama/bsp/LocalJobRunner.java (리비전 958454) +++ src/java/org/apache/hama/bsp/LocalJobRunner.java (작업 사본) @@ -9,7 +9,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.Constants; import org.apache.hama.ipc.InterTrackerProtocol; import org.apache.hama.ipc.JobSubmissionProtocol; @@ -42,6 +41,7 @@ @Override public ClusterStatus getClusterStatus(boolean detailed) throws IOException { // TODO Auto-generated method stub + System.out.println("LocalJobRunner.getClusterStatus(): Executed"); return null; } @@ -102,12 +102,6 @@ } } - @Override - public JobStatus submitJob(BSPJobID jobName) throws IOException { - // TODO Auto-generated method stub - return null; - } - /** * Local Job */ @@ -116,12 +110,14 @@ private Configuration conf; private int NUM_PEER; private BSPJob job; + private String jobFile; private boolean threadDone = false; - private HashMap tasks = new HashMap(); + private HashMap tasks = new HashMap(); public Job(BSPJobID jobID, String jobFile, Configuration conf) throws IOException { this.conf = conf; + this.jobFile = jobFile; this.NUM_PEER = conf.getInt("bsp.peers.num", 0); LOG.info("LocalJobRunner: " + jobID + ", " + jobFile); this.job = new BSPJob(jobID, jobFile); @@ -158,17 +154,17 @@ TaskID tID; for (int i = 0; i < NUM_PEER; i++) { this.conf.set(Constants.PEER_PORT, String.valueOf(30000 + i)); - BSPRunner runner = (BSPRunner) ReflectionUtils.newInstance( - BSPRunner.class, this.conf); tID = new TaskID(job.getJobID(), false, i); - tasks.put(tID.toString(), runner); + + Task bspRunner = new BSPTask(jobFile, jobFile, jobFile, i, this.conf); + tasks.put(tID.toString(), bspRunner); } - for (Map.Entry e : tasks.entrySet()) { - e.getValue().start(); + for (Map.Entry e : tasks.entrySet()) { + e.getValue().runner.start(); } - for (Map.Entry e : tasks.entrySet()) { + for (Map.Entry e : tasks.entrySet()) { try { e.getValue().join(); } catch (InterruptedException e1) { Index: src/java/org/apache/hama/bsp/SimpleTaskScheduler.java =================================================================== --- src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (리비전 958440) +++ src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (작업 사본) @@ -54,13 +54,11 @@ public List assignTasks(GroomServerStatus groomStatus) throws IOException { ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false); - + final int numGroomServers = clusterStatus.getGroomServers(); // final int clusterTaskCapacity = clusterStatus.getMaxTasks(); - // // Get task counts for the current groom. - // // final int groomTaskCapacity = groom.getMaxTasks(); final int groomRunningTasks = groomStatus.countTasks(); @@ -73,14 +71,19 @@ // instance to the scheduler. synchronized (jobQueue) { for (JobInProgress job : jobQueue) { + /* if (job.getStatus().getRunState() != JobStatus.RUNNING) { continue; } + */ Task t = null; t = job.obtainNewTask(groomStatus, numGroomServers, groomServerManager.getNumberOfUniqueHosts()); + + LOG.info("SimpleTaskScheduler: " + t.getJobID() + ", " + t.getJobFile() + ", " + t.getId() + ", " + t.getPartition()); + if (t != null) { assignedTasks.add(t); break; // TODO - Now, simple scheduler assigns only one task to Index: src/java/org/apache/hama/bsp/Task.java =================================================================== --- src/java/org/apache/hama/bsp/Task.java (리비전 958440) +++ src/java/org/apache/hama/bsp/Task.java (작업 사본) @@ -21,6 +21,8 @@ import java.io.DataOutput; import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -28,23 +30,28 @@ /** * */ -public class Task implements Writable { +public class Task extends Thread implements Writable { + public static final Log LOG = LogFactory.getLog(Task.class); //////////////////////////////////////////// // Fields //////////////////////////////////////////// - private String jobFile; - private TaskAttemptID taskId; - private int partition; + protected String jobId; + protected String jobFile; + protected String taskId; + protected int partition; + + protected BSPRunner runner; protected LocalDirAllocator lDirAlloc; /** * */ public Task() { - taskId = new TaskAttemptID(); + taskId = new String(); } - public Task(String jobFile, TaskAttemptID taskId, int partition) { + public Task(String jobId, String jobFile, String taskId, int partition) { + this.jobId = jobId; this.jobFile = jobFile; this.taskId = taskId; @@ -62,7 +69,7 @@ return jobFile; } - public TaskAttemptID getTaskID() { + public String getTaskID() { return taskId; } @@ -70,8 +77,8 @@ * Get the job name for this task. * @return the job name */ - public BSPJobID getJobID() { - return taskId.getJobID(); + public String getJobID() { + return jobId; } /** @@ -92,15 +99,20 @@ //////////////////////////////////////////// @Override public void write(DataOutput out) throws IOException { + LOG.info(jobId +", "+ jobFile +", "+ taskId +", "+ partition); + + Text.writeString(out, jobId); Text.writeString(out, jobFile); - taskId.write(out); + Text.writeString(out, taskId); out.writeInt(partition); } @Override public void readFields(DataInput in) throws IOException { + jobId = Text.readString(in); jobFile = Text.readString(in); - taskId.readFields(in); + taskId = Text.readString(in); partition = in.readInt(); } + } Index: src/java/org/apache/hama/bsp/TaskInProgress.java =================================================================== --- src/java/org/apache/hama/bsp/TaskInProgress.java (리비전 958440) +++ src/java/org/apache/hama/bsp/TaskInProgress.java (작업 사본) @@ -17,10 +17,14 @@ */ package org.apache.hama.bsp; +import java.io.IOException; import java.util.TreeMap; +import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobStatus; /** * @@ -28,7 +32,7 @@ class TaskInProgress { public static final Log LOG = LogFactory.getLog(TaskInProgress.class); - private BSPJobContext context; + private Configuration conf; // Constants static final int MAX_TASK_EXECS = 1; @@ -58,7 +62,7 @@ // Map from task Id -> GroomServer Id, contains tasks that are // currently runnings - private TreeMap activeTasks = new TreeMap(); + private TreeMap activeTasks = new TreeMap(); // All attempt Ids of this TIP // private TreeSet tasks = new TreeSet(); /** @@ -66,15 +70,44 @@ */ private TreeMap taskStatuses = new TreeMap(); + private BSPJobID jobId; + public TaskInProgress(BSPJobID jobId, String jobFile, BSPMaster master, - BSPJobContext context, JobInProgress job, int partition) { + Configuration conf, JobInProgress job, int partition) { + this.jobId = jobId; this.jobFile = jobFile; this.bspMaster = master; this.job = job; - this.context = context; + this.conf = conf; this.partition = partition; + + this.id = new TaskID(jobId, true, partition); } + /** + * Return a Task that can be sent to a GroomServer for execution. + */ + public Task getTaskToRun(GroomServerStatus status) throws IOException { + Task t = null; + + String taskid = null; + if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) { + taskid = new String("task_" + nextTaskId); + ++nextTaskId; + } else { + LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) + + " attempts for the tip '" + getTIPId() + "'"); + return null; + } + + t = new BSPTask(jobId.getJtIdentifier(), jobFile, taskid, partition, conf); + activeTasks.put(taskid, status.getGroomName()); + + // Ask JobTracker to note that the task exists + bspMaster.createTaskEntry(taskid, status.getGroomName(), this); + return t; + } + // ////////////////////////////////// // Accessors // ////////////////////////////////// @@ -123,4 +156,18 @@ public synchronized boolean isComplete() { return (completes > 0); } + + private TreeSet tasksReportedClosed = new TreeSet(); + + public boolean shouldCloseForClosedJob(String taskid) { + TaskStatus ts = (TaskStatus) taskStatuses.get(taskid); + if ((ts != null) && + (! tasksReportedClosed.contains(taskid)) && + (job.getStatus().getRunState() != JobStatus.RUNNING)) { + tasksReportedClosed.add(taskid); + return true; + } else { + return false; + } + } } Index: src/java/org/apache/hama/bsp/TaskScheduler.java =================================================================== --- src/java/org/apache/hama/bsp/TaskScheduler.java (리비전 958440) +++ src/java/org/apache/hama/bsp/TaskScheduler.java (작업 사본) @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.mortbay.log.Log; /** * Used by a {@link BSPMaster} to schedule {@link Task}s on {@link GroomServer} @@ -43,6 +44,7 @@ public synchronized void setGroomServerManager( GroomServerManager groomServerManager) { + Log.info("TaskScheduler.setGroomServermanager()"); this.groomServerManager = groomServerManager; } Index: src/java/org/apache/hama/ipc/JobSubmissionProtocol.java =================================================================== --- src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (리비전 958440) +++ src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (작업 사본) @@ -44,11 +44,14 @@ * that job. * The job files should be submitted in system-dir/jobName. * - * @param jobName + * @param jobID + * @param jobFile * @return jobStatus * @throws IOException */ - public JobStatus submitJob(BSPJobID jobName) throws IOException; + //public JobStatus submitJob(BSPJobID jobName) throws IOException; + + public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException; /** * Get the current status of the cluster @@ -104,7 +107,5 @@ */ public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException; - - JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException; } Index: src/java/org/apache/hama/util/ClusterUtil.java =================================================================== --- src/java/org/apache/hama/util/ClusterUtil.java (리비전 958440) +++ src/java/org/apache/hama/util/ClusterUtil.java (작업 사본) @@ -4,6 +4,7 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.BSPMaster; import org.apache.hama.bsp.GroomServer; import org.apache.log4j.Logger; @@ -76,7 +77,7 @@ public static String startup(final BSPMaster m, final List groomservers, Configuration conf) throws IOException, InterruptedException { if (m != null) { - m.start(); + m.startMaster((HamaConfiguration) conf); } if (groomservers != null) {