diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/pom.xml
index e69de29bb2d..4e2218c9c30 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/pom.xml
@@ -0,0 +1,185 @@
+
+ 4.0.0
+
+ org.apache.hadoop.applications.mawo
+ hadoop-applications-mawo
+ 0.0.1-SNAPSHOT
+ jar
+
+ hadoop-applications-mawo
+ http://maven.apache.org
+
+
+ UTF-8
+
+
+
+
+ junit
+ junit
+ 4.11
+ test
+
+
+
+ org.apache.hadoop
+ hadoop-common
+ 2.7.2
+
+
+
+ com.google.inject
+ guice
+ 4.0
+
+
+
+ org.apache.curator
+ curator-framework
+ 2.7.1
+
+
+
+ org.apache.curator
+ curator-client
+ 2.7.1
+
+
+
+ commons-io
+ commons-io
+ 2.4
+
+
+
+ commons-logging
+ commons-logging
+ 1.1.3
+
+
+ avalon-framework
+ avalon-framework
+
+
+ logkit
+ logkit
+
+
+ javax.servlet
+ servlet-api
+
+
+
+
+
+ commons-lang
+ commons-lang
+ 2.6
+
+
+
+ commons-cli
+ commons-cli
+ 1.2
+
+
+
+ com.google.guava
+ guava
+ 11.0.2
+
+
+
+ org.apache.zookeeper
+ zookeeper
+ 3.4.6
+
+
+
+ junit
+ junit
+
+
+ com.sun.jdmk
+ jmxtools
+
+
+ com.sun.jmx
+ jmxri
+
+
+ org.jboss.netty
+ netty
+
+
+
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.10
+
+
+
+ com.googlecode.json-simple
+ json-simple
+ 1.1
+
+
+
+ log4j
+ log4j
+ 1.2.17
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.1.2
+
+
+ org.apache.httpcomponents
+ httpcore
+ 4.1.2
+
+
+
+ org.mockito
+ mockito-all
+ 1.8.5
+ test
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ 2.2-beta-1
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+
+ src/assembly/bin.xml
+ hadoop-applications-mawo-${project.version}
+
+
+
+ package
+
+ single
+
+
+
+
+
+
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/assembly/bin.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/assembly/bin.xml
index e69de29bb2d..eeee578606e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/assembly/bin.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/assembly/bin.xml
@@ -0,0 +1,43 @@
+
+
+ bin
+
+ tar.gz
+
+
+
+
+ README*
+
+
+
+
+
+ src/main/bin
+ bin
+
+
+
+
+
+ target
+ /
+
+ *.jar
+
+
+
+
+
+
+
+
+ /lib
+ runtime
+
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/common/Utils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/common/Utils.java
index e69de29bb2d..aff1e6ae968 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/common/Utils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/common/Utils.java
@@ -0,0 +1,5 @@
+package org.apache.hadoop.applications.mawo.common;
+
+public class Utils { // Placeholder for shared MaWo utility helpers; intentionally empty so far. NOTE(review): new Hadoop source files normally carry the Apache license header — add one before commit.
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/AbstractTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/AbstractTask.java
index e69de29bb2d..29ef12a7b4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/AbstractTask.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/AbstractTask.java
@@ -0,0 +1,137 @@
+package org.apache.hadoop.applications.mawo.server.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.log4j.Logger;
+
+public abstract class AbstractTask implements Task { // Base Writable task implementation: holds the TaskId, environment map, command string and timeout shared by all concrete task types.
+
+  private TaskId taskID = new TaskId(); // Defaults to an empty id so toString()/write() never hit null.
+  private Map environment = new HashMap(); // NOTE(review): raw type — write() treats entries as String/String, so this should be Map<String, String>.
+  private String taskCmd;
+  protected TaskType taskType; // Set by subclasses via setTaskType(); protected for their direct access.
+  private long timeout; // Task timeout; units not stated here — presumably milliseconds, TODO confirm against callers.
+  final static Logger LOG = Logger.getLogger(AbstractTask.class);
+
+  public AbstractTask() { // Writable contract: public no-arg constructor required for readFields()-based deserialization.
+  }
+
+  public AbstractTask(TaskId taskId, Map environment, // Convenience constructor used by concrete task types; logs the created task.
+      String taskCMD, long timeout) {
+    this();
+    setTaskId(taskId);
+    setEnvironment(environment);
+    setTaskCmd(taskCMD);
+    setTimeout(timeout);
+    LOG.info("Created Task - type: " + this.taskType + ", TaskId: " // taskType is still null here — subclasses call setTaskType() after super(), so this logs "type: null".
+        + this.taskID.toString() + ", cmd: '" + taskCMD + "' Timeout: "
+        + timeout);
+  }
+
+  @Override
+  public Map getEnvironment() { // Returns the live internal map (no defensive copy).
+    return environment;
+  }
+
+  @Override
+  public void setEnvironment(Map environment) { // NOTE(review): accepts null; write() tolerates a null map but readFields() would then NPE on put().
+    this.environment = environment;
+  }
+
+  @Override
+  public String getTaskCmd() {
+    return taskCmd;
+  }
+
+  @Override
+  public void setTaskCmd(String taskCMD) {
+    this.taskCmd = taskCMD;
+  }
+
+  @Override
+  public TaskId getTaskId() {
+    return taskID;
+  }
+
+  @Override
+  public void setTaskId(TaskId taskId) { // Silently ignores null so the default empty TaskId is kept.
+    if (taskId != null) {
+      this.taskID = taskId;
+    }
+  }
+
+  @Override
+  public TaskType getTaskType() {
+    return taskType;
+  }
+
+  public void setTaskType(TaskType type) {
+    this.taskType = type;
+  }
+
+  @Override
+  public long getTimeout() {
+    return this.timeout;
+  }
+
+  @Override
+  public void setTimeout(long taskTimeout) {
+    this.timeout = taskTimeout;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException { // Serialization order: taskId, env size, env entries, cmd, type enum, vlong timeout — must stay in sync with readFields().
+    taskID.write(out);
+    int environmentSize = (environment == null ? 0 : environment.size()); // null map is encoded the same as an empty one.
+    new IntWritable(environmentSize).write(out);
+    if (environmentSize != 0) {
+      for (Entry envEntry : environment.entrySet()) { // NOTE(review): with a raw Map this loop likely does not compile (entrySet() yields Object, Text ctor needs String) — declare Map<String, String>.
+        new Text(envEntry.getKey()).write(out);
+        new Text(envEntry.getValue()).write(out);
+      }
+    }
+    Text taskCmdText;
+    if (taskCmd == null) { // null cmd is written as "" — readFields() therefore never yields null, but null and "" collapse to the same wire form.
+      taskCmdText = new Text("");
+    } else {
+      taskCmdText = new Text(taskCmd);
+    }
+    taskCmdText.write(out);
+    WritableUtils.writeEnum(out, taskType);
+    WritableUtils.writeVLong(out, timeout);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException { // Deserialization mirror of write().
+    this.taskID = new TaskId();
+    taskID.readFields(in);
+    IntWritable envSize = new IntWritable(0);
+    envSize.readFields(in);
+    for (int i = 0; i < envSize.get(); i++) { // NOTE(review): environment is not cleared first — re-reading into a reused instance merges stale entries.
+      Text key = new Text();
+      Text value = new Text();
+      key.readFields(in);
+      value.readFields(in);
+      environment.put(key.toString(), value.toString());
+    }
+    Text taskCmdText = new Text();
+    taskCmdText.readFields(in);
+    taskCmd = taskCmdText.toString();
+    taskType = WritableUtils.readEnum(in, TaskType.class);
+    timeout = WritableUtils.readVLong(in);
+  }
+
+  @Override
+  public String toString() { // Human-readable summary for logging; omits environment and timeout.
+    return "TaskId: " + this.taskID.toString() + ", TaskType: " + this.taskType
+        + ", cmd: '" + taskCmd + "'";
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/CompositeTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/CompositeTask.java
index e69de29bb2d..31fb1a1d8ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/CompositeTask.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/CompositeTask.java
@@ -0,0 +1,14 @@
+package org.apache.hadoop.applications.mawo.server.common;
+
+public class CompositeTask extends AbstractTask { // Task of type COMPOSITE; presumably a container for multiple sub-tasks — the grouping logic is not in this file, TODO confirm.
+  public CompositeTask() { // No-arg constructor for Writable deserialization.
+    super();
+    setTaskType(TaskType.COMPOSITE);
+  }
+
+  public CompositeTask(Task task) { // Copy-style constructor: takes id/env/cmd/timeout from an existing task and retypes it as COMPOSITE.
+    super(task.getTaskId(), task.getEnvironment(), task.getTaskCmd(),
+        task.getTimeout());
+    this.setTaskType(TaskType.COMPOSITE);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/DieTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/DieTask.java
index e69de29bb2d..9260aeab80e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/DieTask.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/DieTask.java
@@ -0,0 +1,8 @@
+package org.apache.hadoop.applications.mawo.server.common;
+
+public class DieTask extends AbstractTask { // Marker task of type DIE; presumably instructs a worker to shut down — TODO confirm against the worker-side handler (not in this file).
+  public DieTask() { // Carries no command/env/timeout beyond AbstractTask defaults.
+    super();
+    setTaskType(TaskType.DIE);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/MawoConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/MawoConfiguration.java
index e69de29bb2d..774c269ada2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/MawoConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/MawoConfiguration.java
@@ -0,0 +1,274 @@
+package org.apache.hadoop.applications.mawo.server.common;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+
+import com.google.inject.Singleton;
+
+@Singleton
+public final class MawoConfiguration { // Loads mawo.properties from the classpath into an in-memory string map, falling back to the defaults declared below for every key.
+  final static Logger logger = Logger.getLogger(MawoConfiguration.class);
+  final static String COMMA_SPLITTER = ","; // Separator for the worker whitelist-env list.
+
+  public static final String CONFIG_FILE = "mawo.properties"; // Resource name looked up on the classpath.
+
+  private static final String RPC_SERVER_HOSTNAME = "rpc.server.hostname";
+  private static final String RPC_SERVER_PORT = "rpc.server.port";
+
+  // Default values
+  private static final String RPC_SERVER_HOSTNAME_DEFAULT = "localhost";
+  private static final String RPC_SERVER_PORT_DEFAULT = "5121";
+
+  // Curator related Configurations
+  private static final String JOB_QUEUE_STORAGE_ENABLED = // When true, the job queue is persisted (presumably in ZooKeeper via Curator — TODO confirm).
+      "mawo.job-queue-storage.enabled";
+
+  private static final String ZK_PREFIX = "zookeeper.";
+  private static final String ZK_ADDRESS = ZK_PREFIX + "address";
+  private static final String ZK_ADDRESS_DEFAULT = "localhost:2181";
+
+  private static final String ZK_PARENT_PATH = ZK_PREFIX + "parent.path";
+  private static final String ZK_PARENT_PATH_DEFAULT = "/mawoRoot";
+
+  private static final String ZK_RETRY_INTERVAL_MS =
+      ZK_PREFIX + "retry.interval.ms";
+  private static final String ZK_RETRY_INTERVAL_MS_DEFAULT = "1000";
+
+  private static final String ZK_SESSION_TIMEOUT_MS =
+      ZK_PREFIX + "session.timeout.ms";
+  private static final String ZK_SESSION_TIMEOUT_MS_DEFAULT = "10000";
+
+  private static final String ZK_RETRIES_NUM = ZK_PREFIX + "retries.num";
+  private static final String ZK_RETRIES_NUM_DEFAULT = "1000";
+
+  private static final String ZK_ACL = ZK_PREFIX + "acl";
+  private static final String ZK_ACL_DEFAULT = "world:anyone:rwcda";
+
+  private static final String WORKER_NUM_TASKS = "worker.num.tasks"; // Max concurrent tasks per worker (see getWorkerConcurrentTasksLimit()).
+  private static final String WORKER_NUM_TASKS_DEFAULT = "10";
+
+  public static final String JOB_BUILDER_CLASS = "mawo.job-builder.class";
+  private static final String JOB_BUILDER_CLASS_DEFAULT =
+      "org.apache.hadoop.applications.mawo.server.master.job.SimpleTaskJobBuilder";
+
+  private static final String WORKER_WORK_SPACE = "worker.workspace";
+  private static final String WORKER_WORK_SPACE_DEFAULT = "/tmp";
+
+  public static final String CLUSTER_MANAGER_URL = "ycloud.url";
+  private static final String DEFAULT_CLUSTER_MANAGER_URL = "0.0.0.0:9191";
+
+  public static final String AUTO_SHUTDOWN_WORKERS =
+      "mawo.master.auto-shutdown-workers";
+  private static final boolean DEFAULT_AUTO_SHUTDOWN_WORKERS = false; // NOTE(review): only this and the drain-events default are non-String — stringified on insert below.
+
+  public static final String MASTER_TASKS_STATUS_LOG_PATH
+      = "master.tasks-status.log.path";
+  private static final String MASTER_TASKS_STATUS_LOG_PATH_DEFAULT = "/tmp";
+
+  private static final String MASTER_DRAIN_EVENTS_TIMEOUT =
+      "master.drain-events.timeout";
+  private static final long MASTER_DRAIN_EVENTS_TIMEOUT_DEFAULT = 60000; // Milliseconds, per the ".timeout" getter returning long — TODO confirm units.
+
+  private static final String WORKER_WHITELIST_ENV ="worker.whitelist.env";
+  private static final String WORKER_WHITELIST_ENV_DEFAULT = "";
+
+  private static final String MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS =
+      "master.teardown-worker.validity-interval.ms";
+  private static final String MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS_DEFAULT
+      = "120000";
+
+  private Map configsMap; // NOTE(review): raw type — holds String->String throughout; declare Map<String, String>.
+
+  public MawoConfiguration() { // Loads mawo.properties from the classpath and applies defaults.
+    this(readConfigFile());
+  }
+
+  private MawoConfiguration(Properties properties) { // Copies every known key into configsMap, substituting the declared default when absent.
+
+    configsMap = new HashMap();
+
+    configsMap.put(RPC_SERVER_HOSTNAME, properties
+        .getProperty(RPC_SERVER_HOSTNAME, RPC_SERVER_HOSTNAME_DEFAULT));
+    configsMap.put(RPC_SERVER_PORT,
+        properties.getProperty(RPC_SERVER_PORT, RPC_SERVER_PORT_DEFAULT));
+
+    configsMap.put(ZK_ADDRESS,
+        properties.getProperty(ZK_ADDRESS, ZK_ADDRESS_DEFAULT));
+    configsMap.put(ZK_PARENT_PATH,
+        properties.getProperty(ZK_PARENT_PATH, ZK_PARENT_PATH_DEFAULT));
+    configsMap.put(ZK_RETRY_INTERVAL_MS, properties
+        .getProperty(ZK_RETRY_INTERVAL_MS, ZK_RETRY_INTERVAL_MS_DEFAULT));
+    configsMap.put(ZK_SESSION_TIMEOUT_MS, properties
+        .getProperty(ZK_SESSION_TIMEOUT_MS, ZK_SESSION_TIMEOUT_MS_DEFAULT));
+    configsMap.put(ZK_RETRIES_NUM,
+        properties.getProperty(ZK_RETRIES_NUM, ZK_RETRIES_NUM_DEFAULT));
+    configsMap.put(ZK_ACL, properties.getProperty(ZK_ACL, ZK_ACL_DEFAULT));
+
+    configsMap.put(JOB_BUILDER_CLASS,
+        properties.getProperty(JOB_BUILDER_CLASS, JOB_BUILDER_CLASS_DEFAULT));
+
+    configsMap.put(JOB_QUEUE_STORAGE_ENABLED, // NOTE(review): inline "false" default — no named constant, unlike every other key.
+        properties.getProperty(JOB_QUEUE_STORAGE_ENABLED, "false"));
+
+    configsMap.put(CLUSTER_MANAGER_URL, properties
+        .getProperty(CLUSTER_MANAGER_URL, DEFAULT_CLUSTER_MANAGER_URL));
+
+    configsMap.put(WORKER_NUM_TASKS,
+        properties.getProperty(WORKER_NUM_TASKS, WORKER_NUM_TASKS_DEFAULT));
+
+    configsMap.put(WORKER_WORK_SPACE,
+        properties.getProperty(WORKER_WORK_SPACE, WORKER_WORK_SPACE_DEFAULT));
+
+    configsMap.put(AUTO_SHUTDOWN_WORKERS, properties.getProperty(
+        AUTO_SHUTDOWN_WORKERS, String.valueOf(DEFAULT_AUTO_SHUTDOWN_WORKERS)));
+
+    configsMap.put(MASTER_TASKS_STATUS_LOG_PATH, properties.getProperty(
+        MASTER_TASKS_STATUS_LOG_PATH,
+        String.valueOf(MASTER_TASKS_STATUS_LOG_PATH_DEFAULT))); // NOTE(review): valueOf on a String is a no-op — redundant.
+
+    configsMap.put(MASTER_DRAIN_EVENTS_TIMEOUT,
+        properties.getProperty(MASTER_DRAIN_EVENTS_TIMEOUT,
+            String.valueOf(MASTER_DRAIN_EVENTS_TIMEOUT_DEFAULT)));
+
+    configsMap.put(WORKER_WHITELIST_ENV, properties.getProperty(
+        WORKER_WHITELIST_ENV, WORKER_WHITELIST_ENV_DEFAULT));
+
+    configsMap.put(MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS,
+        properties.getProperty(MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS,
+            MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS_DEFAULT));
+
+  }
+
+  public Map getConfigsMap() { // Exposes the live internal map (no defensive copy).
+    return configsMap;
+  }
+
+  /**
+   * Find, read, and parse {@value #CONFIG_FILE} from the classpath.
+   *
+   * @return the loaded properties; NOTE(review): a missing file raises RuntimeException below — it does NOT return empty as originally documented
+   */
+  private static Properties readConfigFile() {
+    Properties properties = new Properties();
+
+    // Get property file stream from classpath
+    logger.info("Configuration file being loaded: " + CONFIG_FILE
+        + ". Found in classpath at "
+        + MawoConfiguration.class.getClassLoader().getResource(CONFIG_FILE));
+    InputStream inputStream = MawoConfiguration.class.getClassLoader()
+        .getResourceAsStream(CONFIG_FILE);
+
+    if (inputStream == null) { // Fail fast when the resource is absent from the classpath.
+      throw new RuntimeException(CONFIG_FILE + " not found in classpath");
+    }
+
+    // load the properties
+    try {
+      properties.load(inputStream);
+      inputStream.close(); // NOTE(review): not closed if load() throws — prefer try-with-resources.
+    } catch (FileNotFoundException fnf) { // NOTE(review): effectively unreachable — the stream was already resolved above.
+      logger.error(
+          "No configuration file " + CONFIG_FILE + " found in classpath.");
+    } catch (IOException ie) {
+      throw new IllegalArgumentException(
+          "Can't read configuration file " + CONFIG_FILE, ie);
+    }
+
+    return properties;
+  }
+
+  public int getRpcServerPort() { // NOTE(review): the typed getters below throw NumberFormatException if a property holds a non-numeric value.
+    return Integer.parseInt(configsMap.get(RPC_SERVER_PORT));
+  }
+
+  public String getRpcHostName() {
+    return configsMap.get(RPC_SERVER_HOSTNAME);
+  }
+
+  public boolean getJobQueueStorageEnabled() {
+    return Boolean.parseBoolean(configsMap.get(JOB_QUEUE_STORAGE_ENABLED));
+  }
+
+  public String getZKAddress() {
+    return configsMap.get(ZK_ADDRESS);
+  }
+
+  public String getZKParentPath() {
+    return configsMap.get(ZK_PARENT_PATH);
+  }
+
+  public int getZKRetryIntervalMS() {
+    return Integer.parseInt(configsMap.get(ZK_RETRY_INTERVAL_MS));
+  }
+
+  public int getZKSessionTimeoutMS() {
+    return Integer.parseInt(configsMap.get(ZK_SESSION_TIMEOUT_MS));
+  }
+
+  public int getZKRetriesNum() {
+    return Integer.parseInt(configsMap.get(ZK_RETRIES_NUM));
+  }
+
+  public String getZKAcl() {
+    return configsMap.get(ZK_ACL);
+  }
+
+  public int getWorkerConcurrentTasksLimit() {
+    return Integer.parseInt(configsMap.get(WORKER_NUM_TASKS));
+  }
+
+  public String getJobBuilderClass() {
+    return configsMap.get(JOB_BUILDER_CLASS);
+  }
+
+  public String getWorkerWorkSpace() {
+    return configsMap.get(WORKER_WORK_SPACE);
+  }
+
+  public String getClusterManagerURL() {
+    return configsMap.get(CLUSTER_MANAGER_URL);
+  }
+
+  public boolean getAutoShutdownWorkers() {
+    return Boolean.parseBoolean(configsMap.get(AUTO_SHUTDOWN_WORKERS));
+  }
+
+  public String getMasterTasksStatusLogPath() {
+    return configsMap.get(MASTER_TASKS_STATUS_LOG_PATH);
+  }
+
+  public long getMasterDrainEventsTimeout() {
+    return Long.parseLong(configsMap.get(MASTER_DRAIN_EVENTS_TIMEOUT));
+  }
+
+  public List getWorkerWhiteListEnv() { // Parses the comma-separated whitelist of env-var names: trims each entry and strips a leading '$'. NOTE(review): raw List — should be List<String>.
+    List whiteList = new ArrayList();
+    String env = configsMap.get(WORKER_WHITELIST_ENV);
+    if (env != null && !env.isEmpty()) {
+      String[] variables = env.split(COMMA_SPLITTER);
+      for (String variable : variables) {
+        variable = variable.trim();
+        if (variable.startsWith("$")) { // Accept "$JAVA_HOME" as well as "JAVA_HOME".
+          variable = variable.substring(1);
+        }
+        if (!variable.isEmpty()) { // Skip blanks from stray commas.
+          whiteList.add(variable);
+        }
+      }
+    }
+    return whiteList;
+  }
+
+  public long getTeardownWorkerValidityInterval() {
+    return Long.parseLong(configsMap.get(
+        MASTER_TEARDOWN_WORKER_VALIDITY_INTERVAL_MS));
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/NullTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/NullTask.java
index e69de29bb2d..299e8591283 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/NullTask.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/NullTask.java
@@ -0,0 +1,9 @@
+package org.apache.hadoop.applications.mawo.server.common;
+
+public class NullTask extends AbstractTask { // No-op task of type NULL; presumably returned where a non-null Task is required but there is no work — TODO confirm against callers.
+
+  public NullTask() { // Carries no command/env/timeout beyond AbstractTask defaults.
+    super();
+    this.setTaskType(TaskType.NULL);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/SimpleTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/SimpleTask.java
index e69de29bb2d..7d252724ebc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/SimpleTask.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/SimpleTask.java
@@ -0,0 +1,21 @@
+package org.apache.hadoop.applications.mawo.server.common;
+
+import java.util.Map;
+
+public class SimpleTask extends AbstractTask { // Single-command task of type SIMPLE; the basic unit of work assigned to a worker.
+  public SimpleTask() { // No-arg constructor for Writable deserialization.
+    super();
+    this.setTaskType(TaskType.SIMPLE);
+  }
+
+  public SimpleTask(Task task) { // Copy-style constructor: retypes any task's id/env/cmd/timeout as SIMPLE.
+    this(task.getTaskId(), task.getEnvironment(), task.getTaskCmd(),
+        task.getTimeout());
+  }
+
+  public SimpleTask(TaskId taskId, Map environment, // NOTE(review): raw Map — should be Map<String, String>, matching AbstractTask's usage.
+      String taskCMD, long timeout) {
+    super(taskId, environment, taskCMD, timeout);
+    this.setTaskType(TaskType.SIMPLE);
+  }
+} // NOTE(review): file lacks a trailing newline.
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/Task.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/Task.java
index e69de29bb2d..de9c31a615c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/Task.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/Task.java
@@ -0,0 +1,26 @@
+package org.apache.hadoop.applications.mawo.server.common;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+
+public interface Task extends Writable { // Contract for a MaWo unit of work; extends Writable so tasks can travel over Hadoop RPC (see WorkAssignmentProtocol).
+
+  TaskId getTaskId();
+
+  Map getEnvironment(); // NOTE(review): raw Map here and in the setter — should be Map<String, String> per AbstractTask's serialization.
+
+  String getTaskCmd(); // The shell command this task runs.
+
+  TaskType getTaskType();
+
+  void setTaskId(TaskId taskId);
+
+  void setEnvironment(Map environment);
+
+  void setTaskCmd(String taskCMD);
+
+  long getTimeout(); // Task timeout; units not stated — presumably milliseconds, TODO confirm.
+
+  void setTimeout(long timeout);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskId.java
index e69de29bb2d..5ebda5f983d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskId.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskId.java
@@ -0,0 +1,77 @@
+package org.apache.hadoop.applications.mawo.server.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.applications.mawo.server.master.job.JobId;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+public class TaskId implements Writable { // Composite task identifier: owning JobId plus a per-job numeric task id; Writable for RPC transport.
+
+  static final String TASK_ID_PREFIX = "mawo_task_"; // toString() format: mawo_task_<jobId>_<taskId>.
+
+  private JobId jobId = new JobId(); // Defaults to an empty JobId so write()/toString() never hit null.
+  private long taskId;
+
+  public TaskId() { // Writable contract: no-arg constructor for readFields()-based deserialization.
+  }
+
+  public TaskId(JobId jobId, int ID) { // NOTE(review): takes int but stores long — asymmetric with getId() returning long.
+    this.jobId = jobId;
+    this.taskId = ID;
+  }
+
+  public int getJobId() { // Returns the numeric id of the owning job, not the JobId object itself.
+    return jobId.getID();
+  }
+
+  public long getId() {
+    return taskId;
+  }
+
+  public String toString() { // NOTE(review): missing @Override annotation.
+    return TASK_ID_PREFIX + jobId.getID() + "_" + taskId;
+  }
+
+  @Override
+  public int hashCode() { // Standard IDE-generated hash over (jobId, taskId); consistent with equals() below.
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
+    result = prime * result + (int) (taskId ^ (taskId >>> 32));
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) { // Equal iff same class and both jobId and taskId match.
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    TaskId other = (TaskId) obj;
+    if (jobId == null) {
+      if (other.jobId != null)
+        return false;
+    } else if (!jobId.equals(other.jobId))
+      return false;
+    if (taskId != other.taskId)
+      return false;
+    return true;
+  }
+
+  public void write(DataOutput out) throws IOException { // Wire order: jobId then vlong taskId — must match readFields().
+    jobId.write(out);
+    WritableUtils.writeVLong(out, taskId);
+  }
+
+  public void readFields(DataInput in) throws IOException { // Deserialization mirror of write().
+    jobId = new JobId();
+    jobId.readFields(in);
+    this.taskId = WritableUtils.readVLong(in);
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskStatus.java
index e69de29bb2d..c4cb7e17685 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskStatus.java
@@ -0,0 +1,160 @@
+package org.apache.hadoop.applications.mawo.server.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.applications.mawo.server.worker.WorkerId;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.log4j.Logger;
+
+public class TaskStatus implements Writable, Cloneable { // Mutable status record for one task on one worker (state, times, exit code); Writable for heartbeat RPC. NOTE(review): declares Cloneable but does not override clone().
+
+  private final static Logger LOG = Logger.getLogger(TaskStatus.class);
+
+  private TaskId taskId = new TaskId(); // Defaults keep write() null-safe for the id/worker fields.
+  private long startTime; // Wall-clock millis (System.currentTimeMillis), set via setStartTime().
+  private long endTime;
+  private WorkerId workerId = new WorkerId();
+  private int exitCode = -1; // -1 means "not finished yet" / unknown.
+  private String taskCMD;
+  private String taskType; // Stored as a String, not the TaskType enum — NOTE(review): consider using TaskType for type safety.
+
+  public static enum State { // Task lifecycle states reported back to the master.
+    INIT, RUNNING, SUCCEEDED, FAILED, KILLED, EXPIRE
+  }
+
+  private volatile State runState; // volatile: written/read by different threads — presumably task runner vs. heartbeat sender, TODO confirm.
+
+  public TaskStatus() { // Writable contract: no-arg constructor; leaves runState/taskCMD/taskType null (see NOTE on write()).
+  }
+
+  public TaskStatus(WorkerId workerId, TaskId taskId, String taskCMD, String taskType) { // Starts in INIT state.
+    this(workerId, taskId, TaskStatus.State.INIT, taskCMD, taskType);
+  }
+
+  public TaskStatus(WorkerId workerId, TaskId taskId, State runState,
+      String taskCMD, String taskType) {
+    setWorkerId(workerId);
+    setTaskId(taskId);
+    setRunState(runState);
+    setTaskCMD(taskCMD);
+    setTaskType(taskType);
+  }
+
+  public State getRunState() {
+    return runState;
+  }
+
+  public void setRunState(State runState) {
+    this.runState = runState;
+  }
+
+  public void setExitCode(int exitCode) {
+    this.exitCode = exitCode;
+  }
+
+  public int getExitCode() {
+    return exitCode;
+  }
+
+  public void setTaskCMD(String cmd) {
+    this.taskCMD = cmd;
+  }
+
+  public String getTaskCMD() {
+    return taskCMD;
+  }
+
+  public void setTaskType(String type) {
+    this.taskType = type;
+  }
+
+  public String getTaskType() {
+    return taskType;
+  }
+
+  public TaskId getTaskId() {
+    return taskId;
+  }
+
+  public void setTaskId(TaskId taskId) { // Silently ignores null so the default empty TaskId is kept.
+    if (taskId != null) {
+      this.taskId = taskId;
+    }
+  }
+
+  public void setTaskState(TaskId taskId, State runState) { // Convenience: set id and state together.
+    setTaskId(taskId);
+    setRunState(runState);
+  }
+
+  public State getTaskState(TaskId taskId) { // Returns the state only if the given id matches this status; null otherwise.
+    if (taskId.equals(this.taskId)) {
+      return getRunState();
+    } else {
+      return null;
+    }
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime() { // Stamps "now" as the start time.
+    this.startTime = getCurrentTime();
+    LOG.debug("Start Time for " + this.taskId + " is " + this.startTime);
+  }
+
+  private void setStartTime(long time) { // private: only readFields() restores an explicit value.
+    this.startTime = time;
+  }
+
+  public long getEndTime() {
+    return endTime;
+  }
+
+  public void setEndTime() { // Stamps "now" as the end time.
+    this.setEndTime(getCurrentTime());
+  }
+
+  private void setEndTime(long time) { // private: only readFields() restores an explicit value.
+    this.endTime = time;
+    LOG.debug("End Time for " + this.taskId + " is " + this.endTime);
+  }
+
+  private long getCurrentTime() {
+    return System.currentTimeMillis();
+  }
+
+  public void write(DataOutput dataOutput) throws IOException { // Wire order: workerId, taskId, state enum, start, end, cmd, type, exit code — must match readFields(). NOTE(review): runState/taskCMD/taskType may be null after the no-arg constructor — confirm WritableUtils tolerates null here.
+    workerId.write(dataOutput);
+    taskId.write(dataOutput);
+    WritableUtils.writeEnum(dataOutput, runState);
+    WritableUtils.writeVLong(dataOutput, getStartTime());
+    WritableUtils.writeVLong(dataOutput, getEndTime());
+    WritableUtils.writeString(dataOutput, taskCMD);
+    WritableUtils.writeString(dataOutput, taskType);
+    WritableUtils.writeVInt(dataOutput, exitCode);
+  }
+
+  public void readFields(DataInput dataInput) throws IOException { // Deserialization mirror of write().
+    workerId.readFields(dataInput);
+    taskId.readFields(dataInput);
+    setRunState(WritableUtils.readEnum(dataInput, State.class));
+    setStartTime(WritableUtils.readVLong(dataInput));
+    setEndTime(WritableUtils.readVLong(dataInput));
+    setTaskCMD(WritableUtils.readString(dataInput));
+    setTaskType(WritableUtils.readString(dataInput));
+    setExitCode(WritableUtils.readVInt(dataInput));
+  }
+
+  public WorkerId getWorkerId() {
+    return workerId;
+  }
+
+  public void setWorkerId(WorkerId workerId) { // NOTE(review): unlike setTaskId(), accepts null — write() would then NPE.
+    this.workerId = workerId;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskType.java
index e69de29bb2d..27331406c00 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/TaskType.java
@@ -0,0 +1,5 @@
+package org.apache.hadoop.applications.mawo.server.common;
+
+public enum TaskType { // Kind of MaWo task; mirrors the concrete AbstractTask subclasses (SimpleTask, CompositeTask, NullTask, DieTask, TeardownTask).
+  SIMPLE, COMPOSITE, NULL, DIE, TEARDOWN
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/TeardownTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/TeardownTask.java
index e69de29bb2d..00573936d6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/TeardownTask.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/TeardownTask.java
@@ -0,0 +1,16 @@
+package org.apache.hadoop.applications.mawo.server.common;
+
+import java.util.Map;
+
+public class TeardownTask extends SimpleTask {
+ public TeardownTask() {
+ super();
+ setTaskType(TaskType.TEARDOWN);
+ }
+
+ public TeardownTask(TaskId taskId, Map environment,
+ String taskCMD, long timeout) {
+ super(taskId, environment, taskCMD, timeout);
+ setTaskType(TaskType.TEARDOWN);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/WorkAssignmentProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/WorkAssignmentProtocol.java
index e69de29bb2d..3973a2c18a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/WorkAssignmentProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/java/org/apache/hadoop/applications/mawo/server/common/WorkAssignmentProtocol.java
@@ -0,0 +1,19 @@
+package org.apache.hadoop.applications.mawo.server.common;
+
+import org.apache.hadoop.applications.mawo.server.worker.WorkerId;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolInfo;
+
+/**
+ * RPC protocol spoken between MaWo workers and the master for
+ * registering workers and assigning tasks.
+ */
+@ProtocolInfo(protocolName = "WorkAssignmentProtocol", protocolVersion = 1)
+public interface WorkAssignmentProtocol {
+
+ /**
+  * Returns an id for a newly started worker.
+  * NOTE(review): presumably each call yields a fresh unique id --
+  * confirm against the master-side implementation.
+  */
+ Text getNewWorkerId();
+
+ /**
+  * Registers the given worker with the master.
+  * @param workerId id of the registering worker
+  * @return a Task for the worker -- TODO confirm whether this may be
+  *         null (or a NULL-type task) when no work is pending
+  */
+ Task registerWorker(WorkerId workerId);
+
+ /**
+  * Removes the given worker from the master's set of active workers.
+  * @param workerId id of the departing worker
+  */
+ void deRegisterWorker(WorkerId workerId);
+
+ /**
+  * Reports the status of this worker's tasks to the master.
+  * @param workerId id of the reporting worker
+  * @param taskStatusList statuses of the tasks run by this worker
+  * @return the next Task for the worker -- exact semantics not visible
+  *         here; verify against the implementation
+  */
+ Task sendHeartbeat(WorkerId workerId, TaskStatus[] taskStatusList);
+
+ /**
+  * Submits a task to the master for later assignment to a worker.
+  * @param task the task to enqueue
+  */
+ void addTask(Task task);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/resources/log4j.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/resources/log4j.properties
index e69de29bb2d..393e0877ec1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/resources/log4j.properties
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/resources/log4j.properties
@@ -0,0 +1,8 @@
+# Root logger option
+log4j.rootLogger=INFO, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/resources/mawo-default.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/resources/mawo-default.properties
index e69de29bb2d..b1b1c53b94d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/resources/mawo-default.properties
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/main/resources/mawo-default.properties
@@ -0,0 +1,25 @@
+rpc.server.hostname=localhost
+rpc.server.port=5120
+
+# Curator-related (ZooKeeper) configurations
+zookeeper.parent.path=/mawoRoot
+zookeeper.address=localhost:2181
+zookeeper.retry.interval.ms=1000
+zookeeper.session.timeout.ms=10000
+zookeeper.retries.num=1000
+zookeeper.acl=world:anyone:rwcda
+worker.num.tasks=10
+mawo.job-queue-storage.enabled=true
+mawo.job-builder.class=org.apache.hadoop.applications.mawo.server.master.job.SimpleTaskJsonJobBuilder
+worker.workspace=/tmp
+
+ycloud.url=0.0.0.0:9191
+
+task.block-size.min=10
+task.uber-time.min=12
+
+master.tasks-status.log.path=/tmp
+
+master.teardown-worker.validity-interval.ms=120000
+# A comma-separated list of environment variables needed by all the tasks.
+worker.whitelist.env=
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/test/java/org/apache/hadoop/applications/mawo/server/common/TestMaWoConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/test/java/org/apache/hadoop/applications/mawo/server/common/TestMaWoConfiguration.java
index e69de29bb2d..8ab9b22a147 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/test/java/org/apache/hadoop/applications/mawo/server/common/TestMaWoConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/test/java/org/apache/hadoop/applications/mawo/server/common/TestMaWoConfiguration.java
@@ -0,0 +1,41 @@
+package org.apache.hadoop.applications.mawo.server.common;
+
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.internal.ArrayComparisonFailure;
+
+import java.lang.*;
+import java.util.ArrayList;
+import java.util.Properties;
+
+/**
+ * Verifies that {@link MawoConfiguration} exposes the expected default
+ * values (matching mawo.properties on the test classpath).
+ */
+public class TestMaWoConfiguration {
+
+  /**
+   * Validates RPC, job-queue-storage, teardown-interval and ZooKeeper
+   * defaults. Uses assertEquals(expected, actual) -- JUnit's argument
+   * order -- so failure messages report expected/actual correctly.
+   */
+  @Test
+  public void testMaWoConfiguration() {
+
+    MawoConfiguration mawoConf = new MawoConfiguration();
+
+    // Validate RPC server port and hostname.
+    Assert.assertEquals(5120, mawoConf.getRpcServerPort());
+    Assert.assertEquals("localhost", mawoConf.getRpcHostName());
+
+    // Validate job queue storage conf.
+    Assert.assertTrue(mawoConf.getJobQueueStorageEnabled());
+
+    // Validate default teardown-worker validity interval (ms).
+    Assert.assertEquals(120000, mawoConf.getTeardownWorkerValidityInterval());
+
+    // Validate ZK related configs.
+    Assert.assertEquals("/mawoRoot", mawoConf.getZKParentPath());
+    Assert.assertEquals("localhost:2181", mawoConf.getZKAddress());
+    Assert.assertEquals(1000, mawoConf.getZKRetryIntervalMS());
+    Assert.assertEquals(10000, mawoConf.getZKSessionTimeoutMS());
+    Assert.assertEquals(1000, mawoConf.getZKRetriesNum());
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/test/resources/log4j.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/test/resources/log4j.properties
index e69de29bb2d..393e0877ec1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/test/resources/log4j.properties
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/test/resources/log4j.properties
@@ -0,0 +1,8 @@
+# Root logger option
+log4j.rootLogger=INFO, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/test/resources/mawo.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/test/resources/mawo.properties
index e69de29bb2d..263e0acf72e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/test/resources/mawo.properties
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-applications-mawo/src/test/resources/mawo.properties
@@ -0,0 +1,12 @@
+rpc.server.hostname=localhost
+rpc.server.port=5120
+
+# Curator-related (ZooKeeper) configurations
+zookeeper.parent.path=/mawoRoot
+zookeeper.address=localhost:2181
+zookeeper.retry.interval.ms=1000
+zookeeper.session.timeout.ms=10000
+zookeeper.retries.num=1000
+zookeeper.acl=world:anyone:rwcda
+worker.num.tasks=10
+mawo.job-queue-storage.enabled=true
\ No newline at end of file