diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/pom.xml hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/pom.xml new file mode 100644 index 0000000..f1bcc02 --- /dev/null +++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/pom.xml @@ -0,0 +1,39 @@ + + + hadoop-yarn-applications-masterworker + org.apache.hadoop + 3.0.0-SNAPSHOT + + 4.0.0 + + org.apache.hadoop + hadoop-yarn-applications-masterworker-core + jar + + hadoop-yarn-applications-masterworker-core + http://maven.apache.org + + + UTF-8 + + + + + junit + junit + 3.8.1 + test + + + org.apache.hadoop + hadoop-client + 0.23.1 + + + org.apache.hadoop + hadoop-common + 0.22.0 + + + diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/AMRMProtocolWraper.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/AMRMProtocolWraper.java new file mode 100644 index 0000000..fbeb05b --- /dev/null +++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/AMRMProtocolWraper.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.masterworker; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.yarn.api.AMRMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.records.AMResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import 
org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.Records; + +public class AMRMProtocolWraper extends Thread { + private static final Log LOG = LogFactory.getLog(AMRMProtocolWraper.class); + + Configuration configuration; + YarnRPC yarnRPC; + YarnConfiguration yarnConfiguration; + + AMRMProtocol amrmProtocol; + ApplicationAttemptId appAttemptID; + AtomicInteger rmRequestID = new AtomicInteger(); + AtomicInteger numCompletedContainers = new AtomicInteger(); + CopyOnWriteArrayList releasedContainers = new CopyOnWriteArrayList(); + volatile boolean run; + AtomicInteger containersToRequest = new AtomicInteger(); + LinkedBlockingQueue receivedContainers; + + public void init(ApplicationAttemptId appAttemptID) throws YarnRemoteException { + init(appAttemptID, "", 0, ""); + } + + public void init(ApplicationAttemptId appAttemptID, + String appMasterHostname, + int appMasterRpcPort, + String appMasterTrackingUrl) throws YarnRemoteException { + + this.appAttemptID = appAttemptID; + receivedContainers = new LinkedBlockingQueue(); + + configuration = YarnRPCBuilder.getConf(); + yarnRPC = YarnRPCBuilder.getYarnRPC(); + yarnConfiguration = YarnRPCBuilder.getYarnConf(); + + InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConfiguration.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)); + amrmProtocol = ((AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class, rmAddress, configuration)); + RegisterApplicationMasterRequest registerApplicationMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class); + registerApplicationMasterRequest.setApplicationAttemptId(appAttemptID); + registerApplicationMasterRequest.setHost(appMasterHostname); + registerApplicationMasterRequest.setRpcPort(appMasterRpcPort); + registerApplicationMasterRequest.setTrackingUrl(appMasterTrackingUrl); + 
amrmProtocol.registerApplicationMaster(registerApplicationMasterRequest); + this.start(); + } + + public Container requestContainer() throws InterruptedException { + containersToRequest.incrementAndGet(); + return receivedContainers.take(); + } + + void terminate() throws YarnRemoteException { + run = false; + try { + this.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setAppAttemptId(appAttemptID); + finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED); + amrmProtocol.finishApplicationMaster(finishReq); + } + + @Override + public void run() { + run = true; + while(run) { + try { + int toRequest = containersToRequest.get(); + AMResponse amResponse = getContainers(toRequest); + int obtainedContainerCount = amResponse.getAllocatedContainers().size(); + if (obtainedContainerCount == 1) { + containersToRequest.decrementAndGet(); + List allocatedContainers = amResponse.getAllocatedContainers(); + for (Container container : allocatedContainers) { + receivedContainers.put(container); + } + } + } catch (YarnRemoteException yarnRemoteException) { + yarnRemoteException.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + } + + private AMResponse getContainers(int numContainers) throws YarnRemoteException { + List resourceReq = new ArrayList(); + + ResourceRequest request = Records.newRecord(ResourceRequest.class); + request.setHostName("*"); + request.setNumContainers(numContainers); + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(0); + request.setPriority(pri); + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(10); + request.setCapability(capability); + resourceReq.add(request); + + AllocateRequest req = 
Records.newRecord(AllocateRequest.class); + req.setResponseId(rmRequestID.incrementAndGet()); + req.setApplicationAttemptId(appAttemptID); + req.addAllAsks(resourceReq); + req.addAllReleases(releasedContainers); + req.setProgress((float)numCompletedContainers.get()); + AllocateResponse resp = amrmProtocol.allocate(req); + return resp.getAMResponse(); + } + +} diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ApplicationSubmissionContextBuilder.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ApplicationSubmissionContextBuilder.java new file mode 100644 index 0000000..62583b6 --- /dev/null +++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ApplicationSubmissionContextBuilder.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.masterworker; + +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.util.Records; + +public class ApplicationSubmissionContextBuilder { + private ApplicationSubmissionContext applicationSubmissionContext; + + public void init() { + applicationSubmissionContext = Records.newRecord(ApplicationSubmissionContext.class); + } + + public void setApplicationID(ApplicationId applicationId) { + applicationSubmissionContext.setApplicationId(applicationId); + } + + public void setApplicationName(String applicationName) { + applicationSubmissionContext.setApplicationName(applicationName); + } + + public void setContainer(ContainerLaunchContext containerLaunchContext) { + applicationSubmissionContext.setAMContainerSpec(containerLaunchContext); + } + + public void setQueue(String applicationQueue) { + applicationSubmissionContext.setQueue(applicationQueue); + } + + public void setUser(String applicationUser) { + applicationSubmissionContext.setUser(applicationUser); + } + + public void setPriority(int priorityValue) { + Priority priority = Records.newRecord(Priority.class); + priority.setPriority(priorityValue); + applicationSubmissionContext.setPriority(priority); + } + + public SubmitApplicationRequest build() { + SubmitApplicationRequest submitApplicationRequest = Records.newRecord(SubmitApplicationRequest.class); + submitApplicationRequest.setApplicationSubmissionContext(applicationSubmissionContext); + return submitApplicationRequest; + } +} diff --git 
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ClasspathUtility.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ClasspathUtility.java new file mode 100644 index 0000000..b40b925 --- /dev/null +++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ClasspathUtility.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.applications.masterworker; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class ClasspathUtility { + + private static final Log LOG = LogFactory.getLog(ClasspathUtility.class); + + public static String getClasspath() { + String classPathEnv = "${CLASSPATH}" + + ":./*" + + ":$HADOOP_CONF_DIR" + + ":$HADOOP_COMMON_HOME/share/hadoop/common/*" + + ":$HADOOP_COMMON_HOME/share/hadoop/common/lib/*" + + ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/*" + + ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*" + + ":$YARN_HOME/modules/*" + ":$YARN_HOME/lib/*" + + ":./log4j.properties:"; + classPathEnv += ":" + ClasspathUtility.getTestRuntimeClasspath(); + return classPathEnv; + } + + public static String getTestRuntimeClasspath() { + + InputStream classpathFileStream = null; + BufferedReader reader = null; + String envClassPath = ""; + + LOG.info("Trying to generate classpath for app master from current thread's classpath"); + try { + + // Create classpath from generated classpath + // Check maven ppom.xml for generated classpath info + // Works if compile time env is same as runtime. Mainly tests. + ClassLoader thisClassLoader = Thread.currentThread() + .getContextClassLoader(); + String generatedClasspathFile = "yarn-apps-ds-generated-classpath"; + classpathFileStream = thisClassLoader + .getResourceAsStream(generatedClasspathFile); + if (classpathFileStream == null) { + LOG.info("Could not classpath resource from class loader"); + return envClassPath; + } + LOG.info("Readable bytes from stream=" + classpathFileStream.available()); + reader = new BufferedReader(new InputStreamReader(classpathFileStream)); + String cp = reader.readLine(); + if (cp != null) { + envClassPath += cp.trim() + ":"; + } + // Put the file itself on classpath for tasks. 
+ envClassPath += thisClassLoader.getResource(generatedClasspathFile) + .getFile(); + } catch (IOException e) { + LOG.info("Could not find the necessary resource to generate class path for tests. Error=" + + e.getMessage()); + } + + try { + if (classpathFileStream != null) { + classpathFileStream.close(); + } + if (reader != null) { + reader.close(); + } + } catch (IOException e) { + LOG.info("Failed to close class path file stream or reader. Error=" + + e.getMessage()); + } + + return envClassPath; + } +} diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ClientRMProtocolWrapper.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ClientRMProtocolWrapper.java new file mode 100644 index 0000000..0af178e --- /dev/null +++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ClientRMProtocolWrapper.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.masterworker; + +import java.net.InetSocketAddress; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.Records; + +public class ClientRMProtocolWrapper { + private static final Log LOG = LogFactory.getLog(ClientRMProtocolWrapper.class); + private Configuration configuration; + private YarnRPC yarnRPC; + private ClientRMProtocol clientRMProtocol; + + public void init() { + + configuration = YarnRPCBuilder.getConf(); + yarnRPC = YarnRPCBuilder.getYarnRPC(); + YarnConfiguration yarnConf = YarnRPCBuilder.getYarnConf(); + + InetSocketAddress rmAddress = 
NetUtils.createSocketAddr(yarnConf.get(YarnConfiguration.RM_ADDRESS,YarnConfiguration.DEFAULT_RM_ADDRESS)); + clientRMProtocol = ((ClientRMProtocol) yarnRPC.getProxy(ClientRMProtocol.class, rmAddress, configuration)); + } + + public ApplicationId getNewApplicationId() throws YarnRemoteException { + GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class); + GetNewApplicationResponse response = clientRMProtocol.getNewApplication(request); + return response.getApplicationId(); + } + + public void submitApplication(SubmitApplicationRequest submitApplicationRequest) throws YarnRemoteException { + clientRMProtocol.submitApplication(submitApplicationRequest); + } + + public ApplicationReport getApplicationReport(ApplicationId applicationId) throws YarnRemoteException { + GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class); + reportRequest.setApplicationId(applicationId); + GetApplicationReportResponse reportResponse = clientRMProtocol.getApplicationReport(reportRequest); + return reportResponse.getApplicationReport(); + } + + public void killApplication(ApplicationId applicationId) throws YarnRemoteException { + KillApplicationRequest killRequest = Records.newRecord(KillApplicationRequest.class); + killRequest.setApplicationId(applicationId); + clientRMProtocol.forceKillApplication(killRequest); + } +} diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ContainerLaunchContextBuilder.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ContainerLaunchContextBuilder.java new file mode 100644 index 0000000..179f033 --- /dev/null +++ 
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ContainerLaunchContextBuilder.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.applications.masterworker; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Vector; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; + +public class ContainerLaunchContextBuilder { + private static final Log LOG = LogFactory.getLog(ContainerLaunchContextBuilder.class); + Configuration configuration; + YarnRPC yarnRPC; + YarnConfiguration yarnConfiguration; + ContainerLaunchContext containerLaunchContext; + Map resources; + Map environment; + Vector commandFagments; + void init() { + + configuration = YarnRPCBuilder.getConf(); + yarnRPC = YarnRPCBuilder.getYarnRPC(); + yarnConfiguration = YarnRPCBuilder.getYarnConf(); + + containerLaunchContext = Records.newRecord(ContainerLaunchContext.class); + resources = new HashMap(); + environment = new HashMap(); + commandFagments = new Vector(); + } + + public void addLocalResource(String localResourceName, + String remoteResourceName, + String applicationName, + String environmentPrefix, + ApplicationId applicationID) throws IOException { + FileSystem fs = 
FileSystem.get(configuration); + Path src = new Path(localResourceName); + String pathSuffix = applicationName + "/" + applicationID.getId() + "/" + remoteResourceName; + Path dst = new Path(fs.getHomeDirectory(), pathSuffix); + fs.copyFromLocalFile(false, true, src, dst); + + + String fileLocation = dst.toUri().toString(); + FileStatus jarFileStatus = fs.getFileStatus(dst); + long fileLen = jarFileStatus.getLen(); + long fileTimestamp = jarFileStatus.getModificationTime(); + + LOG.info("Transfering local resource, " + fileLocation + " " + fileLen + " " + fileTimestamp); + + + this.addEnvironmentVariable(environmentPrefix + "LOCATION", fileLocation); + this.addEnvironmentVariable(environmentPrefix + "TIMESTAMP", Long.toString(fileTimestamp)); + this.addEnvironmentVariable(environmentPrefix + "LEN", Long.toString(fileLen)); + + LocalResource amJarRsrc = Records.newRecord(LocalResource.class); + amJarRsrc.setType(LocalResourceType.FILE); + amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION); + amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst)); + amJarRsrc.setTimestamp(fileTimestamp); + amJarRsrc.setSize(fileLen); + resources.put(remoteResourceName, amJarRsrc); + } + + public void addDFSResource(String dfsResourceName, + long timestamp, + long length, + String destinationName) throws URISyntaxException { + LocalResource shellRsrc = Records.newRecord(LocalResource.class); + shellRsrc.setType(LocalResourceType.FILE); + shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION); + shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(dfsResourceName))); + shellRsrc.setTimestamp(timestamp); + shellRsrc.setSize(length); + resources.put(destinationName, shellRsrc); + } + + private void finalizeResources() { + containerLaunchContext.setLocalResources(resources); + } + + public void addEnvironmentVariable(String key, String value) { + LOG.info("Added Environment Variable, " + key + " " + value); + environment.put(key, value); + } + + private 
void finalizeEnvironment() { + containerLaunchContext.setEnvironment(environment); + } + + public void addCommandFragment(CharSequence given) { + commandFagments.add(given); + } + + private void finalizeCommand() { + StringBuilder command = new StringBuilder(); + for (CharSequence str : commandFagments) { + command.append(str).append(" "); + } + List commands = new ArrayList(); + commands.add(command.toString()); + containerLaunchContext.setCommands(commands); + } + + public void setMemory(int memory) { + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(memory); + containerLaunchContext.setResource(capability); + } + + public ContainerLaunchContext build() { + finalizeCommand(); + finalizeEnvironment(); + finalizeResources(); + return containerLaunchContext; + } + +} diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ContainerManagerWrapper.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ContainerManagerWrapper.java new file mode 100644 index 0000000..a6e8d02 --- /dev/null +++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/ContainerManagerWrapper.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.masterworker; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ContainerManager; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.Records; + +public class ContainerManagerWrapper extends Thread { + private static final Log LOG = LogFactory.getLog(ContainerManagerWrapper.class); + private ContainerManager containerManager; + private Container container; + + private Configuration conf; + private YarnRPC yarnRPC; + private YarnConfiguration yarnConfiguration; + + public void init(Container container, ContainerLaunchContext containerLaunchContext) throws IOException { + conf = YarnRPCBuilder.getConf(); + yarnRPC = 
YarnRPCBuilder.getYarnRPC(); + yarnConfiguration = YarnRPCBuilder.getYarnConf(); + + containerLaunchContext.setContainerId(container.getId()); + containerLaunchContext.setResource(container.getResource()); + containerLaunchContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName()); + + this.container = container; + + String cmIpPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort(); + InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr); + containerManager = ((ContainerManager) yarnRPC.getProxy(ContainerManager.class, cmAddress, yarnConfiguration)); + StartContainerRequest startContainerRequest = Records.newRecord(StartContainerRequest.class); + startContainerRequest.setContainerLaunchContext(containerLaunchContext); + containerManager.startContainer(startContainerRequest); + this.start(); + } + + @Override + public void run() { + while(true) { + GetContainerStatusRequest getContainerStatusRequest = Records.newRecord(GetContainerStatusRequest.class); + getContainerStatusRequest.setContainerId(container.getId()); + try { + GetContainerStatusResponse getContainerStatusResponse = containerManager.getContainerStatus(getContainerStatusRequest); + ContainerStatus containerStatus = getContainerStatusResponse.getStatus(); + ContainerState containerState = containerStatus.getState(); + if(containerState == ContainerState.COMPLETE) + break; + } catch (YarnRemoteException e) { + e.printStackTrace(); + } + } + } +} diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWApplicationMaster.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWApplicationMaster.java new file mode 100644 index 
// File: .../masterworker/MWApplicationMaster.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.applications.masterworker;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.util.ConverterUtils;

/**
 * Application-master skeleton for the master-worker framework: it registers
 * with the ResourceManager, launches worker containers on demand and tears
 * everything down when the concrete master's work is finished.
 */
public abstract class MWApplicationMaster extends MWMasterRunner {

  private static final Log LOG = LogFactory.getLog(MWApplicationMaster.class);

  // Environment variable names the submitting client is expected to set.
  private static final String WORKER_JAR_LOCATION = "WORKERJARLOCATION";
  private static final String WORKER_JAR_TIMESTAMP = "WORKERJARTIMESTAMP";
  private static final String WORKER_JAR_LEN = "WORKERJARLEN";
  private static final String MASTER_JAR_LOCATION = "MASTERJARLOCATION";
  private static final String MASTER_JAR_TIMESTAMP = "MASTERJARTIMESTAMP";
  private static final String MASTER_JAR_LEN = "MASTERJARLEN";
  private static final String WORKER_CLASS = "WORKERCLASS";

  private ApplicationAttemptId appAttemptID;
  private String workerClass = "";
  private String workerJarPath = "";
  private long workerJarTimestamp;
  private long workerJarLen;
  private String masterJarPath = "";
  private long masterJarTimestamp;
  private long masterJarLen;
  private String hostname;
  private int port;
  // Generic type added; the original declared a raw List.
  private final List<ContainerManagerWrapper> containerManagerWrapperList =
      new ArrayList<ContainerManagerWrapper>();
  private AMRMProtocolWraper amrmProtocolWraper;

  /**
   * Reads the client-provided environment, registers with the RM, starts the
   * embedded master RPC server and then starts the worker-management thread.
   *
   * @param args command line arguments (currently unused, kept for callers)
   * @throws ParseException  on malformed configuration
   * @throws IOException     on RM registration or RPC server start failure
   */
  public void initiate(String[] args) throws ParseException, IOException {
    LOG.info("Starting Application Master");
    processEnvironmentVariables(args);
    amrmProtocolWraper = new AMRMProtocolWraper();
    amrmProtocolWraper.init(appAttemptID);
    hostname = Utility.getHostname();
    port = super.init(hostname);
    LOG.info("Hostname:" + hostname + " Port: " + port);
    super.start();
  }

  /**
   * Requests a container from the RM and launches one worker process in it.
   * The worker finds the master back through MASTERHOSTNAME/MASTERPORT.
   */
  public void addWorker() throws URISyntaxException, InterruptedException, IOException {
    Container container = amrmProtocolWraper.requestContainer();

    ContainerLaunchContextBuilder containerLaunchContextBuilder = new ContainerLaunchContextBuilder();
    containerLaunchContextBuilder.init();

    containerLaunchContextBuilder.addDFSResource(masterJarPath, masterJarTimestamp, masterJarLen, "AppMaster.jar");
    containerLaunchContextBuilder.addDFSResource(workerJarPath, workerJarTimestamp, workerJarLen, "AppWorker.jar");

    containerLaunchContextBuilder.addEnvironmentVariable("MASTERHOSTNAME", hostname);
    containerLaunchContextBuilder.addEnvironmentVariable("MASTERPORT", Integer.toString(port));
    containerLaunchContextBuilder.addEnvironmentVariable("CLASSPATH", ClasspathUtility.getClasspath());
    containerLaunchContextBuilder.addCommandFragment("${JAVA_HOME}" + "/bin/java");
    containerLaunchContextBuilder.addCommandFragment("-Xmx10m");
    containerLaunchContextBuilder.addCommandFragment(workerClass);

    containerLaunchContextBuilder.addCommandFragment("1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/container.stdout");
    containerLaunchContextBuilder.addCommandFragment("2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/container.stderr");

    containerLaunchContextBuilder.setMemory(10);
    ContainerLaunchContext containerLaunchContext = containerLaunchContextBuilder.build();
    ContainerManagerWrapper containerManagerWrapper = new ContainerManagerWrapper();
    containerManagerWrapper.init(container, containerLaunchContext);
    containerManagerWrapperList.add(containerManagerWrapper);
  }

  /**
   * Waits for every worker monitor thread and the master thread to finish,
   * then unregisters the application from the RM.
   */
  public void terminate() throws YarnRemoteException, InterruptedException {
    for (ContainerManagerWrapper containerManagerWrapper : containerManagerWrapperList) {
      try {
        containerManagerWrapper.join();
      } catch (InterruptedException e) {
        LOG.error("Interrupted while joining a worker monitor thread", e);
        Thread.currentThread().interrupt();
      }
    }
    super.join();
    amrmProtocolWraper.terminate();
  }

  /**
   * Pulls the jar locations/timestamps and the worker class name from the
   * environment set up by the client. Fails fast with a clear message when a
   * required variable is missing (the original NPE'd inside Long.valueOf).
   */
  private boolean processEnvironmentVariables(String[] args) throws ParseException, IOException {
    Map<String, String> envs = System.getenv();

    ContainerId containerId =
        ConverterUtils.toContainerId(getRequiredEnv(envs, ApplicationConstants.AM_CONTAINER_ID_ENV));
    appAttemptID = containerId.getApplicationAttemptId();

    workerJarPath = getRequiredEnv(envs, WORKER_JAR_LOCATION);
    workerJarTimestamp = Long.parseLong(getRequiredEnv(envs, WORKER_JAR_TIMESTAMP));
    workerJarLen = Long.parseLong(getRequiredEnv(envs, WORKER_JAR_LEN));

    masterJarPath = getRequiredEnv(envs, MASTER_JAR_LOCATION);
    masterJarTimestamp = Long.parseLong(getRequiredEnv(envs, MASTER_JAR_TIMESTAMP));
    masterJarLen = Long.parseLong(getRequiredEnv(envs, MASTER_JAR_LEN));

    workerClass = getRequiredEnv(envs, WORKER_CLASS);
    return true;
  }

  /** Looks up an environment variable and fails fast when it is absent. */
  private static String getRequiredEnv(Map<String, String> envs, String name) {
    String value = envs.get(name);
    if (value == null) {
      throw new IllegalArgumentException("Required environment variable " + name + " is not set");
    }
    return value;
  }
}
// File: .../masterworker/MWClient.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.applications.masterworker;

import java.io.IOException;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;

/**
 * Command line client: submits the master-worker application master to the
 * ResourceManager and polls its report until the application terminates.
 */
public class MWClient {
  private static final Log LOG = LogFactory.getLog(MWClient.class);

  /** Poll interval while waiting for the application to finish. */
  private static final long REPORT_POLL_INTERVAL_MS = 1000;

  private static String masterWorkerLib;
  private static String masterWorkerApp;
  private static String masterClass;
  private static String workerClass;

  public static void main(String[] args) throws IOException, InterruptedException, ParseException {
    processCommandlineArguments(args);

    ClientRMProtocolWrapper clientRMProtocolWrapper = new ClientRMProtocolWrapper();
    clientRMProtocolWrapper.init();

    ApplicationId applicationId = clientRMProtocolWrapper.getNewApplicationId();

    // Launch context for the container that will run the application master.
    ContainerLaunchContextBuilder containerLaunchContextBuilder = new ContainerLaunchContextBuilder();
    containerLaunchContextBuilder.init();
    containerLaunchContextBuilder.addLocalResource(masterWorkerLib, "AppMaster.jar", "MasterWorker", "MASTERJAR", applicationId);
    containerLaunchContextBuilder.addLocalResource(masterWorkerApp, "AppWorker.jar", "MasterWorker", "WORKERJAR", applicationId);

    containerLaunchContextBuilder.addEnvironmentVariable("CLASSPATH", ClasspathUtility.getClasspath());
    containerLaunchContextBuilder.addEnvironmentVariable("WORKERCLASS", workerClass);

    containerLaunchContextBuilder.addCommandFragment("${JAVA_HOME}" + "/bin/java");
    containerLaunchContextBuilder.addCommandFragment("-Xmx10m");
    containerLaunchContextBuilder.addCommandFragment(masterClass);
    containerLaunchContextBuilder.addCommandFragment("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/master.stdout");
    containerLaunchContextBuilder.addCommandFragment("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/master.stderr");
    containerLaunchContextBuilder.setMemory(10);
    ContainerLaunchContext containerLaunchContext = containerLaunchContextBuilder.build();

    ApplicationSubmissionContextBuilder applicationSubmissionContextBuilder = new ApplicationSubmissionContextBuilder();
    applicationSubmissionContextBuilder.init();
    applicationSubmissionContextBuilder.setApplicationID(applicationId);
    applicationSubmissionContextBuilder.setApplicationName("MasterWorker");
    applicationSubmissionContextBuilder.setContainer(containerLaunchContext);
    SubmitApplicationRequest submitApplicationRequest = applicationSubmissionContextBuilder.build();

    clientRMProtocolWrapper.submitApplication(submitApplicationRequest);

    waitForCompletion(clientRMProtocolWrapper, applicationId);
  }

  /**
   * Polls the RM until the application reaches a terminal state. The original
   * loop never recognized KILLED and would spin forever on a killed app.
   */
  private static void waitForCompletion(ClientRMProtocolWrapper clientRMProtocolWrapper,
      ApplicationId applicationId) throws IOException, InterruptedException {
    while (true) {
      Thread.sleep(REPORT_POLL_INTERVAL_MS);
      ApplicationReport applicationReport = clientRMProtocolWrapper.getApplicationReport(applicationId);

      LOG.info("Got application report from ASM for"
          + ", appId=" + applicationId.getId()
          + ", clientToken=" + applicationReport.getClientToken()
          + ", appDiagnostics=" + applicationReport.getDiagnostics()
          + ", appMasterHost=" + applicationReport.getHost()
          + ", appQueue=" + applicationReport.getQueue()
          + ", appMasterRpcPort=" + applicationReport.getRpcPort()
          + ", appStartTime=" + applicationReport.getStartTime()
          + ", yarnAppState=" + applicationReport.getYarnApplicationState().toString()
          + ", distributedFinalState=" + applicationReport.getFinalApplicationStatus().toString()
          + ", appTrackingUrl=" + applicationReport.getTrackingUrl()
          + ", appUser=" + applicationReport.getUser());

      YarnApplicationState yarnApplicationState = applicationReport.getYarnApplicationState();
      FinalApplicationStatus finalApplicationStatus = applicationReport.getFinalApplicationStatus();
      if (yarnApplicationState == YarnApplicationState.FINISHED) {
        if (finalApplicationStatus == FinalApplicationStatus.SUCCEEDED) {
          LOG.info("Application has completed successfully.");
        } else {
          LOG.info("Application has completed unsuccessfully.");
        }
        return;
      }
      if (yarnApplicationState == YarnApplicationState.FAILED) {
        LOG.info("Application master failed.");
        return;
      }
      if (yarnApplicationState == YarnApplicationState.KILLED) {
        LOG.info("Application was killed.");
        return;
      }
    }
  }

  /**
   * Parses the four mandatory options; throws IllegalArgumentException with a
   * specific message when one is missing.
   */
  public static void processCommandlineArguments(String[] args) throws ParseException, IOException {
    Options opts = new Options();
    opts.addOption("masterworkerlib", true, "Path of the Master-Worker Library jar");
    opts.addOption("masterworkerapp", true, "Path of the Master-Worker Application jar");
    opts.addOption("masterclass", true, "Class for Master");
    opts.addOption("workerclass", true, "Class for Worker");

    CommandLine cliParser = new GnuParser().parse(opts, args);

    if (!cliParser.hasOption("masterworkerlib")) {
      throw new IllegalArgumentException("Path of the Master-Worker Library jar not specified");
    }
    if (!cliParser.hasOption("masterworkerapp")) {
      throw new IllegalArgumentException("Path of the Master-Worker Application jar not specified");
    }
    if (!cliParser.hasOption("masterclass")) {
      throw new IllegalArgumentException("Class for Master not specified");
    }
    if (!cliParser.hasOption("workerclass")) {
      throw new IllegalArgumentException("Class for Worker not specified");
    }

    masterWorkerLib = cliParser.getOptionValue("masterworkerlib");
    masterWorkerApp = cliParser.getOptionValue("masterworkerapp");
    masterClass = cliParser.getOptionValue("masterclass");
    workerClass = cliParser.getOptionValue("workerclass");
  }
}
// File: .../masterworker/MWMasterRunner.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.applications.masterworker;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;

/**
 * Master side of the master-worker protocol: owns the work and result queues
 * and serves them to workers over Hadoop RPC. Subclasses implement
 * {@link #manageWorkers()} to produce work and consume results.
 */
public abstract class MWMasterRunner extends Thread implements MWProtocol {
  private static final Log LOG = LogFactory.getLog(MWMasterRunner.class);
  private Server server;
  // Generic element types added; the original declared raw BlockingQueues.
  private BlockingQueue<MWMessage> workUnitQueue;
  private BlockingQueue<MWMessage> resultUnitQueue;

  /**
   * Creates the queues and starts the RPC server on an ephemeral port.
   *
   * @param hostname address to bind the RPC server to
   * @return the port the server is actually listening on
   * @throws IOException if the server cannot be created or started
   */
  public int init(String hostname) throws IOException {
    Configuration conf = new Configuration();
    workUnitQueue = new LinkedBlockingQueue<MWMessage>();
    resultUnitQueue = new LinkedBlockingQueue<MWMessage>();
    server = RPC.getServer(MWProtocol.class, this, hostname, 0, conf);
    server.start();
    InetSocketAddress inetSocketAddress = server.getListenerAddress();
    return inetSocketAddress.getPort();
  }

  @Override
  public void run() {
    manageWorkers();
    try {
      // Grace period so workers can pick up their poison pills before the
      // RPC server is torn down.
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while draining before server shutdown", e);
      Thread.currentThread().interrupt();
    }
    server.stop();
  }

  /**
   * RPC entry point for workers. Non-blocking: when no work is queued an
   * explicit "invalid" marker is returned rather than tying up an RPC
   * handler thread.
   */
  @Override
  public MWMessage getWorkUnit() throws InterruptedException {
    MWMessage curr = workUnitQueue.poll();
    if (curr == null) {
      curr = new MWMessage();
      curr.makeInvalid();
    }
    return curr;
  }

  /** RPC entry point for workers to hand a finished result back. */
  @Override
  public void putResultUnit(MWMessage result) {
    resultUnitQueue.add(result);
  }

  /** Queues one unit of work for any worker to pick up. */
  public void addWork(MWMessage workUnit) {
    workUnitQueue.add(workUnit);
  }

  /** Queues one poison pill; the worker that receives it shuts down. */
  public void killWorker() {
    MWMessage curr = new MWMessage();
    curr.makePoison();
    workUnitQueue.add(curr);
  }

  /** Blocks until a result is available and returns it. */
  public MWMessage waitForResult() throws InterruptedException {
    return resultUnitQueue.take();
  }

  /** Subclass hook: launch workers, enqueue work, collect results. */
  public abstract void manageWorkers();
}
// File: .../masterworker/MWMessage.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.applications.masterworker;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Base message exchanged between master and workers over Hadoop RPC.
 * Subclasses carry the actual work/result payload by overriding
 * {@link #writeWorkUnit(DataOutput)} and {@link #readFieldsWorkUnit(DataInput)}.
 */
public class MWMessage implements Writable {
  // True when the message is a shutdown ("poison pill") marker.
  // Fields made private: subclasses live in other packages and already use
  // the accessors, so the package-private access was never usable by them.
  private boolean isPoison;
  // True while the message carries usable content; cleared by the master to
  // signal "no work available right now" to a polling worker.
  private boolean isValid;

  /** Creates a valid, non-poison message. */
  public MWMessage() {
    isValid = true;
    isPoison = false;
  }

  /** @return whether this message carries usable content */
  public boolean isValid() {
    return isValid;
  }

  /** Marks this message as a "no work available" placeholder. */
  public void makeInvalid() {
    isValid = false;
  }

  /** @return whether this message is a shutdown marker */
  public boolean isPoison() {
    return isPoison;
  }

  /** Marks this message as a shutdown ("poison pill") marker. */
  public void makePoison() {
    isPoison = true;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    // Flag order must stay valid-then-poison to match readFields below.
    out.writeBoolean(isValid);
    out.writeBoolean(isPoison);
    writeWorkUnit(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    isValid = in.readBoolean();
    isPoison = in.readBoolean();
    readFieldsWorkUnit(in);
  }

  /** Subclass hook: serialize the payload. Default payload is empty. */
  public void writeWorkUnit(DataOutput out) throws IOException {
  }

  /** Subclass hook: deserialize the payload. Default payload is empty. */
  public void readFieldsWorkUnit(DataInput in) throws IOException {
  }
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.masterworker; + +public interface MWProtocol { + public static final long versionID = 1L; + + MWMessage getWorkUnit() throws InterruptedException; + + void putResultUnit(MWMessage result); +} diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWWorkerRunner.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWWorkerRunner.java new file mode 100644 index 0000000..bc18a07 --- /dev/null +++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/MWWorkerRunner.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
// File: .../masterworker/MWWorkerRunner.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.applications.masterworker;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;

import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

/**
 * Worker side of the master-worker protocol: connects back to the master's
 * RPC server and loops, fetching work units and returning results, until a
 * poison message arrives. Subclasses implement {@link #doWork(MWMessage)}.
 */
public abstract class MWWorkerRunner {
  private static final Log LOG = LogFactory.getLog(MWWorkerRunner.class);

  // Environment variable names set by the application master.
  private static final String MASTER_HOSTNAME_ENV = "MASTERHOSTNAME";
  private static final String MASTER_PORT_ENV = "MASTERPORT";

  private String hostname;
  private int port;

  /**
   * Connects to the master and processes work until poisoned.
   *
   * @throws IOException if the RPC proxy cannot be created. The original
   *     caught and printed this exception, then dereferenced the null proxy
   *     in the loop below, guaranteeing a NullPointerException.
   */
  public void init() throws InterruptedException, ParseException, IOException {
    processEnvironmentVariables();
    Configuration conf = new Configuration();
    InetSocketAddress addr = new InetSocketAddress(hostname, port);

    MWProtocol client = (MWProtocol) RPC.waitForProxy(MWProtocol.class,
        MWProtocol.versionID, addr, conf);
    try {
      while (true) {
        MWMessage workUnit = client.getWorkUnit();
        if (!workUnit.isValid()) {
          // Master had nothing queued; poll again.
          continue;
        }
        if (workUnit.isPoison()) {
          // Shutdown marker: leave the loop.
          break;
        }
        MWMessage resultUnit = doWork(workUnit);
        client.putResultUnit(resultUnit);
      }
    } finally {
      // Release the RPC connection; the original leaked the proxy.
      RPC.stopProxy(client);
    }
  }

  /** Reads the master's host and port from the container environment. */
  private void processEnvironmentVariables() throws ParseException, IOException {
    Map<String, String> envs = System.getenv();
    hostname = envs.get(MASTER_HOSTNAME_ENV);
    port = Integer.parseInt(envs.get(MASTER_PORT_ENV));
    LOG.info("Hostname:" + hostname + " Port: " + port);
  }

  /** Subclass hook: transform one work unit into one result unit. */
  public abstract MWMessage doWork(MWMessage workUnit);
}
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/YarnRPCBuilder.java new file mode 100644 index 0000000..3658714 --- /dev/null +++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-core/src/main/java/org/apache/hadoop/yarn/applications/masterworker/YarnRPCBuilder.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.masterworker; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.ipc.YarnRPC; + +public class YarnRPCBuilder { + private static Configuration configuration; + private static YarnRPC yarnRPC; + private static YarnConfiguration yarnConf; + private static boolean initialized = false; + + public static synchronized Configuration getConf() { + if(! initialized) + init(); + return configuration; + } + + public static synchronized YarnConfiguration getYarnConf() { + if(! 
initialized) + init(); + return yarnConf; + } + + public static synchronized YarnRPC getYarnRPC() { + if(! initialized) + init(); + return yarnRPC; + } + + private static synchronized void init() { + if(! initialized) { + initialized = true; + configuration = new Configuration(); + yarnRPC = YarnRPC.create(configuration); + yarnConf = new YarnConfiguration(configuration); + } + } +} diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/pom.xml hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/pom.xml new file mode 100644 index 0000000..fa1adc6 --- /dev/null +++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/pom.xml @@ -0,0 +1,17 @@ + + + hadoop-yarn-applications-masterworker + org.apache.hadoop + 3.0.0-SNAPSHOT + + 4.0.0 + org.apache.hadoop + hadoop-yarn-applications-masterworker-example + + + org.apache.hadoop + hadoop-yarn-applications-masterworker-core + 3.0.0-SNAPSHOT + + + diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/src/main/java/org/apache/hadoop/yarn/applications/masterworkerexample/MWMaster.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/src/main/java/org/apache/hadoop/yarn/applications/masterworkerexample/MWMaster.java new file mode 100644 index 0000000..716e701 --- /dev/null +++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/src/main/java/org/apache/hadoop/yarn/applications/masterworkerexample/MWMaster.java @@ -0,0 +1,72 @@ +/** + * Licensed to the 
// File: .../masterworkerexample/MWMaster.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.applications.masterworkerexample;

import java.io.IOException;
import java.net.URISyntaxException;

import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.applications.masterworker.MWApplicationMaster;

/**
 * Example master: launches two workers, hands out 100 numbered work units,
 * collects the 100 results and shuts the workers down.
 */
public class MWMaster extends MWApplicationMaster {
  private static final Log LOG = LogFactory.getLog(MWMaster.class);

  /** Number of worker containers to launch (and later poison). */
  private static final int NUM_WORKERS = 2;
  /** Number of work units to distribute. */
  private static final int NUM_WORK_UNITS = 100;

  public static void main(String[] args)
      throws InterruptedException, ParseException, IOException, URISyntaxException {
    MWMaster curr = new MWMaster();
    curr.initiate(args);
    curr.terminate();
  }

  @Override
  public void manageWorkers() {
    // Launch the workers. The original caught the exceptions, printed them
    // and then fell through to wait for NUM_WORK_UNITS results that no
    // worker would ever produce, hanging the master forever.
    try {
      for (int i = 0; i < NUM_WORKERS; i++) {
        addWorker();
      }
    } catch (URISyntaxException e) {
      LOG.error("Failed to launch workers; aborting", e);
      return;
    } catch (InterruptedException e) {
      LOG.error("Interrupted while launching workers; aborting", e);
      Thread.currentThread().interrupt();
      return;
    } catch (IOException e) {
      LOG.error("Failed to launch workers; aborting", e);
      return;
    }

    for (int i = 0; i < NUM_WORK_UNITS; i++) {
      WorkUnit workUnit = new WorkUnit();
      workUnit.setData(i);
      addWork(workUnit);
    }

    for (int i = 0; i < NUM_WORK_UNITS; i++) {
      try {
        ResultUnit resultUnit = (ResultUnit) waitForResult();
        LOG.info("Receiving result " + resultUnit.getData());
      } catch (InterruptedException e) {
        LOG.error("Interrupted while waiting for results", e);
        Thread.currentThread().interrupt();
        break;
      }
    }

    // Always poison the workers, even after an interrupted result wait,
    // so their containers terminate.
    for (int i = 0; i < NUM_WORKERS; i++) {
      killWorker();
    }
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.applications.masterworkerexample;

import java.io.IOException;

import org.apache.commons.cli.ParseException;
import org.apache.hadoop.yarn.applications.masterworker.MWMessage;
import org.apache.hadoop.yarn.applications.masterworker.MWWorkerRunner;

/**
 * Example worker for the master-worker framework: echoes each
 * {@link WorkUnit}'s integer payload back as a {@link ResultUnit} after a
 * short simulated delay.
 */
public class MWWorker extends MWWorkerRunner {

  public static void main(String[] args) {
    MWWorker worker = new MWWorker();
    try {
      worker.init();
    } catch (InterruptedException e) {
      // Restore the interrupt status before exiting.
      Thread.currentThread().interrupt();
      e.printStackTrace();
    } catch (ParseException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /**
   * Processes one unit of work.
   *
   * @param workUnit the incoming {@link WorkUnit} (cast from the framework's
   *                 generic message type)
   * @return a {@link ResultUnit} carrying the same integer payload
   */
  @Override
  public MWMessage doWork(MWMessage workUnit) {
    try {
      // Simulate a small amount of processing time.
      Thread.sleep(100);
    } catch (InterruptedException e) {
      // Preserve the interrupt status instead of swallowing it.
      Thread.currentThread().interrupt();
    }
    ResultUnit result = new ResultUnit();
    int got = ((WorkUnit) workUnit).getData();
    System.out.println(got);
    result.setData(got);
    return result;
  }

}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.applications.masterworkerexample;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.yarn.applications.masterworker.MWMessage;

/**
 * Result message carrying a single integer payload back from a worker to the
 * master. Serialized/deserialized via the framework's message hooks.
 */
public class ResultUnit extends MWMessage {
  // Field is private; callers use the accessors below.
  private int data;

  /** @return the integer payload carried by this result */
  public int getData() {
    return data;
  }

  /** @param data the integer payload to carry */
  public void setData(int data) {
    this.data = data;
  }

  /** Writes the payload to the wire; mirrors {@link #readFieldsWorkUnit}. */
  @Override
  public void writeWorkUnit(DataOutput out) throws IOException {
    out.writeInt(data);
  }

  /** Reads the payload from the wire; mirrors {@link #writeWorkUnit}. */
  @Override
  public void readFieldsWorkUnit(DataInput in) throws IOException {
    data = in.readInt();
  }
}
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-masterworker/hadoop-yarn-applications-masterworker-example/src/main/java/org/apache/hadoop/yarn/applications/masterworkerexample/WorkUnit.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.applications.masterworkerexample;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.yarn.applications.masterworker.MWMessage;

/**
 * Work message carrying a single integer payload from the master to a worker.
 * Serialized/deserialized via the framework's message hooks.
 */
public class WorkUnit extends MWMessage {
  // Field is private; callers use the accessors below.
  private int data;

  /** @return the integer payload carried by this work unit */
  public int getData() {
    return data;
  }

  /** @param data the integer payload to carry */
  public void setData(int data) {
    this.data = data;
  }

  /** Writes the payload to the wire; mirrors {@link #readFieldsWorkUnit}. */
  @Override
  public void writeWorkUnit(DataOutput out) throws IOException {
    out.writeInt(data);
  }

  /** Reads the payload from the wire; mirrors {@link #writeWorkUnit}. */
  @Override
  public void readFieldsWorkUnit(DataInput in) throws IOException {
    data = in.readInt();
  }
}