diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 0c56134..6018c8f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -456,9 +456,9 @@ public AllocateResponse allocate(AllocateRequest request) blacklistAdditions, blacklistRemovals); RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); - AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class); + allocateResponse.setNMTokens(allocation.getNMTokens()); // update the response with the deltas of node status changes List updatedNodes = new ArrayList(); @@ -497,12 +497,6 @@ public AllocateResponse allocate(AllocateRequest request) allocateResponse .setPreemptionMessage(generatePreemptionMessage(allocation)); - // Adding NMTokens for allocated containers. - if (!allocation.getContainers().isEmpty()) { - allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager() - .createAndGetNMTokens(app.getUser(), appAttemptId, - allocation.getContainers())); - } /* * As we are updating the response inside the lock object so we don't * need to worry about unregister call occurring in between (which diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 4ca8c28..577ad1e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; @@ -202,7 +203,8 @@ // Transitions from SCHEDULED State .addTransition(RMAppAttemptState.SCHEDULED, - RMAppAttemptState.ALLOCATED_SAVING, + EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING, + RMAppAttemptState.SCHEDULED), RMAppAttemptEventType.CONTAINER_ALLOCATED, new AMContainerAllocatedTransition()) .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.FINAL_SAVING, @@ -768,8 +770,9 @@ public void transition(RMAppAttemptImpl appAttempt, private static final List EMPTY_CONTAINER_RELEASE_LIST = new ArrayList(); + private static final List EMPTY_CONTAINER_REQUEST_LIST = - new ArrayList(); + new ArrayList(); private static final class ScheduleTransition implements @@ -802,29 +805,41 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } } - private static final class AMContainerAllocatedTransition - extends BaseTransition { + private static final class AMContainerAllocatedTransition + implements + MultipleArcTransition { @Override - public void transition(RMAppAttemptImpl appAttempt, - RMAppAttemptEvent event) { + public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, + RMAppAttemptEvent event) { // Acquire the AM container from the scheduler. - Allocation amContainerAllocation = appAttempt.scheduler.allocate( - appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST, - EMPTY_CONTAINER_RELEASE_LIST, null, null); + Allocation amContainerAllocation = + appAttempt.scheduler.allocate(appAttempt.applicationAttemptId, + EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null, + null); // There must be at least one container allocated, because a // CONTAINER_ALLOCATED is emitted after an RMContainer is constructed, - // and is put in SchedulerApplication#newlyAllocatedContainers. Then, - // YarnScheduler#allocate will fetch it. - assert amContainerAllocation.getContainers().size() != 0; + // and is put in SchedulerApplication#newlyAllocatedContainers. + + // Note that YarnScheduler#allocate is not guaranteed to be able to + // fetch it since container may not be fetchable for some reason like + // DNS unavailable causing container token not generated. As such, we + // return to the previous state and keep retry until am container is + // fetched. + if (amContainerAllocation.getContainers().size() == 0) { + appAttempt.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent( + appAttempt.applicationAttemptId, null)); + return RMAppAttemptState.SCHEDULED; + } // Set the masterContainer - appAttempt.setMasterContainer(amContainerAllocation.getContainers().get( - 0)); + appAttempt.setMasterContainer(amContainerAllocation.getContainers() + .get(0)); appAttempt.getSubmissionContext().setResource( - appAttempt.getMasterContainer().getResource()); + appAttempt.getMasterContainer().getResource()); appAttempt.storeAttempt(); + return RMAppAttemptState.ALLOCATED_SAVING; } } - + private static final class AttemptStoredTransition extends BaseTransition { @Override public void transition(RMAppAttemptImpl appAttempt, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 4208d1d..0f3af41 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -31,11 +31,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.util.resource.Resources; public abstract class AbstractYarnScheduler implements ResourceScheduler { protected RMContext rmContext; protected Map applications; + protected final static List EMPTY_CONTAINER_LIST = + new ArrayList(); + protected static final Allocation EMPTY_ALLOCATION = new Allocation( + EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null); public synchronized List getTransferredContainers( ApplicationAttemptId currentAttempt) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java index c03e31d..3f2d8af 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java @@ -22,10 +22,9 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; public class Allocation { @@ -34,24 +33,24 @@ final Set strictContainers; final Set fungibleContainers; final List fungibleResources; - - public Allocation(List containers, Resource resourceLimit) { - this(containers, resourceLimit, null, null, null); - } + final List nmTokens; public Allocation(List containers, Resource resourceLimit, - Set strictContainers) { - this(containers, resourceLimit, strictContainers, null, null); + Set strictContainers, Set fungibleContainers, + List fungibleResources) { + this(containers, resourceLimit,strictContainers, fungibleContainers, + fungibleResources, null); } public Allocation(List containers, Resource resourceLimit, Set strictContainers, Set fungibleContainers, - List fungibleResources) { + List fungibleResources, List nmTokens) { this.containers = containers; this.resourceLimit = resourceLimit; this.strictContainers = strictContainers; this.fungibleContainers = fungibleContainers; this.fungibleResources = fungibleResources; + this.nmTokens = nmTokens; } public List getContainers() { @@ -74,4 +73,8 @@ public Resource getResourceLimit() { return fungibleResources; } + public List getNMTokens() { + return nmTokens; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index b1801dc..33c7aa0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -33,10 +34,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -339,21 +342,58 @@ public Resource getCurrentConsumption() { return currentConsumption; } - public synchronized List pullNewlyAllocatedContainers() { - List returnContainerList = new ArrayList( - newlyAllocatedContainers.size()); - for (RMContainer rmContainer : newlyAllocatedContainers) { - rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), - RMContainerEventType.ACQUIRED)); + public class ContainersAndNMTokensAllocation { + List containerList; + List nmTokenList; + + public ContainersAndNMTokensAllocation(List containerList, + List nmTokenList) { + this.containerList = containerList; + this.nmTokenList = nmTokenList; + } + + public List getContainerList() { + return containerList; + } + + public List getNMTokenList() { + return nmTokenList; + } + } + + public synchronized ContainersAndNMTokensAllocation + pullNewlyAllocatedContainersAndNMTokens() { + List returnContainerList = + new ArrayList(newlyAllocatedContainers.size()); + List nmTokens = new ArrayList(); + for (Iterator i = newlyAllocatedContainers.iterator(); i + .hasNext();) { + RMContainer rmContainer = i.next(); Container container = rmContainer.getContainer(); - rmContainer.getContainer().setContainerToken( - rmContext.getContainerTokenSecretManager().createContainerToken( - rmContainer.getContainerId(), container.getNodeId(), getUser(), - container.getResource())); - returnContainerList.add(rmContainer.getContainer()); + try { + // create container token and NMToken altogether. + container.setContainerToken(rmContext.getContainerTokenSecretManager() + .createContainerToken(container.getId(), container.getNodeId(), + getUser(), container.getResource())); + NMToken nmToken = + rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), + getApplicationAttemptId(), container); + if (nmToken != null) { + nmTokens.add(nmToken); + } + } catch (IllegalArgumentException e) { + // DNS might be down, skip returning this container. + LOG.error( + "Error trying to assign container token to allocated container " + + container.getId(), e); + continue; + } + returnContainerList.add(container); + i.remove(); + rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), + RMContainerEventType.ACQUIRED)); } - newlyAllocatedContainers.clear(); - return returnContainerList; + return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens); } public synchronized void updateBlacklist( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index eb4f814..2fcc7ce 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -105,9 +105,6 @@ private CSQueue root; - private final static List EMPTY_CONTAINER_LIST = - new ArrayList(); - static final Comparator queueComparator = new Comparator() { @Override public int compare(CSQueue q1, CSQueue q2) { @@ -573,9 +570,6 @@ private synchronized void doneApplicationAttempt( } } - private static final Allocation EMPTY_ALLOCATION = - new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0, 0)); - @Override @Lock(Lock.NoLock.class) public Allocation allocate(ApplicationAttemptId applicationAttemptId, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 4be6b94..470cb10 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -237,9 +237,11 @@ public synchronized Allocation getAllocation(ResourceCalculator rc, ResourceRequest rr = ResourceRequest.newInstance( Priority.UNDEFINED, ResourceRequest.ANY, minimumAllocation, numCont); - return new Allocation(pullNewlyAllocatedContainers(), getHeadroom(), - null, currentContPreemption, - Collections.singletonList(rr)); + ContainersAndNMTokensAllocation allocation = + pullNewlyAllocatedContainersAndNMTokens(); + return new Allocation(allocation.getContainerList(), getHeadroom(), null, + currentContPreemption, Collections.singletonList(rr), + allocation.getNMTokenList()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index e057e74..f0249c3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -143,12 +144,6 @@ // How often fair shares are re-calculated (ms) protected long UPDATE_INTERVAL = 500; - private final static List EMPTY_CONTAINER_LIST = - new ArrayList(); - - private static final Allocation EMPTY_ALLOCATION = - new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0)); - // Aggregate metrics FSQueueMetrics rootMetrics; @@ -922,9 +917,11 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, } application.updateBlacklist(blacklistAdditions, blacklistRemovals); - - return new Allocation(application.pullNewlyAllocatedContainers(), - application.getHeadroom(), preemptionContainerIds); + ContainersAndNMTokensAllocation allocation = + application.pullNewlyAllocatedContainersAndNMTokens(); + return new Allocation(allocation.getContainerList(), + application.getHeadroom(), preemptionContainerIds, null, null, + allocation.getNMTokenList()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index a2e0134..61628f9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -50,7 +50,6 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -80,6 +79,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -114,9 +114,6 @@ Configuration conf; - private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {}; - private final static List EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY); - protected Map nodes = new ConcurrentHashMap(); private boolean initialized; @@ -264,8 +261,7 @@ public Resource getMaximumResourceCapability() { } } - private static final Allocation EMPTY_ALLOCATION = - new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0)); + @Override public Allocation allocate( ApplicationAttemptId applicationAttemptId, List ask, @@ -328,10 +324,11 @@ public Allocation allocate( } application.updateBlacklist(blacklistAdditions, blacklistRemovals); - - return new Allocation( - application.pullNewlyAllocatedContainers(), - application.getHeadroom()); + ContainersAndNMTokensAllocation allocation = + application.pullNewlyAllocatedContainersAndNMTokens(); + return new Allocation(allocation.getContainerList(), + application.getHeadroom(), null, null, null, + allocation.getNMTokenList()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java index ab31eaf..b98639c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java @@ -18,10 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; -import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; @@ -177,35 +175,34 @@ public void run() { activateNextMasterKey(); } } - - public List createAndGetNMTokens(String applicationSubmitter, - ApplicationAttemptId appAttemptId, List containers) { + + public NMToken createAndGetNMToken(String applicationSubmitter, + ApplicationAttemptId appAttemptId, Container container) { try { this.readLock.lock(); - List nmTokens = new ArrayList(); HashSet nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId); + NMToken nmToken = null; if (nodeSet != null) { - for (Container container : containers) { - if (!nodeSet.contains(container.getNodeId())) { - LOG.debug("Sending NMToken for nodeId : " - + container.getNodeId().toString() - + " for application attempt : " + appAttemptId.toString()); - Token token = createNMToken(appAttemptId, container.getNodeId(), - applicationSubmitter); - NMToken nmToken = - NMToken.newInstance(container.getNodeId(), token); - nmTokens.add(nmToken); - // This will update the nmToken set. + if (!nodeSet.contains(container.getNodeId())) { + LOG.debug("Sending NMToken for nodeId : " + + container.getNodeId().toString() + + " for application attempt : " + appAttemptId.toString()); + Token token = + createNMToken(container.getId().getApplicationAttemptId(), + container.getNodeId(), applicationSubmitter); + nmToken = NMToken.newInstance(container.getNodeId(), token); + // do not update node set, if this is an am container. + if (container.getId().getId() != 1) { nodeSet.add(container.getNodeId()); } } } - return nmTokens; + return nmToken; } finally { this.readLock.unlock(); } } - + public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) { try { this.writeLock.lock(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 0e3bdeb..d40ed21 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.SecurityUtilTestHelper; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -149,4 +150,35 @@ public void testContainerTokenGeneratedOnPullRequest() throws Exception { Assert.assertNotNull(containers.get(0).getContainerToken()); rm1.stop(); } + + @Test + public void testContainerAllocationWhenDNSUnavailable() throws Exception{ + YarnConfiguration conf = new YarnConfiguration(); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // request a container. + am1.allocate("127.0.0.1", 1024, 1, new ArrayList()); + ContainerId containerId2 = + ContainerId.newInstance(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + + // acquire the container. + SecurityUtilTestHelper.setTokenServiceUseIp(true); + List containers = + am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + // not able to fetch the container; + Assert.assertEquals(0, containers.size()); + + SecurityUtilTestHelper.setTokenServiceUseIp(false); + containers = + am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + // should be able to fetch the container; + Assert.assertEquals(1, containers.size()); + } } \ No newline at end of file