diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml index 09a56ea..4d18936 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml @@ -126,6 +126,35 @@ + org.apache.hadoop + hadoop-maven-plugins + + + compile-protoc + generate-sources + + protoc + + + ${protobuf.version} + ${protoc.path} + + ${basedir}/../../../../hadoop-common-project/hadoop-common/src/main/proto + ${basedir}/../../hadoop-yarn-api/src/main/proto + ${basedir}/src/main/proto + + + ${basedir}/src/main/proto + + distributedshell_service.proto + + + ${project.build.directory}/generated-sources/java + + + + + maven-jar-plugin diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index f410c43..de2d4fd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.applications.distributedshell.rpc.DistributedShellIPCService; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -99,6 +100,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.log4j.LogManager; import com.google.common.annotations.VisibleForTesting; @@ -238,6 +240,9 @@ @VisibleForTesting protected AtomicInteger numRequestedContainers = new AtomicInteger(); + private AtomicInteger numIncreaseAllocatedContainers = new AtomicInteger(); + private AtomicInteger numIncreaseCompletedContainers = new AtomicInteger(); + // Shell command to be executed private String shellCommand = ""; // Args to be passed to the shell command @@ -278,14 +283,19 @@ @VisibleForTesting TimelineClient timelineClient; + // IPC Server + DistributedShellIPCService ipcService; + private final String linux_bash_command = "bash"; private final String windows_command = "cmd /c"; private int yarnShellIdCounter = 1; + private boolean enable_ipc = false; + @VisibleForTesting - protected final Set launchedContainers = - Collections.newSetFromMap(new ConcurrentHashMap()); + protected final ConcurrentHashMap launchedContainers = + new ConcurrentHashMap<>(); /** * @param args Command line args @@ -372,7 +382,7 @@ public boolean init(String[] 
args) throws ParseException, IOException { "No. of containers on which the shell command needs to be executed"); opts.addOption("priority", true, "Application Priority. Default 0"); opts.addOption("debug", false, "Dump out debug information"); - + opts.addOption("enable_ipc", false, "Start up IPC service"); opts.addOption("help", false, "Print usage"); CommandLine cliParser = new GnuParser().parse(opts, args); @@ -401,6 +411,10 @@ public boolean init(String[] args) throws ParseException, IOException { dumpOutDebugInfo(); } + if (cliParser.hasOption("enable_ipc")) { + enable_ipc = true; + } + Map envs = System.getenv(); if (!envs.containsKey(Environment.CONTAINER_ID.name())) { @@ -574,10 +588,13 @@ public void run() throws YarnException, IOException, InterruptedException { } // Setup local RPC Server to accept status requests directly from clients - // TODO need to setup a protocol for client to be able to communicate to - // the RPC server - // TODO use the rpc port info to register with the RM for the client to - // send requests to this app master + if (enable_ipc) { + ipcService = startIPCServer(conf, this); + if (ipcService != null) { + appMasterRpcPort = ipcService.getPort(); + LOG.info("IPC service started listening on port:" + appMasterRpcPort); + } + } // Register self with ResourceManager // This will start heartbeating to the RM @@ -613,7 +630,7 @@ public void run() throws YarnException, IOException, InterruptedException { LOG.info(appAttemptID + " received " + previousAMRunningContainers.size() + " previous attempts' running containers on AM registration."); for(Container container: previousAMRunningContainers) { - launchedContainers.add(container.getId()); + launchedContainers.putIfAbsent(container.getId(), container); } numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); @@ -633,6 +650,25 @@ public void run() throws YarnException, IOException, InterruptedException { numRequestedContainers.set(numTotalContainers); } + public void changeContainerResource( + ContainerId containerId, Resource resource) + throws IllegalArgumentException { + try { + Container container = launchedContainers.get(containerId); + if (container == null) { + LOG.error("Failed to change container resource. containerId=" + + containerId + " resource=" + resource + + ". The container does not exist."); + return; + } + amRMClient.requestContainerResourceChange(container, resource); + } catch (Exception e) { + LOG.error("Failed to change container resource. containerId=" + + containerId + " resource=" + resource + + ". 
Cause:" + e.getMessage()); + } + } + @VisibleForTesting void startTimelineClient(final Configuration conf) throws YarnException, IOException, InterruptedException { @@ -658,6 +694,27 @@ public Void run() throws Exception { } } + private DistributedShellIPCService startIPCServer( + final Configuration conf, final ApplicationMaster applicationMaster) + throws YarnException, IOException, InterruptedException { + try { + return appSubmitterUgi.doAs( + new PrivilegedExceptionAction() { + @Override + public DistributedShellIPCService run() throws Exception { + DistributedShellIPCService ipcService = + new DistributedShellIPCService(applicationMaster); + ipcService.init(conf); + ipcService.start(); + return ipcService; + } + }); + } catch (UndeclaredThrowableException e) { + LOG.error("Failed to start up IPC server: " + e.getCause()); + throw new YarnException(e.getCause()); + } + } + @VisibleForTesting NMCallbackHandler createNMCallbackHandler() { return new NMCallbackHandler(this); @@ -678,6 +735,11 @@ protected boolean finish() { DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); } + // Stop the IPC server + if (ipcService != null) { + ipcService.stop(); + } + // Join all launched threads // needed for when we time out // and we need to release containers @@ -728,6 +790,14 @@ protected boolean finish() { timelineClient.stop(); } + if (numIncreaseAllocatedContainers.get() != + numIncreaseCompletedContainers.get()) { + LOG.error("Failed to successfully increase container resource on NM. " + + "increaseAllocated=" + numIncreaseAllocatedContainers.get() + + " increaseCompleted=" + numIncreaseCompletedContainers.get()); + success = false; + } + return success; } @@ -749,7 +819,7 @@ public void onContainersCompleted(List completedContainers) { assert (containerStatus.getState() == ContainerState.COMPLETE); // ignore containers we know nothing about - probably from a previous // attempt - if (!launchedContainers.contains(containerStatus.getContainerId())) { + if (!launchedContainers.containsKey(containerStatus.getContainerId())) { LOG.info("Ignoring completed status of " + containerStatus.getContainerId() + "; unknown container(probably launched by previous attempt)"); @@ -816,9 +886,9 @@ public void onContainersAllocated(List allocatedContainers) { + ", containerNode=" + allocatedContainer.getNodeId().getHost() + ":" + allocatedContainer.getNodeId().getPort() + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() - + ", containerResourceMemory" + + ", containerResourceMemory=" + allocatedContainer.getResource().getMemory() - + ", containerResourceVirtualCores" + + ", containerResourceVirtualCores=" + allocatedContainer.getResource().getVirtualCores()); // + ", containerToken" // +allocatedContainer.getContainerToken().getIdentifier().toString()); @@ -830,13 +900,34 @@ public void onContainersAllocated(List allocatedContainers) { // the main thread unblocked // as all containers may not be allocated at one go. 
launchThreads.add(launchThread); - launchedContainers.add(allocatedContainer.getId()); + launchedContainers.putIfAbsent( + allocatedContainer.getId(), allocatedContainer); launchThread.start(); } } @Override - public void onContainersResourceChanged(List containers) {} + public void onContainersResourceChanged(List containers) { + LOG.info("Got response from RM for container resource change," + + " changedCnt=" + containers.size()); + int increased = 0; + for (Container changed : containers) { + Container original = launchedContainers.get(changed.getId()); + if (original == null) { + continue; + } + if (Resources.fitsIn(changed.getResource(), original.getResource())) { + LOG.info("Container " + changed.getId() + " resource allocation is" + + " decreased to " + changed.getResource()); + } else { + LOG.info("Container " + changed.getId() + " resource allocation is" + + " increased to " + changed.getResource()); + increased++; + nmClientAsync.increaseContainerResourceAsync(changed); + } + } + numIncreaseAllocatedContainers.addAndGet(increased); + } @Override public void onShutdownRequest() { @@ -894,6 +985,12 @@ public void onContainerStatusReceived(ContainerId containerId, } @Override + public void onContainerResourceIncreased( + ContainerId containerId, Resource resource) { + applicationMaster.numIncreaseCompletedContainers.incrementAndGet(); + } + + @Override public void onContainerStarted(ContainerId containerId, Map allServiceResponse) { if (LOG.isDebugEnabled()) { @@ -911,10 +1008,6 @@ public void onContainerStarted(ContainerId containerId, } @Override - public void onContainerResourceIncreased( - ContainerId containerId, Resource resource) {} - - @Override public void onStartContainerError(ContainerId containerId, Throwable t) { LOG.error("Failed to start Container " + containerId); containers.remove(containerId); @@ -936,7 +1029,9 @@ public void onStopContainerError(ContainerId containerId, Throwable t) { @Override public void onIncreaseContainerResourceError( - ContainerId containerId, Throwable t) {} + ContainerId containerId, Throwable t) { + LOG.error("Failed to increase resource of Container " + containerId); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 5a90880..6a62c63 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.applications.distributedshell; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -56,7 +58,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import 
org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -71,6 +75,8 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.applications.distributedshell.rpc.DistributedShellIPC; +import org.apache.hadoop.yarn.applications.distributedshell.rpc.DistributedShellIPCProxy; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; @@ -115,7 +121,7 @@ public class Client { private static final Log LOG = LogFactory.getLog(Client.class); - + // Configuration private Configuration conf; private YarnClient yarnClient; @@ -183,6 +189,27 @@ // Timeline domain writer access control private String modifyACLs = null; + // Flag to indicate if IPC service is needed in application master + private boolean enable_ipc = false; + + // Flag to indicate if the client should be set in appmaster mode in order + // to talk to application master IPC server + private boolean appmaster = false; + + // The application ID of the application of interest in the appmaster mode + private String app_id; + + // The container ID of the container of interest in the appmaster mode + private String container_id; + + // The type of actions + private enum Action { + CHANGE_CONTAINER + } + + // The action to be performed in the appmaster mode + private Action action; + // Command line options private Options opts; @@ -212,7 +239,11 @@ public static void main(String[] args) { client.printUsage(); System.exit(-1); } - result = client.run(); + if (client.appmaster) { + result = client.runAppMasterMode(); + } else { + result = client.run(); + } } catch (Throwable t) { LOG.fatal("Error running Client", t); System.exit(1); @@ -287,6 +318,21 @@ public Client(Configuration conf) throws Exception { + " will be allocated, \"\" means containers" + " can be allocated anywhere, if you don't specify the option," + " default node_label_expression of queue will be used."); + opts.addOption("enable_ipc", false, "Start up IPC service in the " + + "application master of the distributed shell application. Once the " + + "IPC service is started, the user can create a separate client with " + + "--appmaster option specified to set the client to appmaster" + + "mode, and talk to the IPC service."); + opts.addOption("appmaster", false, "Set the client to appmaster mode, " + + "and talk to the application master IPC service of the distributed" + + "shell application. The distributed shell application must have been" + + "started previously with --enable_ipc option specified."); + opts.addOption("app_id", true, "The application Id of the application " + + "(appmaster mode only)"); + opts.addOption("action", true, "The action to be performed. 
" + + "(appmaster mode only)"); + opts.addOption("container_id", true, "The container Id of the container " + + "(appmaster mode only)"); } /** @@ -330,9 +376,40 @@ public boolean init(String[] args) throws ParseException { return false; } + if (cliParser.hasOption("appmaster")) { + appmaster = true; + try { + action = Enum.valueOf(Action.class, cliParser.getOptionValue("action")); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid action specified, exiting. " + + "Supported actions are CHANGE_CONTAINER"); + } + app_id = cliParser.getOptionValue("app_id"); + container_id = cliParser.getOptionValue("container_id"); + if (app_id == null || container_id == null) { + throw new IllegalArgumentException( + "Invalid app_id/container_id specified, exiting. Specified " + + "app_id=" + app_id + ", container_id=" + container_id); + } + containerMemory = Integer.parseInt( + cliParser.getOptionValue("container_memory")); + containerVirtualCores = Integer.parseInt( + cliParser.getOptionValue("container_vcores")); + if (containerMemory < 0 || containerVirtualCores < 0) { + throw new IllegalArgumentException( + "Invalid container memory/vcores specified, exiting." + + " Specified containerMemory=" + containerMemory + + ", containerVirtualCores=" + containerVirtualCores); + } + return true; + } + if (cliParser.hasOption("debug")) { debugFlag = true; + } + if (cliParser.hasOption("enable_ipc")) { + enable_ipc = true; } if (cliParser.hasOption("keep_containers_across_application_attempts")) { @@ -432,6 +509,48 @@ public boolean init(String[] args) throws ParseException { return true; } + public boolean runAppMasterMode() throws IOException, YarnException { + LOG.info("Starting Yarn Client"); + yarnClient.start(); + ApplicationId applicationId; + ContainerId containerId; + try { + applicationId = ConverterUtils.toApplicationId(app_id); + containerId = ConverterUtils.toContainerId(container_id); + } catch (Exception e) { + LOG.error("The specified application Id or container Id are invalid. 
" + + "applicationId=" + app_id + ", containerId=" + container_id); + return false; + } + ApplicationReport report = + yarnClient.getApplicationReport(applicationId); + String appMasterHost = getAppMasterHost(report.getHost()); + int appMasterRpcPort = report.getRpcPort(); + if (appMasterHost == null || appMasterRpcPort == -1) { + LOG.error("Failed to get distributed shell RPC service address"); + return false; + } + LOG.info("Distributed shell RPC service is available at " + + appMasterHost + ":" + appMasterRpcPort); + DistributedShellIPC distributedShellIPCClient = + DistributedShellIPCProxy.getProxy( + conf, appMasterHost, appMasterRpcPort); + Resource target = + Resource.newInstance(containerMemory, containerVirtualCores); + ContainerResourceChangeRequest changeRequest = + ContainerResourceChangeRequest.newInstance(containerId, target); + switch(action) { + case CHANGE_CONTAINER: + LOG.info("Sending request to change resource of container:" + + containerId + " to: " + target); + distributedShellIPCClient.changeContainerResource(changeRequest); + break; + default: + break; + } + return true; + } + /** * Main run function for the client * @return true if application completed successfully @@ -637,7 +756,9 @@ public boolean run() throws IOException, YarnException { if (debugFlag) { vargs.add("--debug"); } - + if (enable_ipc) { + vargs.add("--enable_ipc"); + } vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); @@ -858,4 +979,21 @@ private void prepareTimelineDomain() { timelineClient.stop(); } } + + /* + * NetUtils.getHostname() returns a string in the form "hostname/ip". Parse + * out the hostname part. + */ + private String getAppMasterHost(String hostname) { + if (hostname == null) { + return null; + } + String appHostname = hostname.split("/")[0]; + try { + return InetAddress.getByName(appHostname).getCanonicalHostName(); + } catch (UnknownHostException e) { + LOG.error("Failed to get IP address of host:" + appHostname); + return null; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPC.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPC.java new file mode 100644 index 0000000..b43b0df --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPC.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell.rpc; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import java.io.IOException; + +public interface DistributedShellIPC { + + ContainerId changeContainerResource( + ContainerResourceChangeRequest containerResourceChangeRequest) throws + YarnException, IOException; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCPB.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCPB.java new file mode 100644 index 0000000..64ed3ac --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCPB.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell.rpc; + +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.yarn.proto.DistributedShellIPC.DistributedShellIPCService; + +@ProtocolInfo( + protocolName = "DistributedShellIPCPB", + protocolVersion = 1) +public interface DistributedShellIPCPB + extends DistributedShellIPCService.BlockingInterface { +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCProxy.java new file mode 100644 index 0000000..5a8e2c2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCProxy.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell.rpc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.ipc.YarnRPC; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedAction; + +public class DistributedShellIPCProxy { + + private static InetSocketAddress getIPCAddress(String address, int port) { + return NetUtils.createSocketAddr(address, port); + } + + public static DistributedShellIPC getProxy( + final Configuration conf, final String address, final int port) + throws IOException { + return UserGroupInformation.getCurrentUser().doAs( + new PrivilegedAction() { + @Override + public DistributedShellIPC run() { + return (DistributedShellIPC) YarnRPC.create(conf).getProxy( + DistributedShellIPC.class, getIPCAddress(address, port), conf); + } + }); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCService.java new file mode 100644 index 0000000..6627da4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCService.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.applications.distributedshell.rpc; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.YarnRPC; + +import java.io.IOException; +import java.net.InetSocketAddress; + +public class DistributedShellIPCService extends AbstractService + implements DistributedShellIPC { + + private static final Log LOG = + LogFactory.getLog(DistributedShellIPCService.class); + private static final int DEFAULT_IPC_PORT = 8686; + private static final String DEFAULT_IPC_ADDRESS = "0.0.0.0"; + private static final int DEFAULT_CLIENT_THREAD_COUNT = 10; + + private Server server; + private InetSocketAddress distributedShellIPCAddress; + private ApplicationMaster applicationMaster; + private String address = DEFAULT_IPC_ADDRESS; + private int port = DEFAULT_IPC_PORT; + + public DistributedShellIPCService(ApplicationMaster applicationMaster) { + super("DistributedShellIPCService"); + this.applicationMaster = applicationMaster; + } + + public int getPort() { + return this.port; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + distributedShellIPCAddress = NetUtils.createSocketAddr(address, port); + super.init(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + Configuration conf = getConfig(); + this.server = YarnRPC.create(conf).getServer( + DistributedShellIPC.class, this, distributedShellIPCAddress, + conf, null, DEFAULT_CLIENT_THREAD_COUNT); + this.server.start(); + } + + @Override + public ContainerId changeContainerResource( + ContainerResourceChangeRequest containerResourceChangeRequest) throws + YarnException, IOException{ + ContainerId containerId = containerResourceChangeRequest.getContainerId(); + Resource capability = containerResourceChangeRequest.getCapability(); + LOG.info("Container resource change request received: containerID=" + + containerId + " resource=" + capability); + applicationMaster.changeContainerResource(containerId, capability); + return containerId; + } + + @Override + protected void serviceStop() throws Exception { + if (this.server != null) { + this.server.stop(); + } + super.serviceStop(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/impl/pb/client/DistributedShellIPCPBClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/impl/pb/client/DistributedShellIPCPBClientImpl.java new file mode 100644 index 0000000..1898805 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/impl/pb/client/DistributedShellIPCPBClientImpl.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell.rpc.impl.pb.client; + +import com.google.protobuf.ServiceException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceChangeRequestPBImpl; +import org.apache.hadoop.yarn.applications.distributedshell.rpc.DistributedShellIPC; +import org.apache.hadoop.yarn.applications.distributedshell.rpc.DistributedShellIPCPB; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProto; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +public class DistributedShellIPCPBClientImpl implements DistributedShellIPC, + Closeable { + + private DistributedShellIPCPB proxy; + + public DistributedShellIPCPBClientImpl( + long clientVersion, InetSocketAddress addr, Configuration conf) + throws IOException { + RPC.setProtocolEngine( + conf, DistributedShellIPCPB.class, ProtobufRpcEngine.class); + proxy = RPC.getProxy( + DistributedShellIPCPB.class, clientVersion, addr, conf); + } + + @Override + public void close() { + if(this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + + @Override + public ContainerId changeContainerResource( + ContainerResourceChangeRequest request) + throws YarnException, IOException { + ContainerResourceChangeRequestProto requestProto = + ((ContainerResourceChangeRequestPBImpl) request).getProto(); + try { + return new ContainerIdPBImpl( + proxy.changeContainerResource(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/impl/pb/service/DistributedShellIPCPBServiceImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/impl/pb/service/DistributedShellIPCPBServiceImpl.java new file mode 100644 index 0000000..5f2981d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/impl/pb/service/DistributedShellIPCPBServiceImpl.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell.rpc.impl.pb.service; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceChangeRequestPBImpl; +import org.apache.hadoop.yarn.applications.distributedshell.rpc.DistributedShellIPC; +import org.apache.hadoop.yarn.applications.distributedshell.rpc.DistributedShellIPCPB; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProto; + +import java.io.IOException; + +public class DistributedShellIPCPBServiceImpl + implements DistributedShellIPCPB { + + private DistributedShellIPC internal; + + public DistributedShellIPCPBServiceImpl(DistributedShellIPC internal) { + this.internal = internal; + } + + @Override + public ContainerIdProto changeContainerResource( + RpcController controller, ContainerResourceChangeRequestProto proto) + throws ServiceException { + ContainerResourceChangeRequestPBImpl request = + new ContainerResourceChangeRequestPBImpl(proto); + try { + ContainerId containerId = internal.changeContainerResource(request); + return ((ContainerIdPBImpl) containerId).getProto(); + } catch (YarnException | IOException e) { + throw new ServiceException(e); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/proto/distributedshell_service.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/proto/distributedshell_service.proto new file mode 100644 index 0000000..c2cc2c7 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/proto/distributedshell_service.proto @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "DistributedShellIPC"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "yarn_protos.proto"; + +service DistributedShellIPCService { + rpc changeContainerResource(ContainerResourceChangeRequestProto) returns (ContainerIdProto); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java index 2789d04..0c9abfa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java @@ -58,7 +58,7 @@ protected Thread createLaunchContainerThread(Container allocatedContainer, String shellId) { threadsLaunched++; - launchedContainers.add(allocatedContainer.getId()); + launchedContainers.put(allocatedContainer.getId(), allocatedContainer); yarnShellIds.add(shellId); return new Thread(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 967d172..4dc110c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -41,7 +41,11 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -148,6 +152,146 @@ public void testDSShellWithoutDomain() throws Exception { testDSShell(false); } + @Test(timeout=90000) + public void testDSShellWithIPCService() throws Exception { + LOG.info("Initializing DS Client"); + // Make sure ipc service is started with --enable_ipc option + // Submit an application with three containers (including app master) + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "2", + "--shell_command", + "sleep 20", + "--master_memory", + "1024", + 
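// 1 GB / 1 vcore for the AM; each task container below also gets 1 GB / 1 vcore,
// so the test can later grow one container to 2 GB and shrink the other to 512 MB.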
"--master_vcores", + "1", + "--container_memory", + "1024", + "--container_vcores", + "1", + "--enable_ipc" + }; + Configuration conf = new Configuration(yarnCluster.getConfig()); + final Client client = new Client(conf); + boolean initSuccess = client.init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + final AtomicBoolean result = new AtomicBoolean(false); + Thread t = new Thread() { + public void run() { + try { + result.set(client.run()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + t.start(); + LOG.info("Query Resource Manager"); + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(new Configuration(yarnCluster.getConfig())); + yarnClient.start(); + ApplicationId applicationId; + List containers; + // Make sure applications and containers are all started + while(true) { + List apps = yarnClient.getApplications(); + if (apps.size() == 0) { + Thread.sleep(10); + continue; + } + ApplicationReport appReport = apps.get(0); + if (appReport.getHost().equals("N/A") || appReport.getRpcPort() == -1) { + Thread.sleep(10); + continue; + } + containers = yarnClient.getContainers( + appReport.getCurrentApplicationAttemptId()); + if (containers.size() < 3) { + Thread.sleep(10); + continue; + } + applicationId = appReport.getApplicationId(); + break; + } + // Sleep a while to make sure containers are in RUNNING state in NM + Thread.sleep(1000); + // Exclude the non-master containers + for (ContainerReport container : containers) { + if ((container.getContainerId().getContainerId() + & ContainerId.CONTAINER_ID_BITMASK) == 1L) { + containers.remove(container); + break; + } + } + // Make sure both containers have 1G memory originally + Assert.assertEquals( + 1024, containers.get(0).getAllocatedResource().getMemory()); + Assert.assertEquals( + 1024, containers.get(1).getAllocatedResource().getMemory()); + // Increase container1 memory to 2048 + testContainerResourceChangeThroughAppMasterIPCService( + yarnClient, applicationId, containers.get(0).getContainerId(), + Resource.newInstance(2048, 1)); + // Decrease container2 memory to 512 + testContainerResourceChangeThroughAppMasterIPCService( + yarnClient, applicationId, containers.get(1).getContainerId(), + Resource.newInstance(512, 1)); + // Wait for the distributed shell application to finish + t.join(); + Assert.assertTrue(result.get()); + } + + private void testContainerResourceChangeThroughAppMasterIPCService( + YarnClient yarnClient, ApplicationId applicationId, + ContainerId containerId, Resource target) + throws Exception { + LOG.info("Initializing DS Application Master Client"); + // Create a client in appmaster mode + String[] args = { + "--appmaster", + "--action", + "CHANGE_CONTAINER", + "--app_id", + applicationId.toString(), + "--container_id", + containerId.toString(), + "--container_memory", + Integer.toString(target.getMemory()), + "--container_vcores", + Integer.toString(target.getVirtualCores()) + }; + final Client client = new Client(conf); + Assert.assertTrue(client.init(args)); + LOG.info("Running DS Application Master Client"); + final AtomicBoolean result = new AtomicBoolean(false); + Thread t = new Thread() { + public void run() { + try { + result.set(client.runAppMasterMode()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + t.start(); + t.join(); + Assert.assertTrue(result.get()); + // Verify resource allocation has been changed in RM + while(true) { + ContainerReport container = + yarnClient.getContainerReport(containerId); + 
if (container.getAllocatedResource().getMemory() != target.getMemory()) { + Thread.sleep(10); + continue; + } + break; + } + } + public void testDSShell(boolean haveDomain) throws Exception { String[] args = { "--jar",
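// A minimal sketch of driving the new IPC path programmatically, mirroring what
// Client.runAppMasterMode() does. The host, port and container id below are
// placeholders; the target application must have been submitted with
// --enable_ipc so that the AM registers its RPC port with the RM.
//
//   Configuration conf = new YarnConfiguration();
//   DistributedShellIPC ipc =
//       DistributedShellIPCProxy.getProxy(conf, "am-host.example.com", 8686);
//   ContainerResourceChangeRequest request =
//       ContainerResourceChangeRequest.newInstance(
//           ConverterUtils.toContainerId(
//               "container_1449354203071_0001_01_000002"),  // placeholder id
//           Resource.newInstance(2048, 1));                  // 2 GB, 1 vcore
//   ipc.changeContainerResource(request);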