diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java index 631d1a0..2a741ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java @@ -73,9 +73,13 @@ public Thread newThread(Runnable r) { return t; } }); + schedulePreemptionChecker(); + super.serviceStart(); + } + + private void schedulePreemptionChecker() { handler = ses.scheduleAtFixedRate(new PreemptionChecker(), 0, monitorInterval, TimeUnit.MILLISECONDS); - super.serviceStart(); } @Override @@ -98,8 +102,13 @@ public void invokePolicy(){ @Override public void run() { try { - //invoke the preemption policy - invokePolicy(); + if (monitorInterval != scheduleEditPolicy.getMonitoringInterval()) { + handler.cancel(true); + monitorInterval = scheduleEditPolicy.getMonitoringInterval(); + schedulePreemptionChecker(); + } else { + invokePolicy(); + } } catch (Throwable t) { // The preemption monitor does not alter structures nor do structures // persist across invocations. Therefore, log, skip, and retry. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index c4c98e2..ace0273 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; @@ -108,6 +109,9 @@ private float minimumThresholdForIntraQueuePreemption; private IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrderPolicy; + // Current configuration + private CapacitySchedulerConfiguration config; + // Pointer to other RM components private RMContext rmContext; private ResourceCalculator rc; @@ -121,8 +125,7 @@ new HashMap<>(); private Map> partitionToUnderServedQueues = new HashMap>(); - private List - candidatesSelectionPolicies = new ArrayList<>(); + private List candidatesSelectionPolicies; private Set allPartitions; private Set leafQueueNames; @@ -160,7 +163,16 @@ public void init(Configuration config, RMContext context, } rmContext = context; scheduler = (CapacityScheduler) sched; + rc = scheduler.getResourceCalculator(); + nlm = scheduler.getRMContext().getNodeLabelManager(); + updateConfigIfNeeded(); + } + + private void updateConfigIfNeeded() { CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration(); + if (csConfig == config) { + return; + } maxIgnoredOverCapacity = csConfig.getDouble( CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY, @@ -187,7 +199,7 @@ public void init(Configuration config, RMContext context, CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_OBSERVE_ONLY); lazyPreempionEnabled = csConfig.getBoolean( - CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, + CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENABLED, CapacitySchedulerConfiguration.DEFAULT_LAZY_PREEMPTION_ENABLED); maxAllowableLimitForIntraQueuePreemption = csConfig.getFloat( @@ -209,8 +221,7 @@ public void init(Configuration config, RMContext context, CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY) .toUpperCase()); - rc = scheduler.getResourceCalculator(); - nlm = scheduler.getRMContext().getNodeLabelManager(); + candidatesSelectionPolicies = new ArrayList<>(); // Do we need white queue-priority preemption policy? boolean isQueuePriorityPreemptionEnabled = @@ -246,6 +257,8 @@ public void init(Configuration config, RMContext context, if (isIntraQueuePreemptionEnabled) { candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this)); } + + config = csConfig; } @Override @@ -255,6 +268,8 @@ public ResourceCalculator getResourceCalculator() { @Override public synchronized void editSchedule() { + updateConfigIfNeeded(); + long startTs = clock.getTime(); CSQueue root = scheduler.getRootQueue(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 3a519ec..d4d1a48 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -296,7 +297,7 @@ "reservation-enforcement-window"; @Private - public static final String LAZY_PREEMPTION_ENALBED = PREFIX + "lazy-preemption-enabled"; + public static final String LAZY_PREEMPTION_ENABLED = PREFIX + "lazy-preemption-enabled"; @Private public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false; @@ -1166,7 +1167,7 @@ public void setOrderingPolicyParameter(String queue, } public boolean getLazyPreemptionEnabled() { - return getBoolean(LAZY_PREEMPTION_ENALBED, DEFAULT_LAZY_PREEMPTION_ENABLED); + return getBoolean(LAZY_PREEMPTION_ENABLED, DEFAULT_LAZY_PREEMPTION_ENABLED); } private static final String PREEMPTION_CONFIG_PREFIX = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index a14a2b1..6395ccd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -77,6 +77,10 @@ import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MONITORING_INTERVAL; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_OBSERVE_ONLY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION; import static org.junit.Assert.assertEquals; @@ -187,9 +191,7 @@ public void setup() { appAlloc = 0; } - @Test - public void testIgnore() { - int[][] qData = new int[][]{ + private static final int[][] Q_DATA_FOR_IGNORE = new int[][]{ // / A B C { 100, 40, 40, 20 }, // abs { 100, 100, 100, 100 }, // maxCap @@ -199,8 +201,12 @@ public void testIgnore() { { 3, 1, 1, 1 }, // apps { -1, 1, 1, 1 }, // req granularity { 3, 0, 0, 0 }, // subqueues - }; - ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + }; + + @Test + public void testIgnore() { + ProportionalCapacityPreemptionPolicy policy = + buildPolicy(Q_DATA_FOR_IGNORE); policy.editSchedule(); // don't correct imbalances without demand verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class)); @@ -1033,6 +1039,31 @@ public void testPreemptionNotHappenForSingleReservedQueue() { verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); } + + @Test + public void testRefreshPreemptionProperties() throws Exception { + ProportionalCapacityPreemptionPolicy policy = + buildPolicy(Q_DATA_FOR_IGNORE); + + assertEquals(DEFAULT_PREEMPTION_MONITORING_INTERVAL, + policy.getMonitoringInterval()); + assertEquals(DEFAULT_PREEMPTION_OBSERVE_ONLY, + policy.isObserveOnly()); + + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + long newMonitoringInterval = 5000; + boolean newObserveOnly = true; + newConf.setLong(PREEMPTION_MONITORING_INTERVAL, newMonitoringInterval); + newConf.setBoolean(PREEMPTION_OBSERVE_ONLY, newObserveOnly); + when(mCS.getConfiguration()).thenReturn(newConf); + + policy.editSchedule(); + + assertEquals(newMonitoringInterval, policy.getMonitoringInterval()); + assertEquals(newObserveOnly, policy.isObserveOnly()); + } + static class IsPreemptionRequestFor extends ArgumentMatcher { private final ApplicationAttemptId appAttId; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java index e7157b8..4e4e3c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java @@ -54,7 +54,7 @@ @Before public void setUp() throws Exception { super.setUp(); - conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, + conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENABLED, true); }