diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerReport.java index 31d28129781..3c6100ac17c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerReport.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerReport.java @@ -235,4 +235,26 @@ public static ContainerReport newInstance(ContainerId containerId, @Private @Unstable public abstract void setExecutionType(ExecutionType executionType); + + /** + * Get the version of this container. The version will be incremented when + * a container is updated. + * + * @return version of this container. + */ + @Private + @Unstable + public int getVersion() { + return 0; + } + + /** + * Set the version of this container. + * @param version of this container. 
+ */ + @Private + @Unstable + public void setVersion(int version) { + throw new UnsupportedOperationException(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index d6138e865ff..a34a1e766ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -168,6 +168,7 @@ message ContainerReportProto { optional ContainerStateProto container_state = 10; optional string node_http_address = 11; optional ExecutionTypeProto executionType = 12 [default = GUARANTEED]; + optional int32 version = 13 [default = 0]; } enum YarnApplicationStateProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml index 7cb7ac7323f..a084969699c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml @@ -162,6 +162,35 @@ + + org.apache.hadoop + hadoop-maven-plugins + + + compile-protoc + generate-sources + + protoc + + + ${protobuf.version} + ${protoc.path} + + ${basedir}/../../../../hadoop-common-project/hadoop-common/src/main/proto + ${basedir}/../../hadoop-yarn-api/src/main/proto + ${basedir}/src/main/proto + + + ${basedir}/src/main/proto + + distributedshell_service.proto + + + ${project.build.directory}/generated-sources/java + + + + maven-jar-plugin diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 76fa38f922a..db19d732f55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.api.records.SchedulingRequest; @@ -106,6 +107,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.applications.distributedshell.rpc.DistributedShellIPCService; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.TimelineClient; @@ -122,9 +124,11 @@ import org.apache.hadoop.yarn.util.TimelineServiceHelper; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.log4j.LogManager; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.sun.jersey.api.client.ClientHandlerException; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -256,7 +260,7 @@ private int containerVirtualCores = DEFAULT_CONTAINER_VCORES; // All other resources to request for the container // on which the shell command will run - private Map containerResources = new HashMap<>(); + private Map containerResources = new HashMap<>(); // Priority of the request private int requestPriority; // Execution type of the containers. @@ -357,8 +361,8 @@ private final AtomicLong allocIdCounter = new AtomicLong(1); @VisibleForTesting - protected final Set launchedContainers = - Collections.newSetFromMap(new ConcurrentHashMap()); + protected final ConcurrentHashMap launchedContainers = + new ConcurrentHashMap<>(); private BoundedAppender diagnostics = new BoundedAppender(64 * 1024); @@ -373,6 +377,13 @@ return containerStartTimes; } + private AtomicInteger numUpdateAllocatedContainers = new AtomicInteger(); + private AtomicInteger numUpdateCompletedContainers = new AtomicInteger(); + + // IPC Server + DistributedShellIPCService ipcService; + private boolean enable_ipc = false; + /** * @param args Command line args */ @@ -494,6 +505,7 @@ public boolean init(String[] args) throws ParseException, IOException { + " the new application attempt "); opts.addOption("help", false, "Print usage"); + opts.addOption("enable_ipc", false, "Start up IPC service"); CommandLine cliParser = new GnuParser().parse(opts, args); if (args.length == 0) { @@ -532,6 +544,10 @@ public boolean init(String[] args) throws ParseException, IOException { } } + if (cliParser.hasOption("enable_ipc")) { + enable_ipc = true; + } + Map envs = System.getenv(); if (!envs.containsKey(Environment.CONTAINER_ID.name())) { @@ -646,9 +662,10 @@ public boolean init(String[] args) throws ParseException, IOException { "container_vcores", "-1")); containerResources = new HashMap<>(); if (cliParser.hasOption("container_resources")) { - Map resources = Client.parseResourcesString( + Map resources = Client.parseResourcesString( 
cliParser.getOptionValue("container_resources")); - for (Map.Entry entry : resources.entrySet()) { + for (Map.Entry entry : resources + .entrySet()) { containerResources.put(entry.getKey(), entry.getValue()); } } @@ -662,6 +679,7 @@ public boolean init(String[] args) throws ParseException, IOException { numTotalContainers = Integer.parseInt(cliParser.getOptionValue( "num_containers", "1")); } + if (numTotalContainers == 0) { throw new IllegalArgumentException( "Cannot run distributed shell with no containers"); @@ -766,6 +784,17 @@ public void run() throws YarnException, IOException, InterruptedException { nmClientAsync.init(conf); nmClientAsync.start(); + if (enable_ipc) { + try { + ipcService = startIPCServer(conf, this); + appMasterRpcPort = ipcService.getPort(); + LOG.info("IPC service started listening on port:" + appMasterRpcPort); + } catch (Exception e) { + throw new RuntimeException( + "Unable to start IPC server on port: " + appMasterRpcPort); + } + } + startTimelineClient(conf); if (timelineServiceV2Enabled) { // need to bind timelineClient @@ -831,7 +860,7 @@ public void run() throws YarnException, IOException, InterruptedException { LOG.info(appAttemptID + " received " + previousAMRunningContainers.size() + " previous attempts' running containers on AM registration."); for(Container container: previousAMRunningContainers) { - launchedContainers.add(container.getId()); + launchedContainers.putIfAbsent(container.getId(), container); } numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); @@ -862,6 +891,32 @@ public void run() throws YarnException, IOException, InterruptedException { numRequestedContainers.set(numTotalContainers); } + public void updateContainer(ContainerId containerId, Resource resource) + throws IllegalArgumentException { + try { + Container container = launchedContainers.get(containerId); + if (container == null) { + LOG.error("Failed to change container resource. 
containerId=" + + containerId + " resource=" + resource + + ". The container does not exist."); + return; + } + + Preconditions.checkNotNull(resource, + "UpdateContainerRequest cannot be null!!"); + UpdateContainerRequest updateContainerRequest = + UpdateContainerRequest.newInstance(container.getVersion(), container.getId(), + Resources.fitsIn(resource, container.getResource()) ? + ContainerUpdateType.DECREASE_RESOURCE : ContainerUpdateType.INCREASE_RESOURCE, + resource, null); + amRMClient.requestContainerUpdate(container, updateContainerRequest); + } catch (Exception e) { + LOG.error( + "Failed to change container resource. containerId=" + containerId + + " resource=" + resource + ". Cause:" + e.getMessage()); + } + } + @VisibleForTesting void startTimelineClient(final Configuration conf) throws YarnException, IOException, InterruptedException { @@ -901,6 +956,27 @@ public Void run() throws Exception { } } + private DistributedShellIPCService startIPCServer( + final Configuration conf, final ApplicationMaster applicationMaster) + throws YarnException, IOException, InterruptedException { + try { + return appSubmitterUgi.doAs( + new PrivilegedExceptionAction() { + @Override + public DistributedShellIPCService run() throws Exception { + DistributedShellIPCService ipcService = + new DistributedShellIPCService(applicationMaster); + ipcService.init(conf); + ipcService.start(); + return ipcService; + } + }); + } catch (UndeclaredThrowableException e) { + LOG.error("Failed to start up IPC server: " + e.getCause()); + throw new YarnException(e.getCause()); + } + } + @VisibleForTesting NMCallbackHandler createNMCallbackHandler() { return new NMCallbackHandler(this); @@ -976,6 +1052,19 @@ protected boolean finish() { timelineV2Client.stop(); } + // Stop IPC server + if (ipcService != null) { + ipcService.stop(); + } + + if (numUpdateAllocatedContainers.get() != numUpdateCompletedContainers + .get()) { + LOG.error("Failed to successfully update container resource on NM. 
" + + "increaseAllocated=" + numUpdateAllocatedContainers.get() + + " increaseCompleted=" + numUpdateCompletedContainers.get()); + success = false; + } + return success; } @@ -1003,7 +1092,7 @@ public void onContainersCompleted(List completedContainers) { assert (containerStatus.getState() == ContainerState.COMPLETE); // ignore containers we know nothing about - probably from a previous // attempt - if (!launchedContainers.contains(containerStatus.getContainerId())) { + if (!launchedContainers.containsKey(containerStatus.getContainerId())) { LOG.info("Ignoring completed status of " + containerStatus.getContainerId() + "; unknown container(probably launched by previous attempt)"); @@ -1099,9 +1188,9 @@ public void onContainersAllocated(List allocatedContainers) { + ":" + allocatedContainer.getNodeId().getPort() + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() - + ", containerResourceMemory" + + ", containerResourceMemory=" + allocatedContainer.getResource().getMemorySize() - + ", containerResourceVirtualCores" + + ", containerResourceVirtualCores=" + allocatedContainer.getResource().getVirtualCores()); Thread launchThread = @@ -1111,7 +1200,8 @@ public void onContainersAllocated(List allocatedContainers) { // the main thread unblocked // as all containers may not be allocated at one go. launchThreads.add(launchThread); - launchedContainers.add(allocatedContainer.getId()); + launchedContainers.putIfAbsent(allocatedContainer.getId(), + allocatedContainer); launchThread.start(); // Remove the corresponding request @@ -1144,6 +1234,7 @@ public void onContainersUpdated( // auto-update.containers is disabled, but this API is // under evolving and will need to be replaced by a proper new API. 
nmClientAsync.updateContainerResourceAsync(container.getContainer()); + numUpdateAllocatedContainers.incrementAndGet(); } } @@ -1307,11 +1398,13 @@ public void onContainerResourceIncreased( @Override public void onUpdateContainerResourceError( ContainerId containerId, Throwable t) { + LOG.error("Failed to update resource of Container " + containerId); } @Override public void onContainerResourceUpdated(ContainerId containerId, Resource resource) { + applicationMaster.numUpdateCompletedContainers.incrementAndGet(); } } @@ -1832,8 +1925,10 @@ private Resource getTaskResourceCapability() containerVirtualCores; resourceCapability.setMemorySize(containerMemory); resourceCapability.setVirtualCores(containerVirtualCores); - for (Map.Entry entry : containerResources.entrySet()) { - resourceCapability.setResourceValue(entry.getKey(), entry.getValue()); + for (Map.Entry entry : containerResources + .entrySet()) { + resourceCapability.setResourceInformation(entry.getKey(), + entry.getValue()); } return resourceCapability; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index c8a71b320c0..bac2f6d2f2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.applications.distributedshell; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import 
java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -60,7 +62,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -76,9 +81,12 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.applications.distributedshell.rpc.DistributedShellIPC; +import org.apache.hadoop.yarn.applications.distributedshell.rpc.DistributedShellIPCProxy; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -88,6 +96,7 @@ import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.exceptions.YARNFeatureNotEnabledException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.DockerClientConfigHandler; import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.resource.ResourceUtils; @@ -153,7 +162,7 @@ // Amt. 
of virtual core resource to request for to run the App Master private int amVCores = DEFAULT_AM_VCORES; // Amount of resources to request to run the App Master - private Map amResources = new HashMap<>(); + private Map amResources = new HashMap<>(); // AM resource profile private String amResourceProfile = ""; @@ -179,7 +188,7 @@ private int containerVirtualCores = DEFAULT_CONTAINER_VCORES; // Amt. of resources to request for container // in which shell script will be executed - private Map containerResources = new HashMap<>(); + private Map containerResources = new HashMap<>(); // container resource profile private String containerResourceProfile = ""; // No. of containers in which the shell script needs to be executed @@ -244,6 +253,27 @@ public static final String SCRIPT_PATH = "ExecScript"; + // Flag to indicate if IPC service is needed in application master + private boolean enable_ipc = false; + + // Flag to indicate if the client should be set in appmaster mode in order + // to talk to application master IPC server + private boolean appmaster = false; + + // The application ID of the application of interest in the appmaster mode + private String app_id; + + // The container ID of the container of interest in the appmaster mode + private String container_id; + + // The type of actions + private enum Action { + CHANGE_CONTAINER + } + + // The action to be performed in the appmaster mode + private Action action; + /** * @param args Command line arguments */ @@ -262,7 +292,12 @@ public static void main(String[] args) { client.printUsage(); System.exit(-1); } - result = client.run(); + + if (client.appmaster) { + result = client.runAppMasterMode(); + } else { + result = client.run(); + } } catch (Throwable t) { LOG.error("Error running Client", t); System.exit(1); @@ -390,6 +425,23 @@ public Client(Configuration conf) throws Exception { + " The \"num_containers\" option will be ignored. 
All requested" + " containers will be of type GUARANTEED" ); opts.addOption("application_tags", true, "Application tags."); + opts.addOption("enable_ipc", false, + "Start up IPC service in the " + + "application master of the distributed shell application. Once the " + + "IPC service is started, the user can create a separate client with " + + "--appmaster option specified to set the client to appmaster " + + "mode, and talk to the IPC service."); + opts.addOption("appmaster", false, + "Set the client to appmaster mode, " + + "and talk to the application master IPC service of the distributed " + + "shell application. The distributed shell application must have been " + + "started previously with --enable_ipc option specified."); + opts.addOption("app_id", true, + "The application Id of the application " + "(appmaster mode only)"); + opts.addOption("action", true, + "The action to be performed. " + "(appmaster mode only)"); + opts.addOption("container_id", true, + "The container Id of the container " + "(appmaster mode only)"); } /** @@ -433,9 +485,56 @@ public boolean init(String[] args) throws ParseException { return false; } + if (cliParser.hasOption("appmaster")) { + appmaster = true; + try { + action = Enum.valueOf(Action.class, cliParser.getOptionValue("action")); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid action specified, exiting. " + + "Supported actions are CHANGE_CONTAINER"); + } + app_id = cliParser.getOptionValue("app_id"); + container_id = cliParser.getOptionValue("container_id"); + if (app_id == null || container_id == null) { + throw new IllegalArgumentException( + "Invalid app_id/container_id specified, exiting. 
Specified " + + "app_id=" + app_id + ", container_id=" + container_id); + } + + containerMemory = + Integer.parseInt(cliParser.getOptionValue("container_memory", "-1")); + containerVirtualCores = + Integer.parseInt(cliParser.getOptionValue("container_vcores", "-1")); + if (cliParser.hasOption("container_resources")) { + Map resources = parseResourcesString( + cliParser.getOptionValue("container_resources")); + for (Map.Entry entry : resources + .entrySet()) { + if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) { + containerMemory = entry.getValue().getValue(); + } else if (entry.getKey().equals(ResourceInformation.VCORES_URI)) { + containerVirtualCores = (int) entry.getValue().getValue(); + } else { + containerResources.put(entry.getKey(), entry.getValue()); + } + } + } + + if (containerMemory < 0 || containerVirtualCores < 0) { + throw new IllegalArgumentException( + "Invalid container memory/vcores specified, exiting." + + " Specified containerMemory=" + containerMemory + + ", containerVirtualCores=" + containerVirtualCores); + } + return true; + } + if (cliParser.hasOption("debug")) { debugFlag = true; + } + if (cliParser.hasOption("enable_ipc")) { + enable_ipc = true; } if (cliParser.hasOption("keep_containers_across_application_attempts")) { @@ -456,13 +555,14 @@ public boolean init(String[] args) throws ParseException { amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "-1")); if (cliParser.hasOption("master_resources")) { - Map masterResources = + Map masterResources = parseResourcesString(cliParser.getOptionValue("master_resources")); - for (Map.Entry entry : masterResources.entrySet()) { + for (Map.Entry entry : masterResources + .entrySet()) { if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) { - amMemory = entry.getValue(); + amMemory = entry.getValue().getValue(); } else if (entry.getKey().equals(ResourceInformation.VCORES_URI)) { - amVCores = entry.getValue().intValue(); + amVCores = (int) 
entry.getValue().getValue(); } else { amResources.put(entry.getKey(), entry.getValue()); } @@ -522,18 +622,20 @@ public boolean init(String[] args) throws ParseException { if (cliParser.hasOption("promote_opportunistic_after_start")) { autoPromoteContainers = true; } + containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "-1")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "-1")); if (cliParser.hasOption("container_resources")) { - Map resources = + Map resources = parseResourcesString(cliParser.getOptionValue("container_resources")); - for (Map.Entry entry : resources.entrySet()) { + for (Map.Entry entry : resources + .entrySet()) { if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) { - containerMemory = entry.getValue(); + containerMemory = entry.getValue().getValue(); } else if (entry.getKey().equals(ResourceInformation.VCORES_URI)) { - containerVirtualCores = entry.getValue().intValue(); + containerVirtualCores = (int) entry.getValue().getValue(); } else { containerResources.put(entry.getKey(), entry.getValue()); } @@ -618,6 +720,59 @@ public boolean init(String[] args) throws ParseException { this.applicationTags.add(appTag.trim()); } } + + return true; + } + + public boolean runAppMasterMode() throws IOException, YarnException { + LOG.info("Starting Yarn Client"); + yarnClient.start(); + ApplicationId applicationId; + ContainerId containerId; + try { + applicationId = ConverterUtils.toApplicationId(app_id); + containerId = ConverterUtils.toContainerId(container_id); + } catch (Exception e) { + LOG.error("The specified application Id or container Id are invalid. 
" + + "applicationId=" + app_id + ", containerId=" + container_id); + return false; + } + ApplicationReport report = yarnClient.getApplicationReport(applicationId); + + ContainerReport containerReport = + yarnClient.getContainerReport(containerId); + + String appMasterHost = getAppMasterHost(report.getHost()); + int appMasterRpcPort = report.getRpcPort(); + if (appMasterHost == null || appMasterRpcPort == -1) { + LOG.error("Failed to get distributed shell RPC service address"); + return false; + } + LOG.info("Distributed shell RPC service is available at " + appMasterHost + + ":" + appMasterRpcPort); + DistributedShellIPC distributedShellIPCClient = DistributedShellIPCProxy + .getProxy(conf, appMasterHost, appMasterRpcPort); + Resource target = + Resource.newInstance(containerMemory, containerVirtualCores); + if (!containerResources.isEmpty()) { + for (Map.Entry entry : containerResources + .entrySet()) { + target.setResourceInformation(entry.getKey(), entry.getValue()); + } + } + UpdateContainerRequest updateRequest = UpdateContainerRequest.newInstance( + containerReport.getVersion(), containerReport.getContainerId(), + ContainerUpdateType.PROMOTE_EXECUTION_TYPE, target, + ExecutionType.GUARANTEED); + switch (action) { + case CHANGE_CONTAINER: + LOG.info("Sending request to change resource of container:" + containerId + + " to: " + target); + distributedShellIPCClient.updateContainer(updateRequest); + break; + default: + break; + } return true; } @@ -873,8 +1028,18 @@ public boolean run() throws IOException, YarnException { vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); } if (!containerResources.isEmpty()) { - Joiner.MapJoiner joiner = Joiner.on(',').withKeyValueSeparator("="); - vargs.add("--container_resources " + joiner.join(containerResources)); + + StringBuffer containerResourcesBuffer = new StringBuffer(); + for (Map.Entry entry : containerResources + .entrySet()) { + containerResourcesBuffer.append(entry.getValue().getName()); + 
containerResourcesBuffer.append('='); + containerResourcesBuffer.append(entry.getValue().getValue()); + containerResourcesBuffer.append(entry.getValue().getUnits()); + containerResourcesBuffer.append(','); + } + vargs.add("--container_resources " + containerResourcesBuffer + .deleteCharAt(containerResourcesBuffer.length() - 1).toString()); + } if (containerResourceProfile != null && !containerResourceProfile .isEmpty()) { @@ -909,6 +1074,10 @@ public boolean run() throws IOException, YarnException { vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); + if (enable_ipc) { + vargs.add("--enable_ipc"); + } + // Get final commmand StringBuilder command = new StringBuilder(); for (CharSequence str : vargs) { @@ -1170,8 +1339,9 @@ private void setAMResourceCapability(ApplicationSubmissionContext appContext, } validateResourceTypes(amResources.keySet(), resourceTypes); - for (Map.Entry entry : amResources.entrySet()) { - capability.setResourceValue(entry.getKey(), entry.getValue()); + for (Map.Entry entry : amResources + .entrySet()) { + capability.setResourceInformation(entry.getKey(), entry.getValue()); } // set amMemory because it's used to set Xmx param if (amMemory == -1) { @@ -1221,8 +1391,9 @@ private void validateResourceTypes(Iterable resourceNames, } } - static Map parseResourcesString(String resourcesStr) { - Map resources = new HashMap<>(); + static Map parseResourcesString( + String resourcesStr) { + Map resources = new HashMap<>(); // Ignore the grouping "[]" if (resourcesStr.startsWith("[")) { @@ -1246,13 +1417,44 @@ private void validateResourceTypes(Iterable resourceNames, 0, value.length() - units.length()).trim(); Long resourceValue = Long.valueOf(valueWithoutUnit); if (!units.isEmpty()) { + // Shouldn't new resource type conversion happen based on server side + // configuration rather than converting it to "Mi" by default ? 
resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue); } + + ResourceInformation ri = null; if (key.equals("memory")) { key = ResourceInformation.MEMORY_URI; + ri = ResourceInformation.newInstance(ResourceInformation.MEMORY_URI, + ResourceInformation.MEMORY_MB.getUnits(), resourceValue); + } else if (key.equals("vcores")) { + // key = ResourceInformation.VCORES_URI; + ri = ResourceInformation.newInstance(ResourceInformation.VCORES_URI, + ResourceInformation.VCORES.getUnits(), resourceValue); + } else { + // Taking 'Mi' as unit for all resource types as conversion happens + // before. But ideally it should get pulled from server side rm config + ri = ResourceInformation.newInstance(key, "Mi", resourceValue); } - resources.put(key, resourceValue); + resources.put(key, ri); } return resources; } + + /* + * NetUtils.getHostname() returns a string in the form "hostname/ip". Parse + * out the hostname part. + */ + private String getAppMasterHost(String hostname) { + if (hostname == null) { + return null; + } + String appHostname = hostname.split("/")[0]; + try { + return InetAddress.getByName(appHostname).getCanonicalHostName(); + } catch (UnknownHostException e) { + LOG.error("Failed to get IP address of host:" + appHostname); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPC.java index e69de29bb2d..2fda614cd63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPC.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPC.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.applications.distributedshell.rpc; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import java.io.IOException; + +public interface DistributedShellIPC { + + ContainerId updateContainer( + UpdateContainerRequest updateContainerRequest) + throws + YarnException, IOException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCPB.java index e69de29bb2d..64ed3ac897c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCPB.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCPB.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell.rpc; + +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.yarn.proto.DistributedShellIPC.DistributedShellIPCService; + +@ProtocolInfo( + protocolName = "DistributedShellIPCPB", + protocolVersion = 1) +public interface DistributedShellIPCPB + extends DistributedShellIPCService.BlockingInterface { +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCProxy.java index e69de29bb2d..5a8e2c2a51f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCProxy.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell.rpc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.ipc.YarnRPC; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedAction; + +public class DistributedShellIPCProxy { + + private static InetSocketAddress getIPCAddress(String address, int port) { + return NetUtils.createSocketAddr(address, port); + } + + public static DistributedShellIPC getProxy( + final Configuration conf, final String address, final int port) + throws IOException { + return UserGroupInformation.getCurrentUser().doAs( + new PrivilegedAction() { + @Override + public DistributedShellIPC run() { + return (DistributedShellIPC) YarnRPC.create(conf).getProxy( + DistributedShellIPC.class, getIPCAddress(address, port), conf); + } + }); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCService.java index e69de29bb2d..247167fca6d 
100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/DistributedShellIPCService.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.applications.distributedshell.rpc; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.YarnRPC; + +import java.io.IOException; +import java.net.InetSocketAddress; + +public class DistributedShellIPCService extends AbstractService + implements DistributedShellIPC { + + private static final Log LOG = + LogFactory.getLog(DistributedShellIPCService.class); + private static final int DEFAULT_IPC_PORT = 8686; + private static final String DEFAULT_IPC_ADDRESS = "0.0.0.0"; + private static final int DEFAULT_CLIENT_THREAD_COUNT = 10; + + private Server server; + private InetSocketAddress distributedShellIPCAddress; + private ApplicationMaster applicationMaster; + private String address = DEFAULT_IPC_ADDRESS; + private int port = DEFAULT_IPC_PORT; + + public DistributedShellIPCService(ApplicationMaster applicationMaster) { + super("DistributedShellIPCService"); + this.applicationMaster = applicationMaster; + } + + public int getPort() { + return this.port; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + distributedShellIPCAddress = NetUtils.createSocketAddr(address, port); + super.init(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + Configuration conf = getConfig(); + this.server = YarnRPC.create(conf).getServer( + DistributedShellIPC.class, this, distributedShellIPCAddress, + conf, null, 
DEFAULT_CLIENT_THREAD_COUNT); + this.server.start(); + } + + @Override + public ContainerId updateContainer( + UpdateContainerRequest updateContainerRequest) throws + YarnException, IOException{ + ContainerId containerId = updateContainerRequest.getContainerId(); + Resource capability = updateContainerRequest.getCapability(); + LOG.info("Update Container request received: containerID=" + + containerId + " resource=" + capability); + applicationMaster.updateContainer(containerId, capability); + return containerId; + } + + @Override + protected void serviceStop() throws Exception { + if (this.server != null) { + this.server.stop(); + } + super.serviceStop(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/impl/pb/client/DistributedShellIPCPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/impl/pb/client/DistributedShellIPCPBClientImpl.java index e69de29bb2d..77d08ff6211 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/impl/pb/client/DistributedShellIPCPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/impl/pb/client/DistributedShellIPCPBClientImpl.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell.rpc.impl.pb.client; + +import com.google.protobuf.ServiceException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.UpdateContainerRequestPBImpl; +import org.apache.hadoop.yarn.applications.distributedshell.rpc.DistributedShellIPC; +import org.apache.hadoop.yarn.applications.distributedshell.rpc.DistributedShellIPCPB; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateContainerRequestProto; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +public class DistributedShellIPCPBClientImpl implements DistributedShellIPC, + Closeable { + + private DistributedShellIPCPB proxy; + + public DistributedShellIPCPBClientImpl( + long clientVersion, InetSocketAddress addr, Configuration conf) + throws IOException { + RPC.setProtocolEngine( + conf, DistributedShellIPCPB.class, ProtobufRpcEngine.class); + proxy = RPC.getProxy( + DistributedShellIPCPB.class, 
clientVersion, addr, conf); + } + + @Override + public void close() { + if(this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + + @Override + public ContainerId updateContainer( + UpdateContainerRequest request) + throws YarnException, IOException { + UpdateContainerRequestProto requestProto = + ((UpdateContainerRequestPBImpl) request).getProto(); + try { + return new ContainerIdPBImpl( + proxy.updateContainer(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/impl/pb/service/DistributedShellIPCPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/impl/pb/service/DistributedShellIPCPBServiceImpl.java index e69de29bb2d..2848368477c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/impl/pb/service/DistributedShellIPCPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/rpc/impl/pb/service/DistributedShellIPCPBServiceImpl.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell.rpc.impl.pb.service; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.UpdateContainerRequestPBImpl; +import org.apache.hadoop.yarn.applications.distributedshell.rpc.DistributedShellIPC; +import org.apache.hadoop.yarn.applications.distributedshell.rpc.DistributedShellIPCPB; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateContainerRequestProto; + +import java.io.IOException; + +public class DistributedShellIPCPBServiceImpl + implements DistributedShellIPCPB { + + private DistributedShellIPC internal; + + public DistributedShellIPCPBServiceImpl(DistributedShellIPC internal) { + this.internal = internal; + } + + @Override + public ContainerIdProto updateContainer( + RpcController controller, UpdateContainerRequestProto proto) + throws ServiceException { + UpdateContainerRequestPBImpl request = + new UpdateContainerRequestPBImpl(proto); + try { + ContainerId containerId = internal.updateContainer(request); + return ((ContainerIdPBImpl) containerId).getProto(); + } catch (YarnException | IOException e) { + throw new ServiceException(e); + } + } +} diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/proto/distributedshell_service.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/proto/distributedshell_service.proto index e69de29bb2d..1e9ec329bab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/proto/distributedshell_service.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/proto/distributedshell_service.proto @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "DistributedShellIPC"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "yarn_service_protos.proto"; +import "yarn_protos.proto"; + +service DistributedShellIPCService { + rpc updateContainer(UpdateContainerRequestProto) returns (ContainerIdProto); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java index f2a80412f9c..dc288efea96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java @@ -62,7 +62,7 @@ protected Thread createLaunchContainerThread(Container allocatedContainer, String shellId) { threadsLaunched++; - launchedContainers.add(allocatedContainer.getId()); + launchedContainers.put(allocatedContainer.getId(), allocatedContainer); yarnShellIds.add(shellId); return new Thread(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 3a98a22ee40..85e24dafc53 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -28,6 +28,7 @@ import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.io.PrintWriter; import java.net.InetAddress; @@ -64,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -82,6 +84,7 @@ import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.timeline.NameValuePair; import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils; import org.apache.hadoop.yarn.server.timeline.TimelineVersion; @@ -92,6 +95,9 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import 
org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.After; import org.junit.Assert; @@ -123,12 +129,13 @@ protected final static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class); + private static File dest; @Rule public TimelineVersionWatcher timelineVersionWatcher = new TimelineVersionWatcher(); @Rule - public Timeout globalTimeout = new Timeout(90000); + public Timeout globalTimeout = new Timeout(180000); @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -174,6 +181,27 @@ private void setupInternal(int numNodeManager, float timelineVersion) conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true); + ResourceUtils.resetResourceTypes(); + String resourceTypesFile = "resource-types-2.xml"; + InputStream source = + conf.getClassLoader().getResourceAsStream(resourceTypesFile); + dest = new File(conf.getClassLoader().getResource(".").getPath(), + "resource-types.xml"); + FileUtils.copyInputStreamToFile(source, dest); + ResourceUtils.getResourceTypes(); + + ResourceUtils.resetNodeResources(); + String nodeResourceTypesFile = "node-resources-2.xml"; + InputStream nodeSource = + conf.getClassLoader().getResourceAsStream(nodeResourceTypesFile); + File nodeResourceTypesXMLFile = new File( + conf.getClassLoader().getResource(".").getPath(), "node-resources.xml"); + FileUtils.copyInputStreamToFile(nodeSource, nodeResourceTypesXMLFile); + ResourceUtils.getNodeResourceInformation(conf); + + conf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class, ResourceCalculator.class); + // ATS version specific settings if (timelineVersion == 1.0f) { conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); @@ -276,6 +304,10 @@ public void tearDown() throws IOException { .delete( new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)), true); + + if (dest.exists()) { + dest.delete(); + } } @Test 
@@ -1623,4 +1655,251 @@ public void testDistributedShellAMResourcesWithUnknownResource() client.init(args); client.run(); } + + @Test(timeout = 180000) + @TimelineVersion(2.0f) + public void testDSShellWithIPCService() throws Exception { + LOG.info("Initializing DS Client"); + // Make sure ipc service is started with --enable_ipc option + // Submit an application with three containers (including app master) + String[] args = { "--jar", APPMASTER_JAR, "--num_containers", "2", + "--shell_command", "sleep 20", "--master_memory", "1024", + "--master_vcores", "1", "--container_memory", "1024", + "--container_vcores", "1", "--enable_ipc" }; + Configuration conf = new Configuration(yarnCluster.getConfig()); + final Client client = new Client(conf); + boolean initSuccess = client.init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + final AtomicBoolean result = new AtomicBoolean(false); + Thread t = new Thread() { + public void run() { + try { + result.set(client.run()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + t.start(); + LOG.info("Query Resource Manager"); + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(new Configuration(yarnCluster.getConfig())); + yarnClient.start(); + ApplicationId applicationId; + List containers; + // Make sure applications and containers are all started + while (true) { + List apps = yarnClient.getApplications(); + if (apps.size() == 0) { + Thread.sleep(10); + continue; + } + ApplicationReport appReport = apps.get(0); + if (appReport.getHost().equals("N/A") || appReport.getRpcPort() == -1) { + Thread.sleep(10); + continue; + } + containers = + yarnClient.getContainers(appReport.getCurrentApplicationAttemptId()); + if (containers.size() < 3) { + Thread.sleep(10); + continue; + } + applicationId = appReport.getApplicationId(); + break; + } + // Sleep a while to make sure containers are in RUNNING state in NM + Thread.sleep(1000); + // Exclude the non-master 
containers + for (ContainerReport container : containers) { + if ((container.getContainerId().getContainerId() + & ContainerId.CONTAINER_ID_BITMASK) == 1L) { + containers.remove(container); + break; + } + } + // Make sure both containers have 1G memory originally + Assert.assertEquals(1024, + containers.get(0).getAllocatedResource().getMemory()); + Assert.assertEquals(1024, + containers.get(1).getAllocatedResource().getMemory()); + // Increase container1 memory to 2048 + testUpdateContainerThroughAppMasterIPCService(yarnClient, applicationId, + containers.get(0).getContainerId(), Resource.newInstance(2048, 1)); + // Decrease container2 memory to 512 + testUpdateContainerThroughAppMasterIPCService(yarnClient, applicationId, + containers.get(1).getContainerId(), Resource.newInstance(512, 1)); + // Wait for the distributed shell application to finish + t.join(); + Assert.assertTrue(result.get()); + } + + private void testUpdateContainerThroughAppMasterIPCService( + YarnClient yarnClient, ApplicationId applicationId, + ContainerId containerId, Resource target) throws Exception { + LOG.info("Initializing DS Application Master Client"); + // Create a client in appmaster mode + String[] args = { "--appmaster", "--action", "CHANGE_CONTAINER", "--app_id", + applicationId.toString(), "--container_id", containerId.toString(), + "--container_memory", Integer.toString(target.getMemory()), + "--container_vcores", Integer.toString(target.getVirtualCores()) }; + final Client client = new Client(conf); + Assert.assertTrue(client.init(args)); + LOG.info("Running DS Application Master Client"); + final AtomicBoolean result = new AtomicBoolean(false); + Thread t = new Thread() { + public void run() { + try { + result.set(client.runAppMasterMode()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + t.start(); + t.join(); + Assert.assertTrue(result.get()); + // Verify resource allocation has been changed in RM + while (true) { + ContainerReport container = 
yarnClient.getContainerReport(containerId); + if (container.getAllocatedResource().getMemory() != target.getMemory()) { + Thread.sleep(10); + continue; + } + break; + } + } + + @Test(timeout = 180000) + @TimelineVersion(2.0f) + public void testDSShellWithIPCServiceForResourceTypes() throws Exception { + LOG.info("Initializing DS Client"); + // Make sure ipc service is started with --enable_ipc option + // Submit an application with three containers (including app master) + String[] args = { "--jar", APPMASTER_JAR, "--num_containers", "2", + "--shell_command", "sleep 20", "--master_resources", + "memory=512,vcores=1,resource1=1024Mi", "--container_resources", + "memory=1024,vcores=1,resource1=1024Mi", "--enable_ipc" }; + Configuration conf = new Configuration(yarnCluster.getConfig()); + final Client client = new Client(conf); + boolean initSuccess = client.init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + final AtomicBoolean result = new AtomicBoolean(false); + Thread t = new Thread() { + public void run() { + try { + result.set(client.run()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + t.start(); + LOG.info("Query Resource Manager"); + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(new Configuration(yarnCluster.getConfig())); + yarnClient.start(); + ApplicationId applicationId; + List containers; + // Make sure applications and containers are all started + while (true) { + List apps = yarnClient.getApplications(); + if (apps.size() == 0) { + Thread.sleep(10); + continue; + } + ApplicationReport appReport = apps.get(0); + if (appReport.getHost().equals("N/A") || appReport.getRpcPort() == -1) { + Thread.sleep(10); + continue; + } + containers = + yarnClient.getContainers(appReport.getCurrentApplicationAttemptId()); + if (containers.size() < 3) { + Thread.sleep(10); + continue; + } + applicationId = appReport.getApplicationId(); + break; + } + // Sleep a while to make sure 
containers are in RUNNING state in NM + Thread.sleep(1000); + // Exclude the non-master containers + for (ContainerReport container : containers) { + if ((container.getContainerId().getContainerId() + & ContainerId.CONTAINER_ID_BITMASK) == 1L) { + containers.remove(container); + break; + } + } + // Make sure both containers have 1G memory originally + Assert.assertEquals(1024, + containers.get(0).getAllocatedResource().getMemory()); + Assert.assertEquals(1024, + containers.get(1).getAllocatedResource().getMemory()); + + Resource increasedNewResource = Resource.newInstance(2048, 1); + increasedNewResource.setResourceInformation("resource1", + ResourceInformation.newInstance("resource1", "Mi", 2048)); + + // Increase container1's resource 'resource1' to 2048Mi + testUpdateContainerThroughAppMasterIPCServiceForResourceTypes(yarnClient, + applicationId, containers.get(0).getContainerId(), + increasedNewResource); + + Resource decreasedNewResource = Resource.newInstance(1024, 1); + decreasedNewResource.setResourceInformation("resource1", + ResourceInformation.newInstance("resource1", "Mi", 1024)); + + // Decrease container2's resource 'resource1' to 1024Mi + testUpdateContainerThroughAppMasterIPCServiceForResourceTypes(yarnClient, + applicationId, containers.get(1).getContainerId(), + decreasedNewResource); + // Wait for the distributed shell application to finish + t.join(); + Assert.assertTrue(result.get()); + } + + private void testUpdateContainerThroughAppMasterIPCServiceForResourceTypes( + YarnClient yarnClient, ApplicationId applicationId, + ContainerId containerId, Resource target) throws Exception { + LOG.info("Initializing DS Application Master Client"); + // Create a client in appmaster mode + String[] args = { "--appmaster", "--action", "CHANGE_CONTAINER", "--app_id", + applicationId.toString(), "--container_id", containerId.toString(), + "--container_resources", + "memory=" + Integer.toString(target.getMemory()) + " Mi" + ",vcores=" + + 
Integer.toString(target.getVirtualCores()) + ",resource1=" + + target.getResourceInformation("resource1").getValue() + " " + + target.getResourceInformation("resource1").getUnits() }; + final Client client = new Client(conf); + Assert.assertTrue(client.init(args)); + LOG.info("Running DS Application Master Client"); + final AtomicBoolean result = new AtomicBoolean(false); + Thread t = new Thread() { + public void run() { + try { + result.set(client.runAppMasterMode()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + t.start(); + t.join(); + Assert.assertTrue(result.get()); + // Verify resource allocation has been changed in RM + while (true) { + ContainerReport container = yarnClient.getContainerReport(containerId); + if (container.getAllocatedResource().getResourceInformation("resource1") + .getValue() != target.getResourceInformation("resource1") + .getValue()) { + Thread.sleep(10); + continue; + } + break; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/resources/node-resources-2.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/resources/node-resources-2.xml index e69de29bb2d..5be8feb2f5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/resources/node-resources-2.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/resources/node-resources-2.xml @@ -0,0 +1,23 @@ + + + + + + + yarn.nodemanager.resource-type.resource1 + 5120Mi + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/resources/resource-types-2.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/resources/resource-types-2.xml index e69de29bb2d..ff5dd425bfb 100644 
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/resources/resource-types-2.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/resources/resource-types-2.xml @@ -0,0 +1,34 @@ + + + + + + + + yarn.resource-types + resource1 + + + + yarn.resource-types.resource1.units + Mi + + + + yarn.resource-types.resource1.value + 5120 + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerReportPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerReportPBImpl.java index 2b58c707c7b..eeeadb60af7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerReportPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerReportPBImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerReportProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto; @@ -375,4 +376,16 @@ public void setExecutionType(ExecutionType executionType) { } builder.setExecutionType(ProtoUtils.convertToProtoFormat(executionType)); } + + @Override + public int getVersion() { + ContainerReportProtoOrBuilder p = viaProto ? proto : builder; + return p.getVersion(); + } + + @Override + public void setVersion(int version) { + maybeInitBuilder(); + builder.setVersion(version); + } }