diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index cdfaaf1..f657d8d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -355,6 +355,28 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT = "QUEUE_LENGTH"; + /** Policy used to determine Queuing limit for distributed Scheduling */ + public static final String DIST_SCHEDULING_QUEUE_LIMIT_POLICY = + YARN_PREFIX + "distributed-scheduling.queue-limit-policy"; + public static final String DIST_SCHEDULING_QUEUE_LIMIT_POLICY_DEFAULT = + "MEAN_SIGMA"; + + /** Value of Sigma used for calculation of queue limit */ + public static final String DIST_SCHEDULING_QUEUE_LIMIT_SIGMA = + YARN_PREFIX + "distributed-scheduling.queue-limit-sigma"; + public static final float DIST_SCHEDULING_QUEUE_LIMIT_SIGMA_DEFAULT = + 1.0f; + + /** Min Queue Limit */ + public static final String DIST_SCHEDULING_QUEUE_LIMIT_MIN = + YARN_PREFIX + "distributed-scheduling.queue-limit-min"; + public static final int DIST_SCHEDULING_QUEUE_LIMIT_MIN_DEFAULT = 1; + + /** Max Queue Limit */ + public static final String DIST_SCHEDULING_QUEUE_LIMIT_MAX = + YARN_PREFIX + "distributed-scheduling.queue-limit-max"; + public static final int DIST_SCHEDULING_QUEUE_LIMIT_MAX_DEFAULT = 10; + /** * Enable/disable intermediate-data encryption at YARN level. For now, this * only is used by the FileSystemRMStateStore to setup right file-system diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index c92a276..81f09aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -138,6 +138,12 @@ public void initializeMemberVariables() { .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS); configurationPrefixToSkipCompare .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR); + configurationPrefixToSkipCompare + .add(YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_MAX); + configurationPrefixToSkipCompare + .add(YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_MIN); + configurationPrefixToSkipCompare + .add(YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_POLICY); // Set by container-executor.cfg configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index f8a1320..bd04a0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -78,4 +79,7 @@ void setSystemCredentialsForApps( List getContainersToDecrease(); void addAllContainersToDecrease(Collection containersToDecrease); + + ContainerQueuingLimit getContainerQueuingLimit(); + void setContainerQueuingLimit(ContainerQueuingLimit containerQueuingLimit); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 224e50b..4188666 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; @@ -46,8 +47,11 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.impl.pb + .ContainerQueuingLimitPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -65,6 +69,7 @@ private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; + private ContainerQueuingLimit containerQueuingLimit = null; private List containersToDecrease = null; private List containersToSignal = null; @@ -102,6 +107,10 @@ private void mergeLocalToBuilder() { builder.setNmTokenMasterKey( convertToProtoFormat(this.nmTokenMasterKey)); } + if (this.containerQueuingLimit != null) { + builder.setContainerQueuingLimit( + convertToProtoFormat(this.containerQueuingLimit)); + } if (this.systemCredentials != null) { addSystemCredentialsToProto(); } @@ -197,6 +206,30 @@ public void setNMTokenMasterKey(MasterKey masterKey) { } @Override + public ContainerQueuingLimit getContainerQueuingLimit() { + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + if (this.containerQueuingLimit != null) { + return this.containerQueuingLimit; + } + if (!p.hasContainerQueuingLimit()) { + return null; + } + this.containerQueuingLimit = + convertFromProtoFormat(p.getContainerQueuingLimit()); + return this.containerQueuingLimit; + } + + @Override + public void setContainerQueuingLimit(ContainerQueuingLimit + containerQueuingLimit) { + maybeInitBuilder(); + if (containerQueuingLimit == null) { + builder.clearContainerQueuingLimit(); + } + this.containerQueuingLimit = containerQueuingLimit; + } + + @Override public NodeAction getNodeAction() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; if (!p.hasNodeAction()) { @@ -638,6 +671,15 @@ public void remove() { builder.addAllContainersToSignal(iterable); } + private ContainerQueuingLimit convertFromProtoFormat( + ContainerQueuingLimitProto p) { + return new ContainerQueuingLimitPBImpl(p); + } + + private ContainerQueuingLimitProto convertToProtoFormat(ContainerQueuingLimit c) { + return ((ContainerQueuingLimitPBImpl)c).getProto(); + } + private SignalContainerRequestPBImpl convertFromProtoFormat( SignalContainerRequestProto p) { return new SignalContainerRequestPBImpl(p); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java new file mode 100644 index 0000000..a32ddec --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.yarn.util.Records; + +public abstract class ContainerQueuingLimit { + + public static ContainerQueuingLimit newInstance() { + return Records.newRecord(ContainerQueuingLimit.class); + } + + public abstract int getMaxWaitTime(); + + public abstract void setMaxWaitTime(int waitTime); + + public abstract int getMaxQueueLength(); + + public abstract void setMaxQueueLength(int queueLength); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java new file mode 100644 index 0000000..45fa222 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; + +public class ContainerQueuingLimitPBImpl extends ContainerQueuingLimit { + + private ContainerQueuingLimitProto proto = + ContainerQueuingLimitProto.getDefaultInstance(); + private ContainerQueuingLimitProto.Builder builder = null; + private boolean viaProto = false; + + public ContainerQueuingLimitPBImpl() { + builder = ContainerQueuingLimitProto.newBuilder(); + } + + public ContainerQueuingLimitPBImpl(ContainerQueuingLimitProto proto) { + this.proto = proto; + this.viaProto = true; + } + + public ContainerQueuingLimitProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ContainerQueuingLimitProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int getMaxWaitTime() { + ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder; + return p.getMaxWaitTime(); + } + + @Override + public void setMaxWaitTime(int waitTime) { + maybeInitBuilder(); + builder.setMaxWaitTime(waitTime); + } + + @Override + public int getMaxQueueLength() { + ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder; + return p.getMaxQueueLength(); + } + + @Override + public void setMaxQueueLength(int queueLength) { + maybeInitBuilder(); + builder.setMaxQueueLength(queueLength); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 786d8ee..84dfb86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -100,6 +100,12 @@ message NodeHeartbeatResponseProto { optional bool areNodeLabelsAcceptedByRM = 11 [default = false]; repeated ContainerProto containers_to_decrease = 12; repeated SignalContainerRequestProto containers_to_signal = 13; + optional ContainerQueuingLimitProto container_queuing_limit = 14; +} + +message ContainerQueuingLimitProto { + optional int32 max_wait_time = 1; + optional int32 max_queue_length = 2; } message SystemCredentialsForAppsProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 626d0a1..d521ae9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager + .ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -82,7 +84,7 @@ NodeHealthStatus getNodeHealthStatus(); - ContainerManagementProtocol getContainerManager(); + ContainerManager getContainerManager(); NodeResourceMonitor getNodeResourceMonitor(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index b48706d..c7496da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -465,7 +466,7 @@ public void run() { private final NMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInNM nmTokenSecretManager; - private ContainerManagementProtocol containerManager; + private ContainerManager containerManager; private NodeResourceMonitor nodeResourceMonitor; private final LocalDirsHandlerService dirsHandler; private final ApplicationACLsManager aclsManager; @@ -555,11 +556,11 @@ public void setNodeResourceMonitor(NodeResourceMonitor nodeResourceMonitor) { } @Override - public ContainerManagementProtocol getContainerManager() { + public ContainerManager getContainerManager() { return this.containerManager; } - public void setContainerManager(ContainerManagementProtocol containerManager) { + public void setContainerManager(ContainerManager containerManager) { this.containerManager = containerManager; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index c0f02e9..50b511e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -1,20 +1,20 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.server.nodemanager; @@ -68,19 +68,28 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords + .RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords + .RegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords + .UnRegisterNodeManagerRequest; + +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager + .ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application + .ApplicationState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor + .ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; @@ -91,7 +100,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements NodeStatusUpdater { - public static final String YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS = + public static final String + YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS = YarnConfiguration.NM_PREFIX + "duration-to-track-stopped-containers"; private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class); @@ -112,7 +122,8 @@ private volatile boolean isStopped; private boolean tokenKeepAliveEnabled; private long tokenRemovalDelayMs; - /** Keeps track of when the next keep alive request should be sent for an app*/ + /** Keeps track of when the next keep alive request should be sent for an + * app*/ private Map appTokenKeepAliveMap = new HashMap(); private Random keepAliveDelayRandom = new Random(); @@ -135,7 +146,7 @@ private final NodeManagerMetrics metrics; private Runnable statusUpdaterRunnable; - private Thread statusUpdater; + private Thread statusUpdater; private boolean failedToConnect = false; private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER; private boolean registeredWithRM = false; @@ -170,10 +181,10 @@ protected void serviceInit(Configuration conf) throws Exception { int memoryMb = NodeManagerHardwareUtils.getContainerMemoryMB(conf); float vMemToPMem = conf.getFloat( - YarnConfiguration.NM_VMEM_PMEM_RATIO, - YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); - int virtualMemoryMb = (int)Math.ceil(memoryMb * vMemToPMem); - + YarnConfiguration.NM_VMEM_PMEM_RATIO, + YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); + int virtualMemoryMb = (int) Math.ceil(memoryMb * vMemToPMem); + int virtualCores = NodeManagerHardwareUtils.getVCores(conf); LOG.info("Nodemanager resources: memory set to " + memoryMb + "MB."); LOG.info("Nodemanager resources: vcores set to " + virtualCores + "."); @@ -195,17 +206,17 @@ protected void serviceInit(Configuration conf) throws Exception { // containers stopped during that time. durationToTrackStoppedContainers = conf.getLong(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, - 600000); + 600000); if (durationToTrackStoppedContainers < 0) { String message = "Invalid configuration for " - + YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " default " + + YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " default " + "value is 10Min(600000)."; LOG.error(message); throw new YarnException(message); } if (LOG.isDebugEnabled()) { LOG.debug(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " :" - + durationToTrackStoppedContainers); + + durationToTrackStoppedContainers); } super.serviceInit(conf); LOG.info("Initialized nodemanager with :" + @@ -214,7 +225,7 @@ protected void serviceInit(Configuration conf) throws Exception { this.logAggregationEnabled = conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); } @Override @@ -242,7 +253,7 @@ protected void serviceStart() throws Exception { @Override protected void serviceStop() throws Exception { // the isStopped check is for avoiding multiple unregistrations. - synchronized(shutdownMonitor) { + synchronized (shutdownMonitor) { if (this.registeredWithRM && !this.isStopped && !isNMUnderSupervisionWithRecoveryEnabled() && !context.getDecommissioned() && !failedToConnect) { @@ -260,7 +271,7 @@ private boolean isNMUnderSupervisionWithRecoveryEnabled() { return config.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED) && config.getBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, - YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED); + YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED); } private void unRegisterNM() { @@ -279,8 +290,8 @@ private void unRegisterNM() { protected void rebootNodeStatusUpdaterAndRegisterWithRM() { // Interrupt the updater. - synchronized(shutdownMonitor) { - if(this.isStopped) { + synchronized (shutdownMonitor) { + if (this.isStopped) { LOG.info("Currently being shutdown. Aborting reboot"); return; } @@ -289,7 +300,8 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() { try { statusUpdater.join(); registerWithRM(); - statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); + statusUpdater = new Thread(statusUpdaterRunnable, "Node Status " + + "Updater"); statusUpdater.start(); this.isStopped = false; LOG.info("NodeStatusUpdater thread is reRegistered and restarted"); @@ -303,7 +315,7 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() { @VisibleForTesting protected void stopRMProxy() { - if(this.resourceTracker != null) { + if (this.resourceTracker != null) { RPC.stopProxy(this.resourceTracker); } } @@ -325,8 +337,9 @@ protected ResourceTracker getRMClient() throws IOException { protected void registerWithRM() throws YarnException, IOException { RegisterNodeManagerResponse regNMResponse; - Set nodeLabels = nodeLabelsHandler.getNodeLabelsForRegistration(); - + Set nodeLabels = nodeLabelsHandler + .getNodeLabelsForRegistration(); + // Synchronize NM-RM registration with // ContainerManagerImpl#increaseContainersResource and // ContainerManagerImpl#startContainers to avoid race condition @@ -334,7 +347,8 @@ protected void registerWithRM() synchronized (this.context) { List containerReports = getNMContainerStatuses(); RegisterNodeManagerRequest request = - RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, + RegisterNodeManagerRequest.newInstance(nodeId, httpPort, + totalResource, nodeManagerVersionId, containerReports, getRunningApplications(), nodeLabels); if (containerReports != null) { @@ -352,13 +366,14 @@ protected void registerWithRM() "Message from ResourceManager: " + regNMResponse.getDiagnosticsMessage(); throw new YarnRuntimeException( - "Recieved SHUTDOWN signal from Resourcemanager, Registration of NodeManager failed, " - + message); + "Recieved SHUTDOWN signal from Resourcemanager, Registration of " + + "NodeManager failed, " + + message); } // if ResourceManager version is too old then shutdown - if (!minimumResourceManagerVersion.equals("NONE")){ - if (minimumResourceManagerVersion.equals("EqualToNM")){ + if (!minimumResourceManagerVersion.equals("NONE")) { + if (minimumResourceManagerVersion.equals("EqualToNM")) { minimumResourceManagerVersion = nodeManagerVersionId; } String rmVersion = regNMResponse.getRMVersion(); @@ -368,9 +383,10 @@ protected void registerWithRM() throw new YarnRuntimeException("Shutting down the Node Manager. " + message); } - if (VersionUtil.compareVersions(rmVersion,minimumResourceManagerVersion) < 0) { + if (VersionUtil.compareVersions(rmVersion, + minimumResourceManagerVersion) < 0) { String message = "The Resource Manager's version (" - + rmVersion +") is less than the minimum " + + rmVersion + ") is less than the minimum " + "allowed version " + minimumResourceManagerVersion; throw new YarnRuntimeException("Shutting down the Node Manager on RM " + "version error, " + message); @@ -385,7 +401,7 @@ protected void registerWithRM() if (masterKey != null) { this.context.getContainerTokenSecretManager().setMasterKey(masterKey); } - + masterKey = regNMResponse.getNMTokenMasterKey(); if (masterKey != null) { this.context.getNMTokenSecretManager().setMasterKey(masterKey); @@ -402,7 +418,7 @@ protected void registerWithRM() LOG.info(successfullRegistrationMsg); LOG.info("Notifying ContainerManager to unblock new container-requests"); ((ContainerManagerImpl) this.context.getContainerManager()) - .setBlockNewContainerRequests(false); + .setBlockNewContainerRequests(false); } private List createKeepAliveApplicationList() { @@ -412,7 +428,7 @@ protected void registerWithRM() List appList = new ArrayList(); for (Iterator> i = - this.appTokenKeepAliveMap.entrySet().iterator(); i.hasNext();) { + this.appTokenKeepAliveMap.entrySet().iterator(); i.hasNext(); ) { Entry e = i.next(); ApplicationId appId = e.getKey(); Long nextKeepAlive = e.getValue(); @@ -435,7 +451,7 @@ protected NodeStatus getNodeStatus(int responseId) throws IOException { nodeHealthStatus.setHealthReport(healthChecker.getHealthReport()); nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy()); nodeHealthStatus.setLastHealthReportTime(healthChecker - .getLastHealthReportTime()); + .getLastHealthReportTime()); if (LOG.isDebugEnabled()) { LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy() + ", " + nodeHealthStatus.getHealthReport()); @@ -447,8 +463,8 @@ protected NodeStatus getNodeStatus(int responseId) throws IOException { = getIncreasedContainers(); NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId, containersStatuses, - createKeepAliveApplicationList(), nodeHealthStatus, - containersUtilization, nodeUtilization, increasedContainers); + createKeepAliveApplicationList(), nodeHealthStatus, + containersUtilization, nodeUtilization, increasedContainers); nodeStatus.setQueuedContainersStatus(getQueuedContainerStatus()); return nodeStatus; @@ -460,6 +476,7 @@ private QueuedContainersStatus getQueuedContainerStatus() { this.context.getQueuingContext().getQueuedContainers().size()); return status; } + /** * Get the aggregated utilization of the containers in this node. * @return Resource utilization of all the containers. @@ -486,10 +503,10 @@ private ResourceUtilization getNodeUtilization() { * NM-RM heartbeat. */ private List - getIncreasedContainers() { + getIncreasedContainers() { List increasedContainers = new ArrayList<>( - this.context.getIncreasedContainers().values()); + this.context.getIncreasedContainers().values()); for (org.apache.hadoop.yarn.api.records.Container container : increasedContainers) { this.context.getIncreasedContainers().remove(container.getId()); @@ -537,7 +554,7 @@ private ResourceUtilization getNodeUtilization() { } return containerStatuses; } - + private List getRunningApplications() { List runningApplications = new ArrayList(); runningApplications.addAll(this.context.getApplications().keySet()); @@ -567,7 +584,7 @@ private ResourceUtilization getNodeUtilization() { } } LOG.info("Sending out " + containerStatuses.size() - + " NM container statuses: " + containerStatuses); + + " NM container statuses: " + containerStatuses); return containerStatuses; } @@ -613,7 +630,8 @@ public void removeOrTrackCompletedContainersFromContext( if (nmContainer == null) { iter.remove(); } else if (nmContainer.getContainerState().equals( - org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE)) { + org.apache.hadoop.yarn.server.nodemanager.containermanager + .container.ContainerState.DONE)) { context.getContainers().remove(containerId); removedContainers.add(containerId); iter.remove(); @@ -639,8 +657,8 @@ private void trackAppForKeepAlive(ApplicationId appId) { // Next keepAlive request for app between 0.7 & 0.9 of when the token will // likely expire. long nextTime = System.currentTimeMillis() - + (long) (0.7 * tokenRemovalDelayMs + (0.2 * tokenRemovalDelayMs - * keepAliveDelayRandom.nextInt(100))/100); + + (long) (0.7 * tokenRemovalDelayMs + (0.2 * tokenRemovalDelayMs + * keepAliveDelayRandom.nextInt(100)) / 100); appTokenKeepAliveMap.put(appId, nextTime); } @@ -693,7 +711,7 @@ public void removeVeryOldStoppedContainersFromCache() { } } } - + @Override public long getRMIdentifier() { return this.rmIdentifier; @@ -703,7 +721,8 @@ public long getRMIdentifier() { Map systemCredentials) throws IOException { Map map = new HashMap(); - for (Map.Entry entry : systemCredentials.entrySet()) { + for (Map.Entry entry : systemCredentials + .entrySet()) { Credentials credentials = new Credentials(); DataInputByteBuffer buf = new DataInputByteBuffer(); ByteBuffer buffer = entry.getValue(); @@ -748,7 +767,7 @@ public void run() { // pull log aggregation status for application running in this NM List logAggregationReports = getLogAggregationReportsForApps(context - .getLogAggregationStatusForApps()); + .getLogAggregationStatusForApps()); if (logAggregationReports != null && !logAggregationReports.isEmpty()) { request.setLogAggregationReportsForApps(logAggregationReports); @@ -761,7 +780,8 @@ public void run() { updateMasterKeys(response); if (response.getNodeAction() == NodeAction.SHUTDOWN) { - LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part of" + LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part" + + " of" + " heartbeat, hence shutting down."); LOG.warn("Message from ResourceManager: " + response.getDiagnosticsMessage()); @@ -792,7 +812,7 @@ public void run() { // when NM re-registers with RM. // Only remove the cleanedup containers that are acked removeOrTrackCompletedContainersFromContext(response - .getContainersToBeRemovedFromNM()); + .getContainersToBeRemovedFromNM()); logAggregationReportForAppsTempList.clear(); lastHeartbeatID = response.getResponseId(); @@ -801,7 +821,7 @@ public void run() { if (!containersToCleanup.isEmpty()) { dispatcher.getEventHandler().handle( new CMgrCompletedContainersEvent(containersToCleanup, - CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER)); + CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER)); } List appsToCleanup = response.getApplicationsToCleanup(); @@ -817,7 +837,8 @@ public void run() { response.getSystemCredentialsForApps(); if (systemCredentials != null && !systemCredentials.isEmpty()) { ((NMContext) context) - .setSystemCrendentialsForApps(parseCredentials(systemCredentials)); + .setSystemCrendentialsForApps(parseCredentials + (systemCredentials)); } List @@ -830,13 +851,21 @@ public void run() { // SignalContainer request originally comes from end users via // ClientRMProtocol's SignalContainer. Forward the request to - // ContainerManager which will dispatch the event to ContainerLauncher. + // ContainerManager which will dispatch the event to + // ContainerLauncher. List containersToSignal = response .getContainersToSignalList(); if (containersToSignal.size() != 0) { dispatcher.getEventHandler().handle( new CMgrSignalContainersEvent(containersToSignal)); } + + // Update QueuingLimits if ContainerManager supports queueing + ContainerQueuingLimit queuingLimit = + response.getContainerQueuingLimit(); + if (queuingLimit != null) { + context.getContainerManager().updateQueuingLimits(queuingLimit); + } } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( @@ -853,7 +882,7 @@ public void run() { synchronized (heartbeatMonitor) { nextHeartBeatInterval = nextHeartBeatInterval <= 0 ? YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS : - nextHeartBeatInterval; + nextHeartBeatInterval; try { heartbeatMonitor.wait(nextHeartBeatInterval); } catch (InterruptedException e) { @@ -869,9 +898,10 @@ private void updateMasterKeys(NodeHeartbeatResponse response) { MasterKey updatedMasterKey = response.getContainerTokenMasterKey(); if (updatedMasterKey != null) { // Will be non-null only on roll-over on RM side - context.getContainerTokenSecretManager().setMasterKey(updatedMasterKey); + context.getContainerTokenSecretManager().setMasterKey + (updatedMasterKey); } - + updatedMasterKey = response.getNMTokenMasterKey(); if (updatedMasterKey != null) { context.getNMTokenSecretManager().setMasterKey(updatedMasterKey); @@ -1005,7 +1035,7 @@ public String verifyRMRegistrationResponseForNodeLabels( String errorMsgFromRM = regNMResponse.getDiagnosticsMessage(); LOG.error( "NodeLabels sent from NM while registration were rejected by RM. " - + ((errorMsgFromRM == null) + + ((errorMsgFromRM == null) ? "Seems like RM is configured with Centralized Labels." : "And with message " + regNMResponse.getDiagnosticsMessage())); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java new file mode 100644 index 0000000..696b700 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager; + +import org.apache.hadoop.service.ServiceStateChangeListener; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; +import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent; + +public interface ContainerManager extends ServiceStateChangeListener, + ContainerManagementProtocol, + EventHandler { + + void updateQueuingLimits(ContainerQueuingLimit queuingLimit); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 29ab7f9..f4f4670 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -95,6 +95,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ContainerType; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent; @@ -150,8 +151,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; public class ContainerManagerImpl extends CompositeService implements - ServiceStateChangeListener, ContainerManagementProtocol, - EventHandler { + ContainerManager { /** * Extra duration to wait for applications to be killed on shutdown. @@ -1428,4 +1428,9 @@ protected void setAMRMProxyService(AMRMProxyService amrmProxyService) { protected boolean isServiceStopped() { return serviceStopped; } + + @Override + public void updateQueuingLimits(ContainerQueuingLimit queuingLimit) { + LOG.trace("Implementation does not support queuing of Containers !!"); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java index ef4e571..f43dc15 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -80,6 +81,7 @@ private Queue queuedOpportunisticContainers; private Set opportunisticContainersToKill; + private final ContainerQueuingLimit queuingLimit; public QueuingContainerManagerImpl(Context context, ContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, @@ -92,6 +94,9 @@ public QueuingContainerManagerImpl(Context context, ContainerExecutor exec, this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>(); this.opportunisticContainersToKill = Collections.synchronizedSet( new HashSet()); + this.queuingLimit = ContainerQueuingLimit.newInstance(); + this.queuingLimit.setMaxQueueLength(-1); + this.queuingLimit.setMaxWaitTime(-1); } @Override @@ -487,6 +492,44 @@ public void handle(ApplicationEvent event) { } } + @Override + public void updateQueuingLimits(ContainerQueuingLimit queuingLimit) { + int maxQueueLength = queuingLimit.getMaxQueueLength(); + synchronized (this.queuingLimit) { + if (maxQueueLength > -1 + && maxQueueLength < this.queuingLimit.getMaxQueueLength()) { + this.queuingLimit.setMaxQueueLength(queuingLimit.getMaxQueueLength()); + } + } + pruneOpportunisticContainerQueue(); + } + + private void pruneOpportunisticContainerQueue() { + int counter = this.queuingLimit.getMaxQueueLength(); + Iterator iterator = queuedOpportunisticContainers + .iterator(); + while (iterator.hasNext()) { + AllocatedContainerInfo cInfo = iterator.next(); + if (counter <= 0) { + iterator.remove(); + ContainerTokenIdentifier containerTokenIdentifier = this.context + .getQueuingContext().getQueuedContainers().remove( + cInfo.getContainerTokenIdentifier().getContainerID()); + // The Container might have already started while we were + // iterating.. + if (containerTokenIdentifier != null) { + this.context.getQueuingContext().getKilledQueuedContainers() + .putIfAbsent(cInfo.getContainerTokenIdentifier(), + "Container De-queued to meet global queuing limits. " + + "Max Queue length[" + + this.queuingLimit.getMaxQueueLength() + "]"); + } + } + counter--; + } + } + + static class AllocatedContainerInfo { private final ContainerTokenIdentifier containerTokenIdentifier; private final NMTokenIdentifier nmTokenIdentifier; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index ce405f8..7b3eb90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -64,6 +64,8 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.containermanager + .ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -642,7 +644,7 @@ public NodeHealthStatus getNodeHealthStatus() { } @Override - public ContainerManagementProtocol getContainerManager() { + public ContainerManager getContainerManager() { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java index 4fd62d0..fb2bfa3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java @@ -20,6 +20,7 @@ import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import java.util.List; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java index 170d91a..65a25a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java @@ -47,6 +47,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed + .QueueLimitCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed .TopKNodeSelector; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; @@ -77,6 +79,7 @@ LogFactory.getLog(DistributedSchedulingService.class); private final TopKNodeSelector clusterMonitor; + private final QueueLimitCalculator queueLimitCalculator; private final ConcurrentHashMap> rackToNode = new ConcurrentHashMap<>(); @@ -97,8 +100,32 @@ public DistributedSchedulingService(RMContext rmContext, rmContext.getYarnConfiguration().get( YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR, YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT)); + TopKNodeSelector topKSelector = new TopKNodeSelector(k, topKComputationInterval, comparator); + + QueueLimitCalculator.Policy limitingPolicy = + QueueLimitCalculator.Policy.valueOf( + rmContext.getYarnConfiguration().get( + YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_POLICY, + YarnConfiguration. + DIST_SCHEDULING_QUEUE_LIMIT_POLICY_DEFAULT)); + float sigma = rmContext.getYarnConfiguration() + .getFloat(YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_SIGMA, + YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_SIGMA_DEFAULT); + + int limitMin = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_MIN, + YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_MIN_DEFAULT); + int limitMax = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_MAX, + YarnConfiguration.DIST_SCHEDULING_QUEUE_LIMIT_MAX_DEFAULT); + + this.queueLimitCalculator = + new QueueLimitCalculator(limitingPolicy, topKSelector, + sigma, limitMin, limitMax); + + topKSelector.setThresholdCalculator(queueLimitCalculator); this.clusterMonitor = topKSelector; } @@ -188,8 +215,7 @@ public AllocateResponse allocate(AllocateRequest request) throws this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT); // Set nodes to be used for scheduling - dsResp.setNodesForScheduling( - new ArrayList<>(this.clusterMonitor.selectNodes())); + dsResp.setNodesForScheduling(this.clusterMonitor.selectNodes()); return dsResp; } @@ -200,8 +226,7 @@ public AllocateResponse allocate(AllocateRequest request) throws DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance (DistSchedAllocateResponse.class); dsResp.setAllocateResponse(response); - dsResp.setNodesForScheduling( - new ArrayList<>(this.clusterMonitor.selectNodes())); + dsResp.setNodesForScheduling(this.clusterMonitor.selectNodes()); return dsResp; } @@ -226,6 +251,10 @@ private void removeFromMapping(ConcurrentHashMap> mapping, } } + public QueueLimitCalculator getClusterQueueLimitCalculator() { + return queueLimitCalculator; + } + @Override public void handle(SchedulerEvent event) { switch (event.getType()) { @@ -290,6 +319,7 @@ public void handle(SchedulerEvent event) { LOG.error("Unknown event arrived at DistributedSchedulingService: " + event.toString()); } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index f50da3b..d68e15e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -39,6 +39,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed + .QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -139,4 +142,6 @@ void setRMDelegatedNodeLabelsUpdater( void setLeaderElectorService(LeaderElectorService elector); LeaderElectorService getLeaderElectorService(); + + QueueLimitCalculator getContainerQueueLimitCalculator(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ec2aeb7..cfe7f4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -43,6 +44,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed + .QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -74,6 +78,8 @@ private SystemMetricsPublisher systemMetricsPublisher; private LeaderElectorService elector; + private QueueLimitCalculator queueLimitCalculator; + /** * Default constructor. To be used in conjunction with setter methods for * individual fields. @@ -472,4 +478,14 @@ public PlacementManager getQueuePlacementManager() { public void setQueuePlacementManager(PlacementManager placementMgr) { this.activeServiceContext.setQueuePlacementManager(placementMgr); } + + @Override + public QueueLimitCalculator getContainerQueueLimitCalculator() { + return this.queueLimitCalculator; + } + + public void setContainerQueueLimitCalculator(QueueLimitCalculator + queueLimitCalculator) { + this.queueLimitCalculator = queueLimitCalculator; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 6c80a58..9f9ce52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -1154,6 +1154,8 @@ protected ApplicationMasterService createApplicationMasterService() { addService(distSchedulerEventDispatcher); rmDispatcher.register(SchedulerEventType.class, distSchedulerEventDispatcher); + this.rmContext.setContainerQueueLimitCalculator( + distributedSchedulingService.getClusterQueueLimitCalculator()); return distributedSchedulingService; } return new ApplicationMasterService(this.rmContext, scheduler); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index b0bc565..07d1157 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -536,6 +536,15 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } } + // 6. Send Container Queuing Limits back to the Node. This will be used by + // the node to truncate (or increase) the number of Containers queued + // for execution. This is used only if Distributed Scheduling is enabled + // on the node + if (this.rmContext.getContainerQueueLimitCalculator() != null) { + nodeHeartBeatResponse.setContainerQueuingLimit( + this.rmContext.getContainerQueueLimitCalculator() + .createContainerQueuingLimit()); + } return nodeHeartBeatResponse; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java new file mode 100644 index 0000000..0d14cc9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed + .TopKNodeSelector.ClusterNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed + .TopKNodeSelector.TopKComparator; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This class interacts with the TopKNodeSelector to keep track of the mean, + * median and corresponding stand deviations of the configured metric (Wait time + * or Queue length) used to characterize the load on the specific node. + * The TopKNodeSelector triggers an update (by calling the 'update()' method) + * every time it performs an re-ordering of all nodes. + */ +public class QueueLimitCalculator { + + public enum Policy { + MEAN_SIGMA, + MEDIAN_SIGMA + } + + public class Stats { + private final AtomicInteger mean = new AtomicInteger(0); + private final AtomicInteger median = new AtomicInteger(0); + private final AtomicInteger stdDevMean = new AtomicInteger(0); + private final AtomicInteger stdDevMedian = new AtomicInteger(0); + + /** + * Not thread safe.. Caller should synchronize on topKNodes + */ + public void update() { + List topKNodes = nodeSelector.getTopKNodes(); + if (topKNodes.size() > 0) { + // calculate mean + int sum = 0; + for (NodeId n : topKNodes) { + sum += getMetric(getNode(n)); + } + mean.set(sum / topKNodes.size()); + + // calculate median + int mid = topKNodes.size() / 2; + int midVal = getMetric(getNode(topKNodes.get(mid))); + if (topKNodes.size() % 2 != 0) { + median.set(midVal); + } else { + int midVal2 = getMetric(getNode(topKNodes.get(mid - 1))); + median.set((midVal + midVal2) / 2); + } + + // calculate std deviation + int sqrSumMean = 0; + int sqrSumMedian = 0; + for (NodeId n : topKNodes) { + int val = getMetric(getNode(n)); + sqrSumMean += Math.pow(val - mean.get(), 2); + sqrSumMedian += Math.pow(val - median.get(), 2); + } + stdDevMean.set((int)Math.sqrt(sqrSumMean / topKNodes.size())); + stdDevMean.set((int)Math.sqrt(sqrSumMedian / topKNodes.size())); + } + } + + private ClusterNode getNode(NodeId nId) { + return nodeSelector.getClusterNodes().get(nId); + } + + private int getMetric(ClusterNode cn) { + return (cn != null) ? ((TopKComparator)nodeSelector.getComparator()) + .getMetric(cn) : 0; + } + + public int getMean() { + return mean.get(); + } + + public int getMedian() { + return median.get(); + } + + public int getStdDevMean() { + return stdDevMean.get(); + } + + public int getStdDevMedian() { + return stdDevMedian.get(); + } + } + + private final Policy policy; + private final TopKNodeSelector nodeSelector; + private final float sigma; + private final int rangeMin; + private final int rangeMax; + private final Stats stats = new Stats(); + + + public QueueLimitCalculator( + Policy policy, TopKNodeSelector selector, float sigma, + int rangeMin, int rangeMax) { + this.policy = policy; + this.nodeSelector = selector; + this.sigma = sigma; + this.rangeMax = rangeMax; + this.rangeMin = rangeMin; + } + + private int determineThreshold() { + if (policy == Policy.MEAN_SIGMA) { + return (int) (stats.getMean() + sigma * stats.getStdDevMean()); + } else { + return (int) (stats.getMedian() + sigma * stats.getStdDevMedian()); + } + } + + void update() { + this.stats.update(); + } + + public int getThreshold() { + int thres = determineThreshold(); + return Math.min(rangeMax, Math.max(rangeMin, thres)); + } + + public Policy getPolicy() { + return policy; + } + + public ContainerQueuingLimit createContainerQueuingLimit() { + ContainerQueuingLimit containerQueuingLimit = + ContainerQueuingLimit.newInstance(); + if (nodeSelector.getComparator() == TopKComparator.WAIT_TIME) { + containerQueuingLimit.setMaxWaitTime(getThreshold()); + containerQueuingLimit.setMaxQueueLength(-1); + } else { + containerQueuingLimit.setMaxWaitTime(-1); + containerQueuingLimit.setMaxQueueLength(getThreshold()); + } + return containerQueuingLimit; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java index 7e24687..b6f24fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java @@ -31,8 +31,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; -import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -47,13 +48,13 @@ @Override public int compare(ClusterNode o1, ClusterNode o2) { - if (getQuant(o1) == getQuant(o2)) { + if (getMetric(o1) == getMetric(o2)) { return o1.timestamp < o2.timestamp ? +1 : -1; } - return getQuant(o1) > getQuant(o2) ? +1 : -1; + return getMetric(o1) > getMetric(o2) ? +1 : -1; } - private int getQuant(ClusterNode c) { + public int getMetric(ClusterNode c) { return (this == WAIT_TIME) ? c.queueTime : c.waitQueueLength; } } @@ -86,10 +87,13 @@ public ClusterNode updateTimestamp() { } private final int k; - private final List topKNodes; private final ScheduledExecutorService scheduledExecutor; - private final HashMap clusterNodes = new HashMap<>(); - private final Comparator comparator; + + private final List topKNodes; + private final Map clusterNodes = + new ConcurrentHashMap<>(); + private final TopKComparator comparator; + private QueueLimitCalculator thresholdCalculator; Runnable computeTask = new Runnable() { @Override @@ -97,6 +101,9 @@ public void run() { synchronized (topKNodes) { topKNodes.clear(); topKNodes.addAll(computeTopKNodes()); + if (thresholdCalculator != null) { + thresholdCalculator.update(); + } } } }; @@ -120,6 +127,22 @@ public TopKNodeSelector(int k, long nodeComputationInterval, TimeUnit.MILLISECONDS); } + List getTopKNodes() { + return topKNodes; + } + + Map getClusterNodes() { + return clusterNodes; + } + + Comparator getComparator() { + return comparator; + } + + public void setThresholdCalculator( + QueueLimitCalculator thresholdCalculator) { + this.thresholdCalculator = thresholdCalculator; + } @Override public void addNode(List containerStatuses, RMNode @@ -220,4 +243,5 @@ public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) { return retList; } } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java index aec4e86..30ff958 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.junit.Assert; @@ -130,6 +131,63 @@ public void testQueueLengthSort() { Assert.assertEquals("h4:4", nodeIds.get(3).toString()); } + @Test + public void testContainerQueuingLimit() { + TopKNodeSelector selector = new TopKNodeSelector(5, + TopKNodeSelector.TopKComparator.QUEUE_LENGTH); + selector.nodeUpdate(createRMNode("h1", 1, -1, 15)); + selector.nodeUpdate(createRMNode("h2", 2, -1, 5)); + selector.nodeUpdate(createRMNode("h3", 3, -1, 10)); + + // Test Mean Calculation + QueueLimitCalculator calculator = + new QueueLimitCalculator + (QueueLimitCalculator + .Policy.MEAN_SIGMA, selector, 0, 0, 100); + selector.setThresholdCalculator(calculator); + ContainerQueuingLimit containerQueuingLimit = calculator + .createContainerQueuingLimit(); + Assert.assertEquals(0, containerQueuingLimit.getMaxQueueLength()); + Assert.assertEquals(-1, containerQueuingLimit.getMaxWaitTime()); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(10, containerQueuingLimit.getMaxQueueLength()); + Assert.assertEquals(-1, containerQueuingLimit.getMaxWaitTime()); + + // Test Median Calculation + selector.nodeUpdate(createRMNode("h4", 4, -1, 50)); + calculator = new QueueLimitCalculator + (QueueLimitCalculator.Policy.MEDIAN_SIGMA, + selector, 0, 6, 100); + selector.setThresholdCalculator(calculator); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(12, containerQueuingLimit.getMaxQueueLength()); + + // Test Limits do not exceed specified max + selector.nodeUpdate(createRMNode("h1", 1, -1, 110)); + selector.nodeUpdate(createRMNode("h2", 2, -1, 120)); + selector.nodeUpdate(createRMNode("h3", 3, -1, 130)); + selector.nodeUpdate(createRMNode("h4", 4, -1, 140)); + selector.nodeUpdate(createRMNode("h5", 5, -1, 150)); + selector.nodeUpdate(createRMNode("h6", 6, -1, 160)); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(100, containerQueuingLimit.getMaxQueueLength()); + + // Test Limits do not go below specified min + selector.nodeUpdate(createRMNode("h1", 1, -1, 1)); + selector.nodeUpdate(createRMNode("h2", 2, -1, 2)); + selector.nodeUpdate(createRMNode("h3", 3, -1, 3)); + selector.nodeUpdate(createRMNode("h4", 4, -1, 4)); + selector.nodeUpdate(createRMNode("h5", 5, -1, 5)); + selector.nodeUpdate(createRMNode("h6", 6, -1, 6)); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(6, containerQueuingLimit.getMaxQueueLength()); + + } + private RMNode createRMNode(String host, int port, int waitTime, int queueLength) { RMNode node1 = Mockito.mock(RMNode.class);