diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index a41792d..2a58383 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionContainer; import org.apache.hadoop.yarn.api.records.PreemptionContract; @@ -446,6 +447,7 @@ public AllocateResponse allocate(AllocateRequest request) } allocateResponse.setAllocatedContainers(allocation.getContainers()); + allocateResponse.setNMTokens(allocation.getNMTokens()); allocateResponse.setCompletedContainersStatuses(appAttempt .pullJustFinishedContainers()); allocateResponse.setResponseId(lastResponse.getResponseId() + 1); @@ -455,14 +457,7 @@ public AllocateResponse allocate(AllocateRequest request) // add preemption to the allocateResponse message (if any) allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation)); - - // Adding NMTokens for allocated containers. - if (!allocation.getContainers().isEmpty()) { - allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager() - .createAndGetNMTokens(app.getUser(), appAttemptId, - allocation.getContainers())); - } - + // before returning response, verify in sync AllocateResponse oldResponse = responseMap.put(appAttemptId, allocateResponse); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index a1c1a40..731ad6d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.security.PrivilegedAction; import java.util.ArrayList; @@ -55,6 +56,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; @@ -146,7 +148,7 @@ private void cleanup() throws IOException, YarnException { // Protected. For tests. protected ContainerManagementProtocol getContainerMgrProxy( - final ContainerId containerId) { + final ContainerId containerId) throws IOException{ final NodeId node = masterContainer.getNodeId(); final InetSocketAddress containerManagerBindAddress = @@ -157,14 +159,27 @@ protected ContainerManagementProtocol getContainerMgrProxy( UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(containerId .getApplicationAttemptId().toString()); + RMApp rmApp = + rmContext.getRMApps().get( + containerId.getApplicationAttemptId().getApplicationId()); - String user = - rmContext.getRMApps() - .get(containerId.getApplicationAttemptId().getApplicationId()) - .getUser(); - org.apache.hadoop.yarn.api.records.Token token = - rmContext.getNMTokenSecretManager().createNMToken( + String user = rmApp.getUser(); + org.apache.hadoop.yarn.api.records.Token token = null; + + if(eventType == AMLauncherEventType.LAUNCH) { + token = rmApp.getCurrentAppAttempt().getNMToken().getToken(); + } else { + try { + token = rmContext.getNMTokenSecretManager().createNMToken( containerId.getApplicationAttemptId(), node, user); + } catch (IllegalArgumentException e) { + if ( e.getCause() instanceof UnknownHostException) { + throw new IOException(e); + } else { + throw e; + } + } + } currentUser.addToken(ConverterUtils.convertFromYarn(token, containerManagerBindAddress)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index e9f064d..1019f51 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -30,10 +30,12 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; /** @@ -166,4 +168,14 @@ * @return the start time of the application. */ long getStartTime(); + + /** + * Get Application Master node's nmToken + */ + NMToken getNMToken(); + + /** + * Get RMContext + */ + RMContext getRMContext(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 00397cf..ae78166 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -142,6 +143,7 @@ private String origTrackingUrl = "N/A"; private String proxiedTrackingUrl = "N/A"; private long startTime = 0; + private NMToken amNMToken = null; // Set to null initially. Will eventually get set // if an RMAppAttemptUnregistrationEvent occurs @@ -395,6 +397,11 @@ public ApplicationSubmissionContext getSubmissionContext() { } @Override + public RMContext getRMContext() { + return this.rmContext; + } + + @Override public FinalApplicationStatus getFinalApplicationStatus() { this.readLock.lock(); try { @@ -503,6 +510,11 @@ public SecretKey getClientTokenMasterKey() { } @Override + public NMToken getNMToken() { + return amNMToken; + } + + @Override public Token getAMRMToken() { return this.amrmToken; } @@ -811,6 +823,8 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null, null); + appAttempt.amNMToken = + amContainerAllocation.getNMTokens().get(0); // Set the masterContainer appAttempt.setMasterContainer(amContainerAllocation.getContainers().get( 0)); @@ -818,6 +832,12 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.getMasterContainer().getResource()); RMStateStore store = appAttempt.rmContext.getStateStore(); appAttempt.storeAttempt(store); + //We need to clear NMToken for this appattempt from NMTokenSecretManager + //cache. + appAttempt + .getRMContext().getNMTokenSecretManager() + .removeNodeKeyForAppAttempt(appAttempt.getAppAttemptId(), + appAttempt.getNMToken().getNodeId()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java index c03e31d..b3df8c8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java @@ -22,10 +22,9 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; public class Allocation { @@ -34,20 +33,24 @@ final Set strictContainers; final Set fungibleContainers; final List fungibleResources; + final List nmTokens; - public Allocation(List containers, Resource resourceLimit) { - this(containers, resourceLimit, null, null, null); + public Allocation(List containers, List nmTokens, + Resource resourceLimit) { + this(containers, nmTokens, resourceLimit, null, null, null); } - public Allocation(List containers, Resource resourceLimit, - Set strictContainers) { - this(containers, resourceLimit, strictContainers, null, null); + public Allocation(List containers, List nmTokens, + Resource resourceLimit, Set strictContainers) { + this(containers, nmTokens, resourceLimit, strictContainers, null, null); } - public Allocation(List containers, Resource resourceLimit, - Set strictContainers, Set fungibleContainers, + public Allocation(List containers, List nmTokens, + Resource resourceLimit, Set strictContainers, + Set fungibleContainers, List fungibleResources) { this.containers = containers; + this.nmTokens = nmTokens; this.resourceLimit = resourceLimit; this.strictContainers = strictContainers; this.fungibleContainers = fungibleContainers; @@ -58,6 +61,10 @@ public Allocation(List containers, Resource resourceLimit, return containers; } + public List getNMTokens() { + return nmTokens; + } + public Resource getResourceLimit() { return resourceLimit; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 2efb9ad..bf78023 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -75,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -93,6 +95,8 @@ private final static List EMPTY_CONTAINER_LIST = new ArrayList(); + private final static List EMPTY_NM_TOKEN_LIST = + new ArrayList(); static final Comparator queueComparator = new Comparator() { @Override @@ -511,7 +515,8 @@ private synchronized void doneApplication( } private static final Allocation EMPTY_ALLOCATION = - new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0, 0)); + new Allocation(EMPTY_CONTAINER_LIST, EMPTY_NM_TOKEN_LIST, + Resources.createResource(0, 0)); @Override @Lock(Lock.NoLock.class) @@ -910,4 +915,9 @@ public void killContainer(RMContainer cont) { RMContainerEventType.KILL); } + @Override + public NMTokenSecretManagerInRM getNMTokenSecretManager() { + return this.rmContext.getNMTokenSecretManager(); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 3dd4c6d..7cd2735 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -39,6 +40,8 @@ RMContainerTokenSecretManager getContainerTokenSecretManager(); + NMTokenSecretManagerInRM getNMTokenSecretManager(); + int getNumClusterNodes(); RMContext getRMContext(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 8624ec0..b9660b2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -60,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; @@ -107,6 +110,7 @@ private final float minimumAllocationFactor; private RMContainerTokenSecretManager containerTokenSecretManager; + private NMTokenSecretManagerInRM nmTokenSecretManager; private Map users = new HashMap(); @@ -149,7 +153,8 @@ public LeafQueue(CapacitySchedulerContext cs, Resources.subtract(maximumAllocation, minimumAllocation), maximumAllocation); this.containerTokenSecretManager = cs.getContainerTokenSecretManager(); - + this.nmTokenSecretManager = cs.getNMTokenSecretManager(); + float capacity = (float)cs.getConfiguration().getCapacity(getQueuePath()) / 100; float absoluteCapacity = parent.getAbsoluteCapacity() * capacity; @@ -1341,8 +1346,23 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod unreserve(application, priority, node, rmContainer); } - Token containerToken = - createContainerToken(application, container); + NMToken nmToken = null; + Token containerToken = null; + try { + containerToken = createContainerToken(application, container); + nmToken = + nmTokenSecretManager.createAndGetNMToken(application.getUser(), + application.getApplicationAttemptId(), node.getNodeID()); + } catch (IllegalArgumentException e) { + if (e.getCause() instanceof UnknownHostException) { + LOG.warn("error creating token", e); + return Resources.none(); + } else { + LOG.warn(e); + throw e; + } + } + if (containerToken == null) { // Something went wrong... return Resources.none(); @@ -1351,7 +1371,7 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod // Inform the application RMContainer allocatedContainer = - application.allocate(type, node, priority, request, container); + application.allocate(type, node, priority, request, container, nmToken); // Does the application need this resource? if (allocatedContainer == null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index a261dbf..7856f29 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -93,6 +94,10 @@ new HashMap(); private List newlyAllocatedContainers = new ArrayList(); + private List newlyAllocatedNMTokens = + new ArrayList(); + private final static List EMPTY_NM_TOKEN_LIST = + new ArrayList(); final Map> reservedContainers = new HashMap>(); @@ -262,7 +267,7 @@ synchronized public boolean containerCompleted(RMContainer rmContainer, synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, Priority priority, ResourceRequest request, - Container container) { + Container container, NMToken nmToken) { if (isStopped) { return null; @@ -282,6 +287,9 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, // Add it to allContainers list. newlyAllocatedContainers.add(rmContainer); + if (nmToken != null) { + newlyAllocatedNMTokens.add(nmToken); + } liveContainers.put(container.getId(), rmContainer); // Update consumption and track allocations @@ -317,6 +325,14 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, return returnContainerList; } + synchronized public List pullNewlyAssignedNMTokens() { + List nmTokens = + new ArrayList(this.newlyAllocatedNMTokens.size()); + nmTokens.addAll(newlyAllocatedNMTokens); + newlyAllocatedNMTokens.clear(); + return nmTokens; + } + public Resource getCurrentConsumption() { return this.currentConsumption; } @@ -571,9 +587,15 @@ public synchronized Allocation getAllocation(ResourceCalculator rc, ResourceRequest rr = ResourceRequest.newInstance( Priority.UNDEFINED, ResourceRequest.ANY, minimumAllocation, numCont); - return new Allocation(pullNewlyAllocatedContainers(), getHeadroom(), - null, currentContPreemption, - Collections.singletonList(rr)); + List newAllocatedContainers = pullNewlyAllocatedContainers(); + List newlyAllocatedNMTokens = EMPTY_NM_TOKEN_LIST; + if (!newAllocatedContainers.isEmpty()) { + newlyAllocatedNMTokens = pullNewlyAssignedNMTokens(); + } + + return new Allocation(newAllocatedContainers, + newlyAllocatedNMTokens, getHeadroom(), null, + currentContPreemption, Collections.singletonList(rr)); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index 14ec99c..773498d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collection; @@ -27,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -151,47 +153,20 @@ public void setRunnable(boolean runnable) { } /** - * Create and return a container object reflecting an allocation for the - * given appliction on the given node with the given capability and - * priority. - */ - public Container createContainer( - FSSchedulerApp application, FSSchedulerNode node, - Resource capability, Priority priority) { - - NodeId nodeId = node.getRMNode().getNodeID(); - ContainerId containerId = BuilderUtils.newContainerId(application - .getApplicationAttemptId(), application.getNewContainerId()); - org.apache.hadoop.yarn.api.records.Token containerToken = - containerTokenSecretManager.createContainerToken(containerId, nodeId, - application.getUser(), capability); - if (containerToken == null) { - return null; // Try again later. - } - - // Create the container - Container container = - BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() - .getHttpAddress(), capability, priority, containerToken); - - return container; - } - - /** * Reserve a spot for {@code container} on this {@code node}. If * the container is {@code alreadyReserved} on the node, simply * update relevant bookeeping. This dispatches ro relevant handlers * in the {@link FSSchedulerNode} and {@link SchedulerApp} classes. */ private void reserve(Priority priority, FSSchedulerNode node, - Container container, boolean alreadyReserved) { + Container container, boolean alreadyReserved, NMToken nmToken) { LOG.info("Making reservation: node=" + node.getNodeName() + " app_id=" + app.getApplicationId()); if (!alreadyReserved) { getMetrics().reserveResource(app.getUser(), container.getResource()); RMContainer rmContainer = app.reserve(node, priority, null, container); - node.reserveResource(app, priority, rmContainer); + node.reserveResource(app, priority, rmContainer, nmToken); getMetrics().reserveResource(app.getUser(), container.getResource()); scheduler.getRootQueueMetrics().reserveResource(app.getUser(), @@ -201,7 +176,7 @@ private void reserve(Priority priority, FSSchedulerNode node, else { RMContainer rmContainer = node.getReservedContainer(); app.reserve(node, priority, rmContainer, container); - node.reserveResource(app, priority, rmContainer); + node.reserveResource(app, priority, rmContainer, nmToken); } } @@ -236,17 +211,42 @@ private Resource assignContainer(FSSchedulerNode node, Resource available = node.getAvailableResource(); Container container = null; + NMToken nmToken = null; if (reserved) { container = node.getReservedContainer().getContainer(); + nmToken = node.getReservedContainerNMToken(); } else { - container = createContainer(app, node, capability, priority); + try { + ContainerId containerId = + BuilderUtils.newContainerId(app.getApplicationAttemptId(), + app.getNewContainerId()); + org.apache.hadoop.yarn.api.records.Token containerToken = + containerTokenSecretManager.createContainerToken(containerId, + node.getNodeID(), app.getUser(), capability); + nmToken = + scheduler.getNMTokenSecretManager().createAndGetNMToken( + app.getUser(), app.getApplicationAttemptId(), node.getNodeID()); + // Create the container + container = + BuilderUtils.newContainer(containerId, node.getNodeID(), node + .getRMNode().getHttpAddress(), capability, priority, + containerToken); + } catch (IllegalArgumentException e) { + if (e.getCause() instanceof UnknownHostException) { + LOG.warn("error creating tokens", e); + return Resources.none(); + } else { + LOG.warn(e); + throw e; + } + } } // Can we allocate a container on this node? if (Resources.fitsIn(capability, available)) { // Inform the application of the new container for this request RMContainer allocatedContainer = - app.allocate(type, node, priority, request, container); + app.allocate(type, node, priority, request, container, nmToken); if (allocatedContainer == null) { // Did the application need this resource? if (reserved) { @@ -267,7 +267,7 @@ private Resource assignContainer(FSSchedulerNode node, return container.getResource(); } else { // The desired container won't fit here, so reserve - reserve(priority, node, container, reserved); + reserve(priority, node, container, reserved, nmToken); return FairScheduler.CONTAINER_RESERVED; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index 670e961..236b5c9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -84,6 +86,8 @@ = new HashMap(); private List newlyAllocatedContainers = new ArrayList(); + private List newlyAllocatedNMTokens = + new ArrayList(); final Map> reservedContainers = new HashMap>(); @@ -253,6 +257,15 @@ synchronized public void containerCompleted(RMContainer rmContainer, return returnContainerList; } + synchronized public List pullNewlyAssignedNMTokens() { + List nmTokens = + new ArrayList(this.newlyAllocatedNMTokens.size()); + nmTokens.addAll(newlyAllocatedNMTokens); + newlyAllocatedNMTokens.clear(); + return nmTokens; + } + + public Resource getCurrentConsumption() { return this.currentConsumption; } @@ -516,7 +529,7 @@ else if (allowed.equals(NodeType.RACK_LOCAL)) { synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, Priority priority, ResourceRequest request, - Container container) { + Container container, NMToken nmToken) { // Update allowed locality level NodeType allowed = allowedLocalityLevel.get(priority); if (allowed != null) { @@ -545,6 +558,9 @@ else if (allowed.equals(NodeType.RACK_LOCAL) && // Add it to allContainers list. newlyAllocatedContainers.add(rmContainer); + if (nmToken != null) { + newlyAllocatedNMTokens.add(nmToken); + } liveContainers.put(container.getId(), rmContainer); // Update consumption and track allocations diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index d84547a..7bde21a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -57,6 +58,7 @@ private volatile int numContainers; private RMContainer reservedContainer; + private NMToken reservedContainerNMToken; private AppSchedulable reservedAppSchedulable; /* set of containers that are allocated containers */ @@ -210,7 +212,7 @@ public int getNumContainers() { public synchronized void reserveResource( FSSchedulerApp application, Priority priority, - RMContainer reservedContainer) { + RMContainer reservedContainer, NMToken nmToken) { // Check if it's already reserved if (this.reservedContainer != null) { // Sanity check @@ -241,6 +243,7 @@ public synchronized void reserveResource( " on node " + this + " for application " + application); } this.reservedContainer = reservedContainer; + this.reservedContainerNMToken = nmToken; this.reservedAppSchedulable = application.getAppSchedulable(); } @@ -259,6 +262,7 @@ public synchronized void unreserveResource( } this.reservedContainer = null; + this.reservedContainerNMToken = null; this.reservedAppSchedulable = null; } @@ -266,6 +270,10 @@ public synchronized RMContainer getReservedContainer() { return reservedContainer; } + public synchronized NMToken getReservedContainerNMToken() { + return reservedContainerNMToken; + } + public synchronized AppSchedulable getReservedAppSchedulable() { return reservedAppSchedulable; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 7f31578..5db9532 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -140,9 +142,12 @@ private final static List EMPTY_CONTAINER_LIST = new ArrayList(); + private final static List EMPTY_NM_TOKEN_LIST = + new ArrayList(); private static final Allocation EMPTY_ALLOCATION = - new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0)); + new Allocation(EMPTY_CONTAINER_LIST, EMPTY_NM_TOKEN_LIST, + Resources.createResource(0)); // Aggregate metrics FSQueueMetrics rootMetrics; @@ -539,6 +544,10 @@ public RMContainerTokenSecretManager getContainerTokenSecretManager() { return rmContext.getContainerTokenSecretManager(); } + public NMTokenSecretManagerInRM getNMTokenSecretManager() { + return rmContext.getNMTokenSecretManager(); + } + // synchronized for sizeBasedWeight public synchronized ResourceWeights getAppWeight(AppSchedulable app) { if (!app.getRunnable()) { @@ -851,8 +860,13 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, for (RMContainer container : application.getPreemptionContainers()) { preemptionContainerIds.add(container.getContainerId()); } - - return new Allocation(application.pullNewlyAllocatedContainers(), + List newAllocatedContainers = + application.pullNewlyAllocatedContainers(); + List newlyAllocatedNMTokens = EMPTY_NM_TOKEN_LIST; + if (!newAllocatedContainers.isEmpty()) { + newlyAllocatedNMTokens = application.pullNewlyAssignedNMTokens(); + } + return new Allocation(newAllocatedContainers, newlyAllocatedNMTokens, application.getHeadroom(), preemptionContainerIds); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 115d208..3cf106c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; import java.io.IOException; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -104,6 +106,9 @@ private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {}; private final static List EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY); + private final static List EMPTY_NM_TOKEN_LIST = + new ArrayList(); + private RMContext rmContext; private Map nodes = new ConcurrentHashMap(); @@ -245,7 +250,8 @@ public Resource getMaximumResourceCapability() { } private static final Allocation EMPTY_ALLOCATION = - new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0)); + new Allocation(EMPTY_CONTAINER_LIST, EMPTY_NM_TOKEN_LIST, + Resources.createResource(0)); @Override public Allocation allocate( ApplicationAttemptId applicationAttemptId, List ask, @@ -306,9 +312,13 @@ public Allocation allocate( " applicationId=" + applicationAttemptId + " #ask=" + ask.size()); } - - return new Allocation( - application.pullNewlyAllocatedContainers(), + List newAllocatedContainers = + application.pullNewlyAllocatedContainers(); + List newlyAllocatedNMTokens = EMPTY_NM_TOKEN_LIST; + if (!newAllocatedContainers.isEmpty()) { + newlyAllocatedNMTokens = application.pullNewlyAssignedNMTokens(); + } + return new Allocation(newAllocatedContainers, newlyAllocatedNMTokens, application.getHeadroom()); } } @@ -582,11 +592,25 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application ContainerId containerId = BuilderUtils.newContainerId(application .getApplicationAttemptId(), application.getNewContainerId()); Token containerToken = null; - - containerToken = - this.rmContext.getContainerTokenSecretManager() - .createContainerToken(containerId, nodeId, application.getUser(), - capability); + NMToken nmToken = null; + try { + containerToken = + this.rmContext.getContainerTokenSecretManager() + .createContainerToken(containerId, nodeId, application.getUser(), + capability); + nmToken = + this.rmContext.getNMTokenSecretManager().createAndGetNMToken( + application.getUser(), application.getApplicationAttemptId(), + nodeId); + }catch (IllegalArgumentException e) { + if ( e.getCause() instanceof UnknownHostException) { + // It must have caused because of a dns problem YARN-713 + LOG.warn(e); + return i; + } else { + throw e; + } + } if (containerToken == null) { return i; // Try again later. } @@ -600,7 +624,8 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application // Inform the application RMContainer rmContainer = - application.allocate(type, node, priority, request, container); + application.allocate(type, node, priority, request, container, + nmToken); // Inform the node node.allocateContainer(application.getApplicationId(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java index ab31eaf..a3c4af0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java @@ -178,34 +178,32 @@ public void run() { } } - public List createAndGetNMTokens(String applicationSubmitter, - ApplicationAttemptId appAttemptId, List containers) { + public NMToken createAndGetNMToken(String applicationSubmitter, + ApplicationAttemptId appAttemptId, NodeId nodeId) { try { this.readLock.lock(); - List nmTokens = new ArrayList(); HashSet nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId); - if (nodeSet != null) { - for (Container container : containers) { - if (!nodeSet.contains(container.getNodeId())) { - LOG.debug("Sending NMToken for nodeId : " - + container.getNodeId().toString() - + " for application attempt : " + appAttemptId.toString()); - Token token = createNMToken(appAttemptId, container.getNodeId(), - applicationSubmitter); - NMToken nmToken = - NMToken.newInstance(container.getNodeId(), token); - nmTokens.add(nmToken); - // This will update the nmToken set. - nodeSet.add(container.getNodeId()); - } - } + if (nodeSet != null && !nodeSet.contains(nodeId)) { + LOG.debug("Sending NMToken for nodeId : " + + nodeId.toString() + + " for application attempt : " + appAttemptId.toString()); + Token token = createNMToken(appAttemptId, nodeId, + applicationSubmitter); + NMToken nmToken = + NMToken.newInstance(nodeId, token); + // This will update the nmToken set. + nodeSet.add(nodeId); + return nmToken; + } else { + return null; } - return nmTokens; } finally { this.readLock.unlock(); } } + + public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) { try { this.writeLock.lock(); @@ -270,4 +268,18 @@ public void removeNodeKey(NodeId nodeId) { this.writeLock.unlock(); } } + + /** + * This will be called by RM when it wants to remove NMToken key for + * specific node. + */ + public void removeNodeKeyForAppAttempt(ApplicationAttemptId appAttemptId, + NodeId nodeId) { + try { + this.writeLock.lock(); + appAttemptToNodeKeyMap.get(appAttemptId).remove(nodeId); + } finally { + this.writeLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 40b73dc..7d18571 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -39,13 +39,17 @@ import java.util.List; import java.util.Map; +import junit.framework.Assert; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.SecurityUtilTestHelper; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -64,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -128,6 +133,10 @@ public void setUp() throws Exception { containerTokenSecretManager.rollMasterKey(); when(csContext.getContainerTokenSecretManager()).thenReturn( containerTokenSecretManager); + NMTokenSecretManagerInRM nmTokenSecretManager = + new NMTokenSecretManagerInRM(conf); + nmTokenSecretManager.rollMasterKey(); + when(csContext.getNMTokenSecretManager()).thenReturn(nmTokenSecretManager); root = CapacityScheduler.parseQueue(csContext, csConf, null, @@ -1268,7 +1277,8 @@ public void testLocalityScheduling() throws Exception { // Start with off switch, shouldn't allocate due to delay scheduling assignment = a.assignContainers(clusterResource, node_2); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority)); assertEquals(3, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL @@ -1276,7 +1286,8 @@ public void testLocalityScheduling() throws Exception { // Another off switch, shouldn't allocate due to delay scheduling assignment = a.assignContainers(clusterResource, node_2); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(2, app_0.getSchedulingOpportunities(priority)); assertEquals(3, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL @@ -1284,7 +1295,8 @@ public void testLocalityScheduling() throws Exception { // Another off switch, shouldn't allocate due to delay scheduling assignment = a.assignContainers(clusterResource, node_2); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(3, app_0.getSchedulingOpportunities(priority)); assertEquals(3, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL @@ -1293,7 +1305,8 @@ public void testLocalityScheduling() throws Exception { // since missedOpportunities=3 and reqdContainers=3 assignment = a.assignContainers(clusterResource, node_2); verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset assertEquals(2, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.OFF_SWITCH, assignment.getType()); @@ -1301,7 +1314,8 @@ public void testLocalityScheduling() throws Exception { // NODE_LOCAL - node_0 assignment = a.assignContainers(clusterResource, node_0); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(1, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); @@ -1309,7 +1323,8 @@ public void testLocalityScheduling() throws Exception { // NODE_LOCAL - node_1 assignment = a.assignContainers(clusterResource, node_1); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(0, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); @@ -1343,7 +1358,8 @@ public void testLocalityScheduling() throws Exception { // Should assign RACK_LOCAL now assignment = a.assignContainers(clusterResource, node_3); verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(1, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.RACK_LOCAL, assignment.getType()); @@ -1424,11 +1440,13 @@ public void testApplicationPriorityScheduling() throws Exception { // thus, no P2 either! a.assignContainers(clusterResource, node_2); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), - eq(priority_1), any(ResourceRequest.class), any(Container.class)); + eq(priority_1), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority_1)); assertEquals(2, app_0.getTotalRequiredResources(priority_1)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), - eq(priority_2), any(ResourceRequest.class), any(Container.class)); + eq(priority_2), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_2)); assertEquals(1, app_0.getTotalRequiredResources(priority_2)); @@ -1436,44 +1454,52 @@ public void testApplicationPriorityScheduling() throws Exception { // thus, no P2 either! a.assignContainers(clusterResource, node_2); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), - eq(priority_1), any(ResourceRequest.class), any(Container.class)); + eq(priority_1), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(2, app_0.getSchedulingOpportunities(priority_1)); assertEquals(2, app_0.getTotalRequiredResources(priority_1)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), - eq(priority_2), any(ResourceRequest.class), any(Container.class)); + eq(priority_2), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_2)); assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Another off-switch, shouldn't allocate OFF_SWITCH P1 a.assignContainers(clusterResource, node_2); verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), - eq(priority_1), any(ResourceRequest.class), any(Container.class)); + eq(priority_1), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(3, app_0.getSchedulingOpportunities(priority_1)); assertEquals(1, app_0.getTotalRequiredResources(priority_1)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), - eq(priority_2), any(ResourceRequest.class), any(Container.class)); + eq(priority_2), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_2)); assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Now, DATA_LOCAL for P1 a.assignContainers(clusterResource, node_0); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), - eq(priority_1), any(ResourceRequest.class), any(Container.class)); + eq(priority_1), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_1)); assertEquals(0, app_0.getTotalRequiredResources(priority_1)); verify(app_0, never()).allocate(any(NodeType.class), eq(node_0), - eq(priority_2), any(ResourceRequest.class), any(Container.class)); + eq(priority_2), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_2)); assertEquals(1, app_0.getTotalRequiredResources(priority_2)); // Now, OFF_SWITCH for P2 a.assignContainers(clusterResource, node_1); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1), - eq(priority_1), any(ResourceRequest.class), any(Container.class)); + eq(priority_1), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_1)); assertEquals(0, app_0.getTotalRequiredResources(priority_1)); verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_1), - eq(priority_2), any(ResourceRequest.class), any(Container.class)); + eq(priority_2), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority_2)); assertEquals(0, app_0.getTotalRequiredResources(priority_2)); @@ -1545,7 +1571,8 @@ public void testSchedulingConstraints() throws Exception { // NODE_LOCAL - node_0_1 a.assignContainers(clusterResource, node_0_0); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(0, app_0.getTotalRequiredResources(priority)); @@ -1553,7 +1580,8 @@ public void testSchedulingConstraints() throws Exception { // required(ANY) == 0 a.assignContainers(clusterResource, node_1_0); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero // since #req=0 assertEquals(0, app_0.getTotalRequiredResources(priority)); @@ -1569,14 +1597,16 @@ public void testSchedulingConstraints() throws Exception { // required(rack_1) == 0 a.assignContainers(clusterResource, node_0_1); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority)); assertEquals(1, app_0.getTotalRequiredResources(priority)); // NODE_LOCAL - node_1 a.assignContainers(clusterResource, node_1_0); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(0, app_0.getTotalRequiredResources(priority)); @@ -1820,7 +1850,8 @@ public void testLocalityConstraints() throws Exception { // Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false a.assignContainers(clusterResource, node_0_1); verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 // resourceName: @@ -1842,7 +1873,8 @@ public void testLocalityConstraints() throws Exception { // Shouldn't allocate since RR(rack_1) = relax: false a.assignContainers(clusterResource, node_1_1); verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 // Allow rack-locality for rack_1, but blacklist node_1_1 @@ -1872,7 +1904,8 @@ public void testLocalityConstraints() throws Exception { // Shouldn't allocate since node_1_1 is blacklisted a.assignContainers(clusterResource, node_1_1); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 // Now, remove node_1_1 from blacklist, but add rack_1 to blacklist @@ -1899,7 +1932,8 @@ public void testLocalityConstraints() throws Exception { // Shouldn't allocate since rack_1 is blacklisted a.assignContainers(clusterResource, node_1_1); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 // Now remove rack_1 from blacklist @@ -1925,7 +1959,8 @@ public void testLocalityConstraints() throws Exception { // Now, should allocate since RR(rack_1) = relax: true a.assignContainers(clusterResource, node_1_1); verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); assertEquals(0, app_0.getTotalRequiredResources(priority)); @@ -1955,7 +1990,8 @@ public void testLocalityConstraints() throws Exception { a.assignContainers(clusterResource, node_1_0); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), - any(Priority.class), any(ResourceRequest.class), any(Container.class)); + any(Priority.class), any(ResourceRequest.class), any(Container.class), + any(NMToken.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); assertEquals(0, app_0.getTotalRequiredResources(priority)); @@ -1991,6 +2027,59 @@ public void testMaxAMResourcePerQueuePercentAfterQueueRefresh() assertEquals(400, a.getMaximumActiveApplications()); } + @Test + public void testUnavailableHost() throws Exception { + SecurityUtilTestHelper.setTokenServiceUseIp(true); + + // Manipulate queue 'a' + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B)); + + // Users + final String user_0 = "user_0"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext); + a.submitApplication(app_0, user_0, B); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), rmContext); + a.submitApplication(app_1, user_0, B); // same user + + // Setup some nodes + String host_0 = "UnavailableHostName"; + FiCaSchedulerNode node_0 = + TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8 * GB); + + final int numNodes = 1; + Resource clusterResource = + Resources.createResource(numNodes * (8 * GB), numNodes * 16); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests + Priority priority = TestUtils.createMockPriority(1); + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1 * GB, 3, true, + priority, recordFactory)), null, null); + + // Start testing... + try { + a.assignContainers(clusterResource, node_0); + } catch (Exception e) { + Assert.fail("Exception in assigning an unavailable node: " + + e.getMessage()); + } + // roll it back for other tests + SecurityUtilTestHelper.setTokenServiceUseIp(false); + } + + private CapacitySchedulerContext mockCSContext( CapacitySchedulerConfiguration csConf, Resource clusterResource) { CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 17ff7de..f7a1787 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.security.SecurityUtilTestHelper; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -2196,4 +2197,30 @@ public void testHostPortNodeName() throws Exception { assertEquals(1, app.getLiveContainers().size()); } + @Test(timeout = 30000) + public void testAssignUnavailableHost() throws Exception { + SecurityUtilTestHelper.setTokenServiceUseIp(true); + final String user = "user1"; + final String fifoQueue = "fifo"; + + RMNode node2 = + MockNodes + .newNodeInfo(1, Resources.createResource(8192, 8), 2, "UnavailableHost"); + + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + createSchedulingRequest(1024, fifoQueue, user, 4); + scheduler.update(); + + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); + try { + scheduler.handle(updateEvent2); + } catch (Exception e) { + Assert.fail("Exception in updating an unavailable resource: " + + e.getMessage()); + } + //roll it back for other tests + SecurityUtilTestHelper.setTokenServiceUseIp(false); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index b71726a..5be1ccc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.security.SecurityUtilTestHelper; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -414,7 +415,65 @@ public void testFifoScheduler() throws Exception { LOG.info("--- END: testFifoScheduler ---"); } - + + @Test(timeout=30000) + public void testUnavailableNodeAssignment() throws Exception { + SecurityUtilTestHelper.setTokenServiceUseIp(true); + AsyncDispatcher dispatcher = new InlineDispatcher(); + Configuration conf = new Configuration(); + RMContainerTokenSecretManager containerTokenSecretManager = + new RMContainerTokenSecretManager(new Configuration()); + containerTokenSecretManager.rollMasterKey(); + NMTokenSecretManagerInRM nmTokenSecretManager = + new NMTokenSecretManagerInRM(conf); + nmTokenSecretManager.rollMasterKey(); + RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, + null, containerTokenSecretManager, nmTokenSecretManager, null); + + FifoScheduler scheduler = new FifoScheduler(); + scheduler.reinitialize(new Configuration(), rmContext); + + RMNode node0 = MockNodes.newNodeInfo(1, + Resources.createResource(1024 * 64), 1, "UnavailableNodeName"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0); + scheduler.handle(nodeEvent1); + + int _appId = 1; + int _appAttemptId = 1; + ApplicationAttemptId appAttemptId = createAppAttemptId(_appId, + _appAttemptId); + AppAddedSchedulerEvent appEvent1 = new AppAddedSchedulerEvent(appAttemptId, + "queue1", "user1"); + scheduler.handle(appEvent1); + + int memory = 64; + int nConts = 3; + int priority = 20; + + List ask = new ArrayList(); + ResourceRequest nodeLocal = createResourceRequest(memory, + node0.getHostName(), priority, nConts); + ResourceRequest rackLocal = createResourceRequest(memory, + node0.getRackName(), priority, nConts); + ResourceRequest any = createResourceRequest(memory, ResourceRequest.ANY, priority, + nConts); + ask.add(nodeLocal); + ask.add(rackLocal); + ask.add(any); + scheduler.allocate(appAttemptId, ask, new ArrayList(), null, null); + + NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0); + + try { + scheduler.handle(node0Update); + } catch (Exception e) { + Assert.fail("Exception in updating an unavailable resource: " + + e.getMessage()); + } + SecurityUtilTestHelper.setTokenServiceUseIp(false); + } + + private void checkApplicationResourceUsage(int expected, Application application) { Assert.assertEquals(expected, application.getUsedResources().getMemory());