diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 7667157..cb254fa 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -153,7 +153,25 @@ public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { return list; } - @Override + @Override + public void updateNodeHeartbeatResponseForContainersCpuCeilingEnabled( + NodeHeartbeatResponse response) { + } + + @Override + public void addContainerWithCpuCeilingEnabled(ContainerId containerId) { + } + + @Override + public void removeContainerWithCpuCeilingEnabled(ContainerId containerId) { + } + + @Override + public List getContainersWithCpuCeilingEnabled() { + return null; + } + + @Override public String getNodeManagerVersion() { // TODO Auto-generated method stub return null; diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index bbe24c8..31159c4 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -135,6 +135,24 @@ public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { return list; } + @Override + public void updateNodeHeartbeatResponseForContainersCpuCeilingEnabled( + NodeHeartbeatResponse response) { + } + + @Override + public void addContainerWithCpuCeilingEnabled(ContainerId containerId) { + } + + @Override + public void removeContainerWithCpuCeilingEnabled(ContainerId containerId) { + } + + @Override + public List getContainersWithCpuCeilingEnabled() { + return null; + } + List getContainerUpdates() { return updates; } 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 1ee04f0..916a3fb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; @@ -62,7 +63,7 @@ public static ApplicationSubmissionContext newInstance( Priority priority, ContainerLaunchContext amContainer, boolean isUnmanagedAM, boolean cancelTokensWhenComplete, int maxAppAttempts, Resource resource, String applicationType, - boolean keepContainers) { + boolean keepContainers, boolean cpuEnforceCeilingEnabled) { ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class); context.setApplicationId(applicationId); @@ -76,6 +77,7 @@ public static ApplicationSubmissionContext newInstance( context.setResource(resource); context.setApplicationType(applicationType); context.setKeepContainersAcrossApplicationAttempts(keepContainers); + context.setCpuEnforceCeilingEnabled(cpuEnforceCeilingEnabled); return context; } @@ -85,6 +87,19 @@ public static ApplicationSubmissionContext newInstance( ApplicationId applicationId, String applicationName, String queue, Priority 
priority, ContainerLaunchContext amContainer, boolean isUnmanagedAM, boolean cancelTokensWhenComplete, + int maxAppAttempts, Resource resource, String applicationType, + boolean keepContainers) { + return newInstance(applicationId, applicationName, queue, priority, + amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, + resource, applicationType, keepContainers, false); + } + + @Public + @Stable + public static ApplicationSubmissionContext newInstance( + ApplicationId applicationId, String applicationName, String queue, + Priority priority, ContainerLaunchContext amContainer, + boolean isUnmanagedAM, boolean cancelTokensWhenComplete, int maxAppAttempts, Resource resource, String applicationType) { return newInstance(applicationId, applicationName, queue, priority, amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, @@ -338,4 +353,27 @@ public abstract void setKeepContainersAcrossApplicationAttempts( @Public @Stable public abstract void setApplicationTags(Set tags); + + /** + * Get the flag which indicates whether to enable cpu enforce ceiling for + * this application or not. + * + * @return the flag which indicates whether to enable cpu enforce ceiling + * or not. + */ + @Public + @Evolving + public abstract boolean getCpuEnforceCeilingEnabled(); + + /** + * Set the flag which indicates whether to enable cpu enforce ceiling for + * this application. + * + * @param ceilingEnabled + * the flag which indicates whether to enable cpu enforce ceiling. 
+ */ + @Public + @Evolving + public abstract void setCpuEnforceCeilingEnabled(boolean ceilingEnabled); + } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 3f1fa6c..1e80fee 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -288,6 +288,7 @@ message ApplicationSubmissionContextProto { optional string applicationType = 10 [default = "YARN"]; optional bool keep_containers_across_application_attempts = 11 [default = false]; repeated string applicationTags = 12; + optional bool cpu_enforce_ceiling_enabled = 13 [default = false]; } enum ApplicationAccessTypeProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 3336ed9..f6b24b5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -151,6 +151,9 @@ // No. 
of containers in which the shell script needs to be executed private int numContainers = 1; + // cpu_enforce_ceiling enabled or not + private boolean cpuEnforceCeilingEnabled = false; + // log4j.properties file // if available, add to local resources and set into classpath private String log4jPropFile = ""; @@ -242,6 +245,7 @@ public Client(Configuration conf) throws Exception { opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); + opts.addOption("cpu_enforce_ceiling_enabled", true, "Whether enable cpu_enforce_ceiling (true/false)"); opts.addOption("log_properties", true, "log4j.properties file"); opts.addOption("keep_containers_across_application_attempts", false, "Flag to indicate whether to keep containers across application attempts." + @@ -362,6 +366,8 @@ public boolean init(String[] args) throws ParseException { containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1")); numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1")); + cpuEnforceCeilingEnabled = Boolean.parseBoolean(cliParser.getOptionValue("cpu_enforce_ceiling_enabled", "false")); + if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) { throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified," + " exiting." 
@@ -451,6 +457,7 @@ public boolean run() throws IOException, YarnException { // set the application name ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); + appContext.setCpuEnforceCeilingEnabled(cpuEnforceCeilingEnabled); ApplicationId appId = appContext.getApplicationId(); appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index c4a3a72..b281964 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -369,6 +369,18 @@ public boolean getKeepContainersAcrossApplicationAttempts() { return p.getKeepContainersAcrossApplicationAttempts(); } + @Override + public void setCpuEnforceCeilingEnabled(boolean ceilingEnabled) { + maybeInitBuilder(); + builder.setCpuEnforceCeilingEnabled(ceilingEnabled); + } + + @Override + public boolean getCpuEnforceCeilingEnabled() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? 
proto : builder; + return p.getCpuEnforceCeilingEnabled(); + } + private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { return new PriorityPBImpl(p); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 38dfa58..b78802d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -52,4 +52,8 @@ String getDiagnosticsMessage(); void setDiagnosticsMessage(String diagnosticsMessage); + + List getContainersWithCpuEnforceCeilingEnabled(); + + void setContainersWithCpuEnforceCeilingEnabled(List containerIds); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 775f95a..1a5890a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -49,6 +49,7 @@ private List applicationsToCleanup = null; private MasterKey containerTokenMasterKey = 
null; private MasterKey nmTokenMasterKey = null; + private List containersWithCpuCeilingEnabled = null; public NodeHeartbeatResponsePBImpl() { builder = NodeHeartbeatResponseProto.newBuilder(); @@ -81,6 +82,9 @@ private void mergeLocalToBuilder() { builder.setNmTokenMasterKey( convertToProtoFormat(this.nmTokenMasterKey)); } + if (this.containersWithCpuCeilingEnabled != null) { + addContainersWithCpuEnforceCeilingEnabledToProto(); + } } private void mergeLocalToProto() { @@ -331,6 +335,73 @@ public void setNextHeartBeatInterval(long nextHeartBeatInterval) { builder.setNextHeartBeatInterval(nextHeartBeatInterval); } + @Override + public List getContainersWithCpuEnforceCeilingEnabled() { + initContainersWithCpuEnforceCeilingEnabled(); + return this.containersWithCpuCeilingEnabled; + } + + private void initContainersWithCpuEnforceCeilingEnabled() { + if (this.containersWithCpuCeilingEnabled != null) { + return; + } + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = + p.getContainersWithCpuEnforceCeilingEnabledList(); + this.containersWithCpuCeilingEnabled = new ArrayList(); + + for (ContainerIdProto c : list) { + this.containersWithCpuCeilingEnabled.add(convertFromProtoFormat(c)); + } + } + + @Override + public void setContainersWithCpuEnforceCeilingEnabled( + List containerIds) { + // Call maybeInitBuilder() before touching the builder, as + // addContainersWithCpuEnforceCeilingEnabledToProto() does. + maybeInitBuilder(); + // Clear the proto field when the caller resets the list to null. + if (containerIds == null) { + builder.clearContainersWithCpuEnforceCeilingEnabled(); + } + this.containersWithCpuCeilingEnabled = containerIds; + } + + private void addContainersWithCpuEnforceCeilingEnabledToProto() { + maybeInitBuilder(); + builder.clearContainersWithCpuEnforceCeilingEnabled(); + if (containersWithCpuCeilingEnabled == null) { + return; + } + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = + containersWithCpuCeilingEnabled.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerIdProto 
next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + }; + } + }; + builder.addAllContainersWithCpuEnforceCeilingEnabled(iterable); + } + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { return new ContainerIdPBImpl(p); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 29cd64e..b1b0ed4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -58,6 +58,7 @@ message NodeHeartbeatResponseProto { repeated ApplicationIdProto applications_to_cleanup = 6; optional int64 nextHeartBeatInterval = 7; optional string diagnostics_message = 8; + repeated ContainerIdProto containers_with_cpu_enforce_ceiling_enabled = 9; } message NMContainerStatusProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index ee72fbc..671be34 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -111,13 +111,15 @@ public abstract void startLocalizer(Path 
nmPrivateContainerTokens, * @param containerWorkDir the work dir for the container * @param localDirs nm-local-dirs to be used for this container * @param logDirs nm-log-dirs to be used for this container + * @param cpuEnforceCeilingEnabled whether to enable the cpu ceiling; + * only used by {@link LinuxContainerExecutor} * @return the return status of the launch * @throws IOException */ public abstract int launchContainer(Container container, Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, String user, String appId, Path containerWorkDir, List localDirs, - List logDirs) throws IOException; + List logDirs, boolean cpuEnforceCeilingEnabled) throws IOException; public abstract boolean signalContainer(String user, String pid, Signal signal) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 956ea33..80332a7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager; +import java.util.List; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; @@ -56,6 +57,10 @@ ConcurrentMap getContainers(); + List getContainersWithCpuEnforceCeilingEnabled(); + + void setContainersWithCpuEnforceCeilingEnabled(List containerIds); + NMContainerTokenSecretManager getContainerTokenSecretManager(); NMTokenSecretManagerInNM getNMTokenSecretManager(); diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 9e2e111..9fdb37a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -111,7 +111,8 @@ public synchronized void startLocalizer(Path nmPrivateContainerTokensPath, public int launchContainer(Container container, Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, String userName, String appId, Path containerWorkDir, - List localDirs, List logDirs) throws IOException { + List localDirs, List logDirs, + boolean cpuEnforceCeilingEnabled) throws IOException { FsPermission dirPerm = new FsPermission(APPDIR_PERM); ContainerId containerId = container.getContainerId(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java index cbdcb13..ed21395 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java @@ -239,7 +239,8 @@ public void startLocalizer(Path 
nmPrivateContainerTokensPath, public int launchContainer(Container container, Path nmPrivateCotainerScriptPath, Path nmPrivateTokensPath, String user, String appId, Path containerWorkDir, - List localDirs, List logDirs) throws IOException { + List localDirs, List logDirs, + boolean cpuEnforceCeilingEnabled) throws IOException { verifyUsernamePattern(user); String runAsUser = getRunAsUser(user); @@ -248,7 +249,7 @@ public int launchContainer(Container container, String containerIdStr = ConverterUtils.toString(containerId); resourcesHandler.preExecute(containerId, - container.getResource()); + container.getResource(), cpuEnforceCeilingEnabled); String resourcesOptions = resourcesHandler.getResourcesOption( containerId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 65988a2..9fc2c13 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -315,6 +316,7 @@ public void run() { new ConcurrentHashMap(); protected final ConcurrentMap containers = new ConcurrentSkipListMap(); + private List containersWithCpuEnforceCeilingEnabled; private final NMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInNM nmTokenSecretManager; @@ -365,6 
+367,16 @@ public int getHttpPort() { } @Override + public List getContainersWithCpuEnforceCeilingEnabled() { + return this.containersWithCpuEnforceCeilingEnabled; + } + + @Override + public void setContainersWithCpuEnforceCeilingEnabled(List containerIds) { + this.containersWithCpuEnforceCeilingEnabled = containerIds; + } + + @Override public NMContainerTokenSecretManager getContainerTokenSecretManager() { return this.containerTokenSecretManager; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 0b8f5b4..c745fca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -561,6 +561,10 @@ public void run() { new CMgrCompletedAppsEvent(appsToCleanup, CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); } + + // Update list of containers with cpu_enforce_ceiling enabled + context.setContainersWithCpuEnforceCeilingEnabled( + response.getContainersWithCpuEnforceCeilingEnabled()); } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 
e252e35..7daa96c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -297,9 +297,13 @@ public Integer call() { } else { exec.activateContainer(containerID, pidFilePath); + boolean cpuEnforceCeilingEnabled = + context.getContainersWithCpuEnforceCeilingEnabled() != null && + context.getContainersWithCpuEnforceCeilingEnabled() + .contains(containerID); ret = exec.launchContainer(container, nmPrivateContainerScriptPath, nmPrivateTokensPath, user, appIdStr, containerWorkDir, - localDirs, logDirs); + localDirs, logDirs, cpuEnforceCeilingEnabled); } } catch (Throwable e) { LOG.warn("Failed to launch container.", e); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java index d5bd225..5b0203d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java @@ -206,8 +206,8 @@ boolean deleteCgroup(String cgroupPath) { * Next three functions operate on all the resources we are enforcing. 
*/ - private void setupLimits(ContainerId containerId, - Resource containerResource) throws IOException { + private void setupLimits(ContainerId containerId, Resource containerResource, + boolean cpuEnforceCeilingEnabled) throws IOException { String containerName = containerId.toString(); if (isCpuWeightEnabled()) { @@ -215,6 +215,22 @@ private void setupLimits(ContainerId containerId, int cpuShares = CPU_DEFAULT_WEIGHT * containerResource.getVirtualCores(); updateCgroup(CONTROLLER_CPU, containerName, "shares", String.valueOf(cpuShares)); + + // Set ceiling for the container if enabled. + if (cpuEnforceCeilingEnabled) { + int nmShares = CPU_DEFAULT_WEIGHT * + conf.getInt(YarnConfiguration.NM_VCORES, + YarnConfiguration.DEFAULT_NM_VCORES); + // cfs_quota_us is the CPU time allowed per period summed over all + // CPUs, so scale the container's share by the processor count; + // an unscaled quota caps the container at a fraction of one core. + long quotaUs = (long) cpuShares + * Runtime.getRuntime().availableProcessors(); + updateCgroup(CONTROLLER_CPU, containerName, + "cfs_period_us", String.valueOf(nmShares)); + updateCgroup(CONTROLLER_CPU, containerName, + "cfs_quota_us", String.valueOf(quotaUs)); + } } } @@ -228,9 +244,9 @@ private void clearLimits(ContainerId containerId) { * LCE Resources Handler interface */ - public void preExecute(ContainerId containerId, Resource containerResource) - throws IOException { - setupLimits(containerId, containerResource); + public void preExecute(ContainerId containerId, Resource containerResource, + boolean cpuEnforceCeilingEnabled) throws IOException { + setupLimits(containerId, containerResource, cpuEnforceCeilingEnabled); } public void postExecute(ContainerId containerId) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/DefaultLCEResourcesHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/DefaultLCEResourcesHandler.java index 9fb8707..b579fa7 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/DefaultLCEResourcesHandler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/DefaultLCEResourcesHandler.java @@ -51,7 +51,8 @@ public void init(LinuxContainerExecutor lce) { * LCE Resources Handler interface */ - public void preExecute(ContainerId containerId, Resource containerResource) { + public void preExecute(ContainerId containerId, Resource containerResource, + boolean cpuEnforceCeilingEnabled) { } public void postExecute(ContainerId containerId) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/LCEResourcesHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/LCEResourcesHandler.java index 34f7f31..fbd7365 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/LCEResourcesHandler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/LCEResourcesHandler.java @@ -34,9 +34,12 @@ * inside the container. 
* @param containerId the id of the container being launched * @param containerResource the node resources the container will be using + * @param cpuEnforceCeilingEnabled whether to enable cpu_enforce_ceiling for + * this container; only works with + * {@link CgroupsLCEResourcesHandler} */ - void preExecute(ContainerId containerId, Resource containerResource) - throws IOException; + void preExecute(ContainerId containerId, Resource containerResource, + boolean cpuEnforceCeilingEnabled) throws IOException; /** * Called by the LinuxContainerExecutor after the executable inside the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java index f840730..5f039be 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java @@ -203,7 +203,7 @@ private int runAndBlock(ContainerId cId, String ... 
cmd) throws IOException { exec.activateContainer(cId, pidFile); return exec.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), - dirsHandler.getLogDirs()); + dirsHandler.getLogDirs(), false); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java index ddffa27..e8c8239 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java @@ -126,7 +126,7 @@ public void testContainerLaunch() throws IOException { mockExec.activateContainer(cId, pidFile); int ret = mockExec.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), - dirsHandler.getLogDirs()); + dirsHandler.getLogDirs(), false); assertEquals(0, ret); assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, appSubmitter, cmd, appId, containerId, @@ -246,7 +246,7 @@ public void testContainerLaunchError() throws IOException { mockExec.activateContainer(cId, pidFile); int ret = mockExec.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), - dirsHandler.getLogDirs()); + dirsHandler.getLogDirs(), false); Assert.assertNotSame(0, ret); assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, appSubmitter, cmd, appId, containerId, diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index f72ef30..f02b178 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index f2a8376..a28a9fd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -409,6 +409,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) populateKeys(request, nodeHeartBeatResponse); + // Add containers with cpu_enforce_ceiling_enforced. + rmNode.updateNodeHeartbeatResponseForContainersCpuCeilingEnabled( + nodeHeartBeatResponse); + // 4. 
Send status to RMNode, saving the latest response. this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index eef361f..89b5f08 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -40,10 +40,12 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -446,6 +448,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { @Override public 
void transition(RMContainerImpl container, RMContainerEvent event) { + container.notifyNodeIfCpuCeilingEnabled(true); + container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent( container.appAttemptId)); } @@ -485,6 +489,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { container.finishTime = System.currentTimeMillis(); container.finishedStatus = finishedEvent.getRemoteContainerStatus(); + + container.notifyNodeIfCpuCeilingEnabled(false); // Inform AppAttempt // container.getContainer() can return null when a RMContainer is a // reserved container @@ -517,6 +523,8 @@ private static void updateMetricsIfPreempted(RMContainerImpl container) { FinishedTransition { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { + container.notifyNodeIfCpuCeilingEnabled(false); + // Unregister from containerAllocationExpirer. container.containerAllocationExpirer.unregister(container .getContainerId()); @@ -530,7 +538,6 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { - // Unregister from containerAllocationExpirer. container.containerAllocationExpirer.unregister(container .getContainerId()); @@ -559,4 +566,30 @@ public ContainerReport createContainerReport() { } return containerReport; } + + /** + * Notify the corresponding node to update its containers with + * cpu_enforce_ceiling enabled.
+ * + * @param addOrRemove true to add this container, false to remove it + */ + private void notifyNodeIfCpuCeilingEnabled(boolean addOrRemove) { + try { + if (rmContext.getRMApps() != null) { + RMApp app = rmContext.getRMApps().get(appAttemptId.getApplicationId()); + if (app.getApplicationSubmissionContext().getCpuEnforceCeilingEnabled()) { + RMNode node = rmContext.getRMNodes().get(nodeId); + if (addOrRemove) { + node.addContainerWithCpuCeilingEnabled(containerId); + } else { + node.removeContainerWithCpuCeilingEnabled(containerId); + } + } + } + } catch (Exception e) { + LOG.warn("Exception happened when the container " + containerId + + " notifies node " + nodeId + + " about cpu_enforce_ceiling_enabled information.", e); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 24793e8..d25d212 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -147,4 +147,32 @@ * @return containerUpdates accumulated across NM heartbeats. */ public List pullContainerUpdates(); + + /** + * Update a {@link NodeHeartbeatResponse} with the list of containers + * which have cpu_enforce_ceiling enabled and are tracked for + * that node.
+ * + * @param response the {@link NodeHeartbeatResponse} to update + */ + public void updateNodeHeartbeatResponseForContainersCpuCeilingEnabled(NodeHeartbeatResponse response); + + /** + * Add a {@link ContainerId} whose cpu_enforce_ceiling is enabled. + * + * @param containerId the {@link ContainerId} to add + */ + public void addContainerWithCpuCeilingEnabled(ContainerId containerId); + + /** + * Remove a {@link ContainerId} from the list. + * + * @param containerId the {@link ContainerId} to remove + */ + public void removeContainerWithCpuCeilingEnabled(ContainerId containerId); + + /** + * Get the list of {@link ContainerId} with cpu_enforce_ceiling enabled. + */ + public List getContainersWithCpuCeilingEnabled(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index e20adc5..5b29dd7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -114,6 +114,10 @@ /* the list of applications that have finished and need to be purged */ private final List finishedApplications = new ArrayList(); + /* the list of containers that have cpu_enforce_ceiling enabled */ + private final List containersWithCpuCeilingEnabled = + new ArrayList(); + private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory .newRecordInstance(NodeHeartbeatResponse.class); @@ -371,6 +375,46 @@ public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { } } + @Override + public void
updateNodeHeartbeatResponseForContainersCpuCeilingEnabled( + NodeHeartbeatResponse response) { + response.setContainersWithCpuEnforceCeilingEnabled( + this.getContainersWithCpuCeilingEnabled()); /* locked snapshot */ + } + + @Override + public void addContainerWithCpuCeilingEnabled(ContainerId containerId) { + this.writeLock.lock(); + + try { + this.containersWithCpuCeilingEnabled.add(containerId); + } finally { + this.writeLock.unlock(); + } + } + + @Override + public void removeContainerWithCpuCeilingEnabled(ContainerId containerId) { + this.writeLock.lock(); + + try { + this.containersWithCpuCeilingEnabled.remove(containerId); + } finally { + this.writeLock.unlock(); + } + } + + @Override + public List getContainersWithCpuCeilingEnabled() { + this.readLock.lock(); + /* Copy under the lock: the live list is mutated by add/remove. */ + try { + return new ArrayList<ContainerId>(this.containersWithCpuCeilingEnabled); + } finally { + this.readLock.unlock(); + } + } + public void handle(RMNodeEvent event) { LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType()); try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 8ef01d9..b7ceb76 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -195,6 +195,24 @@ public String getNodeManagerVersion() { } @Override + public void updateNodeHeartbeatResponseForContainersCpuCeilingEnabled( + NodeHeartbeatResponse response) { + } + + @Override + public void addContainerWithCpuCeilingEnabled(ContainerId containerId) { + } + + @Override + public void
removeContainerWithCpuCeilingEnabled(ContainerId containerId) { + } + + @Override + public List getContainersWithCpuCeilingEnabled() { + return null; + } + + @Override public String getHealthReport() { return healthReport; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 44f8381..eea754b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -27,10 +27,13 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -52,7 +55,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -216,6 +221,103 @@ public void testExpireWhileRunning() { } @Test + public void testNotifyNodeIfCpuCeilingEnabled() { + DrainDispatcher drainDispatcher = new DrainDispatcher(); + EventHandler appAttemptEventHandler = mock(EventHandler.class); + EventHandler generic = mock(EventHandler.class); + drainDispatcher.register(RMAppAttemptEventType.class, + appAttemptEventHandler); + drainDispatcher.register(RMNodeEventType.class, generic); + drainDispatcher.init(new YarnConfiguration()); + drainDispatcher.start(); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class); + ConcurrentMap applications = + new ConcurrentHashMap(); + ConcurrentMap nodes = new ConcurrentHashMap(); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getDispatcher()).thenReturn(drainDispatcher); + when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); + when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); + when(rmContext.getRMApps()).thenReturn(applications); + when(rmContext.getRMNodes()).thenReturn(nodes); + + // Create a RMNode + NodeId nodeId = BuilderUtils.newNodeId("host", 3425); + RMNode node = new RMNodeImpl(nodeId, rmContext, null, 10, 20, null, null, null); + rmContext.getRMNodes().put(nodeId, node); + + // Create a RMApp + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 1); + ApplicationSubmissionContext appContext = mock(ApplicationSubmissionContext.class); +
when(appContext.getCpuEnforceCeilingEnabled()).thenReturn(true); + RMApp app = mock(RMApp.class); + when(app.getApplicationSubmissionContext()).thenReturn(appContext); + rmContext.getRMApps().put(appId, app); + + // Create two RMContainers + ContainerId containerId1 = BuilderUtils.newContainerId(appAttemptId, 1); + ContainerId containerId2 = BuilderUtils.newContainerId(appAttemptId, 2); + Container container1 = BuilderUtils.newContainer(containerId1, nodeId, + "host:3465", null, null, null); + Container container2 = BuilderUtils.newContainer(containerId2, nodeId, + "host:3466", null, null, null); + RMContainer rmContainer1 = new RMContainerImpl(container1, appAttemptId, + nodeId, "user", rmContext); + RMContainer rmContainer2 = new RMContainerImpl(container2, appAttemptId, + nodeId, "user", rmContext); + assertEquals(RMContainerState.NEW, rmContainer1.getState()); + assertEquals(RMContainerState.NEW, rmContainer2.getState()); + + // NEW --> ALLOCATED + rmContainer1.handle(new RMContainerEvent(containerId1, + RMContainerEventType.START)); + rmContainer2.handle(new RMContainerEvent(containerId2, + RMContainerEventType.START)); + drainDispatcher.await(); + assertEquals(RMContainerState.ALLOCATED, rmContainer1.getState()); + assertEquals(RMContainerState.ALLOCATED, rmContainer2.getState()); + assertEquals(2, node.getContainersWithCpuCeilingEnabled().size()); + + // Finish one container + rmContainer1.handle(new RMContainerEvent(containerId1, + RMContainerEventType.ACQUIRED)); + drainDispatcher.await(); + ContainerStatus containerStatus1 = SchedulerUtils + .createAbnormalContainerStatus(containerId1, + SchedulerUtils.COMPLETED_APPLICATION); + rmContainer1.handle(new RMContainerFinishedEvent(containerId1, + containerStatus1, RMContainerEventType.FINISHED)); + drainDispatcher.await(); + assertEquals(RMContainerState.COMPLETED, rmContainer1.getState()); + assertEquals(1, node.getContainersWithCpuCeilingEnabled().size()); + + // Kill another container + ContainerStatus
containerStatus2 = SchedulerUtils + .createAbnormalContainerStatus(containerId2, + SchedulerUtils.RELEASED_CONTAINER); + rmContainer2.handle(new RMContainerFinishedEvent(containerId2, + containerStatus2, RMContainerEventType.KILL)); + drainDispatcher.await(); + assertEquals(RMContainerState.KILLED, rmContainer2.getState()); + assertEquals(0, node.getContainersWithCpuCeilingEnabled().size()); + + // Reset cpu_enforce_ceiling to false + when(appContext.getCpuEnforceCeilingEnabled()).thenReturn(false); + Container container3 = BuilderUtils.newContainer(containerId2, nodeId, + "host:3467", null, null, null); + RMContainer rmContainer3 = new RMContainerImpl(container3, appAttemptId, + nodeId, "user", rmContext); + rmContainer3.handle(new RMContainerEvent(containerId2, + RMContainerEventType.START)); + drainDispatcher.await(); + assertEquals(RMContainerState.ALLOCATED, rmContainer3.getState()); + assertEquals(0, node.getContainersWithCpuCeilingEnabled().size()); + } + + @Test public void testExistenceOfResourceRequestInRMContainer() throws Exception { Configuration conf = new Configuration(); MockRM rm1 = new MockRM(conf);