diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 965b6c5..cbfabfd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -344,17 +344,43 @@ public static boolean isAclEnabled(Configuration conf) { YARN_PREFIX + "distributed-scheduling.top-k"; public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10; - /** Frequency for computing Top K Best Nodes */ - public static final String DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS = - YARN_PREFIX + "distributed-scheduling.top-k-compute-interval-ms"; - public static final long DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT = 1000; + /** Frequency for computing Least Loaded Nodes */ + public static final String NM_CONTAINER_QUEUING_NODE_SORT_COMPUTE_INT_MS = + YARN_PREFIX + "nm-container-queuing.node-sort-compute-interval-ms"; + public static final long NM_CONTAINER_QUEUING_NODE_SORT_COMPUTE_INT_DEFAULT = + 1000; /** Comparator for determining Node Load for Distributed Scheduling */ - public static final String DIST_SCHEDULING_TOP_K_COMPARATOR = - YARN_PREFIX + "distributed-scheduling.top-k-comparator"; - public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT = + public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR = + YARN_PREFIX + "nm-container-queuing.load-comparator"; + public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT = "QUEUE_LENGTH"; + /** Value of Sigma used for calculation of queue limit thresholds */ + public static final String NM_CONTAINER_QUEUING_LIMIT_SIGMA = + YARN_PREFIX + "nm-container-queuing.queue-limit-sigma"; + public static final float NM_CONTAINER_QUEUING_LIMIT_SIGMA_DEFAULT = + 
1.0f; + + /** Min Queue Length */ + public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH = + YARN_PREFIX + "nm-container-queuing.min-queue-length"; + public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT = 1; + + /** Max Queue Length */ + public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH = + YARN_PREFIX + "nm-container-queuing.max-queue-length"; + public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT = 10; + + /** Min Queue Wait Time (in milliseconds) */ + public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS = + YARN_PREFIX + "nm-container-queuing.min-queue-wait-time-ms"; + public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_DEFAULT = 1; + + /** Max Queue Wait Time (in milliseconds) */ + public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS = + YARN_PREFIX + "nm-container-queuing.max-queue-wait-time-ms"; + public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_DEFAULT = 10; /** * Enable/disable intermediate-data encryption at YARN level. 
For now, this * only is used by the FileSystemRMStateStore to setup right file-system diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index c92a276..79ec041 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -135,9 +135,19 @@ public void initializeMemberVariables() { configurationPrefixToSkipCompare .add(YarnConfiguration.DIST_SCHEDULING_TOP_K); configurationPrefixToSkipCompare - .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS); + .add(YarnConfiguration.NM_CONTAINER_QUEUING_NODE_SORT_COMPUTE_INT_MS); configurationPrefixToSkipCompare - .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR); + .add(YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_SIGMA); // Set by container-executor.cfg configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index f8a1320..bd04a0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -78,4 +79,7 @@ void setSystemCredentialsForApps( List getContainersToDecrease(); void addAllContainersToDecrease(Collection containersToDecrease); + + ContainerQueuingLimit getContainerQueuingLimit(); + void setContainerQueuingLimit(ContainerQueuingLimit containerQueuingLimit); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 224e50b..2e05167 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ 
-39,6 +39,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; @@ -46,8 +47,10 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.impl.pb.ContainerQueuingLimitPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -65,6 +68,7 @@ private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; + private ContainerQueuingLimit containerQueuingLimit = null; private List containersToDecrease = null; private List containersToSignal = null; @@ -102,6 +106,10 @@ private void mergeLocalToBuilder() { builder.setNmTokenMasterKey( convertToProtoFormat(this.nmTokenMasterKey)); } + if (this.containerQueuingLimit != null) { + builder.setContainerQueuingLimit( + convertToProtoFormat(this.containerQueuingLimit)); + } if (this.systemCredentials != null) { addSystemCredentialsToProto(); } @@ -197,6 +205,30 @@ public void setNMTokenMasterKey(MasterKey masterKey) { } @Override + public ContainerQueuingLimit getContainerQueuingLimit() { + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? 
proto : builder; + if (this.containerQueuingLimit != null) { + return this.containerQueuingLimit; + } + if (!p.hasContainerQueuingLimit()) { + return null; + } + this.containerQueuingLimit = + convertFromProtoFormat(p.getContainerQueuingLimit()); + return this.containerQueuingLimit; + } + + @Override + public void setContainerQueuingLimit(ContainerQueuingLimit + containerQueuingLimit) { + maybeInitBuilder(); + if (containerQueuingLimit == null) { + builder.clearContainerQueuingLimit(); + } + this.containerQueuingLimit = containerQueuingLimit; + } + + @Override public NodeAction getNodeAction() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; if (!p.hasNodeAction()) { @@ -638,6 +670,15 @@ public void remove() { builder.addAllContainersToSignal(iterable); } + private ContainerQueuingLimit convertFromProtoFormat( + ContainerQueuingLimitProto p) { + return new ContainerQueuingLimitPBImpl(p); + } + + private ContainerQueuingLimitProto convertToProtoFormat(ContainerQueuingLimit c) { + return ((ContainerQueuingLimitPBImpl)c).getProto(); + } + private SignalContainerRequestPBImpl convertFromProtoFormat( SignalContainerRequestProto p) { return new SignalContainerRequestPBImpl(p); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java new file mode 100644 index 0000000..06a0a57 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.yarn.util.Records; + +public abstract class ContainerQueuingLimit { + + public static ContainerQueuingLimit newInstance() { + ContainerQueuingLimit containerQueuingLimit = Records.newRecord + (ContainerQueuingLimit.class); + containerQueuingLimit.setMaxQueueLength(-1); + containerQueuingLimit.setMaxWaitTimeInMs(-1); + return containerQueuingLimit; + } + + public abstract int getMaxWaitTimeInMs(); + + public abstract void setMaxWaitTimeInMs(int waitTime); + + public abstract int getMaxQueueLength(); + + public abstract void setMaxQueueLength(int queueLength); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java new file mode 100644 index 0000000..27d924c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java @@ -0,0 +1,77 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; + +public class ContainerQueuingLimitPBImpl extends ContainerQueuingLimit { + + private ContainerQueuingLimitProto proto = + ContainerQueuingLimitProto.getDefaultInstance(); + private ContainerQueuingLimitProto.Builder builder = null; + private boolean viaProto = false; + + public ContainerQueuingLimitPBImpl() { + builder = ContainerQueuingLimitProto.newBuilder(); + } + + public ContainerQueuingLimitPBImpl(ContainerQueuingLimitProto proto) { + this.proto = proto; + this.viaProto = true; + } + + public ContainerQueuingLimitProto getProto() { + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ContainerQueuingLimitProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int getMaxWaitTimeInMs() { + ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder; + return p.getMaxWaitTimeInMs(); + } + + @Override + public void setMaxWaitTimeInMs(int waitTime) { + maybeInitBuilder(); + builder.setMaxWaitTimeInMs(waitTime); + } + + @Override + public int getMaxQueueLength() { + ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder; + return p.getMaxQueueLength(); + } + + @Override + public void setMaxQueueLength(int queueLength) { + maybeInitBuilder(); + builder.setMaxQueueLength(queueLength); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 786d8ee..46089a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -100,6 +100,12 @@ message NodeHeartbeatResponseProto { optional bool areNodeLabelsAcceptedByRM = 11 [default = false]; repeated ContainerProto containers_to_decrease = 12; repeated SignalContainerRequestProto containers_to_signal = 13; + optional ContainerQueuingLimitProto container_queuing_limit = 14; +} + +message ContainerQueuingLimitProto { + optional int32 max_wait_time_in_ms = 1; + optional int32 max_queue_length = 2; } message SystemCredentialsForAppsProto { diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 626d0a1..cfcf1bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -23,13 +23,13 @@ import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -82,7 +82,7 @@ NodeHealthStatus getNodeHealthStatus(); - ContainerManagementProtocol getContainerManager(); + ContainerManager getContainerManager(); NodeResourceMonitor getNodeResourceMonitor(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index b48706d..c7496da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -465,7 +466,7 @@ public void run() { private final NMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInNM nmTokenSecretManager; - private ContainerManagementProtocol containerManager; + private ContainerManager containerManager; private NodeResourceMonitor nodeResourceMonitor; private final LocalDirsHandlerService dirsHandler; private final ApplicationACLsManager aclsManager; @@ -555,11 +556,11 @@ public void setNodeResourceMonitor(NodeResourceMonitor nodeResourceMonitor) { } @Override - public ContainerManagementProtocol getContainerManager() { + public ContainerManager getContainerManager() { return this.containerManager; } - public void setContainerManager(ContainerManagementProtocol containerManager) { + public void setContainerManager(ContainerManager containerManager) { this.containerManager = containerManager; } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index c0f02e9..dc864aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -71,13 +71,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; + +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; @@ -401,8 +402,7 @@ protected void registerWithRM() LOG.info(successfullRegistrationMsg); LOG.info("Notifying ContainerManager to unblock new container-requests"); - 
((ContainerManagerImpl) this.context.getContainerManager()) - .setBlockNewContainerRequests(false); + this.context.getContainerManager().setBlockNewContainerRequests(false); } private List createKeepAliveApplicationList() { @@ -465,10 +465,8 @@ private QueuedContainersStatus getQueuedContainerStatus() { * @return Resource utilization of all the containers. */ private ResourceUtilization getContainersUtilization() { - ContainerManagerImpl containerManager = - (ContainerManagerImpl) this.context.getContainerManager(); ContainersMonitor containersMonitor = - containerManager.getContainersMonitor(); + this.context.getContainerManager().getContainersMonitor(); return containersMonitor.getContainersUtilization(); } @@ -837,6 +835,13 @@ public void run() { dispatcher.getEventHandler().handle( new CMgrSignalContainersEvent(containersToSignal)); } + + // Update QueuingLimits if ContainerManager supports queuing + ContainerQueuingLimit queuingLimit = + response.getContainerQueuingLimit(); + if (queuingLimit != null) { + context.getContainerManager().updateQueuingLimit(queuingLimit); + } } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java new file mode 100644 index 0000000..92157b4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager; + +import org.apache.hadoop.service.ServiceStateChangeListener; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; +import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor + .ContainersMonitor; + +public interface ContainerManager extends ServiceStateChangeListener, + ContainerManagementProtocol, + EventHandler { + + void updateQueuingLimit(ContainerQueuingLimit queuingLimit); + + ContainersMonitor getContainersMonitor(); + + void setBlockNewContainerRequests(boolean blockNewContainerRequests); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 162823c..2e61bb2 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -95,6 +95,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ContainerType; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent; @@ -150,8 +151,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; public class ContainerManagerImpl extends CompositeService implements - ServiceStateChangeListener, ContainerManagementProtocol, - EventHandler { + ContainerManager { /** * Extra duration to wait for applications to be killed on shutdown. 
@@ -402,6 +402,7 @@ protected LogHandler createLogHandler(Configuration conf, Context context, } } + @Override public ContainersMonitor getContainersMonitor() { return this.containersMonitor; } @@ -1391,6 +1392,7 @@ public void handle(ContainerManagerEvent event) { } } + @Override public void setBlockNewContainerRequests(boolean blockNewContainerRequests) { this.blockNewContainerRequests.set(blockNewContainerRequests); } @@ -1427,4 +1429,9 @@ protected void setAMRMProxyService(AMRMProxyService amrmProxyService) { protected boolean isServiceStopped() { return serviceStopped; } + + @Override + public void updateQueuingLimit(ContainerQueuingLimit queuingLimit) { + LOG.trace("Implementation does not support queuing of Containers !!"); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java index ef4e571..72e493c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import 
org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -80,6 +81,7 @@ private Queue queuedOpportunisticContainers; private Set opportunisticContainersToKill; + private final ContainerQueuingLimit queuingLimit; public QueuingContainerManagerImpl(Context context, ContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, @@ -92,6 +94,7 @@ public QueuingContainerManagerImpl(Context context, ContainerExecutor exec, this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>(); this.opportunisticContainersToKill = Collections.synchronizedSet( new HashSet()); + this.queuingLimit = ContainerQueuingLimit.newInstance(); } @Override @@ -487,6 +490,41 @@ public void handle(ApplicationEvent event) { } } + @Override + public void updateQueuingLimit(ContainerQueuingLimit queuingLimit) { + this.queuingLimit.setMaxQueueLength(queuingLimit.getMaxQueueLength()); + // TODO: Include wait time as well once its implemented + if (this.queuingLimit.getMaxQueueLength() > -1) { + shedQueuedOpportunisticContainers(); + } + } + + private void shedQueuedOpportunisticContainers() { + int numAllowed = this.queuingLimit.getMaxQueueLength(); + Iterator containerIter = queuedOpportunisticContainers + .iterator(); + while (containerIter.hasNext()) { + AllocatedContainerInfo cInfo = containerIter.next(); + if (numAllowed <= 0) { + containerIter.remove(); + ContainerTokenIdentifier containerTokenIdentifier = this.context + .getQueuingContext().getQueuedContainers().remove( + cInfo.getContainerTokenIdentifier().getContainerID()); + // The Container might have already started while we were + // iterating.. + if (containerTokenIdentifier != null) { + this.context.getQueuingContext().getKilledQueuedContainers() + .putIfAbsent(cInfo.getContainerTokenIdentifier(), + "Container De-queued to meet global queuing limits. 
" + + "Max Queue length[" + + this.queuingLimit.getMaxQueueLength() + "]"); + } + } + numAllowed--; + } + } + + static class AllocatedContainerInfo { private final ContainerTokenIdentifier containerTokenIdentifier; private final NMTokenIdentifier nmTokenIdentifier; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index ce405f8..6ff74c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -642,7 +643,7 @@ public NodeHealthStatus getNodeHealthStatus() { } @Override - public ContainerManagementProtocol getContainerManager() { + public ContainerManager getContainerManager() { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java index 170d91a..2f427d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java @@ -46,8 +46,10 @@ import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed - .TopKNodeSelector; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeManagerQueueMonitor; + + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -57,7 +59,6 @@ import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -76,30 +77,67 @@ private static final Log LOG = LogFactory.getLog(DistributedSchedulingService.class); - private final TopKNodeSelector clusterMonitor; + private final NodeManagerQueueMonitor nodeMonitor; private final ConcurrentHashMap> rackToNode = new ConcurrentHashMap<>(); private final ConcurrentHashMap> hostToNode = new ConcurrentHashMap<>(); + private final int k; public DistributedSchedulingService(RMContext 
rmContext, YarnScheduler scheduler) { super(DistributedSchedulingService.class.getName(), rmContext, scheduler); - int k = rmContext.getYarnConfiguration().getInt( + this.k = rmContext.getYarnConfiguration().getInt( YarnConfiguration.DIST_SCHEDULING_TOP_K, YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT); - long topKComputationInterval = rmContext.getYarnConfiguration().getLong( - YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS, - YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT); - TopKNodeSelector.TopKComparator comparator = - TopKNodeSelector.TopKComparator.valueOf( + long nodeSortInterval = rmContext.getYarnConfiguration().getLong( + YarnConfiguration.NM_CONTAINER_QUEUING_NODE_SORT_COMPUTE_INT_MS, + YarnConfiguration.NM_CONTAINER_QUEUING_NODE_SORT_COMPUTE_INT_DEFAULT); + NodeManagerQueueMonitor.LoadComparator comparator = + NodeManagerQueueMonitor.LoadComparator.valueOf( rmContext.getYarnConfiguration().get( - YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR, - YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT)); - TopKNodeSelector topKSelector = - new TopKNodeSelector(k, topKComputationInterval, comparator); - this.clusterMonitor = topKSelector; + YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR, + YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT)); + + NodeManagerQueueMonitor topKSelector = + new NodeManagerQueueMonitor(nodeSortInterval, comparator); + + float sigma = rmContext.getYarnConfiguration() + .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_SIGMA, + YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_SIGMA_DEFAULT); + + int limitMin = -1; + int limitMax = -1; + switch (comparator) { + case QUEUE_LENGTH: + limitMin = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH, + YarnConfiguration. 
+ NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT); + limitMax = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH, + YarnConfiguration. + NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT); + break; + case WAIT_TIME: + limitMin = rmContext.getYarnConfiguration() + .getInt( + YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS, + YarnConfiguration. + NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_DEFAULT); + limitMax = rmContext.getYarnConfiguration() + .getInt( + YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS, + YarnConfiguration. + NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_DEFAULT); + break; + default: + break; + } + + topKSelector.initThresholdCalculator(sigma, limitMin, limitMax); + this.nodeMonitor = topKSelector; } @Override @@ -189,7 +227,7 @@ public AllocateResponse allocate(AllocateRequest request) throws // Set nodes to be used for scheduling dsResp.setNodesForScheduling( - new ArrayList<>(this.clusterMonitor.selectNodes())); + this.nodeMonitor.selectLeastLoadedNodes(this.k)); return dsResp; } @@ -201,7 +239,7 @@ public AllocateResponse allocate(AllocateRequest request) throws (DistSchedAllocateResponse.class); dsResp.setAllocateResponse(response); dsResp.setNodesForScheduling( - new ArrayList<>(this.clusterMonitor.selectNodes())); + this.nodeMonitor.selectLeastLoadedNodes(this.k)); return dsResp; } @@ -234,7 +272,7 @@ public void handle(SchedulerEvent event) { throw new RuntimeException("Unexpected event type: " + event); } NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; - clusterMonitor.addNode(nodeAddedEvent.getContainerReports(), + nodeMonitor.addNode(nodeAddedEvent.getContainerReports(), nodeAddedEvent.getAddedRMNode()); addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(), nodeAddedEvent.getAddedRMNode().getNodeID()); @@ -247,7 +285,7 @@ public void handle(SchedulerEvent event) { } NodeRemovedSchedulerEvent nodeRemovedEvent = 
(NodeRemovedSchedulerEvent)event; - clusterMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode()); + nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode()); removeFromMapping(rackToNode, nodeRemovedEvent.getRemovedRMNode().getRackName(), nodeRemovedEvent.getRemovedRMNode().getNodeID()); @@ -260,7 +298,7 @@ public void handle(SchedulerEvent event) { throw new RuntimeException("Unexpected event type: " + event); } NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; - clusterMonitor.nodeUpdate(nodeUpdatedEvent.getRMNode()); + nodeMonitor.nodeUpdate(nodeUpdatedEvent.getRMNode()); break; case NODE_RESOURCE_UPDATE: if (!(event instanceof NodeResourceUpdateSchedulerEvent)) { @@ -268,7 +306,7 @@ public void handle(SchedulerEvent event) { } NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = (NodeResourceUpdateSchedulerEvent)event; - clusterMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), + nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), nodeResourceUpdatedEvent.getResourceOption()); break; @@ -290,6 +328,10 @@ public void handle(SchedulerEvent event) { LOG.error("Unknown event arrived at DistributedSchedulingService: " + event.toString()); } + } + public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { + return nodeMonitor.getThresholdCalculator(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index f50da3b..b063ecb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -139,4 +141,6 @@ void setRMDelegatedNodeLabelsUpdater( void setLeaderElectorService(LeaderElectorService elector); LeaderElectorService getLeaderElectorService(); + + QueueLimitCalculator getNodeManagerQueueLimitCalculator(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ec2aeb7..5798528 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import 
org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -43,6 +44,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed + .QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -74,6 +78,8 @@ private SystemMetricsPublisher systemMetricsPublisher; private LeaderElectorService elector; + private QueueLimitCalculator queueLimitCalculator; + /** * Default constructor. To be used in conjunction with setter methods for * individual fields. 
@@ -472,4 +478,14 @@ public PlacementManager getQueuePlacementManager() { public void setQueuePlacementManager(PlacementManager placementMgr) { this.activeServiceContext.setQueuePlacementManager(placementMgr); } + + @Override + public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { + return this.queueLimitCalculator; + } + + public void setContainerQueueLimitCalculator(QueueLimitCalculator + queueLimitCalculator) { + this.queueLimitCalculator = queueLimitCalculator; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 6c80a58..f9d3325 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -1154,6 +1154,8 @@ protected ApplicationMasterService createApplicationMasterService() { addService(distSchedulerEventDispatcher); rmDispatcher.register(SchedulerEventType.class, distSchedulerEventDispatcher); + this.rmContext.setContainerQueueLimitCalculator( + distributedSchedulingService.getNodeManagerQueueLimitCalculator()); return distributedSchedulingService; } return new ApplicationMasterService(this.rmContext, scheduler); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index b0bc565..d306b60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -536,6 +536,13 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } } + // 6. Send Container Queuing Limits back to the Node. This will be used by + // the node to truncate the number of Containers queued for execution. + if (this.rmContext.getNodeManagerQueueLimitCalculator() != null) { + nodeHeartBeatResponse.setContainerQueuingLimit( + this.rmContext.getNodeManagerQueueLimitCalculator() + .createContainerQueuingLimit()); + } return nodeHeartBeatResponse; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeManagerQueueMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeManagerQueueMonitor.java new file mode 100644 index 0000000..688b899 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeManagerQueueMonitor.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tracks the queuing load reported by each node on NODE_UPDATE and
+ * periodically re-computes an ordering of the known nodes from least to most
+ * loaded. After every re-sort the configured {@link QueueLimitCalculator},
+ * if any, is asked to refresh its statistics.
+ *
+ * NOTE(review): the scheduled executor created by the public constructor is
+ * never shut down anywhere in this class.
+ */
+public class NodeManagerQueueMonitor implements ClusterMonitor {
+
+  final static Log LOG = LogFactory.getLog(NodeManagerQueueMonitor.class);
+
+  /**
+   * Orders {@link ClusterNode}s by the selected load metric, least loaded
+   * first. Nodes with an equal metric are ordered most recently updated
+   * first.
+   */
+  public enum LoadComparator implements Comparator<ClusterNode> {
+    WAIT_TIME,
+    QUEUE_LENGTH;
+
+    @Override
+    public int compare(ClusterNode o1, ClusterNode o2) {
+      // Honour the Comparator contract: compare(a, b) must mirror
+      // compare(b, a) and be 0 for equal keys. Otherwise Arrays.sort may
+      // throw, which would also end the periodic re-sort task.
+      int byMetric = Integer.compare(getMetric(o1), getMetric(o2));
+      if (byMetric != 0) {
+        return byMetric;
+      }
+      // Tie-break on freshness: the larger (newer) timestamp sorts first.
+      return Double.compare(o2.timestamp, o1.timestamp);
+    }
+
+    /**
+     * @param c node to inspect
+     * @return the node's wait time for WAIT_TIME, its queue length otherwise
+     */
+    public int getMetric(ClusterNode c) {
+      return (this == WAIT_TIME) ? c.waitTime : c.queueLength;
+    }
+  }
+
+  /** Load figures last recorded for a single node. */
+  static class ClusterNode {
+    int waitTime = -1;
+    int queueLength = 0;
+    // Value of System.currentTimeMillis() at creation / last update.
+    double timestamp;
+    final NodeId nodeId;
+
+    public ClusterNode(NodeId nodeId) {
+      this.nodeId = nodeId;
+      updateTimestamp();
+    }
+
+    public ClusterNode setWaitTime(int waitTime) {
+      this.waitTime = waitTime;
+      return this;
+    }
+
+    public ClusterNode setQueueLength(int queueLength) {
+      this.queueLength = queueLength;
+      return this;
+    }
+
+    public ClusterNode updateTimestamp() {
+      this.timestamp = System.currentTimeMillis();
+      return this;
+    }
+  }
+
+  private final ScheduledExecutorService scheduledExecutor;
+
+  private final List<NodeId> sortedNodes;
+  private final Map<NodeId, ClusterNode> clusterNodes =
+      new ConcurrentHashMap<>();
+  private final LoadComparator comparator;
+  // Assigned by initThresholdCalculator() after the periodic task may
+  // already be scheduled; volatile so the task thread sees the assignment.
+  private volatile QueueLimitCalculator thresholdCalculator;
+
+  /** Re-sorts the nodes and then refreshes the threshold statistics. */
+  Runnable computeTask = new Runnable() {
+    @Override
+    public void run() {
+      synchronized (sortedNodes) {
+        sortedNodes.clear();
+        sortedNodes.addAll(sortNodes());
+        QueueLimitCalculator calculator = thresholdCalculator;
+        if (calculator != null) {
+          calculator.update();
+        }
+      }
+    }
+  };
+
+  /**
+   * Test-only constructor: no executor is created, so {@link #computeTask}
+   * only runs when invoked explicitly.
+   */
+  @VisibleForTesting
+  NodeManagerQueueMonitor(LoadComparator comparator) {
+    this.sortedNodes = new ArrayList<>();
+    this.comparator = comparator;
+    this.scheduledExecutor = null;
+  }
+
+  /**
+   * @param nodeComputationInterval initial delay and period, in
+   *          milliseconds, between two re-sorts of the nodes
+   * @param comparator metric used to order the nodes
+   */
+  public NodeManagerQueueMonitor(long nodeComputationInterval,
+      LoadComparator comparator) {
+    this.sortedNodes = new ArrayList<>();
+    this.scheduledExecutor = Executors.newScheduledThreadPool(1);
+    this.comparator = comparator;
+    this.scheduledExecutor.scheduleAtFixedRate(computeTask,
+        nodeComputationInterval, nodeComputationInterval,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * @return the live sorted list; {@link #computeTask} mutates it while
+   *         holding its monitor
+   */
+  List<NodeId> getSortedNodes() {
+    return sortedNodes;
+  }
+
+  public QueueLimitCalculator getThresholdCalculator() {
+    return thresholdCalculator;
+  }
+
+  Map<NodeId, ClusterNode> getClusterNodes() {
+    return clusterNodes;
+  }
+
+  Comparator<ClusterNode> getComparator() {
+    return comparator;
+  }
+
+  /**
+   * Creates the {@link QueueLimitCalculator} that is refreshed after each
+   * re-sort.
+   *
+   * @param sigma multiplier applied to the standard deviation
+   * @param limitMin lower bound of the computed limit
+   * @param limitMax upper bound of the computed limit
+   */
+  public void initThresholdCalculator(float sigma, int limitMin, int limitMax) {
+    this.thresholdCalculator =
+        new QueueLimitCalculator(this, sigma, limitMin, limitMax);
+  }
+
+  @Override
+  public void addNode(List<NMContainerStatus> containerStatuses, RMNode
+      rmNode) {
+    LOG.debug("Node added event from: " + rmNode.getNode().getName());
+    // Ignoring this currently: at least one NODE_UPDATE heartbeat is
+    // required to ensure node eligibility.
+  }
+
+  @Override
+  public void removeNode(RMNode removedRMNode) {
+    LOG.debug("Node delete event for: " + removedRMNode.getNode().getName());
+    synchronized (this.clusterNodes) {
+      // remove() already reports whether the node was present.
+      if (this.clusterNodes.remove(removedRMNode.getNodeID()) != null) {
+        LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID());
+      } else {
+        LOG.debug("Node not in list!");
+      }
+    }
+  }
+
+  @Override
+  public void nodeUpdate(RMNode rmNode) {
+    LOG.debug("Node update event from: " + rmNode.getNodeID());
+    QueuedContainersStatus queuedContainersStatus =
+        rmNode.getQueuedContainersStatus();
+    int estimatedQueueWaitTime =
+        queuedContainersStatus.getEstimatedQueueWaitTime();
+    int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
+    // A node reporting a wait time of -1 is only tracked when ordering is
+    // based on queue length; otherwise it is ignored / dropped.
+    boolean eligible = estimatedQueueWaitTime != -1
+        || comparator == LoadComparator.QUEUE_LENGTH;
+    synchronized (this.clusterNodes) {
+      ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
+      if (currentNode == null) {
+        if (eligible) {
+          this.clusterNodes.put(rmNode.getNodeID(),
+              new ClusterNode(rmNode.getNodeID())
+                  .setWaitTime(estimatedQueueWaitTime)
+                  .setQueueLength(waitQueueLength));
+          LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        } else {
+          LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        }
+      } else {
+        if (eligible) {
+          currentNode
+              .setWaitTime(estimatedQueueWaitTime)
+              .setQueueLength(waitQueueLength)
+              .updateTimestamp();
+          LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        } else {
+          this.clusterNodes.remove(rmNode.getNodeID());
+          LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" +
+              "with queue wait time [" + currentNode.waitTime + "] and " +
+              "wait queue length [" + currentNode.queueLength + "]");
+        }
+      }
+    }
+  }
+
+  @Override
+  public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
+    LOG.debug("Node resource update event from: " + rmNode.getNodeID());
+    // Ignoring this currently...
+  }
+
+  /**
+   * Returns all Node Ids as ordered list from Least to Most Loaded
+   * @return ordered list of nodes
+   */
+  public List<NodeId> selectNodes() {
+    return selectLeastLoadedNodes(-1);
+  }
+
+  /**
+   * Returns 'K' of the least Loaded Node Ids as ordered list
+   * @param k max number of nodes to return; a negative value, or a value
+   *          not smaller than the number of known nodes, returns all nodes
+   * @return ordered list of nodes (a fresh copy, safe for the caller to keep)
+   */
+  public List<NodeId> selectLeastLoadedNodes(int k) {
+    synchronized (this.sortedNodes) {
+      if (k >= 0 && k < this.sortedNodes.size()) {
+        // Copy only the requested prefix instead of the whole list.
+        return new ArrayList<>(this.sortedNodes.subList(0, k));
+      }
+      return new ArrayList<>(this.sortedNodes);
+    }
+  }
+
+  /**
+   * Snapshots the tracked nodes and returns their ids ordered by the
+   * configured comparator.
+   */
+  private List<NodeId> sortNodes() {
+    synchronized (this.clusterNodes) {
+      // Sort a private array snapshot, then emit only the node ids, which
+      // is what callers ultimately care about.
+      ClusterNode[] nodes =
+          this.clusterNodes.values().toArray(new ClusterNode[0]);
+      Arrays.sort(nodes, comparator);
+      List<NodeId> retList = new ArrayList<>(nodes.length);
+      for (ClusterNode node : nodes) {
+        retList.add(node.nodeId);
+      }
+      return retList;
+    }
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java new file mode 100644 index 0000000..4d5e3c1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeManagerQueueMonitor.ClusterNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeManagerQueueMonitor.LoadComparator;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class interacts with the NodeManagerQueueMonitor to keep track of the
+ * mean and the corresponding standard deviation of the configured metric
+ * (Wait time or Queue length) used to characterize the load on the nodes.
+ * The NodeManagerQueueMonitor triggers an update (by calling the 'update()'
+ * method) every time it performs a re-ordering of all nodes.
+ */
+public class QueueLimitCalculator {
+
+  /** Integer mean and standard deviation of the metric over all nodes. */
+  public class Stats {
+    private final AtomicInteger mean = new AtomicInteger(0);
+    private final AtomicInteger stdDevMean = new AtomicInteger(0);
+
+    /**
+     * Recomputes mean and standard deviation from the monitor's current
+     * node list. When the list is empty the previous values are kept.
+     *
+     * Not thread safe.. Caller should synchronize on the monitor's sorted
+     * node list.
+     */
+    public void update() {
+      List<NodeId> sortedNodes = nodeSelector.getSortedNodes();
+      int count = sortedNodes.size();
+      if (count == 0) {
+        return;
+      }
+
+      // calculate mean (long accumulator: many nodes x large wait times
+      // must not overflow)
+      long sum = 0;
+      for (NodeId n : sortedNodes) {
+        sum += getMetric(getNode(n));
+      }
+      int meanValue = (int) (sum / count);
+      mean.set(meanValue);
+
+      // calculate std deviation (double accumulator: the former
+      // 'int += Math.pow(..)' silently saturated at Integer.MAX_VALUE)
+      double sqrSumMean = 0;
+      for (NodeId n : sortedNodes) {
+        double delta = (double) getMetric(getNode(n)) - meanValue;
+        sqrSumMean += delta * delta;
+      }
+      stdDevMean.set((int) Math.sqrt(sqrSumMean / count));
+    }
+
+    private ClusterNode getNode(NodeId nId) {
+      return nodeSelector.getClusterNodes().get(nId);
+    }
+
+    // A node that is no longer tracked contributes 0.
+    private int getMetric(ClusterNode cn) {
+      return (cn != null) ? ((LoadComparator) nodeSelector.getComparator())
+          .getMetric(cn) : 0;
+    }
+
+    public int getMean() {
+      return mean.get();
+    }
+
+    public int getStdDevMean() {
+      return stdDevMean.get();
+    }
+  }
+
+  private final NodeManagerQueueMonitor nodeSelector;
+  private final float sigma;
+  private final int rangeMin;
+  private final int rangeMax;
+  private final Stats stats = new Stats();
+
+  /**
+   * @param selector monitor supplying the node list and per-node metrics
+   * @param sigma multiplier applied to the standard deviation
+   * @param rangeMin lower bound of the limit handed out
+   * @param rangeMax upper bound of the limit handed out
+   */
+  QueueLimitCalculator(NodeManagerQueueMonitor selector, float sigma,
+      int rangeMin, int rangeMax) {
+    this.nodeSelector = selector;
+    this.sigma = sigma;
+    this.rangeMax = rangeMax;
+    this.rangeMin = rangeMin;
+  }
+
+  // mean + sigma * standard deviation, truncated to int
+  private int determineThreshold() {
+    return (int) (stats.getMean() + sigma * stats.getStdDevMean());
+  }
+
+  void update() {
+    this.stats.update();
+  }
+
+  // Raw threshold clamped into [rangeMin, rangeMax].
+  private int getThreshold() {
+    int thres = determineThreshold();
+    return Math.min(rangeMax, Math.max(rangeMin, thres));
+  }
+
+  /**
+   * Builds a limit carrying the current threshold in the field matching the
+   * monitor's comparator; the other field is set to -1.
+   *
+   * @return a new {@link ContainerQueuingLimit}
+   */
+  public ContainerQueuingLimit createContainerQueuingLimit() {
+    ContainerQueuingLimit containerQueuingLimit =
+        ContainerQueuingLimit.newInstance();
+    if (nodeSelector.getComparator() == LoadComparator.WAIT_TIME) {
+      containerQueuingLimit.setMaxWaitTimeInMs(getThreshold());
+      containerQueuingLimit.setMaxQueueLength(-1);
+    } else {
+      containerQueuingLimit.setMaxWaitTimeInMs(-1);
+      containerQueuingLimit.setMaxQueueLength(getThreshold());
+    }
+    return containerQueuingLimit;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java deleted file mode 100644 index 7e24687..0000000 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java +++ /dev/null @@ -1,223 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.ResourceOption; -import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; -import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; -import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class TopKNodeSelector implements ClusterMonitor { - - final static Log LOG = LogFactory.getLog(TopKNodeSelector.class); - - public enum TopKComparator implements Comparator { - WAIT_TIME, - QUEUE_LENGTH; - - @Override - public int compare(ClusterNode o1, ClusterNode o2) { - if (getQuant(o1) == getQuant(o2)) { - return o1.timestamp < o2.timestamp ? +1 : -1; - } - return getQuant(o1) > getQuant(o2) ? +1 : -1; - } - - private int getQuant(ClusterNode c) { - return (this == WAIT_TIME) ? 
c.queueTime : c.waitQueueLength; - } - } - - static class ClusterNode { - int queueTime = -1; - int waitQueueLength = 0; - double timestamp; - final NodeId nodeId; - - public ClusterNode(NodeId nodeId) { - this.nodeId = nodeId; - updateTimestamp(); - } - - public ClusterNode setQueueTime(int queueTime) { - this.queueTime = queueTime; - return this; - } - - public ClusterNode setWaitQueueLength(int queueLength) { - this.waitQueueLength = queueLength; - return this; - } - - public ClusterNode updateTimestamp() { - this.timestamp = System.currentTimeMillis(); - return this; - } - } - - private final int k; - private final List topKNodes; - private final ScheduledExecutorService scheduledExecutor; - private final HashMap clusterNodes = new HashMap<>(); - private final Comparator comparator; - - Runnable computeTask = new Runnable() { - @Override - public void run() { - synchronized (topKNodes) { - topKNodes.clear(); - topKNodes.addAll(computeTopKNodes()); - } - } - }; - - @VisibleForTesting - TopKNodeSelector(int k, TopKComparator comparator) { - this.k = k; - this.topKNodes = new ArrayList<>(); - this.comparator = comparator; - this.scheduledExecutor = null; - } - - public TopKNodeSelector(int k, long nodeComputationInterval, - TopKComparator comparator) { - this.k = k; - this.topKNodes = new ArrayList<>(); - this.scheduledExecutor = Executors.newScheduledThreadPool(1); - this.comparator = comparator; - this.scheduledExecutor.scheduleAtFixedRate(computeTask, - nodeComputationInterval, nodeComputationInterval, - TimeUnit.MILLISECONDS); - } - - - @Override - public void addNode(List containerStatuses, RMNode - rmNode) { - LOG.debug("Node added event from: " + rmNode.getNode().getName()); - // Ignoring this currently : atleast one NODE_UPDATE heartbeat is - // required to ensure node eligibility. 
- } - - @Override - public void removeNode(RMNode removedRMNode) { - LOG.debug("Node delete event for: " + removedRMNode.getNode().getName()); - synchronized (this.clusterNodes) { - if (this.clusterNodes.containsKey(removedRMNode.getNodeID())) { - this.clusterNodes.remove(removedRMNode.getNodeID()); - LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID()); - } else { - LOG.debug("Node not in list!"); - } - } - } - - @Override - public void nodeUpdate(RMNode rmNode) { - LOG.debug("Node update event from: " + rmNode.getNodeID()); - QueuedContainersStatus queuedContainersStatus = - rmNode.getQueuedContainersStatus(); - int estimatedQueueWaitTime = - queuedContainersStatus.getEstimatedQueueWaitTime(); - int waitQueueLength = queuedContainersStatus.getWaitQueueLength(); - // Add nodes to clusterNodes.. if estimatedQueueTime is -1, Ignore node - // UNLESS comparator is based on queue length, in which case, we should add - synchronized (this.clusterNodes) { - ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID()); - if (currentNode == null) { - if (estimatedQueueWaitTime != -1 - || comparator == TopKComparator.QUEUE_LENGTH) { - this.clusterNodes.put(rmNode.getNodeID(), - new ClusterNode(rmNode.getNodeID()) - .setQueueTime(estimatedQueueWaitTime) - .setWaitQueueLength(waitQueueLength)); - LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" + - "with queue wait time [" + estimatedQueueWaitTime + "] and " + - "wait queue length [" + waitQueueLength + "]"); - } else { - LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" + - "with queue wait time [" + estimatedQueueWaitTime + "] and " + - "wait queue length [" + waitQueueLength + "]"); - } - } else { - if (estimatedQueueWaitTime != -1 - || comparator == TopKComparator.QUEUE_LENGTH) { - currentNode - .setQueueTime(estimatedQueueWaitTime) - .setWaitQueueLength(waitQueueLength) - .updateTimestamp(); - LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" + - "with queue wait time [" 
+ estimatedQueueWaitTime + "] and " + - "wait queue length [" + waitQueueLength + "]"); - } else { - this.clusterNodes.remove(rmNode.getNodeID()); - LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" + - "with queue wait time [" + currentNode.queueTime + "] and " + - "wait queue length [" + currentNode.waitQueueLength + "]"); - } - } - } - } - - @Override - public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) { - LOG.debug("Node resource update event from: " + rmNode.getNodeID()); - // Ignoring this currently... - } - - public List selectNodes() { - synchronized (this.topKNodes) { - return this.k < this.topKNodes.size() ? - new ArrayList<>(this.topKNodes).subList(0, this.k) : - new ArrayList<>(this.topKNodes); - } - } - - private List computeTopKNodes() { - synchronized (this.clusterNodes) { - ArrayList aList = new ArrayList<>(this.clusterNodes.values()); - List retList = new ArrayList<>(); - Object[] nodes = aList.toArray(); - // Collections.sort would do something similar by calling Arrays.sort - // internally but would finally iterate through the input list (aList) - // to reset the value of each element.. Since we don't really care about - // 'aList', we can use the iteration to create the list of nodeIds which - // is what we ultimately care about. 
- Arrays.sort(nodes, (Comparator)comparator); - for (int j=0; j < nodes.length; j++) { - retList.add(((ClusterNode)nodes[j]).nodeId); - } - return retList; - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java index aec4e86..f0ddd28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.junit.Assert; @@ -62,9 +63,9 @@ public String toString() { } @Test - public void testQueueTimeSort() { - TopKNodeSelector selector = new TopKNodeSelector(5, - TopKNodeSelector.TopKComparator.WAIT_TIME); + public void testWaitTimeSort() { + NodeManagerQueueMonitor selector = new NodeManagerQueueMonitor( + NodeManagerQueueMonitor.LoadComparator.WAIT_TIME); selector.nodeUpdate(createRMNode("h1", 1, 15, 10)); selector.nodeUpdate(createRMNode("h2", 2, 5, 10)); selector.nodeUpdate(createRMNode("h3", 3, 10, 10)); @@ -97,8 +98,8 @@ public void testQueueTimeSort() { @Test public void testQueueLengthSort() { - TopKNodeSelector 
selector = new TopKNodeSelector(5, - TopKNodeSelector.TopKComparator.QUEUE_LENGTH); + NodeManagerQueueMonitor selector = new NodeManagerQueueMonitor( + NodeManagerQueueMonitor.LoadComparator.QUEUE_LENGTH); selector.nodeUpdate(createRMNode("h1", 1, -1, 15)); selector.nodeUpdate(createRMNode("h2", 2, -1, 5)); selector.nodeUpdate(createRMNode("h3", 3, -1, 10)); @@ -130,6 +131,50 @@ public void testQueueLengthSort() { Assert.assertEquals("h4:4", nodeIds.get(3).toString()); } + @Test + public void testContainerQueuingLimit() { + NodeManagerQueueMonitor selector = new NodeManagerQueueMonitor( + NodeManagerQueueMonitor.LoadComparator.QUEUE_LENGTH); + selector.nodeUpdate(createRMNode("h1", 1, -1, 15)); + selector.nodeUpdate(createRMNode("h2", 2, -1, 5)); + selector.nodeUpdate(createRMNode("h3", 3, -1, 10)); + + // Test Mean Calculation. NOTE(review): calculator is fetched before initThresholdCalculator() is called -- confirm getThresholdCalculator() cannot return null here + QueueLimitCalculator calculator = selector.getThresholdCalculator(); + selector.initThresholdCalculator(0, 6, 100); + ContainerQueuingLimit containerQueuingLimit = calculator + .createContainerQueuingLimit(); + Assert.assertEquals(6, containerQueuingLimit.getMaxQueueLength()); + Assert.assertEquals(-1, containerQueuingLimit.getMaxWaitTimeInMs()); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(10, containerQueuingLimit.getMaxQueueLength()); + Assert.assertEquals(-1, containerQueuingLimit.getMaxWaitTimeInMs()); + + // Test Limits do not exceed specified max + selector.nodeUpdate(createRMNode("h1", 1, -1, 110)); + selector.nodeUpdate(createRMNode("h2", 2, -1, 120)); + selector.nodeUpdate(createRMNode("h3", 3, -1, 130)); + selector.nodeUpdate(createRMNode("h4", 4, -1, 140)); + selector.nodeUpdate(createRMNode("h5", 5, -1, 150)); + selector.nodeUpdate(createRMNode("h6", 6, -1, 160)); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(100, containerQueuingLimit.getMaxQueueLength()); + + // Test Limits do 
not go below specified min + selector.nodeUpdate(createRMNode("h1", 1, -1, 1)); + selector.nodeUpdate(createRMNode("h2", 2, -1, 2)); + selector.nodeUpdate(createRMNode("h3", 3, -1, 3)); + selector.nodeUpdate(createRMNode("h4", 4, -1, 4)); + selector.nodeUpdate(createRMNode("h5", 5, -1, 5)); + selector.nodeUpdate(createRMNode("h6", 6, -1, 6)); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(6, containerQueuingLimit.getMaxQueueLength()); + + } + private RMNode createRMNode(String host, int port, int waitTime, int queueLength) { RMNode node1 = Mockito.mock(RMNode.class);