Index: src/examples/org/apache/hama/examples/PiEstimator.java =================================================================== --- src/examples/org/apache/hama/examples/PiEstimator.java (리비전 958440) +++ src/examples/org/apache/hama/examples/PiEstimator.java (작업 사본) @@ -87,8 +87,8 @@ // BSP job configuration HamaConfiguration conf = new HamaConfiguration(); // Execute locally - conf.set("bsp.master.address", "local"); - + conf.set("bsp.master.address", "localhost:40000"); + BSPJob bsp = new BSPJob(conf, PiEstimator.class); // Set the job name bsp.setJobName("pi estimation example"); Index: src/java/org/apache/hama/bsp/BSPJobClient.java =================================================================== --- src/java/org/apache/hama/bsp/BSPJobClient.java (리비전 958440) +++ src/java/org/apache/hama/bsp/BSPJobClient.java (작업 사본) @@ -19,7 +19,6 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.net.InetSocketAddress; import javax.security.auth.login.LoginException; @@ -35,7 +34,6 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UnixUserGroupInformation; import org.apache.hadoop.util.StringUtils; -import org.apache.hama.bsp.BSPMaster; import org.apache.hama.ipc.JobSubmissionProtocol; public class BSPJobClient extends Configured { @@ -183,17 +181,12 @@ if ("local".equals(master)) { this.jobSubmitClient = new LocalJobRunner(conf); } else { - this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf); + this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, + JobSubmissionProtocol.versionID, BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory( + conf, JobSubmissionProtocol.class)); } } - private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr, - Configuration conf) throws IOException { - return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, - JobSubmissionProtocol.versionID, addr, conf, NetUtils.getSocketFactory( - conf, JobSubmissionProtocol.class)); - } - /** * Close the JobClient. */ @@ -366,7 +359,11 @@ // // Now, actually submit the job (using the submit name) // + System.out.println(jobSubmitClient.getFilesystemName()); + System.out.println(jobSubmitClient.getClusterStatus(true)); JobStatus status = jobSubmitClient.submitJob(jobId, submitJobFile.toString()); + System.out.println(jobId.toString() + ", " + submitJobFile.toString()); + System.out.println(status); if (status != null) { return new NetworkedJob(status); } else { Index: src/java/org/apache/hama/bsp/BSPMaster.java =================================================================== --- src/java/org/apache/hama/bsp/BSPMaster.java (리비전 958454) +++ src/java/org/apache/hama/bsp/BSPMaster.java (작업 사본) @@ -49,7 +49,7 @@ * BSPMaster is responsible to control all the groom servers and to manage bsp * jobs. */ -public class BSPMaster extends Thread implements JobSubmissionProtocol, InterTrackerProtocol, +public class BSPMaster implements JobSubmissionProtocol, InterTrackerProtocol, GroomServerManager { static { @@ -228,7 +228,7 @@ return conf.getLocalPath("bsp.local.dir", pathString); } - public BSPMaster startMaster() throws IOException, + public static BSPMaster startMaster(HamaConfiguration conf) throws IOException, InterruptedException { return startTracker(conf, generateNewIdentifier()); } @@ -238,7 +238,8 @@ BSPMaster result = null; result = new BSPMaster(conf, identifier); - + result.taskScheduler.setGroomServerManager(result); + return result; } @@ -279,37 +280,6 @@ LOG.info("Stopped interTrackerServer"); } - public static void main(String[] args) { - StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG); - if (args.length != 0) { - System.out.println("usage: HamaMaster"); - System.exit(-1); - } - - try { - HamaConfiguration conf = new HamaConfiguration(); - conf.set("bsp.local.dir", conf.get("hama.tmp.dir") + "/bsp/local"); - - BSPMaster master = BSPMaster.constructMaster(BSPMaster.class, conf); - master.start(); - } catch (Throwable e) { - LOG.fatal(StringUtils.stringifyException(e)); - System.exit(-1); - } - } - - public void run() { - try { - offerService(); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - // ////////////////////////////////////////////////// // GroomServerManager // ////////////////////////////////////////////////// @@ -330,8 +300,7 @@ @Override public JobInProgress getJob(BSPJobID jobid) { - // TODO Auto-generated method stub - return null; + return jobs.get(jobid); } @Override @@ -343,13 +312,12 @@ @Override public int getNumberOfUniqueHosts() { // TODO Auto-generated method stub - return 0; + return 1; } @Override public Collection grooms() { - // TODO Auto-generated method stub - return null; + return groomServers.values(); } @Override @@ -412,7 +380,7 @@ } HeartbeatResponse response = new HeartbeatResponse(newResponseId, null); - // List actions = new ArrayList(); + List actions = new ArrayList(); // Check for new tasks to be executed on the groom server if (acceptNewTasks) { @@ -420,13 +388,15 @@ if (groomStatus == null) { LOG.warn("Unknown task tracker polling; ignoring: " + groomName); } else { - // TODO - assignTasks should be implemented - /* - * List tasks = taskScheduler.assignTasks(groomStatus); for(Task - * task : tasks) { if(tasks != null) { LOG.debug(groomName + - * "-> LaunchTask: " + task.getTaskID()); actions.add(new - * LaunchTaskAction(task)); } } - */ + List tasks = taskScheduler.assignTasks(groomStatus); + LOG.info("heartbeat(), tasks.size(): " + tasks.size()); + for(Task task : tasks) { + if(tasks != null) { + LOG.debug(groomName + "-> LaunchTask: " + task.getTaskID()); + actions.add(new LaunchTaskAction(task)); + LOG.info("heartbeat(), actions.size(): " + actions.size()); + } + } } } @@ -463,19 +433,17 @@ } @Override - public JobStatus submitJob(BSPJobID jobId) throws IOException { - LOG.info("Submitted a job (" + jobId + ")"); - if (jobs.containsKey(jobId)) { + public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException { + if (jobs.containsKey(jobID)) { // job already running, don't start twice - LOG.info("The job (" + jobId + ") is already subbmitted"); - return jobs.get(jobId).getStatus(); + LOG.info("The job (" + jobID + ") is already subbmitted"); + return jobs.get(jobID).getStatus(); } - - JobInProgress job = new JobInProgress(jobId, this, this.conf); - - return addJob(jobId, job); + + JobInProgress job = new JobInProgress(jobID, this, this.conf); + return addJob(jobID, job); } - + @Override public ClusterStatus getClusterStatus(boolean detailed) { synchronized (groomServers) { @@ -502,7 +470,6 @@ jobs.put(job.getProfile().getJobID(), job); taskScheduler.addJob(job); } - return job.getStatus(); } @@ -589,9 +556,31 @@ this.interTrackerServer.stop(); } - @Override - public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException { - // TODO Auto-generated method stub - return null; + public void createTaskEntry(TaskAttemptID taskAttemptID, GroomServerStatus status, + TaskInProgress taskInProgress) { + LOG.info("Adding task '" + taskAttemptID.getId() + "', for tracker '" + status.getGroomName() + "'"); + + + } + + public static void main(String[] args) { + StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG); + if (args.length != 0) { + System.out.println("usage: HamaMaster"); + System.exit(-1); + } + + try { + HamaConfiguration conf = new HamaConfiguration(); + conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-default.xml")); + conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-site.xml")); + + BSPMaster master = startMaster(conf); + master.offerService(); + } catch (Throwable e) { + LOG.fatal(StringUtils.stringifyException(e)); + System.exit(-1); + } + } } Index: src/java/org/apache/hama/bsp/BSPTask.java =================================================================== --- src/java/org/apache/hama/bsp/BSPTask.java (리비전 0) +++ src/java/org/apache/hama/bsp/BSPTask.java (리비전 0) @@ -0,0 +1,14 @@ +package org.apache.hama.bsp; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; + +public class BSPTask extends Task { + + public BSPTask(Configuration conf) { + this.runner = (BSPRunner) ReflectionUtils.newInstance( + BSPRunner.class, conf); + + } + +} Index: src/java/org/apache/hama/bsp/GroomServer.java =================================================================== --- src/java/org/apache/hama/bsp/GroomServer.java (리비전 958440) +++ src/java/org/apache/hama/bsp/GroomServer.java (작업 사본) @@ -335,10 +335,9 @@ try { Configuration conf = new HamaConfiguration(); - conf.set("bsp.master.port", "40000"); - conf.set("bsp.groom.port", "40020"); - conf.set("bsp.local.dir", conf.get("hadoop.tmp.dir") + "/bsp/local"); - conf.set("bsp.system.dir", conf.get("hadoop.tmp.dir") + "/bsp/system"); + conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-default.xml")); + conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-site.xml")); + GroomServer groom = GroomServer.constructGroomServer(GroomServer.class, conf); startGroomServer(groom); } catch (Throwable e) { Index: src/java/org/apache/hama/bsp/GroomServerStatus.java =================================================================== --- src/java/org/apache/hama/bsp/GroomServerStatus.java (리비전 958440) +++ src/java/org/apache/hama/bsp/GroomServerStatus.java (작업 사본) @@ -144,7 +144,7 @@ Text.writeString(out, host); out.writeInt(failures); out.writeInt(maxTasks); - out.writeInt(taskReports.size()); + out.writeInt(taskReports.size()); for(TaskStatus taskStatus : taskReports) { taskStatus.write(out); } Index: src/java/org/apache/hama/bsp/JobInProgress.java =================================================================== --- src/java/org/apache/hama/bsp/JobInProgress.java (리비전 958440) +++ src/java/org/apache/hama/bsp/JobInProgress.java (작업 사본) @@ -45,6 +45,7 @@ static final Log LOG = LogFactory.getLog(JobInProgress.class); + Configuration conf; JobProfile profile; JobStatus status; Path jobFile = null; @@ -62,6 +63,7 @@ public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf) throws IOException { + this.conf = conf; this.jobId = jobId; this.master = master; @@ -80,10 +82,10 @@ fs.copyToLocalFile(jobFile, localJobFile); BSPJobContext job = new BSPJobContext(localJobFile, jobId); - System.out.println("user:" + job.getUser()); - System.out.println("jobId:" + jobId); - System.out.println("jobFile:" + jobFile.toString()); - System.out.println("jobName:" + job.getJobName()); + LOG.info("user:" + job.getUser()); + LOG.info("jobId:" + jobId); + LOG.info("jobFile:" + jobFile.toString()); + LOG.info("jobName:" + job.getJobName()); this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job .getJobName()); @@ -135,7 +137,14 @@ // /////////////////////////////////////////////////// public synchronized Task obtainNewTask(GroomServerStatus status, int clusterSize, int numUniqueHosts) { + Task result = null; + try { + result = new TaskInProgress(getJobID(), this.jobFile.toString(), this.master, null, this, + numUniqueHosts).getTaskToRun(status); + } catch (IOException e) { + e.printStackTrace(); + } - return null; + return result; } } Index: src/java/org/apache/hama/bsp/LocalJobRunner.java =================================================================== --- src/java/org/apache/hama/bsp/LocalJobRunner.java (리비전 958454) +++ src/java/org/apache/hama/bsp/LocalJobRunner.java (작업 사본) @@ -9,7 +9,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.Constants; import org.apache.hama.ipc.InterTrackerProtocol; import org.apache.hama.ipc.JobSubmissionProtocol; @@ -42,6 +41,7 @@ @Override public ClusterStatus getClusterStatus(boolean detailed) throws IOException { // TODO Auto-generated method stub + System.out.println("LocalJobRunner.getClusterStatus(): Executed"); return null; } @@ -102,12 +102,6 @@ } } - @Override - public JobStatus submitJob(BSPJobID jobName) throws IOException { - // TODO Auto-generated method stub - return null; - } - /** * Local Job */ @@ -116,12 +110,14 @@ private Configuration conf; private int NUM_PEER; private BSPJob job; + private String jobFile; private boolean threadDone = false; - private HashMap tasks = new HashMap(); + private HashMap tasks = new HashMap(); public Job(BSPJobID jobID, String jobFile, Configuration conf) throws IOException { this.conf = conf; + this.jobFile = jobFile; this.NUM_PEER = conf.getInt("bsp.peers.num", 0); LOG.info("LocalJobRunner: " + jobID + ", " + jobFile); this.job = new BSPJob(jobID, jobFile); @@ -158,17 +154,17 @@ TaskID tID; for (int i = 0; i < NUM_PEER; i++) { this.conf.set(Constants.PEER_PORT, String.valueOf(30000 + i)); - BSPRunner runner = (BSPRunner) ReflectionUtils.newInstance( - BSPRunner.class, this.conf); tID = new TaskID(job.getJobID(), false, i); - tasks.put(tID.toString(), runner); + + Task bspRunner = new BSPTask(this.conf); + tasks.put(tID.toString(), bspRunner); } - for (Map.Entry e : tasks.entrySet()) { - e.getValue().start(); + for (Map.Entry e : tasks.entrySet()) { + e.getValue().runner.start(); } - for (Map.Entry e : tasks.entrySet()) { + for (Map.Entry e : tasks.entrySet()) { try { e.getValue().join(); } catch (InterruptedException e1) { Index: src/java/org/apache/hama/bsp/SimpleTaskScheduler.java =================================================================== --- src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (리비전 958440) +++ src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (작업 사본) @@ -54,13 +54,11 @@ public List assignTasks(GroomServerStatus groomStatus) throws IOException { ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false); - + final int numGroomServers = clusterStatus.getGroomServers(); // final int clusterTaskCapacity = clusterStatus.getMaxTasks(); - // // Get task counts for the current groom. - // // final int groomTaskCapacity = groom.getMaxTasks(); final int groomRunningTasks = groomStatus.countTasks(); @@ -73,9 +71,11 @@ // instance to the scheduler. synchronized (jobQueue) { for (JobInProgress job : jobQueue) { + /* if (job.getStatus().getRunState() != JobStatus.RUNNING) { continue; } + */ Task t = null; Index: src/java/org/apache/hama/bsp/Task.java =================================================================== --- src/java/org/apache/hama/bsp/Task.java (리비전 958440) +++ src/java/org/apache/hama/bsp/Task.java (작업 사본) @@ -28,14 +28,16 @@ /** * */ -public class Task implements Writable { +public class Task extends Thread implements Writable { //////////////////////////////////////////// // Fields //////////////////////////////////////////// + private String jobFile; private TaskAttemptID taskId; private int partition; + protected BSPRunner runner; protected LocalDirAllocator lDirAlloc; /** * @@ -103,4 +105,5 @@ taskId.readFields(in); partition = in.readInt(); } + } Index: src/java/org/apache/hama/bsp/TaskInProgress.java =================================================================== --- src/java/org/apache/hama/bsp/TaskInProgress.java (리비전 958440) +++ src/java/org/apache/hama/bsp/TaskInProgress.java (작업 사본) @@ -17,10 +17,12 @@ */ package org.apache.hama.bsp; +import java.io.IOException; import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; /** * @@ -28,7 +30,7 @@ class TaskInProgress { public static final Log LOG = LogFactory.getLog(TaskInProgress.class); - private BSPJobContext context; + private Configuration conf; // Constants static final int MAX_TASK_EXECS = 1; @@ -67,14 +69,28 @@ private TreeMap taskStatuses = new TreeMap(); public TaskInProgress(BSPJobID jobId, String jobFile, BSPMaster master, - BSPJobContext context, JobInProgress job, int partition) { + Configuration conf, JobInProgress job, int partition) { this.jobFile = jobFile; this.bspMaster = master; this.job = job; - this.context = context; + this.conf = conf; this.partition = partition; } + /** + * Return a Task that can be sent to a GroomServer for execution. + */ + public Task getTaskToRun(GroomServerStatus status) throws IOException { + Task t = null; + t = new BSPTask(conf); + + activeTasks.put(t.getTaskID(), status.getGroomName()); + + // Ask JobTracker to note that the task exists + bspMaster.createTaskEntry(t.getTaskID(), status, this); + return t; + } + // ////////////////////////////////// // Accessors // ////////////////////////////////// Index: src/java/org/apache/hama/bsp/TaskScheduler.java =================================================================== --- src/java/org/apache/hama/bsp/TaskScheduler.java (리비전 958440) +++ src/java/org/apache/hama/bsp/TaskScheduler.java (작업 사본) @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.mortbay.log.Log; /** * Used by a {@link BSPMaster} to schedule {@link Task}s on {@link GroomServer} @@ -43,6 +44,7 @@ public synchronized void setGroomServerManager( GroomServerManager groomServerManager) { + Log.info("TaskScheduler.setGroomServermanager()"); this.groomServerManager = groomServerManager; } Index: src/java/org/apache/hama/ipc/JobSubmissionProtocol.java =================================================================== --- src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (리비전 958440) +++ src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (작업 사본) @@ -44,11 +44,14 @@ * that job. * The job files should be submitted in system-dir/jobName. * - * @param jobName + * @param jobID + * @param jobFile * @return jobStatus * @throws IOException */ - public JobStatus submitJob(BSPJobID jobName) throws IOException; + //public JobStatus submitJob(BSPJobID jobName) throws IOException; + + public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException; /** * Get the current status of the cluster @@ -104,7 +107,5 @@ */ public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException; - - JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException; } Index: src/java/org/apache/hama/util/ClusterUtil.java =================================================================== --- src/java/org/apache/hama/util/ClusterUtil.java (리비전 958440) +++ src/java/org/apache/hama/util/ClusterUtil.java (작업 사본) @@ -4,6 +4,7 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.BSPMaster; import org.apache.hama.bsp.GroomServer; import org.apache.log4j.Logger; @@ -76,7 +77,7 @@ public static String startup(final BSPMaster m, final List groomservers, Configuration conf) throws IOException, InterruptedException { if (m != null) { - m.start(); + m.startMaster((HamaConfiguration) conf); } if (groomservers != null) {