diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 6ccae98..a65f776 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -36,8 +36,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -933,4 +934,10 @@ public Resource getClusterResource() { return new HashMap>(); } + + @Override + protected void completedContainer(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event) { + // do nothing + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 5764c8c..04c955c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -20,12 +20,17 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -33,21 +38,30 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; + @SuppressWarnings("unchecked") public abstract class AbstractYarnScheduler @@ -66,6 +80,15 @@ protected RMContext rmContext; protected Map> applications; + protected int nmExpireInterval; + + // This pendingRelease is used in work-preserving recovery scenario to keep + // track of the AM's outstanding release requests. RM on recovery could + // receive the release request form AM before it receives the container status + // from NM for recovery. In this case, the to-be-recovered container reported + // by NM should not be recovered. + private Set pendingRelease = null; + private final Object mutex = new Object(); protected final static List EMPTY_CONTAINER_LIST = new ArrayList(); @@ -275,6 +298,19 @@ public synchronized void recoverContainersOnNode( ((RMContainerImpl)rmContainer).setAMContainer(true); } } + + synchronized (mutex) { + if (pendingRelease != null + && pendingRelease.contains(container.getContainerId())) { + // release the container + rmContainer.handle(new RMContainerFinishedEvent(container + .getContainerId(), SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER), + RMContainerEventType.RELEASED)); + pendingRelease.remove(container.getContainerId()); + LOG.info(container.getContainerId() + " is released by application."); + } + } } } @@ -314,6 +350,64 @@ protected void recoverResourceRequestForContainer(RMContainer rmContainer) { } } + protected void createReleaseCache() { + this.pendingRelease = new HashSet(); + // Cleanup the cache after nm expire interval. + new Timer().schedule(new TimerTask() { + @Override + public void run() { + synchronized (mutex) { + for (ContainerId containerId: pendingRelease) { + T attempt = getCurrentAttemptForContainer(containerId); + RMAuditLogger.logFailure(attempt.getUser(), + AuditConstants.RELEASE_CONTAINER, + "Unauthorized access or invalid container", "Scheduler", + "Trying to release container not owned by app or with invalid id.", + attempt.getApplicationId(), containerId); + } + pendingRelease = null; + } + } + }, nmExpireInterval); + } + + // clean up a completed container + protected abstract void completedContainer(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event); + + protected void releaseContainers(List containers, + SchedulerApplicationAttempt attempt) { + for (ContainerId containerId : containers) { + RMContainer rmContainer = getRMContainer(containerId); + if (rmContainer == null) { + if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp() + < nmExpireInterval) { + LOG.info(containerId + " doesn't exist. Add the container" + + " to the release request cache as it maybe on recovery."); + synchronized (mutex) { + if (pendingRelease != null) { + pendingRelease.add(containerId); + } + } + } else { + RMAuditLogger.logFailure(attempt.getUser(), + AuditConstants.RELEASE_CONTAINER, + "Unauthorized access or invalid container", "Scheduler", + "Trying to release container not owned by app or with invalid id.", + attempt.getApplicationId(), containerId); + } + } + completedContainer(rmContainer, + SchedulerUtils.createAbnormalContainerStatus(containerId, + SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED); + } + } + + @VisibleForTesting + public Set getPendingRelease() { + return this.pendingRelease; + } + public SchedulerNode getSchedulerNode(NodeId nodeId) { return nodes.get(nodeId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 2681238..eb9fa1f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import com.google.common.base.Preconditions; - import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -54,8 +52,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -93,6 +89,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; @LimitedPrivate("yarn") @Evolving @@ -255,6 +252,9 @@ private synchronized void initScheduler(Configuration configuration) throws IOException { this.conf = loadCapacitySchedulerConfiguration(configuration); validateConf(this.conf); + super.nmExpireInterval = + conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); this.minimumAllocation = this.conf.getMinimumAllocation(); this.maximumAllocation = this.conf.getMaximumAllocation(); this.calculator = this.conf.getResourceCalculator(); @@ -262,9 +262,8 @@ private synchronized void initScheduler(Configuration configuration) throws this.applications = new ConcurrentHashMap>(); - + createReleaseCache(); initializeQueues(this.conf); - scheduleAsynchronously = this.conf.getScheduleAynschronously(); asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, @@ -689,21 +688,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, getMinimumResourceCapability(), maximumAllocation); // Release containers - for (ContainerId releasedContainerId : release) { - RMContainer rmContainer = getRMContainer(releasedContainerId); - if (rmContainer == null) { - RMAuditLogger.logFailure(application.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "CapacityScheduler", - "Trying to release container not owned by app or with invalid id", - application.getApplicationId(), releasedContainerId); - } - completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - releasedContainerId, - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); - } + releaseContainers(release, application); synchronized (application) { @@ -995,7 +980,8 @@ private synchronized void removeNode(RMNode nodeInfo) { } @Lock(CapacityScheduler.class) - private synchronized void completedContainer(RMContainer rmContainer, + @Override + protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 4e1c244..01f0c77 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -49,8 +49,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; @@ -797,7 +795,8 @@ private synchronized void removeApplicationAttempt( /** * Clean up a completed container. */ - private synchronized void completedContainer(RMContainer rmContainer, + @Override + protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); @@ -901,21 +900,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, } // Release containers - for (ContainerId releasedContainerId : release) { - RMContainer rmContainer = getRMContainer(releasedContainerId); - if (rmContainer == null) { - RMAuditLogger.logFailure(application.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "FairScheduler", - "Trying to release container not owned by app or with invalid id", - application.getApplicationId(), releasedContainerId); - } - completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - releasedContainerId, - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); - } + releaseContainers(release, application); synchronized (application) { if (!ask.isEmpty()) { @@ -1214,6 +1199,9 @@ private synchronized void initScheduler(Configuration conf) throws IOException { this.conf = new FairSchedulerConfiguration(conf); validateConf(this.conf); + super.nmExpireInterval = + conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); minimumAllocation = this.conf.getMinimumAllocation(); maximumAllocation = this.conf.getMaximumAllocation(); incrAllocation = this.conf.getIncrementAllocation(); @@ -1247,6 +1235,7 @@ private synchronized void initScheduler(Configuration conf) // This stores per-application scheduling information this.applications = new ConcurrentHashMap>(); + createReleaseCache(); this.eventLog = new FairSchedulerEventLog(); eventLog.init(this.conf); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 518a8d9..0a40e64 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -52,8 +52,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -89,7 +87,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; - import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -197,9 +194,13 @@ public FifoScheduler() { private synchronized void initScheduler(Configuration conf) { validateConf(conf); + super.nmExpireInterval = + conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); //Use ConcurrentSkipListMap because applications need to be ordered this.applications = new ConcurrentSkipListMap>(); + createReleaseCache(); this.minimumAllocation = Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, @@ -295,21 +296,7 @@ public Allocation allocate( clusterResource, minimumAllocation, maximumAllocation); // Release containers - for (ContainerId releasedContainer : release) { - RMContainer rmContainer = getRMContainer(releasedContainer); - if (rmContainer == null) { - RMAuditLogger.logFailure(application.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "FifoScheduler", - "Trying to release container not owned by app or with invalid id", - application.getApplicationId(), releasedContainer); - } - containerCompleted(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - releasedContainer, - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); - } + releaseContainers(release, application); synchronized (application) { @@ -443,7 +430,7 @@ private synchronized void doneApplicationAttempt( LOG.info("Skip killing " + container.getContainerId()); continue; } - containerCompleted(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), RMContainerEventType.KILL); @@ -717,7 +704,7 @@ private synchronized void nodeUpdate(RMNode rmNode) { for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); LOG.debug("Container FINISHED: " + containerId); - containerCompleted(getRMContainer(containerId), + completedContainer(getRMContainer(containerId), completedContainer, RMContainerEventType.FINISHED); } @@ -818,7 +805,7 @@ public void handle(SchedulerEvent event) { ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; ContainerId containerid = containerExpiredEvent.getContainerId(); - containerCompleted(getRMContainer(containerid), + completedContainer(getRMContainer(containerid), SchedulerUtils.createAbnormalContainerStatus( containerid, SchedulerUtils.EXPIRED_CONTAINER), @@ -831,7 +818,8 @@ public void handle(SchedulerEvent event) { } @Lock(FifoScheduler.class) - private synchronized void containerCompleted(RMContainer rmContainer, + @Override + protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); @@ -881,7 +869,7 @@ private synchronized void removeNode(RMNode nodeInfo) { } // Kill running containers for(RMContainer container : node.getRunningContainers()) { - containerCompleted(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.LOST_CONTAINER), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index f1a3bbc..91e1905 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -49,7 +49,7 @@ private volatile int responseId = 0; private final ApplicationAttemptId attemptId; - private final RMContext context; + private RMContext context; private ApplicationMasterProtocol amRMProtocol; private final List requests = new ArrayList(); @@ -61,8 +61,10 @@ public MockAM(RMContext context, ApplicationMasterProtocol amRMProtocol, this.amRMProtocol = amRMProtocol; this.attemptId = attemptId; } - - void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol) { + + public void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol, + RMContext context) { + this.context = context; this.amRMProtocol = amRMProtocol; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 671851c..b0ffc85 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -171,7 +171,6 @@ public void testProgressFilter() throws Exception{ RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); am1.registerAppAttempt(); - am1.setAMRMProtocol(rm.getApplicationMasterService()); AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); List release = new ArrayList(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index dc3e9f1..79368d5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -289,7 +289,7 @@ public void testRMRestart() throws Exception { // verify old AM is not accepted // change running AM to talk to new RM - am1.setAMRMProtocol(rm2.getApplicationMasterService()); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); AllocateResponse allocResponse = am1.allocate( new ArrayList(), new ArrayList()); @@ -1671,7 +1671,7 @@ public void testQueueMetricsOnRMRestart() throws Exception { nm1.setResourceTrackerService(rm2.getResourceTrackerService()); // recover app RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); - am1.setAMRMProtocol(rm2.getApplicationMasterService()); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); am1.allocate(new ArrayList(), new ArrayList()); nm1.nodeHeartbeat(true); nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 24a2f43..97d7d79 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -33,10 +33,13 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -71,6 +74,9 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import com.google.common.base.Supplier; + + @SuppressWarnings({"rawtypes", "unchecked"}) @RunWith(value = Parameterized.class) public class TestWorkPreservingRMRestart { @@ -567,8 +573,8 @@ public void testAppReregisterOnRMWorkPreservingRestart() throws Exception { rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED); - am0.setAMRMProtocol(rm2.getApplicationMasterService()); - am0.registerAppAttempt(false); + am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + am0.registerAppAttempt(true); rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING); rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING); @@ -641,6 +647,68 @@ public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception { waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId()); } + // Test if RM on recovery receives the container release request from AM + // before it receives the container status reported by NM for recovery. this + // container should not be recovered. + @Test (timeout = 30000) + public void testReleasedContainerNotRecovered() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + rm1.start(); + + RMApp app1 = rm1.submitApp(1024); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // Re-start RM + conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 8000); + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + am1.registerAppAttempt(true); + + // try to release a container before the container is actually recovered. + final ContainerId runningContainer = + ContainerId.newInstance(am1.getApplicationAttemptId(), 2); + am1.allocate(null, Arrays.asList(runningContainer)); + + // send container statuses to recover the containers + List containerStatuses = + createNMContainerStatusForApp(am1); + nm1.registerNode(containerStatuses, null); + + // only the am container should be recovered. + waitForNumContainersToRecover(1, rm2, am1.getApplicationAttemptId()); + + final AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + // cached release request is cleaned. + // assertFalse(scheduler.getPendingRelease().contains(runningContainer)); + + AllocateResponse response = am1.allocate(null, null); + // AM gets notified of the completed container. + boolean receivedCompletedContainer = false; + for (ContainerStatus status : response.getCompletedContainersStatuses()) { + if (status.getContainerId().equals(runningContainer)) { + receivedCompletedContainer = true; + } + } + assertTrue(receivedCompletedContainer); + + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + // release cache is cleaned up and previous running container is not + // recovered + return scheduler.getPendingRelease() == null + && scheduler.getRMContainer(runningContainer) == null; + } + }, 1000, 20000); + } + private void asserteMetrics(QueueMetrics qm, int appsSubmitted, int appsPending, int appsRunning, int appsCompleted, int allocatedContainers, int availableMB, int availableVirtualCores, @@ -656,7 +724,7 @@ private void asserteMetrics(QueueMetrics qm, int appsSubmitted, assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores()); } - private void waitForNumContainersToRecover(int num, MockRM rm, + public static void waitForNumContainersToRecover(int num, MockRM rm, ApplicationAttemptId attemptId) throws Exception { AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm.getResourceScheduler(); @@ -669,7 +737,9 @@ private void waitForNumContainersToRecover(int num, MockRM rm, attempt = scheduler.getApplicationAttempt(attemptId); } while (attempt.getLiveContainers().size() < num) { - System.out.println("Wait for " + num + " containers to recover."); + System.out.println("Wait for " + num + + " containers to recover. currently: " + + attempt.getLiveContainers().size()); Thread.sleep(200); } }