diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index f1ebbfe..11ec977 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -76,6 +76,21 @@ public static ApplicationSubmissionContext newInstance( int maxAppAttempts, Resource resource, String applicationType, boolean keepContainers, String appLabelExpression, String amContainerLabelExpression) { + return newInstance(applicationId, applicationName, queue, priority, + amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, + resource, applicationType, keepContainers, appLabelExpression, + amContainerLabelExpression, false); + } + + @Public + @Stable + public static ApplicationSubmissionContext newInstance( + ApplicationId applicationId, String applicationName, String queue, + Priority priority, ContainerLaunchContext amContainer, + boolean isUnmanagedAM, boolean cancelTokensWhenComplete, + int maxAppAttempts, Resource resource, String applicationType, + boolean keepContainers, String appLabelExpression, + String amContainerLabelExpression, boolean cpuEnforceCeilingEnabled) { ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class); context.setApplicationId(applicationId); @@ -98,6 +113,7 @@ public static ApplicationSubmissionContext newInstance( amReq.setRelaxLocality(true); amReq.setNodeLabelExpression(amContainerLabelExpression); context.setAMContainerResourceRequest(amReq); + context.setCpuEnforceCeilingEnabled(cpuEnforceCeilingEnabled); return context; } @@ -532,4 +548,26 @@ public abstract void setLogAggregationContext( @Public @Unstable public abstract void setReservationID(ReservationId reservationID); + + /** + * Get the flag which indicates whether to enable cpu enforce ceiling for + * this application. + * + * @return the flag which indicates whether to enable cpu enforce ceiling + * or not. + */ + @Public + @Evolving + public abstract boolean getCpuEnforceCeilingEnabled(); + + /** + * Set the flag which indicates whether to enable cpu enforce ceiling for + * this application. + * + * @param ceilingEnabled + * the flag which indicates whether to enable cpu enforce ceiling. + */ + @Public + @Evolving + public abstract void setCpuEnforceCeilingEnabled(boolean ceilingEnabled); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index c4e756d..d106ded 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -303,6 +303,7 @@ message ApplicationSubmissionContextProto { optional ReservationIdProto reservation_id = 15; optional string node_label_expression = 16; optional ResourceRequestProto am_container_resource_request = 17; + optional bool cpu_enforce_ceiling_enabled = 18 [default = false]; } message LogAggregationContextProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 0e9a4e4..80f9b23 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -153,6 +153,8 @@ // No. of containers in which the shell script needs to be executed private int numContainers = 1; private String nodeLabelExpression = null; + // cpu_enforce_ceiling enabled or not + private boolean cpuEnforceCeilingEnabled = false; // log4j.properties file // if available, add to local resources and set into classpath @@ -259,6 +261,7 @@ public Client(Configuration conf) throws Exception { opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); + opts.addOption("cpu_enforce_ceiling_enabled", true, "Whether enable cpu_enforce_ceiling (true/false)"); opts.addOption("log_properties", true, "log4j.properties file"); opts.addOption("keep_containers_across_application_attempts", false, "Flag to indicate whether to keep containers across application attempts." + @@ -397,7 +400,7 @@ public boolean init(String[] args) throws ParseException { containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1")); numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1")); - + cpuEnforceCeilingEnabled = Boolean.parseBoolean(cliParser.getOptionValue("cpu_enforce_ceiling_enabled", "false")); if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) { throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified," @@ -510,6 +513,7 @@ public boolean run() throws IOException, YarnException { // set the application name ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); + appContext.setCpuEnforceCeilingEnabled(cpuEnforceCeilingEnabled); ApplicationId appId = appContext.getApplicationId(); appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index 303b437..536c9f4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -412,6 +412,18 @@ public boolean getKeepContainersAcrossApplicationAttempts() { return p.getKeepContainersAcrossApplicationAttempts(); } + @Override + public void setCpuEnforceCeilingEnabled(boolean ceilingEnabled) { + maybeInitBuilder(); + builder.setCpuEnforceCeilingEnabled(ceilingEnabled); + } + + @Override + public boolean getCpuEnforceCeilingEnabled() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + return p.getCpuEnforceCeilingEnabled(); + } + private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { return new PriorityPBImpl(p); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java index 593bfc3..ca16a04 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java @@ -71,7 +71,15 @@ public ContainerTokenIdentifier(ContainerId containerID, String hostName, String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId, long rmIdentifier, Priority priority, long creationTime, LogAggregationContext logAggregationContext) { - ContainerTokenIdentifierProto.Builder builder = + this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId, + rmIdentifier, priority, creationTime, logAggregationContext, false); + } + + public ContainerTokenIdentifier(ContainerId containerID, String hostName, + String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId, + long rmIdentifier, Priority priority, long creationTime, + LogAggregationContext logAggregationContext, boolean cpuEnforceCeilingEnabled) { + ContainerTokenIdentifierProto.Builder builder = ContainerTokenIdentifierProto.newBuilder(); if (containerID != null) { builder.setContainerId(((ContainerIdPBImpl)containerID).getProto()); @@ -88,11 +96,12 @@ public ContainerTokenIdentifier(ContainerId containerID, String hostName, builder.setPriority(((PriorityPBImpl)priority).getProto()); } builder.setCreationTime(creationTime); - + if (logAggregationContext != null) { builder.setLogAggregationContext( ((LogAggregationContextPBImpl)logAggregationContext).getProto()); } + builder.setCpuEnforceCeilingEnabled(cpuEnforceCeilingEnabled); proto = builder.build(); } @@ -161,6 +170,10 @@ public LogAggregationContext getLogAggregationContext() { return new LogAggregationContextPBImpl(proto.getLogAggregationContext()); } + public boolean getCpuEnforceCeilingEnabled() { + return proto.getCpuEnforceCeilingEnabled(); + } + @Override public void write(DataOutput out) throws IOException { LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto index 317032d..2e70ac6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto @@ -49,6 +49,7 @@ message ContainerTokenIdentifierProto { optional PriorityProto priority = 8; optional int64 creationTime = 9; optional LogAggregationContextProto logAggregationContext = 10; + optional bool cpu_enforce_ceiling_enabled = 11 [default = false]; } message ClientToAMTokenIdentifierProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index 327f882..a2670e8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -133,13 +133,16 @@ public abstract void startLocalizer(Path nmPrivateContainerTokens, * @param containerWorkDir the work dir for the container * @param localDirs nm-local-dirs to be used for this container * @param logDirs nm-log-dirs to be used for this container + * @param cpuEnforceCeilingEnabled whether enable cpu ceiling, + * only used by {@link LinuxContainerExecutor} * @return the return status of the launch * @throws IOException */ public abstract int launchContainer(Container container, Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, String user, String appId, Path containerWorkDir, - List localDirs, List logDirs) throws IOException; + List localDirs, List logDirs, + boolean cpuEnforceCeilingEnabled) throws IOException; public abstract boolean signalContainer(String user, String pid, Signal signal) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index cc2de99..66b997f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -133,7 +133,8 @@ public void startLocalizer(Path nmPrivateContainerTokensPath, public int launchContainer(Container container, Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, String user, String appId, Path containerWorkDir, - List localDirs, List logDirs) throws IOException { + List localDirs, List logDirs, + boolean cpuEnforceCeilingEnabled) throws IOException { FsPermission dirPerm = new FsPermission(APPDIR_PERM); ContainerId containerId = container.getContainerId(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java index d8dd890..8704bc2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java @@ -142,7 +142,8 @@ public synchronized void startLocalizer(Path nmPrivateContainerTokensPath, public int launchContainer(Container container, Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, String userName, String appId, Path containerWorkDir, - List localDirs, List logDirs) throws IOException { + List localDirs, List logDirs, + boolean cpuEnforceCeilingEnabled) throws IOException { String containerImageName = container.getLaunchContext().getEnvironment() .get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME); if (LOG.isDebugEnabled()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java index 4db4ef2..a89d045 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java @@ -256,7 +256,8 @@ public void buildMainArgs(List command, String user, String appId, public int launchContainer(Container container, Path nmPrivateCotainerScriptPath, Path nmPrivateTokensPath, String user, String appId, Path containerWorkDir, - List localDirs, List logDirs) throws IOException { + List localDirs, List logDirs, + boolean cpuEnforceCeilingEnabled) throws IOException { verifyUsernamePattern(user); String runAsUser = getRunAsUser(user); @@ -265,7 +266,7 @@ public int launchContainer(Container container, String containerIdStr = ConverterUtils.toString(containerId); resourcesHandler.preExecute(containerId, - container.getResource()); + container.getResource(), cpuEnforceCeilingEnabled); String resourcesOptions = resourcesHandler.getResourcesOption( containerId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index a87238d..108773b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -300,8 +300,8 @@ public Integer call() { else { exec.activateContainer(containerID, pidFilePath); ret = exec.launchContainer(container, nmPrivateContainerScriptPath, - nmPrivateTokensPath, user, appIdStr, containerWorkDir, - localDirs, logDirs); + nmPrivateTokensPath, user, appIdStr, containerWorkDir, localDirs, + logDirs, container.getContainerTokenIdentifier().getCpuEnforceCeilingEnabled()); } } catch (Throwable e) { LOG.warn("Failed to launch container.", e); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java index 63039d8..d0052e7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java @@ -293,8 +293,8 @@ boolean deleteCgroup(String cgroupPath) { * Next three functions operate on all the resources we are enforcing. */ - private void setupLimits(ContainerId containerId, - Resource containerResource) throws IOException { + private void setupLimits(ContainerId containerId, Resource containerResource, + boolean cpuEnforceCeilingEnabled) throws IOException { String containerName = containerId.toString(); if (isCpuWeightEnabled()) { @@ -303,7 +303,8 @@ private void setupLimits(ContainerId containerId, int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores; updateCgroup(CONTROLLER_CPU, containerName, "shares", String.valueOf(cpuShares)); - if (strictResourceUsageMode) { + + if (strictResourceUsageMode || cpuEnforceCeilingEnabled) { int nodeVCores = conf.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES); @@ -330,9 +331,9 @@ private void clearLimits(ContainerId containerId) { * LCE Resources Handler interface */ - public void preExecute(ContainerId containerId, Resource containerResource) - throws IOException { - setupLimits(containerId, containerResource); + public void preExecute(ContainerId containerId, Resource containerResource, + boolean cpuEnforceCeilingEnabled) throws IOException { + setupLimits(containerId, containerResource, cpuEnforceCeilingEnabled); } public void postExecute(ContainerId containerId) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/DefaultLCEResourcesHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/DefaultLCEResourcesHandler.java index 9fb8707..b579fa7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/DefaultLCEResourcesHandler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/DefaultLCEResourcesHandler.java @@ -51,7 +51,8 @@ public void init(LinuxContainerExecutor lce) { * LCE Resources Handler interface */ - public void preExecute(ContainerId containerId, Resource containerResource) { + public void preExecute(ContainerId containerId, Resource containerResource, + boolean cpuEnforceCeilingEnabled) { } public void postExecute(ContainerId containerId) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/LCEResourcesHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/LCEResourcesHandler.java index 34f7f31..70c286e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/LCEResourcesHandler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/LCEResourcesHandler.java @@ -34,9 +34,11 @@ * inside the container. * @param containerId the id of the container being launched * @param containerResource the node resources the container will be using + * @param cpuEnforceCeilingEnabled whether enable cpu_enforce_ceiling for this + * container, only work for {@link CgroupsLCEResourcesHandler} */ - void preExecute(ContainerId containerId, Resource containerResource) - throws IOException; + void preExecute(ContainerId containerId, Resource containerResource, + boolean cpuEnforceCeilingEnabled) throws IOException; /** * Called by the LinuxContainerExecutor after the executable inside the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java index c669047..ba6e870 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java @@ -311,7 +311,7 @@ public Object answer(InvocationOnMock invocationOnMock) mockExec.activateContainer(cId, pidFile); int ret = mockExec .launchContainer(container, scriptPath, tokensPath, appSubmitter, - appId, workDir, localDirs, localDirs); + appId, workDir, localDirs, localDirs, false); Assert.assertNotSame(0, ret); } finally { mockExec.deleteAsUser(appSubmitter, localDir); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutor.java index e43ac2e..61098bd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutor.java @@ -165,7 +165,7 @@ private int runAndBlock(ContainerId cId, Map launchCtxEnv, Strin exec.activateContainer(cId, pidFile); return exec.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), - dirsHandler.getLogDirs()); + dirsHandler.getLogDirs(), false); } private String writeScriptFile(Map launchCtxEnv, String... cmd) throws IOException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutorWithMocks.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutorWithMocks.java index fa8bfaf..ebf6d8e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutorWithMocks.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutorWithMocks.java @@ -143,7 +143,7 @@ public void testContainerLaunchNullImage() throws IOException { dockerContainerExecutor.activateContainer(cId, pidFile); dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), - dirsHandler.getLogDirs()); + dirsHandler.getLogDirs(), false); } @Test(expected = IllegalArgumentException.class) @@ -175,7 +175,7 @@ public void testContainerLaunchInvalidImage() throws IOException { dockerContainerExecutor.activateContainer(cId, pidFile); dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), - dirsHandler.getLogDirs()); + dirsHandler.getLogDirs(), false); } @Test @@ -205,7 +205,7 @@ public void testContainerLaunch() throws IOException { dockerContainerExecutor.activateContainer(cId, pidFile); int ret = dockerContainerExecutor.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), - dirsHandler.getLogDirs()); + dirsHandler.getLogDirs(), false); assertEquals(0, ret); //get the script Path sessionScriptPath = new Path(workDir, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java index 7417f69..66c2581 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java @@ -222,7 +222,7 @@ private int runAndBlock(ContainerId cId, String ... cmd) throws IOException { exec.activateContainer(cId, pidFile); return exec.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), - dirsHandler.getLogDirs()); + dirsHandler.getLogDirs(), false); } @Test @@ -465,8 +465,8 @@ public void init(LinuxContainerExecutor lce) throws IOException { } @Override - public void preExecute(ContainerId containerId, Resource containerResource) - throws IOException { + public void preExecute(ContainerId containerId, Resource containerResource, + boolean cpuEnforceCeilingEnabled) throws IOException { } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java index d54367a..f9eaf97 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java @@ -132,7 +132,7 @@ public void testContainerLaunch() throws IOException { mockExec.activateContainer(cId, pidFile); int ret = mockExec.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), - dirsHandler.getLogDirs()); + dirsHandler.getLogDirs(), false); assertEquals(0, ret); assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, appSubmitter, cmd, appId, containerId, @@ -279,7 +279,7 @@ public Object answer(InvocationOnMock invocationOnMock) mockExec.activateContainer(cId, pidFile); int ret = mockExec.launchContainer(container, scriptPath, tokensPath, appSubmitter, appId, workDir, dirsHandler.getLocalDirs(), - dirsHandler.getLogDirs()); + dirsHandler.getLogDirs(), false); Assert.assertNotSame(0, ret); assertEquals(Arrays.asList(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER, appSubmitter, cmd, appId, containerId, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java index d0bceee..63a8a5b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java @@ -266,7 +266,7 @@ public void testContainerLimits() throws IOException { // check values // default case - files shouldn't exist, strict mode off by default ContainerId id = ContainerId.fromString("container_1_1_1_1"); - handler.preExecute(id, Resource.newInstance(1024, 1)); + handler.preExecute(id, Resource.newInstance(1024, 1), false); File containerDir = new File(cgroupMountDir, id.toString()); Assert.assertTrue(containerDir.exists()); Assert.assertTrue(containerDir.isDirectory()); @@ -281,7 +281,7 @@ public void testContainerLimits() throws IOException { YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true); handler.initConfig(); handler.preExecute(id, - Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES)); + Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES), false); Assert.assertTrue(containerDir.exists()); Assert.assertTrue(containerDir.isDirectory()); periodFile = new File(containerDir, "cpu.cfs_period_us"); @@ -295,7 +295,7 @@ public void testContainerLimits() throws IOException { YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true); handler.initConfig(); handler.preExecute(id, - Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2)); + Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2), false); Assert.assertTrue(containerDir.exists()); Assert.assertTrue(containerDir.isDirectory()); periodFile = new File(containerDir, "cpu.cfs_period_us"); @@ -314,7 +314,7 @@ public void testContainerLimits() throws IOException { handler.initConfig(); handler.init(mockLCE, plugin); handler.preExecute(id, - Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2)); + Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2), false); Assert.assertTrue(containerDir.exists()); Assert.assertTrue(containerDir.isDirectory()); periodFile = new File(containerDir, "cpu.cfs_period_us"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index e37d8fd..c6fe24c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index d5b6ce6..ab6e788 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -445,10 +445,14 @@ public ContainersAndNMTokensAllocation(List containerList, Container container = rmContainer.getContainer(); try { // create container token and NMToken altogether. + boolean cpuEnforceCeilingEnabled = rmContext.getRMApps() + .get(rmContainer.getApplicationAttemptId().getApplicationId()) + .getApplicationSubmissionContext().getCpuEnforceCeilingEnabled(); container.setContainerToken(rmContext.getContainerTokenSecretManager() .createContainerToken(container.getId(), container.getNodeId(), getUser(), container.getResource(), container.getPriority(), - rmContainer.getCreationTime(), this.logAggregationContext)); + rmContainer.getCreationTime(), this.logAggregationContext, + cpuEnforceCeilingEnabled)); NMToken nmToken = rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), getApplicationAttemptId(), container); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java index 15dd1a9..4e0af7a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java @@ -197,20 +197,43 @@ public Token createContainerToken(ContainerId containerId, NodeId nodeId, public Token createContainerToken(ContainerId containerId, NodeId nodeId, String appSubmitter, Resource capability, Priority priority, long createTime, LogAggregationContext logAggregationContext) { + return createContainerToken(containerId, nodeId, appSubmitter, capability, + priority, createTime, logAggregationContext, false); + } + + /** + * Helper function for creating ContainerTokens + * + * @param containerId + * @param nodeId + * @param appSubmitter + * @param capability + * @param priority + * @param createTime + * @param logAggregationContext + * @param cpuEnforceCeilingEnabled + * @return the container-token + */ + public Token createContainerToken(ContainerId containerId, NodeId nodeId, + String appSubmitter, Resource capability, Priority priority, + long createTime, LogAggregationContext logAggregationContext, + boolean cpuEnforceCeilingEnabled) { byte[] password; ContainerTokenIdentifier tokenIdentifier; long expiryTimeStamp = System.currentTimeMillis() + containerTokenExpiryInterval; + //boolean cpuEnforceCeilingEnabled = this. + // Lock so that we use the same MasterKey's keyId and its bytes this.readLock.lock(); try { tokenIdentifier = new ContainerTokenIdentifier(containerId, nodeId.toString(), - appSubmitter, capability, expiryTimeStamp, this.currentMasterKey + appSubmitter, capability, expiryTimeStamp, this.currentMasterKey .getMasterKey().getKeyId(), - ResourceManager.getClusterTimeStamp(), priority, createTime, - logAggregationContext); + ResourceManager.getClusterTimeStamp(), priority, createTime, + logAggregationContext, cpuEnforceCeilingEnabled); password = this.createPassword(tokenIdentifier); } finally {