From 7fc2c9e6e0a9241fe30f40c57e5c4e16adb1c1f1 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Sat, 16 Dec 2017 01:14:59 +0530 Subject: [PATCH] YARN-7494 --- .../api/records/ApplicationSubmissionContext.java | 21 +++++++ .../src/main/proto/yarn_protos.proto | 1 + .../pb/ApplicationSubmissionContextPBImpl.java | 73 +++++++++++++++++++++- .../yarn/server/resourcemanager/rmapp/RMApp.java | 7 +++ .../server/resourcemanager/rmapp/RMAppImpl.java | 15 ++++- .../scheduler/AppSchedulingInfo.java | 41 ++++++++---- .../scheduler/ApplicationPlacementFactory.java | 62 ++++++++++++++++++ .../scheduler/SchedulerApplicationAttempt.java | 13 +++- .../scheduler/activities/ActivitiesLogger.java | 18 +++--- .../scheduler/capacity/CapacityScheduler.java | 36 ++++++++++- .../capacity/CapacitySchedulerConfiguration.java | 12 ++++ .../scheduler/capacity/LeafQueue.java | 8 +-- .../allocator/RegularContainerAllocator.java | 2 +- .../scheduler/placement/AppPlacementAllocator.java | 8 ++- .../scheduler/placement/CandidateNodeSet.java | 1 + .../placement/LocalityAppPlacementAllocator.java | 32 +++++++--- .../scheduler/placement/MultiNodeLookupPolicy.java | 41 ++++++++++++ .../scheduler/placement/NodeUsageBasedPolicy.java | 51 +++++++++++++++ .../placement/PartitionBasedCandidateNodeSet.java | 54 ++++++++++++++++ .../applicationsmanager/MockAsm.java | 5 ++ .../server/resourcemanager/rmapp/MockRMApp.java | 5 ++ .../scheduler/TestAppSchedulingInfo.java | 7 ++- 22 files changed, 468 insertions(+), 45 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/NodeUsageBasedPolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PartitionBasedCandidateNodeSet.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 38db60cd9e1..4c8a5afda80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -598,4 +598,25 @@ public abstract void setLogAggregationContext( @Unstable public abstract void setApplicationTimeouts( Map applicationTimeouts); + + /** + * Get application scheduling environment variables stored as a key value + * pair map for application. + * + * @return placement envs for application. + */ + @Public + @Unstable + public abstract Map getApplicationSchedulingEnvsMap(); + + /** + * Set the scheduling envs for the application. + * + * @param schedulingEnvMap + * A map of env's for the application placement + */ + @Public + @Unstable + public abstract void setApplicationSchedulingEnvsMap( + Map schedulingEnvMap); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 3a9662b2540..acf666eb0a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -459,6 +459,7 @@ message ApplicationSubmissionContextProto { optional string node_label_expression = 16; repeated ResourceRequestProto am_container_resource_request = 17; repeated ApplicationTimeoutMapProto application_timeouts = 18; + repeated StringStringMapProto application_scheduling_envs = 19; } enum ApplicationTimeoutTypeProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index e3dbf4055be..129d1c7fa3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto; import com.google.protobuf.TextFormat; @@ -71,6 +72,7 @@ private LogAggregationContext logAggregationContext = null; private ReservationId reservationId = null; private Map applicationTimeouts = null; + private Map schedulingEnvs = null; public ApplicationSubmissionContextPBImpl() { builder = ApplicationSubmissionContextProto.newBuilder(); @@ -141,6 +143,9 @@ private void mergeLocalToBuilder() { if (this.applicationTimeouts != null) { addApplicationTimeouts(); } + if (this.schedulingEnvs != null) { + addApplicationSchedulingEnvs(); + } } private void mergeLocalToProto() { @@ -662,4 +667,70 @@ public void remove() { }; this.builder.addAllApplicationTimeouts(values); } -} + + private void addApplicationSchedulingEnvs() { + maybeInitBuilder(); + builder.clearApplicationSchedulingEnvs(); + if (schedulingEnvs == null) { + return; + } + Iterable values = + new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + private Iterator iterator = schedulingEnvs.keySet() + .iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public StringStringMapProto next() { + String key = iterator.next(); + return StringStringMapProto.newBuilder() + .setValue(schedulingEnvs.get(key)).setKey(key).build(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + this.builder.addAllApplicationSchedulingEnvs(values); + } + + private void initApplicationSchedulingEnvs() { + if (this.schedulingEnvs != null) { + return; + } + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + List envs = p.getApplicationSchedulingEnvsList(); + this.schedulingEnvs = new HashMap(envs.size()); + for (StringStringMapProto envProto : envs) { + this.schedulingEnvs.put(envProto.getKey(), envProto.getValue()); + } + } + + @Override + public Map getApplicationSchedulingEnvsMap() { + initApplicationSchedulingEnvs(); + return this.schedulingEnvs; + } + + @Override + public void setApplicationSchedulingEnvsMap( + Map schedulingEnvMap) { + if (schedulingEnvMap == null) { + return; + } + initApplicationSchedulingEnvs(); + this.schedulingEnvs.clear(); + this.schedulingEnvs.putAll(schedulingEnvMap); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index b357d91cbfe..0a0ff04c129 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -47,6 +47,7 @@ .ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; /** * The interface to an Application in the ResourceManager. Take a @@ -311,4 +312,10 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * @return ApplicationPlacementContext */ ApplicationPlacementContext getApplicationPlacementContext(); + + /** + * Get the application scheduling envs. + * @return Map of envs related to app scheduling. + */ + Map getApplicationSchedulingEnvs(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 38f666b4c7b..7b002373794 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -82,7 +82,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager; - import org.apache.hadoop.yarn.server.resourcemanager.placement .ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -151,6 +150,7 @@ private final Map updatedNodes = new HashMap<>(); private final String applicationType; private final Set applicationTags; + private Map applicationSchedulingEnvs = new HashMap<>(); private final long attemptFailuresValidityInterval; private boolean amBlacklistingEnabled = false; @@ -489,6 +489,13 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.placementContext = placementContext; + // If applications are not explicitly specifying envs, try to pull from + // AM container environment lists. + applicationSchedulingEnvs + .putAll(submissionContext.getAMContainerSpec().getEnvironment()); + applicationSchedulingEnvs + .putAll(submissionContext.getApplicationSchedulingEnvsMap()); + long localLogAggregationStatusTimeout = conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS, YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS); @@ -2029,10 +2036,14 @@ public void setApplicationPriority(Priority applicationPriority) { /** * Clear Unused fields to free memory. - * @param app */ private void clearUnusedFields() { this.submissionContext.setAMContainerSpec(null); this.submissionContext.setLogAggregationContext(null); } + + @Override + public Map getApplicationSchedulingEnvs() { + return this.applicationSchedulingEnvs; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index e47f0c1d614..aecd05bcaea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -33,6 +33,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -62,6 +63,14 @@ private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class); + @InterfaceAudience.Private + public static final String ENV_APPLICATION_PLACEMENT_TYPE_CLASS = + "APPLICATION_PLACEMENT_TYPE_CLASS"; + + @InterfaceAudience.Private + public static final Class + DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS = LocalityAppPlacementAllocator.class; + private final ApplicationId applicationId; private final ApplicationAttemptId applicationAttemptId; private final AtomicLong containerIdCounter; @@ -90,10 +99,12 @@ private final ReentrantReadWriteLock.WriteLock writeLock; public final ContainerUpdateContext updateContext; + public final Map applicationSchedulingEnvs; - public AppSchedulingInfo(ApplicationAttemptId appAttemptId, - String user, Queue queue, AbstractUsersManager abstractUsersManager, - long epoch, ResourceUsage appResourceUsage) { + public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, + Queue queue, AbstractUsersManager abstractUsersManager, long epoch, + ResourceUsage appResourceUsage, + Map applicationSchedulingEnvs) { this.applicationAttemptId = appAttemptId; this.applicationId = appAttemptId.getApplicationId(); this.queue = queue; @@ -102,6 +113,7 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId, this.containerIdCounter = new AtomicLong( epoch << ResourceManager.EPOCH_BIT_SHIFT); this.appResourceUsage = appResourceUsage; + this.applicationSchedulingEnvs = applicationSchedulingEnvs; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); updateContext = new ContainerUpdateContext(this); @@ -211,24 +223,27 @@ boolean addRequestToAppPlacement( Map> dedupRequests) { boolean offswitchResourcesUpdated = false; for (Map.Entry> entry : - dedupRequests.entrySet()) { + dedupRequests.entrySet()) { SchedulerRequestKey schedulerRequestKey = entry.getKey(); - if (!schedulerKeyToAppPlacementAllocator.containsKey( - schedulerRequestKey)) { + if (!schedulerKeyToAppPlacementAllocator + .containsKey(schedulerRequestKey)) { + AppPlacementAllocator placementAllocatorInstance = ApplicationPlacementFactory + .getAppPlacementAllocator(applicationSchedulingEnvs + .get(ENV_APPLICATION_PLACEMENT_TYPE_CLASS)); + placementAllocatorInstance.setAppSchedulingInfo(this); + schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey, - new LocalityAppPlacementAllocator<>(this)); + placementAllocatorInstance); } // Update AppPlacementAllocator - PendingAskUpdateResult pendingAmountChanges = - schedulerKeyToAppPlacementAllocator.get(schedulerRequestKey) - .updatePendingAsk(entry.getValue().values(), - recoverPreemptedRequestForAContainer); + PendingAskUpdateResult pendingAmountChanges = schedulerKeyToAppPlacementAllocator + .get(schedulerRequestKey).updatePendingAsk(entry.getValue().values(), + recoverPreemptedRequestForAContainer); if (null != pendingAmountChanges) { - updatePendingResources( - pendingAmountChanges, schedulerRequestKey, + updatePendingResources(pendingAmountChanges, schedulerRequestKey, queue.getMetrics()); offswitchResourcesUpdated = true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java new file mode 100644 index 00000000000..57b39cb25c0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; + +/** + * Factory class to build various application placement policies. + */ +@Public +@Unstable +public class ApplicationPlacementFactory { + + /** + * Get AppPlacementAllocator related to the placement type requested. + * + * @param appPlacementAllocatorName + * allocator class name. + * @return Specific AppPlacementAllocator instance based on type + */ + public static AppPlacementAllocator getAppPlacementAllocator( + String appPlacementAllocatorName) { + Class policyClass; + try { + if (appPlacementAllocatorName == null) { + policyClass = AppSchedulingInfo.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS; + } else { + policyClass = Class.forName(appPlacementAllocatorName); + } + } catch (ClassNotFoundException e) { + policyClass = AppSchedulingInfo.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS; + } + + if (!AppPlacementAllocator.class.isAssignableFrom(policyClass)) { + policyClass = AppSchedulingInfo.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS; + } + + @SuppressWarnings("unchecked") + AppPlacementAllocator placementAllocatorInstance = (AppPlacementAllocator) ReflectionUtils + .newInstance(policyClass, null); + return placementAllocatorInstance; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index dfb0e67fcd9..f02f113e9dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -197,6 +197,8 @@ protected ReentrantReadWriteLock.ReadLock readLock; protected ReentrantReadWriteLock.WriteLock writeLock; + private Map applicationSchedulingEnvs = new HashMap<>(); + // Not confirmed allocation resource, will be used to avoid too many proposal // rejected because of duplicated allocation private AtomicLong unconfirmedAllocatedMem = new AtomicLong(); @@ -207,9 +209,6 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, RMContext rmContext) { Preconditions.checkNotNull(rmContext, "RMContext should not be null"); this.rmContext = rmContext; - this.appSchedulingInfo = - new AppSchedulingInfo(applicationAttemptId, user, queue, - abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage); this.queue = queue; this.pendingRelease = Collections.newSetFromMap( new ConcurrentHashMap()); @@ -227,8 +226,12 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, this.logAggregationContext = appSubmissionContext.getLogAggregationContext(); } + applicationSchedulingEnvs = rmApp.getApplicationSchedulingEnvs(); } + this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user, + queue, abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage, + applicationSchedulingEnvs); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); @@ -1434,4 +1437,8 @@ public String getDiagnosticMessage() { return diagnosticMessage; } } + + public Map getApplicationSchedulingEnvs() { + return this.applicationSchedulingEnvs; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java index 0c351b671a3..88b5424ebcc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; /** * Utility for logging scheduler activities @@ -63,7 +64,7 @@ public static void recordRejectedAppActivityFromLeafQueue( SchedulerApplicationAttempt application, Priority priority, String diagnostic) { String type = "app"; - if (activitiesManager == null) { + if (node == null || activitiesManager == null) { return; } if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { @@ -84,7 +85,7 @@ public static void recordAppActivityWithoutAllocation( ActivitiesManager activitiesManager, SchedulerNode node, SchedulerApplicationAttempt application, Priority priority, String diagnostic, ActivityState appState) { - if (activitiesManager == null) { + if (node == null || activitiesManager == null) { return; } if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { @@ -157,13 +158,13 @@ public static void recordAppActivityWithAllocation( * update. */ public static void startAppAllocationRecording( - ActivitiesManager activitiesManager, NodeId nodeId, long currentTime, - SchedulerApplicationAttempt application) { - if (activitiesManager == null) { + ActivitiesManager activitiesManager, SchedulerNode node, + long currentTime, SchedulerApplicationAttempt application) { + if (activitiesManager == null || node == null) { return; } - activitiesManager.startAppAllocationRecording(nodeId, currentTime, - application); + activitiesManager.startAppAllocationRecording(node.getNodeID(), + currentTime, application); } /* @@ -211,7 +212,8 @@ public static void recordQueueActivity(ActivitiesManager activitiesManager, if (activitiesManager == null) { return; } - if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + if (node != null + && activitiesManager.shouldRecordThisNode(node.getNodeID())) { recordActivity(activitiesManager, node, parentQueueName, queueName, null, state, diagnostic, null); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 8de363140fb..ed72b518e8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -36,6 +37,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.math3.stat.descriptive.moment.GeometricMean; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configurable; @@ -60,6 +62,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager; @@ -99,6 +102,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; @@ -137,6 +141,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PartitionBasedCandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -240,11 +245,13 @@ public Configuration getConf() { private ResourceCommitterService resourceCommitterService; private RMNodeLabelsManager labelManager; private AppPriorityACLsManager appPriorityACLManager; + private boolean multiNodePlacementEnabled; /** * EXPERT */ private long asyncScheduleInterval; + private static final String ASYNC_SCHEDULER_INTERVAL = CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".scheduling-interval-ms"; @@ -345,6 +352,7 @@ void initScheduler(Configuration configuration) throws this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled(); this.maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat(); + this.multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled(); // number of threads for async scheduling int maxAsyncSchedulingThreads = this.conf.getInt( @@ -1235,8 +1243,8 @@ private void allocateContainersToNode(NodeId nodeId, int offswitchCount = 0; int assignedContainers = 0; - CandidateNodeSet candidates = - new SimpleCandidateNodeSet<>(node); + CandidateNodeSet candidates = getCandidateNodeSet( + node); CSAssignment assignment = allocateContainersToNode(candidates, withNodeHeartbeat); // Only check if we can allocate more container on the same node when @@ -1279,6 +1287,26 @@ private void allocateContainersToNode(NodeId nodeId, } } + private CandidateNodeSet getCandidateNodeSet( + FiCaSchedulerNode node) { + CandidateNodeSet candidates = null; + if (!multiNodePlacementEnabled) { + candidates = new SimpleCandidateNodeSet<>(node); + } else { + Set labels = new HashSet<>(); + labels.add(node.getPartition()); + Set nodes = labelManager.getLabelsInfoToNodes(labels) + .get(node.getPartition()); + Map nodesByPartition = new HashMap<>(); + for (NodeId nodeId : nodes) { + nodesByPartition.put(nodeId, getNode(nodeId)); + } + candidates = new PartitionBasedCandidateNodeSet( + nodesByPartition, node.getPartition()); + } + return candidates; + } + /* * Logics of allocate container on a single node (Old behavior) */ @@ -2754,4 +2782,8 @@ private LeafQueue autoCreateLeafQueue( } return autoCreatedLeafQueue; } + + public boolean isMultiNodePlacementEnabled() { + return multiNodePlacementEnabled; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 8aa41ee4a72..0a079a723ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -343,6 +343,13 @@ AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser(); + @Private + public static final String MULTI_NODE_PLACEMENT_ENABLED = + PREFIX + "multi-node-placement-enabled"; + + @Private + public static final boolean DEFAULT_MULTI_NODE_PLACEMENT_ENABLED = false; + public CapacitySchedulerConfiguration() { this(new Configuration()); } @@ -1592,6 +1599,11 @@ public void setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation( return userWeights; } + public boolean getMultiNodePlacementEnabled() { + return getBoolean(MULTI_NODE_PLACEMENT_ENABLED, + DEFAULT_MULTI_NODE_PLACEMENT_ENABLED); + } + public boolean getAssignMultipleEnabled() { return getBoolean(ASSIGN_MULTIPLE_ENABLED, DEFAULT_ASSIGN_MULTIPLE_ENABLED); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index ac1a26ccef2..7445f29f6a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1010,7 +1010,7 @@ private CSAssignment allocateFromReservedContainer(Resource clusterResource, if (null != application) { ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, - node.getNodeID(), SystemClock.getInstance().getTime(), application); + node, SystemClock.getInstance().getTime(), application); CSAssignment assignment = application.assignContainers(clusterResource, candidates, currentResourceLimits, schedulingMode, reservedContainer); @@ -1076,12 +1076,12 @@ public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerApp application = assignmentIterator.next(); ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, - node.getNodeID(), SystemClock.getInstance().getTime(), application); + node, SystemClock.getInstance().getTime(), application); // Check queue max-capacity limit Resource appReserved = application.getCurrentReservation(); if (needAssignToQueueCheck) { - if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), + if (!super.canAssignToThisQueue(clusterResource, candidates.getPartition(), currentResourceLimits, appReserved, schedulingMode)) { ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( activitiesManager, node, application, application.getPriority(), @@ -1117,7 +1117,7 @@ public CSAssignment assignContainers(Resource clusterResource, userAssignable = false; } else { userAssignable = canAssignToUser(clusterResource, application.getUser(), - userLimit, application, node.getPartition(), currentResourceLimits); + userLimit, application, candidates.getPartition(), currentResourceLimits); if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) { cul.canAssign = false; cul.reservation = appReserved; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 264253221ef..1b85f6a497b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -143,7 +143,7 @@ private ContainerAllocation preCheckForNodeCandidateSet( // Is the nodePartition of pending request matches the node's partition // If not match, jump to next priority. - if (!appInfo.acceptNodePartition(schedulerKey, node.getPartition(), + if (!appInfo.acceptNodePartition(schedulerKey, candidates.getPartition(), schedulingMode)) { ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( activitiesManager, node, application, priority, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java index dcb38aa0054..f55bd983288 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -28,7 +29,6 @@ import java.util.Collection; import java.util.Iterator; -import java.util.List; import java.util.Map; /** @@ -157,4 +157,10 @@ boolean acceptNodePartition(String nodePartition, * Print human-readable requests to LOG debug. */ void showRequests(); + + /** + * Set app scheduling info. + * @param appSchedulingInfo info object. + */ + void setAppSchedulingInfo(AppSchedulingInfo appSchedulingInfo); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java index 6f127c9f0af..0364d91e999 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import java.util.Iterator; import java.util.Map; /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java index 766827ca300..7f1dcc95d4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -21,6 +21,7 @@ import org.apache.commons.collections.IteratorUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; @@ -34,9 +35,12 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -53,33 +57,42 @@ private final Map resourceRequestMap = new ConcurrentHashMap<>(); private AppSchedulingInfo appSchedulingInfo; + private volatile String primaryRequestedPartition = RMNodeLabelsManager.NO_LABEL; private final ReentrantReadWriteLock.ReadLock readLock; private final ReentrantReadWriteLock.WriteLock writeLock; + private MultiNodeLookupPolicy nodeLookupPolicy = null; + private Map cachedNodeSet = null; - public LocalityAppPlacementAllocator(AppSchedulingInfo info) { + public LocalityAppPlacementAllocator() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); - this.appSchedulingInfo = info; } @Override @SuppressWarnings("unchecked") public Iterator getPreferredNodeIterator( CandidateNodeSet candidateNodeSet) { - // Now only handle the case that single node in the candidateNodeSet - // TODO, Add support to multi-hosts inside candidateNodeSet which is passed - // in. - N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet); if (null != singleNode) { + // This condition will hit only when multi-node placement lookup is not + // enabled. return IteratorUtils.singletonIterator(singleNode); } - return IteratorUtils.emptyIterator(); + // Multi-node placement lookup is enabled, and hence could consider sorting + // policies. + if (nodeLookupPolicy == null + || !candidateNodeSet.getAllNodes().equals(cachedNodeSet)) { + this.nodeLookupPolicy = new NodeUsageBasedPolicy( + candidateNodeSet.getAllNodes()); + cachedNodeSet = candidateNodeSet.getAllNodes(); + } + + return nodeLookupPolicy.getPreferredNodeIterator(); } private boolean hasRequestLabelChanged(ResourceRequest requestOne, @@ -419,4 +432,9 @@ public ContainerRequest allocate(SchedulerRequestKey schedulerKey, writeLock.unlock(); } } + + @Override + public void setAppSchedulingInfo(AppSchedulingInfo appSchedulingInfo) { + this.appSchedulingInfo = appSchedulingInfo; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java new file mode 100644 index 00000000000..feddf7b523f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Iterator; + +/** + *

+ * This class has the following functionality: + * + *

+ * Provide an interface for MultiNodeLookupPolicy so that different placement + * allocator can choose nodes based on need. + *

+ */ +public interface MultiNodeLookupPolicy { + /** + * Get iterator of preferred node depends on requirement and/or availability + * + * @return iterator of preferred node + */ + Iterator getPreferredNodeIterator(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/NodeUsageBasedPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/NodeUsageBasedPolicy.java new file mode 100644 index 00000000000..f5363c1d60c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/NodeUsageBasedPolicy.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +/** + * A simple CandidateNodeSet which keeps an unordered map + */ +public class NodeUsageBasedPolicy + implements MultiNodeLookupPolicy { + private Set nodeList; + + public NodeUsageBasedPolicy(Map map) { + nodeList = new TreeSet(new Comparator() { + @Override + public int compare(N o1, N o2) { + return o2.getAllocatedResource().compareTo(o1.getAllocatedResource()); + } + }); + nodeList.addAll(map.values()); + } + + @Override + public Iterator getPreferredNodeIterator() { + return nodeList.iterator(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PartitionBasedCandidateNodeSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PartitionBasedCandidateNodeSet.java new file mode 100644 index 00000000000..4e57bde56c4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PartitionBasedCandidateNodeSet.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Map; + +/** + * A CandidateNodeSet which keeps an unordered map based on partition. + */ +public class PartitionBasedCandidateNodeSet + implements CandidateNodeSet { + + private Map map; + private String partition; + + public PartitionBasedCandidateNodeSet(Map map, String partition) { + this.map = map; + this.partition = partition; + } + + @Override + public Map getAllNodes() { + return map; + } + + @Override + public long getVersion() { + return 0L; + } + + @Override + public String getPartition() { + return partition; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 2aca3752013..72de27cf95a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -254,6 +254,11 @@ public ApplicationPlacementContext getApplicationPlacementContext() { public CollectorInfo getCollectorInfo() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public Map getApplicationSchedulingEnvs() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 756759957e6..c399368593a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -338,4 +338,9 @@ public ApplicationPlacementContext getApplicationPlacementContext() { public CollectorInfo getCollectorInfo() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public Map getApplicationSchedulingEnvs() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java index bb29889a854..443a990fd7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java @@ -47,8 +47,9 @@ public void testBacklistChanged() { FSLeafQueue queue = mock(FSLeafQueue.class); doReturn("test").when(queue).getQueueName(); - AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo( - appAttemptId, "test", queue, null, 0, new ResourceUsage()); + AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(appAttemptId, + "test", queue, null, 0, new ResourceUsage(), + null); appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList(), new ArrayList()); @@ -120,7 +121,7 @@ public void testSchedulerKeyAccounting() { doReturn(mock(QueueMetrics.class)).when(queue).getMetrics(); AppSchedulingInfo info = new AppSchedulingInfo( appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0, - new ResourceUsage()); + new ResourceUsage(), null); Assert.assertEquals(0, info.getSchedulerKeys().size()); Priority pri1 = Priority.newInstance(1); -- 2.14.3 (Apple Git-98)