diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 1b2a03e..b1a2668 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -251,6 +251,18 @@ public static Token newContainerToken(NodeId nodeId, return containerToken; } + @Private + @VisibleForTesting + public static Token newContainerToken( + ContainerTokenIdentifier tokenIdentifier, byte[] password, + String tokenServiceAddress) { + Token containerToken = + newToken(Token.class, tokenIdentifier.getBytes(), + ContainerTokenIdentifier.KIND.toString(), password, + tokenServiceAddress); + return containerToken; + } + public static ContainerTokenIdentifier newContainerTokenIdentifier( Token containerToken) throws IOException { org.apache.hadoop.security.token.Token token = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index fd39dad..bab2e2b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -72,7 +72,6 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; @@ -81,7 +80,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -450,6 +448,7 @@ public AllocateResponse allocate(AllocateRequest request) } allocateResponse.setAllocatedContainers(allocation.getContainers()); + allocateResponse.setNMTokens(allocation.getNMTokens()); allocateResponse.setCompletedContainersStatuses(appAttempt .pullJustFinishedContainers()); allocateResponse.setResponseId(lastResponse.getResponseId() + 1); @@ -460,13 +459,6 @@ public AllocateResponse allocate(AllocateRequest 
request) // add preemption to the allocateResponse message (if any) allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation)); - // Adding NMTokens for allocated containers. - if (!allocation.getContainers().isEmpty()) { - allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager() - .createAndGetNMTokens(app.getUser(), appAttemptId, - allocation.getContainers())); - } - // before returning response, verify in sync AllocateResponse oldResponse = responseMap.put(appAttemptId, allocateResponse); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index a1c1a40..3014512 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.security.PrivilegedAction; import java.util.ArrayList; @@ -55,6 +56,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; @@ -146,7 +148,7 @@ private void cleanup() throws IOException, YarnException { // Protected. For tests. 
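On the allocate path, the ResourceManager no longer builds NMTokens inside ApplicationMasterService: the scheduler attaches them to the Allocation and the service only copies them into the response via allocateResponse.setNMTokens(allocation.getNMTokens()). The AM side is then expected to keep at most one NMToken per NodeManager. A minimal consumer-side sketch under that assumption — the NMTokenStore helper below is hypothetical and not part of this patch; only AllocateResponse.getNMTokens(), NMToken.getNodeId() and NMToken.getToken() are taken from the code in this change:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Token;

// Hypothetical AM-side cache: one NMToken per NodeManager.
public class NMTokenStore {
  private final Map<String, Token> tokenByNode = new HashMap<String, Token>();

  public void absorb(AllocateResponse response) {
    for (NMToken nmToken : response.getNMTokens()) {
      // The RM sends a token only the first time this attempt receives a
      // container on a node, so each node id normally appears at most once.
      tokenByNode.put(nmToken.getNodeId().toString(), nmToken.getToken());
    }
  }

  public Token get(String nodeId) {
    return tokenByNode.get(nodeId);
  }
}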
protected ContainerManagementProtocol getContainerMgrProxy( - final ContainerId containerId) { + final ContainerId containerId) throws IOException { final NodeId node = masterContainer.getNodeId(); final InetSocketAddress containerManagerBindAddress = @@ -158,13 +160,28 @@ protected ContainerManagementProtocol getContainerMgrProxy( UserGroupInformation.createRemoteUser(containerId .getApplicationAttemptId().toString()); - String user = - rmContext.getRMApps() - .get(containerId.getApplicationAttemptId().getApplicationId()) - .getUser(); - org.apache.hadoop.yarn.api.records.Token token = - rmContext.getNMTokenSecretManager().createNMToken( - containerId.getApplicationAttemptId(), node, user); + RMApp rmApp = + rmContext.getRMApps().get( + containerId.getApplicationAttemptId().getApplicationId()); + String user = rmApp.getUser(); + org.apache.hadoop.yarn.api.records.Token token = null; + + if (eventType == AMLauncherEventType.LAUNCH) { + token = rmApp.getCurrentAppAttempt().getNMToken().getToken(); + } else { + try { + token = + rmContext.getNMTokenSecretManager().createNMToken( + containerId.getApplicationAttemptId(), node, user); + } catch (IllegalArgumentException iae) { + LOG.error(iae); + if (iae.getCause() instanceof UnknownHostException) { + throw new IOException(iae); + } else { + throw iae; + } + } + } currentUser.addToken(ConverterUtils.convertFromYarn(token, containerManagerBindAddress)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index 335dbda..a1ba1bd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -31,11 +31,13 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; /** @@ -178,4 +180,14 @@ * @return the start time of the application. 
*/ long getStartTime(); + + /** + * Get application Master node's NMToken + */ + NMToken getNMToken(); + + /** + * Get RMContext + */ + RMContext getRMContext(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 1247bb7..ddc7b26 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -140,6 +141,7 @@ private String originalTrackingUrl = "N/A"; private String proxiedTrackingUrl = "N/A"; private long startTime = 0; + private NMToken amNMToken; // Set to null initially. Will eventually get set // if an RMAppAttemptUnregistrationEvent occurs @@ -441,6 +443,16 @@ public ApplicationSubmissionContext getSubmissionContext() { } @Override + public RMContext getRMContext() { + return this.rmContext; + } + + @Override + public NMToken getNMToken() { + return this.amNMToken; + } + + @Override public FinalApplicationStatus getFinalApplicationStatus() { this.readLock.lock(); try { @@ -841,12 +853,21 @@ public void transition(RMAppAttemptImpl appAttempt, // and is put in SchedulerApplication#newlyAllocatedContainers. Then, // YarnScheduler#allocate will fetch it. assert amContainerAllocation.getContainers().size() != 0; + + appAttempt.amNMToken = amContainerAllocation.getNMTokens().get(0); // Set the masterContainer appAttempt.setMasterContainer(amContainerAllocation.getContainers().get( 0)); appAttempt.getSubmissionContext().setResource( appAttempt.getMasterContainer().getResource()); appAttempt.storeAttempt(); + // We need to clear NMToken for app attempt from NMTokenSecretManager + // cache. 
+ appAttempt + .getRMContext() + .getNMTokenSecretManager() + .removeNodeKeyForAppAttempt(appAttempt.getAppAttemptId(), + appAttempt.amNMToken.getNodeId()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java index c03e31d..47ae574 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java @@ -22,10 +22,9 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; public class Allocation { @@ -34,20 +33,24 @@ final Set strictContainers; final Set fungibleContainers; final List fungibleResources; + final List nmTokens; - public Allocation(List containers, Resource resourceLimit) { - this(containers, resourceLimit, null, null, null); + public Allocation(List containers, List nmTokens, + Resource resourceLimit) { + this(containers, nmTokens, resourceLimit, null, null, null); } - public Allocation(List containers, Resource resourceLimit, - Set strictContainers) { - this(containers, resourceLimit, strictContainers, null, null); + public Allocation(List containers, List nmTokens, + Resource resourceLimit, Set strictContainers) { + this(containers, nmTokens, resourceLimit, strictContainers, null, null); } - public Allocation(List containers, Resource resourceLimit, - Set strictContainers, Set fungibleContainers, + public Allocation(List containers, List nmTokens, + Resource resourceLimit, Set strictContainers, + Set fungibleContainers, List fungibleResources) { this.containers = containers; + this.nmTokens = nmTokens; this.resourceLimit = resourceLimit; this.strictContainers = strictContainers; this.fungibleContainers = fungibleContainers; @@ -58,6 +61,10 @@ public Allocation(List containers, Resource resourceLimit, return containers; } + public List getNMTokens() { + return nmTokens; + } + public Resource getResourceLimit() { return resourceLimit; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java index 0fb8acb..5826dba 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java @@ -17,6 +17,7 @@ */ package 
org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -45,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.collect.HashMultiset; @@ -76,6 +79,10 @@ protected List newlyAllocatedContainers = new ArrayList(); + protected List newlyAllocatedNMTokens = + new ArrayList(); + private static final List EMPTY_NM_TOKEN_LIST = + new ArrayList(); /** * Count how many times the application has been given an opportunity @@ -342,7 +349,20 @@ public Resource getCurrentConsumption() { public synchronized List pullNewlyAllocatedContainers() { List returnContainerList = new ArrayList( newlyAllocatedContainers.size()); + RMContainerTokenSecretManager containerTokenSecretManager = + rmContext.getContainerTokenSecretManager(); for (RMContainer rmContainer : newlyAllocatedContainers) { + // We are refreshing the expiry interval for all the container tokens + try { + rmContainer.getContainer().setContainerToken( + containerTokenSecretManager.updateExpirationInterval(rmContainer + .getContainer().getContainerToken())); + } catch (IOException e) { + // We should ignore this because the container token was successfully + // generated and generated token is correct. Additionally we are not + // doing here DNS lookup for retrieving service. 
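The try/ignore block above relies on a new helper, updateExpirationInterval, added to RMContainerTokenSecretManager near the end of this patch: a container token carries an absolute expiry stamped at allocation time, so when the AM finally pulls the container the identifier is rebuilt with a fresh expiry and re-signed under the current master key. A condensed restatement of that helper for readability — it is a method of RMContainerTokenSecretManager, and the names containerTokenExpiryInterval, currentMasterKey, createPassword and readLock are that class's existing members, not new API:

// Condensed restatement of the helper added later in this patch; the real
// version lives inside RMContainerTokenSecretManager.
public Token updateExpirationInterval(Token containerToken) throws IOException {
  // Decode the identifier that was signed at allocation time.
  ContainerTokenIdentifier oldId =
      BuilderUtils.newContainerTokenIdentifier(containerToken);
  long expiryTimeStamp =
      System.currentTimeMillis() + containerTokenExpiryInterval;
  byte[] password;
  ContainerTokenIdentifier newId;
  try {
    this.readLock.lock(); // same master key id and bytes for id and password
    newId = new ContainerTokenIdentifier(oldId.getContainerID(),
        oldId.getNmHostAddress(), oldId.getApplicationSubmitter(),
        oldId.getResource(), expiryTimeStamp,
        this.currentMasterKey.getMasterKey().getKeyId(),
        ResourceManager.getClusterTimeStamp());
    password = this.createPassword(newId);
  } finally {
    this.readLock.unlock();
  }
  // The NM service address is carried over from the old token, so no DNS
  // lookup happens here; that is why the caller may log and ignore failures.
  return BuilderUtils.newContainerToken(newId, password,
      containerToken.getService());
}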
+ LOG.error("error renewing container token expiry interval", e); + } rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), RMContainerEventType.ACQUIRED)); returnContainerList.add(rmContainer.getContainer()); @@ -351,6 +371,13 @@ public Resource getCurrentConsumption() { return returnContainerList; } + public synchronized List pullNewlyAssignedNMTokens() { + List nmTokens = + new ArrayList(this.newlyAllocatedNMTokens); + newlyAllocatedNMTokens.clear(); + return nmTokens; + } + public synchronized void updateBlacklist( List blacklistAdditions, List blacklistRemovals) { if (!isStopped) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e61ff93..b4497ab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -77,6 +78,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -97,6 +99,8 @@ private final static List EMPTY_CONTAINER_LIST = new ArrayList(); + private final static List EMPTY_NM_TOKEN_LIST = + new ArrayList(); static final Comparator queueComparator = new Comparator() { @Override @@ -516,7 +520,8 @@ private synchronized void doneApplication( } private static final Allocation EMPTY_ALLOCATION = - new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0, 0)); + new Allocation(EMPTY_CONTAINER_LIST, EMPTY_NM_TOKEN_LIST, + Resources.createResource(0, 0)); @Override @Lock(Lock.NoLock.class) @@ -941,4 +946,8 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI, return queue.hasAccess(acl, callerUGI); } + @Override + public NMTokenSecretManagerInRM getNMTokenSecretManager() { + return this.rmContext.getNMTokenSecretManager(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 3dd4c6d..1714494 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -38,7 +39,9 @@ Resource getMaximumResourceCapability(); RMContainerTokenSecretManager getContainerTokenSecretManager(); - + + NMTokenSecretManagerInRM getNMTokenSecretManager(); + int getNumClusterNodes(); RMContext getRMContext(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 11e4973..1a70952 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -60,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; @@ -107,6 +110,7 @@ private final float minimumAllocationFactor; private RMContainerTokenSecretManager containerTokenSecretManager; + private NMTokenSecretManagerInRM nmTokenSecretManager; private 
Map users = new HashMap(); @@ -149,6 +153,7 @@ public LeafQueue(CapacitySchedulerContext cs, Resources.subtract(maximumAllocation, minimumAllocation), maximumAllocation); this.containerTokenSecretManager = cs.getContainerTokenSecretManager(); + this.nmTokenSecretManager = cs.getNMTokenSecretManager(); float capacity = (float)cs.getConfiguration().getCapacity(getQueuePath()) / 100; @@ -1337,8 +1342,22 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod unreserve(application, priority, node, rmContainer); } - Token containerToken = - createContainerToken(application, container); + NMToken nmToken = null; + Token containerToken = null; + try { + containerToken = createContainerToken(application, container); + nmToken = nmTokenSecretManager.createAndGetNMToken( + application.getUser(), application.getApplicationAttemptId(), + node.getNodeID()); + } catch (IllegalArgumentException e) { + if (e.getCause() instanceof UnknownHostException) { + LOG.warn("error creating token", e); + return Resources.none(); + } else { + LOG.warn(e); + throw e; + } + } if (containerToken == null) { // Something went wrong... return Resources.none(); @@ -1347,7 +1366,8 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod // Inform the application RMContainer allocatedContainer = - application.allocate(type, node, priority, request, container); + application.allocate(type, node, priority, request, container, + nmToken); // Does the application need this resource? if (allocatedContainer == null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 7f51126..1a79163 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -107,7 +108,7 @@ synchronized public boolean containerCompleted(RMContainer rmContainer, synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, Priority priority, ResourceRequest request, - Container container) { + Container container, NMToken nmToken) { if (isStopped) { return null; @@ -127,6 +128,10 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, // Add it to allContainers list. 
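A pattern worth noting in the LeafQueue change above (and repeated in AppSchedulable and FifoScheduler below): token creation can throw IllegalArgumentException when the NodeManager's hostname cannot be resolved, and the scheduler treats that case as "skip this node for now" rather than a fatal error. A small free-standing sketch of that distinction — the helper class is illustrative only and not part of the patch:

import java.net.UnknownHostException;

// Illustrative only. Mirrors the new catch blocks: an IllegalArgumentException
// whose cause is UnknownHostException means the NM host is currently
// unresolvable, so the caller backs off (e.g. returns Resources.none() or
// "try again later") instead of propagating the exception.
final class TokenCreationErrors {
  private TokenCreationErrors() {
  }

  static boolean isTransientHostResolutionFailure(IllegalArgumentException e) {
    return e.getCause() instanceof UnknownHostException;
  }
}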
newlyAllocatedContainers.add(rmContainer); + if (nmToken != null) { + assert newlyAllocatedNMTokens.contains(nmToken) == false; + newlyAllocatedNMTokens.add(nmToken); + } liveContainers.put(container.getId(), rmContainer); // Update consumption and track allocations @@ -238,9 +243,9 @@ public synchronized Allocation getAllocation(ResourceCalculator rc, ResourceRequest rr = ResourceRequest.newInstance( Priority.UNDEFINED, ResourceRequest.ANY, minimumAllocation, numCont); - return new Allocation(pullNewlyAllocatedContainers(), getHeadroom(), - null, currentContPreemption, - Collections.singletonList(rr)); + return new Allocation(pullNewlyAllocatedContainers(), + pullNewlyAssignedNMTokens(), getHeadroom(), null, + currentContPreemption, Collections.singletonList(rr)); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index e5991b2..63d2eb2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collection; @@ -27,10 +28,12 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; @@ -151,53 +154,26 @@ public void setRunnable(boolean runnable) { } /** - * Create and return a container object reflecting an allocation for the - * given appliction on the given node with the given capability and - * priority. - */ - public Container createContainer( - FSSchedulerApp application, FSSchedulerNode node, - Resource capability, Priority priority) { - - NodeId nodeId = node.getRMNode().getNodeID(); - ContainerId containerId = BuilderUtils.newContainerId(application - .getApplicationAttemptId(), application.getNewContainerId()); - org.apache.hadoop.yarn.api.records.Token containerToken = - containerTokenSecretManager.createContainerToken(containerId, nodeId, - application.getUser(), capability); - if (containerToken == null) { - return null; // Try again later. - } - - // Create the container - Container container = - BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() - .getHttpAddress(), capability, priority, containerToken); - - return container; - } - - /** * Reserve a spot for {@code container} on this {@code node}. 
If * the container is {@code alreadyReserved} on the node, simply * update relevant bookeeping. This dispatches ro relevant handlers * in the {@link FSSchedulerNode} and {@link SchedulerApp} classes. */ private void reserve(Priority priority, FSSchedulerNode node, - Container container, boolean alreadyReserved) { + Container container, boolean alreadyReserved, NMToken nmToken) { LOG.info("Making reservation: node=" + node.getNodeName() + " app_id=" + app.getApplicationId()); if (!alreadyReserved) { getMetrics().reserveResource(app.getUser(), container.getResource()); RMContainer rmContainer = app.reserve(node, priority, null, container); - node.reserveResource(app, priority, rmContainer); + node.reserveResource(app, priority, rmContainer, nmToken); } else { RMContainer rmContainer = node.getReservedContainer(); app.reserve(node, priority, rmContainer, container); - node.reserveResource(app, priority, rmContainer); + node.reserveResource(app, priority, rmContainer, nmToken); } } @@ -230,17 +206,41 @@ private Resource assignContainer(FSSchedulerNode node, Resource available = node.getAvailableResource(); Container container = null; + NMToken nmToken = null; if (reserved) { container = node.getReservedContainer().getContainer(); + nmToken = node.getReservedContainerNMToken(); } else { - container = createContainer(app, node, capability, priority); + try { + ContainerId containerId = + BuilderUtils.newContainerId(app.getApplicationAttemptId(), + app.getNewContainerId()); + Token containerToken = + containerTokenSecretManager.createContainerToken( + containerId, node.getNodeID(), app.getUser(), capability); + nmToken = scheduler.getNMTokenSecretManager().createAndGetNMToken( + app.getUser(), app.getApplicationAttemptId(), node.getNodeID()); + // Create the container + container = + BuilderUtils.newContainer(containerId, node.getNodeID(), + node.getRMNode().getHttpAddress(), capability, priority, + containerToken); + } catch (IllegalArgumentException e) { + if (e.getCause() instanceof UnknownHostException) { + LOG.warn("error creating tokens", e); + return Resources.none(); + } else { + LOG.warn(e); + throw e; + } + } } // Can we allocate a container on this node? if (Resources.fitsIn(capability, available)) { // Inform the application of the new container for this request RMContainer allocatedContainer = - app.allocate(type, node, priority, request, container); + app.allocate(type, node, priority, request, container, nmToken); if (allocatedContainer == null) { // Did the application need this resource? 
if (reserved) { @@ -261,7 +261,7 @@ private Resource assignContainer(FSSchedulerNode node, return container.getResource(); } else { // The desired container won't fit here, so reserve - reserve(priority, node, container, reserved); + reserve(priority, node, container, reserved, nmToken); return FairScheduler.CONTAINER_RESERVED; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index caf2a97..8877761 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -249,7 +250,7 @@ public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority, synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, Priority priority, ResourceRequest request, - Container container) { + Container container, NMToken nmToken) { // Update allowed locality level NodeType allowed = allowedLocalityLevel.get(priority); if (allowed != null) { @@ -278,6 +279,9 @@ else if (allowed.equals(NodeType.RACK_LOCAL) && // Add it to allContainers list. 
newlyAllocatedContainers.add(rmContainer); + if (nmToken!= null) { + newlyAllocatedNMTokens.add(nmToken); + } liveContainers.put(container.getId(), rmContainer); // Update consumption and track allocations diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index 97ea6d4..a41bef0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -57,6 +58,7 @@ private volatile int numContainers; private RMContainer reservedContainer; + private NMToken reservedContainerNMToken; private AppSchedulable reservedAppSchedulable; /* set of containers that are allocated containers */ @@ -210,7 +212,7 @@ public int getNumContainers() { public synchronized void reserveResource( FSSchedulerApp application, Priority priority, - RMContainer reservedContainer) { + RMContainer reservedContainer, NMToken nmToken) { // Check if it's already reserved if (this.reservedContainer != null) { // Sanity check @@ -241,6 +243,7 @@ public synchronized void reserveResource( " on node " + this + " for application " + application); } this.reservedContainer = reservedContainer; + this.reservedContainerNMToken = nmToken; this.reservedAppSchedulable = application.getAppSchedulable(); } @@ -266,6 +269,10 @@ public synchronized RMContainer getReservedContainer() { return reservedContainer; } + public synchronized NMToken getReservedContainerNMToken() { + return reservedContainerNMToken; + } + public synchronized AppSchedulable getReservedAppSchedulable() { return reservedAppSchedulable; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 7d6c6a5..1cdfbcf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import 
org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -81,6 +82,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -138,9 +140,12 @@ private final static List EMPTY_CONTAINER_LIST = new ArrayList(); + private final static List EMPTY_NM_TOKEN_LIST = + new ArrayList(); private static final Allocation EMPTY_ALLOCATION = - new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0)); + new Allocation(EMPTY_CONTAINER_LIST, EMPTY_NM_TOKEN_LIST, + Resources.createResource(0)); // Aggregate metrics FSQueueMetrics rootMetrics; @@ -544,6 +549,10 @@ public RMContainerTokenSecretManager getContainerTokenSecretManager() { return rmContext.getContainerTokenSecretManager(); } + public NMTokenSecretManagerInRM getNMTokenSecretManager() { + return rmContext.getNMTokenSecretManager(); + } + // synchronized for sizeBasedWeight public synchronized ResourceWeights getAppWeight(AppSchedulable app) { if (!app.getRunnable()) { @@ -891,7 +900,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, application.updateBlacklist(blacklistAdditions, blacklistRemovals); return new Allocation(application.pullNewlyAllocatedContainers(), - application.getHeadroom(), preemptionContainerIds); + application.pullNewlyAssignedNMTokens(),application.getHeadroom(), + preemptionContainerIds); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 4242d02..2e3ad84 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; import java.io.IOException; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -98,6 +100,8 @@ private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {}; private final static List EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY); + private final static List EMPTY_NM_TOKEN_LIST = + new ArrayList(); private 
RMContext rmContext; protected Map nodes = new ConcurrentHashMap(); @@ -245,7 +249,9 @@ public Resource getMaximumResourceCapability() { } private static final Allocation EMPTY_ALLOCATION = - new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0)); + new Allocation(EMPTY_CONTAINER_LIST, EMPTY_NM_TOKEN_LIST, + Resources.createResource(0)); + @Override public Allocation allocate( ApplicationAttemptId applicationAttemptId, List ask, @@ -310,8 +316,8 @@ public Allocation allocate( application.updateBlacklist(blacklistAdditions, blacklistRemovals); return new Allocation( - application.pullNewlyAllocatedContainers(), - application.getHeadroom()); + application.pullNewlyAllocatedContainers(), + application.pullNewlyAssignedNMTokens(), application.getHeadroom()); } } @@ -592,11 +598,24 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application ContainerId containerId = BuilderUtils.newContainerId(application .getApplicationAttemptId(), application.getNewContainerId()); Token containerToken = null; + NMToken nmToken = null; + try { + containerToken = + this.rmContext.getContainerTokenSecretManager().createContainerToken( + containerId, nodeId, application.getUser(), capability); + nmToken = + this.rmContext.getNMTokenSecretManager().createAndGetNMToken( + application.getUser(), application.getApplicationAttemptId(), + nodeId); + } catch (IllegalArgumentException e) { + if (e.getCause() instanceof UnknownHostException) { + LOG.warn("error creating token ", e); + return i; + } else { + throw e; + } + } - containerToken = - this.rmContext.getContainerTokenSecretManager() - .createContainerToken(containerId, nodeId, application.getUser(), - capability); if (containerToken == null) { return i; // Try again later. } @@ -610,8 +629,9 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application // Inform the application RMContainer rmContainer = - application.allocate(type, node, priority, request, container); - + application.allocate(type, node, priority, request, container, + nmToken); + // Inform the node node.allocateContainer(application.getApplicationId(), rmContainer); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java index ab31eaf..0d2dbd7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java @@ -18,10 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; -import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; @@ -31,7 +29,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NMToken; import 
org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Token; @@ -178,29 +175,22 @@ public void run() { } } - public List createAndGetNMTokens(String applicationSubmitter, - ApplicationAttemptId appAttemptId, List containers) { + public NMToken createAndGetNMToken(String applicationSubmitter, + ApplicationAttemptId appAttemptId, NodeId nodeId) { try { this.readLock.lock(); - List nmTokens = new ArrayList(); HashSet nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId); - if (nodeSet != null) { - for (Container container : containers) { - if (!nodeSet.contains(container.getNodeId())) { - LOG.debug("Sending NMToken for nodeId : " - + container.getNodeId().toString() - + " for application attempt : " + appAttemptId.toString()); - Token token = createNMToken(appAttemptId, container.getNodeId(), - applicationSubmitter); - NMToken nmToken = - NMToken.newInstance(container.getNodeId(), token); - nmTokens.add(nmToken); - // This will update the nmToken set. - nodeSet.add(container.getNodeId()); - } - } + if (nodeSet != null && !nodeSet.contains(nodeId)) { + LOG.info("Sending NMToken for nodeId : " + nodeId.toString() + + " for application attempt : " + appAttemptId.toString()); + Token token = createNMToken(appAttemptId, nodeId, applicationSubmitter); + NMToken nmToken = NMToken.newInstance(nodeId, token); + // This will update the nmToken set. + nodeSet.add(nodeId); + return nmToken; + } else { + return null; } - return nmTokens; } finally { this.readLock.unlock(); } @@ -270,4 +260,21 @@ public void removeNodeKey(NodeId nodeId) { this.writeLock.unlock(); } } + + /** + * This will be called by RM when it wants to remove NMToken key for + * specific node. + */ + public void removeNodeKeyForAppAttempt(ApplicationAttemptId appAttemptId, + NodeId nodeId) { + try { + this.writeLock.lock(); + HashSet nodeSet = appAttemptToNodeKeyMap.get(appAttemptId); + if (nodeSet != null) { + nodeSet.remove(nodeId); + } + } finally { + this.writeLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java index c62f2ee..7e32e83 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; +import java.io.IOException; import java.util.Timer; import java.util.TimerTask; @@ -180,8 +181,8 @@ public void run() { System.currentTimeMillis() + containerTokenExpiryInterval; // Lock so that we use the same MasterKey's keyId and its bytes - this.readLock.lock(); try { + this.readLock.lock(); tokenIdentifier = new ContainerTokenIdentifier(containerId, nodeId.toString(), appSubmitter, capability, expiryTimeStamp, this.currentMasterKey @@ -194,4 +195,33 @@ public void run() { return BuilderUtils.newContainerToken(nodeId, password, tokenIdentifier); } + + /** + * Helper function for updating containerToken expiry interval. 
+ * + * @throws IOException + */ + public Token updateExpirationInterval(Token containerToken) + throws IOException { + byte[] password; + long expiryTimeStamp = + System.currentTimeMillis() + containerTokenExpiryInterval; + ContainerTokenIdentifier nId = null; + ContainerTokenIdentifier oId = + BuilderUtils.newContainerTokenIdentifier(containerToken); + try { + this.readLock.lock(); + nId = + new ContainerTokenIdentifier(oId.getContainerID(), + oId.getNmHostAddress(), oId.getApplicationSubmitter(), + oId.getResource(), expiryTimeStamp, this.currentMasterKey + .getMasterKey().getKeyId(), + ResourceManager.getClusterTimeStamp()); + password = this.createPassword(nId); + } finally { + this.readLock.unlock(); + } + return BuilderUtils.newContainerToken(nId, password, + containerToken.getService()); + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index 0908b2f..790f6a9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -305,7 +305,9 @@ protected void allocateContainersAndValidateNMTokens(MockAM am, am.allocate(resourceRequest, releaseContainerList); containersReceived.addAll(response.getAllocatedContainers()); if (!response.getNMTokens().isEmpty()) { + LOG.info("received nmtoken for : "); for (NMToken nmToken : response.getNMTokens()) { + LOG.info("nmtoken for nodeId : " + nmToken.getNodeId()); String nodeId = nmToken.getNodeId().toString(); if (nmTokens.containsKey(nodeId)) { Assert.fail("Duplicate NMToken received for : " + nodeId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index b5f4992..4fba0ec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -47,6 +48,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -890,10 +893,18 @@ private void 
moveCurrentAttemptToLaunchedState(RMAppAttempt attempt) { BuilderUtils.newContainerId(attempt.getAppAttemptId(), 1)); when(container.getResource()).thenReturn(resource); Allocation allocation = mock(Allocation.class); + List nmTokens = new ArrayList(); + NMTokenSecretManagerInRM nmTokenSecretManager = + attempt.getRMContext().getNMTokenSecretManager(); + nmTokenSecretManager.rollMasterKey(); + NodeId nodeId = NodeId.newInstance("127.0.0.1", 12345); + nmTokens.add(nmTokenSecretManager.createAndGetNMToken("test", + attempt.getAppAttemptId(), nodeId)); when(allocation.getContainers()).thenReturn( Collections.singletonList(container)); when(allocation.getContainers()). thenReturn(Collections.singletonList(container)); + when(allocation.getNMTokens()).thenReturn(nmTokens); when( scheduler.allocate(any(ApplicationAttemptId.class), any(List.class), any(List.class), any(List.class), any(List.class))).thenReturn( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index b9fc15f..a6352bc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -53,7 +53,10 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; @@ -174,6 +177,7 @@ public void handle(AMLauncherEvent event) { } private static int appId = 1; + private String user; private ApplicationSubmissionContext submissionContext = null; private boolean unmanagedAM; @@ -204,12 +208,15 @@ public void setUp() throws Exception { mock(ContainerAllocationExpirer.class); amLivelinessMonitor = mock(AMLivelinessMonitor.class); amFinishingMonitor = mock(AMLivelinessMonitor.class); + NMTokenSecretManagerInRM nmTokenSecretManagerInRM = + new NMTokenSecretManagerInRM(conf); + nmTokenSecretManagerInRM.rollMasterKey(); rmContext = new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, null, amRMTokenManager, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), + nmTokenSecretManagerInRM, clientToAMTokenManager); store = mock(RMStateStore.class); @@ -239,7 +246,7 @@ public void setUp() throws Exception { ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 0); - final String user = MockApps.newUserName(); + user = MockApps.newUserName(); final String queue = MockApps.newQueue(); submissionContext = mock(ApplicationSubmissionContext.class); when(submissionContext.getQueue()).thenReturn(queue); 
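The test setup above rolls the NMTokenSecretManagerInRM master key before asking it for tokens, because tokens are signed with the current key, and createAndGetNMToken hands out a token at most once per (application attempt, node) pair for an attempt it already knows about; removeNodeKeyForAppAttempt (invoked after the AM container allocation earlier in this patch) resets that bookkeeping so the AM's own node can receive a token again through the allocate path. A toy model of just that bookkeeping, using plain Strings instead of ApplicationAttemptId/NodeId and omitting the actual token signing:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of the per-attempt bookkeeping behind createAndGetNMToken and
// removeNodeKeyForAppAttempt; not RM code.
final class NMTokenIssuanceModel {
  private final Map<String, Set<String>> nodesWithToken =
      new HashMap<String, Set<String>>();

  void registerAttempt(String appAttemptId) {
    nodesWithToken.put(appAttemptId, new HashSet<String>());
  }

  // True exactly when a fresh NMToken would be issued for this request:
  // the attempt is known and this node has not been sent one yet.
  boolean issueToken(String appAttemptId, String nodeId) {
    Set<String> nodes = nodesWithToken.get(appAttemptId);
    if (nodes == null || nodes.contains(nodeId)) {
      return false;
    }
    nodes.add(nodeId);
    return true;
  }

  // Forget that a token was sent so the next request issues a new one,
  // mirroring removeNodeKeyForAppAttempt.
  void removeNodeKeyForAppAttempt(String appAttemptId, String nodeId) {
    Set<String> nodes = nodesWithToken.get(appAttemptId);
    if (nodes != null) {
      nodes.remove(nodeId);
    }
  }
}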
@@ -567,6 +574,15 @@ private Container allocateApplicationAttempt() { Allocation allocation = mock(Allocation.class); when(allocation.getContainers()). thenReturn(Collections.singletonList(container)); + NodeId nodeId = NodeId.newInstance("127.0.0.1", 12345); + Token token = + rmContext.getNMTokenSecretManager().createNMToken( + applicationAttempt.getAppAttemptId(), nodeId, user); + NMToken nmToken = + NMToken.newInstance(nodeId, token); + List nmTokens = Arrays.asList(nmToken); + when(allocation.getNMTokens()).thenReturn(nmTokens); + when( scheduler.allocate( any(ApplicationAttemptId.class), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 4f4bf2f..5992186 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -41,11 +41,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.SecurityUtilTestHelper; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -64,11 +66,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -128,6 +132,10 @@ public void setUp() throws Exception { containerTokenSecretManager.rollMasterKey(); when(csContext.getContainerTokenSecretManager()).thenReturn( containerTokenSecretManager); + NMTokenSecretManagerInRM nmTokenSecretManager = + new NMTokenSecretManagerInRM(conf); + nmTokenSecretManager.rollMasterKey(); + when(csContext.getNMTokenSecretManager()).thenReturn(nmTokenSecretManager); root = CapacityScheduler.parseQueue(csContext, csConf, null, @@ -1275,7 +1283,8 @@ public void testLocalityScheduling() throws Exception { // Start with off switch, shouldn't allocate due to delay scheduling assignment = a.assignContainers(clusterResource, node_2); verify(app_0, 
@@ -1283,7 +1292,8 @@ public void testLocalityScheduling() throws Exception {
     // Another off switch, shouldn't allocate due to delay scheduling
     assignment = a.assignContainers(clusterResource, node_2);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+        any(Priority.class), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(2, app_0.getSchedulingOpportunities(priority));
     assertEquals(3, app_0.getTotalRequiredResources(priority));
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
@@ -1291,7 +1301,8 @@ public void testLocalityScheduling() throws Exception {
     // Another off switch, shouldn't allocate due to delay scheduling
     assignment = a.assignContainers(clusterResource, node_2);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+        any(Priority.class), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(3, app_0.getSchedulingOpportunities(priority));
     assertEquals(3, app_0.getTotalRequiredResources(priority));
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
@@ -1300,7 +1311,8 @@ public void testLocalityScheduling() throws Exception {
     // since missedOpportunities=3 and reqdContainers=3
     assignment = a.assignContainers(clusterResource, node_2);
     verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+        any(Priority.class), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset
     assertEquals(2, app_0.getTotalRequiredResources(priority));
     assertEquals(NodeType.OFF_SWITCH, assignment.getType());
@@ -1308,7 +1320,8 @@ public void testLocalityScheduling() throws Exception {
     // NODE_LOCAL - node_0
     assignment = a.assignContainers(clusterResource, node_0);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+        any(Priority.class), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(1, app_0.getTotalRequiredResources(priority));
     assertEquals(NodeType.NODE_LOCAL, assignment.getType());
@@ -1316,7 +1329,8 @@ public void testLocalityScheduling() throws Exception {
     // NODE_LOCAL - node_1
     assignment = a.assignContainers(clusterResource, node_1);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1),
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+        any(Priority.class), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(0, app_0.getTotalRequiredResources(priority));
     assertEquals(NodeType.NODE_LOCAL, assignment.getType());
@@ -1350,7 +1364,8 @@ public void testLocalityScheduling() throws Exception {
     // Should assign RACK_LOCAL now
     assignment = a.assignContainers(clusterResource, node_3);
     verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+        any(Priority.class), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(1, app_0.getTotalRequiredResources(priority));
     assertEquals(NodeType.RACK_LOCAL, assignment.getType());
@@ -1431,11 +1446,13 @@ public void testApplicationPriorityScheduling() throws Exception {
     // thus, no P2 either!
     a.assignContainers(clusterResource, node_2);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
-        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+        eq(priority_1), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(1, app_0.getSchedulingOpportunities(priority_1));
     assertEquals(2, app_0.getTotalRequiredResources(priority_1));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
-        eq(priority_2), any(ResourceRequest.class), any(Container.class));
+        eq(priority_2), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
     assertEquals(1, app_0.getTotalRequiredResources(priority_2));

@@ -1443,44 +1460,52 @@ public void testApplicationPriorityScheduling() throws Exception {
     // thus, no P2 either!
     a.assignContainers(clusterResource, node_2);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
-        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+        eq(priority_1), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(2, app_0.getSchedulingOpportunities(priority_1));
     assertEquals(2, app_0.getTotalRequiredResources(priority_1));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
-        eq(priority_2), any(ResourceRequest.class), any(Container.class));
+        eq(priority_2), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
     assertEquals(1, app_0.getTotalRequiredResources(priority_2));

     // Another off-switch, shouldn't allocate OFF_SWITCH P1
     a.assignContainers(clusterResource, node_2);
     verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
-        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+        eq(priority_1), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(3, app_0.getSchedulingOpportunities(priority_1));
     assertEquals(1, app_0.getTotalRequiredResources(priority_1));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
-        eq(priority_2), any(ResourceRequest.class), any(Container.class));
+        eq(priority_2), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
     assertEquals(1, app_0.getTotalRequiredResources(priority_2));

     // Now, DATA_LOCAL for P1
     a.assignContainers(clusterResource, node_0);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
-        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+        eq(priority_1), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
     assertEquals(0, app_0.getTotalRequiredResources(priority_1));
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_0),
-        eq(priority_2), any(ResourceRequest.class), any(Container.class));
+        eq(priority_2), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
     assertEquals(1, app_0.getTotalRequiredResources(priority_2));

     // Now, OFF_SWITCH for P2
     a.assignContainers(clusterResource, node_1);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1),
-        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+        eq(priority_1), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
     assertEquals(0, app_0.getTotalRequiredResources(priority_1));
     verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_1),
-        eq(priority_2), any(ResourceRequest.class), any(Container.class));
+        eq(priority_2), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(1, app_0.getSchedulingOpportunities(priority_2));
     assertEquals(0, app_0.getTotalRequiredResources(priority_2));

any(Container.class)); + eq(priority_2), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_2)); assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Now, OFF_SWITCH for P2 a.assignContainers(clusterResource, node_1); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1), - eq(priority_1), any(ResourceRequest.class), any(Container.class)); + eq(priority_1), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_1)); assertEquals(0, app_0.getTotalRequiredResources(priority_1)); verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_1), - eq(priority_2), any(ResourceRequest.class), any(Container.class)); + eq(priority_2), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority_2)); assertEquals(0, app_0.getTotalRequiredResources(priority_2)); @@ -1552,7 +1577,8 @@ public void testSchedulingConstraints() throws Exception { // NODE_LOCAL - node_0_1 a.assignContainers(clusterResource, node_0_0); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(0, app_0.getTotalRequiredResources(priority)); @@ -1560,7 +1586,8 @@ public void testSchedulingConstraints() throws Exception { // required(ANY) == 0 a.assignContainers(clusterResource, node_1_0); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero // since #req=0 assertEquals(0, app_0.getTotalRequiredResources(priority)); @@ -1576,14 +1603,16 @@ public void testSchedulingConstraints() throws Exception { // required(rack_1) == 0 a.assignContainers(clusterResource, node_0_1); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority)); assertEquals(1, app_0.getTotalRequiredResources(priority)); // NODE_LOCAL - node_1 a.assignContainers(clusterResource, node_1_0); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(0, app_0.getTotalRequiredResources(priority)); @@ -1827,7 +1856,8 @@ public void testLocalityConstraints() throws Exception { // Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false a.assignContainers(clusterResource, node_0_1); verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 // resourceName: @@ 
@@ -1849,7 +1879,8 @@ public void testLocalityConstraints() throws Exception {
     // Shouldn't allocate since RR(rack_1) = relax: false
     a.assignContainers(clusterResource, node_1_1);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+        any(Priority.class), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0

     // Allow rack-locality for rack_1, but blacklist node_1_1
@@ -1879,7 +1910,8 @@ public void testLocalityConstraints() throws Exception {
     // Shouldn't allocate since node_1_1 is blacklisted
     a.assignContainers(clusterResource, node_1_1);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+        any(Priority.class), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0

     // Now, remove node_1_1 from blacklist, but add rack_1 to blacklist
@@ -1907,7 +1939,8 @@ public void testLocalityConstraints() throws Exception {
     // Shouldn't allocate since rack_1 is blacklisted
     a.assignContainers(clusterResource, node_1_1);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+        any(Priority.class), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0

     // Now remove rack_1 from blacklist
@@ -1933,7 +1966,8 @@ public void testLocalityConstraints() throws Exception {
     // Now, should allocate since RR(rack_1) = relax: true
     a.assignContainers(clusterResource, node_1_1);
     verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+        any(Priority.class), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority));
     assertEquals(0, app_0.getTotalRequiredResources(priority));

@@ -1963,7 +1997,8 @@ public void testLocalityConstraints() throws Exception {

     a.assignContainers(clusterResource, node_1_0);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+        any(Priority.class), any(ResourceRequest.class), any(Container.class),
+        any(NMToken.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority));
     assertEquals(0, app_0.getTotalRequiredResources(priority));

@@ -1999,6 +2034,60 @@ public void testMaxAMResourcePerQueuePercentAfterQueueRefresh()
     assertEquals(400, a.getMaximumActiveApplications());
   }

+  @Test
+  public void testUnavailableHost() throws Exception {
+    try {
+      SecurityUtilTestHelper.setTokenServiceUseIp(true);
+
+      // Manipulate queue 'a'
+      LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
+
+      // Users
+      final String user_0 = "user_0";
+
+      // Submit applications
+      final ApplicationAttemptId appAttemptId_0 =
+          TestUtils.getMockApplicationAttemptId(0, 0);
+      FiCaSchedulerApp app_0 =
+          new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+              mock(ActiveUsersManager.class), rmContext);
+      a.submitApplication(app_0, user_0, B);
+
+      final ApplicationAttemptId appAttemptId_1 =
+          TestUtils.getMockApplicationAttemptId(1, 0);
+      FiCaSchedulerApp app_1 =
+          new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+              mock(ActiveUsersManager.class), rmContext);
+      a.submitApplication(app_1, user_0, B); // same user
+
+      // Setup some nodes
+      String host_0 = "UnavailableHostName";
+      FiCaSchedulerNode node_0 =
+          TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8 * GB);
+
+      final int numNodes = 1;
+      Resource clusterResource =
+          Resources.createResource(numNodes * (8 * GB), numNodes * 16);
+      when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+      // Setup resource-requests
+      Priority priority = TestUtils.createMockPriority(1);
+      app_0.updateResourceRequests(Collections.singletonList(
+          TestUtils.createResourceRequest(ResourceRequest.ANY, 1 * GB, 3, true,
+              priority, recordFactory)));
+
+      // Start testing...
+      try {
+        a.assignContainers(clusterResource, node_0);
+      } catch (Exception e) {
+        Assert.fail("Exception in assigning an unavailable node: "
+            + e.getMessage());
+      }
+    } finally {
+      SecurityUtilTestHelper.setTokenServiceUseIp(false);
+    }
+  }
+
   private CapacitySchedulerContext mockCSContext(
       CapacitySchedulerConfiguration csConf, Resource clusterResource) {
     CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index cc2e1cc..97906bb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.GroupMappingServiceProvider;
+import org.apache.hadoop.security.SecurityUtilTestHelper;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -2562,4 +2563,32 @@ public void testBlacklistNodes() throws Exception {
     assertEquals("Incorrect number of containers allocated", 1, app
         .getLiveContainers().size());
   }
+
+  @Test(timeout = 30000)
+  public void testAssignUnavailableHost() throws Exception {
+    SecurityUtilTestHelper.setTokenServiceUseIp(true);
+    final String user = "user1";
+    final String fifoQueue = "fifo";
+
+    RMNode node2 =
+        MockNodes
+          .newNodeInfo(1, Resources.createResource(8192, 8), 2,
+            "UnavailableHost");
+
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    createSchedulingRequest(1024, fifoQueue, user, 4);
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
+    try {
+      scheduler.handle(updateEvent2);
+    } catch (Exception e) {
+      Assert.fail("Exception in updating an unavailable resource: "
+          + e.getMessage());
+    }
+    // roll it back for other tests
+    SecurityUtilTestHelper.setTokenServiceUseIp(false);
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index 2384590..f08534c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -33,6 +33,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.SecurityUtilTestHelper;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -553,6 +554,65 @@ public void testBlackListNodes() throws Exception {
     rm.stop();
   }

+  @Test(timeout = 30000)
+  public void testUnavailableNodeAssignment() throws Exception {
+    SecurityUtilTestHelper.setTokenServiceUseIp(true);
+    AsyncDispatcher dispatcher = new InlineDispatcher();
+    Configuration conf = new Configuration();
+    RMContainerTokenSecretManager containerTokenSecretManager =
+        new RMContainerTokenSecretManager(new Configuration());
+    containerTokenSecretManager.rollMasterKey();
+    NMTokenSecretManagerInRM nmTokenSecretManager =
+        new NMTokenSecretManagerInRM(conf);
+    nmTokenSecretManager.rollMasterKey();
+    RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
+        null, containerTokenSecretManager, nmTokenSecretManager, null);
+
+    FifoScheduler scheduler = new FifoScheduler();
+    scheduler.reinitialize(new Configuration(), rmContext);
+
+    RMNode node0 = MockNodes.newNodeInfo(1,
+        Resources.createResource(1024 * 64), 1, "UnavailableNodeName");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0);
+    scheduler.handle(nodeEvent1);
+
+    int _appId = 1;
+    int _appAttemptId = 1;
+    ApplicationAttemptId appAttemptId = createAppAttemptId(_appId,
+        _appAttemptId);
+    AppAddedSchedulerEvent appEvent1 = new AppAddedSchedulerEvent(appAttemptId,
+        "queue1", "user1");
+    scheduler.handle(appEvent1);
+
+    int memory = 64;
+    int nConts = 3;
+    int priority = 20;
+
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ResourceRequest nodeLocal = createResourceRequest(memory,
+        node0.getHostName(), priority, nConts);
+    ResourceRequest rackLocal = createResourceRequest(memory,
+        node0.getRackName(), priority, nConts);
+    ResourceRequest any =
+        createResourceRequest(memory, ResourceRequest.ANY, priority,
+        nConts);
+    ask.add(nodeLocal);
+    ask.add(rackLocal);
+    ask.add(any);
+    scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null,
+        null);
+
+    NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
+
+    try {
+      scheduler.handle(node0Update);
+    } catch (Exception e) {
+      Assert.fail("Exception in updating an unavailable resource: "
+          + e.getMessage());
+    }
+    SecurityUtilTestHelper.setTokenServiceUseIp(false);
+  }
+
   private void checkApplicationResourceUsage(int expected,
       Application application) {
     Assert.assertEquals(expected, application.getUsedResources().getMemory());