Index: src/examples/org/apache/hama/examples/PiEstimator.java =================================================================== --- src/examples/org/apache/hama/examples/PiEstimator.java (리비전 1024096) +++ src/examples/org/apache/hama/examples/PiEstimator.java (작업 사본) @@ -54,7 +54,7 @@ } } - byte[] tagName = Bytes.toBytes(getName().toString()); + byte[] tagName = Bytes.toBytes(bspPeer.getHostName()); byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations); BSPMessage estimate = new BSPMessage(tagName, myData); Index: src/java/org/apache/hama/bsp/BSP.java =================================================================== --- src/java/org/apache/hama/bsp/BSP.java (리비전 1024107) +++ src/java/org/apache/hama/bsp/BSP.java (작업 사본) @@ -20,7 +20,7 @@ /** * This class provides an abstract implementation of the BSP interface */ -public abstract class BSP extends Thread implements BSPInterface { +public abstract class BSP implements BSPInterface { private BSPPeer bspPeer; /** @@ -32,8 +32,4 @@ public void runBSP() throws Exception { bsp(bspPeer); } - - public void setPeer(BSPPeer bspServer) { - this.bspPeer = bspServer; - } } Index: src/java/org/apache/hama/bsp/BSPTask.java =================================================================== --- src/java/org/apache/hama/bsp/BSPTask.java (리비전 1024107) +++ src/java/org/apache/hama/bsp/BSPTask.java (작업 사본) @@ -17,26 +17,21 @@ */ package org.apache.hama.bsp; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.ReflectionUtils; - public class BSPTask extends Task { - private BSP bsp; - private Configuration conf; - - public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition, Configuration conf) { + + public BSPTask() { + } + + public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition) { this.jobId = jobId; this.jobFile = jobFile; this.taskId = taskid; this.partition = partition; - this.conf = conf; } - public BSP getBSPClass() { - bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class", - BSP.class), conf); - - return bsp; + @Override + public BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob conf) { + return new BSPTaskRunner(this, bspPeer, conf); } } Index: src/java/org/apache/hama/bsp/BSPTaskRunner.java =================================================================== --- src/java/org/apache/hama/bsp/BSPTaskRunner.java (리비전 0) +++ src/java/org/apache/hama/bsp/BSPTaskRunner.java (리비전 0) @@ -0,0 +1,45 @@ +package org.apache.hama.bsp; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.zookeeper.KeeperException; + +public class BSPTaskRunner extends Thread { + + public static final Log LOG = LogFactory.getLog(BSPTaskRunner.class); + private Task task; + private BSPJob conf; + private BSPPeer bspPeer; + + public BSPTaskRunner(BSPTask bspTask, BSPPeer bspPeer, BSPJob conf) { + this.task = bspTask; + this.conf = conf; + this.bspPeer = bspPeer; + } + + public Task getTask() { + return task; + } + + public void run() { + BSP bsp = (BSP) ReflectionUtils.newInstance(conf.getConf().getClass( + "bsp.work.class", BSP.class), conf.getConf()); + + try { + bsp.bsp(bspPeer); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (KeeperException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + +} Index: src/java/org/apache/hama/bsp/GroomServer.java =================================================================== --- src/java/org/apache/hama/bsp/GroomServer.java (리비전 1024107) +++ src/java/org/apache/hama/bsp/GroomServer.java (작업 사본) @@ -23,9 +23,11 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -40,7 +42,7 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.DNS; import org.apache.hadoop.util.DiskChecker; -import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.RunJar; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hama.Constants; @@ -50,6 +52,7 @@ public class GroomServer implements Runnable { public static final Log LOG = LogFactory.getLog(GroomServer.class); private static BSPPeer bspPeer; + static final String SUBDIR = "groomServer"; Configuration conf; @@ -281,12 +284,114 @@ } try { + localizeJob(tip); + } catch (Throwable e) { + String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils + .stringifyException(e)); + LOG.warn(msg); + } + } + + private void localizeJob(TaskInProgress tip) throws IOException { + Task task = tip.getTask(); + conf.addResource(task.getJobFile()); + BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf); + + Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/" + + task.getTaskID() + "/" + "job.xml"); + + RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip); + BSPJob jobConf = null; + + synchronized (rjob) { + if (!rjob.localized) { + Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/" + + task.getTaskID() + "/" + "job.jar"); + systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile); + Path jarFile = new Path(task.getJobFile().replace(".xml", ".jar")); + + HamaConfiguration conf = new HamaConfiguration(); + conf.addResource(localJobFile); + jobConf = new BSPJob(conf, task.getJobID().toString()); + jobConf.setJar(localJarFile.toString()); + + if (jarFile != null) { + systemFS.copyToLocalFile(jarFile, localJarFile); + + // also unjar the job.jar files in workdir + File workDir = new File( + new File(localJobFile.toString()).getParent(), "work"); + if (!workDir.mkdirs()) { + if (!workDir.isDirectory()) { + throw new IOException("Mkdirs failed to create " + + workDir.toString()); + } + } + RunJar.unJar(new File(localJarFile.toString()), workDir); + } + rjob.localized = true; + } + } + launchTaskForJob(tip, jobConf); + } + + private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) { + try { + tip.setJobConf(jobConf); tip.launchTask(); } catch (Throwable ie) { - // TODO: when job failed. + tip.taskStatus.setRunState(TaskStatus.State.FAILED); + String error = StringUtils.stringifyException(ie); + LOG.info(error); } } + private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile, + TaskInProgress tip) { + synchronized (runningJobs) { + RunningJob rJob = null; + if (!runningJobs.containsKey(jobId)) { + rJob = new RunningJob(jobId, localJobFile); + rJob.localized = false; + rJob.tasks = new HashSet(); + rJob.jobFile = localJobFile; + runningJobs.put(jobId, rJob); + } else { + rJob = runningJobs.get(jobId); + } + rJob.tasks.add(tip); + return rJob; + } + } + + /** + * The datastructure for initializing a job + */ + static class RunningJob { + private BSPJobID jobid; + private Path jobFile; + // keep this for later use + Set tasks; + boolean localized; + boolean keepJobFiles; + + RunningJob(BSPJobID jobid, Path jobFile) { + this.jobid = jobid; + localized = false; + tasks = new HashSet(); + this.jobFile = jobFile; + keepJobFiles = false; + } + + Path getJobFile() { + return jobFile; + } + + BSPJobID getJobId() { + return jobid; + } + } + private HeartbeatResponse transmitHeartBeat(long now) throws IOException { // // Check if the last heartbeat got through... @@ -410,6 +515,8 @@ // ///////////////////////////////////////////////////// class TaskInProgress { Task task; + BSPJob jobConf; + private BSPTaskRunner runner; volatile boolean done = false; volatile boolean wasKilled = false; private TaskStatus taskStatus; @@ -421,61 +528,29 @@ TaskStatus.Phase.STARTING); } - static final String SUBDIR = "groomServer"; + public void setJobConf(BSPJob jobConf) { + this.jobConf = jobConf; + } - public void launchTask() { + public void launchTask() throws IOException { taskStatus.setRunState(TaskStatus.State.RUNNING); + this.runner = task.createRunner(bspPeer, this.jobConf); + this.runner.start(); - try { - // TODO: need to move this code to TaskRunner - - task.getJobFile(); - conf.addResource(task.getJobFile()); - BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf); - - Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/" - + task.getTaskID() + "/" + "job.xml"); - Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/" - + task.getTaskID() + "/" + "job.jar"); - - systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile); - systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml", - ".jar")), localJarFile); - - HamaConfiguration conf = new HamaConfiguration(); - conf.addResource(localJobFile); - BSPJob jobConf = new BSPJob(conf, task.getJobID().toString()); - jobConf.setJar(localJarFile.toString()); - - BSP bsp = (BSP) ReflectionUtils - .newInstance(jobConf.getBspClass(), conf); - bsp.setPeer(bspPeer); + // Check state of Task + while (true) { try { - bsp.runBSP(); - } catch (Exception e) { + Thread.sleep(1000); + } catch (InterruptedException e) { e.printStackTrace(); - taskStatus.setRunState(TaskStatus.State.FAILED); } - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } finally { - - while (true) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - // If local/outgoing queues are empty, task is done. - if (bspPeer.localQueue.size() == 0 - && bspPeer.outgoingQueues.size() == 0) { - taskStatus.setRunState(TaskStatus.State.SUCCEEDED); - acceptNewTasks = true; - break; - } + // If local/outgoing queues are empty, task is done. + if (bspPeer.localQueue.size() == 0 + && bspPeer.outgoingQueues.size() == 0) { + taskStatus.setRunState(TaskStatus.State.SUCCEEDED); + acceptNewTasks = true; + break; } } Index: src/java/org/apache/hama/bsp/LaunchTaskAction.java =================================================================== --- src/java/org/apache/hama/bsp/LaunchTaskAction.java (리비전 1024107) +++ src/java/org/apache/hama/bsp/LaunchTaskAction.java (작업 사본) @@ -47,7 +47,7 @@ } public void readFields(DataInput in) throws IOException { - task = new Task(); + task = new BSPTask(); task.readFields(in); } Index: src/java/org/apache/hama/bsp/LocalJobRunner.java =================================================================== --- src/java/org/apache/hama/bsp/LocalJobRunner.java (리비전 1024107) +++ src/java/org/apache/hama/bsp/LocalJobRunner.java (작업 사본) @@ -172,7 +172,7 @@ try { GroomServer servers = new GroomServer(conf); - Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i, this.conf); + Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i); // TODO not yet implemented Index: src/java/org/apache/hama/bsp/Task.java =================================================================== --- src/java/org/apache/hama/bsp/Task.java (리비전 1024107) +++ src/java/org/apache/hama/bsp/Task.java (작업 사본) @@ -23,14 +23,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -/** - * - */ -public class Task implements Writable { +public abstract class Task implements Writable { public static final Log LOG = LogFactory.getLog(Task.class); //////////////////////////////////////////// // Fields @@ -109,5 +107,7 @@ taskId = Text.readString(in); partition = in.readInt(); } + + public abstract BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob jobConf); } Index: src/java/org/apache/hama/bsp/TaskInProgress.java =================================================================== --- src/java/org/apache/hama/bsp/TaskInProgress.java (리비전 1024107) +++ src/java/org/apache/hama/bsp/TaskInProgress.java (작업 사본) @@ -101,7 +101,7 @@ return null; } - t = new BSPTask(jobId, jobFile, taskid, partition, this.conf); + t = new BSPTask(jobId, jobFile, taskid, partition); activeTasks.put(taskid, status.getGroomName()); // Ask JobTracker to note that the task exists