diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 118f9fb9927..c5ca098757b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -532,11 +532,38 @@ public static boolean isAclEnabled(Configuration conf) { public static final String RM_SCHEDULER = RM_PREFIX + "scheduler.class"; - /** Enable rich placement constraints. */ - public static final String RM_PLACEMENT_CONSTRAINTS_ENABLED = - RM_PREFIX + "placement-constraints.enabled"; + /** Specify which */ + @Private + public static final String RM_PLACEMENT_CONSTRAINTS_HANDLER = + RM_PREFIX + "placement-constraints.handler"; + + /** + * Reject all placement requests. + */ + @Private + public static final String NONE_RM_PLACEMENT_CONSTRAINTS_HANDLER = "none"; + + /** + * Handle placement constraints by an external processor which lives outside + * of scheduler. + */ + @Private + public static final String + EXTERNAL_PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER = + "external-processor"; + + /** + * Handle placement constraints by RM scheduler itself, if the configured RM + * scheduler:
yarn.resourcemanager.scheduler.class
cannot handle + * placement constraint, placement requests will be rejected. + */ + @Private + public static final String + SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER = + "scheduler"; - public static final boolean DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED = false; + @Private + public static final String DEFAULT_RM_PLACEMENT_CONSTRAINTS_HANDLER = "none"; /** Placement Algorithm. */ public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java index fdc8d58d723..2bcdd3b3354 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java @@ -65,7 +65,7 @@ public void testAMRMClientWithPlacementConstraints() // mismatches between client and server teardown(); conf = new YarnConfiguration(); - conf.setBoolean(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, true); createClusterAndStartApplication(conf); AMRMClient amClient = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 509a040fafc..b0f6e304830 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -131,9 +131,14 @@ - Enable Constraint Placement. - yarn.resourcemanager.placement-constraints.enabled - false + + Specify which class to handle placement constraint requests, + by default is none which reject all placement constraint requests. + Acceptable values are external-processor (Handled by an external processor + lives outside of scheduler) / scheduler (handled by YARN scheduler itself) + + yarn.resourcemanager.placement-constraints.handler + none diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index aa1177d514c..5f701947d45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -59,7 +59,8 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.PlacementProcessor; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.AbstractPlacementProcessor; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.ExternalPlacementProcessor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; @@ -67,6 +68,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.NonePlacementProcessor; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.SchedulerPlacementProcessor; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.security.MasterKeyData; @@ -118,20 +121,44 @@ protected void serviceInit(Configuration conf) throws Exception { initializeProcessingChain(conf); } - private void initializeProcessingChain(Configuration conf) { + private void addPlacementConstraintHandler(Configuration conf) { amsProcessingChain.init(rmContext, null); - boolean enablePlacementConstraints = conf.getBoolean( - YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, - YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED); - if (enablePlacementConstraints) { - amsProcessingChain.addProcessor(new PlacementProcessor()); + String placementConstraintsHandler = conf.get( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_HANDLER); + if (placementConstraintsHandler.equals( + YarnConfiguration.NONE_RM_PLACEMENT_CONSTRAINTS_HANDLER)) { + LOG.info("Use " + YarnConfiguration.NONE_RM_PLACEMENT_CONSTRAINTS_HANDLER + + " placement handler, all placement requests will be rejected."); + amsProcessingChain.addProcessor(new NonePlacementProcessor()); + } else if (placementConstraintsHandler.equals( + YarnConfiguration.EXTERNAL_PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER)) { + LOG.info("Use " + + YarnConfiguration.EXTERNAL_PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER + + " placement handler, all placement requests will be handled by an " + + "external processor outside of scheduler"); + amsProcessingChain.addProcessor(new ExternalPlacementProcessor()); + } else if (placementConstraintsHandler.equals( + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER)) { + amsProcessingChain.addProcessor(new SchedulerPlacementProcessor()); } + } + + private void initializeProcessingChain(Configuration conf) { + addPlacementConstraintHandler(conf); + List processors = getProcessorList(conf); if (processors != null) { Collections.reverse(processors); for (ApplicationMasterServiceProcessor p : processors) { // Ensure only single instance of PlacementProcessor is included - if (enablePlacementConstraints && p instanceof PlacementProcessor) { + if (p instanceof AbstractPlacementProcessor) { + LOG.warn("Found PlacementProcessor=" + p.getClass().getCanonicalName() + + " defined in " + + YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS + + ", however PlacementProcessor handler should be configured " + + "by using " + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER + + ", this processor will be ignored."); continue; } this.amsProcessingChain.addProcessor(p); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index d3aa5cbe728..de246972345 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1090,18 +1090,6 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, return EMPTY_ALLOCATION; } - if ((!getConfiguration().getBoolean( - CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, - CapacitySchedulerConfiguration.DEFAULT_SCHEDULING_REQUEST_ALLOWED)) - && schedulingRequests != null && (!schedulingRequests.isEmpty())) { - throw new SchedulerInvalidResoureRequestException( - "Application attempt:" + applicationAttemptId - + " is using SchedulingRequest, which is disabled. Please update " - + CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED - + " to true in capacity-scheduler.xml in order to use this " - + "feature."); - } - // The allocate may be the leftover from previous attempt, and it will // impact current attempt, such as confuse the request and allocation for // current attempt's AM container. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 00733a1e683..e609be905df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -77,11 +77,6 @@ @Private public static final String PREFIX = "yarn.scheduler.capacity."; - - @Private - public static final String SCHEDULING_REQUEST_ALLOWED = - PREFIX + "scheduling-request.allowed"; - public static final boolean DEFAULT_SCHEDULING_REQUEST_ALLOWED = false; @Private public static final String DOT = "."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java new file mode 100644 index 00000000000..7a6eb67f0ca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; + +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; + +/** + * Base class of all PlacementProcessors + */ +public abstract class AbstractPlacementProcessor implements + ApplicationMasterServiceProcessor{ + protected ApplicationMasterServiceProcessor nextAMSProcessor; + protected AbstractYarnScheduler scheduler; + + + @Override + public void init(ApplicationMasterServiceContext amsContext, + ApplicationMasterServiceProcessor nextProcessor) { + this.nextAMSProcessor = nextProcessor; + this.scheduler = + (AbstractYarnScheduler) ((RMContextImpl) amsContext).getScheduler(); + } + + @Override + public void finishApplicationMaster(ApplicationAttemptId applicationAttemptId, + FinishApplicationMasterRequest request, + FinishApplicationMasterResponse response) { + this.nextAMSProcessor.finishApplicationMaster(applicationAttemptId, request, + response); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/ExternalPlacementProcessor.java similarity index 97% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/ExternalPlacementProcessor.java index 9ce38f4fdda..6957b740a2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/ExternalPlacementProcessor.java @@ -69,7 +69,7 @@ * the placement constraint manager. * 3. Dispatches Scheduling Requests to the Planner. */ -public class PlacementProcessor implements ApplicationMasterServiceProcessor { +public class ExternalPlacementProcessor extends AbstractPlacementProcessor { /** * Wrapper over the SchedulingResponse that wires in the placement attempt @@ -90,11 +90,9 @@ private Response(boolean isSuccess, ApplicationId applicationId, } private static final Logger LOG = - LoggerFactory.getLogger(PlacementProcessor.class); + LoggerFactory.getLogger(ExternalPlacementProcessor.class); private PlacementConstraintManager constraintManager; - private ApplicationMasterServiceProcessor nextAMSProcessor; - private AbstractYarnScheduler scheduler; private ExecutorService schedulingThreadPool; private int retryAttempts; private Map> requestsToRetry = @@ -110,12 +108,10 @@ private Response(boolean isSuccess, ApplicationId applicationId, public void init(ApplicationMasterServiceContext amsContext, ApplicationMasterServiceProcessor nextProcessor) { LOG.info("Initializing Constraint Placement Processor:"); - this.nextAMSProcessor = nextProcessor; + super.init(amsContext, nextProcessor); this.constraintManager = ((RMContextImpl)amsContext).getPlacementConstraintManager(); - this.scheduler = - (AbstractYarnScheduler)((RMContextImpl)amsContext).getScheduler(); // Only the first class is considered - even if a comma separated // list is provided. (This is for simplicity, since getInstances does a // lot of good things by handling things correctly) @@ -315,7 +311,7 @@ public void finishApplicationMaster(ApplicationAttemptId appAttemptId, placementDispatcher.clearApplicationState(appAttemptId.getApplicationId()); requestsToReject.remove(appAttemptId.getApplicationId()); requestsToRetry.remove(appAttemptId.getApplicationId()); - nextAMSProcessor.finishApplicationMaster(appAttemptId, request, response); + super.finishApplicationMaster(appAttemptId, request, response); } private void handleSchedulingResponse(SchedulingResponse schedulerResponse) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NonePlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NonePlacementProcessor.java new file mode 100644 index 00000000000..24c1951f604 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NonePlacementProcessor.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Processor which reject all SchedulingRequests + */ +public class NonePlacementProcessor extends AbstractPlacementProcessor { + private static final Logger LOG = + LoggerFactory.getLogger(NonePlacementProcessor.class); + + @Override + public void registerApplicationMaster( + ApplicationAttemptId applicationAttemptId, + RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse response) + throws IOException, YarnException { + if (request.getPlacementConstraints() != null && !request + .getPlacementConstraints().isEmpty()) { + String message = "Found non empty placement constraints map of " + + "RegisterApplicationMasterRequest for application=" + + applicationAttemptId.toString() + ", however the configured " + + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER + + " cannot handle placement constraints, rejecting this " + + "registerApplicationMaster operation"; + LOG.warn(message); + throw new YarnException(message); + } + nextAMSProcessor.registerApplicationMaster(applicationAttemptId, request, + response); + } + + @Override + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse response) throws YarnException { + if (request.getSchedulingRequests() != null && !request + .getSchedulingRequests().isEmpty()) { + String message = "Found non empty SchedulingRequest of " + + "AllocateRequest for application=" + + appAttemptId.toString() + ", however the configured " + + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER + + " cannot handle placement constraints, rejecting this " + + "allocate operation"; + LOG.warn(message); + throw new YarnException(message); + } + nextAMSProcessor.allocate(appAttemptId, request, response); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java new file mode 100644 index 00000000000..0e87dc380fe --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java @@ -0,0 +1,64 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Use scheduler to handle placement constraint requests. + */ +public class SchedulerPlacementProcessor extends AbstractPlacementProcessor { + private static final Logger LOG = + LoggerFactory.getLogger(SchedulerPlacementProcessor.class); + + @Override + public void registerApplicationMaster( + ApplicationAttemptId applicationAttemptId, + RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse response) + throws IOException, YarnException { + if (request.getPlacementConstraints() != null && !request + .getPlacementConstraints().isEmpty()) { + if (!(scheduler instanceof CapacityScheduler)) { + String message = "Found non empty placement constraints map of " + + "RegisterApplicationMasterRequest for application=" + + applicationAttemptId.toString() + + ", however the configured scheduler:" + scheduler.getClass() + .getCanonicalName() + + " cannot handle placement constraints, rejecting this " + + "registerApplicationMaster operation"; + LOG.warn(message); + throw new YarnException(message); + } + } + nextAMSProcessor.registerApplicationMaster(applicationAttemptId, request, + response); + } + + @Override + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse response) throws YarnException { + if (request.getSchedulingRequests() != null && !request + .getSchedulingRequests().isEmpty()) { + if (!(scheduler instanceof CapacityScheduler)) { + String message = "Found non empty SchedulingRequest of " + + "AllocateRequest for application=" + appAttemptId.toString() + + ", however the configured scheduler=" + scheduler.getClass() + .getCanonicalName() + + " cannot handle placement constraints, rejecting this " + + "allocate operation"; + LOG.warn(message); + throw new YarnException(message); + } + } + nextAMSProcessor.allocate(appAttemptId, request, response); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java index b297f79d114..27d86611e31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java @@ -58,8 +58,8 @@ public void setUp() throws Exception { public void testIntraAppAntiAffinity() throws Exception { Configuration csConf = TestUtils.getConfigurationWithMultipleQueues( new Configuration()); - csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, - true); + csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); // inject node label manager MockRM rm1 = new MockRM(csConf) { @@ -141,8 +141,8 @@ public RMNodeLabelsManager createNodeLabelManager() { public void testIntraAppAntiAffinityWithMultipleTags() throws Exception { Configuration csConf = TestUtils.getConfigurationWithMultipleQueues( new Configuration()); - csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, - true); + csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); // inject node label manager MockRM rm1 = new MockRM(csConf) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java index fc1cb0d37b6..d1d05dc407f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java @@ -57,13 +57,13 @@ public void setUp() throws Exception { private void testIntraAppAntiAffinityAsync(int numThreads) throws Exception { Configuration csConf = TestUtils.getConfigurationWithMultipleQueues( new Configuration()); - csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, - true); csConf.setInt( CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, numThreads); csConf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".scheduling-interval-ms", 0); + csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); // inject node label manager MockRM rm1 = new MockRM(csConf) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 7180e24c0c2..fae63be5051 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -275,9 +275,7 @@ public static Container getMockContainer( public static Configuration getConfigurationWithQueueLabels(Configuration config) { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(config); - conf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, - true); - + // Define top-level queues conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"}); conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java index c4c0b5df475..71855cafe90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java @@ -86,8 +86,8 @@ public void createAndStartRM() { YarnConfiguration conf = new YarnConfiguration(csConf); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - conf.setBoolean( - YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.EXTERNAL_PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); conf.setInt( YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 1); startRM(conf); @@ -381,8 +381,8 @@ public void testSchedulerRejection() throws Exception { YarnConfiguration conf = new YarnConfiguration(csConf); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - conf.setBoolean( - YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.EXTERNAL_PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); startRM(conf); HashMap nodes = new HashMap<>(); @@ -533,8 +533,8 @@ public void testRePlacementAfterSchedulerRejection() throws Exception { YarnConfiguration conf = new YarnConfiguration(csConf); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - conf.setBoolean( - YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.EXTERNAL_PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); conf.setInt( YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 2); startRM(conf);