diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/Job.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/Job.java
new file mode 100644
index 00000000000..149b136247d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/Job.java
@@ -0,0 +1,65 @@
+package org.apache.hadoop.applications.mawo.server.master.job;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.applications.mawo.server.common.MawoConfiguration;
+import org.apache.hadoop.applications.mawo.server.common.Task;
+import org.apache.hadoop.applications.mawo.server.common.TaskId;
+import org.apache.hadoop.applications.mawo.server.common.TeardownTask;
+import org.apache.log4j.Logger;
+
+public class Job {
+
+  private final static Logger LOG = Logger.getLogger(Job.class);
+  private JobId jobId;
+
+  private Task setupTask;
+  private List<Task> tasks = new ArrayList<Task>();
+  private TeardownTask teardownTask;
+
+  private final AtomicInteger taskCounter = new AtomicInteger(-1);
+
+  public TaskId getNextTaskId() {
+    return new TaskId(jobId, taskCounter.incrementAndGet());
+  }
+
+  public Job(MawoConfiguration conf) throws Exception {
+    LOG.debug("Job Constructor called");
+    setJobId(JobId.newJobId());
+  }
+
+  public JobId getJobId() {
+    return jobId;
+  }
+
+  void setJobId(JobId jobId) {
+    this.jobId = jobId;
+  }
+
+  public List<Task> getTasks() {
+    return Collections.unmodifiableList(tasks);
+  }
+
+  public void setTasks(List<Task> tasks) {
+    this.tasks = tasks;
+  }
+
+  public Task getSetupTask() {
+    return this.setupTask;
+  }
+
+  public void setSetupTask(Task setupTask) {
+    this.setupTask = setupTask;
+  }
+
+  public TeardownTask getTeardownTask() {
+    return this.teardownTask;
+  }
+
+  public void setTeardownTask(TeardownTask teardownTask) {
+    this.teardownTask = teardownTask;
+  }
+}
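Note (illustrative, not part of the patch): Job is a plain container; its AtomicInteger task counter starts at -1, so successive getNextTaskId() calls hand out task indices 0, 1, 2, ... scoped to this job's JobId. A minimal sketch of that behaviour, assuming MawoConfiguration can be constructed with defaults as in the tests below:

    Job job = new Job(new MawoConfiguration());
    TaskId first = job.getNextTaskId();   // task index 0 within this job
    TaskId second = job.getNextTaskId();  // task index 1 within this job
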
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/JobBuilder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/JobBuilder.java
new file mode 100644
index 00000000000..36ddd31b5d4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/JobBuilder.java
@@ -0,0 +1,84 @@
+package org.apache.hadoop.applications.mawo.server.master.job;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+
+public abstract class JobBuilder {
+
+  protected final static Logger LOG = Logger.getLogger(JobBuilder.class);
+
+  public JobBuilder() {
+  }
+
+  protected abstract void startJobSerializing(JobDescriptor jobDescriptor,
+      Job job) throws Exception;
+
+  protected abstract void finishJobSerializing(JobDescriptor jobDescriptor,
+      Job job) throws Exception;
+
+  protected abstract void startJobDeserializing(JobDescriptor jobDescriptor,
+      Job job) throws Exception;
+
+  protected abstract void finishJobDeserializing(JobDescriptor jobDescriptor,
+      Job job) throws Exception;
+
+  protected abstract void serializeSetupTask(JobDescriptor jobDescriptor,
+      Job job) throws IOException;
+
+  protected abstract void deserializeSetupTask(JobDescriptor jobDescriptor,
+      Job job) throws IOException;
+
+  protected abstract void serializeTasks(JobDescriptor jobDescriptor, Job job)
+      throws IOException;
+
+  protected abstract void deserializeTasks(JobDescriptor jobDescriptor, Job job)
+      throws IOException;
+
+  protected abstract void serializeTeardownTask(JobDescriptor jobDescriptor,
+      Job job);
+
+  protected abstract void deserializeTeardownTask(JobDescriptor jobDescriptor,
+      Job job) throws IOException;
+
+  public void serializeJob(Job job, JobDescriptor jobDescriptor)
+      throws Exception {
+    try {
+      LOG.info("Starting serialization of job ..");
+      startJobSerializing(jobDescriptor, job);
+      LOG.info("Serializing setup-task ..");
+      serializeSetupTask(jobDescriptor, job);
+      LOG.info("Serializing tasks ..");
+      serializeTasks(jobDescriptor, job);
+      LOG.info("Serializing teardown-task ..");
+      serializeTeardownTask(jobDescriptor, job);
+    } finally {
+      finishJobSerializing(jobDescriptor, job);
+      LOG.info("Done serializing the job ..");
+    }
+  }
+
+  /**
+   * Build a job from the given job descriptor.
+   *
+   * @param jobDescriptor descriptor that points at the serialized job
+   * @param job job instance to populate
+   * @throws Exception if deserialization fails
+   */
+  public void deserializeJob(JobDescriptor jobDescriptor, Job job)
+      throws Exception {
+    try {
+      LOG.info("Starting deserialization of job ..");
+      startJobDeserializing(jobDescriptor, job);
+      LOG.info("Deserializing setup-task ..");
+      deserializeSetupTask(jobDescriptor, job);
+      LOG.info("Deserializing tasks ..");
+      deserializeTasks(jobDescriptor, job);
+      LOG.info("Deserializing teardown-task ..");
+      deserializeTeardownTask(jobDescriptor, job);
+    } finally {
+      finishJobDeserializing(jobDescriptor, job);
+      LOG.info("Done deserializing the job ..");
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/JobBuilderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/JobBuilderFactory.java
new file mode 100644
index 00000000000..7c538e5456d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/JobBuilderFactory.java
@@ -0,0 +1,18 @@
+package org.apache.hadoop.applications.mawo.server.master.job;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.hadoop.applications.mawo.server.common.MawoConfiguration;
+
+public final class JobBuilderFactory {
+
+  public static JobBuilder createJobBuilder(MawoConfiguration mawoConf)
+      throws Exception {
+
+    String className = mawoConf.getJobBuilderClass();
+    Class<?> clazz = Class.forName(className);
+
+    Constructor<?> constructor = clazz.getConstructor();
+    return (JobBuilder) constructor.newInstance();
+  }
+}
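Note (illustrative, not part of the patch): JobBuilderFactory loads the concrete JobBuilder by reflection from the class name returned by MawoConfiguration.getJobBuilderClass(), so the configured implementation needs a public no-argument constructor. A minimal round-trip sketch, mirroring TestJobBuilder at the end of this patch; the descriptor path is made up:

    MawoConfiguration conf = new MawoConfiguration();
    conf.getConfigsMap().put(MawoConfiguration.JOB_BUILDER_CLASS,
        SimpleTaskJsonJobBuilder.class.getName());

    Job job = new Job(conf);
    JobDescriptor descriptor =
        new JobDescriptor(job.getJobId(), "/tmp/mawo-job.json"); // illustrative path

    JobBuilder builder = JobBuilderFactory.createJobBuilder(conf);
    builder.serializeJob(job, descriptor);              // write the job out
    builder.deserializeJob(descriptor, new Job(conf));  // read it back into a fresh Job
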
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/JobDescriptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/JobDescriptor.java
new file mode 100644
index 00000000000..c3cf31728a5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/JobDescriptor.java
@@ -0,0 +1,27 @@
+package org.apache.hadoop.applications.mawo.server.master.job;
+
+public class JobDescriptor {
+  private JobId jobId;
+  private String filePath;
+
+  public JobDescriptor(JobId jobId, String filePath) {
+    setJobId(jobId);
+    setFilePath(filePath);
+  }
+
+  public JobId getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(JobId jobId) {
+    this.jobId = jobId;
+  }
+
+  public String getFilePath() {
+    return filePath;
+  }
+
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/SimpleTaskJobBuilder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/SimpleTaskJobBuilder.java
new file mode 100644
index 00000000000..ae5f1808ba2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/SimpleTaskJobBuilder.java
@@ -0,0 +1,101 @@
+package org.apache.hadoop.applications.mawo.server.master.job;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.applications.mawo.server.common.SimpleTask;
+import org.apache.hadoop.applications.mawo.server.common.Task;
+import org.apache.hadoop.applications.mawo.server.common.TaskId;
+
+public class SimpleTaskJobBuilder extends JobBuilder {
+
+  private BufferedReader reader = null;
+  private BufferedWriter writer = null;
+
+  public SimpleTaskJobBuilder() {
+    super();
+  }
+
+  @Override
+  protected void startJobSerializing(JobDescriptor jobDescriptor, Job job)
+      throws Exception {
+    String filePath = jobDescriptor.getFilePath();
+    FileWriter fileWriter = new FileWriter(filePath);
+    writer = new BufferedWriter(fileWriter);
+  }
+
+  @Override
+  protected void finishJobSerializing(JobDescriptor jobDescriptor, Job job)
+      throws Exception {
+    IOUtils.closeQuietly(writer);
+  }
+
+  @Override
+  protected void startJobDeserializing(JobDescriptor jobDescriptor, Job job)
+      throws FileNotFoundException {
+    String filePath = jobDescriptor.getFilePath();
+    FileReader fileReader = new FileReader(filePath);
+    reader = new BufferedReader(fileReader);
+  }
+
+  @Override
+  protected void finishJobDeserializing(JobDescriptor jobDescriptor, Job job)
+      throws Exception {
+    IOUtils.closeQuietly(reader);
+  }
+
+  @Override
+  protected void serializeSetupTask(JobDescriptor jobDescriptor, Job job)
+      throws IOException {
+    LOG.error(this.getClass().getName() + " doesn't support setup-tasks");
+  }
+
+  @Override
+  protected void deserializeSetupTask(JobDescriptor jobDescriptor, Job job)
+      throws IOException {
+    LOG.error(this.getClass().getName() + " doesn't support setup-tasks");
+  }
+
+  @Override
+  protected void serializeTasks(JobDescriptor jobDescriptor, Job job)
+      throws IOException {
+    for (Task task : job.getTasks()) {
+      writer.write(task.getTaskCmd());
+      writer.newLine();
+    }
+  }
+
+  @Override
+  protected void deserializeTasks(JobDescriptor jobDescriptor, Job job)
+      throws IOException {
+    List<Task> tasks = new ArrayList<Task>();
+    String line;
+    int index = 0;
+    while ((line = reader.readLine()) != null) {
+      if (!line.trim().isEmpty()) {
+        index++;
+        TaskId taskId = new TaskId(jobDescriptor.getJobId(), index);
+        Task task = new SimpleTask(taskId, null, line, 10); // 10mins hardcoded
+        tasks.add(task);
+      }
+    }
+    job.setTasks(tasks);
+  }
+
+  @Override
+  protected void serializeTeardownTask(JobDescriptor jobDescriptor, Job job) {
+    LOG.error(this.getClass().getName() + " doesn't support teardown-tasks");
+  }
+
+  @Override
+  protected void deserializeTeardownTask(JobDescriptor jobDescriptor, Job job) {
+    LOG.error(this.getClass().getName() + " doesn't support teardown-tasks");
+  }
+}
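Note (illustrative, not part of the patch): SimpleTaskJobBuilder treats the job descriptor as a plain text file with one task command per line; blank lines are skipped, setup and teardown tasks are not supported, and every deserialized SimpleTask gets the hard-coded timeout shown above. A made-up descriptor file in that format:

    echo "preparing input"
    ./run_workload.sh --iterations 3
    ./collect_results.sh
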
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/SimpleTaskJsonJobBuilder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/SimpleTaskJsonJobBuilder.java
new file mode 100644
index 00000000000..84cd7b57609
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/master/job/SimpleTaskJsonJobBuilder.java
@@ -0,0 +1,223 @@
+package org.apache.hadoop.applications.mawo.server.master.job;
+
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.applications.mawo.server.common.NullTask;
+import org.apache.hadoop.applications.mawo.server.common.SimpleTask;
+import org.apache.hadoop.applications.mawo.server.common.Task;
+import org.apache.hadoop.applications.mawo.server.common.TeardownTask;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+
+@SuppressWarnings("unchecked")
+public class SimpleTaskJsonJobBuilder extends JobBuilder {
+
+  private static final String SETUP_TASK_KEY = "SetupTask";
+  private static final String TASKS_KEY = "Tasks";
+  private static final String ENV_KEY = "TaskEnv";
+  private static final String CMD_KEY = "TaskCmd";
+  private static final String TIMEOUT_KEY = "TaskTimeout";
+  private static final String TEARDOWN_TASK_KEY = "TeardownTask";
+
+  private FileReader reader = null;
+  private FileWriter writer = null;
+
+  private JSONObject jsonObject = null;
+
+  public SimpleTaskJsonJobBuilder() throws Exception {
+    super();
+  }
+
+  @Override
+  protected void startJobSerializing(JobDescriptor jobDescriptor, Job job)
+      throws Exception {
+    writer = new FileWriter(jobDescriptor.getFilePath());
+    jsonObject = new JSONObject();
+  }
+
+  @Override
+  protected void finishJobSerializing(JobDescriptor jobDescriptor, Job job)
+      throws Exception {
+    try {
+      writer.write(jsonObject.toJSONString());
+    } finally {
+      IOUtils.closeQuietly(writer);
+    }
+  }
+
+  @Override
+  protected void startJobDeserializing(JobDescriptor jobDescriptor, Job job)
+      throws Exception {
+    reader = new FileReader(jobDescriptor.getFilePath());
+    JSONParser jsonParser = new JSONParser();
+    jsonObject = (JSONObject) jsonParser.parse(reader);
+  }
+
+  @Override
+  protected void finishJobDeserializing(JobDescriptor jobDescriptor, Job job)
+      throws Exception {
+    IOUtils.closeQuietly(reader);
+  }
+
+  private static JSONObject createEnvJSONObject(Task task) {
+    JSONObject taskEnvObj = new JSONObject();
+    if (task.getEnvironment() != null) {
+      for (Entry<String, String> envEntry : task.getEnvironment().entrySet()) {
+        taskEnvObj.put(envEntry.getKey(), envEntry.getValue());
+      }
+    }
+    return taskEnvObj;
+  }
+
+  private static void readAndSetTimeoutInTask(JSONObject taskObj, Task task)
+      throws IOException {
+    Object timeoutObj = taskObj.get(TIMEOUT_KEY);
+    if (timeoutObj != null) {
+      long timeoutValue;
+      if (timeoutObj instanceof Long) {
+        timeoutValue = (Long) timeoutObj;
+      } else if (timeoutObj instanceof Double) {
+        Double timeoutDoubleObj = (Double) timeoutObj;
+        timeoutValue = timeoutDoubleObj.longValue();
+      } else {
+        throw new IOException("Invalid timeout value " + timeoutObj.toString());
+      }
+      task.setTimeout(timeoutValue * 1000); // seconds to millis
+    }
+  }
+
+  @Override
+  protected void serializeSetupTask(JobDescriptor jobDescriptor, Job job)
+      throws IOException {
+    JSONObject taskObj = new JSONObject();
+    Task task = job.getSetupTask();
+    String taskCmd = task.getTaskCmd();
+    if (taskCmd != null) {
+      taskObj.put(CMD_KEY, taskCmd);
+    }
+    taskObj.put(ENV_KEY, createEnvJSONObject(task));
+    taskObj.put(TIMEOUT_KEY, task.getTimeout() / 1000); // millis to seconds
+    jsonObject.put(SETUP_TASK_KEY, taskObj);
+  }
+
+  @Override
+  protected void deserializeSetupTask(JobDescriptor jobDescriptor, Job job)
+      throws IOException {
+    Task task;
+    if (jsonObject.containsKey(SETUP_TASK_KEY)) {
+      task = new SimpleTask();
+      task.setTaskId(job.getNextTaskId());
+
+      JSONObject taskObj = (JSONObject) jsonObject.get(SETUP_TASK_KEY);
+      Object taskCmd = taskObj.get(CMD_KEY);
+      if (taskCmd != null) {
+        task.setTaskCmd((String) taskCmd);
+      }
+      Object environment = taskObj.get(ENV_KEY);
+      if (environment != null) {
+        task.setEnvironment((JSONObject) environment);
+      }
+      readAndSetTimeoutInTask(taskObj, task);
+    } else {
+      task = new NullTask();
+    }
+    LOG.info("Setup-task: " + task);
+    job.setSetupTask(task);
+  }
+
+  @Override
+  protected void serializeTasks(JobDescriptor jobDescriptor, Job job)
+      throws IOException {
+    JSONArray tasksArray = new JSONArray();
+    for (Task task : job.getTasks()) {
+
+      JSONObject taskEnvObj = createEnvJSONObject(task);
+
+      JSONObject taskObj = new JSONObject();
+      String taskCmd = task.getTaskCmd();
+      if (taskCmd != null) {
+        taskObj.put(CMD_KEY, taskCmd);
+      }
+      taskObj.put(ENV_KEY, taskEnvObj);
+      taskObj.put(TIMEOUT_KEY, task.getTimeout() / 1000); // millis to seconds
+      tasksArray.add(taskObj);
+    }
+    jsonObject.put(TASKS_KEY, tasksArray);
+  }
+
+  @Override
+  protected void deserializeTasks(JobDescriptor jobDescriptor, Job job)
+      throws IOException {
+    List<Task> tasks = new ArrayList<Task>();
+    if (jsonObject.containsKey(TASKS_KEY)) {
+      JSONArray taskArray = (JSONArray) jsonObject.get(TASKS_KEY);
+      Iterator i = taskArray.iterator();
+      while (i.hasNext()) {
+        JSONObject taskObj = (JSONObject) i.next();
+
+        Task task = new SimpleTask();
+        task.setTaskId(job.getNextTaskId());
+
+        Object cmdObject = taskObj.get(CMD_KEY);
+        if (cmdObject != null) {
+          task.setTaskCmd((String) cmdObject);
+        }
+
+        Object envObject = taskObj.get(ENV_KEY);
+        if (envObject != null) {
+          task.setEnvironment((JSONObject) envObject);
+        }
+
+        readAndSetTimeoutInTask(taskObj, task);
+
+        tasks.add(task);
+      }
+    }
+    job.setTasks(tasks);
+  }
+
+  @Override
+  protected void serializeTeardownTask(JobDescriptor jobDescriptor, Job job) {
+    JSONObject taskObj = new JSONObject();
+    Task task = job.getTeardownTask();
+    String taskCmd = task.getTaskCmd();
+    if (taskCmd != null) {
+      taskObj.put(CMD_KEY, taskCmd);
+    }
+    taskObj.put(ENV_KEY, createEnvJSONObject(task));
+    taskObj.put(TIMEOUT_KEY, task.getTimeout() / 1000); // millis to seconds
+    jsonObject.put(TEARDOWN_TASK_KEY, taskObj);
+  }
+
+  @Override
+  protected void deserializeTeardownTask(JobDescriptor jobDescriptor, Job job)
+      throws IOException {
+    TeardownTask task = new TeardownTask();
+    task.setTaskId(job.getNextTaskId());
+    if (jsonObject.containsKey(TEARDOWN_TASK_KEY)) {
+      JSONObject taskObj = (JSONObject) jsonObject.get(TEARDOWN_TASK_KEY);
+
+      Object cmdObject = taskObj.get(CMD_KEY);
+      if (cmdObject != null) {
+        task.setTaskCmd((String) cmdObject);
+      }
+
+      Object envObject = taskObj.get(ENV_KEY);
+      if (envObject != null) {
+        task.setEnvironment((JSONObject) envObject);
+      }
+
+      readAndSetTimeoutInTask(taskObj, task);
+    }
+    LOG.info("Teardown-task: " + task);
+    job.setTeardownTask(task);
+  }
+}
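Note (illustrative, not part of the patch): SimpleTaskJsonJobBuilder reads and writes a single JSON object keyed by "SetupTask", "Tasks" and "TeardownTask"; each task object carries "TaskCmd", "TaskEnv" and "TaskTimeout", with the timeout stored in seconds on disk and converted to milliseconds in memory. A made-up descriptor in that format:

    {
      "SetupTask": {
        "TaskCmd": "mkdir -p /tmp/mawo-work",
        "TaskEnv": {"WORK_DIR": "/tmp/mawo-work"},
        "TaskTimeout": 60
      },
      "Tasks": [
        {"TaskCmd": "echo task-1", "TaskEnv": {}, "TaskTimeout": 600},
        {"TaskCmd": "echo task-2", "TaskEnv": {}, "TaskTimeout": 600}
      ],
      "TeardownTask": {
        "TaskCmd": "rm -rf /tmp/mawo-work",
        "TaskEnv": {},
        "TaskTimeout": 60
      }
    }
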
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/test/java/org/apache/hadoop/applications/mawo/server/master/TestJobBuilder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/test/java/org/apache/hadoop/applications/mawo/server/master/TestJobBuilder.java
new file mode 100644
index 00000000000..c928ace6520
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-mawo/src/test/java/org/apache/hadoop/applications/mawo/server/master/TestJobBuilder.java
@@ -0,0 +1,135 @@
+package org.apache.hadoop.applications.mawo.server.master;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.applications.mawo.server.common.MawoConfiguration;
+import org.apache.hadoop.applications.mawo.server.common.SimpleTask;
+import org.apache.hadoop.applications.mawo.server.common.Task;
+import org.apache.hadoop.applications.mawo.server.common.TeardownTask;
+import org.apache.hadoop.applications.mawo.server.master.job.Job;
+import org.apache.hadoop.applications.mawo.server.master.job.JobBuilder;
+import org.apache.hadoop.applications.mawo.server.master.job.JobBuilderFactory;
+import org.apache.hadoop.applications.mawo.server.master.job.JobDescriptor;
+import org.apache.hadoop.applications.mawo.server.master.job.SimpleTaskJsonJobBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestJobBuilder {
+
+  @Test
+  public void testSimpleJobBuilderSerialization() throws Exception {
+
+    MawoConfiguration mawoConf = new MawoConfiguration();
+
+    // Job descriptor
+    File localJobDescriptorFile =
+        new File("target", TestJobBuilder.class.getSimpleName()
+            + "-testSimpleJobBuilderSerialization.txt");
+
+    try {
+      //////////////////// Test serialization
+      Job job = new Job(mawoConf);
+      JobDescriptor jobDescriptor = new JobDescriptor(job.getJobId(),
+          localJobDescriptorFile.getAbsolutePath());
+
+      String setup_cmd = "setup_task_cmd";
+      String task_cmd = "regular_task_cmd";
+      String teardown_cmd = "teardown_task_cmd";
+
+      job.setSetupTask(new SimpleTask(null, null, setup_cmd, 1));
+
+      List<Task> tasks = new ArrayList<Task>();
+      tasks.add(new SimpleTask(null, null, task_cmd, 1));
+      job.setTasks(tasks);
+
+      job.setTeardownTask(new TeardownTask(null, null, teardown_cmd, 1));
+
+      JobBuilder jobBuilder = JobBuilderFactory.createJobBuilder(mawoConf);
+      jobBuilder.serializeJob(job, jobDescriptor);
+
+      //////////////////// Test deserialization
+      job = new Job(mawoConf);
+      jobDescriptor = new JobDescriptor(job.getJobId(),
+          localJobDescriptorFile.getAbsolutePath());
+      jobBuilder = JobBuilderFactory.createJobBuilder(mawoConf);
+      jobBuilder.deserializeJob(jobDescriptor, job);
+      Assert.assertEquals(null, job.getSetupTask());
+      Assert.assertEquals(1, job.getTasks().size());
+      Assert.assertEquals(task_cmd, job.getTasks().get(0).getTaskCmd());
+      Assert.assertEquals(null, job.getTeardownTask());
+
+    } finally {
+      localJobDescriptorFile.delete();
+    }
+  }
+
+  @Test
+  public void testJSONJobBuilderSerialization() throws Exception {
+
+    MawoConfiguration mawoConf = new MawoConfiguration();
+    mawoConf.getConfigsMap().put(MawoConfiguration.JOB_BUILDER_CLASS,
+        SimpleTaskJsonJobBuilder.class.getName());
+
+    // Job descriptor
+    File localJobDescriptorFile =
+        new File("target", TestJobBuilder.class.getSimpleName()
+            + "-testJSONJobBuilderSerialization.json");
+
+    try {
+      //////////////////// Test serialization
+      Job job = new Job(mawoConf);
+      JobDescriptor jobDescriptor = new JobDescriptor(job.getJobId(),
+          localJobDescriptorFile.getAbsolutePath());
+
+      String setup_cmd = "setup_task_cmd";
+      String task_cmd = "regular_task_cmd";
+      String teardown_cmd = "teardown_task_cmd";
+
+      Map<String, String> setup_env = new HashMap<String, String>();
+      setup_env.put("setup_env_key", "setup_env_value");
+      Map<String, String> task_env = new HashMap<String, String>();
+      task_env.put("task_env_key", "task_env_value");
+      Map<String, String> teardown_env = new HashMap<String, String>();
+      teardown_env.put("teardown_env_key", "teardown_env_value");
+
+      job.setSetupTask(new SimpleTask(null, setup_env, setup_cmd, 100000));
+
+      List<Task> tasks = new ArrayList<Task>();
+      tasks.add(new SimpleTask(null, task_env, task_cmd, 10000));
+      job.setTasks(tasks);
+
+      job.setTeardownTask(
+          new TeardownTask(null, teardown_env, teardown_cmd, 1000));
+
+      JobBuilder jobBuilder = JobBuilderFactory.createJobBuilder(mawoConf);
+      jobBuilder.serializeJob(job, jobDescriptor);
+
+      //////////////////// Test deserialization
+      job = new Job(mawoConf);
+      jobDescriptor = new JobDescriptor(job.getJobId(),
+          localJobDescriptorFile.getAbsolutePath());
+      jobBuilder = JobBuilderFactory.createJobBuilder(mawoConf);
+      jobBuilder.deserializeJob(jobDescriptor, job);
+      Assert.assertNotNull(job.getSetupTask());
+      Assert.assertEquals("setup_task_cmd", job.getSetupTask().getTaskCmd());
+      Assert.assertEquals(100000, job.getSetupTask().getTimeout());
+
+      Assert.assertNotNull(job.getTasks());
+      Assert.assertEquals(1, job.getTasks().size());
+      Assert.assertEquals("regular_task_cmd",
+          job.getTasks().get(0).getTaskCmd());
+      Assert.assertEquals(10000, job.getTasks().get(0).getTimeout());
+
+      Assert.assertNotNull(job.getTeardownTask());
+      Assert.assertEquals("teardown_task_cmd",
+          job.getTeardownTask().getTaskCmd());
+      Assert.assertEquals(1000, job.getTeardownTask().getTimeout());
+    } finally {
+      localJobDescriptorFile.delete();
+    }
+  }
+}
\ No newline at end of file