diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 59e303f..6c71ce6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1486,6 +1486,7 @@ */ public static final String RM_NODE_LABELS_MANAGER_CLASS = NODE_LABELS_PREFIX + "manager-class"; + /** URI for NodeLabelManager */ public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX @@ -1494,6 +1495,20 @@ NODE_LABELS_PREFIX + "fs-store.retry-policy-spec"; public static final String DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC = "2000, 500"; + public static final String ENABLE_DECENTRALIZED_NODELABEL_CONFIGURATION = + NODE_LABELS_PREFIX + "decentralized-configuration.enabled"; + + /** Configurations in NodeManager for NodeLabelsFeature*/ + public static final String NM_NODE_LABELS_PREFIX = NODE_LABELS_PREFIX + + "nm."; + public static final String NM_NODE_LABELS_PROVIDER_CLASS = + NM_NODE_LABELS_PREFIX + "labels-provider.class"; + public static final String NM_LABELS_FETCH_INTERVAL_MS = + NM_NODE_LABELS_PREFIX + "interval-ms"; + + public static final long DEFAULT_NM_LABELS_FETCH_INTERVAL_MS = 10 * 60 * 1000; + + /** End of Configurations in NodeManager for NodeLabelsFeature*/ public YarnConfiguration() { super(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java index 8885769..ecc8d9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -60,7 +60,7 @@ public void testResourceTrackerOnHA() throws Exception { // make sure registerNodeManager works when failover happens RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, 0, resource, - YarnVersionInfo.getVersion(), null, null); + YarnVersionInfo.getVersion(), null, null,null); resourceTracker.registerNodeManager(request); Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId)); @@ -70,7 +70,7 @@ public void testResourceTrackerOnHA() throws Exception { NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null, null, null); NodeHeartbeatRequest request2 = - NodeHeartbeatRequest.newInstance(status, null, null); + NodeHeartbeatRequest.newInstance(status, null, null,null); resourceTracker.nodeHeartbeat(request2); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index addd3fe..a903ea6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; +import java.util.Set; + import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; @@ -26,7 +28,7 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, MasterKey lastKnownContainerTokenMasterKey, - MasterKey lastKnownNMTokenMasterKey) { + MasterKey lastKnownNMTokenMasterKey,Set nodeLabels) { NodeHeartbeatRequest nodeHeartbeatRequest = Records.newRecord(NodeHeartbeatRequest.class); nodeHeartbeatRequest.setNodeStatus(nodeStatus); @@ -34,6 +36,7 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey); nodeHeartbeatRequest .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey); + nodeHeartbeatRequest.setNodeLabels(nodeLabels); return nodeHeartbeatRequest; } @@ -45,4 +48,7 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, public abstract MasterKey getLastKnownNMTokenMasterKey(); public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey); + + public abstract Set getNodeLabels(); + public abstract void setNodeLabels(Set nodeLabels); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 0e3d7e4..011055e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import java.util.List; +import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -30,7 +31,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, List containerStatuses, - List runningApplications) { + List runningApplications,Set nodeLabels) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -39,6 +40,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, request.setNMVersion(nodeManagerVersionId); request.setContainerStatuses(containerStatuses); request.setRunningApplications(runningApplications); + request.setNodeLabels(nodeLabels); return request; } @@ -47,6 +49,8 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, public abstract Resource getResource(); public abstract String getNMVersion(); public abstract List getNMContainerStatuses(); + public abstract Set getNodeLabels(); + public abstract void setNodeLabels(Set nodeLabels); /** * We introduce this here because currently YARN RM doesn't persist nodes info diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 26d1f19..85e9389 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -18,10 +18,14 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; +import java.util.HashSet; +import java.util.Set; + import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -36,6 +40,7 @@ private NodeStatus nodeStatus = null; private MasterKey lastKnownContainerTokenMasterKey = null; private MasterKey lastKnownNMTokenMasterKey = null; + private Set labels = null; public NodeHeartbeatRequestPBImpl() { builder = NodeHeartbeatRequestProto.newBuilder(); @@ -80,6 +85,10 @@ private void mergeLocalToBuilder() { builder.setLastKnownNmTokenMasterKey( convertToProtoFormat(this.lastKnownNMTokenMasterKey)); } + if (this.labels != null && !this.labels.isEmpty()) { + builder.clearNodeLabels(); + builder.addAllNodeLabels(this.labels); + } } private void mergeLocalToProto() { @@ -178,4 +187,27 @@ private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) { private MasterKeyProto convertToProtoFormat(MasterKey t) { return ((MasterKeyPBImpl)t).getProto(); } + + @Override + public Set getNodeLabels() { + initNodeLabels(); + return labels; + } + + private void initNodeLabels(){ + if (this.labels != null) { + return; + } + NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; + labels = new HashSet(p.getNodeLabelsList()); + } + + @Override + public void setNodeLabels(Set nodeLabels) { + maybeInitBuilder(); + if (labels == null || labels.isEmpty()) { + builder.clearNodeLabels(); + } + this.labels = nodeLabels; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index ce4faec..2c58eab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -20,23 +20,18 @@ import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Set; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; @@ -56,6 +51,7 @@ private NodeId nodeId = null; private List containerStatuses = null; private List runningApplications = null; + private Set labels = null; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -86,7 +82,10 @@ private void mergeLocalToBuilder() { if (this.nodeId != null) { builder.setNodeId(convertToProtoFormat(this.nodeId)); } - + if (this.labels != null && !this.labels.isEmpty()) { + builder.clearNodeLabels(); + builder.addAllNodeLabels(this.labels); + } } private synchronized void addNMContainerStatusesToProto() { @@ -292,6 +291,29 @@ public void setNMVersion(String version) { builder.setNmVersion(version); } + @Override + public Set getNodeLabels() { + initNodeLabels(); + return labels; + } + + private void initNodeLabels(){ + if (this.labels != null) { + return; + } + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + labels = new HashSet(p.getNodeLabelsList()); + } + + @Override + public void setNodeLabels(Set nodeLabels) { + maybeInitBuilder(); + if (labels == null || labels.isEmpty()) { + builder.clearNodeLabels(); + } + this.labels = nodeLabels; + } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { return new ApplicationIdPBImpl(p); } @@ -323,4 +345,5 @@ private NMContainerStatusPBImpl convertFromProtoFormat(NMContainerStatusProto c) private NMContainerStatusProto convertToProtoFormat(NMContainerStatus c) { return ((NMContainerStatusPBImpl)c).getProto(); } + } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index d0990fb..33919f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -32,6 +32,7 @@ message RegisterNodeManagerRequestProto { optional string nm_version = 5; repeated NMContainerStatusProto container_statuses = 6; repeated ApplicationIdProto runningApplications = 7; + repeated string nodeLabels = 8; } message RegisterNodeManagerResponseProto { @@ -47,6 +48,7 @@ message NodeHeartbeatRequestProto { optional NodeStatusProto node_status = 1; optional MasterKeyProto last_known_container_token_master_key = 2; optional MasterKeyProto last_known_nm_token_master_key = 3; + repeated string nodeLabels = 4; } message NodeHeartbeatResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java index 7165445..7f5e4db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -29,8 +30,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; -import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; import org.junit.Assert; @@ -78,7 +77,7 @@ public void testRegisterNodeManagerRequest() { RegisterNodeManagerRequest.newInstance( NodeId.newInstance("1.1.1.1", 1000), 8080, Resource.newInstance(1024, 1), "NM-version-id", reports, - Arrays.asList(appId)); + Arrays.asList(appId),new HashSet()); RegisterNodeManagerRequest requestProto = new RegisterNodeManagerRequestPBImpl( ((RegisterNodeManagerRequestPBImpl) request).getProto()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java index fdacd92..672b9a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import java.util.Arrays; +import java.util.HashSet; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -44,7 +45,7 @@ public void testRegisterNodeManagerRequest() { ContainerState.RUNNING, Resource.newInstance(1024, 1), "good", -1, Priority.newInstance(0), 1234)), Arrays.asList( ApplicationId.newInstance(1234L, 1), - ApplicationId.newInstance(1234L, 2))); + ApplicationId.newInstance(1234L, 2)),new HashSet()); // serialze to proto, and get request from proto RegisterNodeManagerRequest request1 = @@ -67,8 +68,9 @@ public void testRegisterNodeManagerRequest() { @Test public void testRegisterNodeManagerRequestWithNullArrays() { RegisterNodeManagerRequest request = - RegisterNodeManagerRequest.newInstance(NodeId.newInstance("host", 1234), - 1234, Resource.newInstance(0, 0), "version", null, null); + RegisterNodeManagerRequest.newInstance( + NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0), + "version", null, null, new HashSet()); // serialze to proto, and get request from proto RegisterNodeManagerRequest request1 = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 43770c1..e2c0b46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -34,6 +34,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.service.Service; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; @@ -53,6 +54,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProviderService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -75,6 +78,7 @@ protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); private ApplicationACLsManager aclsManager; private NodeHealthCheckerService nodeHealthChecker; + private NodeLabelsProviderService nodeLabelsProviderService; private LocalDirsHandlerService dirsHandler; private Context context; private AsyncDispatcher dispatcher; @@ -91,9 +95,25 @@ public NodeManager() { } protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, - metrics); + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService nodeLabelsProviderService) { + return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, + metrics,nodeLabelsProviderService); + } + + protected NodeLabelsProviderService createNodeLabelsProviderService( + Configuration conf) throws InstantiationException, IllegalAccessException { + NodeLabelsProviderService provider = null; + if (conf.getBoolean( + YarnConfiguration.ENABLE_DECENTRALIZED_NODELABEL_CONFIGURATION, false)) { + Class labelsProviderClass = + conf.getClass(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CLASS, + ConfigurationNodeLabelsProvider.class, + NodeLabelsProviderService.class); + provider = labelsProviderClass.newInstance(); + addService((Service) provider); + } + return provider; } protected NodeResourceMonitor createNodeResourceMonitor() { @@ -220,8 +240,11 @@ protected void serviceInit(Configuration conf) throws Exception { this.context = createNMContext(containerTokenSecretManager, nmTokenSecretManager, nmStore); + nodeLabelsProviderService = createNodeLabelsProviderService(conf); + nodeStatusUpdater = - createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); + createNodeStatusUpdater(context, dispatcher, nodeHealthChecker, + nodeLabelsProviderService); NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor(); addService(nodeResourceMonitor); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index bed58f5..9dd3b74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -62,10 +62,11 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProviderService; import org.apache.hadoop.yarn.util.YarnVersionInfo; import com.google.common.annotations.VisibleForTesting; @@ -112,10 +113,16 @@ private Thread statusUpdater; private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER; + private final NodeLabelsProviderService nodeLabelsProvider; + + private boolean isDecentralizedNodeLabelsConf; + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsProviderService nodeLabelsProviderService) { super(NodeStatusUpdaterImpl.class.getName()); this.healthChecker = healthChecker; + this.nodeLabelsProvider = nodeLabelsProviderService; this.context = context; this.dispatcher = dispatcher; this.metrics = metrics; @@ -166,6 +173,8 @@ protected void serviceInit(Configuration conf) throws Exception { LOG.debug(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " :" + durationToTrackStoppedContainers); } + isDecentralizedNodeLabelsConf = conf.getBoolean( + YarnConfiguration.ENABLE_DECENTRALIZED_NODELABEL_CONFIGURATION, false); super.serviceInit(conf); LOG.info("Initialized nodemanager for " + nodeId + ":" + " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb + @@ -244,8 +253,15 @@ protected void registerWithRM() throws YarnException, IOException { List containerReports = getNMContainerStatuses(); RegisterNodeManagerRequest request = - RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, - nodeManagerVersionId, containerReports, getRunningApplications()); + RegisterNodeManagerRequest.newInstance( + nodeId, + httpPort, + totalResource, + nodeManagerVersionId, + containerReports, + getRunningApplications(), + (isDecentralizedNodeLabelsConf) ? nodeLabelsProvider + .getNodeLabels() : null); if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); } @@ -532,18 +548,34 @@ protected void startStatusUpdater() { @SuppressWarnings("unchecked") public void run() { int lastHeartBeatID = 0; + Set nodeLabelsLastReceived = + nodeLabelsProvider.getNodeLabels(); + Set nodeLabelsForHeartBeat = null; while (!isStopped) { // Send heartbeat try { NodeHeartbeatResponse response = null; NodeStatus nodeStatus = getNodeStatus(lastHeartBeatID); + nodeLabelsForHeartBeat = null; + if (isDecentralizedNodeLabelsConf) { + nodeLabelsForHeartBeat = + nodeLabelsProvider.getNodeLabels(); + if (arelabelsSameAsPrevOutPut(nodeLabelsForHeartBeat, + nodeLabelsLastReceived)) { + nodeLabelsForHeartBeat = null; + } else { + nodeLabelsLastReceived = nodeLabelsForHeartBeat; + } + } + NodeHeartbeatRequest request = NodeHeartbeatRequest.newInstance(nodeStatus, - NodeStatusUpdaterImpl.this.context - .getContainerTokenSecretManager().getCurrentKey(), - NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager() - .getCurrentKey()); + NodeStatusUpdaterImpl.this.context + .getContainerTokenSecretManager().getCurrentKey(), + NodeStatusUpdaterImpl.this.context + .getNMTokenSecretManager().getCurrentKey(), + nodeLabelsForHeartBeat); response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); @@ -572,6 +604,13 @@ public void run() { new NodeManagerEvent(NodeManagerEventType.RESYNC)); break; } + + if (nodeLabelsForHeartBeat != null + && response.getDiagnosticsMessage() != null) { + LOG.info("Node Labels were rejected from RM " + + response.getDiagnosticsMessage()); + } + // Explicitly put this method after checking the resync response. We // don't want to remove the completed containers before resync @@ -622,6 +661,14 @@ public void run() { } } } + + private boolean arelabelsSameAsPrevOutPut(Set nodeLabelsNew, + Set nodeLabelsOld) { + if (nodeLabelsNew.size() != nodeLabelsOld.size()) { + return false; + } + return nodeLabelsOld.containsAll(nodeLabelsNew); + } private void updateMasterKeys(NodeHeartbeatResponse response) { // See if the master-key has rolled over diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java new file mode 100644 index 0000000..054cb42 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.nodelabels; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * Provides Node's Labels by constantly monitoring the configuration. + */ +public class ConfigurationNodeLabelsProvider extends AbstractService implements + NodeLabelsProviderService { + + /** Timer used to schedule node Label monitoring */ + private Timer configurationMonitorTimer; + + public ConfigurationNodeLabelsProvider() { + super("Configuration Based NodeLabels Provider Service"); + } + + private Set nodeLabels = new HashSet(); + private long intervalTime; + + @Override + public Set getNodeLabels() { + readLock.lock(); + try { + return nodeLabels; + } finally { + readLock.unlock(); + } + } + + protected Lock readLock; + protected Lock writeLock; + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.intervalTime = + conf.getLong(YarnConfiguration.NM_LABELS_FETCH_INTERVAL_MS, + YarnConfiguration.DEFAULT_NM_LABELS_FETCH_INTERVAL_MS); + ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + readLock = readWriteLock.readLock(); + writeLock = readWriteLock.writeLock(); + } + + @Override + protected void serviceStart() { + configurationMonitorTimer = + new Timer("Node Labels Configuration Monitor", true); + configurationMonitorTimer.scheduleAtFixedRate( + new ConfigurationMonitorTimerTask(), 0, intervalTime); + } + + private void checkForNodeLabelModification(Configuration conf) { + String[] nodeLabelsFromScript = + StringUtils.getStrings(conf.get(YarnConfiguration.NM_NODE_LABELS_PREFIX, "")); + writeLock.lock(); + try { + nodeLabels = new HashSet(Arrays.asList(nodeLabelsFromScript)); + } finally { + writeLock.unlock(); + } + } + + private class ConfigurationMonitorTimerTask extends TimerTask { + @Override + public void run() { + checkForNodeLabelModification(new YarnConfiguration()); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProviderService.java new file mode 100644 index 0000000..bf45e53 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProviderService.java @@ -0,0 +1,36 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.nodelabels; + +import java.util.Set; + +import org.apache.hadoop.service.Service; + +/** + * Interface which will be responsible for fetching the labels + * + */ +public interface NodeLabelsProviderService extends Service{ + /** + * Provides the labels + * @return + */ + public Set getNodeLabels(); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java index 3f4091c..aaa1d8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; - import java.nio.ByteBuffer; import org.apache.commons.logging.Log; @@ -37,6 +36,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProviderService; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; /** @@ -52,8 +52,9 @@ private ResourceTracker resourceTracker; public MockNodeStatusUpdater(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsProviderService nodeLabelsProviderService) { + super(context, dispatcher, healthChecker, metrics, nodeLabelsProviderService); resourceTracker = createResourceTracker(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java index fabb03b..91ab143 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java @@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProviderService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; @@ -100,10 +102,14 @@ public int getHttpPort() { Dispatcher dispatcher = new AsyncDispatcher(); NodeHealthCheckerService healthChecker = new NodeHealthCheckerService(); healthChecker.init(conf); + NodeLabelsProviderService nodeLabelsProviderService = + new ConfigurationNodeLabelsProvider(); + nodeLabelsProviderService.init(conf); LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler(); NodeManagerMetrics metrics = NodeManagerMetrics.create(); NodeStatusUpdater nodeStatusUpdater = - new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics) { + new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics, + nodeLabelsProviderService) { @Override protected ResourceTracker getRMClient() { return new LocalRMInterface(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java index e69170e..9e2b50c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProviderService; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.After; @@ -269,9 +270,11 @@ public MyNodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService nodeLabelsProviderService) { MockNodeStatusUpdater myNodeStatusUpdater = - new MockNodeStatusUpdater(context, dispatcher, healthChecker, metrics); + new MockNodeStatusUpdater(context, dispatcher, healthChecker, + metrics, nodeLabelsProviderService); return myNodeStatusUpdater; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index 85bafb3..1e91ed2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProviderService; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.junit.After; @@ -226,9 +227,10 @@ public void testNMSentContainerStatusOnResync() throws Exception { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService nodeLabelsProviderService) { return new TestNodeStatusUpdaterResync(context, dispatcher, - healthChecker, metrics) { + healthChecker, metrics,nodeLabelsProviderService) { @Override protected ResourceTracker createResourceTracker() { return new MockResourceTracker() { @@ -308,8 +310,9 @@ public NodeHeartbeatResponse nodeHeartbeat( // This can be used as a common base class for testing NM resync behavior. class TestNodeStatusUpdaterResync extends MockNodeStatusUpdater { public TestNodeStatusUpdaterResync(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsProviderService nodeLabelsProviderService) { + super(context, dispatcher, healthChecker, metrics,nodeLabelsProviderService); } @Override protected void rebootNodeStatusUpdaterAndRegisterWithRM() { @@ -355,9 +358,10 @@ public void setExistingContainerId(ContainerId cId) { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService nodeLabelsProviderService) { return new TestNodeStatusUpdaterImpl1(context, dispatcher, - healthChecker, metrics); + healthChecker, metrics,nodeLabelsProviderService); } public int getNMRegistrationCount() { @@ -367,8 +371,9 @@ public int getNMRegistrationCount() { class TestNodeStatusUpdaterImpl1 extends MockNodeStatusUpdater { public TestNodeStatusUpdaterImpl1(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsProviderService nodeLabelsProviderService) { + super(context, dispatcher, healthChecker, metrics,nodeLabelsProviderService); } @Override @@ -423,9 +428,10 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() { Thread launchContainersThread = null; @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService nodeLabelsProviderService) { return new TestNodeStatusUpdaterImpl2(context, dispatcher, - healthChecker, metrics); + healthChecker, metrics,nodeLabelsProviderService); } @Override @@ -466,8 +472,9 @@ public void setBlockNewContainerRequests( class TestNodeStatusUpdaterImpl2 extends MockNodeStatusUpdater { public TestNodeStatusUpdaterImpl2(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsProviderService nodeLabelsProviderService) { + super(context, dispatcher, healthChecker, metrics, nodeLabelsProviderService); } @Override @@ -553,9 +560,10 @@ public void run() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService nodeLabelsProviderService) { return new TestNodeStatusUpdaterImpl3(context, dispatcher, healthChecker, - metrics); + metrics,nodeLabelsProviderService); } public int getNMRegistrationCount() { @@ -573,8 +581,9 @@ protected void shutDown() { class TestNodeStatusUpdaterImpl3 extends MockNodeStatusUpdater { public TestNodeStatusUpdaterImpl3(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsProviderService nodeLabelsProviderService) { + super(context, dispatcher, healthChecker, metrics,nodeLabelsProviderService); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java index c44f7b8..bb74477 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java @@ -32,8 +32,6 @@ import java.util.List; import java.util.Map; -import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; @@ -65,9 +63,11 @@ import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProviderService; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -309,9 +309,11 @@ private static File createUnhaltingScriptFile(ContainerId cId, @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService nodeLabelsProviderService) { MockNodeStatusUpdater myNodeStatusUpdater = - new MockNodeStatusUpdater(context, dispatcher, healthChecker, metrics); + new MockNodeStatusUpdater(context, dispatcher, healthChecker, + metrics, nodeLabelsProviderService); return myNodeStatusUpdater; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 7593ce6..adf8120 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProviderService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; @@ -289,8 +290,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) private Context context; public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsProviderService nodeLabelsProviderService) { + super(context, dispatcher, healthChecker, metrics, + nodeLabelsProviderService); this.context = context; resourceTracker = new MyResourceTracker(this.context); } @@ -312,8 +315,10 @@ protected void stopRMProxy() { public ResourceTracker resourceTracker; public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsProviderService nodeLabelsProviderService) { + super(context, dispatcher, healthChecker, metrics, + nodeLabelsProviderService); resourceTracker = new MyResourceTracker4(context); } @@ -333,8 +338,10 @@ protected void stopRMProxy() { private Context context; public MyNodeStatusUpdater3(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsProviderService nodeLabelsProviderService) { + super(context, dispatcher, healthChecker, metrics, + nodeLabelsProviderService); this.context = context; this.resourceTracker = new MyResourceTracker3(this.context); } @@ -362,8 +369,10 @@ protected boolean isTokenKeepAliveEnabled(Configuration conf) { public ResourceTracker resourceTracker; public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, - long rmStartIntervalMS, boolean rmNeverStart) { - super(context, dispatcher, healthChecker, metrics); + long rmStartIntervalMS, boolean rmNeverStart, + NodeLabelsProviderService nodeLabelsProviderService) { + super(context, dispatcher, healthChecker, metrics, + nodeLabelsProviderService); this.rmStartIntervalMS = rmStartIntervalMS; this.rmNeverStart = rmNeverStart; } @@ -401,8 +410,10 @@ protected void stopRMProxy() { private Configuration conf; public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, Configuration conf) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + Configuration conf, NodeLabelsProviderService nodeLabelsProviderService) { + super(context, dispatcher, healthChecker, metrics, + nodeLabelsProviderService); resourceTracker = new MyResourceTracker5(); this.conf = conf; } @@ -425,9 +436,11 @@ protected void stopRMProxy() { private MyNodeStatusUpdater3 nodeStatusUpdater; @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService nodeLabelsProviderService) { this.nodeStatusUpdater = - new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics); + new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics, + nodeLabelsProviderService); return this.nodeStatusUpdater; } @@ -448,10 +461,11 @@ public MyNodeManager2 (CyclicBarrier syncBarrier, Configuration conf) { } @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService nodeLabelsProviderService) { nodeStatusUpdater = new MyNodeStatusUpdater5(context, dispatcher, healthChecker, - metrics, conf); + metrics, conf,nodeLabelsProviderService); return nodeStatusUpdater; } @@ -917,9 +931,10 @@ public void testNMRegistration() throws InterruptedException { nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService nodeLabelsProviderService) { return new MyNodeStatusUpdater(context, dispatcher, healthChecker, - metrics); + metrics,nodeLabelsProviderService); } }; @@ -978,9 +993,11 @@ public void testStopReentrant() throws Exception { nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater( - context, dispatcher, healthChecker, metrics); + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService nodeLabelsProviderService) { + MyNodeStatusUpdater myNodeStatusUpdater = + new MyNodeStatusUpdater(context, dispatcher, healthChecker, + metrics, nodeLabelsProviderService); MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); myResourceTracker2.heartBeatNodeAction = NodeAction.SHUTDOWN; myNodeStatusUpdater.resourceTracker = myResourceTracker2; @@ -1063,8 +1080,9 @@ private NodeManagerWithCustomNodeStatusUpdater() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker) { - updater = createUpdater(context, dispatcher, healthChecker); + NodeHealthCheckerService healthChecker, + NodeLabelsProviderService nodeLabelsProviderService) { + updater = createUpdater(context, dispatcher, healthChecker,nodeLabelsProviderService); return updater; } @@ -1074,7 +1092,8 @@ public NodeStatusUpdater getUpdater() { abstract NodeStatusUpdater createUpdater(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker); + NodeHealthCheckerService healthChecker, + NodeLabelsProviderService nodeLabelsProviderService); } @Test @@ -1083,9 +1102,10 @@ public void testNMShutdownForRegistrationFailure() throws Exception { nm = new NodeManagerWithCustomNodeStatusUpdater() { @Override protected NodeStatusUpdater createUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService nodeLabelsProviderService) { MyNodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater( - context, dispatcher, healthChecker, metrics); + context, dispatcher, healthChecker, metrics,nodeLabelsProviderService); MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); myResourceTracker2.registerNodeAction = NodeAction.SHUTDOWN; myResourceTracker2.shutDownMessage = "RM Shutting Down Node"; @@ -1116,10 +1136,11 @@ public void testNMConnectionToRM() throws Exception { nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() { @Override protected NodeStatusUpdater createUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService labelsFetcher) { NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4( context, dispatcher, healthChecker, metrics, - rmStartIntervalMS, true); + rmStartIntervalMS, true,labelsFetcher); return nodeStatusUpdater; } }; @@ -1148,10 +1169,11 @@ protected NodeStatusUpdater createUpdater(Context context, nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() { @Override protected NodeStatusUpdater createUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService labelsFetcher) { NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4( context, dispatcher, healthChecker, metrics, rmStartIntervalMS, - false); + false,labelsFetcher); return nodeStatusUpdater; } }; @@ -1195,9 +1217,10 @@ public void testNoRegistrationWhenNMServicesFail() throws Exception { nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService labelsFetcher) { return new MyNodeStatusUpdater(context, dispatcher, healthChecker, - metrics); + metrics, labelsFetcher); } @Override @@ -1261,10 +1284,11 @@ public void testCompletedContainerStatusBackup() throws Exception { nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService labelsFetcher) { MyNodeStatusUpdater2 myNodeStatusUpdater = new MyNodeStatusUpdater2(context, dispatcher, healthChecker, - metrics); + metrics,labelsFetcher); return myNodeStatusUpdater; } @@ -1338,9 +1362,10 @@ public void testRMVersionLessThanMinimum() throws InterruptedException { nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService labelsFetcher) { MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater( - context, dispatcher, healthChecker, metrics); + context, dispatcher, healthChecker, metrics,labelsFetcher); MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); myResourceTracker2.heartBeatNodeAction = NodeAction.NORMAL; myResourceTracker2.rmVersion = "3.0.0"; @@ -1498,9 +1523,11 @@ private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) { return new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater( - context, dispatcher, healthChecker, metrics); + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService labelsFetcher) { + MyNodeStatusUpdater myNodeStatusUpdater = + new MyNodeStatusUpdater(context, dispatcher, healthChecker, + metrics, labelsFetcher); MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); myResourceTracker2.heartBeatNodeAction = nodeHeartBeatAction; myNodeStatusUpdater.resourceTracker = myResourceTracker2; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 1907e1a..a0eb1ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -118,7 +118,7 @@ public int getHttpPort() { protected final long DUMMY_RM_IDENTIFIER = 1234; protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl( - context, new AsyncDispatcher(), null, metrics) { + context, new AsyncDispatcher(), null, metrics,null) { @Override protected ResourceTracker getRMClient() { return new LocalRMInterface(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index f5583bc..6149067 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -20,6 +20,9 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -97,6 +100,8 @@ private int minAllocMb; private int minAllocVcores; + private boolean isDecentralizedNodeLabelsConf; + static { resync.setNodeAction(NodeAction.RESYNC); @@ -145,7 +150,9 @@ protected void serviceInit(Configuration conf) throws Exception { minimumNodeManagerVersion = conf.get( YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION, YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION); - + + isDecentralizedNodeLabelsConf = conf.getBoolean( + YarnConfiguration.ENABLE_DECENTRALIZED_NODELABEL_CONFIGURATION, false); super.serviceInit(conf); } @@ -329,11 +336,46 @@ public RegisterNodeManagerResponse registerNodeManager( } } } + + //Update node's labels to RM's NodeLabelManager. + Set nodeLabels=null; + if (isDecentralizedNodeLabelsConf) { + nodeLabels = request.getNodeLabels(); + String errorMessage = null; + if (null == nodeLabels) { + errorMessage = + "NodeManager from " + host + + " needs to be configured in Decentralized Node Labels" + + " Configuration when RM is configured in Decentralized Node" + + " Labels Configuration, Sending SHUTDOWN" + + " signal to the NodeManager."; + } else if (null != nodeLabels) { + Map> labelsUpdate = + new HashMap>(); + labelsUpdate.put(rmNode.getNodeID(), nodeLabels); + try { + this.rmContext.getNodeLabelManager() + .replaceLabelsOnNode(labelsUpdate); + } catch (IOException ex) { + errorMessage = + "Node Labels of NodeManager from " + host + + " is not properly configured: " + ex.getMessage() + + ", Sending SHUTDOWN signal to the NodeManager."; + } + if (errorMessage != null) { + LOG.info(errorMessage); + response.setDiagnosticsMessage(errorMessage); + response.setNodeAction(NodeAction.SHUTDOWN); + return response; + } + } + } String message = "NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: " + httpPort + ") " + "registered with capability: " + capability - + ", assigned nodeId " + nodeId; + + ", assigned nodeId " + nodeId + ", node labels { " + + nodeLabels.toString()+" } "; LOG.info(message); response.setNodeAction(NodeAction.NORMAL); response.setRMIdentifier(ResourceManager.getClusterTimeStamp()); @@ -415,10 +457,28 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), remoteNodeStatus.getContainersStatuses(), remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse)); + + // 5. Update node's labels to RM's NodeLabelManager. + if (isDecentralizedNodeLabelsConf) { + Set nodeLabels = request.getNodeLabels(); + if (null != nodeLabels) { + Map> labelsUpdate = + new HashMap>(); + labelsUpdate.put(rmNode.getNodeID(), nodeLabels); + try { + this.rmContext.getNodeLabelManager() + .replaceLabelsOnNode(labelsUpdate); + } catch (IOException ex) { + nodeHeartBeatResponse.setDiagnosticsMessage(ex.getMessage()); + LOG.info("Node Labels from Node " + rmNode.getNodeID() + + " failed to get validated by RM : " + ex.getMessage()); + } + } + } return nodeHeartBeatResponse; } - + private void populateKeys(NodeHeartbeatRequest request, NodeHeartbeatResponse nodeHeartBeatResponse) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index e83d601..ca6382e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -57,12 +57,12 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore; -import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProviderService; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -592,9 +592,10 @@ protected void doSecureLogin() throws IOException { private class ShortCircuitedNodeManager extends CustomNodeManager { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - return new NodeStatusUpdaterImpl(context, dispatcher, - healthChecker, metrics) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService nodeLabelsProviderService) { + return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, + metrics, nodeLabelsProviderService) { @Override protected ResourceTracker getRMClient() { final ResourceTrackerService rt =