diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
index 09dd3bf3cc6..63ffe3f37c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
@@ -66,10 +66,15 @@ public static void updateSchedulingResourceUsage(ResourceUsage ru) {
 
   protected void reorderSchedulableEntity(S schedulableEntity) {
     //remove, update comparable data, and reinsert to update position in order
-    schedulableEntities.remove(schedulableEntity);
+    boolean exists = schedulableEntities.remove(schedulableEntity);
     updateSchedulingResourceUsage(
       schedulableEntity.getSchedulingResourceUsage());
-    schedulableEntities.add(schedulableEntity);
+    if (exists) {
+      schedulableEntities.add(schedulableEntity);
+    } else {
+      LOG.info("Skip reordering non-existent schedulable entity: "
+          + schedulableEntity.getId());
+    }
   }
 
   protected void reorderScheduleEntities() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index c090866ca4f..c7d010a074c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -1156,6 +1156,8 @@ public void testAllocateReorder() throws Exception {
     Configuration conf = new Configuration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
         ResourceScheduler.class);
+    conf.setFloat(CapacitySchedulerConfiguration.
+        MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 1.0f);
     MockRM rm = new MockRM(conf);
     rm.start();
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
index 683173af709..4faa1ce1e7a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
@@ -20,11 +20,37 @@
 import java.util.*;
 
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -34,6 +60,9 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class TestFairOrderingPolicy {
 
   final static int GB = 1024;
 
@@ -187,6 +216,95 @@ public void testSizeBasedWeightNotAffectAppActivation() throws Exception {
     Assert.assertEquals(3, lq.getNumPendingApplications());
   }
 
+  @Test
+  public void testSchedulableEntitiesLeak() {
+    // Init cluster: set fair ordering policy and enable recovery
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    String queuePath = CapacitySchedulerConfiguration.ROOT + ".default";
+    csConf.set(YarnConfiguration.RM_SCHEDULER,
+        CapacityScheduler.class.getCanonicalName());
+    csConf.setOrderingPolicy(queuePath,
+        CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY);
+    csConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    csConf.setClass(YarnConfiguration.RM_STORE, MemoryRMStateStore.class,
+        RMStateStore.class);
+    MockRM rm = new MockRM(csConf);
+    rm.start();
+
+    // Mock app & app attempt
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    RMAppAttemptMetrics attemptMetric =
+        new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
+    RMAppImpl app = mock(RMAppImpl.class);
+    when(app.getApplicationId()).thenReturn(appId);
+    RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
+    Container container = mock(Container.class);
+    when(container.getId())
+        .thenReturn(ContainerId.newContainerId(appAttemptId, 1));
+    when(attempt.getMasterContainer()).thenReturn(container);
+    ApplicationSubmissionContext submissionContext =
+        mock(ApplicationSubmissionContext.class);
+    when(submissionContext.getKeepContainersAcrossApplicationAttempts())
+        .thenReturn(true);
+    when(attempt.getSubmissionContext()).thenReturn(submissionContext);
+    when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
+    when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
+    when(app.getCurrentAppAttempt()).thenReturn(attempt);
+    when(app.getApplicationSubmissionContext()).thenReturn(submissionContext);
+    when(app.getAppAttempts())
+        .thenReturn(ImmutableMap.of(appAttemptId, attempt));
+    rm.getRMContext().getRMApps().put(appId, app);
+
+    // Add app and app attempt
+    AppAddedSchedulerEvent appAddedEvent =
+        new AppAddedSchedulerEvent(appId, "default", "user");
+    cs.handle(appAddedEvent);
+    AppAttemptAddedSchedulerEvent attemptAddedEvent =
+        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
+    cs.handle(attemptAddedEvent);
+
+    // Remove app attempt
+    AppAttemptRemovedSchedulerEvent attemptRemovedEvent =
+        new AppAttemptRemovedSchedulerEvent(appAttemptId,
+            RMAppAttemptState.FINISHED, true);
+    cs.handle(attemptRemovedEvent);
+
+    // Add node and recover 1 container for app
+    RMNode rmNode =
+        MockNodes.newNodeInfo(0, MockNodes.newResource(10240), 1, "h2:1234");
+    NMContainerStatus status = TestRMRestart
+        .createNMContainerStatus(appAttemptId, 1, ContainerState.RUNNING);
+    cs.handle(new NodeAddedSchedulerEvent(rmNode, Arrays.asList(status)));
+    rm.drainEvents();
+
+    // Complete container will kick fair ordering policy
+    FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(appAttemptId);
+    ContainerId newContainerId = ContainerId.newContainerId(appAttemptId, 1);
+    RMContainer rmContainer =
+        schedulerApp.getLiveContainersMap().get(newContainerId);
+    cs.completedContainer(rmContainer, SchedulerUtils
+        .createAbnormalContainerStatus(rmContainer.getContainerId(),
+            SchedulerUtils.COMPLETED_APPLICATION), RMContainerEventType.KILL);
+
+    // Remove app
+    AppRemovedSchedulerEvent appRemovedEvent =
+        new AppRemovedSchedulerEvent(appId, RMAppState.KILLED);
+    cs.handle(appRemovedEvent);
+
+    // Reorder apps in fair ordering policy
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
+    leafQueue.getOrderingPolicy().getAssignmentIterator();
+
+    // Check there is no scheduling entities left,
+    // non-zero represents finished app was brought back to fair ordering policy
+    Assert.assertEquals(0,
+        leafQueue.getOrderingPolicy().getNumSchedulableEntities());
+  }
+
   public void checkIds(Iterator si, String[] ids) {
     for (int i = 0;i < ids.length;i++) {