diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index fdddcf4..342ca70 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -156,6 +156,24 @@ public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { } @Override + public void updateNodeHeartbeatResponseForContainersCpuCeilingEnabled( + NodeHeartbeatResponse response) { + } + + @Override + public void addContainerWithCpuCeilingEnabled(ContainerId containerId) { + } + + @Override + public void removeContainerWithCpuCeilingEnabled(ContainerId containerId) { + } + + @Override + public List getContainersWithCpuCeilingEnabled() { + return null; + } + + @Override public String getNodeManagerVersion() { return null; } diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 3b185ae..bdfe9b9 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -138,7 +138,25 @@ public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { } return list; } - + + @Override + public void updateNodeHeartbeatResponseForContainersCpuCeilingEnabled( + NodeHeartbeatResponse response) { + } + + @Override + public void addContainerWithCpuCeilingEnabled(ContainerId containerId) { + } + + @Override + public void removeContainerWithCpuCeilingEnabled(ContainerId containerId) { + } + + @Override + public List getContainersWithCpuCeilingEnabled() { + return null; + } + List getContainerUpdates() { return updates; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index f1ebbfe..aaea9a1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -76,6 +76,21 @@ public static ApplicationSubmissionContext newInstance( int maxAppAttempts, Resource resource, String applicationType, boolean keepContainers, String appLabelExpression, String amContainerLabelExpression) { + return newInstance(applicationId, applicationName, queue, priority, + amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, + resource, applicationType, keepContainers, appLabelExpression, + amContainerLabelExpression, false); + } + + @Public + @Stable + public static ApplicationSubmissionContext newInstance( + ApplicationId applicationId, String applicationName, String queue, + Priority priority, ContainerLaunchContext amContainer, + boolean isUnmanagedAM, boolean cancelTokensWhenComplete, + int maxAppAttempts, Resource resource, String applicationType, + boolean keepContainers, String appLabelExpression, + String amContainerLabelExpression, boolean cpuEnforceCeilingEnabled) { ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class); context.setApplicationId(applicationId); @@ -90,7 +105,7 @@ public static ApplicationSubmissionContext newInstance( context.setKeepContainersAcrossApplicationAttempts(keepContainers); context.setNodeLabelExpression(appLabelExpression); context.setResource(resource); - + ResourceRequest amReq = Records.newRecord(ResourceRequest.class); amReq.setResourceName(ResourceRequest.ANY); amReq.setCapability(resource); @@ -98,9 +113,10 @@ public static ApplicationSubmissionContext newInstance( amReq.setRelaxLocality(true); amReq.setNodeLabelExpression(amContainerLabelExpression); context.setAMContainerResourceRequest(amReq); + context.setCpuEnforceCeilingEnabled(cpuEnforceCeilingEnabled); return context; } - + public static ApplicationSubmissionContext newInstance( ApplicationId applicationId, String applicationName, String queue, Priority priority, ContainerLaunchContext amContainer, @@ -532,4 +548,26 @@ public abstract void setLogAggregationContext( @Public @Unstable public abstract void setReservationID(ReservationId reservationID); + + /** + * Get the flag which indicates whether to enable cpu enforce ceiling for + * this application. + * + * @return the flag which indicates whether to enable cpu enforce ceiling + * or not. + */ + @Public + @Evolving + public abstract boolean getCpuEnforceCeilingEnabled(); + + /** + * Set the flag which indicates whether to enable cpu enforce ceiling for + * this application. + * + * @param ceilingEnabled + * the flag which indicates whether to enable cpu enforce ceiling. + */ + @Public + @Evolving + public abstract void setCpuEnforceCeilingEnabled(boolean ceilingEnabled); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index c4e756d..d106ded 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -303,6 +303,7 @@ message ApplicationSubmissionContextProto { optional ReservationIdProto reservation_id = 15; optional string node_label_expression = 16; optional ResourceRequestProto am_container_resource_request = 17; + optional bool cpu_enforce_ceiling_enabled = 18 [default = false]; } message LogAggregationContextProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 0e9a4e4..80f9b23 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -153,6 +153,8 @@ // No. of containers in which the shell script needs to be executed private int numContainers = 1; private String nodeLabelExpression = null; + // cpu_enforce_ceiling enabled or not + private boolean cpuEnforceCeilingEnabled = false; // log4j.properties file // if available, add to local resources and set into classpath @@ -259,6 +261,7 @@ public Client(Configuration conf) throws Exception { opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); + opts.addOption("cpu_enforce_ceiling_enabled", true, "Whether enable cpu_enforce_ceiling (true/false)"); opts.addOption("log_properties", true, "log4j.properties file"); opts.addOption("keep_containers_across_application_attempts", false, "Flag to indicate whether to keep containers across application attempts." + @@ -397,7 +400,7 @@ public boolean init(String[] args) throws ParseException { containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1")); numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1")); - + cpuEnforceCeilingEnabled = Boolean.parseBoolean(cliParser.getOptionValue("cpu_enforce_ceiling_enabled", "false")); if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) { throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified," @@ -510,6 +513,7 @@ public boolean run() throws IOException, YarnException { // set the application name ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); + appContext.setCpuEnforceCeilingEnabled(cpuEnforceCeilingEnabled); ApplicationId appId = appContext.getApplicationId(); appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index 303b437..536c9f4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -412,6 +412,18 @@ public boolean getKeepContainersAcrossApplicationAttempts() { return p.getKeepContainersAcrossApplicationAttempts(); } + @Override + public void setCpuEnforceCeilingEnabled(boolean ceilingEnabled) { + maybeInitBuilder(); + builder.setCpuEnforceCeilingEnabled(ceilingEnabled); + } + + @Override + public boolean getCpuEnforceCeilingEnabled() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + return p.getCpuEnforceCeilingEnabled(); + } + private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { return new PriorityPBImpl(p); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 9fb44ca..f0d4136 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -67,4 +67,8 @@ void setSystemCredentialsForApps( Map systemCredentials); + + List getContainersWithCpuEnforceCeilingEnabled(); + + void setContainersWithCpuEnforceCeilingEnabled(List containerIds); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 1e91514..b7456a4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -58,6 +58,7 @@ private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; + private List containersWithCpuCeilingEnabled = null; public NodeHeartbeatResponsePBImpl() { builder = NodeHeartbeatResponseProto.newBuilder(); @@ -96,6 +97,9 @@ private void mergeLocalToBuilder() { if (this.systemCredentials != null) { addSystemCredentialsToProto(); } + if (this.containersWithCpuCeilingEnabled != null) { + addContainersWithCpuEnforceCeilingEnabledToProto(); + } } private void addSystemCredentialsToProto() { @@ -116,6 +120,70 @@ private void mergeLocalToProto() { viaProto = true; } + @Override + public List getContainersWithCpuEnforceCeilingEnabled() { + initContainerssWithCpuEnforceCeilingEnabled(); + return this.containersWithCpuCeilingEnabled; + } + + private void initContainerssWithCpuEnforceCeilingEnabled() { + if (this.containersWithCpuCeilingEnabled != null) { + return; + } + + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = + p.getContainersWithCpuEnforceCeilingEnabledList(); + this.containersWithCpuCeilingEnabled = new ArrayList(); + + for (ContainerIdProto c : list) { + this.containersWithCpuCeilingEnabled.add(convertFromProtoFormat(c)); + } + } + + @Override + public void setContainersWithCpuEnforceCeilingEnabled( + List containerIds) { + if (containersWithCpuCeilingEnabled == null) { + builder.clearContainersWithCpuEnforceCeilingEnabled(); + } + this.containersWithCpuCeilingEnabled = containerIds; + } + + private void addContainersWithCpuEnforceCeilingEnabledToProto() { + maybeInitBuilder(); + builder.clearContainersWithCpuEnforceCeilingEnabled(); + if (containersWithCpuCeilingEnabled == null) { + return; + } + + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = + containersWithCpuCeilingEnabled.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllContainersWithCpuEnforceCeilingEnabled(iterable); + } + private void maybeInitBuilder() { if (viaProto || builder == null) { builder = NodeHeartbeatResponseProto.newBuilder(proto); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 91473c5..8cb7fe0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -60,6 +60,7 @@ message NodeHeartbeatResponseProto { optional string diagnostics_message = 8; repeated ContainerIdProto containers_to_be_removed_from_nm = 9; repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10; + repeated ContainerIdProto containers_with_cpu_enforce_ceiling_enabled = 11; } message SystemCredentialsForAppsProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index 327f882..a2670e8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -133,13 +133,16 @@ public abstract void startLocalizer(Path nmPrivateContainerTokens, * @param containerWorkDir the work dir for the container * @param localDirs nm-local-dirs to be used for this container * @param logDirs nm-log-dirs to be used for this container + * @param cpuEnforceCeilingEnabled whether enable cpu ceiling, + * only used by {@link LinuxContainerExecutor} * @return the return status of the launch * @throws IOException */ public abstract int launchContainer(Container container, Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, String user, String appId, Path containerWorkDir, - List localDirs, List logDirs) throws IOException; + List localDirs, List logDirs, + boolean cpuEnforceCeilingEnabled) throws IOException; public abstract boolean signalContainer(String user, String pid, Signal signal) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 6e7e2ec..f5ff9d8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.List; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; @@ -77,4 +78,8 @@ boolean getDecommissioned(); void setDecommissioned(boolean isDecommissioned); + + List getContainersWithCpuEnforceCeilingEnabled(); + + void setContainersWithCpuEnforceCeilingEnabled(List containerIds); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index cc2de99..66b997f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -133,7 +133,8 @@ public void startLocalizer(Path nmPrivateContainerTokensPath, public int launchContainer(Container container, Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, String user, String appId, Path containerWorkDir, - List localDirs, List logDirs) throws IOException { + List localDirs, List logDirs, + boolean cpuEnforceCeilingEnabled) throws IOException { FsPermission dirPerm = new FsPermission(APPDIR_PERM); ContainerId containerId = container.getContainerId(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java index d8dd890..8704bc2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java @@ -142,7 +142,8 @@ public synchronized void startLocalizer(Path nmPrivateContainerTokensPath, public int launchContainer(Container container, Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, String userName, String appId, Path containerWorkDir, - List localDirs, List logDirs) throws IOException { + List localDirs, List logDirs, + boolean cpuEnforceCeilingEnabled) throws IOException { String containerImageName = container.getLaunchContext().getEnvironment() .get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME); if (LOG.isDebugEnabled()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java index 4db4ef2..a89d045 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java @@ -256,7 +256,8 @@ public void buildMainArgs(List command, String user, String appId, public int launchContainer(Container container, Path nmPrivateCotainerScriptPath, Path nmPrivateTokensPath, String user, String appId, Path containerWorkDir, - List localDirs, List logDirs) throws IOException { + List localDirs, List logDirs, + boolean cpuEnforceCeilingEnabled) throws IOException { verifyUsernamePattern(user); String runAsUser = getRunAsUser(user); @@ -265,7 +266,7 @@ public int launchContainer(Container container, String containerIdStr = ConverterUtils.toString(containerId); resourcesHandler.preExecute(containerId, - container.getResource()); + container.getResource(), cpuEnforceCeilingEnabled); String resourcesOptions = resourcesHandler.getResourcesOption( containerId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 53cbb11..b18fab1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -322,6 +323,7 @@ public void run() { protected final ConcurrentMap containers = new ConcurrentSkipListMap(); + private List containersWithCpuEnforceCeilingEnabled; private final NMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInNM nmTokenSecretManager; @@ -362,6 +364,16 @@ public int getHttpPort() { } @Override + public List getContainersWithCpuEnforceCeilingEnabled() { + return this.containersWithCpuEnforceCeilingEnabled; + } + + @Override + public void setContainersWithCpuEnforceCeilingEnabled(List containerIds) { + this.containersWithCpuEnforceCeilingEnabled = containerIds; + } + + @Override public ConcurrentMap getApplications() { return this.applications; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index f561dbb..14a57c2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -640,6 +640,10 @@ public void run() { ((NMContext) context) .setSystemCrendentialsForApps(parseCredentials(systemCredentials)); } + + // Update list of containers with cpu_enforce_ceiling enabled + context.setContainersWithCpuEnforceCeilingEnabled( + response.getContainersWithCpuEnforceCeilingEnabled()); } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index a87238d..bdc48b7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -299,9 +299,12 @@ public Integer call() { } else { exec.activateContainer(containerID, pidFilePath); + boolean cpuEnforceCeilingEnabled = + context.getContainersWithCpuEnforceCeilingEnabled() != null && + context.getContainersWithCpuEnforceCeilingEnabled().contains(containerID); ret = exec.launchContainer(container, nmPrivateContainerScriptPath, - nmPrivateTokensPath, user, appIdStr, containerWorkDir, - localDirs, logDirs); + nmPrivateTokensPath, user, appIdStr, containerWorkDir, localDirs, + logDirs, cpuEnforceCeilingEnabled); } } catch (Throwable e) { LOG.warn("Failed to launch container.", e); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java index 63039d8..d0052e7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java @@ -293,8 +293,8 @@ boolean deleteCgroup(String cgroupPath) { * Next three functions operate on all the resources we are enforcing. */ - private void setupLimits(ContainerId containerId, - Resource containerResource) throws IOException { + private void setupLimits(ContainerId containerId, Resource containerResource, + boolean cpuEnforceCeilingEnabled) throws IOException { String containerName = containerId.toString(); if (isCpuWeightEnabled()) { @@ -303,7 +303,8 @@ private void setupLimits(ContainerId containerId, int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores; updateCgroup(CONTROLLER_CPU, containerName, "shares", String.valueOf(cpuShares)); - if (strictResourceUsageMode) { + + if (strictResourceUsageMode || cpuEnforceCeilingEnabled) { int nodeVCores = conf.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES); @@ -330,9 +331,9 @@ private void clearLimits(ContainerId containerId) { * LCE Resources Handler interface */ - public void preExecute(ContainerId containerId, Resource containerResource) - throws IOException { - setupLimits(containerId, containerResource); + public void preExecute(ContainerId containerId, Resource containerResource, + boolean cpuEnforceCeilingEnabled) throws IOException { + setupLimits(containerId, containerResource, cpuEnforceCeilingEnabled); } public void postExecute(ContainerId containerId) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/DefaultLCEResourcesHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/DefaultLCEResourcesHandler.java index 9fb8707..b245a90 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/DefaultLCEResourcesHandler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/DefaultLCEResourcesHandler.java @@ -25,6 +25,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; +import java.io.IOException; + public class DefaultLCEResourcesHandler implements LCEResourcesHandler { final static Log LOG = LogFactory @@ -51,7 +53,8 @@ public void init(LinuxContainerExecutor lce) { * LCE Resources Handler interface */ - public void preExecute(ContainerId containerId, Resource containerResource) { + public void preExecute(ContainerId containerId, Resource containerResource, + boolean cpuEnforceCeilingEnabled) { } public void postExecute(ContainerId containerId) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/LCEResourcesHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/LCEResourcesHandler.java index 34f7f31..70c286e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/LCEResourcesHandler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/LCEResourcesHandler.java @@ -34,9 +34,11 @@ * inside the container. * @param containerId the id of the container being launched * @param containerResource the node resources the container will be using + * @param cpuEnforceCeilingEnabled whether enable cpu_enforce_ceiling for this + * container, only work for {@link CgroupsLCEResourcesHandler} */ - void preExecute(ContainerId containerId, Resource containerResource) - throws IOException; + void preExecute(ContainerId containerId, Resource containerResource, + boolean cpuEnforceCeilingEnabled) throws IOException; /** * Called by the LinuxContainerExecutor after the executable inside the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java index c669047..ba6e870 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java @@ -311,7 +311,7 @@ public Object answer(InvocationOnMock invocationOnMock) mockExec.activateContainer(cId, pidFile); int ret = mockExec .launchContainer(container, scriptPath, tokensPath, appSubmitter, - appId, workDir, localDirs, localDirs); + appId, workDir, localDirs, localDirs, false); Assert.assertNotSame(0, ret); } finally { mockExec.deleteAsUser(appSubmitter, localDir); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutor.java index e43ac2e..61098bd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutor.java @@ -165,7 +165,7 @@ private int runAndBlock(ContainerId cId, Map launchCtxEnv, Strin exec.activateContainer(cId, pidFile); return exec.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), - dirsHandler.getLogDirs()); + dirsHandler.getLogDirs(), false); } private String writeScriptFile(Map launchCtxEnv, String... cmd) throws IOException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutorWithMocks.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutorWithMocks.java index fa8bfaf..ebf6d8e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutorWithMocks.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutorWithMocks.java @@ -143,7 +143,7 @@ public void testContainerLaunchNullImage() throws IOException { dockerContainerExecutor.activateContainer(cId, pidFile); dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), - dirsHandler.getLogDirs()); + dirsHandler.getLogDirs(), false); } @Test(expected = IllegalArgumentException.class) @@ -175,7 +175,7 @@ public void testContainerLaunchInvalidImage() throws IOException { dockerContainerExecutor.activateContainer(cId, pidFile); dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), - dirsHandler.getLogDirs()); + dirsHandler.getLogDirs(), false); } @Test @@ -205,7 +205,7 @@ public void testContainerLaunch() throws IOException { dockerContainerExecutor.activateContainer(cId, pidFile); int ret = dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), - dirsHandler.getLogDirs()); + dirsHandler.getLogDirs(), false); assertEquals(0, ret); //get the script Path sessionScriptPath = new Path(workDir, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java index 7417f69..66c2581 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java @@ -222,7 +222,7 @@ private int runAndBlock(ContainerId cId, String ... cmd) throws IOException { exec.activateContainer(cId, pidFile); return exec.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), - dirsHandler.getLogDirs()); + dirsHandler.getLogDirs(), false); } @Test @@ -465,8 +465,8 @@ public void init(LinuxContainerExecutor lce) throws IOException { } @Override - public void preExecute(ContainerId containerId, Resource containerResource) - throws IOException { + public void preExecute(ContainerId containerId, Resource containerResource, + boolean cpuEnforceCeilingEnabled) throws IOException { } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java index d54367a..f9eaf97 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java @@ -132,7 +132,7 @@ public void testContainerLaunch() throws IOException { mockExec.activateContainer(cId, pidFile); int ret = mockExec.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), - dirsHandler.getLogDirs()); + dirsHandler.getLogDirs(), false); assertEquals(0, ret); assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, appSubmitter, cmd, appId, containerId, @@ -279,7 +279,7 @@ public Object answer(InvocationOnMock invocationOnMock) mockExec.activateContainer(cId, pidFile); int ret = mockExec.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), - dirsHandler.getLogDirs()); + dirsHandler.getLogDirs(), false); Assert.assertNotSame(0, ret); assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, appSubmitter, cmd, appId, containerId, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java index d0bceee..63a8a5b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java @@ -266,7 +266,7 @@ public void testContainerLimits() throws IOException { // check values // default case - files shouldn't exist, strict mode off by default ContainerId id = ContainerId.fromString("container_1_1_1_1"); - handler.preExecute(id, Resource.newInstance(1024, 1)); + handler.preExecute(id, Resource.newInstance(1024, 1), false); File containerDir = new File(cgroupMountDir, id.toString()); Assert.assertTrue(containerDir.exists()); Assert.assertTrue(containerDir.isDirectory()); @@ -281,7 +281,7 @@ public void testContainerLimits() throws IOException { YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true); handler.initConfig(); handler.preExecute(id, - Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES)); + Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES), false); Assert.assertTrue(containerDir.exists()); Assert.assertTrue(containerDir.isDirectory()); periodFile = new File(containerDir, "cpu.cfs_period_us"); @@ -295,7 +295,7 @@ public void testContainerLimits() throws IOException { YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true); handler.initConfig(); handler.preExecute(id, - Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2)); + Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2), false); Assert.assertTrue(containerDir.exists()); Assert.assertTrue(containerDir.isDirectory()); periodFile = new File(containerDir, "cpu.cfs_period_us"); @@ -314,7 +314,7 @@ public void testContainerLimits() throws IOException { handler.initConfig(); handler.init(mockLCE, plugin); handler.preExecute(id, - Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2)); + Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2), false); Assert.assertTrue(containerDir.exists()); Assert.assertTrue(containerDir.isDirectory()); periodFile = new File(containerDir, "cpu.cfs_period_us"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 4beb895..9f0d85a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -419,6 +419,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); } + // Add containers with cpu_enforce_ceiling_enforced. + rmNode.updateNodeHeartbeatResponseForContainersCpuCeilingEnabled( + nodeHeartBeatResponse); + // 4. Send status to RMNode, saving the latest response. this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index e37d8fd..2ad056d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -455,6 +456,7 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { + container.notifyNodeIfCpuCeilingEnabled(true); container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent( container.appAttemptId)); } @@ -494,6 +496,7 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { container.finishTime = System.currentTimeMillis(); container.finishedStatus = finishedEvent.getRemoteContainerStatus(); + container.notifyNodeIfCpuCeilingEnabled(false); // Inform AppAttempt // container.getContainer() can return null when a RMContainer is a // reserved container @@ -537,6 +540,7 @@ private static void updateAttemptMetrics(RMContainerImpl container) { FinishedTransition { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { + container.notifyNodeIfCpuCeilingEnabled(false); // Unregister from containerAllocationExpirer. container.containerAllocationExpirer.unregister(container .getContainerId()); @@ -579,4 +583,30 @@ public ContainerReport createContainerReport() { } return containerReport; } + + /** + * Notify the corresponding node the update its containers with + * cpu_enforce_ceiling enabled. + * + * @param addOrRemove true for add, and false for remove + */ + private void notifyNodeIfCpuCeilingEnabled(boolean addOrRemove) { + try { + if (rmContext.getRMApps() != null) { + RMApp app = rmContext.getRMApps().get(appAttemptId.getApplicationId()); + if (app.getApplicationSubmissionContext().getCpuEnforceCeilingEnabled()) { + RMNode node = rmContext.getRMNodes().get(nodeId); + if (addOrRemove) { + node.addContainerWithCpuCeilingEnabled(containerId); + } else { + node.removeContainerWithCpuCeilingEnabled(containerId); + } + } + } + } catch (Exception e) { + LOG.warn("Exception happened when the container " + containerId + + " notifies node " + nodeId + + " about cpu_enforce_ceiling_enabled information.", e); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 95eeaf6..c875b89 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -142,4 +142,32 @@ * @return labels in this node */ public Set getNodeLabels(); + + /** + * Update a {@link NodeHeartbeatResponse} with the list of applications + * which are cpu_enforce_ceiling enabled and have containers launched at + * that node. + * + * @param response the {@link NodeHeartbeatResponse} to update + */ + public void updateNodeHeartbeatResponseForContainersCpuCeilingEnabled(NodeHeartbeatResponse response); + + /** + * Add a {@link ContainerId} whose cpu_enforce_ceiling is enabled + * + * @param containerId the {@link ContainerId} to add + */ + public void addContainerWithCpuCeilingEnabled(ContainerId containerId); + + /** + * Remove a {@link ContainerId} from the list + * + * @param containerId the {@link ContainerId} to remove + */ + public void removeContainerWithCpuCeilingEnabled(ContainerId containerId); + + /** + * Get the list of {@link ContainerId} with cpu_enforce_ceiling enbaled + */ + public List getContainersWithCpuCeilingEnabled(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 1774eb5..6b4bbf7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -122,6 +122,10 @@ /* the list of applications that have finished and need to be purged */ private final List finishedApplications = new ArrayList(); + /* the list of containers that have cpu_enforce_ceiling enabled */ + private final List containersWithCpuCeilingEnabled = + new ArrayList(); + private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory .newRecordInstance(NodeHeartbeatResponse.class); @@ -407,6 +411,46 @@ public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { } } + @Override + public void updateNodeHeartbeatResponseForContainersCpuCeilingEnabled( + NodeHeartbeatResponse response) { + response.setContainersWithCpuEnforceCeilingEnabled( + this.containersWithCpuCeilingEnabled); + } + + @Override + public void addContainerWithCpuCeilingEnabled(ContainerId containerId) { + this.writeLock.lock(); + + try { + this.containersWithCpuCeilingEnabled.add(containerId); + } finally { + this.writeLock.unlock(); + } + } + + @Override + public void removeContainerWithCpuCeilingEnabled(ContainerId containerId) { + this.writeLock.lock(); + + try { + this.containersWithCpuCeilingEnabled.remove(containerId); + } finally { + this.writeLock.unlock(); + } + } + + @Override + public List getContainersWithCpuCeilingEnabled() { + this.readLock.lock(); + + try { + return this.containersWithCpuCeilingEnabled; + } finally { + this.readLock.unlock(); + } + } + public void handle(RMNodeEvent event) { LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType()); try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 228f200..4691eb1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -195,6 +195,24 @@ public String getNodeManagerVersion() { } @Override + public void updateNodeHeartbeatResponseForContainersCpuCeilingEnabled( + NodeHeartbeatResponse response) { + } + + @Override + public void addContainerWithCpuCeilingEnabled(ContainerId containerId) { + } + + @Override + public void removeContainerWithCpuCeilingEnabled(ContainerId containerId) { + } + + @Override + public List getContainersWithCpuCeilingEnabled() { + return null; + } + + @Override public String getHealthReport() { return healthReport; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 21aba3b..7da42b4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -57,7 +58,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -270,4 +273,101 @@ public void testExistenceOfResourceRequestInRMContainer() throws Exception { Assert.assertNull(scheduler.getRMContainer(containerId2) .getResourceRequests()); } + + @Test + public void testNotifyNodeIfCpuCeilingEnabled() { + DrainDispatcher drainDispatcher = new DrainDispatcher(); + EventHandler appAttemptEventHandler = mock(EventHandler.class); + EventHandler generic = mock(EventHandler.class); + drainDispatcher.register(RMAppAttemptEventType.class, appAttemptEventHandler); + drainDispatcher.register(RMNodeEventType.class, generic); + drainDispatcher.init(new YarnConfiguration()); + drainDispatcher.start(); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class); + ConcurrentMap applications = + new ConcurrentHashMap(); + ConcurrentMap nodes = new ConcurrentHashMap(); + RMContext rmContext = mock(RMContext.class); + SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); + when(rmContext.getDispatcher()).thenReturn(drainDispatcher); + when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); + when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); + when(rmContext.getRMApps()).thenReturn(applications); + when(rmContext.getRMNodes()).thenReturn(nodes); + when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); + + // Create a RMNode + NodeId nodeId = BuilderUtils.newNodeId("host", 3425); + RMNode node = new RMNodeImpl(nodeId, rmContext, null, 10, 20, null, null, null); + rmContext.getRMNodes().put(nodeId, node); + + // Create a RMApp + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1); + ApplicationSubmissionContext appContext = mock(ApplicationSubmissionContext.class); + when(appContext.getCpuEnforceCeilingEnabled()).thenReturn(true); + RMApp app = mock(RMApp.class); + when(app.getApplicationSubmissionContext()).thenReturn(appContext); + rmContext.getRMApps().put(appId, app); + + // Create two RMContainers + ContainerId containerId1 = BuilderUtils.newContainerId(appAttemptId, 1); + ContainerId containerId2 = BuilderUtils.newContainerId(appAttemptId, 2); + Container container1 = BuilderUtils.newContainer(containerId1, nodeId, + "host:3465", null, null, null); + Container container2 = BuilderUtils.newContainer(containerId2, nodeId, + "host:3466", null, null, null); + RMContainer rmContainer1 = new RMContainerImpl(container1, appAttemptId, + nodeId, "user", rmContext); + RMContainer rmContainer2 = new RMContainerImpl(container2, appAttemptId, + nodeId, "user", rmContext); + assertEquals(RMContainerState.NEW, rmContainer1.getState()); + assertEquals(RMContainerState.NEW, rmContainer2.getState()); + + // NEW --> ALLOCATED + rmContainer1.handle(new RMContainerEvent(containerId1, + RMContainerEventType.START)); + rmContainer2.handle(new RMContainerEvent(containerId1, + RMContainerEventType.START)); + drainDispatcher.await(); + assertEquals(RMContainerState.ALLOCATED, rmContainer1.getState()); + assertEquals(RMContainerState.ALLOCATED, rmContainer2.getState()); + assertEquals(2, node.getContainersWithCpuCeilingEnabled().size()); + + // Finish one container + rmContainer1.handle(new RMContainerEvent(containerId1, + RMContainerEventType.ACQUIRED)); + drainDispatcher.await(); + ContainerStatus containerStatus1 = SchedulerUtils + .createAbnormalContainerStatus(containerId1, + SchedulerUtils.COMPLETED_APPLICATION); + rmContainer1.handle(new RMContainerFinishedEvent(containerId1, + containerStatus1, RMContainerEventType.FINISHED)); + drainDispatcher.await(); + assertEquals(RMContainerState.COMPLETED, rmContainer1.getState()); + assertEquals(1, node.getContainersWithCpuCeilingEnabled().size()); + + // Kill another container + ContainerStatus containerStatus2 = SchedulerUtils + .createAbnormalContainerStatus(containerId2, + SchedulerUtils.RELEASED_CONTAINER); + rmContainer2.handle(new RMContainerFinishedEvent(containerId2, + containerStatus2, RMContainerEventType.KILL)); + drainDispatcher.await(); + assertEquals(RMContainerState.KILLED, rmContainer2.getState()); + assertEquals(0, node.getContainersWithCpuCeilingEnabled().size()); + + // Reset cpu_enforce_ceiling to false + when(appContext.getCpuEnforceCeilingEnabled()).thenReturn(false); + Container container3 = BuilderUtils.newContainer(containerId2, nodeId, + "host:3467", null, null, null); + RMContainer rmContainer3 = new RMContainerImpl(container3, appAttemptId, + nodeId, "user", rmContext); + rmContainer3.handle(new RMContainerEvent(containerId1, + RMContainerEventType.START)); + drainDispatcher.await(); + assertEquals(RMContainerState.ALLOCATED, rmContainer3.getState()); + assertEquals(0, node.getContainersWithCpuCeilingEnabled().size()); + } }