From 9d29c6596de0639d07ca80bacf9429dd73a4a6e3 Mon Sep 17 00:00:00 2001 From: Prabhu Joseph Date: Mon, 11 Feb 2019 20:01:40 +0530 Subject: [PATCH] YARN-9290 --- .../resourcemanager/DefaultAMSProcessor.java | 3 ++ .../resourcemanager/scheduler/Allocation.java | 19 ++++++--- .../scheduler/AppSchedulingInfo.java | 25 ++++++++++++ .../scheduler/common/fica/FiCaSchedulerApp.java | 2 +- .../scheduler/placement/AppPlacementAllocator.java | 5 +++ .../SingleConstraintAppPlacementAllocator.java | 1 + .../TestSchedulingRequestContainerAllocation.java | 47 ++++++++++++++++++++++ 7 files changed, 95 insertions(+), 7 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index b2c5ef3..752ddfb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -343,6 +343,9 @@ public void allocate(ApplicationAttemptId appAttemptId, response.setContainersFromPreviousAttempts( allocation.getPreviousAttemptContainers()); + + response.setRejectedSchedulingRequests(allocation.getRejectedRequest()); + } private void handleInvalidResourceException(InvalidResourceRequestException e, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java index 9573ac8..f8e47bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; public class Allocation { @@ -40,7 +41,7 @@ final List demotedContainers; private final List previousAttemptContainers; private Resource resourceLimit; - + private List rejectedRequest; public Allocation(List containers, Resource resourceLimit, Set strictContainers, Set fungibleContainers, @@ -52,17 +53,17 @@ public Allocation(List containers, Resource resourceLimit, public Allocation(List containers, Resource resourceLimit, Set strictContainers, Set fungibleContainers, List fungibleResources, List nmTokens) { - this(containers, resourceLimit,strictContainers, fungibleContainers, - fungibleResources, nmTokens, null, null, null, null, null); + this(containers, resourceLimit, strictContainers, fungibleContainers, + fungibleResources, nmTokens, null, null, null, null, null, null); } public Allocation(List containers, Resource resourceLimit, Set strictContainers, Set fungibleContainers, List fungibleResources, List nmTokens, List increasedContainers, List decreasedContainer) { - this(containers, resourceLimit,strictContainers, fungibleContainers, + this(containers, resourceLimit, strictContainers, fungibleContainers, fungibleResources, nmTokens, increasedContainers, decreasedContainer, - null, null, null); + null, null, null, null); } public Allocation(List containers, Resource resourceLimit, @@ -70,7 +71,8 @@ public Allocation(List containers, Resource resourceLimit, List fungibleResources, List nmTokens, List increasedContainers, List decreasedContainer, List promotedContainers, List demotedContainer, - List previousAttemptContainers) { + List previousAttemptContainers, List + rejectedRequest) { this.containers = containers; this.resourceLimit = resourceLimit; this.strictContainers = strictContainers; @@ -82,6 +84,7 @@ public Allocation(List containers, Resource resourceLimit, this.promotedContainers = promotedContainers; this.demotedContainers = demotedContainer; this.previousAttemptContainers = previousAttemptContainers; + this.rejectedRequest = rejectedRequest; } public List getContainers() { @@ -128,6 +131,10 @@ public Resource getResourceLimit() { return previousAttemptContainers; } + public List getRejectedRequest() { + return rejectedRequest; + } + @VisibleForTesting public void setResourceLimit(Resource resource) { this.resourceLimit = resource; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index ca7d9ce..369b65d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -41,7 +41,10 @@ import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.RejectionReason; +import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -95,6 +98,7 @@ public final ContainerUpdateContext updateContext; private final Map applicationSchedulingEnvs = new HashMap<>(); private final RMContext rmContext; + private final int retryAttempts; public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, Queue queue, AbstractUsersManager abstractUsersManager, long epoch, @@ -110,6 +114,9 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, this.appResourceUsage = appResourceUsage; this.applicationSchedulingEnvs.putAll(applicationSchedulingEnvs); this.rmContext = rmContext; + this.retryAttempts = rmContext.getYarnConfiguration().getInt( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); updateContext = new ContainerUpdateContext(this); @@ -493,6 +500,21 @@ public boolean getAndResetBlacklistChanged() { return ret; } + public List getRejectedRequest() { + List ret = new ArrayList<>(); + try { + this.readLock.lock(); + schedulerKeyToAppPlacementAllocator.values().stream() + .filter(ap -> ap.getPlacementAttempt() >= retryAttempts) + .forEach(ap -> ret.add(RejectedSchedulingRequest.newInstance( + RejectionReason.COULD_NOT_SCHEDULE_ON_NODE, + ap.getSchedulingRequest()))); + } finally { + this.readLock.unlock(); + } + return ret; + } + public PendingAsk getNextPendingAsk() { try { readLock.lock(); @@ -776,6 +798,9 @@ public boolean precheckNode(SchedulerRequestKey schedulerKey, this.readLock.lock(); AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(schedulerKey); + if (ap.getPlacementAttempt() >= retryAttempts) { + return false; + } return (ap != null) && ap.precheckNode(schedulerNode, schedulingMode); } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 114ee51..726863b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -832,7 +832,7 @@ public Allocation getAllocation(ResourceCalculator resourceCalculator, currentContPreemption, Collections.singletonList(rr), updatedNMTokens, newlyIncreasedContainers, newlyDecreasedContainers, newlyPromotedContainers, newlyDemotedContainers, - previousAttemptContainers); + previousAttemptContainers, appSchedulingInfo.getRejectedRequest()); } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java index 088b3dd..b19c233 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java @@ -55,6 +55,7 @@ protected AppSchedulingInfo appSchedulingInfo; protected SchedulerRequestKey schedulerRequestKey; protected RMContext rmContext; + protected int placementAttempt; /** * Get iterator of preferred node depends on requirement and/or availability. @@ -198,4 +199,8 @@ public void initialize(AppSchedulingInfo appSchedulingInfo, * @return SchedulingRequest */ public abstract SchedulingRequest getSchedulingRequest(); + + public int getPlacementAttempt() { + return placementAttempt; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java index 54e4666..b3a6ab1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -357,6 +357,7 @@ private boolean checkCardinalityAndPending(SchedulerNode node) { placementConstraintManager, allocationTagsManager); } catch (InvalidAllocationTagsQueryException e) { LOG.warn("Failed to query node cardinality:", e); + this.placementAttempt++; return false; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java index b1bb515..27d1e18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java @@ -446,6 +446,53 @@ public RMNodeLabelsManager createNodeLabelManager() { rm1.close(); } + @Test(timeout = 30000L) + public void testInvalidSchedulingRequest() throws Exception { + Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf); + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + // 4 NMs. + MockNM[] nms = new MockNM[4]; + RMNode[] rmNodes = new RMNode[4]; + for (int i = 0; i < 4; i++) { + nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB); + rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId()); + } + + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]); + + // Constraint with Invalid Allocation Tag Namespace + PlacementConstraint constraint = targetNotIn("node", + allocationTagWithNamespace("invalid","t1")).build(); + SchedulingRequest sc = SchedulingRequest + .newInstance(1, Priority.newInstance(1), + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), + ImmutableSet.of("t1"), + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)), + constraint); + AllocateRequest request = AllocateRequest.newBuilder() + .schedulingRequests(ImmutableList.of(sc)).build(); + am1.allocate(request); + + while (true) { + doNodeHeartbeat(nms); + AllocateResponse response = am1.schedule(); + if (response.getRejectedSchedulingRequests().size() == 1) { + return; + } + Thread.sleep(500); + } + } + private static void doNodeHeartbeat(MockNM... nms) throws Exception { for (MockNM nm : nms) { nm.nodeHeartbeat(true); -- 2.7.4 (Apple Git-66)