diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 965b6c5..799f0b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -344,17 +344,43 @@ public static boolean isAclEnabled(Configuration conf) {
       YARN_PREFIX + "distributed-scheduling.top-k";
   public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;
 
-  /** Frequency for computing Top K Best Nodes */
-  public static final String DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS =
-      YARN_PREFIX + "distributed-scheduling.top-k-compute-interval-ms";
-  public static final long DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT = 1000;
-
-  /** Comparator for determining Node Load for Distributed Scheduling */
-  public static final String DIST_SCHEDULING_TOP_K_COMPARATOR =
-      YARN_PREFIX + "distributed-scheduling.top-k-comparator";
-  public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT =
+  /** Frequency for computing Least Loaded Nodes. */
+  public static final String NM_CONTAINER_QUEUING_NODE_SORT_COMPUTE_INT_MS =
+      YARN_PREFIX + "nm-container-queuing.node-sort-compute-interval-ms";
+  public static final long NM_CONTAINER_QUEUING_NODE_SORT_COMPUTE_INT_DEFAULT =
+      1000;
+
+  /** Comparator for determining Node Load for Distributed Scheduling. */
+  public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR =
+      YARN_PREFIX + "nm-container-queuing.load-comparator";
+  public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT =
       "QUEUE_LENGTH";
 
+  /** Value of Sigma used for calculation of queue limit thresholds. */
+  public static final String NM_CONTAINER_QUEUING_LIMIT_SIGMA =
+      YARN_PREFIX + "nm-container-queuing.queue-limit-sigma";
+  public static final float NM_CONTAINER_QUEUING_LIMIT_SIGMA_DEFAULT =
+      1.0f;
+
+  /** Min Length of Container Queue on the NodeManager. */
+  public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH =
+      YARN_PREFIX + "nm-container-queuing.min-queue-length";
+  public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT = 1;
+
+  /** Max Length of Container Queue on the NodeManager. */
+  public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH =
+      YARN_PREFIX + "nm-container-queuing.max-queue-length";
+  public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT = 10;
+
+  /** Min Wait time of Container Queue on the NodeManager. */
+  public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS =
+      YARN_PREFIX + "nm-container-queuing.min-queue-wait-time-ms";
+  public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_DEFAULT = 1;
+
+  /** Max Wait time of Container Queue on the NodeManager. */
+  public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS =
+      YARN_PREFIX + "nm-container-queuing.max-queue-wait-time-ms";
+  public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_DEFAULT = 10;
 
   /**
    * Enable/disable intermediate-data encryption at YARN level.
For now, this * only is used by the FileSystemRMStateStore to setup right file-system diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index c92a276..79ec041 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -135,9 +135,19 @@ public void initializeMemberVariables() { configurationPrefixToSkipCompare .add(YarnConfiguration.DIST_SCHEDULING_TOP_K); configurationPrefixToSkipCompare - .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS); + .add(YarnConfiguration.NM_CONTAINER_QUEUING_NODE_SORT_COMPUTE_INT_MS); configurationPrefixToSkipCompare - .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR); + .add(YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_SIGMA); // Set by container-executor.cfg configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index f8a1320..bd04a0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -78,4 +79,7 @@ void setSystemCredentialsForApps( List getContainersToDecrease(); void addAllContainersToDecrease(Collection containersToDecrease); + + ContainerQueuingLimit getContainerQueuingLimit(); + void setContainerQueuingLimit(ContainerQueuingLimit containerQueuingLimit); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 224e50b..a259158 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; @@ -46,8 +47,10 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.impl.pb.ContainerQueuingLimitPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -65,6 +68,7 @@ private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; + private ContainerQueuingLimit containerQueuingLimit = null; private List containersToDecrease = null; private List containersToSignal = null; @@ -102,6 +106,10 @@ private void mergeLocalToBuilder() { builder.setNmTokenMasterKey( convertToProtoFormat(this.nmTokenMasterKey)); } + if (this.containerQueuingLimit != null) { + builder.setContainerQueuingLimit( + convertToProtoFormat(this.containerQueuingLimit)); + } if (this.systemCredentials != null) { addSystemCredentialsToProto(); } @@ -197,6 +205,30 @@ public void setNMTokenMasterKey(MasterKey masterKey) { } @Override + public ContainerQueuingLimit getContainerQueuingLimit() { + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + if (this.containerQueuingLimit != null) { + return this.containerQueuingLimit; + } + if (!p.hasContainerQueuingLimit()) { + return null; + } + this.containerQueuingLimit = + convertFromProtoFormat(p.getContainerQueuingLimit()); + return this.containerQueuingLimit; + } + + @Override + public void setContainerQueuingLimit(ContainerQueuingLimit + containerQueuingLimit) { + maybeInitBuilder(); + if (containerQueuingLimit == null) { + builder.clearContainerQueuingLimit(); + } + this.containerQueuingLimit = containerQueuingLimit; + } + + @Override public NodeAction getNodeAction() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? 
proto : builder;
     if (!p.hasNodeAction()) {
@@ -638,6 +670,16 @@ public void remove() {
     builder.addAllContainersToSignal(iterable);
   }
 
+  private ContainerQueuingLimit convertFromProtoFormat(
+      ContainerQueuingLimitProto p) {
+    return new ContainerQueuingLimitPBImpl(p);
+  }
+
+  private ContainerQueuingLimitProto convertToProtoFormat(
+      ContainerQueuingLimit c) {
+    return ((ContainerQueuingLimitPBImpl)c).getProto();
+  }
+
   private SignalContainerRequestPBImpl convertFromProtoFormat(
       SignalContainerRequestProto p) {
     return new SignalContainerRequestPBImpl(p);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java
new file mode 100644
index 0000000..028854d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Used to hold max wait time / queue length information to be
+ * passed back to the NodeManager.
+ */
+public abstract class ContainerQueuingLimit {
+
+  public static ContainerQueuingLimit newInstance() {
+    ContainerQueuingLimit containerQueuingLimit =
+        Records.newRecord(ContainerQueuingLimit.class);
+    containerQueuingLimit.setMaxQueueLength(-1);
+    containerQueuingLimit.setMaxWaitTimeInMs(-1);
+    return containerQueuingLimit;
+  }
+
+  public abstract int getMaxWaitTimeInMs();
+
+  public abstract void setMaxWaitTimeInMs(int waitTime);
+
+  public abstract int getMaxQueueLength();
+
+  public abstract void setMaxQueueLength(int queueLength);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java
new file mode 100644
index 0000000..27d924c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; + +public class ContainerQueuingLimitPBImpl extends ContainerQueuingLimit { + + private ContainerQueuingLimitProto proto = + ContainerQueuingLimitProto.getDefaultInstance(); + private ContainerQueuingLimitProto.Builder builder = null; + private boolean viaProto = false; + + public ContainerQueuingLimitPBImpl() { + builder = ContainerQueuingLimitProto.newBuilder(); + } + + public ContainerQueuingLimitPBImpl(ContainerQueuingLimitProto proto) { + this.proto = proto; + this.viaProto = true; + } + + public ContainerQueuingLimitProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ContainerQueuingLimitProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int getMaxWaitTimeInMs() { + ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder; + return p.getMaxWaitTimeInMs(); + } + + @Override + public void setMaxWaitTimeInMs(int waitTime) { + maybeInitBuilder(); + builder.setMaxWaitTimeInMs(waitTime); + } + + @Override + public int getMaxQueueLength() { + ContainerQueuingLimitProtoOrBuilder p = viaProto ? 
proto : builder; + return p.getMaxQueueLength(); + } + + @Override + public void setMaxQueueLength(int queueLength) { + maybeInitBuilder(); + builder.setMaxQueueLength(queueLength); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 786d8ee..46089a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -100,6 +100,12 @@ message NodeHeartbeatResponseProto { optional bool areNodeLabelsAcceptedByRM = 11 [default = false]; repeated ContainerProto containers_to_decrease = 12; repeated SignalContainerRequestProto containers_to_signal = 13; + optional ContainerQueuingLimitProto container_queuing_limit = 14; +} + +message ContainerQueuingLimitProto { + optional int32 max_wait_time_in_ms = 1; + optional int32 max_queue_length = 2; } message SystemCredentialsForAppsProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 626d0a1..cfcf1bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -23,13 +23,13 @@ import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -82,7 +82,7 @@ NodeHealthStatus getNodeHealthStatus(); - ContainerManagementProtocol getContainerManager(); + ContainerManager getContainerManager(); NodeResourceMonitor getNodeResourceMonitor(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index b48706d..6ca7ffe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -47,7 +47,6 @@ import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; -import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -60,6 +59,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -465,7 +465,7 @@ public void run() { private final NMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInNM nmTokenSecretManager; - private ContainerManagementProtocol containerManager; + private ContainerManager containerManager; private NodeResourceMonitor nodeResourceMonitor; private final LocalDirsHandlerService dirsHandler; private final ApplicationACLsManager aclsManager; @@ -555,11 +555,11 @@ public void setNodeResourceMonitor(NodeResourceMonitor nodeResourceMonitor) { } @Override - public ContainerManagementProtocol getContainerManager() { + public ContainerManager getContainerManager() { return this.containerManager; } - public void setContainerManager(ContainerManagementProtocol containerManager) { + public void setContainerManager(ContainerManager containerManager) { this.containerManager = containerManager; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index c0f02e9..cabf20f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -71,13 +71,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; + +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; -import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; @@ -401,8 +402,7 @@ protected void registerWithRM() LOG.info(successfullRegistrationMsg); LOG.info("Notifying ContainerManager to unblock new container-requests"); - ((ContainerManagerImpl) this.context.getContainerManager()) - .setBlockNewContainerRequests(false); + this.context.getContainerManager().setBlockNewContainerRequests(false); } private List createKeepAliveApplicationList() { @@ -465,10 +465,8 @@ private QueuedContainersStatus getQueuedContainerStatus() { * @return Resource utilization of all the containers. */ private ResourceUtilization getContainersUtilization() { - ContainerManagerImpl containerManager = - (ContainerManagerImpl) this.context.getContainerManager(); ContainersMonitor containersMonitor = - containerManager.getContainersMonitor(); + this.context.getContainerManager().getContainersMonitor(); return containersMonitor.getContainersUtilization(); } @@ -760,82 +758,73 @@ public void run() { nextHeartBeatInterval = response.getNextHeartBeatInterval(); updateMasterKeys(response); - if (response.getNodeAction() == NodeAction.SHUTDOWN) { - LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part of" - + " heartbeat, hence shutting down."); - LOG.warn("Message from ResourceManager: " - + response.getDiagnosticsMessage()); - context.setDecommissioned(true); - dispatcher.getEventHandler().handle( - new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); - break; - } - if (response.getNodeAction() == NodeAction.RESYNC) { - LOG.warn("Node is out of sync with ResourceManager," - + " hence resyncing."); - LOG.warn("Message from ResourceManager: " - + response.getDiagnosticsMessage()); - // Invalidate the RMIdentifier while resync - NodeStatusUpdaterImpl.this.rmIdentifier = - ResourceManagerConstants.RM_INVALID_IDENTIFIER; - dispatcher.getEventHandler().handle( - new NodeManagerEvent(NodeManagerEventType.RESYNC)); - pendingCompletedContainers.clear(); - break; - } - - nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(response); - - // Explicitly put this method after checking the resync response. We - // don't want to remove the completed containers before resync - // because these completed containers will be reported back to RM - // when NM re-registers with RM. - // Only remove the cleanedup containers that are acked - removeOrTrackCompletedContainersFromContext(response + if (!handleShutdownOrResyncCommand(response)) { + nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels + (response); + + // Explicitly put this method after checking the resync + // response. We + // don't want to remove the completed containers before resync + // because these completed containers will be reported back to RM + // when NM re-registers with RM. 
+ // Only remove the cleanedup containers that are acked + removeOrTrackCompletedContainersFromContext(response .getContainersToBeRemovedFromNM()); - logAggregationReportForAppsTempList.clear(); - lastHeartbeatID = response.getResponseId(); - List containersToCleanup = response - .getContainersToCleanup(); - if (!containersToCleanup.isEmpty()) { - dispatcher.getEventHandler().handle( - new CMgrCompletedContainersEvent(containersToCleanup, - CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER)); - } - List appsToCleanup = - response.getApplicationsToCleanup(); - //Only start tracking for keepAlive on FINISH_APP - trackAppsForKeepAlive(appsToCleanup); - if (!appsToCleanup.isEmpty()) { - dispatcher.getEventHandler().handle( - new CMgrCompletedAppsEvent(appsToCleanup, - CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); - } + logAggregationReportForAppsTempList.clear(); + lastHeartbeatID = response.getResponseId(); + List containersToCleanup = response + .getContainersToCleanup(); + if (!containersToCleanup.isEmpty()) { + dispatcher.getEventHandler().handle( + new CMgrCompletedContainersEvent(containersToCleanup, + CMgrCompletedContainersEvent.Reason + .BY_RESOURCEMANAGER)); + } + List appsToCleanup = + response.getApplicationsToCleanup(); + //Only start tracking for keepAlive on FINISH_APP + trackAppsForKeepAlive(appsToCleanup); + if (!appsToCleanup.isEmpty()) { + dispatcher.getEventHandler().handle( + new CMgrCompletedAppsEvent(appsToCleanup, + CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); + } - Map systemCredentials = - response.getSystemCredentialsForApps(); - if (systemCredentials != null && !systemCredentials.isEmpty()) { - ((NMContext) context) - .setSystemCrendentialsForApps(parseCredentials(systemCredentials)); - } + Map systemCredentials = + response.getSystemCredentialsForApps(); + if (systemCredentials != null && !systemCredentials.isEmpty()) { + ((NMContext) context) + .setSystemCrendentialsForApps(parseCredentials + (systemCredentials)); + } - List - containersToDecrease = response.getContainersToDecrease(); - if (!containersToDecrease.isEmpty()) { - dispatcher.getEventHandler().handle( - new CMgrDecreaseContainersResourceEvent(containersToDecrease) - ); - } + List + containersToDecrease = response.getContainersToDecrease(); + if (!containersToDecrease.isEmpty()) { + dispatcher.getEventHandler().handle( + new CMgrDecreaseContainersResourceEvent + (containersToDecrease) + ); + } - // SignalContainer request originally comes from end users via - // ClientRMProtocol's SignalContainer. Forward the request to - // ContainerManager which will dispatch the event to ContainerLauncher. - List containersToSignal = response - .getContainersToSignalList(); - if (containersToSignal.size() != 0) { - dispatcher.getEventHandler().handle( - new CMgrSignalContainersEvent(containersToSignal)); + // SignalContainer request originally comes from end users via + // ClientRMProtocol's SignalContainer. Forward the request to + // ContainerManager which will dispatch the event to + // ContainerLauncher. 
+ List containersToSignal = response + .getContainersToSignalList(); + if (containersToSignal.size() != 0) { + dispatcher.getEventHandler().handle( + new CMgrSignalContainersEvent(containersToSignal)); + } + + // Update QueuingLimits if ContainerManager supports queuing + ContainerQueuingLimit queuingLimit = + response.getContainerQueuingLimit(); + if (queuingLimit != null) { + context.getContainerManager().updateQueuingLimit(queuingLimit); + } } } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM @@ -883,6 +872,34 @@ private void updateMasterKeys(NodeHeartbeatResponse response) { statusUpdater.start(); } + private boolean handleShutdownOrResyncCommand( + NodeHeartbeatResponse response) { + if (response.getNodeAction() == NodeAction.SHUTDOWN) { + LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part of" + + " heartbeat, hence shutting down."); + LOG.warn("Message from ResourceManager: " + + response.getDiagnosticsMessage()); + context.setDecommissioned(true); + dispatcher.getEventHandler().handle( + new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); + return true; + } + if (response.getNodeAction() == NodeAction.RESYNC) { + LOG.warn("Node is out of sync with ResourceManager," + + " hence resyncing."); + LOG.warn("Message from ResourceManager: " + + response.getDiagnosticsMessage()); + // Invalidate the RMIdentifier while resync + NodeStatusUpdaterImpl.this.rmIdentifier = + ResourceManagerConstants.RM_INVALID_IDENTIFIER; + dispatcher.getEventHandler().handle( + new NodeManagerEvent(NodeManagerEventType.RESYNC)); + pendingCompletedContainers.clear(); + return true; + } + return false; + } + private List getLogAggregationReportsForApps( ConcurrentLinkedQueue lastestLogAggregationStatus) { LogAggregationReport status; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java new file mode 100644 index 0000000..f68b39c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager; + +import org.apache.hadoop.service.ServiceStateChangeListener; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; +import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor + .ContainersMonitor; + +/** + * The ContainerManager is an entity that manages the life cycle of Containers. + */ +public interface ContainerManager extends ServiceStateChangeListener, + ContainerManagementProtocol, + EventHandler { + + void updateQueuingLimit(ContainerQueuingLimit queuingLimit); + + ContainersMonitor getContainersMonitor(); + + void setBlockNewContainerRequests(boolean blockNewContainerRequests); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 162823c..0420dc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -53,7 +53,6 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.Service; -import org.apache.hadoop.service.ServiceStateChangeListener; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; @@ -95,6 +94,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ContainerType; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent; @@ -150,8 +150,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; public class ContainerManagerImpl extends CompositeService implements - ServiceStateChangeListener, ContainerManagementProtocol, - EventHandler { + ContainerManager { /** * Extra duration to wait for applications to be killed on shutdown. 
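Reviewer note: the pieces above wire the new ContainerQueuingLimit record through the node heartbeat and expose it to the NodeManager through the narrowed ContainerManager interface. A minimal sketch of that flow, using only APIs introduced in this patch; the class and method names of the sketch itself are illustrative and not part of the change:

    import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
    import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
    import org.apache.hadoop.yarn.server.nodemanager.Context;

    public class QueuingLimitFlowSketch {

      /** RM side: attach a queuing limit to the heartbeat response. */
      static void attachLimit(NodeHeartbeatResponse response, int maxQueueLength) {
        ContainerQueuingLimit limit = ContainerQueuingLimit.newInstance();
        limit.setMaxQueueLength(maxQueueLength);  // -1 (the default) means no limit
        response.setContainerQueuingLimit(limit);
      }

      /** NM side: apply whatever limit the RM sent, if any. */
      static void applyLimit(Context nmContext, NodeHeartbeatResponse response) {
        ContainerQueuingLimit limit = response.getContainerQueuingLimit();
        if (limit != null) {
          nmContext.getContainerManager().updateQueuingLimit(limit);
        }
      }
    }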
@@ -402,6 +401,7 @@ protected LogHandler createLogHandler(Configuration conf, Context context, } } + @Override public ContainersMonitor getContainersMonitor() { return this.containersMonitor; } @@ -1391,6 +1391,7 @@ public void handle(ContainerManagerEvent event) { } } + @Override public void setBlockNewContainerRequests(boolean blockNewContainerRequests) { this.blockNewContainerRequests.set(blockNewContainerRequests); } @@ -1427,4 +1428,9 @@ protected void setAMRMProxyService(AMRMProxyService amrmProxyService) { protected boolean isServiceStopped() { return serviceStopped; } + + @Override + public void updateQueuingLimit(ContainerQueuingLimit queuingLimit) { + LOG.trace("Implementation does not support queuing of Containers !!"); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java index ef4e571..ca0785b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -80,6 +81,7 @@ private Queue queuedOpportunisticContainers; private Set opportunisticContainersToKill; + private final ContainerQueuingLimit queuingLimit; public QueuingContainerManagerImpl(Context context, ContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, @@ -92,6 +94,7 @@ public QueuingContainerManagerImpl(Context context, ContainerExecutor exec, this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>(); this.opportunisticContainersToKill = Collections.synchronizedSet( new HashSet()); + this.queuingLimit = ContainerQueuingLimit.newInstance(); } @Override @@ -487,6 +490,41 @@ public void handle(ApplicationEvent event) { } } + @Override + public void updateQueuingLimit(ContainerQueuingLimit limit) { + this.queuingLimit.setMaxQueueLength(limit.getMaxQueueLength()); + // TODO: Include wait time as well once its implemented + if (this.queuingLimit.getMaxQueueLength() > -1) { + shedQueuedOpportunisticContainers(); + } + } + + private void shedQueuedOpportunisticContainers() { + int numAllowed = this.queuingLimit.getMaxQueueLength(); + Iterator containerIter = + queuedOpportunisticContainers.iterator(); + while (containerIter.hasNext()) { + AllocatedContainerInfo cInfo = containerIter.next(); + if (numAllowed <= 0) { + containerIter.remove(); + ContainerTokenIdentifier containerTokenIdentifier = this.context + .getQueuingContext().getQueuedContainers().remove( + cInfo.getContainerTokenIdentifier().getContainerID()); + // The 
Container might have already started while we were + // iterating.. + if (containerTokenIdentifier != null) { + this.context.getQueuingContext().getKilledQueuedContainers() + .putIfAbsent(cInfo.getContainerTokenIdentifier(), + "Container De-queued to meet global queuing limits. " + + "Max Queue length[" + + this.queuingLimit.getMaxQueueLength() + "]"); + } + } + numAllowed--; + } + } + + static class AllocatedContainerInfo { private final ContainerTokenIdentifier containerTokenIdentifier; private final NMTokenIdentifier nmTokenIdentifier; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index ce405f8..6ff74c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -642,7 +643,7 @@ public NodeHealthStatus getNodeHealthStatus() { } @Override - public ContainerManagementProtocol getContainerManager() { + public ContainerManager getContainerManager() { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java index 4fd62d0..e3e97ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java @@ -24,6 +24,10 @@ import java.util.List; +/** + * Implementations of this class are notified of changes to the Cluster State + * such as node removal/addition and updates. 
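Reviewer note: the shedding rule added to QueuingContainerManagerImpl above keeps the first maxQueueLength queued opportunistic containers and de-queues the rest; because numAllowed is decremented on every iteration, only entries past the limit are removed. The same rule on a plain Deque, as a self-contained illustration with made-up container names:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.Iterator;

    public class QueueSheddingSketch {

      /** Keep the first maxQueueLength entries, drop everything after them. */
      static void shed(Deque<String> queued, int maxQueueLength) {
        int numAllowed = maxQueueLength;
        Iterator<String> it = queued.iterator();
        while (it.hasNext()) {
          String container = it.next();
          if (numAllowed <= 0) {
            it.remove();  // over the limit: de-queue this entry
            System.out.println("De-queued " + container
                + " to meet queuing limit " + maxQueueLength);
          }
          numAllowed--;
        }
      }

      public static void main(String[] args) {
        Deque<String> q = new ArrayDeque<>();
        for (int i = 1; i <= 5; i++) {
          q.add("container_" + i);
        }
        shed(q, 3);               // keeps container_1..3, drops 4 and 5
        System.out.println(q);
      }
    }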
+ */ public interface ClusterMonitor { void addNode(List containerStatuses, RMNode rmNode); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java index 170d91a..810cc2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java @@ -46,8 +46,10 @@ import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed - .TopKNodeSelector; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeManagerQueueMonitor; + + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -57,7 +59,6 @@ import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -76,30 +77,63 @@ private static final Log LOG = LogFactory.getLog(DistributedSchedulingService.class); - private final TopKNodeSelector clusterMonitor; + private final NodeManagerQueueMonitor nodeMonitor; private final ConcurrentHashMap> rackToNode = new ConcurrentHashMap<>(); private final ConcurrentHashMap> hostToNode = new ConcurrentHashMap<>(); + private final int k; public DistributedSchedulingService(RMContext rmContext, YarnScheduler scheduler) { super(DistributedSchedulingService.class.getName(), rmContext, scheduler); - int k = rmContext.getYarnConfiguration().getInt( + this.k = rmContext.getYarnConfiguration().getInt( YarnConfiguration.DIST_SCHEDULING_TOP_K, YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT); - long topKComputationInterval = rmContext.getYarnConfiguration().getLong( - YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS, - YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT); - TopKNodeSelector.TopKComparator comparator = - TopKNodeSelector.TopKComparator.valueOf( + long nodeSortInterval = rmContext.getYarnConfiguration().getLong( + YarnConfiguration.NM_CONTAINER_QUEUING_NODE_SORT_COMPUTE_INT_MS, + YarnConfiguration.NM_CONTAINER_QUEUING_NODE_SORT_COMPUTE_INT_DEFAULT); + NodeManagerQueueMonitor.LoadComparator comparator = + NodeManagerQueueMonitor.LoadComparator.valueOf( rmContext.getYarnConfiguration().get( - YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR, - YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT)); - TopKNodeSelector topKSelector = - new TopKNodeSelector(k, topKComputationInterval, comparator); - this.clusterMonitor = topKSelector; + YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR, + 
YarnConfiguration. + NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT)); + + NodeManagerQueueMonitor topKSelector = + new NodeManagerQueueMonitor(nodeSortInterval, comparator); + + float sigma = rmContext.getYarnConfiguration() + .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_SIGMA, + YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_SIGMA_DEFAULT); + + int limitMin, limitMax; + + if (comparator == NodeManagerQueueMonitor.LoadComparator.QUEUE_LENGTH) { + limitMin = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH, + YarnConfiguration. + NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT); + limitMax = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH, + YarnConfiguration. + NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT); + } else { + limitMin = rmContext.getYarnConfiguration() + .getInt( + YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS, + YarnConfiguration. + NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_DEFAULT); + limitMax = rmContext.getYarnConfiguration() + .getInt( + YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS, + YarnConfiguration. + NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_DEFAULT); + } + + topKSelector.initThresholdCalculator(sigma, limitMin, limitMax); + this.nodeMonitor = topKSelector; } @Override @@ -189,7 +223,7 @@ public AllocateResponse allocate(AllocateRequest request) throws // Set nodes to be used for scheduling dsResp.setNodesForScheduling( - new ArrayList<>(this.clusterMonitor.selectNodes())); + this.nodeMonitor.selectLeastLoadedNodes(this.k)); return dsResp; } @@ -201,7 +235,7 @@ public AllocateResponse allocate(AllocateRequest request) throws (DistSchedAllocateResponse.class); dsResp.setAllocateResponse(response); dsResp.setNodesForScheduling( - new ArrayList<>(this.clusterMonitor.selectNodes())); + this.nodeMonitor.selectLeastLoadedNodes(this.k)); return dsResp; } @@ -229,67 +263,72 @@ private void removeFromMapping(ConcurrentHashMap> mapping, @Override public void handle(SchedulerEvent event) { switch (event.getType()) { - case NODE_ADDED: - if (!(event instanceof NodeAddedSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; - clusterMonitor.addNode(nodeAddedEvent.getContainerReports(), - nodeAddedEvent.getAddedRMNode()); - addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(), - nodeAddedEvent.getAddedRMNode().getNodeID()); - addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(), - nodeAddedEvent.getAddedRMNode().getNodeID()); - break; - case NODE_REMOVED: - if (!(event instanceof NodeRemovedSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeRemovedSchedulerEvent nodeRemovedEvent = - (NodeRemovedSchedulerEvent)event; - clusterMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode()); - removeFromMapping(rackToNode, - nodeRemovedEvent.getRemovedRMNode().getRackName(), - nodeRemovedEvent.getRemovedRMNode().getNodeID()); - removeFromMapping(hostToNode, - nodeRemovedEvent.getRemovedRMNode().getHostName(), - nodeRemovedEvent.getRemovedRMNode().getNodeID()); - break; - case NODE_UPDATE: - if (!(event instanceof NodeUpdateSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; - clusterMonitor.nodeUpdate(nodeUpdatedEvent.getRMNode()); - break; - case 
NODE_RESOURCE_UPDATE: - if (!(event instanceof NodeResourceUpdateSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = - (NodeResourceUpdateSchedulerEvent)event; - clusterMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), - nodeResourceUpdatedEvent.getResourceOption()); - break; - - // <-- IGNORED EVENTS : START --> - case APP_ADDED: - break; - case APP_REMOVED: - break; - case APP_ATTEMPT_ADDED: - break; - case APP_ATTEMPT_REMOVED: - break; - case CONTAINER_EXPIRED: - break; - case NODE_LABELS_UPDATE: - break; - // <-- IGNORED EVENTS : END --> - default: - LOG.error("Unknown event arrived at DistributedSchedulingService: " - + event.toString()); + case NODE_ADDED: + if (!(event instanceof NodeAddedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event; + nodeMonitor.addNode(nodeAddedEvent.getContainerReports(), + nodeAddedEvent.getAddedRMNode()); + addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(), + nodeAddedEvent.getAddedRMNode().getNodeID()); + addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(), + nodeAddedEvent.getAddedRMNode().getNodeID()); + break; + case NODE_REMOVED: + if (!(event instanceof NodeRemovedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeRemovedSchedulerEvent nodeRemovedEvent = + (NodeRemovedSchedulerEvent) event; + nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode()); + removeFromMapping(rackToNode, + nodeRemovedEvent.getRemovedRMNode().getRackName(), + nodeRemovedEvent.getRemovedRMNode().getNodeID()); + removeFromMapping(hostToNode, + nodeRemovedEvent.getRemovedRMNode().getHostName(), + nodeRemovedEvent.getRemovedRMNode().getNodeID()); + break; + case NODE_UPDATE: + if (!(event instanceof NodeUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent) + event; + nodeMonitor.nodeUpdate(nodeUpdatedEvent.getRMNode()); + break; + case NODE_RESOURCE_UPDATE: + if (!(event instanceof NodeResourceUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = + (NodeResourceUpdateSchedulerEvent) event; + nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), + nodeResourceUpdatedEvent.getResourceOption()); + break; + + // <-- IGNORED EVENTS : START --> + case APP_ADDED: + break; + case APP_REMOVED: + break; + case APP_ATTEMPT_ADDED: + break; + case APP_ATTEMPT_REMOVED: + break; + case CONTAINER_EXPIRED: + break; + case NODE_LABELS_UPDATE: + break; + // <-- IGNORED EVENTS : END --> + default: + LOG.error("Unknown event arrived at DistributedSchedulingService: " + + event.toString()); } + } + public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { + return nodeMonitor.getThresholdCalculator(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index f50da3b..b063ecb 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -139,4 +141,6 @@ void setRMDelegatedNodeLabelsUpdater( void setLeaderElectorService(LeaderElectorService elector); LeaderElectorService getLeaderElectorService(); + + QueueLimitCalculator getNodeManagerQueueLimitCalculator(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ec2aeb7..d228388 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -74,6 +76,8 @@ private SystemMetricsPublisher systemMetricsPublisher; private LeaderElectorService elector; + private QueueLimitCalculator queueLimitCalculator; + /** * Default constructor. To be used in conjunction with setter methods for * individual fields. 
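Reviewer note: RMContext/RMContextImpl now expose a QueueLimitCalculator, but that class is not part of this diff. Based on initThresholdCalculator(sigma, limitMin, limitMax) in DistributedSchedulingService, a plausible reading is a queue-length threshold of mean plus sigma standard deviations over the per-node queue lengths, clamped to the configured min/max. The sketch below is only that assumption, not the actual implementation:

    public class QueueLimitSketch {

      /** Assumed shape of the threshold: mean + sigma * stddev, clamped. */
      static int queueLengthThreshold(int[] queueLengths, float sigma,
          int limitMin, int limitMax) {
        double mean = 0;
        for (int q : queueLengths) {
          mean += q;
        }
        mean /= Math.max(1, queueLengths.length);
        double variance = 0;
        for (int q : queueLengths) {
          variance += (q - mean) * (q - mean);
        }
        variance /= Math.max(1, queueLengths.length);
        double threshold = mean + sigma * Math.sqrt(variance);
        // Clamp to the configured min/max queue length.
        return (int) Math.min(limitMax, Math.max(limitMin, threshold));
      }
    }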
@@ -472,4 +476,14 @@ public PlacementManager getQueuePlacementManager() { public void setQueuePlacementManager(PlacementManager placementMgr) { this.activeServiceContext.setQueuePlacementManager(placementMgr); } + + @Override + public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { + return this.queueLimitCalculator; + } + + public void setContainerQueueLimitCalculator( + QueueLimitCalculator limitCalculator) { + this.queueLimitCalculator = limitCalculator; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 6c80a58..f9d3325 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -1154,6 +1154,8 @@ protected ApplicationMasterService createApplicationMasterService() { addService(distSchedulerEventDispatcher); rmDispatcher.register(SchedulerEventType.class, distSchedulerEventDispatcher); + this.rmContext.setContainerQueueLimitCalculator( + distributedSchedulingService.getNodeManagerQueueLimitCalculator()); return distributedSchedulingService; } return new ApplicationMasterService(this.rmContext, scheduler); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index b0bc565..d306b60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -536,6 +536,13 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } } + // 6. Send Container Queuing Limits back to the Node. This will be used by + // the node to truncate the number of Containers queued for execution. 
+ if (this.rmContext.getNodeManagerQueueLimitCalculator() != null) { + nodeHeartBeatResponse.setContainerQueuingLimit( + this.rmContext.getNodeManagerQueueLimitCalculator() + .createContainerQueuingLimit()); + } return nodeHeartBeatResponse; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeManagerQueueMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeManagerQueueMonitor.java new file mode 100644 index 0000000..f7da57a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeManagerQueueMonitor.java @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The NodeManagerQueueMonitor keeps track of load metrics (such as queue
+ * length and total wait time) associated with Container Queues on the
+ * NodeManager. It uses this information to periodically sort the Nodes
+ * from least to most loaded.
+ */
+public class NodeManagerQueueMonitor implements ClusterMonitor {
+
+  final static Log LOG = LogFactory.getLog(NodeManagerQueueMonitor.class);
+
+  /**
+   * The comparator used to specify the metric against which the load
+   * of two Nodes is compared.
+   */
+  public enum LoadComparator implements Comparator<ClusterNode> {
+    WAIT_TIME,
+    QUEUE_LENGTH;
+
+    @Override
+    public int compare(ClusterNode o1, ClusterNode o2) {
+      if (getMetric(o1) == getMetric(o2)) {
+        return o1.timestamp < o2.timestamp ? +1 : -1;
+      }
+      return getMetric(o1) > getMetric(o2) ? +1 : -1;
+    }
+
+    public int getMetric(ClusterNode c) {
+      return (this == WAIT_TIME) ?
+          c.waitTime : c.queueLength;
+    }
+  }
+
+  static class ClusterNode {
+    int waitTime = -1;
+    int queueLength = 0;
+    double timestamp;
+    final NodeId nodeId;
+
+    public ClusterNode(NodeId nodeId) {
+      this.nodeId = nodeId;
+      updateTimestamp();
+    }
+
+    public ClusterNode setWaitTime(int wTime) {
+      this.waitTime = wTime;
+      return this;
+    }
+
+    public ClusterNode setQueueLength(int qLength) {
+      this.queueLength = qLength;
+      return this;
+    }
+
+    public ClusterNode updateTimestamp() {
+      this.timestamp = System.currentTimeMillis();
+      return this;
+    }
+  }
+
+  private final ScheduledExecutorService scheduledExecutor;
+
+  private final List<NodeId> sortedNodes;
+  private final Map<NodeId, ClusterNode> clusterNodes =
+      new ConcurrentHashMap<>();
+  private final LoadComparator comparator;
+  private QueueLimitCalculator thresholdCalculator;
+
+  Runnable computeTask = new Runnable() {
+    @Override
+    public void run() {
+      synchronized (sortedNodes) {
+        sortedNodes.clear();
+        sortedNodes.addAll(sortNodes());
+        if (thresholdCalculator != null) {
+          thresholdCalculator.update();
+        }
+      }
+    }
+  };
+
+  @VisibleForTesting
+  NodeManagerQueueMonitor(LoadComparator comparator) {
+    this.sortedNodes = new ArrayList<>();
+    this.comparator = comparator;
+    this.scheduledExecutor = null;
+  }
+
+  public NodeManagerQueueMonitor(long nodeComputationInterval,
+      LoadComparator comparator) {
+    this.sortedNodes = new ArrayList<>();
+    this.scheduledExecutor = Executors.newScheduledThreadPool(1);
+    this.comparator = comparator;
+    this.scheduledExecutor.scheduleAtFixedRate(computeTask,
+        nodeComputationInterval, nodeComputationInterval,
+        TimeUnit.MILLISECONDS);
+  }
+
+  List<NodeId> getSortedNodes() {
+    return sortedNodes;
+  }
+
+  public QueueLimitCalculator getThresholdCalculator() {
+    return thresholdCalculator;
+  }
+
+  Map<NodeId, ClusterNode> getClusterNodes() {
+    return clusterNodes;
+  }
+
+  Comparator<ClusterNode> getComparator() {
+    return comparator;
+  }
+
+  public void initThresholdCalculator(float sigma, int limitMin, int limitMax) {
+    this.thresholdCalculator =
+        new QueueLimitCalculator(this, sigma, limitMin, limitMax);
+  }
+
+  @Override
+  public void addNode(List<NMContainerStatus> containerStatuses,
+      RMNode rmNode) {
+    LOG.debug("Node added event from: " + rmNode.getNode().getName());
+    // Ignoring this currently: at least one NODE_UPDATE heartbeat is
+    // required to ensure node eligibility.
+  }
+
+  @Override
+  public void removeNode(RMNode removedRMNode) {
+    LOG.debug("Node delete event for: " + removedRMNode.getNode().getName());
+    synchronized (this.clusterNodes) {
+      if (this.clusterNodes.containsKey(removedRMNode.getNodeID())) {
+        this.clusterNodes.remove(removedRMNode.getNodeID());
+        LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID());
+      } else {
+        LOG.debug("Node not in list!");
+      }
+    }
+  }
+
+  @Override
+  public void nodeUpdate(RMNode rmNode) {
+    LOG.debug("Node update event from: " + rmNode.getNodeID());
+    QueuedContainersStatus queuedContainersStatus =
+        rmNode.getQueuedContainersStatus();
+    int estimatedQueueWaitTime =
+        queuedContainersStatus.getEstimatedQueueWaitTime();
+    int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
+    // Add node to clusterNodes. If estimatedQueueWaitTime is -1, ignore the
+    // node UNLESS the comparator is based on queue length, in which case we
+    // should still add it.
+    synchronized (this.clusterNodes) {
+      ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
+      if (currentNode == null) {
+        if (estimatedQueueWaitTime != -1
+            || comparator == LoadComparator.QUEUE_LENGTH) {
+          this.clusterNodes.put(rmNode.getNodeID(),
+              new ClusterNode(rmNode.getNodeID())
+                  .setWaitTime(estimatedQueueWaitTime)
+                  .setQueueLength(waitQueueLength));
+          LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        } else {
+          LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "] " +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        }
+      } else {
+        if (estimatedQueueWaitTime != -1
+            || comparator == LoadComparator.QUEUE_LENGTH) {
+          currentNode
+              .setWaitTime(estimatedQueueWaitTime)
+              .setQueueLength(waitQueueLength)
+              .updateTimestamp();
+          LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "] " +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        } else {
+          this.clusterNodes.remove(rmNode.getNodeID());
+          LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "] " +
+              "with queue wait time [" + currentNode.waitTime + "] and " +
+              "wait queue length [" + currentNode.queueLength + "]");
+        }
+      }
+    }
+  }
+
+  @Override
+  public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
+    LOG.debug("Node resource update event from: " + rmNode.getNodeID());
+    // Ignoring this currently...
+  }
+
+  /**
+   * Returns all Node Ids as an ordered list, from least to most loaded.
+   * @return ordered list of nodes
+   */
+  public List<NodeId> selectNodes() {
+    return selectLeastLoadedNodes(-1);
+  }
+
+  /**
+   * Returns the 'k' least loaded Node Ids as an ordered list.
+   * @param k max number of nodes to return
+   * @return ordered list of nodes
+   */
+  public List<NodeId> selectLeastLoadedNodes(int k) {
+    synchronized (this.sortedNodes) {
+      return ((k < this.sortedNodes.size()) && (k >= 0)) ?
+          new ArrayList<>(this.sortedNodes).subList(0, k) :
+          new ArrayList<>(this.sortedNodes);
+    }
+  }
+
+  private List<NodeId> sortNodes() {
+    synchronized (this.clusterNodes) {
+      ArrayList<ClusterNode> aList = new ArrayList<>(this.clusterNodes.values());
+      List<NodeId> retList = new ArrayList<>();
+      Object[] nodes = aList.toArray();
+      // Collections.sort would do something similar by calling Arrays.sort
+      // internally but would finally iterate through the input list (aList)
+      // to reset the value of each element. Since we don't really care about
+      // 'aList', we can use the iteration to create the list of nodeIds,
+      // which is what we ultimately care about.
+      Arrays.sort(nodes, (Comparator) comparator);
+      for (int j = 0; j < nodes.length; j++) {
+        retList.add(((ClusterNode) nodes[j]).nodeId);
+      }
+      return retList;
+    }
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java
new file mode 100644
index 0000000..598a6a0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeManagerQueueMonitor.ClusterNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeManagerQueueMonitor.LoadComparator;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class interacts with the NodeManagerQueueMonitor to keep track of the
+ * mean and standard deviation of the configured metric (wait time or queue
+ * length) used to characterize the load on a specific node.
+ * The NodeManagerQueueMonitor triggers an update (by calling the 'update()'
+ * method) every time it performs a re-ordering of all nodes.
+ */
+public class QueueLimitCalculator {
+
+  public class Stats {
+    private final AtomicInteger mean = new AtomicInteger(0);
+    private final AtomicInteger stdDevMean = new AtomicInteger(0);
+
+    /**
+     * Not thread safe. Caller should synchronize on sortedNodes.
+     */
+    public void update() {
+      List<NodeId> sortedNodes = nodeSelector.getSortedNodes();
+      if (sortedNodes.size() > 0) {
+        // calculate mean
+        int sum = 0;
+        for (NodeId n : sortedNodes) {
+          sum += getMetric(getNode(n));
+        }
+        mean.set(sum / sortedNodes.size());
+
+        // calculate std deviation
+        int sqrSumMean = 0;
+        for (NodeId n : sortedNodes) {
+          int val = getMetric(getNode(n));
+          sqrSumMean += Math.pow(val - mean.get(), 2);
+        }
+        stdDevMean.set((int) Math.sqrt(sqrSumMean / sortedNodes.size()));
+      }
+    }
+
+    private ClusterNode getNode(NodeId nId) {
+      return nodeSelector.getClusterNodes().get(nId);
+    }
+
+    private int getMetric(ClusterNode cn) {
+      return (cn != null) ?
((LoadComparator)nodeSelector.getComparator()) + .getMetric(cn) : 0; + } + + public int getMean() { + return mean.get(); + } + + public int getStdDevMean() { + return stdDevMean.get(); + } + } + + private final NodeManagerQueueMonitor nodeSelector; + private final float sigma; + private final int rangeMin; + private final int rangeMax; + private final Stats stats = new Stats(); + + + QueueLimitCalculator(NodeManagerQueueMonitor selector, float sigma, + int rangeMin, int rangeMax) { + this.nodeSelector = selector; + this.sigma = sigma; + this.rangeMax = rangeMax; + this.rangeMin = rangeMin; + } + + private int determineThreshold() { + return (int) (stats.getMean() + sigma * stats.getStdDevMean()); + } + + void update() { + this.stats.update(); + } + + private int getThreshold() { + int thres = determineThreshold(); + return Math.min(rangeMax, Math.max(rangeMin, thres)); + } + + public ContainerQueuingLimit createContainerQueuingLimit() { + ContainerQueuingLimit containerQueuingLimit = + ContainerQueuingLimit.newInstance(); + if (nodeSelector.getComparator() == LoadComparator.WAIT_TIME) { + containerQueuingLimit.setMaxWaitTimeInMs(getThreshold()); + containerQueuingLimit.setMaxQueueLength(-1); + } else { + containerQueuingLimit.setMaxWaitTimeInMs(-1); + containerQueuingLimit.setMaxQueueLength(getThreshold()); + } + return containerQueuingLimit; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java deleted file mode 100644 index 7e24687..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java +++ /dev/null @@ -1,223 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.ResourceOption; -import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; -import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; -import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class TopKNodeSelector implements ClusterMonitor { - - final static Log LOG = LogFactory.getLog(TopKNodeSelector.class); - - public enum TopKComparator implements Comparator { - WAIT_TIME, - QUEUE_LENGTH; - - @Override - public int compare(ClusterNode o1, ClusterNode o2) { - if (getQuant(o1) == getQuant(o2)) { - return o1.timestamp < o2.timestamp ? +1 : -1; - } - return getQuant(o1) > getQuant(o2) ? +1 : -1; - } - - private int getQuant(ClusterNode c) { - return (this == WAIT_TIME) ? c.queueTime : c.waitQueueLength; - } - } - - static class ClusterNode { - int queueTime = -1; - int waitQueueLength = 0; - double timestamp; - final NodeId nodeId; - - public ClusterNode(NodeId nodeId) { - this.nodeId = nodeId; - updateTimestamp(); - } - - public ClusterNode setQueueTime(int queueTime) { - this.queueTime = queueTime; - return this; - } - - public ClusterNode setWaitQueueLength(int queueLength) { - this.waitQueueLength = queueLength; - return this; - } - - public ClusterNode updateTimestamp() { - this.timestamp = System.currentTimeMillis(); - return this; - } - } - - private final int k; - private final List topKNodes; - private final ScheduledExecutorService scheduledExecutor; - private final HashMap clusterNodes = new HashMap<>(); - private final Comparator comparator; - - Runnable computeTask = new Runnable() { - @Override - public void run() { - synchronized (topKNodes) { - topKNodes.clear(); - topKNodes.addAll(computeTopKNodes()); - } - } - }; - - @VisibleForTesting - TopKNodeSelector(int k, TopKComparator comparator) { - this.k = k; - this.topKNodes = new ArrayList<>(); - this.comparator = comparator; - this.scheduledExecutor = null; - } - - public TopKNodeSelector(int k, long nodeComputationInterval, - TopKComparator comparator) { - this.k = k; - this.topKNodes = new ArrayList<>(); - this.scheduledExecutor = Executors.newScheduledThreadPool(1); - this.comparator = comparator; - this.scheduledExecutor.scheduleAtFixedRate(computeTask, - nodeComputationInterval, nodeComputationInterval, - TimeUnit.MILLISECONDS); - } - - - @Override - public void addNode(List containerStatuses, RMNode - rmNode) { - LOG.debug("Node added event from: " + rmNode.getNode().getName()); - // Ignoring this currently : atleast one NODE_UPDATE heartbeat is - // required to ensure node eligibility. 
- } - - @Override - public void removeNode(RMNode removedRMNode) { - LOG.debug("Node delete event for: " + removedRMNode.getNode().getName()); - synchronized (this.clusterNodes) { - if (this.clusterNodes.containsKey(removedRMNode.getNodeID())) { - this.clusterNodes.remove(removedRMNode.getNodeID()); - LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID()); - } else { - LOG.debug("Node not in list!"); - } - } - } - - @Override - public void nodeUpdate(RMNode rmNode) { - LOG.debug("Node update event from: " + rmNode.getNodeID()); - QueuedContainersStatus queuedContainersStatus = - rmNode.getQueuedContainersStatus(); - int estimatedQueueWaitTime = - queuedContainersStatus.getEstimatedQueueWaitTime(); - int waitQueueLength = queuedContainersStatus.getWaitQueueLength(); - // Add nodes to clusterNodes.. if estimatedQueueTime is -1, Ignore node - // UNLESS comparator is based on queue length, in which case, we should add - synchronized (this.clusterNodes) { - ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID()); - if (currentNode == null) { - if (estimatedQueueWaitTime != -1 - || comparator == TopKComparator.QUEUE_LENGTH) { - this.clusterNodes.put(rmNode.getNodeID(), - new ClusterNode(rmNode.getNodeID()) - .setQueueTime(estimatedQueueWaitTime) - .setWaitQueueLength(waitQueueLength)); - LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" + - "with queue wait time [" + estimatedQueueWaitTime + "] and " + - "wait queue length [" + waitQueueLength + "]"); - } else { - LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" + - "with queue wait time [" + estimatedQueueWaitTime + "] and " + - "wait queue length [" + waitQueueLength + "]"); - } - } else { - if (estimatedQueueWaitTime != -1 - || comparator == TopKComparator.QUEUE_LENGTH) { - currentNode - .setQueueTime(estimatedQueueWaitTime) - .setWaitQueueLength(waitQueueLength) - .updateTimestamp(); - LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" + - "with queue wait time [" + estimatedQueueWaitTime + "] and " + - "wait queue length [" + waitQueueLength + "]"); - } else { - this.clusterNodes.remove(rmNode.getNodeID()); - LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" + - "with queue wait time [" + currentNode.queueTime + "] and " + - "wait queue length [" + currentNode.waitQueueLength + "]"); - } - } - } - } - - @Override - public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) { - LOG.debug("Node resource update event from: " + rmNode.getNodeID()); - // Ignoring this currently... - } - - public List selectNodes() { - synchronized (this.topKNodes) { - return this.k < this.topKNodes.size() ? - new ArrayList<>(this.topKNodes).subList(0, this.k) : - new ArrayList<>(this.topKNodes); - } - } - - private List computeTopKNodes() { - synchronized (this.clusterNodes) { - ArrayList aList = new ArrayList<>(this.clusterNodes.values()); - List retList = new ArrayList<>(); - Object[] nodes = aList.toArray(); - // Collections.sort would do something similar by calling Arrays.sort - // internally but would finally iterate through the input list (aList) - // to reset the value of each element.. Since we don't really care about - // 'aList', we can use the iteration to create the list of nodeIds which - // is what we ultimately care about. 
- Arrays.sort(nodes, (Comparator)comparator); - for (int j=0; j < nodes.length; j++) { - retList.add(((ClusterNode)nodes[j]).nodeId); - } - return retList; - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeManagerQueueMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeManagerQueueMonitor.java new file mode 100644 index 0000000..63a6a45 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeManagerQueueMonitor.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; +import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.List; + +public class TestNodeManagerQueueMonitor { + + static class FakeNodeId extends NodeId { + final String host; + final int port; + + public FakeNodeId(String host, int port) { + this.host = host; + this.port = port; + } + + @Override + public String getHost() { + return host; + } + + @Override + public int getPort() { + return port; + } + + @Override + protected void setHost(String host) {} + @Override + protected void setPort(int port) {} + @Override + protected void build() {} + + @Override + public String toString() { + return host + ":" + port; + } + } + + @Test + public void testWaitTimeSort() { + NodeManagerQueueMonitor selector = new NodeManagerQueueMonitor( + NodeManagerQueueMonitor.LoadComparator.WAIT_TIME); + selector.nodeUpdate(createRMNode("h1", 1, 15, 10)); + selector.nodeUpdate(createRMNode("h2", 2, 5, 10)); + selector.nodeUpdate(createRMNode("h3", 3, 10, 10)); + selector.computeTask.run(); + List nodeIds = selector.selectNodes(); + System.out.println("1-> " + nodeIds); + Assert.assertEquals("h2:2", nodeIds.get(0).toString()); + Assert.assertEquals("h3:3", nodeIds.get(1).toString()); + Assert.assertEquals("h1:1", nodeIds.get(2).toString()); + + // Now update node3 + selector.nodeUpdate(createRMNode("h3", 3, 2, 10)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + System.out.println("2-> "+ nodeIds); + Assert.assertEquals("h3:3", nodeIds.get(0).toString()); + 
Assert.assertEquals("h2:2", nodeIds.get(1).toString()); + Assert.assertEquals("h1:1", nodeIds.get(2).toString()); + + // Now send update with -1 wait time + selector.nodeUpdate(createRMNode("h4", 4, -1, 10)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + System.out.println("3-> "+ nodeIds); + // No change + Assert.assertEquals("h3:3", nodeIds.get(0).toString()); + Assert.assertEquals("h2:2", nodeIds.get(1).toString()); + Assert.assertEquals("h1:1", nodeIds.get(2).toString()); + } + + @Test + public void testQueueLengthSort() { + NodeManagerQueueMonitor selector = new NodeManagerQueueMonitor( + NodeManagerQueueMonitor.LoadComparator.QUEUE_LENGTH); + selector.nodeUpdate(createRMNode("h1", 1, -1, 15)); + selector.nodeUpdate(createRMNode("h2", 2, -1, 5)); + selector.nodeUpdate(createRMNode("h3", 3, -1, 10)); + selector.computeTask.run(); + List nodeIds = selector.selectNodes(); + System.out.println("1-> " + nodeIds); + Assert.assertEquals("h2:2", nodeIds.get(0).toString()); + Assert.assertEquals("h3:3", nodeIds.get(1).toString()); + Assert.assertEquals("h1:1", nodeIds.get(2).toString()); + + // Now update node3 + selector.nodeUpdate(createRMNode("h3", 3, -1, 2)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + System.out.println("2-> "+ nodeIds); + Assert.assertEquals("h3:3", nodeIds.get(0).toString()); + Assert.assertEquals("h2:2", nodeIds.get(1).toString()); + Assert.assertEquals("h1:1", nodeIds.get(2).toString()); + + // Now send update with -1 wait time but valid length + selector.nodeUpdate(createRMNode("h4", 4, -1, 20)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + System.out.println("3-> "+ nodeIds); + // No change + Assert.assertEquals("h3:3", nodeIds.get(0).toString()); + Assert.assertEquals("h2:2", nodeIds.get(1).toString()); + Assert.assertEquals("h1:1", nodeIds.get(2).toString()); + Assert.assertEquals("h4:4", nodeIds.get(3).toString()); + } + + @Test + public void testContainerQueuingLimit() { + NodeManagerQueueMonitor selector = new NodeManagerQueueMonitor( + NodeManagerQueueMonitor.LoadComparator.QUEUE_LENGTH); + selector.nodeUpdate(createRMNode("h1", 1, -1, 15)); + selector.nodeUpdate(createRMNode("h2", 2, -1, 5)); + selector.nodeUpdate(createRMNode("h3", 3, -1, 10)); + + // Test Mean Calculation + selector.initThresholdCalculator(0, 6, 100); + QueueLimitCalculator calculator = selector.getThresholdCalculator(); + ContainerQueuingLimit containerQueuingLimit = calculator + .createContainerQueuingLimit(); + Assert.assertEquals(6, containerQueuingLimit.getMaxQueueLength()); + Assert.assertEquals(-1, containerQueuingLimit.getMaxWaitTimeInMs()); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(10, containerQueuingLimit.getMaxQueueLength()); + Assert.assertEquals(-1, containerQueuingLimit.getMaxWaitTimeInMs()); + + // Test Limits do not exceed specified max + selector.nodeUpdate(createRMNode("h1", 1, -1, 110)); + selector.nodeUpdate(createRMNode("h2", 2, -1, 120)); + selector.nodeUpdate(createRMNode("h3", 3, -1, 130)); + selector.nodeUpdate(createRMNode("h4", 4, -1, 140)); + selector.nodeUpdate(createRMNode("h5", 5, -1, 150)); + selector.nodeUpdate(createRMNode("h6", 6, -1, 160)); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(100, containerQueuingLimit.getMaxQueueLength()); + + // Test Limits do not go below specified min + selector.nodeUpdate(createRMNode("h1", 1, -1, 1)); + 
selector.nodeUpdate(createRMNode("h2", 2, -1, 2)); + selector.nodeUpdate(createRMNode("h3", 3, -1, 3)); + selector.nodeUpdate(createRMNode("h4", 4, -1, 4)); + selector.nodeUpdate(createRMNode("h5", 5, -1, 5)); + selector.nodeUpdate(createRMNode("h6", 6, -1, 6)); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(6, containerQueuingLimit.getMaxQueueLength()); + + } + + private RMNode createRMNode(String host, int port, + int waitTime, int queueLength) { + RMNode node1 = Mockito.mock(RMNode.class); + NodeId nID1 = new FakeNodeId(host, port); + Mockito.when(node1.getNodeID()).thenReturn(nID1); + QueuedContainersStatus status1 = + Mockito.mock(QueuedContainersStatus.class); + Mockito.when(status1.getEstimatedQueueWaitTime()) + .thenReturn(waitTime); + Mockito.when(status1.getWaitQueueLength()) + .thenReturn(queueLength); + Mockito.when(node1.getQueuedContainersStatus()).thenReturn(status1); + return node1; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java deleted file mode 100644 index aec4e86..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; - -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; - -import java.util.List; - -public class TestTopKNodeSelector { - - static class FakeNodeId extends NodeId { - final String host; - final int port; - - public FakeNodeId(String host, int port) { - this.host = host; - this.port = port; - } - - @Override - public String getHost() { - return host; - } - - @Override - public int getPort() { - return port; - } - - @Override - protected void setHost(String host) {} - @Override - protected void setPort(int port) {} - @Override - protected void build() {} - - @Override - public String toString() { - return host + ":" + port; - } - } - - @Test - public void testQueueTimeSort() { - TopKNodeSelector selector = new TopKNodeSelector(5, - TopKNodeSelector.TopKComparator.WAIT_TIME); - selector.nodeUpdate(createRMNode("h1", 1, 15, 10)); - selector.nodeUpdate(createRMNode("h2", 2, 5, 10)); - selector.nodeUpdate(createRMNode("h3", 3, 10, 10)); - selector.computeTask.run(); - List nodeIds = selector.selectNodes(); - System.out.println("1-> " + nodeIds); - Assert.assertEquals("h2:2", nodeIds.get(0).toString()); - Assert.assertEquals("h3:3", nodeIds.get(1).toString()); - Assert.assertEquals("h1:1", nodeIds.get(2).toString()); - - // Now update node3 - selector.nodeUpdate(createRMNode("h3", 3, 2, 10)); - selector.computeTask.run(); - nodeIds = selector.selectNodes(); - System.out.println("2-> "+ nodeIds); - Assert.assertEquals("h3:3", nodeIds.get(0).toString()); - Assert.assertEquals("h2:2", nodeIds.get(1).toString()); - Assert.assertEquals("h1:1", nodeIds.get(2).toString()); - - // Now send update with -1 wait time - selector.nodeUpdate(createRMNode("h4", 4, -1, 10)); - selector.computeTask.run(); - nodeIds = selector.selectNodes(); - System.out.println("3-> "+ nodeIds); - // No change - Assert.assertEquals("h3:3", nodeIds.get(0).toString()); - Assert.assertEquals("h2:2", nodeIds.get(1).toString()); - Assert.assertEquals("h1:1", nodeIds.get(2).toString()); - } - - @Test - public void testQueueLengthSort() { - TopKNodeSelector selector = new TopKNodeSelector(5, - TopKNodeSelector.TopKComparator.QUEUE_LENGTH); - selector.nodeUpdate(createRMNode("h1", 1, -1, 15)); - selector.nodeUpdate(createRMNode("h2", 2, -1, 5)); - selector.nodeUpdate(createRMNode("h3", 3, -1, 10)); - selector.computeTask.run(); - List nodeIds = selector.selectNodes(); - System.out.println("1-> " + nodeIds); - Assert.assertEquals("h2:2", nodeIds.get(0).toString()); - Assert.assertEquals("h3:3", nodeIds.get(1).toString()); - Assert.assertEquals("h1:1", nodeIds.get(2).toString()); - - // Now update node3 - selector.nodeUpdate(createRMNode("h3", 3, -1, 2)); - selector.computeTask.run(); - nodeIds = selector.selectNodes(); - System.out.println("2-> "+ nodeIds); - Assert.assertEquals("h3:3", nodeIds.get(0).toString()); - Assert.assertEquals("h2:2", nodeIds.get(1).toString()); - Assert.assertEquals("h1:1", nodeIds.get(2).toString()); - - // Now send update with -1 wait time but valid length - selector.nodeUpdate(createRMNode("h4", 4, -1, 20)); - selector.computeTask.run(); - nodeIds = selector.selectNodes(); - System.out.println("3-> "+ nodeIds); - // No change - Assert.assertEquals("h3:3", nodeIds.get(0).toString()); - 
Assert.assertEquals("h2:2", nodeIds.get(1).toString()); - Assert.assertEquals("h1:1", nodeIds.get(2).toString()); - Assert.assertEquals("h4:4", nodeIds.get(3).toString()); - } - - private RMNode createRMNode(String host, int port, - int waitTime, int queueLength) { - RMNode node1 = Mockito.mock(RMNode.class); - NodeId nID1 = new FakeNodeId(host, port); - Mockito.when(node1.getNodeID()).thenReturn(nID1); - QueuedContainersStatus status1 = - Mockito.mock(QueuedContainersStatus.class); - Mockito.when(status1.getEstimatedQueueWaitTime()) - .thenReturn(waitTime); - Mockito.when(status1.getWaitQueueLength()) - .thenReturn(queueLength); - Mockito.when(node1.getQueuedContainersStatus()).thenReturn(status1); - return node1; - } -}