diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 118f9fb9927..bcfd5fd99bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -532,11 +532,57 @@ public static boolean isAclEnabled(Configuration conf) { public static final String RM_SCHEDULER = RM_PREFIX + "scheduler.class"; - /** Enable rich placement constraints. */ - public static final String RM_PLACEMENT_CONSTRAINTS_ENABLED = - RM_PREFIX + "placement-constraints.enabled"; + /** + * Specify use which handler will be used to process PlacementConstraints. + * For detail of PlacementConstraint please refer to + * {@link org.apache.hadoop.yarn.api.resource.PlacementConstraint} + */ + @Private + public static final String RM_PLACEMENT_CONSTRAINTS_HANDLER = + RM_PREFIX + "placement-constraints.handler"; + + /** + * If any SchedulingRequest asked by application, the allocate call will be + * rejected. + */ + @Private + public static final String NONE_RM_PLACEMENT_CONSTRAINTS_HANDLER = "none"; + + /** + * For this handler, the placement of containers with constraints is + * determined as a pre-processing step before the capacity or the fair + * scheduler is called. Once the placement is decided, the capacity/fair + * scheduler is invoked to perform the actual allocation. The advantage + * of this approach is that it supports all constraint types (affinity, + * anti-affinity, cardinality). Moreover, it considers multiple containers + * at a time, which allows to satisfy more constraints than a + * container-at-a-time approach can achieve. As it sits outside the main + * scheduler, it can be used by both the capacity and fair schedulers. + * Note that at the moment it does not account for task priorities within + * an application, given that such priorities might be conflicting with the + * placement constraints. + */ + @Private + public static final String PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER = + "placement-processor"; - public static final boolean DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED = false; + /** + * For this handler, containers with constraints will be placed by scheduler + * (as of now, only CapacityScheduler supports SchedulingRequest. + * It currently supports anti-affinity constraints (no affinity or + * cardinality) and places one container at a time. The advantage of this + * handler comparing to placement-processor is: it follows the same ordering + * rules for queues (sorted by utilization, priority) and apps + * (sorted by FIFO/fairness/priority) which enforced by existing scheduler. + * + * If the configured RM + * scheduler:
yarn.resourcemanager.scheduler.class
cannot handle + * placement constraint, SchedulingRequests will be rejected. + */ + @Private + public static final String + SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER = + "scheduler"; /** Placement Algorithm. */ public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java index fdc8d58d723..0e882996d42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java @@ -65,7 +65,8 @@ public void testAMRMClientWithPlacementConstraints() // mismatches between client and server teardown(); conf = new YarnConfiguration(); - conf.setBoolean(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); createClusterAndStartApplication(conf); AMRMClient amClient = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 509a040fafc..094855a9dcd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -131,9 +131,13 @@ - Enable Constraint Placement. - yarn.resourcemanager.placement-constraints.enabled - false + + Specify use which handler will be used to process PlacementConstraints. + Acceptable values are: `placement-processor`, `scheduler` and `none`. + For detailed explanation of these values, please refer to documentation. + + yarn.resourcemanager.placement-constraints.handler + none diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index aa1177d514c..02b56e66374 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -59,7 +59,8 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.PlacementProcessor; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.AbstractPlacementProcessor; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.PlacementConstraintProcessor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; @@ -67,6 +68,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.NonePlacementProcessor; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.SchedulerPlacementProcessor; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.security.MasterKeyData; @@ -118,20 +121,44 @@ protected void serviceInit(Configuration conf) throws Exception { initializeProcessingChain(conf); } - private void initializeProcessingChain(Configuration conf) { + private void addPlacementConstraintHandler(Configuration conf) { amsProcessingChain.init(rmContext, null); - boolean enablePlacementConstraints = conf.getBoolean( - YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, - YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED); - if (enablePlacementConstraints) { - amsProcessingChain.addProcessor(new PlacementProcessor()); + String placementConstraintsHandler = conf.get( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.NONE_RM_PLACEMENT_CONSTRAINTS_HANDLER); + if (placementConstraintsHandler.equals( + YarnConfiguration.NONE_RM_PLACEMENT_CONSTRAINTS_HANDLER)) { + LOG.info("Use " + YarnConfiguration.NONE_RM_PLACEMENT_CONSTRAINTS_HANDLER + + " placement handler, all placement requests will be rejected."); + amsProcessingChain.addProcessor(new NonePlacementProcessor()); + } else if (placementConstraintsHandler.equals( + YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER)) { + LOG.info("Use " + + YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER + + " placement handler, all placement requests will be handled by an " + + "external processor outside of scheduler"); + amsProcessingChain.addProcessor(new PlacementConstraintProcessor()); + } else if (placementConstraintsHandler.equals( + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER)) { + amsProcessingChain.addProcessor(new SchedulerPlacementProcessor()); } + } + + private void initializeProcessingChain(Configuration conf) { + addPlacementConstraintHandler(conf); + List processors = getProcessorList(conf); if (processors != null) { Collections.reverse(processors); for (ApplicationMasterServiceProcessor p : processors) { // Ensure only single instance of PlacementProcessor is included - if (enablePlacementConstraints && p instanceof PlacementProcessor) { + if (p instanceof AbstractPlacementProcessor) { + LOG.warn("Found PlacementProcessor=" + p.getClass().getCanonicalName() + + " defined in " + + YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS + + ", however PlacementProcessor handler should be configured " + + "by using " + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER + + ", this processor will be ignored."); continue; } this.amsProcessingChain.addProcessor(p); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index d3aa5cbe728..de246972345 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1090,18 +1090,6 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, return EMPTY_ALLOCATION; } - if ((!getConfiguration().getBoolean( - CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, - CapacitySchedulerConfiguration.DEFAULT_SCHEDULING_REQUEST_ALLOWED)) - && schedulingRequests != null && (!schedulingRequests.isEmpty())) { - throw new SchedulerInvalidResoureRequestException( - "Application attempt:" + applicationAttemptId - + " is using SchedulingRequest, which is disabled. Please update " - + CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED - + " to true in capacity-scheduler.xml in order to use this " - + "feature."); - } - // The allocate may be the leftover from previous attempt, and it will // impact current attempt, such as confuse the request and allocation for // current attempt's AM container. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 00733a1e683..e609be905df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -77,11 +77,6 @@ @Private public static final String PREFIX = "yarn.scheduler.capacity."; - - @Private - public static final String SCHEDULING_REQUEST_ALLOWED = - PREFIX + "scheduling-request.allowed"; - public static final boolean DEFAULT_SCHEDULING_REQUEST_ALLOWED = false; @Private public static final String DOT = "."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java new file mode 100644 index 00000000000..7a6eb67f0ca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; + +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; + +/** + * Base class of all PlacementProcessors + */ +public abstract class AbstractPlacementProcessor implements + ApplicationMasterServiceProcessor{ + protected ApplicationMasterServiceProcessor nextAMSProcessor; + protected AbstractYarnScheduler scheduler; + + + @Override + public void init(ApplicationMasterServiceContext amsContext, + ApplicationMasterServiceProcessor nextProcessor) { + this.nextAMSProcessor = nextProcessor; + this.scheduler = + (AbstractYarnScheduler) ((RMContextImpl) amsContext).getScheduler(); + } + + @Override + public void finishApplicationMaster(ApplicationAttemptId applicationAttemptId, + FinishApplicationMasterRequest request, + FinishApplicationMasterResponse response) { + this.nextAMSProcessor.finishApplicationMaster(applicationAttemptId, request, + response); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NonePlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NonePlacementProcessor.java new file mode 100644 index 00000000000..24c1951f604 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NonePlacementProcessor.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Processor which reject all SchedulingRequests + */ +public class NonePlacementProcessor extends AbstractPlacementProcessor { + private static final Logger LOG = + LoggerFactory.getLogger(NonePlacementProcessor.class); + + @Override + public void registerApplicationMaster( + ApplicationAttemptId applicationAttemptId, + RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse response) + throws IOException, YarnException { + if (request.getPlacementConstraints() != null && !request + .getPlacementConstraints().isEmpty()) { + String message = "Found non empty placement constraints map of " + + "RegisterApplicationMasterRequest for application=" + + applicationAttemptId.toString() + ", however the configured " + + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER + + " cannot handle placement constraints, rejecting this " + + "registerApplicationMaster operation"; + LOG.warn(message); + throw new YarnException(message); + } + nextAMSProcessor.registerApplicationMaster(applicationAttemptId, request, + response); + } + + @Override + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse response) throws YarnException { + if (request.getSchedulingRequests() != null && !request + .getSchedulingRequests().isEmpty()) { + String message = "Found non empty SchedulingRequest of " + + "AllocateRequest for application=" + + appAttemptId.toString() + ", however the configured " + + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER + + " cannot handle placement constraints, rejecting this " + + "allocate operation"; + LOG.warn(message); + throw new YarnException(message); + } + nextAMSProcessor.allocate(appAttemptId, request, response); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java similarity index 96% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java index 9ce38f4fdda..f4978007ed4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java @@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.slf4j.Logger; @@ -69,7 +68,7 @@ * the placement constraint manager. * 3. Dispatches Scheduling Requests to the Planner. */ -public class PlacementProcessor implements ApplicationMasterServiceProcessor { +public class PlacementConstraintProcessor extends AbstractPlacementProcessor { /** * Wrapper over the SchedulingResponse that wires in the placement attempt @@ -90,11 +89,9 @@ private Response(boolean isSuccess, ApplicationId applicationId, } private static final Logger LOG = - LoggerFactory.getLogger(PlacementProcessor.class); + LoggerFactory.getLogger(PlacementConstraintProcessor.class); private PlacementConstraintManager constraintManager; - private ApplicationMasterServiceProcessor nextAMSProcessor; - private AbstractYarnScheduler scheduler; private ExecutorService schedulingThreadPool; private int retryAttempts; private Map> requestsToRetry = @@ -110,12 +107,10 @@ private Response(boolean isSuccess, ApplicationId applicationId, public void init(ApplicationMasterServiceContext amsContext, ApplicationMasterServiceProcessor nextProcessor) { LOG.info("Initializing Constraint Placement Processor:"); - this.nextAMSProcessor = nextProcessor; + super.init(amsContext, nextProcessor); this.constraintManager = ((RMContextImpl)amsContext).getPlacementConstraintManager(); - this.scheduler = - (AbstractYarnScheduler)((RMContextImpl)amsContext).getScheduler(); // Only the first class is considered - even if a comma separated // list is provided. (This is for simplicity, since getInstances does a // lot of good things by handling things correctly) @@ -315,7 +310,7 @@ public void finishApplicationMaster(ApplicationAttemptId appAttemptId, placementDispatcher.clearApplicationState(appAttemptId.getApplicationId()); requestsToReject.remove(appAttemptId.getApplicationId()); requestsToRetry.remove(appAttemptId.getApplicationId()); - nextAMSProcessor.finishApplicationMaster(appAttemptId, request, response); + super.finishApplicationMaster(appAttemptId, request, response); } private void handleSchedulingResponse(SchedulingResponse schedulerResponse) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java new file mode 100644 index 00000000000..0e87dc380fe --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java @@ -0,0 +1,64 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Use scheduler to handle placement constraint requests. + */ +public class SchedulerPlacementProcessor extends AbstractPlacementProcessor { + private static final Logger LOG = + LoggerFactory.getLogger(SchedulerPlacementProcessor.class); + + @Override + public void registerApplicationMaster( + ApplicationAttemptId applicationAttemptId, + RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse response) + throws IOException, YarnException { + if (request.getPlacementConstraints() != null && !request + .getPlacementConstraints().isEmpty()) { + if (!(scheduler instanceof CapacityScheduler)) { + String message = "Found non empty placement constraints map of " + + "RegisterApplicationMasterRequest for application=" + + applicationAttemptId.toString() + + ", however the configured scheduler:" + scheduler.getClass() + .getCanonicalName() + + " cannot handle placement constraints, rejecting this " + + "registerApplicationMaster operation"; + LOG.warn(message); + throw new YarnException(message); + } + } + nextAMSProcessor.registerApplicationMaster(applicationAttemptId, request, + response); + } + + @Override + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse response) throws YarnException { + if (request.getSchedulingRequests() != null && !request + .getSchedulingRequests().isEmpty()) { + if (!(scheduler instanceof CapacityScheduler)) { + String message = "Found non empty SchedulingRequest of " + + "AllocateRequest for application=" + appAttemptId.toString() + + ", however the configured scheduler=" + scheduler.getClass() + .getCanonicalName() + + " cannot handle placement constraints, rejecting this " + + "allocate operation"; + LOG.warn(message); + throw new YarnException(message); + } + } + nextAMSProcessor.allocate(appAttemptId, request, response); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java index 484d7803410..ee7e013ccbd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java @@ -50,6 +50,8 @@ public void testBasicPendingResourceUpdate() throws Exception { Configuration conf = TestUtils.getConfigurationWithQueueLabels( new Configuration(false)); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); mgr.init(conf); @@ -166,6 +168,8 @@ public void testNodePartitionPendingResourceUpdate() throws Exception { Configuration conf = TestUtils.getConfigurationWithQueueLabels( new Configuration(false)); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); mgr.init(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java index b297f79d114..27d86611e31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java @@ -58,8 +58,8 @@ public void setUp() throws Exception { public void testIntraAppAntiAffinity() throws Exception { Configuration csConf = TestUtils.getConfigurationWithMultipleQueues( new Configuration()); - csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, - true); + csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); // inject node label manager MockRM rm1 = new MockRM(csConf) { @@ -141,8 +141,8 @@ public RMNodeLabelsManager createNodeLabelManager() { public void testIntraAppAntiAffinityWithMultipleTags() throws Exception { Configuration csConf = TestUtils.getConfigurationWithMultipleQueues( new Configuration()); - csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, - true); + csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); // inject node label manager MockRM rm1 = new MockRM(csConf) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java index fc1cb0d37b6..d1d05dc407f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java @@ -57,13 +57,13 @@ public void setUp() throws Exception { private void testIntraAppAntiAffinityAsync(int numThreads) throws Exception { Configuration csConf = TestUtils.getConfigurationWithMultipleQueues( new Configuration()); - csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, - true); csConf.setInt( CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, numThreads); csConf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".scheduling-interval-ms", 0); + csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); // inject node label manager MockRM rm1 = new MockRM(csConf) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 7180e24c0c2..fae63be5051 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -275,9 +275,7 @@ public static Container getMockContainer( public static Configuration getConfigurationWithQueueLabels(Configuration config) { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(config); - conf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED, - true); - + // Define top-level queues conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"}); conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java index c4c0b5df475..e129a750c17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java @@ -86,8 +86,8 @@ public void createAndStartRM() { YarnConfiguration conf = new YarnConfiguration(csConf); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - conf.setBoolean( - YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); conf.setInt( YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 1); startRM(conf); @@ -381,8 +381,8 @@ public void testSchedulerRejection() throws Exception { YarnConfiguration conf = new YarnConfiguration(csConf); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - conf.setBoolean( - YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); startRM(conf); HashMap nodes = new HashMap<>(); @@ -533,8 +533,8 @@ public void testRePlacementAfterSchedulerRejection() throws Exception { YarnConfiguration conf = new YarnConfiguration(csConf); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - conf.setBoolean( - YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); conf.setInt( YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 2); startRM(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/PlacementConstraints.md.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/PlacementConstraints.md.vm index 7926eaba43d..787e1a6e214 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/PlacementConstraints.md.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/PlacementConstraints.md.vm @@ -1,3 +1,4 @@ +