diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 826d9f523eb..2ed232fae37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -102,6 +102,9 @@ @VisibleForTesting Map> configuredQueues; + // Preemption global configuration + private FSPreemptionConfig fsPreemptionConfig; + // Reservation system configuration private ReservationQueueConfiguration globalReservationQueueConfig; @@ -110,6 +113,7 @@ public AllocationConfiguration(QueueProperties queueProperties, AllocationFileParser allocationFileParser, QueuePlacementPolicy newPlacementPolicy, + FSPreemptionConfig fsPreemptionConfig, ReservationQueueConfiguration globalReservationQueueConfig) throws AllocationConfigurationException { this.minQueueResources = queueProperties.getMinQueueResources(); @@ -137,6 +141,7 @@ public AllocationConfiguration(QueueProperties queueProperties, this.queueAcls = queueProperties.getQueueAcls(); this.resAcls = queueProperties.getReservationAcls(); this.reservableQueues = queueProperties.getReservableQueues(); + this.fsPreemptionConfig = fsPreemptionConfig; this.globalReservationQueueConfig = globalReservationQueueConfig; this.placementPolicy = newPlacementPolicy; this.configuredQueues = queueProperties.getConfiguredQueues(); @@ -373,6 +378,10 @@ public void setAverageCapacity(int avgCapacity) { globalReservationQueueConfig.setAverageCapacity(avgCapacity); } + public FSPreemptionConfig getPreemptionConfig(){ + return fsPreemptionConfig; + } + /** * Initialize a {@link FSQueue} with queue-specific properties and its * metrics. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 3300948ce70..66ee7141c49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -263,8 +263,12 @@ public synchronized void reloadAllocations() ReservationQueueConfiguration globalReservationQueueConfig = createReservationQueueConfig(allocationFileParser); + FSPreemptionConfig fsPreemptionConfig = + createFSPreemptionConfig(allocationFileParser); + AllocationConfiguration info = new AllocationConfiguration(queueProperties, - allocationFileParser, newPlacementPolicy, globalReservationQueueConfig); + allocationFileParser, newPlacementPolicy, + fsPreemptionConfig, globalReservationQueueConfig); lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; @@ -312,6 +316,18 @@ private void setupRootQueueProperties( } } + private FSPreemptionConfig createFSPreemptionConfig( + AllocationFileParser allocationFileParser) { + FSPreemptionConfig fsPreemptionConfig = new FSPreemptionConfig(); + + fsPreemptionConfig.setToBePreemptedAppPriorityThreshold( + allocationFileParser.getToBePreemptedAppPriorityThreshold()); + fsPreemptionConfig.setToBePreemptedContainerRuntimeThreshold( + allocationFileParser.getToBePreemptedContainerRuntimeThreshold()); + + return fsPreemptionConfig; + } + private ReservationQueueConfiguration createReservationQueueConfig( AllocationFileParser allocationFileParser) { ReservationQueueConfiguration globalReservationQueueConfig = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 2a6657aa5df..f6acc379b27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -589,6 +589,14 @@ private void untrackContainerForPreemption(RMContainer container) { } } + public void logPreemptContainerPreCheckInfo(String failedInfo) { + if (LOG.isDebugEnabled()){ + LOG.debug("FairScheduler Preemption Record: " + "[Preemption step 1]." + + " Container preemption pre check failed." + + " Because: " + failedInfo); + } + } + boolean canContainerBePreempted(RMContainer container, Resource alreadyConsideringForPreemption) { if (!isPreemptable()) { @@ -610,6 +618,31 @@ boolean canContainerBePreempted(RMContainer container, } } + // we should not preempt AM. + if (container.isAMContainer()) { + logPreemptContainerPreCheckInfo("this container is AM!"); + return false; + } + + // We should not preempt high priority job. + if (getPriority().getPriority() >= getQueue().getFSContext() + .getPreemptionConfig().getToBePreemptedAppPriorityThreshold()) { + logPreemptContainerPreCheckInfo( + "this app's priority is bigger than preemption thresh: " + + getQueue().getFSContext().getPreemptionConfig(). + getToBePreemptedAppPriorityThreshold()); + return false; + } + + // We should not preempt container which has been running for a long time. + if ((System.currentTimeMillis() - container.getCreationTime()) >= + getQueue().getFSContext().getPreemptionConfig() + .getToBePreemptedContainerRuntimeThreshold()) { + logPreemptContainerPreCheckInfo( + "this container already run a long time!"); + return false; + } + // Check if the app's allocation will be over its fairshare even // after preempting this container Resource usageAfterPreemption = getUsageAfterPreemptingContainer( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java index eb76ca3f8fe..7d192372e50 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java @@ -31,10 +31,16 @@ private FSStarvedApps starvedApps; private final FairScheduler scheduler; + FSPreemptionConfig preemptionConfig; + FSContext(FairScheduler scheduler) { this.scheduler = scheduler; } + public void updatePreemptionVariables(AllocationConfiguration allocConf){ + preemptionConfig = allocConf.getPreemptionConfig(); + } + boolean isPreemptionEnabled() { return preemptionEnabled; } @@ -62,4 +68,8 @@ void setPreemptionUtilizationThreshold( public Resource getClusterResource() { return scheduler.getClusterResource(); } + + public FSPreemptionConfig getPreemptionConfig(){ + return preemptionConfig; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 3deddee5d6e..0bca4da7c24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -679,4 +679,8 @@ public void removeAssignedApp(ApplicationId applicationId) { writeLock.unlock(); } } + + public FSContext getFSContext(){ + return context; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionConfig.java new file mode 100644 index 00000000000..1806911825e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionConfig.java @@ -0,0 +1,29 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +public class FSPreemptionConfig { + + // to be preempted apps configs + private int toBePreemptedAppPriorityThreshold; + private long toBePreemptedContainerRuntimeThreshold; + + public int getToBePreemptedAppPriorityThreshold(){ + return toBePreemptedAppPriorityThreshold; + } + + public void setToBePreemptedAppPriorityThreshold( + int toBePreemptedAppPriorityThreshold){ + this.toBePreemptedAppPriorityThreshold + = toBePreemptedAppPriorityThreshold; + } + + public long getToBePreemptedContainerRuntimeThreshold(){ + return toBePreemptedContainerRuntimeThreshold; + } + + public void setToBePreemptedContainerRuntimeThreshold( + long toBePreemptedContainerRuntimeThreshold){ + this.toBePreemptedContainerRuntimeThreshold = + toBePreemptedContainerRuntimeThreshold; + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index 4c830523cb4..154b6e57e1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -111,6 +111,9 @@ public void run() { for (ResourceRequest rr : starvedApp.getStarvedResourceRequests()) { List potentialNodes = scheduler.getNodeTracker() .getNodesByResourceName(rr.getResourceName()); + + + for (int i = 0; i < rr.getNumContainers(); i++) { PreemptableContainers bestContainers = getBestPreemptableContainers(rr, potentialNodes); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 282367edbaa..26ec1bbd3c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1647,6 +1647,7 @@ public void onReload(AllocationConfiguration queueInfo) queueMgr.setQueuesToDynamic(removedStaticQueues); applyChildDefaults(); maxRunningEnforcer.updateRunnabilityOnReload(); + context.updatePreemptionVariables(allocConf); } } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileParser.java index 161405b7656..ee1fe79855a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileParser.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileParser.java @@ -60,6 +60,10 @@ private static final String QUEUE_MAX_APPS_DEFAULT = "queueMaxAppsDefault"; private static final String DEFAULT_FAIR_SHARE_PREEMPTION_THRESHOLD = "defaultFairSharePreemptionThreshold"; + private static final String TO_BE_PREEMPTED_APP_PRIORITY_THRESHOLD = + "toBePreemptedAppPriorityThreshold"; + private static final String TO_BE_PREEMPTED_CONTAINER_RUNTIME_THRESHOLD = + "toBePreemptedContainerRuntimeThreshold"; private static final String QUEUE_MAX_AM_SHARE_DEFAULT = "queueMaxAMShareDefault"; private static final String RESERVATION_PLANNER = "reservation-planner"; @@ -82,6 +86,8 @@ DEFAULT_FAIR_SHARE_PREEMPTION_TIMEOUT, FAIR_SHARE_PREEMPTION_TIMEOUT, DEFAULT_MIN_SHARE_PREEMPTION_TIMEOUT, QUEUE_MAX_APPS_DEFAULT, DEFAULT_FAIR_SHARE_PREEMPTION_THRESHOLD, QUEUE_MAX_AM_SHARE_DEFAULT, + TO_BE_PREEMPTED_APP_PRIORITY_THRESHOLD, + TO_BE_PREEMPTED_CONTAINER_RUNTIME_THRESHOLD, RESERVATION_PLANNER, RESERVATION_AGENT, RESERVATION_ADMISSION_POLICY, QUEUE_PLACEMENT_POLICY, QUEUE, POOL, USER, DEFAULT_QUEUE_SCHEDULING_POLICY, DEFAULT_QUEUE_SCHEDULING_MODE); @@ -211,6 +217,18 @@ public float getDefaultFairSharePreemptionThreshold() { return 0.5f; } + public int getToBePreemptedAppPriorityThreshold() { + Optional value = + getTextValue(TO_BE_PREEMPTED_APP_PRIORITY_THRESHOLD); + return value.map(Integer::parseInt).orElse(Integer.MAX_VALUE); + } + + public long getToBePreemptedContainerRuntimeThreshold() { + Optional value = + getTextValue(TO_BE_PREEMPTED_CONTAINER_RUNTIME_THRESHOLD); + return value.map(v -> Long.parseLong(v) * 1000L).orElse(Long.MAX_VALUE); + } + public float getQueueMaxAMShareDefault() { Optional value = getTextValue(QUEUE_MAX_AM_SHARE_DEFAULT); if (value.isPresent()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemptionConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemptionConfig.java new file mode 100644 index 00000000000..fcf3c811545 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemptionConfig.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; + +import static org.junit.Assert.assertEquals; + +/** + * Test Preemption configs in scheduler + */ +public class TestFairSchedulerPreemptionConfig extends FairSchedulerTestBase { + private final static File ALLOC_FILE = + new File(TEST_DIR, "test-queue-mgr"); + + @Before + public void setup() throws IOException { + createConfiguration(); + writeAllocFile(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, + ALLOC_FILE.getAbsolutePath()); + + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + } + + @After + public void teardown() { + ALLOC_FILE.deleteOnExit(); + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + } + + private void writeAllocFile() throws IOException { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("3"); + out.println("600"); + out.println(""); + out.close(); + } + + @Test + public void testPreemptionConfiguration() + throws IOException { + // Check preemption global config + FSContext context = scheduler.getContext(); + assertEquals(3, + context.getPreemptionConfig().getToBePreemptedAppPriorityThreshold()); + assertEquals(600000, + context.getPreemptionConfig().getToBePreemptedContainerRuntimeThreshold()); + } +}