diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerContext.java index d13159b..bfb4890 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerContext.java @@ -35,14 +35,23 @@ private final String user; private final ContainerId containerId; private final Resource resource; + private final boolean isAMContainer; @Private @Unstable public ContainerContext(String user, ContainerId containerId, Resource resource) { + this(user, containerId, resource, false); + } + + @Private + @Unstable + public ContainerContext(String user, ContainerId containerId, + Resource resource, boolean isAMContainer) { this.user = user; this.containerId = containerId; this.resource = resource; + this.isAMContainer = isAMContainer; } /** @@ -72,4 +81,13 @@ public ContainerId getContainerId() { public Resource getResource() { return resource; } + + /** + * Get whether the container is an AM or not. + * + * @return true if the container is an AM, false otherwise. + */ + public boolean isAMContainer() { + return isAMContainer; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerInitializationContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerInitializationContext.java index 5b5bbda..d1accbc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerInitializationContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerInitializationContext.java @@ -41,4 +41,11 @@ public ContainerInitializationContext(String user, ContainerId containerId, super(user, containerId, resource); } + @Private + @Unstable + public ContainerInitializationContext(String user, ContainerId containerId, + Resource resource, boolean isAMContainer) { + super(user, containerId, resource, isAMContainer); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerTerminationContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerTerminationContext.java index 34ba73e..0605a44 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerTerminationContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerTerminationContext.java @@ -41,4 +41,11 @@ public ContainerTerminationContext(String user, ContainerId containerId, super(user, containerId, resource); } + @Private + @Unstable + public ContainerTerminationContext(String user, ContainerId containerId, + Resource resource, boolean isAMContainer) { + super(user, containerId, resource, isAMContainer); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java index 9a60d01..db898db 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java @@ -66,14 +66,24 @@ public ContainerTokenIdentifier(ContainerId containerID, int masterKeyId, long rmIdentifier, Priority priority, long creationTime) { this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId, rmIdentifier, priority, creationTime, null, - CommonNodeLabelsManager.NO_LABEL); + CommonNodeLabelsManager.NO_LABEL, false); } public ContainerTokenIdentifier(ContainerId containerID, String hostName, String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId, long rmIdentifier, Priority priority, long creationTime, LogAggregationContext logAggregationContext, String nodeLabelExpression) { - ContainerTokenIdentifierProto.Builder builder = + this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId, + rmIdentifier, priority, creationTime, logAggregationContext, + nodeLabelExpression, false); + } + + public ContainerTokenIdentifier(ContainerId containerID, String hostName, + String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId, + long rmIdentifier, Priority priority, long creationTime, + LogAggregationContext logAggregationContext, String nodeLabelExpression, + boolean isAMContainer) { + ContainerTokenIdentifierProto.Builder builder = ContainerTokenIdentifierProto.newBuilder(); if (containerID != null) { builder.setContainerId(((ContainerIdPBImpl)containerID).getProto()); @@ -99,7 +109,8 @@ public ContainerTokenIdentifier(ContainerId containerID, String hostName, if (nodeLabelExpression != null) { builder.setNodeLabelExpression(nodeLabelExpression); } - + builder.setIsAMContainer(isAMContainer); + proto = builder.build(); } @@ -168,6 +179,15 @@ public LogAggregationContext getLogAggregationContext() { return new LogAggregationContextPBImpl(proto.getLogAggregationContext()); } + /** + * Get whether the container is an AM or not. + * + * @return true if the container is an AM, false otherwise. + */ + public boolean getIsAMContainer() { + return proto.getIsAMContainer(); + } + @Override public void write(DataOutput out) throws IOException { LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto index d1bef21..ac8604c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto @@ -50,6 +50,7 @@ message ContainerTokenIdentifierProto { optional int64 creationTime = 9; optional LogAggregationContextProto logAggregationContext = 10; optional string nodeLabelExpression = 11; + optional bool isAMContainer = 12 [default = false]; } message ClientToAMTokenIdentifierProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java index 5fe75bc..0cfcd243 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.YARNDelegationTokenIdentifierProto; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; @@ -201,6 +202,11 @@ public void testContainerTokenIdentifier() throws IOException { anotherToken.getCreationTime(), creationTime); Assert.assertNull(anotherToken.getLogAggregationContext()); + + Assert.assertEquals(CommonNodeLabelsManager.NO_LABEL, + anotherToken.getNodeLabelExpression()); + + Assert.assertFalse(anotherToken.getIsAMContainer()); } @Test @@ -347,4 +353,77 @@ public void testParseTimelineDelegationTokenIdentifierRenewer() throws IOExcepti Assert.assertEquals(new Text("yarn"), token.getRenewer()); } + @Test + public void testAMContainerTokenIdentifier() throws IOException { + ContainerId containerID = ContainerId.newContainerId( + ApplicationAttemptId.newInstance(ApplicationId.newInstance( + 1, 1), 1), 1); + String hostName = "host0"; + String appSubmitter = "usr0"; + Resource r = Resource.newInstance(1024, 1); + long expiryTimeStamp = 1000; + int masterKeyId = 1; + long rmIdentifier = 1; + Priority priority = Priority.newInstance(1); + long creationTime = 1000; + + ContainerTokenIdentifier token = new ContainerTokenIdentifier( + containerID, hostName, appSubmitter, r, expiryTimeStamp, + masterKeyId, rmIdentifier, priority, creationTime, null, CommonNodeLabelsManager.NO_LABEL, true); + + ContainerTokenIdentifier anotherToken = new ContainerTokenIdentifier(); + + byte[] tokenContent = token.getBytes(); + DataInputBuffer dib = new DataInputBuffer(); + dib.reset(tokenContent, tokenContent.length); + anotherToken.readFields(dib); + + // verify the whole record equals with original record + Assert.assertEquals("Token is not the same after serialization " + + "and deserialization.", token, anotherToken); + + Assert.assertEquals( + "ContainerID from proto is not the same with original token", + anotherToken.getContainerID(), containerID); + + Assert.assertEquals( + "Hostname from proto is not the same with original token", + anotherToken.getNmHostAddress(), hostName); + + Assert.assertEquals( + "ApplicationSubmitter from proto is not the same with original token", + anotherToken.getApplicationSubmitter(), appSubmitter); + + Assert.assertEquals( + "Resource from proto is not the same with original token", + anotherToken.getResource(), r); + + Assert.assertEquals( + "expiryTimeStamp from proto is not the same with original token", + anotherToken.getExpiryTimeStamp(), expiryTimeStamp); + + Assert.assertEquals( + "KeyId from proto is not the same with original token", + anotherToken.getMasterKeyId(), masterKeyId); + + Assert.assertEquals( + "RMIdentifier from proto is not the same with original token", + anotherToken.getRMIdentifier(), rmIdentifier); + + Assert.assertEquals( + "Priority from proto is not the same with original token", + anotherToken.getPriority(), priority); + + Assert.assertEquals( + "CreationTime from proto is not the same with original token", + anotherToken.getCreationTime(), creationTime); + + Assert.assertNull(anotherToken.getLogAggregationContext()); + + Assert.assertEquals(CommonNodeLabelsManager.NO_LABEL, + anotherToken.getNodeLabelExpression()); + + Assert.assertTrue(anotherToken.getIsAMContainer()); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index fb6f79b..78ed1a6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -225,7 +225,8 @@ public void handle(AuxServicesEvent event) { try { serv.initializeContainer(new ContainerInitializationContext( event.getUser(), event.getContainer().getContainerId(), - event.getContainer().getResource())); + event.getContainer().getResource(), event.getContainer() + .getContainerTokenIdentifier().getIsAMContainer())); } catch (Throwable th) { logWarningWhenAuxServiceThrowExceptions(serv, AuxServicesEventType.CONTAINER_INIT, th); @@ -237,7 +238,8 @@ public void handle(AuxServicesEvent event) { try { serv.stopContainer(new ContainerTerminationContext( event.getUser(), event.getContainer().getContainerId(), - event.getContainer().getResource())); + event.getContainer().getResource(), event.getContainer() + .getContainerTokenIdentifier().getIsAMContainer())); } catch (Throwable th) { logWarningWhenAuxServiceThrowExceptions(serv, AuxServicesEventType.CONTAINER_STOP, th); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index dbc3cb5..5c10966 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -467,13 +467,28 @@ public ContainersAndNMTokensAllocation(List containerList, .hasNext();) { RMContainer rmContainer = i.next(); Container container = rmContainer.getContainer(); + boolean isAMContainer = false; + // The working knowledge is that masterContainer for AM is null as it + // itself is the master container. + RMAppAttempt appAttempt = + rmContext + .getRMApps() + .get( + container.getId().getApplicationAttemptId() + .getApplicationId()).getCurrentAppAttempt(); + if (appAttempt != null && appAttempt.getSubmissionContext() != null) { + if (appAttempt.getMasterContainer() == null + && appAttempt.getSubmissionContext().getUnmanagedAM() == false) { + isAMContainer = true; + } + } try { // create container token and NMToken altogether. container.setContainerToken(rmContext.getContainerTokenSecretManager() .createContainerToken(container.getId(), container.getNodeId(), getUser(), container.getResource(), container.getPriority(), rmContainer.getCreationTime(), this.logAggregationContext, - rmContainer.getNodeLabelExpression())); + rmContainer.getNodeLabelExpression(), isAMContainer)); NMToken nmToken = rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), getApplicationAttemptId(), container); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index e5b44a6..598f279 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -829,7 +829,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, RMAppAttempt rmAppAttempt = csContext.getRMContext().getRMApps() .get(application.getApplicationId()).getCurrentAppAttempt(); - if (null == rmAppAttempt.getMasterContainer()) { + if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false + && null == rmAppAttempt.getMasterContainer()) { if (LOG.isDebugEnabled()) { LOG.debug("Skip allocating AM container to app_attempt=" + application.getApplicationAttemptId() diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java index 1c0533d..e7d05fd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java @@ -192,12 +192,36 @@ public Token createContainerToken(ContainerId containerId, NodeId nodeId, * @param priority * @param createTime * @param logAggregationContext + * @param nodeLabelExpression * @return the container-token */ public Token createContainerToken(ContainerId containerId, NodeId nodeId, String appSubmitter, Resource capability, Priority priority, long createTime, LogAggregationContext logAggregationContext, String nodeLabelExpression) { + return createContainerToken(containerId, nodeId, appSubmitter, capability, + priority, createTime, logAggregationContext, nodeLabelExpression, + false); + } + + /** + * Helper function for creating ContainerTokens + * + * @param containerId + * @param nodeId + * @param appSubmitter + * @param capability + * @param priority + * @param createTime + * @param logAggregationContext + * @param nodeLabelExpression + * @param isAMContainer + * @return the container-token + */ + public Token createContainerToken(ContainerId containerId, NodeId nodeId, + String appSubmitter, Resource capability, Priority priority, + long createTime, LogAggregationContext logAggregationContext, + String nodeLabelExpression, boolean isAMContainer) { byte[] password; ContainerTokenIdentifier tokenIdentifier; long expiryTimeStamp = @@ -211,7 +235,7 @@ public Token createContainerToken(ContainerId containerId, NodeId nodeId, appSubmitter, capability, expiryTimeStamp, this.currentMasterKey .getMasterKey().getKeyId(), ResourceManager.getClusterTimeStamp(), priority, createTime, - logAggregationContext, nodeLabelExpression); + logAggregationContext, nodeLabelExpression, isAMContainer); password = this.createPassword(tokenIdentifier); } finally {