diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java index c182531..cd2beb9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java @@ -80,6 +80,10 @@ protected void setExpireInterval(int expireInterval) { this.expireInterval = expireInterval; } + public int getExpireInterval() { + return this.expireInterval; + } + protected void setMonitorInterval(int monitorInterval) { this.monitorInterval = monitorInterval; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 01d5064..41888d3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -60,6 +60,8 @@ AMLivelinessMonitor getAMLivelinessMonitor(); + NMLivelinessMonitor getNMLivelinessMonitor(); + AMLivelinessMonitor getAMFinishingMonitor(); ContainerAllocationExpirer getContainerAllocationExpirer(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index f72ef30..2796ff7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -66,6 +66,8 @@ private AMLivelinessMonitor amLivelinessMonitor; private AMLivelinessMonitor amFinishingMonitor; + private NMLivelinessMonitor nmLivelinessMonitor; + private RMStateStore stateStore = null; private ContainerAllocationExpirer containerAllocationExpirer; private DelegationTokenRenewer delegationTokenRenewer; @@ -165,6 +167,11 @@ public AMLivelinessMonitor getAMLivelinessMonitor() { } @Override + public NMLivelinessMonitor getNMLivelinessMonitor() { + return this.nmLivelinessMonitor; + } + + @Override public AMLivelinessMonitor getAMFinishingMonitor() { return this.amFinishingMonitor; } @@ -276,6 +283,11 @@ void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) { this.amFinishingMonitor = amFinishingMonitor; } + @VisibleForTesting + public void setNMLiveLinessMonitor(NMLivelinessMonitor nmLivelinessMonitor) { + this.nmLivelinessMonitor = nmLivelinessMonitor; + } + void setContainerTokenSecretManager( RMContainerTokenSecretManager containerTokenSecretManager) { this.containerTokenSecretManager = containerTokenSecretManager; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 40e346c..6b69c29 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -437,6 +437,7 @@ protected void serviceInit(Configuration configuration) throws Exception { nmLivelinessMonitor = createNMLivelinessMonitor(); addService(nmLivelinessMonitor); + rmContext.setNMLiveLinessMonitor(nmLivelinessMonitor); resourceTracker = createResourceTrackerService(); addService(resourceTracker); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 5764c8c..103ea30 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -20,8 +20,12 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; @@ -33,15 +37,21 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -67,6 +77,13 @@ protected RMContext rmContext; protected Map> applications; + // This pendingRelease is used in work-preserving recovery scenario to keep + // track of the AM's outstanding release requests. RM on recovery could + // receive the release request form AM before it receives the container status + // from NM for recovery. In this case, the to-be-recovered container reported + // by NM should not be recovered. + private Set pendingRelease = null; + protected final static List EMPTY_CONTAINER_LIST = new ArrayList(); protected static final Allocation EMPTY_ALLOCATION = new Allocation( @@ -252,6 +269,18 @@ public synchronized void recoverContainersOnNode( rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(), container)); + if (pendingRelease.contains(container.getContainerId())) { + // release the container + rmContainer.handle(new RMContainerFinishedEvent(container + .getContainerId(), SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER), + RMContainerEventType.RELEASED)); + pendingRelease.remove(container.getContainerId()); + LOG.info(container.getContainerId() + + " is released by application on recovery."); + continue; + } + // recover scheduler node nodes.get(nm.getNodeID()).recoverContainer(rmContainer); @@ -314,6 +343,47 @@ protected void recoverResourceRequestForContainer(RMContainer rmContainer) { } } + protected void createReleaseCache() { + this.pendingRelease = new HashSet(); + // Cleanup the cache after nm expire interval. + new Timer().schedule(new TimerTask() { + @Override + public void run() { + pendingRelease = null; + } + }, rmContext.getNMLivelinessMonitor().getExpireInterval()); + } + + // clean up a completed container + protected abstract void completedContainer(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event); + + protected void releaseContainers(List containers, + SchedulerApplicationAttempt attempt) { + for (ContainerId containerId : containers) { + RMContainer rmContainer = getRMContainer(containerId); + if (rmContainer == null) { + if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp() < rmContext + .getNMLivelinessMonitor().getExpireInterval()) { + LOG.info(containerId + " doesn't exist. Add the container" + + " to the release request cache as it maybe on recovery."); + pendingRelease.add(containerId); + + } else { + RMAuditLogger.logFailure(attempt.getUser(), + AuditConstants.RELEASE_CONTAINER, + "Unauthorized access or invalid container", "CapacityScheduler", + "Trying to release container not owned by app or with invalid id.", + attempt.getApplicationId(), containerId); + + completedContainer(rmContainer, + SchedulerUtils.createAbnormalContainerStatus(containerId, + SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED); + } + } + } + } + public SchedulerNode getSchedulerNode(NodeId nodeId) { return nodes.get(nodeId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 2681238..f0f327f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import com.google.common.base.Preconditions; - import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -54,8 +52,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -93,6 +89,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; @LimitedPrivate("yarn") @Evolving @@ -262,9 +259,8 @@ private synchronized void initScheduler(Configuration configuration) throws this.applications = new ConcurrentHashMap>(); - + createReleaseCache(); initializeQueues(this.conf); - scheduleAsynchronously = this.conf.getScheduleAynschronously(); asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, @@ -689,21 +685,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, getMinimumResourceCapability(), maximumAllocation); // Release containers - for (ContainerId releasedContainerId : release) { - RMContainer rmContainer = getRMContainer(releasedContainerId); - if (rmContainer == null) { - RMAuditLogger.logFailure(application.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "CapacityScheduler", - "Trying to release container not owned by app or with invalid id", - application.getApplicationId(), releasedContainerId); - } - completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - releasedContainerId, - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); - } + releaseContainers(release, application); synchronized (application) { @@ -995,7 +977,8 @@ private synchronized void removeNode(RMNode nodeInfo) { } @Lock(CapacityScheduler.class) - private synchronized void completedContainer(RMContainer rmContainer, + @Override + protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 4e1c244..a77ba94 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -49,8 +49,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; @@ -797,7 +795,8 @@ private synchronized void removeApplicationAttempt( /** * Clean up a completed container. */ - private synchronized void completedContainer(RMContainer rmContainer, + @Override + protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); @@ -901,21 +900,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, } // Release containers - for (ContainerId releasedContainerId : release) { - RMContainer rmContainer = getRMContainer(releasedContainerId); - if (rmContainer == null) { - RMAuditLogger.logFailure(application.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "FairScheduler", - "Trying to release container not owned by app or with invalid id", - application.getApplicationId(), releasedContainerId); - } - completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - releasedContainerId, - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); - } + releaseContainers(release, application); synchronized (application) { if (!ask.isEmpty()) { @@ -1247,6 +1232,7 @@ private synchronized void initScheduler(Configuration conf) // This stores per-application scheduling information this.applications = new ConcurrentHashMap>(); + createReleaseCache(); this.eventLog = new FairSchedulerEventLog(); eventLog.init(this.conf); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 518a8d9..544bd7e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -52,8 +52,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -89,7 +87,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; - import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -200,6 +197,7 @@ private synchronized void initScheduler(Configuration conf) { //Use ConcurrentSkipListMap because applications need to be ordered this.applications = new ConcurrentSkipListMap>(); + createReleaseCache(); this.minimumAllocation = Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, @@ -295,21 +293,7 @@ public Allocation allocate( clusterResource, minimumAllocation, maximumAllocation); // Release containers - for (ContainerId releasedContainer : release) { - RMContainer rmContainer = getRMContainer(releasedContainer); - if (rmContainer == null) { - RMAuditLogger.logFailure(application.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "FifoScheduler", - "Trying to release container not owned by app or with invalid id", - application.getApplicationId(), releasedContainer); - } - containerCompleted(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - releasedContainer, - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); - } + releaseContainers(release, application); synchronized (application) { @@ -443,7 +427,7 @@ private synchronized void doneApplicationAttempt( LOG.info("Skip killing " + container.getContainerId()); continue; } - containerCompleted(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), RMContainerEventType.KILL); @@ -717,7 +701,7 @@ private synchronized void nodeUpdate(RMNode rmNode) { for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); LOG.debug("Container FINISHED: " + containerId); - containerCompleted(getRMContainer(containerId), + completedContainer(getRMContainer(containerId), completedContainer, RMContainerEventType.FINISHED); } @@ -818,7 +802,7 @@ public void handle(SchedulerEvent event) { ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; ContainerId containerid = containerExpiredEvent.getContainerId(); - containerCompleted(getRMContainer(containerid), + completedContainer(getRMContainer(containerid), SchedulerUtils.createAbnormalContainerStatus( containerid, SchedulerUtils.EXPIRED_CONTAINER), @@ -831,7 +815,8 @@ public void handle(SchedulerEvent event) { } @Lock(FifoScheduler.class) - private synchronized void containerCompleted(RMContainer rmContainer, + @Override + protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); @@ -881,7 +866,7 @@ private synchronized void removeNode(RMNode nodeInfo) { } // Kill running containers for(RMContainer container : node.getRunningContainers()) { - containerCompleted(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.LOST_CONTAINER), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index f1a3bbc..91e1905 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -49,7 +49,7 @@ private volatile int responseId = 0; private final ApplicationAttemptId attemptId; - private final RMContext context; + private RMContext context; private ApplicationMasterProtocol amRMProtocol; private final List requests = new ArrayList(); @@ -61,8 +61,10 @@ public MockAM(RMContext context, ApplicationMasterProtocol amRMProtocol, this.amRMProtocol = amRMProtocol; this.attemptId = attemptId; } - - void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol) { + + public void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol, + RMContext context) { + this.context = context; this.amRMProtocol = amRMProtocol; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 671851c..b0ffc85 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -171,7 +171,6 @@ public void testProgressFilter() throws Exception{ RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); am1.registerAppAttempt(); - am1.setAMRMProtocol(rm.getApplicationMasterService()); AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); List release = new ArrayList(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index dc3e9f1..79368d5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -289,7 +289,7 @@ public void testRMRestart() throws Exception { // verify old AM is not accepted // change running AM to talk to new RM - am1.setAMRMProtocol(rm2.getApplicationMasterService()); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); AllocateResponse allocResponse = am1.allocate( new ArrayList(), new ArrayList()); @@ -1671,7 +1671,7 @@ public void testQueueMetricsOnRMRestart() throws Exception { nm1.setResourceTrackerService(rm2.getResourceTrackerService()); // recover app RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); - am1.setAMRMProtocol(rm2.getApplicationMasterService()); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); am1.allocate(new ArrayList(), new ArrayList()); nm1.nodeHeartbeat(true); nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 24a2f43..a490bb0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -33,10 +33,12 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -71,6 +73,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; + @SuppressWarnings({"rawtypes", "unchecked"}) @RunWith(value = Parameterized.class) public class TestWorkPreservingRMRestart { @@ -567,8 +570,8 @@ public void testAppReregisterOnRMWorkPreservingRestart() throws Exception { rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED); - am0.setAMRMProtocol(rm2.getApplicationMasterService()); - am0.registerAppAttempt(false); + am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + am0.registerAppAttempt(true); rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING); rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING); @@ -641,6 +644,60 @@ public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception { waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId()); } + // Test if RM on recovery receives the container release request from AM + // before it receives the container status reported by NM for recovery. this + // container should not be recovered. + @Test + public void testReleasedContainerNotRecovered() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + rm1.start(); + + RMApp app1 = rm1.submitApp(1024); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // Re-start RM + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + am1.registerAppAttempt(true); + + // try to release a container before the container is actually recovered. + ContainerId runningContainer = + ContainerId.newInstance(am1.getApplicationAttemptId(), 2); + am1.allocate(null, Arrays.asList(runningContainer)); + + // send container statuses to recover the containers + List containerStatuses = + createNMContainerStatusForApp(am1); + nm1.registerNode(containerStatuses, null); + + // only the am container should be recovered. + waitForNumContainersToRecover(1, rm2, am1.getApplicationAttemptId()); + + // Wait for RM to settle down on recovering containers; + Thread.sleep(3000); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + // previous running container is not recovered because it was released. + assertNull(scheduler.getRMContainer(runningContainer)); + + AllocateResponse response = am1.allocate(null, null); + // AM gets notified of the completed container. + boolean receivedCompletedContainer = false; + for (ContainerStatus status : response.getCompletedContainersStatuses()) { + if (status.getContainerId().equals(runningContainer)) { + receivedCompletedContainer = true; + } + } + assertTrue(receivedCompletedContainer); + } + private void asserteMetrics(QueueMetrics qm, int appsSubmitted, int appsPending, int appsRunning, int appsCompleted, int allocatedContainers, int availableMB, int availableVirtualCores, @@ -656,7 +713,7 @@ private void asserteMetrics(QueueMetrics qm, int appsSubmitted, assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores()); } - private void waitForNumContainersToRecover(int num, MockRM rm, + public static void waitForNumContainersToRecover(int num, MockRM rm, ApplicationAttemptId attemptId) throws Exception { AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm.getResourceScheduler(); @@ -669,7 +726,9 @@ private void waitForNumContainersToRecover(int num, MockRM rm, attempt = scheduler.getApplicationAttempt(attemptId); } while (attempt.getLiveContainers().size() < num) { - System.out.println("Wait for " + num + " containers to recover."); + System.out.println("Wait for " + num + + " containers to recover. currently: " + + attempt.getLiveContainers().size()); Thread.sleep(200); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index e548661..a27a868 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; @@ -92,6 +93,7 @@ public EventHandler getEventHandler() { new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM(), writer); + rmContext.setNMLiveLinessMonitor(mock(NMLivelinessMonitor.class)); return rmContext; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index a0e2279..a621bff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; import java.io.IOException; import java.lang.reflect.Method; @@ -43,8 +42,6 @@ import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -52,10 +49,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; -import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -64,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -140,11 +136,7 @@ public void testFifoSchedulerCapacityWhenNoNMs() { @Test(timeout=5000) public void testAppAttemptMetrics() throws Exception { - AsyncDispatcher dispatcher = new InlineDispatcher(); - RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); - RMContext rmContext = new RMContextImpl(dispatcher, null, - null, null, null, null, null, null, null, writer); - + RMContext rmContext = TestUtils.getMockRMContext(); FifoScheduler scheduler = new FifoScheduler(); Configuration conf = new Configuration(); scheduler.setRMContext(rmContext); @@ -176,7 +168,6 @@ public void testAppAttemptMetrics() throws Exception { @Test(timeout=2000) public void testNodeLocalAssignment() throws Exception { - AsyncDispatcher dispatcher = new InlineDispatcher(); Configuration conf = new Configuration(); RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(conf); @@ -184,10 +175,7 @@ public void testNodeLocalAssignment() throws Exception { NMTokenSecretManagerInRM nmTokenSecretManager = new NMTokenSecretManagerInRM(conf); nmTokenSecretManager.rollMasterKey(); - RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); - RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, - null, containerTokenSecretManager, nmTokenSecretManager, null, writer); - + RMContext rmContext = TestUtils.getMockRMContext(); FifoScheduler scheduler = new FifoScheduler(); scheduler.setRMContext(rmContext); scheduler.init(conf); @@ -245,7 +233,6 @@ public void testNodeLocalAssignment() throws Exception { @Test(timeout=2000) public void testUpdateResourceOnNode() throws Exception { - AsyncDispatcher dispatcher = new InlineDispatcher(); Configuration conf = new Configuration(); RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(conf); @@ -253,10 +240,7 @@ public void testUpdateResourceOnNode() throws Exception { NMTokenSecretManagerInRM nmTokenSecretManager = new NMTokenSecretManagerInRM(conf); nmTokenSecretManager.rollMasterKey(); - RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); - RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, - null, containerTokenSecretManager, nmTokenSecretManager, null, writer); - + RMContext rmContext = TestUtils.getMockRMContext(); FifoScheduler scheduler = new FifoScheduler(){ @SuppressWarnings("unused") public Map getNodes(){