diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index c0265ff..6d0844c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -339,6 +339,24 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT = "WAIT_TIME"; + public static final String DIST_SCHEDULING_QUEUE_LIMIT_POLICY = + YARN_PREFIX + "distributed-scheduling.queue-limit-policy"; + public static final String DIST_SCHEDULING_QUEUE_LIMIT_POLICY_DEFAULT = + "MEAN_SIGMA"; + + public static final String DIST_SCHEDULING_QUEUE_LIMIT_SIGMA = + YARN_PREFIX + "distributed-scheduling.queue-limit-sigma"; + public static final float DIST_SCHEDULING_QUEUE_LIMIT_SIGMA_DEFAULT = + 1.0f; + + public static final String DIST_SCHEDULING_QUEUE_LIMIT_MIN = + YARN_PREFIX + "distributed-scheduling.queue-limit-min"; + public static final int DIST_SCHEDULING_QUEUE_LIMIT_MIN_DEFAULT = 1; + + public static final String DIST_SCHEDULING_QUEUE_LIMIT_MAX = + YARN_PREFIX + "distributed-scheduling.queue-limit-max"; + public static final int DIST_SCHEDULING_QUEUE_LIMIT_MAX_DEFAULT = 10; + /** Container token expiry for container allocated via * Distributed Scheduling */ public static String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index f8a1320..bd04a0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -78,4 +79,7 @@ void setSystemCredentialsForApps( List getContainersToDecrease(); void addAllContainersToDecrease(Collection containersToDecrease); + + ContainerQueuingLimit getContainerQueuingLimit(); + void setContainerQueuingLimit(ContainerQueuingLimit containerQueuingLimit); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 224e50b..4188666 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; @@ -46,8 +47,11 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.impl.pb + .ContainerQueuingLimitPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -65,6 +69,7 @@ private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; + private ContainerQueuingLimit containerQueuingLimit = null; private List containersToDecrease = null; private List containersToSignal = null; @@ -102,6 +107,10 @@ private void mergeLocalToBuilder() { builder.setNmTokenMasterKey( convertToProtoFormat(this.nmTokenMasterKey)); } + if (this.containerQueuingLimit != null) { + builder.setContainerQueuingLimit( + convertToProtoFormat(this.containerQueuingLimit)); + } if (this.systemCredentials != null) { addSystemCredentialsToProto(); } @@ -197,6 +206,30 @@ public void setNMTokenMasterKey(MasterKey masterKey) { } @Override + public ContainerQueuingLimit getContainerQueuingLimit() { + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + if (this.containerQueuingLimit != null) { + return this.containerQueuingLimit; + } + if (!p.hasContainerQueuingLimit()) { + return null; + } + this.containerQueuingLimit = + convertFromProtoFormat(p.getContainerQueuingLimit()); + return this.containerQueuingLimit; + } + + @Override + public void setContainerQueuingLimit(ContainerQueuingLimit + containerQueuingLimit) { + maybeInitBuilder(); + if (containerQueuingLimit == null) { + builder.clearContainerQueuingLimit(); + } + this.containerQueuingLimit = containerQueuingLimit; + } + + @Override public NodeAction getNodeAction() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; if (!p.hasNodeAction()) { @@ -638,6 +671,15 @@ public void remove() { builder.addAllContainersToSignal(iterable); } + private ContainerQueuingLimit convertFromProtoFormat( + ContainerQueuingLimitProto p) { + return new ContainerQueuingLimitPBImpl(p); + } + + private ContainerQueuingLimitProto convertToProtoFormat(ContainerQueuingLimit c) { + return ((ContainerQueuingLimitPBImpl)c).getProto(); + } + private SignalContainerRequestPBImpl convertFromProtoFormat( SignalContainerRequestProto p) { return new SignalContainerRequestPBImpl(p); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java new file mode 100644 index 0000000..a32ddec --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.yarn.util.Records; + +public abstract class ContainerQueuingLimit { + + public static ContainerQueuingLimit newInstance() { + return Records.newRecord(ContainerQueuingLimit.class); + } + + public abstract int getMaxWaitTime(); + + public abstract void setMaxWaitTime(int waitTime); + + public abstract int getMaxQueueLength(); + + public abstract void setMaxQueueLength(int queueLength); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java new file mode 100644 index 0000000..45fa222 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; + +public class ContainerQueuingLimitPBImpl extends ContainerQueuingLimit { + + private ContainerQueuingLimitProto proto = + ContainerQueuingLimitProto.getDefaultInstance(); + private ContainerQueuingLimitProto.Builder builder = null; + private boolean viaProto = false; + + public ContainerQueuingLimitPBImpl() { + builder = ContainerQueuingLimitProto.newBuilder(); + } + + public ContainerQueuingLimitPBImpl(ContainerQueuingLimitProto proto) { + this.proto = proto; + this.viaProto = true; + } + + public ContainerQueuingLimitProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ContainerQueuingLimitProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int getMaxWaitTime() { + ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder; + return p.getMaxWaitTime(); + } + + @Override + public void setMaxWaitTime(int waitTime) { + maybeInitBuilder(); + builder.setMaxWaitTime(waitTime); + } + + @Override + public int getMaxQueueLength() { + ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder; + return p.getMaxQueueLength(); + } + + @Override + public void setMaxQueueLength(int queueLength) { + maybeInitBuilder(); + builder.setMaxQueueLength(queueLength); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 786d8ee..84dfb86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -100,6 +100,12 @@ message NodeHeartbeatResponseProto { optional bool areNodeLabelsAcceptedByRM = 11 [default = false]; repeated ContainerProto containers_to_decrease = 12; repeated SignalContainerRequestProto containers_to_signal = 13; + optional ContainerQueuingLimitProto container_queuing_limit = 14; +} + +message ContainerQueuingLimitProto { + optional int32 max_wait_time = 1; + optional int32 max_queue_length = 2; } message SystemCredentialsForAppsProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java index 4fd62d0..fb2bfa3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java @@ -20,6 +20,7 @@ import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import java.util.List; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index f50da3b..d68e15e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -39,6 +39,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed + .QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -139,4 +142,6 @@ void setRMDelegatedNodeLabelsUpdater( void setLeaderElectorService(LeaderElectorService elector); LeaderElectorService getLeaderElectorService(); + + QueueLimitCalculator getContainerQueueLimitCalculator(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ec2aeb7..cfe7f4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -43,6 +44,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed + .QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -74,6 +78,8 @@ private SystemMetricsPublisher systemMetricsPublisher; private LeaderElectorService elector; + private QueueLimitCalculator queueLimitCalculator; + /** * Default constructor. To be used in conjunction with setter methods for * individual fields. @@ -472,4 +478,14 @@ public PlacementManager getQueuePlacementManager() { public void setQueuePlacementManager(PlacementManager placementMgr) { this.activeServiceContext.setQueuePlacementManager(placementMgr); } + + @Override + public QueueLimitCalculator getContainerQueueLimitCalculator() { + return this.queueLimitCalculator; + } + + public void setContainerQueueLimitCalculator(QueueLimitCalculator + queueLimitCalculator) { + this.queueLimitCalculator = queueLimitCalculator; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index ebf6027..3cd58e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -1143,6 +1143,8 @@ protected ApplicationMasterService createApplicationMasterService() { addService(distSchedulerEventDispatcher); rmDispatcher.register(SchedulerEventType.class, distSchedulerEventDispatcher); + this.rmContext.setContainerQueueLimitCalculator( + distributedSchedulingService.getClusterQueueLimitCalculator()); return distributedSchedulingService; } return new ApplicationMasterService(this.rmContext, scheduler); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 902244b..85fd222 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -488,6 +488,15 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } } + // 6. Send Container Queuing Limits back to the Node. This will be used by + // the node to truncate (or increase) the number of Containers queued + // for execution. This is used only if Distributed Scheduling is enabled + // on the node + if (this.rmContext.getContainerQueueLimitCalculator() != null) { + nodeHeartBeatResponse.setContainerQueuingLimit( + this.rmContext.getContainerQueueLimitCalculator() + .createContainerQueuingLimit()); + } return nodeHeartBeatResponse; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java index f0235f7..ab47bcb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java @@ -89,7 +89,7 @@ LogFactory.getLog(DistributedSchedulingService.class); private final ClusterMonitor clusterMonitor; - private final NodeSelector nodeSelector; + private final QueueLimitCalculator queueLimitCalculator; private final ConcurrentHashMap> rackToNode = new ConcurrentHashMap<>(); @@ -110,10 +110,33 @@ public DistributedSchedulingService(RMContext rmContext, rmContext.getYarnConfiguration().get( YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR, YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT)); + TopKNodeSelector topKSelector = new TopKNodeSelector(k, topKComputationInterval, comparator); + + QueueLimitCalculator.Policy limitingPolicy = + QueueLimitCalculator.Policy.valueOf( + rmContext.getYarnConfiguration().get( + YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_POLICY, + YarnConfiguration. + DIST_SCHEDULING_QUEUE_LIMIT_POLICY_DEFAULT)); + float sigma = rmContext.getYarnConfiguration() + .getFloat(YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_SIGMA, + YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_SIGMA_DEFAULT); + + int limitMin = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_MIN, + YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_MIN_DEFAULT); + int limitMax = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_MAX, + YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_MAX_DEFAULT); + + this.queueLimitCalculator = + new QueueLimitCalculator(limitingPolicy, topKSelector, + sigma, limitMin, limitMax); + + topKSelector.setThresholdCalculator(queueLimitCalculator); this.clusterMonitor = topKSelector; - this.nodeSelector = topKSelector; } @Override @@ -153,6 +176,10 @@ public AllocateResponse allocate(AllocateRequest request) throws return super.allocate(request); } + private NodeSelector getNodeSelector() { + return (NodeSelector)this.clusterMonitor; + } + @Override public DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling( @@ -202,8 +229,7 @@ public AllocateResponse allocate(AllocateRequest request) throws this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT); // Set nodes to be used for scheduling - dsResp.setNodesForScheduling( - new ArrayList<>(this.nodeSelector.selectNodes())); + dsResp.setNodesForScheduling(getNodeSelector().selectNodes()); return dsResp; } @@ -215,8 +241,7 @@ public AllocateResponse allocate(AllocateRequest request) throws (DistSchedAllocateResponse.class); dsResp.setAllocateResponse(response); dsResp.setNodesForScheduling( - new ArrayList<>( - this.nodeSelector.selectNodes(createSelectionHints(request)))); + getNodeSelector().selectNodes(createSelectionHints(request))); return dsResp; } @@ -272,6 +297,10 @@ private void removeFromMapping(ConcurrentHashMap> mapping, } } + public QueueLimitCalculator getClusterQueueLimitCalculator() { + return queueLimitCalculator; + } + @Override public void handle(SchedulerEvent event) { switch (event.getType()) { @@ -336,6 +365,7 @@ public void handle(SchedulerEvent event) { LOG.error("Unknown event arrived at DistributedSchedulingService: " + event.toString()); } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java new file mode 100644 index 0000000..b0a50e2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed + .TopKNodeSelector.ClusterNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed + .TopKNodeSelector.TopKComparator; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This class interacts with the TopKNodeSelector to keep track of the mean, + * median and corresponding stand deviations of the configured metric (Wait time + * or Queue length) used to characterize the load on the specific node. + * The TopKNodeSelector triggers an update (by calling the 'update()' method) + * every time it performs an re-ordering of all nodes. + */ +public class QueueLimitCalculator { + + enum Policy { + MEAN_SIGMA, + MEDIAN_SIGMA + } + + public class Stats { + private final AtomicInteger mean = new AtomicInteger(0); + private final AtomicInteger median = new AtomicInteger(0); + private final AtomicInteger stdDevMean = new AtomicInteger(0); + private final AtomicInteger stdDevMedian = new AtomicInteger(0); + + /** + * Not thread safe.. Caller should synchronize on topKNodes + */ + public void update() { + List topKNodes = nodeSelector.getTopKNodes(); + if (topKNodes.size() > 0) { + // calculate mean + int sum = 0; + for (NodeId n : topKNodes) { + sum += getMetric(getNode(n)); + } + mean.set(sum / topKNodes.size()); + + // calculate median + int mid = topKNodes.size() / 2; + int midVal = getMetric(getNode(topKNodes.get(mid))); + if (topKNodes.size() % 2 != 0) { + median.set(midVal); + } else { + int midVal2 = getMetric(getNode(topKNodes.get(mid - 1))); + median.set((midVal + midVal2) / 2); + } + + // calculate std deviation + int sqrSumMean = 0; + int sqrSumMedian = 0; + for (NodeId n : topKNodes) { + int val = getMetric(getNode(n)); + sqrSumMean += Math.pow(val - mean.get(), 2); + sqrSumMedian += Math.pow(val - median.get(), 2); + } + stdDevMean.set((int)Math.sqrt(sqrSumMean / topKNodes.size())); + stdDevMean.set((int)Math.sqrt(sqrSumMedian / topKNodes.size())); + } + } + + private ClusterNode getNode(NodeId nId) { + return nodeSelector.getClusterNodes().get(nId); + } + + private int getMetric(ClusterNode cn) { + return (cn != null) ? ((TopKComparator)nodeSelector.getComparator()) + .getMetric(cn) : 0; + } + + public int getMean() { + return mean.get(); + } + + public int getMedian() { + return median.get(); + } + + public int getStdDevMean() { + return stdDevMean.get(); + } + + public int getStdDevMedian() { + return stdDevMedian.get(); + } + } + + private final Policy policy; + private final TopKNodeSelector nodeSelector; + private final float sigma; + private final int rangeMin; + private final int rangeMax; + private final Stats stats = new Stats(); + + + public QueueLimitCalculator( + Policy policy, TopKNodeSelector selector, float sigma, + int rangeMin, int rangeMax) { + this.policy = policy; + this.nodeSelector = selector; + this.sigma = sigma; + this.rangeMax = rangeMax; + this.rangeMin = rangeMin; + } + + private int determineThreshold() { + if (policy == Policy.MEAN_SIGMA) { + return (int) (stats.getMean() + sigma * stats.getStdDevMean()); + } else { + return (int) (stats.getMedian() + sigma * stats.getStdDevMedian()); + } + } + + void update() { + this.stats.update(); + } + + public int getThreshold() { + int thres = determineThreshold(); + return Math.min(rangeMax, Math.max(rangeMin, thres)); + } + + public Policy getPolicy() { + return policy; + } + + public ContainerQueuingLimit createContainerQueuingLimit() { + ContainerQueuingLimit containerQueuingLimit = + ContainerQueuingLimit.newInstance(); + if (nodeSelector.getComparator() == TopKComparator.WAIT_TIME) { + containerQueuingLimit.setMaxWaitTime(getThreshold()); + containerQueuingLimit.setMaxQueueLength(-1); + } else { + containerQueuingLimit.setMaxWaitTime(-1); + containerQueuingLimit.setMaxQueueLength(getThreshold()); + } + return containerQueuingLimit; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java index 5aedbed..b0adf57 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java @@ -33,13 +33,13 @@ import java.util.Arrays; import java.util.Collection; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.PriorityQueue; -import java.util.Random; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -48,19 +48,19 @@ final static Log LOG = LogFactory.getLog(TopKNodeSelector.class); - enum TopKComparator implements Comparator { + public enum TopKComparator implements Comparator { WAIT_TIME, QUEUE_LENGTH; @Override public int compare(ClusterNode o1, ClusterNode o2) { - if (getQuant(o1) == getQuant(o2)) { + if (getMetric(o1) == getMetric(o2)) { return o1.timestamp < o2.timestamp ? +1 : -1; } - return getQuant(o1) > getQuant(o2) ? +1 : -1; + return getMetric(o1) > getMetric(o2) ? +1 : -1; } - private int getQuant(ClusterNode c) { + public int getMetric(ClusterNode c) { return (this == WAIT_TIME) ? c.queueTime : c.waitQueueLength; } } @@ -93,10 +93,13 @@ public ClusterNode updateTimestamp() { } private final int k; - private final List topKNodes; private final ScheduledExecutorService scheduledExecutor; - private final HashMap clusterNodes = new HashMap<>(); - private final Comparator comparator; + + private final List topKNodes; + private final Map clusterNodes = + new ConcurrentHashMap<>(); + private final TopKComparator comparator; + private QueueLimitCalculator thresholdCalculator; Runnable computeTask = new Runnable() { @Override @@ -104,6 +107,9 @@ public void run() { synchronized (topKNodes) { topKNodes.clear(); topKNodes.addAll(computeTopKNodes()); + if (thresholdCalculator != null) { + thresholdCalculator.update(); + } } } }; @@ -127,6 +133,22 @@ public TopKNodeSelector(int k, long nodeComputationInterval, TimeUnit.MILLISECONDS); } + List getTopKNodes() { + return topKNodes; + } + + Map getClusterNodes() { + return clusterNodes; + } + + Comparator getComparator() { + return comparator; + } + + public void setThresholdCalculator( + QueueLimitCalculator thresholdCalculator) { + this.thresholdCalculator = thresholdCalculator; + } @Override public void addNode(List containerStatuses, RMNode @@ -270,4 +292,5 @@ public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) { return retList; } } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java index a21ae19..1ea9c14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -184,6 +185,63 @@ public void testQueueLengthSort() { Assert.assertEquals("h4:4", nodeIds.get(3).toString()); } + @Test + public void testContainerQueuingLimit() { + TopKNodeSelector selector = new TopKNodeSelector(5, + TopKNodeSelector.TopKComparator.QUEUE_LENGTH); + selector.nodeUpdate(createRMNode("h1", 1, -1, 15)); + selector.nodeUpdate(createRMNode("h2", 2, -1, 5)); + selector.nodeUpdate(createRMNode("h3", 3, -1, 10)); + + // Test Mean Calculation + QueueLimitCalculator calculator = + new QueueLimitCalculator + (QueueLimitCalculator + .Policy.MEAN_SIGMA, selector, 0, 0, 100); + selector.setThresholdCalculator(calculator); + ContainerQueuingLimit containerQueuingLimit = calculator + .createContainerQueuingLimit(); + Assert.assertEquals(0, containerQueuingLimit.getMaxQueueLength()); + Assert.assertEquals(-1, containerQueuingLimit.getMaxWaitTime()); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(10, containerQueuingLimit.getMaxQueueLength()); + Assert.assertEquals(-1, containerQueuingLimit.getMaxWaitTime()); + + // Test Median Calculation + selector.nodeUpdate(createRMNode("h4", 4, -1, 50)); + calculator = new QueueLimitCalculator + (QueueLimitCalculator.Policy.MEDIAN_SIGMA, + selector, 0, 6, 100); + selector.setThresholdCalculator(calculator); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(12, containerQueuingLimit.getMaxQueueLength()); + + // Test Limits do not exceed specified max + selector.nodeUpdate(createRMNode("h1", 1, -1, 110)); + selector.nodeUpdate(createRMNode("h2", 2, -1, 120)); + selector.nodeUpdate(createRMNode("h3", 3, -1, 130)); + selector.nodeUpdate(createRMNode("h4", 4, -1, 140)); + selector.nodeUpdate(createRMNode("h5", 5, -1, 150)); + selector.nodeUpdate(createRMNode("h6", 6, -1, 160)); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(100, containerQueuingLimit.getMaxQueueLength()); + + // Test Limits do not go below specified min + selector.nodeUpdate(createRMNode("h1", 1, -1, 1)); + selector.nodeUpdate(createRMNode("h2", 2, -1, 2)); + selector.nodeUpdate(createRMNode("h3", 3, -1, 3)); + selector.nodeUpdate(createRMNode("h4", 4, -1, 4)); + selector.nodeUpdate(createRMNode("h5", 5, -1, 5)); + selector.nodeUpdate(createRMNode("h6", 6, -1, 6)); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(6, containerQueuingLimit.getMaxQueueLength()); + + } + private RMNode createRMNode(String host, int port, int waitTime, int queueLength) { RMNode node1 = Mockito.mock(RMNode.class);