diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/pom.xml hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/pom.xml
new file mode 100644
index 0000000..f1bcc02
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/pom.xml
@@ -0,0 +1,39 @@
+
+
+ hadoop-yarn-applications-masterworker
+ org.apache.hadoop
+ 3.0.0-SNAPSHOT
+
+ 4.0.0
+
+ org.apache.hadoop
+ hadoop-yarn-applications-masterworker-core
+ jar
+
+ hadoop-yarn-applications-masterworker-core
+ http://maven.apache.org
+
+
+ UTF-8
+
+
+
+
+ junit
+ junit
+ 3.8.1
+ test
+
+
+ org.apache.hadoop
+ hadoop-client
+ 0.23.1
+
+
+ org.apache.hadoop
+ hadoop-common
+ 0.22.0
+
+
+
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/AMRMProtocolWraper.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/AMRMProtocolWraper.java
new file mode 100644
index 0000000..fbeb05b
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/AMRMProtocolWraper.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworker;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+
+public class AMRMProtocolWraper extends Thread {
+ private static final Log LOG = LogFactory.getLog(AMRMProtocolWraper.class);
+
+ Configuration configuration;
+ YarnRPC yarnRPC;
+ YarnConfiguration yarnConfiguration;
+
+ AMRMProtocol amrmProtocol;
+ ApplicationAttemptId appAttemptID;
+ AtomicInteger rmRequestID = new AtomicInteger();
+ AtomicInteger numCompletedContainers = new AtomicInteger();
+ CopyOnWriteArrayList releasedContainers = new CopyOnWriteArrayList();
+ volatile boolean run;
+ AtomicInteger containersToRequest = new AtomicInteger();
+ LinkedBlockingQueue receivedContainers;
+
+ public void init(ApplicationAttemptId appAttemptID) throws YarnRemoteException {
+ init(appAttemptID, "", 0, "");
+ }
+
+ public void init(ApplicationAttemptId appAttemptID,
+ String appMasterHostname,
+ int appMasterRpcPort,
+ String appMasterTrackingUrl) throws YarnRemoteException {
+
+ this.appAttemptID = appAttemptID;
+ receivedContainers = new LinkedBlockingQueue();
+
+ configuration = YarnRPCBuilder.getConf();
+ yarnRPC = YarnRPCBuilder.getYarnRPC();
+ yarnConfiguration = YarnRPCBuilder.getYarnConf();
+
+ InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConfiguration.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
+ amrmProtocol = ((AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class, rmAddress, configuration));
+ RegisterApplicationMasterRequest registerApplicationMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerApplicationMasterRequest.setApplicationAttemptId(appAttemptID);
+ registerApplicationMasterRequest.setHost(appMasterHostname);
+ registerApplicationMasterRequest.setRpcPort(appMasterRpcPort);
+ registerApplicationMasterRequest.setTrackingUrl(appMasterTrackingUrl);
+ amrmProtocol.registerApplicationMaster(registerApplicationMasterRequest);
+ this.start();
+ }
+
+ public Container requestContainer() throws InterruptedException {
+ containersToRequest.incrementAndGet();
+ return receivedContainers.take();
+ }
+
+ void terminate() throws YarnRemoteException {
+ run = false;
+ try {
+ this.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
+ finishReq.setAppAttemptId(appAttemptID);
+ finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+ amrmProtocol.finishApplicationMaster(finishReq);
+ }
+
+ @Override
+ public void run() {
+ run = true;
+ while(run) {
+ try {
+ int toRequest = containersToRequest.get();
+ AMResponse amResponse = getContainers(toRequest);
+ int obtainedContainerCount = amResponse.getAllocatedContainers().size();
+ if (obtainedContainerCount == 1) {
+ containersToRequest.decrementAndGet();
+ List allocatedContainers = amResponse.getAllocatedContainers();
+ for (Container container : allocatedContainers) {
+ receivedContainers.put(container);
+ }
+ }
+ } catch (YarnRemoteException yarnRemoteException) {
+ yarnRemoteException.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ private AMResponse getContainers(int numContainers) throws YarnRemoteException {
+ List resourceReq = new ArrayList();
+
+ ResourceRequest request = Records.newRecord(ResourceRequest.class);
+ request.setHostName("*");
+ request.setNumContainers(numContainers);
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(0);
+ request.setPriority(pri);
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(10);
+ request.setCapability(capability);
+ resourceReq.add(request);
+
+ AllocateRequest req = Records.newRecord(AllocateRequest.class);
+ req.setResponseId(rmRequestID.incrementAndGet());
+ req.setApplicationAttemptId(appAttemptID);
+ req.addAllAsks(resourceReq);
+ req.addAllReleases(releasedContainers);
+ req.setProgress((float)numCompletedContainers.get());
+ AllocateResponse resp = amrmProtocol.allocate(req);
+ return resp.getAMResponse();
+ }
+
+}
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ApplicationSubmissionContextBuilder.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ApplicationSubmissionContextBuilder.java
new file mode 100644
index 0000000..62583b6
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ApplicationSubmissionContextBuilder.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworker;
+
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.util.Records;
+
+public class ApplicationSubmissionContextBuilder {
+ private ApplicationSubmissionContext applicationSubmissionContext;
+
+ public void init() {
+ applicationSubmissionContext = Records.newRecord(ApplicationSubmissionContext.class);
+ }
+
+ public void setApplicationID(ApplicationId applicationId) {
+ applicationSubmissionContext.setApplicationId(applicationId);
+ }
+
+ public void setApplicationName(String applicationName) {
+ applicationSubmissionContext.setApplicationName(applicationName);
+ }
+
+ public void setContainer(ContainerLaunchContext containerLaunchContext) {
+ applicationSubmissionContext.setAMContainerSpec(containerLaunchContext);
+ }
+
+ public void setQueue(String applicationQueue) {
+ applicationSubmissionContext.setQueue(applicationQueue);
+ }
+
+ public void setUser(String applicationUser) {
+ applicationSubmissionContext.setUser(applicationUser);
+ }
+
+ public void setPriority(int priorityValue) {
+ Priority priority = Records.newRecord(Priority.class);
+ priority.setPriority(priorityValue);
+ applicationSubmissionContext.setPriority(priority);
+ }
+
+ public SubmitApplicationRequest build() {
+ SubmitApplicationRequest submitApplicationRequest = Records.newRecord(SubmitApplicationRequest.class);
+ submitApplicationRequest.setApplicationSubmissionContext(applicationSubmissionContext);
+ return submitApplicationRequest;
+ }
+}
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ClasspathUtility.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ClasspathUtility.java
new file mode 100644
index 0000000..b40b925
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ClasspathUtility.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworker;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class ClasspathUtility {
+
+ private static final Log LOG = LogFactory.getLog(ClasspathUtility.class);
+
+ public static String getClasspath() {
+ String classPathEnv = "${CLASSPATH}"
+ + ":./*"
+ + ":$HADOOP_CONF_DIR"
+ + ":$HADOOP_COMMON_HOME/share/hadoop/common/*"
+ + ":$HADOOP_COMMON_HOME/share/hadoop/common/lib/*"
+ + ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/*"
+ + ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*"
+ + ":$YARN_HOME/modules/*" + ":$YARN_HOME/lib/*"
+ + ":./log4j.properties:";
+ classPathEnv += ":" + ClasspathUtility.getTestRuntimeClasspath();
+ return classPathEnv;
+ }
+
+ public static String getTestRuntimeClasspath() {
+
+ InputStream classpathFileStream = null;
+ BufferedReader reader = null;
+ String envClassPath = "";
+
+ LOG.info("Trying to generate classpath for app master from current thread's classpath");
+ try {
+
+ // Create classpath from generated classpath
+ // Check maven ppom.xml for generated classpath info
+ // Works if compile time env is same as runtime. Mainly tests.
+ ClassLoader thisClassLoader = Thread.currentThread()
+ .getContextClassLoader();
+ String generatedClasspathFile = "yarn-apps-ds-generated-classpath";
+ classpathFileStream = thisClassLoader
+ .getResourceAsStream(generatedClasspathFile);
+ if (classpathFileStream == null) {
+ LOG.info("Could not classpath resource from class loader");
+ return envClassPath;
+ }
+ LOG.info("Readable bytes from stream=" + classpathFileStream.available());
+ reader = new BufferedReader(new InputStreamReader(classpathFileStream));
+ String cp = reader.readLine();
+ if (cp != null) {
+ envClassPath += cp.trim() + ":";
+ }
+ // Put the file itself on classpath for tasks.
+ envClassPath += thisClassLoader.getResource(generatedClasspathFile)
+ .getFile();
+ } catch (IOException e) {
+ LOG.info("Could not find the necessary resource to generate class path for tests. Error="
+ + e.getMessage());
+ }
+
+ try {
+ if (classpathFileStream != null) {
+ classpathFileStream.close();
+ }
+ if (reader != null) {
+ reader.close();
+ }
+ } catch (IOException e) {
+ LOG.info("Failed to close class path file stream or reader. Error="
+ + e.getMessage());
+ }
+
+ return envClassPath;
+ }
+}
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ClientRMProtocolWrapper.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ClientRMProtocolWrapper.java
new file mode 100644
index 0000000..0af178e
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ClientRMProtocolWrapper.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworker;
+
+import java.net.InetSocketAddress;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+
+public class ClientRMProtocolWrapper {
+ private static final Log LOG = LogFactory.getLog(ClientRMProtocolWrapper.class);
+ private Configuration configuration;
+ private YarnRPC yarnRPC;
+ private ClientRMProtocol clientRMProtocol;
+
+ public void init() {
+
+ configuration = YarnRPCBuilder.getConf();
+ yarnRPC = YarnRPCBuilder.getYarnRPC();
+ YarnConfiguration yarnConf = YarnRPCBuilder.getYarnConf();
+
+ InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(YarnConfiguration.RM_ADDRESS,YarnConfiguration.DEFAULT_RM_ADDRESS));
+ clientRMProtocol = ((ClientRMProtocol) yarnRPC.getProxy(ClientRMProtocol.class, rmAddress, configuration));
+ }
+
+ public ApplicationId getNewApplicationId() throws YarnRemoteException {
+ GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
+ GetNewApplicationResponse response = clientRMProtocol.getNewApplication(request);
+ return response.getApplicationId();
+ }
+
+ public void submitApplication(SubmitApplicationRequest submitApplicationRequest) throws YarnRemoteException {
+ clientRMProtocol.submitApplication(submitApplicationRequest);
+ }
+
+ public ApplicationReport getApplicationReport(ApplicationId applicationId) throws YarnRemoteException {
+ GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class);
+ reportRequest.setApplicationId(applicationId);
+ GetApplicationReportResponse reportResponse = clientRMProtocol.getApplicationReport(reportRequest);
+ return reportResponse.getApplicationReport();
+ }
+
+ public void killApplication(ApplicationId applicationId) throws YarnRemoteException {
+ KillApplicationRequest killRequest = Records.newRecord(KillApplicationRequest.class);
+ killRequest.setApplicationId(applicationId);
+ clientRMProtocol.forceKillApplication(killRequest);
+ }
+}
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ContainerLaunchContextBuilder.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ContainerLaunchContextBuilder.java
new file mode 100644
index 0000000..179f033
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ContainerLaunchContextBuilder.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworker;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+public class ContainerLaunchContextBuilder {
+ private static final Log LOG = LogFactory.getLog(ContainerLaunchContextBuilder.class);
+ Configuration configuration;
+ YarnRPC yarnRPC;
+ YarnConfiguration yarnConfiguration;
+ ContainerLaunchContext containerLaunchContext;
+ Map resources;
+ Map environment;
+ Vector commandFagments;
+ void init() {
+
+ configuration = YarnRPCBuilder.getConf();
+ yarnRPC = YarnRPCBuilder.getYarnRPC();
+ yarnConfiguration = YarnRPCBuilder.getYarnConf();
+
+ containerLaunchContext = Records.newRecord(ContainerLaunchContext.class);
+ resources = new HashMap();
+ environment = new HashMap();
+ commandFagments = new Vector();
+ }
+
+ public void addLocalResource(String localResourceName,
+ String remoteResourceName,
+ String applicationName,
+ String environmentPrefix,
+ ApplicationId applicationID) throws IOException {
+ FileSystem fs = FileSystem.get(configuration);
+ Path src = new Path(localResourceName);
+ String pathSuffix = applicationName + "/" + applicationID.getId() + "/" + remoteResourceName;
+ Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
+ fs.copyFromLocalFile(false, true, src, dst);
+
+
+ String fileLocation = dst.toUri().toString();
+ FileStatus jarFileStatus = fs.getFileStatus(dst);
+ long fileLen = jarFileStatus.getLen();
+ long fileTimestamp = jarFileStatus.getModificationTime();
+
+ LOG.info("Transfering local resource, " + fileLocation + " " + fileLen + " " + fileTimestamp);
+
+
+ this.addEnvironmentVariable(environmentPrefix + "LOCATION", fileLocation);
+ this.addEnvironmentVariable(environmentPrefix + "TIMESTAMP", Long.toString(fileTimestamp));
+ this.addEnvironmentVariable(environmentPrefix + "LEN", Long.toString(fileLen));
+
+ LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+ amJarRsrc.setType(LocalResourceType.FILE);
+ amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+ amJarRsrc.setTimestamp(fileTimestamp);
+ amJarRsrc.setSize(fileLen);
+ resources.put(remoteResourceName, amJarRsrc);
+ }
+
+ public void addDFSResource(String dfsResourceName,
+ long timestamp,
+ long length,
+ String destinationName) throws URISyntaxException {
+ LocalResource shellRsrc = Records.newRecord(LocalResource.class);
+ shellRsrc.setType(LocalResourceType.FILE);
+ shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(dfsResourceName)));
+ shellRsrc.setTimestamp(timestamp);
+ shellRsrc.setSize(length);
+ resources.put(destinationName, shellRsrc);
+ }
+
+ private void finalizeResources() {
+ containerLaunchContext.setLocalResources(resources);
+ }
+
+ public void addEnvironmentVariable(String key, String value) {
+ LOG.info("Added Environment Variable, " + key + " " + value);
+ environment.put(key, value);
+ }
+
+ private void finalizeEnvironment() {
+ containerLaunchContext.setEnvironment(environment);
+ }
+
+ public void addCommandFragment(CharSequence given) {
+ commandFagments.add(given);
+ }
+
+ private void finalizeCommand() {
+ StringBuilder command = new StringBuilder();
+ for (CharSequence str : commandFagments) {
+ command.append(str).append(" ");
+ }
+ List commands = new ArrayList();
+ commands.add(command.toString());
+ containerLaunchContext.setCommands(commands);
+ }
+
+ public void setMemory(int memory) {
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(memory);
+ containerLaunchContext.setResource(capability);
+ }
+
+ public ContainerLaunchContext build() {
+ finalizeCommand();
+ finalizeEnvironment();
+ finalizeResources();
+ return containerLaunchContext;
+ }
+
+}
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ContainerManagerWrapper.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ContainerManagerWrapper.java
new file mode 100644
index 0000000..a6e8d02
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ContainerManagerWrapper.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworker;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+
+public class ContainerManagerWrapper extends Thread {
+ private static final Log LOG = LogFactory.getLog(ContainerManagerWrapper.class);
+ private ContainerManager containerManager;
+ private Container container;
+
+ private Configuration conf;
+ private YarnRPC yarnRPC;
+ private YarnConfiguration yarnConfiguration;
+
+ public void init(Container container, ContainerLaunchContext containerLaunchContext) throws IOException {
+ conf = YarnRPCBuilder.getConf();
+ yarnRPC = YarnRPCBuilder.getYarnRPC();
+ yarnConfiguration = YarnRPCBuilder.getYarnConf();
+
+ containerLaunchContext.setContainerId(container.getId());
+ containerLaunchContext.setResource(container.getResource());
+ containerLaunchContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
+
+ this.container = container;
+
+ String cmIpPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort();
+ InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
+ containerManager = ((ContainerManager) yarnRPC.getProxy(ContainerManager.class, cmAddress, yarnConfiguration));
+ StartContainerRequest startContainerRequest = Records.newRecord(StartContainerRequest.class);
+ startContainerRequest.setContainerLaunchContext(containerLaunchContext);
+ containerManager.startContainer(startContainerRequest);
+ this.start();
+ }
+
+ @Override
+ public void run() {
+ while(true) {
+ GetContainerStatusRequest getContainerStatusRequest = Records.newRecord(GetContainerStatusRequest.class);
+ getContainerStatusRequest.setContainerId(container.getId());
+ try {
+ GetContainerStatusResponse getContainerStatusResponse = containerManager.getContainerStatus(getContainerStatusRequest);
+ ContainerStatus containerStatus = getContainerStatusResponse.getStatus();
+ ContainerState containerState = containerStatus.getState();
+ if(containerState == ContainerState.COMPLETE)
+ break;
+ } catch (YarnRemoteException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/DSConstants.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/DSConstants.java
new file mode 100644
index 0000000..2a70754
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/DSConstants.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworker;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Constants used in both Client and Application Master
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class DSConstants {
+
+ /**
+ * Environment key name pointing to the shell script's location
+ */
+ public static final String DISTRIBUTEDSHELLSCRIPTLOCATION = "DISTRIBUTEDSHELLSCRIPTLOCATION";
+
+ /**
+ * Environment key name denoting the file timestamp for the shell script.
+ * Used to validate the local resource.
+ */
+ public static final String DISTRIBUTEDSHELLSCRIPTTIMESTAMP = "DISTRIBUTEDSHELLSCRIPTTIMESTAMP";
+
+ /**
+ * Environment key name denoting the file content length for the shell script.
+ * Used to validate the local resource.
+ */
+ public static final String DISTRIBUTEDSHELLSCRIPTLEN = "DISTRIBUTEDSHELLSCRIPTLEN";
+}
\ No newline at end of file
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWApplicationMaster.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWApplicationMaster.java
new file mode 100644
index 0000000..ff3b338
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWApplicationMaster.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworker;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+public abstract class MWApplicationMaster extends MWMasterRunner {
+
+ private static final Log LOG = LogFactory.getLog(MWApplicationMaster.class);
+ private ApplicationAttemptId appAttemptID;
+ private String workerClass = "";
+ private String workerJarPath = "";
+ private long workerJarTimestamp;
+ private long workerJarLen;
+ private String masterJarPath = "";
+ private long masterJarTimestamp;
+ private long masterJarLen;
+ private String hostname;
+ private int port;
+ private List containerManagerWrapperList = new ArrayList();
+ private AMRMProtocolWraper amrmProtocolWraper;
+
+ public void initiate(String[] args) throws ParseException, IOException {
+ LOG.info("Starting Application Master");
+ processEnvironmentVariables(args);
+ amrmProtocolWraper = new AMRMProtocolWraper();
+ amrmProtocolWraper.init(appAttemptID);
+ hostname = Utility.getHostname();
+ port = super.init(hostname);
+ LOG.info("Hostname:" + hostname + " Port: " + port);
+ super.start();
+ }
+
+ public void addWorker() throws URISyntaxException, InterruptedException, IOException {
+ Container container = amrmProtocolWraper.requestContainer();
+
+ ContainerLaunchContextBuilder containerLaunchContextBuilder = new ContainerLaunchContextBuilder();
+ containerLaunchContextBuilder.init();
+
+ containerLaunchContextBuilder.addDFSResource(masterJarPath,masterJarTimestamp,masterJarLen,"AppMaster.jar");
+ containerLaunchContextBuilder.addDFSResource(workerJarPath,workerJarTimestamp,workerJarLen,"AppWorker.jar");
+
+ containerLaunchContextBuilder.addEnvironmentVariable("MASTERHOSTNAME", hostname);
+ containerLaunchContextBuilder.addEnvironmentVariable("MASTERPORT", Integer.toString(port));
+ containerLaunchContextBuilder.addEnvironmentVariable("CLASSPATH", ClasspathUtility.getClasspath());
+ containerLaunchContextBuilder.addCommandFragment("${JAVA_HOME}" + "/bin/java");
+ containerLaunchContextBuilder.addCommandFragment("-Xmx10m");
+ containerLaunchContextBuilder.addCommandFragment(workerClass);
+
+ containerLaunchContextBuilder.addCommandFragment("1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/container.stdout");
+ containerLaunchContextBuilder.addCommandFragment("2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/container.stderr");
+
+ containerLaunchContextBuilder.setMemory(10);
+ ContainerLaunchContext containerLaunchContext = containerLaunchContextBuilder.build();
+ ContainerManagerWrapper containerManagerWrapper = new ContainerManagerWrapper();
+ containerManagerWrapper.init(container, containerLaunchContext);
+ containerManagerWrapperList.add(containerManagerWrapper);
+ }
+
+ public void terminate() throws YarnRemoteException, InterruptedException {
+ for (ContainerManagerWrapper containerManagerWrapper : containerManagerWrapperList) {
+ try {
+ containerManagerWrapper.join();
+ } catch (InterruptedException e) {
+ LOG.info("Exception thrown in thread join: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+ super.join();
+ amrmProtocolWraper.terminate();
+ }
+
+
+ private boolean processEnvironmentVariables(String[] args) throws ParseException, IOException {
+
+ Map envs = System.getenv();
+
+ ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV));
+ appAttemptID = containerId.getApplicationAttemptId();
+
+ String workerJarLocationVariable = new String("WORKERJARLOCATION");
+ String workerJarTimestampVariable = new String("WORKERJARTIMESTAMP");
+ String workerJarLenVariable = new String("WORKERJARLEN");
+
+ workerJarPath = envs.get(workerJarLocationVariable);
+ workerJarTimestamp = Long.valueOf(envs.get(workerJarTimestampVariable));
+ workerJarLen = Long.valueOf(envs.get(workerJarLenVariable));
+
+ String masterJarLocationVariable = new String("MASTERJARLOCATION");
+ String masterJarTimestampVariable = new String("MASTERJARTIMESTAMP");
+ String masterJarLenVariable = new String("MASTERJARLEN");
+
+ masterJarPath = envs.get(masterJarLocationVariable);
+ masterJarTimestamp = Long.valueOf(envs.get(masterJarTimestampVariable));
+ masterJarLen = Long.valueOf(envs.get(masterJarLenVariable));
+
+ String workerClassVariable = new String("WORKERCLASS");
+ workerClass = envs.get(workerClassVariable);
+
+ return true;
+ }
+}
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWClient.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWClient.java
new file mode 100644
index 0000000..53796e0
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWClient.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworker;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+public class MWClient {
+ private static final Log LOG = LogFactory.getLog(MWClient.class);
+ private static String masterWorkerLib;
+ private static String masterWorkerApp;
+ private static String masterClass;
+ private static String workerClass;
+
+ public static void main(String[] args) throws IOException, InterruptedException, ParseException {
+
+ processCommandlineArguments(args);
+
+ ClientRMProtocolWrapper clientRMProtocolWrapper = new ClientRMProtocolWrapper();
+ clientRMProtocolWrapper.init();
+
+ ApplicationId applicationId = clientRMProtocolWrapper.getNewApplicationId();
+
+ ContainerLaunchContextBuilder containerLaunchContextBuilder = new ContainerLaunchContextBuilder();
+ containerLaunchContextBuilder.init();
+ containerLaunchContextBuilder.addLocalResource(masterWorkerLib, "AppMaster.jar", "MasterWorker", "MASTERJAR", applicationId);
+ containerLaunchContextBuilder.addLocalResource(masterWorkerApp, "AppWorker.jar", "MasterWorker", "WORKERJAR", applicationId);
+
+ containerLaunchContextBuilder.addEnvironmentVariable("CLASSPATH", ClasspathUtility.getClasspath());
+ containerLaunchContextBuilder.addEnvironmentVariable("WORKERCLASS", workerClass);
+
+ containerLaunchContextBuilder.addCommandFragment("${JAVA_HOME}" + "/bin/java");
+ containerLaunchContextBuilder.addCommandFragment("-Xmx10m");
+ containerLaunchContextBuilder.addCommandFragment(masterClass);
+ containerLaunchContextBuilder.addCommandFragment("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/master.stdout");
+ containerLaunchContextBuilder.addCommandFragment("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/master.stderr");
+ containerLaunchContextBuilder.setMemory(10);
+ ContainerLaunchContext containerLaunchContext = containerLaunchContextBuilder.build();
+
+ ApplicationSubmissionContextBuilder applicationSubmissionContextBuilder = new ApplicationSubmissionContextBuilder();
+ applicationSubmissionContextBuilder.init();
+ applicationSubmissionContextBuilder.setApplicationID(applicationId);
+ applicationSubmissionContextBuilder.setApplicationName("MasterWorker");
+ applicationSubmissionContextBuilder.setContainer(containerLaunchContext);
+ SubmitApplicationRequest submitApplicationRequest = applicationSubmissionContextBuilder.build();
+
+ clientRMProtocolWrapper.submitApplication(submitApplicationRequest);
+
+ while(true) {
+ Thread.sleep(1000);
+ ApplicationReport applicationReport = clientRMProtocolWrapper.getApplicationReport(applicationId);
+
+ LOG.info("Got application report from ASM for"
+ + ", appId=" + applicationId.getId()
+ + ", clientToken=" + applicationReport.getClientToken()
+ + ", appDiagnostics=" + applicationReport.getDiagnostics()
+ + ", appMasterHost=" + applicationReport.getHost()
+ + ", appQueue=" + applicationReport.getQueue()
+ + ", appMasterRpcPort=" + applicationReport.getRpcPort()
+ + ", appStartTime=" + applicationReport.getStartTime()
+ + ", yarnAppState=" + applicationReport.getYarnApplicationState().toString()
+ + ", distributedFinalState=" + applicationReport.getFinalApplicationStatus().toString()
+ + ", appTrackingUrl=" + applicationReport.getTrackingUrl()
+ + ", appUser=" + applicationReport.getUser());
+
+ YarnApplicationState yarnApplicationState = applicationReport.getYarnApplicationState();
+ FinalApplicationStatus finalApplicationStatus = applicationReport.getFinalApplicationStatus();
+ if (yarnApplicationState == YarnApplicationState.FINISHED) {
+ if (finalApplicationStatus == FinalApplicationStatus.SUCCEEDED) {
+ LOG.info("Application has completed successfully.");
+ break;
+ }
+ else {
+ LOG.info("Application has completed unsuccessfully.");
+ break;
+ }
+ }
+ if (yarnApplicationState == YarnApplicationState.FAILED) {
+ LOG.info("Application master failed.");
+ break;
+ }
+ }
+ }
+
+ public static void processCommandlineArguments(String[] args) throws ParseException, IOException {
+ Options opts = new Options();
+ opts.addOption("masterworkerlib", true, "Path of the Master-Worker Library jar");
+ opts.addOption("masterworkerapp", true, "Path of the Master-Worker Application jar");
+ opts.addOption("masterclass", true, "Class for Master");
+ opts.addOption("workerclass", true, "Class for Worker");
+
+ CommandLine cliParser = new GnuParser().parse(opts, args);
+
+ if (!cliParser.hasOption("masterworkerlib")) {
+ throw new IllegalArgumentException("Path of the Master-Worker Library jar not specified");
+ }
+ if (!cliParser.hasOption("masterworkerapp")) {
+ throw new IllegalArgumentException("Path of the Master-Worker Application jar not specified");
+ }
+ if (!cliParser.hasOption("masterclass")) {
+ throw new IllegalArgumentException("Class for Master not specified");
+ }
+ if (!cliParser.hasOption("workerclass")) {
+ throw new IllegalArgumentException("Class for Worker not specified");
+ }
+
+ masterWorkerLib = cliParser.getOptionValue("masterworkerlib");
+ masterWorkerApp = cliParser.getOptionValue("masterworkerapp");
+ masterClass = cliParser.getOptionValue("masterclass");
+ workerClass = cliParser.getOptionValue("workerclass");
+ }
+
+}
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWMasterRunner.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWMasterRunner.java
new file mode 100644
index 0000000..e8230a7
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWMasterRunner.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworker;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+
+public abstract class MWMasterRunner extends Thread implements MWProtocol {
+ private static final Log LOG = LogFactory.getLog(MWMasterRunner.class);
+ private Server server;
+ private BlockingQueue workUnitQueue;
+ private BlockingQueue resultUnitQueue;
+
+ public int init(String hostname) throws IOException {
+ Configuration conf = new Configuration();
+ workUnitQueue = new LinkedBlockingQueue();
+ resultUnitQueue = new LinkedBlockingQueue();
+ int linuxEphemeralPortRangeStart = 32768;
+ int linuxEphemeralPortRangeEnd = 61000;
+ int port = -1;
+ for(int i = linuxEphemeralPortRangeStart; i <= linuxEphemeralPortRangeEnd; i++) {
+ try {
+ server = RPC.getServer(MWProtocol.class, this, hostname, i, conf);
+ port = i;
+ break;
+ } catch (BindException e) {
+ }
+ }
+ if (port == -1)
+ throw new BindException();
+ else {
+ server.start();
+ return port;
+ }
+ }
+
+ @Override
+ public void run() {
+ manageWorkers();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ server.stop();
+ }
+
+ @Override
+ public MWMessage getWorkUnit() throws InterruptedException {
+ MWMessage curr = workUnitQueue.poll();
+ if(curr == null) {
+ curr = new MWMessage();
+ curr.makeInvalid();
+ }
+ return curr;
+ }
+
+ @Override
+ public void putResultUnit(MWMessage result) {
+ resultUnitQueue.add(result);
+ }
+
+ public void addWork(MWMessage workUnit) {
+ workUnitQueue.add(workUnit);
+ }
+
+ public void killWorker() {
+ MWMessage curr = new MWMessage();
+ curr.makePoison();
+ workUnitQueue.add(curr);
+ }
+
+ public MWMessage waitForResult() throws InterruptedException {
+ return resultUnitQueue.take();
+ }
+
+ public abstract void manageWorkers();
+
+}
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWMessage.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWMessage.java
new file mode 100644
index 0000000..8753fdd
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWMessage.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworker;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class MWMessage implements Writable {
+ boolean isPoison;
+ boolean isValid;
+
+ public MWMessage() {
+ isValid = true;
+ isPoison = false;
+ }
+
+ public boolean isValid() {
+ return isValid;
+ }
+
+ public void makeInvalid() {
+ isValid = false;
+ }
+
+ public boolean isPoison() {
+ return isPoison;
+ }
+
+ public void makePoison() {
+ isPoison = true;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeBoolean(isValid);
+ out.writeBoolean(isPoison);
+ writeWorkUnit(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ isValid = in.readBoolean();
+ isPoison = in.readBoolean();
+ readFieldsWorkUnit(in);
+ }
+
+ public void writeWorkUnit(DataOutput out) throws IOException {
+
+ }
+
+ public void readFieldsWorkUnit(DataInput in) throws IOException {
+
+ }
+}
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWProtocol.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWProtocol.java
new file mode 100644
index 0000000..fbc5557
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWProtocol.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworker;
+
+public interface MWProtocol {
+ public static final long versionID = 1L;
+
+ MWMessage getWorkUnit() throws InterruptedException;
+
+ void putResultUnit(MWMessage result);
+}
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWWorkerRunner.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWWorkerRunner.java
new file mode 100644
index 0000000..bc18a07
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWWorkerRunner.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworker;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+
+import org.apache.commons.logging.Log;
+
+public abstract class MWWorkerRunner {
+ private static final Log LOG = LogFactory.getLog(MWWorkerRunner.class);
+ private String hostname;
+ private int port;
+
+ public void init() throws InterruptedException, ParseException, IOException {
+ processEnvironmentVariables();
+ Configuration conf = new Configuration();
+ InetSocketAddress addr = new InetSocketAddress(hostname, port);
+ MWProtocol client = null;
+
+ try {
+ client = (MWProtocol) RPC.waitForProxy(MWProtocol.class,
+ MWProtocol.versionID, addr, conf);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ while (true) {
+ MWMessage workUnit = client.getWorkUnit();
+ if (! workUnit.isValid())
+ continue;
+ if (workUnit.isPoison())
+ break;
+ else {
+ MWMessage resultUnit = doWork(workUnit);
+ client.putResultUnit(resultUnit);
+ }
+ }
+ }
+
+ private void processEnvironmentVariables() throws ParseException, IOException {
+ Map envs = System.getenv();
+ String masterHostnameVariable = new String("MASTERHOSTNAME");
+ hostname = envs.get(masterHostnameVariable);
+ String masterPortVariable = new String("MASTERPORT");
+ port = Integer.parseInt(envs.get(masterPortVariable));
+ LOG.info("Hostname:" + hostname + " Port: " + port);
+ }
+
+ public abstract MWMessage doWork(MWMessage workUnit);
+}
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/Utility.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/Utility.java
new file mode 100644
index 0000000..ea5a41b
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/Utility.java
@@ -0,0 +1,11 @@
+package org.apache.hadoop.yarn.applications.masterworker;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class Utility {
+ public static String getHostname() throws UnknownHostException {
+ InetAddress address = InetAddress.getLocalHost();
+ return address.getHostName();
+ }
+}
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/YarnRPCBuilder.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/YarnRPCBuilder.java
new file mode 100644
index 0000000..3658714
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/YarnRPCBuilder.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+
+public class YarnRPCBuilder {
+ private static Configuration configuration;
+ private static YarnRPC yarnRPC;
+ private static YarnConfiguration yarnConf;
+ private static boolean initialized = false;
+
+ public static synchronized Configuration getConf() {
+ if(! initialized)
+ init();
+ return configuration;
+ }
+
+ public static synchronized YarnConfiguration getYarnConf() {
+ if(! initialized)
+ init();
+ return yarnConf;
+ }
+
+ public static synchronized YarnRPC getYarnRPC() {
+ if(! initialized)
+ init();
+ return yarnRPC;
+ }
+
+ private static synchronized void init() {
+ if(! initialized) {
+ initialized = true;
+ configuration = new Configuration();
+ yarnRPC = YarnRPC.create(configuration);
+ yarnConf = new YarnConfiguration(configuration);
+ }
+ }
+}
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/pom.xml hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/pom.xml
new file mode 100644
index 0000000..fa1adc6
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/pom.xml
@@ -0,0 +1,17 @@
+
+
+ hadoop-yarn-applications-masterworker
+ org.apache.hadoop
+ 3.0.0-SNAPSHOT
+
+ 4.0.0
+ org.apache.hadoop
+ hadoop-yarn-applications-masterworker-example
+
+
+ org.apache.hadoop
+ hadoop-yarn-applications-masterworker-core
+ 3.0.0-SNAPSHOT
+
+
+
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/src/main/java/org/apache/hadoop/yarn/applications/masterworkerexample/MWMaster.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/src/main/java/org/apache/hadoop/yarn/applications/masterworkerexample/MWMaster.java
new file mode 100644
index 0000000..716e701
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/src/main/java/org/apache/hadoop/yarn/applications/masterworkerexample/MWMaster.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworkerexample;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.applications.masterworker.MWMasterRunner;
+import org.apache.hadoop.yarn.applications.masterworker.MWApplicationMaster;
+
+public class MWMaster extends MWApplicationMaster {
+ private static final Log LOG = LogFactory.getLog(MWMaster.class);
+
+ public static void main(String[] args) throws InterruptedException, ParseException, IOException, URISyntaxException {
+ MWMaster curr = new MWMaster();
+ curr.initiate(args);
+ curr.terminate();
+ }
+
+ @Override
+ public void manageWorkers() {
+
+ try {
+ addWorker();
+ addWorker();
+ } catch (URISyntaxException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ } catch (InterruptedException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ } catch (IOException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ }
+
+ for(int i = 0; i < 100; i++) {
+ WorkUnit curr = new WorkUnit();
+ curr.setData(i);
+ addWork(curr);
+ }
+ for(int i = 0; i < 100; i++) {
+ try {
+ ResultUnit curr = (ResultUnit) waitForResult();
+ LOG.info("Receiveing Result" + curr.getData());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ killWorker();
+ killWorker();
+ }
+}
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/src/main/java/org/apache/hadoop/yarn/applications/masterworkerexample/MWWorker.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/src/main/java/org/apache/hadoop/yarn/applications/masterworkerexample/MWWorker.java
new file mode 100644
index 0000000..50433c9
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/src/main/java/org/apache/hadoop/yarn/applications/masterworkerexample/MWWorker.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworkerexample;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.yarn.applications.masterworker.MWMessage;
+import org.apache.hadoop.yarn.applications.masterworker.MWWorkerRunner;
+
+public class MWWorker extends MWWorkerRunner {
+ public static void main(String[] args) {
+ MWWorker curr = new MWWorker();
+ try {
+ curr.init();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ParseException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public MWMessage doWork(MWMessage workUnit) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ ResultUnit result = new ResultUnit();
+ int got = ((WorkUnit) workUnit).getData();
+ System.out.println(got);
+ result.setData(got);
+ return result;
+ }
+
+}
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/src/main/java/org/apache/hadoop/yarn/applications/masterworkerexample/ResultUnit.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/src/main/java/org/apache/hadoop/yarn/applications/masterworkerexample/ResultUnit.java
new file mode 100644
index 0000000..b84f7a8
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/src/main/java/org/apache/hadoop/yarn/applications/masterworkerexample/ResultUnit.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworkerexample;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.applications.masterworker.MWMessage;
+
+
+public class ResultUnit extends MWMessage {
+ int data;
+
+ public int getData() {
+ return data;
+ }
+
+ public void setData(int data) {
+ this.data = data;
+ }
+
+ @Override
+ public void writeWorkUnit(DataOutput out) throws IOException {
+ out.writeInt(data);
+ }
+
+ @Override
+ public void readFieldsWorkUnit(DataInput in) throws IOException {
+ data = in.readInt();
+ }
+}
\ No newline at end of file
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/src/main/java/org/apache/hadoop/yarn/applications/masterworkerexample/WorkUnit.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/src/main/java/org/apache/hadoop/yarn/applications/masterworkerexample/WorkUnit.java
new file mode 100644
index 0000000..d34d185
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/src/main/java/org/apache/hadoop/yarn/applications/masterworkerexample/WorkUnit.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.masterworkerexample;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.applications.masterworker.MWMessage;
+
+public class WorkUnit extends MWMessage {
+ int data;
+
+ public int getData() {
+ return data;
+ }
+
+ public void setData(int data) {
+ this.data = data;
+ }
+
+ @Override
+ public void writeWorkUnit(DataOutput out) throws IOException {
+ out.writeInt(data);
+ }
+
+ @Override
+ public void readFieldsWorkUnit(DataInput in) throws IOException {
+ data = in.readInt();
+ }
+}
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/pom.xml hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/pom.xml
new file mode 100644
index 0000000..4a769e0
--- /dev/null
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/pom.xml
@@ -0,0 +1,32 @@
+
+
+
+
+ hadoop-yarn-applications
+ org.apache.hadoop
+ 3.0.0-SNAPSHOT
+
+ 4.0.0
+ org.apache.hadoop
+ hadoop-yarn-applications-masterworker
+ 3.0.0-SNAPSHOT
+ hadoop-yarn-applications-masterworker
+ pom
+
+
+ hadoop-yarn-applications-masterworker-core
+ hadoop-yarn-applications-masterworker-example
+
+
diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml
index fd51584..799101f 100644
--- hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml
+++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml
@@ -27,5 +27,6 @@
hadoop-yarn-applications-distributedshell
+ hadoop-yarn-applications-masterworker