diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 886f4ab..cc46e13 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; @@ -781,26 +782,47 @@ protected void serviceStop() throws Exception { } protected void createPolicyMonitors() { - if (scheduler instanceof PreemptableResourceScheduler - && conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) { - LOG.info("Loading policy monitors"); - List policies = conf.getInstances( - YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, - SchedulingEditPolicy.class); - if (policies.size() > 0) { - for (SchedulingEditPolicy policy : policies) { - LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName()); - // periodically check whether we need to take action to guarantee - // constraints - SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy); - addService(mon); + if (scheduler instanceof PreemptableResourceScheduler) { + // Make sure we have a PCPP added to service so later on it will + // sync to scheduler in order to update preemption config changes + boolean isPCPPFound = false; + + List policies; + if (conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) { + LOG.info("Loading policy monitors"); + policies = conf.getInstances( + YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + SchedulingEditPolicy.class); + if (policies.size() > 0) { + for (SchedulingEditPolicy policy : policies) { + if (policy instanceof ProportionalCapacityPreemptionPolicy) { + isPCPPFound = true; + } + + LOG.info( + "LOADING SchedulingEditPolicy:" + policy.getPolicyName()); + // periodically check whether we need to take action to guarantee + // constraints + SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy); + addService(mon); + } + } else{ + LOG.warn("Policy monitors configured (" + + YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS + + ") but none specified (" + + YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES + ")"); } - } else { - LOG.warn("Policy monitors configured (" + - YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS + - ") but none specified (" + - YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES + ")"); + } + + if (!isPCPPFound) { + ProportionalCapacityPreemptionPolicy policy = + new ProportionalCapacityPreemptionPolicy(); + LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName()); + // periodically check whether we need to take action to guarantee + // constraints + SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy); + addService(mon); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java index 03e180d..c629dc4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java @@ -36,7 +36,6 @@ //received. private Thread checkerThread; private volatile boolean stopped; - private long monitorInterval; private RMContext rmContext; public SchedulingMonitor(RMContext rmContext, @@ -54,7 +53,6 @@ public synchronized SchedulingEditPolicy getSchedulingEditPolicy() { public void serviceInit(Configuration conf) throws Exception { scheduleEditPolicy.init(conf, rmContext, (PreemptableResourceScheduler) rmContext.getScheduler()); - this.monitorInterval = scheduleEditPolicy.getMonitoringInterval(); super.serviceInit(conf); } @@ -97,7 +95,7 @@ public void run() { // Wait before next run try { - Thread.sleep(monitorInterval); + Thread.sleep(scheduleEditPolicy.getMonitoringInterval()); } catch (InterruptedException e) { LOG.info(getName() + " thread interrupted"); break; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 3bf6994..785d1ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -88,7 +88,8 @@ // Configurable fields private double maxIgnoredOverCapacity; private long maxWaitTime; - private long monitoringInterval; + private long monitoringInterval = + CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MONITORING_INTERVAL; private float percentageClusterPreemptionAllowed; private double naturalTerminationFactor; private boolean observeOnly; @@ -119,6 +120,10 @@ private Map preemptableQueues; private Set killableContainers; + // Local scheduler config version + private long localSchedulerConfigVersion = -1; + private boolean preemptionEnabled = false; + @SuppressWarnings("unchecked") public ProportionalCapacityPreemptionPolicy() { clock = SystemClock.getInstance(); @@ -149,6 +154,28 @@ public void init(Configuration config, RMContext context, } rmContext = context; scheduler = (CapacityScheduler) sched; + + syncConfigFromScheduler(); + } + + private void syncConfigFromScheduler() { + long newSchedulerConfigVersion = scheduler.getSchedulerConfigVersion(); + if (localSchedulerConfigVersion == newSchedulerConfigVersion) { + // If no scheduler config changed, skip all following logic + return; + } + + // Update local version + localSchedulerConfigVersion = newSchedulerConfigVersion; + + // Update preemption enabled + preemptionEnabled = scheduler.preemptionEnabled(); + if (!preemptionEnabled) { + // When preemption becomes disabled, clean up selected candidates and exit + preemptionCandidates.clear(); + return; + } + CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration(); maxIgnoredOverCapacity = csConfig.getDouble( @@ -181,19 +208,21 @@ public void init(Configuration config, RMContext context, maxAllowableLimitForIntraQueuePreemption = csConfig.getFloat( CapacitySchedulerConfiguration. - INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, CapacitySchedulerConfiguration. - DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT); + DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT); minimumThresholdForIntraQueuePreemption = csConfig.getFloat( CapacitySchedulerConfiguration. - INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD, + INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD, CapacitySchedulerConfiguration. - DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD); + DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD); rc = scheduler.getResourceCalculator(); nlm = scheduler.getRMContext().getNodeLabelManager(); + candidatesSelectionPolicies.clear(); + // Do we need white queue-priority preemption policy? boolean isQueuePriorityPreemptionEnabled = csConfig.getPUOrderingPolicyUnderUtilizedPreemptionEnabled(); @@ -205,9 +234,9 @@ public void init(Configuration config, RMContext context, // Do we need to specially consider reserved containers? boolean selectCandidatesForResevedContainers = csConfig.getBoolean( CapacitySchedulerConfiguration. - PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS, + PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS, CapacitySchedulerConfiguration. - DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS); + DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS); if (selectCandidatesForResevedContainers) { candidatesSelectionPolicies .add(new ReservedContainerCandidatesSelector(this)); @@ -232,6 +261,13 @@ public ResourceCalculator getResourceCalculator() { @Override public synchronized void editSchedule() { + // Before doing preemption, sync config from scheduler + syncConfigFromScheduler(); + if (!preemptionEnabled) { + // Skip following logic if preemption is disabled + return; + } + long startTs = clock.getTime(); CSQueue root = scheduler.getRootQueue(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java index b73c538..bc3a6ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java @@ -47,4 +47,9 @@ */ void markContainerForKillable(RMContainer container); + /** + * Get if preemption enabled + * @return if preemption enabled + */ + boolean preemptionEnabled(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 03bdd3a..5fb9913 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -68,6 +69,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; @@ -167,6 +169,7 @@ private int offswitchPerHeartbeatLimit; + private AtomicLong schedulerConfigVersion = new AtomicLong(0); @Override public void setConf(Configuration conf) { @@ -414,6 +417,12 @@ public void reinitialize(Configuration newConf, RMContext rmContext) // Setup how many containers we can allocate for each round offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); + + schedulerConfigVersion.getAndIncrement(); + // Make sure version is always >= 0 + if (schedulerConfigVersion.get() < 0) { + schedulerConfigVersion.set(0); + } } finally { writeLock.unlock(); } @@ -2567,4 +2576,34 @@ public boolean moveReservedContainer(RMContainer toBeMovedContainer, writeLock.unlock(); } } + + @Override + public boolean preemptionEnabled() { + boolean monitorEnabled = conf.getBoolean( + YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS); + if (monitorEnabled) { + String[] monitors = conf.getStrings( + YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, new String[0]); + for (String s : monitors) { + if (StringUtils.equals(s, + ProportionalCapacityPreemptionPolicy.class.getCanonicalName())) { + return true; + } + } + } + return false; + } + + /** + * Get version of scheduler config, version will be increased by 1 for each + * reinitialize called. + * + * TODO: only increase version if changes made to scheduler. + * + * @return version. + */ + public long getSchedulerConfigVersion() { + return schedulerConfigVersion.get(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index a14a2b1..79848e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -1057,6 +1057,7 @@ public String toString() { } ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) { + when(mCS.preemptionEnabled()).thenReturn(true); ProportionalCapacityPreemptionPolicy policy = new ProportionalCapacityPreemptionPolicy( rmContext, mCS, mClock); clusterResources = Resource.newInstance( @@ -1070,6 +1071,7 @@ ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) { ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData, String[][] resData, boolean useDominantResourceCalculator) { + when(mCS.preemptionEnabled()).thenReturn(true); if (useDominantResourceCalculator) { when(mCS.getResourceCalculator()).thenReturn( new DominantResourceCalculator());