diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java
index c182531..cd2beb9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java
@@ -80,6 +80,10 @@ protected void setExpireInterval(int expireInterval) {
     this.expireInterval = expireInterval;
   }
 
+  public int getExpireInterval() {
+    return this.expireInterval;
+  }
+
   protected void setMonitorInterval(int monitorInterval) {
     this.monitorInterval = monitorInterval;
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 01d5064..41888d3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -60,6 +60,8 @@
   AMLivelinessMonitor getAMLivelinessMonitor();
 
+  NMLivelinessMonitor getNMLivelinessMonitor();
+
   AMLivelinessMonitor getAMFinishingMonitor();
 
   ContainerAllocationExpirer getContainerAllocationExpirer();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index f72ef30..8da5839 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -66,6 +66,8 @@
   private AMLivelinessMonitor amLivelinessMonitor;
   private AMLivelinessMonitor amFinishingMonitor;
+  private NMLivelinessMonitor nmLivelinessMonitor;
+
   private RMStateStore stateStore = null;
   private ContainerAllocationExpirer containerAllocationExpirer;
   private DelegationTokenRenewer delegationTokenRenewer;
@@ -165,6 +167,11 @@ public AMLivelinessMonitor getAMLivelinessMonitor() {
   }
 
   @Override
+  public NMLivelinessMonitor getNMLivelinessMonitor() {
+    return this.nmLivelinessMonitor;
+  }
+
+  @Override
   public AMLivelinessMonitor getAMFinishingMonitor() {
     return this.amFinishingMonitor;
   }
@@ -276,6 +283,10 @@ void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) {
     this.amFinishingMonitor = amFinishingMonitor;
   }
 
+  void setNMLiveLinessMonitor(NMLivelinessMonitor nmLivelinessMonitor) {
+    this.nmLivelinessMonitor = nmLivelinessMonitor;
+  }
+
   void setContainerTokenSecretManager(
       RMContainerTokenSecretManager containerTokenSecretManager) {
     this.containerTokenSecretManager = containerTokenSecretManager;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 40e346c..6b69c29 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -437,6 +437,7 @@ protected void serviceInit(Configuration configuration) throws Exception {
 
       nmLivelinessMonitor = createNMLivelinessMonitor();
       addService(nmLivelinessMonitor);
+      rmContext.setNMLiveLinessMonitor(nmLivelinessMonitor);
 
       resourceTracker = createResourceTrackerService();
       addService(resourceTracker);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 5764c8c..466b5b3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -245,6 +245,15 @@ public synchronized void recoverContainersOnNode(
       SchedulerApplicationAttempt schedulerAttempt =
           schedulerApp.getCurrentAppAttempt();
 
+      if (schedulerAttempt.getPendingRelease().containsKey(
+          container.getContainerId())) {
+        LOG.info("Skip recovering " + container.getContainerId()
+            + ". Application requested to release this container.");
+        killOrphanContainerOnNode(nm, container);
+        schedulerAttempt.getPendingRelease().remove(container.getContainerId());
+        continue;
+      }
+
       // create container
       RMContainer rmContainer = recoverAndCreateContainer(container, nm);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 32dd23b..459dfd5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -17,13 +17,13 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
-import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,7 +41,6 @@
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -53,6 +52,9 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
 
@@ -87,6 +89,13 @@
   protected List<RMContainer> newlyAllocatedContainers =
       new ArrayList<RMContainer>();
 
+  // This pendingRelease is used in work-preserving recovery scenario to keep
+  // track of the AM's outstanding release requests. RM on recovery could
+  // receive the release request from AM before it receives the container status
+  // from NM for recovery. In this case, the to-be-recovered container reported
+  // by NM should not be recovered.
+  protected Cache<ContainerId, Object> pendingRelease = null;
+
   /**
    * Count how many times the application has been given an opportunity
    * to schedule a task at each priority. Each time the scheduler
@@ -114,7 +123,10 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
         new AppSchedulingInfo(applicationAttemptId, user, queue,
             activeUsersManager, rmContext.getEpoch());
     this.queue = queue;
-
+    this.pendingRelease =
+        CacheBuilder.newBuilder().expireAfterWrite(
+            rmContext.getNMLivelinessMonitor().getExpireInterval(),
+            TimeUnit.MILLISECONDS).build();
     if (rmContext.getRMApps() != null &&
         rmContext.getRMApps()
           .containsKey(applicationAttemptId.getApplicationId())) {
@@ -134,7 +146,11 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
   public synchronized Collection<RMContainer> getLiveContainers() {
     return new ArrayList<RMContainer>(liveContainers.values());
   }
-
+
+  public Map<ContainerId, Object> getPendingRelease() {
+    return pendingRelease.asMap();
+  }
+
   /**
    * Is this application pending?
    * @return true if it is else false.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 2681238..a9227ec 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
-import com.google.common.base.Preconditions;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -30,6 +28,7 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -93,6 +92,8 @@
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
 
 @LimitedPrivate("yarn")
 @Evolving
@@ -262,7 +263,6 @@ private synchronized void initScheduler(Configuration configuration) throws
     this.applications =
         new ConcurrentHashMap<ApplicationId,
            SchedulerApplication<FiCaSchedulerApp>>();
-
     initializeQueues(this.conf);
 
     scheduleAsynchronously = this.conf.getScheduleAynschronously();
@@ -695,8 +695,12 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
           RMAuditLogger.logFailure(application.getUser(),
               AuditConstants.RELEASE_CONTAINER,
               "Unauthorized access or invalid container", "CapacityScheduler",
-              "Trying to release container not owned by app or with invalid id",
+              "Trying to release container not owned by app or with invalid id."
+                  + " Add the container to the release cache as it may be on recovery.",
               application.getApplicationId(), releasedContainerId);
" + + "Add the container to the release cache as it maybe on recovery."); + application.getPendingRelease().put(releasedContainerId, new Object()); } completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 4e1c244..be9bf20 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -907,8 +907,12 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, RMAuditLogger.logFailure(application.getUser(), AuditConstants.RELEASE_CONTAINER, "Unauthorized access or invalid container", "FairScheduler", - "Trying to release container not owned by app or with invalid id", + "Trying to release container not owned by app or with invalid id. " + + "Add the container to the release cache as it maybe on recovery.", application.getApplicationId(), releasedContainerId); + LOG.info(releasedContainerId + " doesn't exist. " + + "Add the container to the release cache as it maybe on recovery."); + application.getPendingRelease().put(releasedContainerId, new Object()); } completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 518a8d9..d65abdf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -301,8 +301,12 @@ public Allocation allocate( RMAuditLogger.logFailure(application.getUser(), AuditConstants.RELEASE_CONTAINER, "Unauthorized access or invalid container", "FifoScheduler", - "Trying to release container not owned by app or with invalid id", + "Trying to release container not owned by app or with invalid id. " + + "Add the container to the release cache as it maybe on recovery.", application.getApplicationId(), releasedContainer); + LOG.info(releasedContainer + " doesn't exist. 
" + + "Add the container to the release cache as it maybe on recovery."); + application.getPendingRelease().put(releasedContainer, new Object()); } containerCompleted(rmContainer, SchedulerUtils.createAbnormalContainerStatus( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index f1a3bbc..91e1905 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -49,7 +49,7 @@ private volatile int responseId = 0; private final ApplicationAttemptId attemptId; - private final RMContext context; + private RMContext context; private ApplicationMasterProtocol amRMProtocol; private final List requests = new ArrayList(); @@ -61,8 +61,10 @@ public MockAM(RMContext context, ApplicationMasterProtocol amRMProtocol, this.amRMProtocol = amRMProtocol; this.attemptId = attemptId; } - - void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol) { + + public void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol, + RMContext context) { + this.context = context; this.amRMProtocol = amRMProtocol; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 24a2f43..0c68660 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -71,6 +71,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; + @SuppressWarnings({"rawtypes", "unchecked"}) @RunWith(value = Parameterized.class) public class TestWorkPreservingRMRestart { @@ -567,8 +568,8 @@ public void testAppReregisterOnRMWorkPreservingRestart() throws Exception { rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED); - am0.setAMRMProtocol(rm2.getApplicationMasterService()); - am0.registerAppAttempt(false); + am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + am0.registerAppAttempt(true); rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING); rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING); @@ -641,6 +642,54 @@ public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception { waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId()); } + // Test if RM on recovery receives the container release request from AM + // before it receives the container status reported by NM for recovery. this + // container should not be recovered. 
+  @Test
+  public void testReleasedContainerNotRecovered() throws Exception {
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    rm1.start();
+
+    RMApp app1 = rm1.submitApp(1024);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // Re-start RM
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+    am1.registerAppAttempt(true);
+
+    // try to release a container before the container is actually recovered.
+    ContainerId runningContainer =
+        ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
+    am1.allocate(null, Arrays.asList(runningContainer));
+
+    // send container statuses to recover the containers
+    List<NMContainerStatus> containerStatuses =
+        createNMContainerStatusForApp(am1);
+    nm1.registerNode(containerStatuses, null);
+
+    // only the am container should be recovered.
+    waitForNumContainersToRecover(1, rm2, am1.getApplicationAttemptId());
+
+    // Wait for RM to settle down on recovering containers;
+    Thread.sleep(3000);
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm2.getResourceScheduler();
+    // previous running container is not recovered because it was released.
+    assertNull(scheduler.getRMContainer(runningContainer));
+
+    // previous cached release request is removed also.
+    assertFalse(scheduler.getApplicationAttempt(am1.getApplicationAttemptId())
+        .getPendingRelease().containsKey(runningContainer));
+  }
+
   private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
       int appsPending, int appsRunning, int appsCompleted,
       int allocatedContainers, int availableMB, int availableVirtualCores,
@@ -656,7 +705,7 @@ private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
     assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores());
   }
 
-  private void waitForNumContainersToRecover(int num, MockRM rm,
+  public static void waitForNumContainersToRecover(int num, MockRM rm,
       ApplicationAttemptId attemptId) throws Exception {
     AbstractYarnScheduler scheduler =
        (AbstractYarnScheduler) rm.getResourceScheduler();
@@ -669,7 +718,9 @@ private void waitForNumContainersToRecover(int num, MockRM rm,
       attempt = scheduler.getApplicationAttempt(attemptId);
     }
     while (attempt.getLiveContainers().size() < num) {
-      System.out.println("Wait for " + num + " containers to recover.");
+      System.out.println("Wait for " + num
+          + " containers to recover. currently: "
+          + attempt.getLiveContainers().size());
       Thread.sleep(200);
     }
   }
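
For reference, and not part of the patch itself: a minimal, self-contained sketch of the pendingRelease pattern the change introduces. A release request that reaches the RM before the corresponding NM container report is parked in a Guava cache whose entries expire after the NM liveliness interval, so container recovery can skip such a container and any unmatched entry eventually evicts itself. The class and method names below are illustrative, not RM code; String keys stand in for ContainerId, and Guava is assumed to be on the classpath.

import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class PendingReleaseSketch {

  // Release requests for containers the scheduler does not know about yet,
  // keyed by container id; entries expire after the NM liveliness interval.
  private final Cache<String, Object> pendingRelease;

  PendingReleaseSketch(long nmExpireIntervalMs) {
    // Mirrors the CacheBuilder.newBuilder().expireAfterWrite(...) call added
    // to the SchedulerApplicationAttempt constructor above.
    this.pendingRelease = CacheBuilder.newBuilder()
        .expireAfterWrite(nmExpireIntervalMs, TimeUnit.MILLISECONDS)
        .build();
  }

  // The AM asked to release a container the scheduler has not recovered yet.
  void recordPendingRelease(String containerId) {
    pendingRelease.put(containerId, new Object());
  }

  // The NM reported a running container for recovery; skip it if the AM
  // already asked to release it, and drop the cached request.
  boolean shouldSkipRecovery(String containerId) {
    if (pendingRelease.asMap().containsKey(containerId)) {
      pendingRelease.asMap().remove(containerId);
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    PendingReleaseSketch sketch = new PendingReleaseSketch(10 * 60 * 1000L);
    sketch.recordPendingRelease("container_1_0001_01_000002");
    // true: the release arrived first, so recovery of this container is skipped.
    System.out.println(sketch.shouldSkipRecovery("container_1_0001_01_000002"));
    // false: the cached request was removed by the first call.
    System.out.println(sketch.shouldSkipRecovery("container_1_0001_01_000002"));
  }
}

The expireAfterWrite bound keeps an unmatched release request from lingering indefinitely: after the NM expiry interval the node has either re-registered (and its containers were reconciled) or been declared lost, so the cached entry is no longer needed.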