diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index c0265ff..6d0844c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -339,6 +339,24 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT = "WAIT_TIME"; + public static final String DIST_SCHEDULING_QUEUE_LIMIT_POLICY = + YARN_PREFIX + "distributed-scheduling.queue-limit-policy"; + public static final String DIST_SCHEDULING_QUEUE_LIMIT_POLICY_DEFAULT = + "MEAN_SIGMA"; + + public static final String DIST_SCHEDULING_QUEUE_LIMIT_SIGMA = + YARN_PREFIX + "distributed-scheduling.queue-limit-sigma"; + public static final float DIST_SCHEDULING_QUEUE_LIMIT_SIGMA_DEFAULT = + 1.0f; + + public static final String DIST_SCHEDULING_QUEUE_LIMIT_MIN = + YARN_PREFIX + "distributed-scheduling.queue-limit-min"; + public static final int DIST_SCHEDULING_QUEUE_LIMIT_MIN_DEFAULT = 1; + + public static final String DIST_SCHEDULING_QUEUE_LIMIT_MAX = + YARN_PREFIX + "distributed-scheduling.queue-limit-max"; + public static final int DIST_SCHEDULING_QUEUE_LIMIT_MAX_DEFAULT = 10; + /** Container token expiry for container allocated via * Distributed Scheduling */ public static String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index f8a1320..bd04a0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -78,4 +79,7 @@ void setSystemCredentialsForApps( List getContainersToDecrease(); void addAllContainersToDecrease(Collection containersToDecrease); + + ContainerQueuingLimit getContainerQueuingLimit(); + void setContainerQueuingLimit(ContainerQueuingLimit containerQueuingLimit); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 224e50b..4188666 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; @@ -46,8 +47,11 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.impl.pb + .ContainerQueuingLimitPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -65,6 +69,7 @@ private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; + private ContainerQueuingLimit containerQueuingLimit = null; private List containersToDecrease = null; private List containersToSignal = null; @@ -102,6 +107,10 @@ private void mergeLocalToBuilder() { builder.setNmTokenMasterKey( convertToProtoFormat(this.nmTokenMasterKey)); } + if (this.containerQueuingLimit != null) { + builder.setContainerQueuingLimit( + convertToProtoFormat(this.containerQueuingLimit)); + } if (this.systemCredentials != null) { addSystemCredentialsToProto(); } @@ -197,6 +206,30 @@ public void setNMTokenMasterKey(MasterKey masterKey) { } @Override + public ContainerQueuingLimit getContainerQueuingLimit() { + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + if (this.containerQueuingLimit != null) { + return this.containerQueuingLimit; + } + if (!p.hasContainerQueuingLimit()) { + return null; + } + this.containerQueuingLimit = + convertFromProtoFormat(p.getContainerQueuingLimit()); + return this.containerQueuingLimit; + } + + @Override + public void setContainerQueuingLimit(ContainerQueuingLimit + containerQueuingLimit) { + maybeInitBuilder(); + if (containerQueuingLimit == null) { + builder.clearContainerQueuingLimit(); + } + this.containerQueuingLimit = containerQueuingLimit; + } + + @Override public NodeAction getNodeAction() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; if (!p.hasNodeAction()) { @@ -638,6 +671,15 @@ public void remove() { builder.addAllContainersToSignal(iterable); } + private ContainerQueuingLimit convertFromProtoFormat( + ContainerQueuingLimitProto p) { + return new ContainerQueuingLimitPBImpl(p); + } + + private ContainerQueuingLimitProto convertToProtoFormat(ContainerQueuingLimit c) { + return ((ContainerQueuingLimitPBImpl)c).getProto(); + } + private SignalContainerRequestPBImpl convertFromProtoFormat( SignalContainerRequestProto p) { return new SignalContainerRequestPBImpl(p); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java new file mode 100644 index 0000000..a32ddec --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.yarn.util.Records; + +public abstract class ContainerQueuingLimit { + + public static ContainerQueuingLimit newInstance() { + return Records.newRecord(ContainerQueuingLimit.class); + } + + public abstract int getMaxWaitTime(); + + public abstract void setMaxWaitTime(int waitTime); + + public abstract int getMaxQueueLength(); + + public abstract void setMaxQueueLength(int queueLength); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java new file mode 100644 index 0000000..45fa222 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; + +public class ContainerQueuingLimitPBImpl extends ContainerQueuingLimit { + + private ContainerQueuingLimitProto proto = + ContainerQueuingLimitProto.getDefaultInstance(); + private ContainerQueuingLimitProto.Builder builder = null; + private boolean viaProto = false; + + public ContainerQueuingLimitPBImpl() { + builder = ContainerQueuingLimitProto.newBuilder(); + } + + public ContainerQueuingLimitPBImpl(ContainerQueuingLimitProto proto) { + this.proto = proto; + this.viaProto = true; + } + + public ContainerQueuingLimitProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ContainerQueuingLimitProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int getMaxWaitTime() { + ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder; + return p.getMaxWaitTime(); + } + + @Override + public void setMaxWaitTime(int waitTime) { + maybeInitBuilder(); + builder.setMaxWaitTime(waitTime); + } + + @Override + public int getMaxQueueLength() { + ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder; + return p.getMaxQueueLength(); + } + + @Override + public void setMaxQueueLength(int queueLength) { + maybeInitBuilder(); + builder.setMaxQueueLength(queueLength); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 786d8ee..84dfb86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -100,6 +100,12 @@ message NodeHeartbeatResponseProto { optional bool areNodeLabelsAcceptedByRM = 11 [default = false]; repeated ContainerProto containers_to_decrease = 12; repeated SignalContainerRequestProto containers_to_signal = 13; + optional ContainerQueuingLimitProto container_queuing_limit = 14; +} + +message ContainerQueuingLimitProto { + optional int32 max_wait_time = 1; + optional int32 max_queue_length = 2; } message SystemCredentialsForAppsProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java index 4fd62d0..c8de73b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java @@ -20,6 +20,7 @@ import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import java.util.List; @@ -33,4 +34,6 @@ void nodeUpdate(RMNode rmNode); void updateNodeResource(RMNode rmNode, ResourceOption resourceOption); + + ContainerQueuingLimit getContainerQueuingLimit(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index f50da3b..b830537 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -139,4 +139,6 @@ void setRMDelegatedNodeLabelsUpdater( void setLeaderElectorService(LeaderElectorService elector); LeaderElectorService getLeaderElectorService(); + + ClusterMonitor getClusterMonitor(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ec2aeb7..1d418ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -74,6 +74,8 @@ private SystemMetricsPublisher systemMetricsPublisher; private LeaderElectorService elector; + private ClusterMonitor clusterMonitor; + /** * Default constructor. To be used in conjunction with setter methods for * individual fields. @@ -472,4 +474,13 @@ public PlacementManager getQueuePlacementManager() { public void setQueuePlacementManager(PlacementManager placementMgr) { this.activeServiceContext.setQueuePlacementManager(placementMgr); } + + @Override + public ClusterMonitor getClusterMonitor() { + return this.clusterMonitor; + } + + public void setClusterMonitor(ClusterMonitor clusterMonitor) { + this.clusterMonitor = clusterMonitor; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index ebf6027..3d159f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -1143,6 +1143,8 @@ protected ApplicationMasterService createApplicationMasterService() { addService(distSchedulerEventDispatcher); rmDispatcher.register(SchedulerEventType.class, distSchedulerEventDispatcher); + this.rmContext.setClusterMonitor( + distributedSchedulingService.getClusterMonitor()); return distributedSchedulingService; } return new ApplicationMasterService(this.rmContext, scheduler); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 902244b..0b361b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -488,6 +488,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } } + // 6. Send back to the Node Container Queuing Limits + if (this.rmContext.getClusterMonitor() != null) { + nodeHeartBeatResponse.setContainerQueuingLimit( + this.rmContext.getClusterMonitor().getContainerQueuingLimit()); + } return nodeHeartBeatResponse; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java index f0235f7..57aea34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java @@ -89,7 +89,6 @@ LogFactory.getLog(DistributedSchedulingService.class); private final ClusterMonitor clusterMonitor; - private final NodeSelector nodeSelector; private final ConcurrentHashMap> rackToNode = new ConcurrentHashMap<>(); @@ -110,10 +109,33 @@ public DistributedSchedulingService(RMContext rmContext, rmContext.getYarnConfiguration().get( YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR, YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT)); + TopKNodeSelector topKSelector = new TopKNodeSelector(k, topKComputationInterval, comparator); + + QueueTruncationThresholdCalculator.Policy truncatePolicy = + QueueTruncationThresholdCalculator.Policy.valueOf( + rmContext.getYarnConfiguration().get( + YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_POLICY, + YarnConfiguration. + DIST_SCHEDULING_QUEUE_LIMIT_POLICY_DEFAULT)); + float sigma = rmContext.getYarnConfiguration() + .getFloat(YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_SIGMA, + YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_SIGMA_DEFAULT); + + int limitMin = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_MIN, + YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_MIN_DEFAULT); + int limitMax = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_MAX, + YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_MAX_DEFAULT); + + QueueTruncationThresholdCalculator thresholdCalculator = + new QueueTruncationThresholdCalculator(truncatePolicy, topKSelector, + sigma, limitMin, limitMax); + + topKSelector.setThresholdCalculator(thresholdCalculator); this.clusterMonitor = topKSelector; - this.nodeSelector = topKSelector; } @Override @@ -153,6 +175,10 @@ public AllocateResponse allocate(AllocateRequest request) throws return super.allocate(request); } + private NodeSelector getNodeSelector() { + return (NodeSelector)this.clusterMonitor; + } + @Override public DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling( @@ -202,8 +228,7 @@ public AllocateResponse allocate(AllocateRequest request) throws this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT); // Set nodes to be used for scheduling - dsResp.setNodesForScheduling( - new ArrayList<>(this.nodeSelector.selectNodes())); + dsResp.setNodesForScheduling(getNodeSelector().selectNodes()); return dsResp; } @@ -215,8 +240,7 @@ public AllocateResponse allocate(AllocateRequest request) throws (DistSchedAllocateResponse.class); dsResp.setAllocateResponse(response); dsResp.setNodesForScheduling( - new ArrayList<>( - this.nodeSelector.selectNodes(createSelectionHints(request)))); + getNodeSelector().selectNodes(createSelectionHints(request))); return dsResp; } @@ -272,6 +296,10 @@ private void removeFromMapping(ConcurrentHashMap> mapping, } } + public ClusterMonitor getClusterMonitor() { + return clusterMonitor; + } + @Override public void handle(SchedulerEvent event) { switch (event.getType()) { @@ -336,6 +364,7 @@ public void handle(SchedulerEvent event) { LOG.error("Unknown event arrived at DistributedSchedulingService: " + event.toString()); } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueTruncationThresholdCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueTruncationThresholdCalculator.java new file mode 100644 index 0000000..14456dd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueTruncationThresholdCalculator.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; + +import org.apache.hadoop.yarn.api.records.NodeId; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class QueueTruncationThresholdCalculator { + + enum Policy { + MEAN_SIGMA, + MEDIAN_SIGMA + } + + public class Stats { + private final AtomicInteger meanQuant = new AtomicInteger(0); + private final AtomicInteger medianQuant = new AtomicInteger(0); + private final AtomicInteger stdDevQuant = new AtomicInteger(0); + + /** + * Not thread safe.. Caller should synchronize on both topKNodes and + * clusterNodes + */ + public void update() { + List topKNodes = nodeSelector.getTopKNodes(); + if (topKNodes.size() > 0) { + // calculate mean + int sum = 0; + for (NodeId n : topKNodes) { + sum += getQuant(getNode(n)); + } + + // calculate std deviation + meanQuant.set(sum / topKNodes.size()); + int sqrSum = 0; + for (NodeId n : topKNodes) { + int val = getQuant(getNode(n)); + sqrSum += Math.pow(val - meanQuant.get(), 2); + } + stdDevQuant.set((int)Math.sqrt(sqrSum / topKNodes.size())); + + // calculate median + int mid = topKNodes.size() / 2; + int midVal = getQuant(getNode(topKNodes.get(mid))); + if (topKNodes.size() % 2 != 0) { + medianQuant.set(midVal); + } else { + int midVal2 = getQuant(getNode(topKNodes.get(mid - 1))); + medianQuant.set((midVal + midVal2) / 2); + } + } + } + + private TopKNodeSelector.ClusterNode getNode(NodeId nId) { + return nodeSelector.getClusterNodes().get(nId); + } + + private int getQuant(TopKNodeSelector.ClusterNode cn) { + return ((TopKNodeSelector.TopKComparator)nodeSelector.getComparator()) + .getQuant(cn); + } + + public int getMean() { + return meanQuant.get(); + } + + public int getMedian() { + return medianQuant.get(); + } + + public int getStdDev() { + return stdDevQuant.get(); + } + } + + private final Policy policy; + private final TopKNodeSelector nodeSelector; + private final float sigma; + private final int rangeMin; + private final int rangeMax; + private final Stats stats = new Stats(); + + + public QueueTruncationThresholdCalculator( + Policy policy, TopKNodeSelector selector, float sigma, + int rangeMin, int rangeMax) { + this.policy = policy; + this.nodeSelector = selector; + this.sigma = sigma; + this.rangeMax = rangeMax; + this.rangeMin = rangeMin; + } + + private int determineThreshold() { + if (policy == Policy.MEAN_SIGMA) { + return (int) (stats.getMean() + sigma * stats.getStdDev()); + } else { + return (int) (stats.getMedian() + sigma * stats.getStdDev()); + } + } + + void update() { + this.stats.update(); + } + + public int getThreshold() { + int thres = determineThreshold(); + return Math.min(rangeMax, Math.max(rangeMin, thres)); + } + + public Policy getPolicy() { + return policy; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java index 5aedbed..e835d95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor; import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector; @@ -33,13 +34,13 @@ import java.util.Arrays; import java.util.Collection; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.PriorityQueue; -import java.util.Random; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -48,7 +49,7 @@ final static Log LOG = LogFactory.getLog(TopKNodeSelector.class); - enum TopKComparator implements Comparator { + public enum TopKComparator implements Comparator { WAIT_TIME, QUEUE_LENGTH; @@ -60,9 +61,25 @@ public int compare(ClusterNode o1, ClusterNode o2) { return getQuant(o1) > getQuant(o2) ? +1 : -1; } - private int getQuant(ClusterNode c) { + public int getQuant(ClusterNode c) { return (this == WAIT_TIME) ? c.queueTime : c.waitQueueLength; } + + public ContainerQueuingLimit createContainerQueuingLimit( + QueueTruncationThresholdCalculator calculator) { + ContainerQueuingLimit containerQueuingLimit = null; + if (calculator != null) { + containerQueuingLimit = ContainerQueuingLimit.newInstance(); + if (this == WAIT_TIME) { + containerQueuingLimit.setMaxWaitTime(calculator.getThreshold()); + containerQueuingLimit.setMaxQueueLength(-1); + } else { + containerQueuingLimit.setMaxWaitTime(-1); + containerQueuingLimit.setMaxQueueLength(calculator.getThreshold()); + } + } + return containerQueuingLimit; + } } static class ClusterNode { @@ -93,10 +110,13 @@ public ClusterNode updateTimestamp() { } private final int k; - private final List topKNodes; private final ScheduledExecutorService scheduledExecutor; - private final HashMap clusterNodes = new HashMap<>(); - private final Comparator comparator; + + private final List topKNodes; + private final Map clusterNodes = + new ConcurrentHashMap<>(); + private final TopKComparator comparator; + private QueueTruncationThresholdCalculator thresholdCalculator; Runnable computeTask = new Runnable() { @Override @@ -104,6 +124,9 @@ public void run() { synchronized (topKNodes) { topKNodes.clear(); topKNodes.addAll(computeTopKNodes()); + if (thresholdCalculator != null) { + thresholdCalculator.update(); + } } } }; @@ -127,6 +150,22 @@ public TopKNodeSelector(int k, long nodeComputationInterval, TimeUnit.MILLISECONDS); } + List getTopKNodes() { + return topKNodes; + } + + Map getClusterNodes() { + return clusterNodes; + } + + Comparator getComparator() { + return comparator; + } + + public void setThresholdCalculator( + QueueTruncationThresholdCalculator thresholdCalculator) { + this.thresholdCalculator = thresholdCalculator; + } @Override public void addNode(List containerStatuses, RMNode @@ -270,4 +309,10 @@ public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) { return retList; } } + + @Override + public ContainerQueuingLimit getContainerQueuingLimit() { + return this.comparator.createContainerQueuingLimit( + this.thresholdCalculator); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java index a21ae19..c769e1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -184,6 +185,64 @@ public void testQueueLengthSort() { Assert.assertEquals("h4:4", nodeIds.get(3).toString()); } + @Test + public void testContainerQueuingLimit() { + TopKNodeSelector selector = new TopKNodeSelector(5, + TopKNodeSelector.TopKComparator.QUEUE_LENGTH); + selector.nodeUpdate(createRMNode("h1", 1, -1, 15)); + selector.nodeUpdate(createRMNode("h2", 2, -1, 5)); + selector.nodeUpdate(createRMNode("h3", 3, -1, 10)); + + // Test Mean Calculation + QueueTruncationThresholdCalculator calculator = + new QueueTruncationThresholdCalculator + (QueueTruncationThresholdCalculator + .Policy.MEAN_SIGMA, selector, 0, 0, 100); + selector.setThresholdCalculator(calculator); + ContainerQueuingLimit containerQueuingLimit = selector + .getContainerQueuingLimit(); + Assert.assertEquals(0, containerQueuingLimit.getMaxQueueLength()); + Assert.assertEquals(-1, containerQueuingLimit.getMaxWaitTime()); + selector.computeTask.run(); + containerQueuingLimit = selector + .getContainerQueuingLimit(); + Assert.assertEquals(10, containerQueuingLimit.getMaxQueueLength()); + Assert.assertEquals(-1, containerQueuingLimit.getMaxWaitTime()); + + // Test Median Calculation + selector.nodeUpdate(createRMNode("h4", 4, -1, 50)); + calculator = new QueueTruncationThresholdCalculator + (QueueTruncationThresholdCalculator.Policy.MEDIAN_SIGMA, + selector, 0, 6, 100); + selector.setThresholdCalculator(calculator); + selector.computeTask.run(); + containerQueuingLimit = selector.getContainerQueuingLimit(); + Assert.assertEquals(12, containerQueuingLimit.getMaxQueueLength()); + + // Test Nothing over Max + selector.nodeUpdate(createRMNode("h1", 1, -1, 110)); + selector.nodeUpdate(createRMNode("h2", 2, -1, 120)); + selector.nodeUpdate(createRMNode("h3", 3, -1, 130)); + selector.nodeUpdate(createRMNode("h4", 4, -1, 140)); + selector.nodeUpdate(createRMNode("h5", 5, -1, 150)); + selector.nodeUpdate(createRMNode("h6", 6, -1, 160)); + selector.computeTask.run(); + containerQueuingLimit = selector.getContainerQueuingLimit(); + Assert.assertEquals(100, containerQueuingLimit.getMaxQueueLength()); + + // Test Nothing below Min + selector.nodeUpdate(createRMNode("h1", 1, -1, 1)); + selector.nodeUpdate(createRMNode("h2", 2, -1, 2)); + selector.nodeUpdate(createRMNode("h3", 3, -1, 3)); + selector.nodeUpdate(createRMNode("h4", 4, -1, 4)); + selector.nodeUpdate(createRMNode("h5", 5, -1, 5)); + selector.nodeUpdate(createRMNode("h6", 6, -1, 6)); + selector.computeTask.run(); + containerQueuingLimit = selector.getContainerQueuingLimit(); + Assert.assertEquals(6, containerQueuingLimit.getMaxQueueLength()); + + } + private RMNode createRMNode(String host, int port, int waitTime, int queueLength) { RMNode node1 = Mockito.mock(RMNode.class);