diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 59e303f..3278eb9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1486,6 +1486,7 @@ */ public static final String RM_NODE_LABELS_MANAGER_CLASS = NODE_LABELS_PREFIX + "manager-class"; + /** URI for NodeLabelManager */ public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX @@ -1495,6 +1496,31 @@ public static final String DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC = "2000, 500"; + /** Configurations in NodeManager for NodeLabelsFeature*/ + public static final String NM_NODE_LABELS = NODE_LABELS_PREFIX + + "nm."; + public static final String NM_NODE_LABELS_PROVIDER_CLASS = + NM_NODE_LABELS + "labels-provider.class"; + public static final String NM_SCRIPT_LABELS_PROVIDER_PREFIX = NODE_LABELS_PREFIX + + "nm.script-labels-provider."; + public static final String NM_LABELS_FETCH_SCRIPT_PATH = + NM_SCRIPT_LABELS_PROVIDER_PREFIX + "script.path"; + + public static final String NM_LABELS_FETCH_INTERVAL_MS = + NM_SCRIPT_LABELS_PROVIDER_PREFIX + "interval-ms"; + + public static final long DEFAULT_NM_LABELS_FETCH_INTERVAL_MS = 10 * 60 * 1000; + + public static final String NM_LABELS_FETCH_SCRIPT_TIMEOUT_MS = + NM_SCRIPT_LABELS_PROVIDER_PREFIX + "timeout-ms"; + + public static final long DEFAULT_NM_LABELS_FETCH_SCRIPT_TIMEOUT_MS = + DEFAULT_NM_LABELS_FETCH_INTERVAL_MS * 2; + + public static final String NM_LABELS_FETCH_SCRIPT_OPTS = + NM_SCRIPT_LABELS_PROVIDER_PREFIX + "script.opts"; + /** End of Configurations in NodeManager for NodeLabelsFeature*/ + public YarnConfiguration() { super(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java index 8885769..ecc8d9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -60,7 +60,7 @@ public void testResourceTrackerOnHA() throws Exception { // make sure registerNodeManager works when failover happens RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, 0, resource, - YarnVersionInfo.getVersion(), null, null); + YarnVersionInfo.getVersion(), null, null,null); resourceTracker.registerNodeManager(request); Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId)); @@ -70,7 +70,7 @@ public void testResourceTrackerOnHA() throws Exception { NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null, null, null); NodeHeartbeatRequest request2 = - NodeHeartbeatRequest.newInstance(status, null, null); + NodeHeartbeatRequest.newInstance(status, null, null,null); resourceTracker.nodeHeartbeat(request2); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index addd3fe..a903ea6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; +import java.util.Set; + import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; @@ -26,7 +28,7 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, MasterKey lastKnownContainerTokenMasterKey, - MasterKey lastKnownNMTokenMasterKey) { + MasterKey lastKnownNMTokenMasterKey,Set nodeLabels) { NodeHeartbeatRequest nodeHeartbeatRequest = Records.newRecord(NodeHeartbeatRequest.class); nodeHeartbeatRequest.setNodeStatus(nodeStatus); @@ -34,6 +36,7 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey); nodeHeartbeatRequest .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey); + nodeHeartbeatRequest.setNodeLabels(nodeLabels); return nodeHeartbeatRequest; } @@ -45,4 +48,7 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, public abstract MasterKey getLastKnownNMTokenMasterKey(); public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey); + + public abstract Set getNodeLabels(); + public abstract void setNodeLabels(Set nodeLabels); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 0e3d7e4..011055e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import java.util.List; +import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -30,7 +31,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, List containerStatuses, - List runningApplications) { + List runningApplications,Set nodeLabels) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -39,6 +40,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, request.setNMVersion(nodeManagerVersionId); request.setContainerStatuses(containerStatuses); request.setRunningApplications(runningApplications); + request.setNodeLabels(nodeLabels); return request; } @@ -47,6 +49,8 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, public abstract Resource getResource(); public abstract String getNMVersion(); public abstract List getNMContainerStatuses(); + public abstract Set getNodeLabels(); + public abstract void setNodeLabels(Set nodeLabels); /** * We introduce this here because currently YARN RM doesn't persist nodes info diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 26d1f19..85e9389 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -18,10 +18,14 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; +import java.util.HashSet; +import java.util.Set; + import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -36,6 +40,7 @@ private NodeStatus nodeStatus = null; private MasterKey lastKnownContainerTokenMasterKey = null; private MasterKey lastKnownNMTokenMasterKey = null; + private Set labels = null; public NodeHeartbeatRequestPBImpl() { builder = NodeHeartbeatRequestProto.newBuilder(); @@ -80,6 +85,10 @@ private void mergeLocalToBuilder() { builder.setLastKnownNmTokenMasterKey( convertToProtoFormat(this.lastKnownNMTokenMasterKey)); } + if (this.labels != null && !this.labels.isEmpty()) { + builder.clearNodeLabels(); + builder.addAllNodeLabels(this.labels); + } } private void mergeLocalToProto() { @@ -178,4 +187,27 @@ private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) { private MasterKeyProto convertToProtoFormat(MasterKey t) { return ((MasterKeyPBImpl)t).getProto(); } + + @Override + public Set getNodeLabels() { + initNodeLabels(); + return labels; + } + + private void initNodeLabels(){ + if (this.labels != null) { + return; + } + NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; + labels = new HashSet(p.getNodeLabelsList()); + } + + @Override + public void setNodeLabels(Set nodeLabels) { + maybeInitBuilder(); + if (labels == null || labels.isEmpty()) { + builder.clearNodeLabels(); + } + this.labels = nodeLabels; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index ce4faec..2c58eab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -20,23 +20,18 @@ import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Set; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; @@ -56,6 +51,7 @@ private NodeId nodeId = null; private List containerStatuses = null; private List runningApplications = null; + private Set labels = null; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -86,7 +82,10 @@ private void mergeLocalToBuilder() { if (this.nodeId != null) { builder.setNodeId(convertToProtoFormat(this.nodeId)); } - + if (this.labels != null && !this.labels.isEmpty()) { + builder.clearNodeLabels(); + builder.addAllNodeLabels(this.labels); + } } private synchronized void addNMContainerStatusesToProto() { @@ -292,6 +291,29 @@ public void setNMVersion(String version) { builder.setNmVersion(version); } + @Override + public Set getNodeLabels() { + initNodeLabels(); + return labels; + } + + private void initNodeLabels(){ + if (this.labels != null) { + return; + } + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + labels = new HashSet(p.getNodeLabelsList()); + } + + @Override + public void setNodeLabels(Set nodeLabels) { + maybeInitBuilder(); + if (labels == null || labels.isEmpty()) { + builder.clearNodeLabels(); + } + this.labels = nodeLabels; + } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { return new ApplicationIdPBImpl(p); } @@ -323,4 +345,5 @@ private NMContainerStatusPBImpl convertFromProtoFormat(NMContainerStatusProto c) private NMContainerStatusProto convertToProtoFormat(NMContainerStatus c) { return ((NMContainerStatusPBImpl)c).getProto(); } + } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index d0990fb..33919f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -32,6 +32,7 @@ message RegisterNodeManagerRequestProto { optional string nm_version = 5; repeated NMContainerStatusProto container_statuses = 6; repeated ApplicationIdProto runningApplications = 7; + repeated string nodeLabels = 8; } message RegisterNodeManagerResponseProto { @@ -47,6 +48,7 @@ message NodeHeartbeatRequestProto { optional NodeStatusProto node_status = 1; optional MasterKeyProto last_known_container_token_master_key = 2; optional MasterKeyProto last_known_nm_token_master_key = 3; + repeated string nodeLabels = 4; } message NodeHeartbeatResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java index 7165445..7f5e4db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -29,8 +30,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; -import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; import org.junit.Assert; @@ -78,7 +77,7 @@ public void testRegisterNodeManagerRequest() { RegisterNodeManagerRequest.newInstance( NodeId.newInstance("1.1.1.1", 1000), 8080, Resource.newInstance(1024, 1), "NM-version-id", reports, - Arrays.asList(appId)); + Arrays.asList(appId),new HashSet()); RegisterNodeManagerRequest requestProto = new RegisterNodeManagerRequestPBImpl( ((RegisterNodeManagerRequestPBImpl) request).getProto()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java index fdacd92..672b9a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import java.util.Arrays; +import java.util.HashSet; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -44,7 +45,7 @@ public void testRegisterNodeManagerRequest() { ContainerState.RUNNING, Resource.newInstance(1024, 1), "good", -1, Priority.newInstance(0), 1234)), Arrays.asList( ApplicationId.newInstance(1234L, 1), - ApplicationId.newInstance(1234L, 2))); + ApplicationId.newInstance(1234L, 2)),new HashSet()); // serialze to proto, and get request from proto RegisterNodeManagerRequest request1 = @@ -67,8 +68,9 @@ public void testRegisterNodeManagerRequest() { @Test public void testRegisterNodeManagerRequestWithNullArrays() { RegisterNodeManagerRequest request = - RegisterNodeManagerRequest.newInstance(NodeId.newInstance("host", 1234), - 1234, Resource.newInstance(0, 0), "version", null, null); + RegisterNodeManagerRequest.newInstance( + NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0), + "version", null, null, new HashSet()); // serialze to proto, and get request from proto RegisterNodeManagerRequest request1 = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeLabelsFetcherService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeLabelsFetcherService.java new file mode 100644 index 0000000..42a547c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeLabelsFetcherService.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.nodelabel.NodeLabelsProvider; +//import org.apache.hadoop.yarn.server.nodemanager.nodelabel.ScriptBasedNodeLabelsProvider; + +/** + * The class which provides functionality of fetching the labels of the node and + * providing them back to the service requesting it + * + */ +public class NodeLabelsFetcherService extends CompositeService { + //private static Log LOG = LogFactory.getLog(NodeLabelsFetcherService.class); + + private NodeLabelsProvider labelsProvider; + + public NodeLabelsFetcherService() { + super("Node Label Fetcher Service"); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.labelsProvider = getNodeLabelsProvider(conf); + super.serviceInit(conf); + } + + public Set getNodeLabels() { + return labelsProvider.getNodeLabels(); + } + + private NodeLabelsProvider getNodeLabelsProvider(Configuration conf) + throws InstantiationException, IllegalAccessException { + Class labelsProviderClass = + conf.getClass(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CLASS, + ConfigurationNodeLabelsProvider.class, NodeLabelsProvider.class); + NodeLabelsProvider provider = labelsProviderClass.newInstance(); + provider.initialize(conf); + if (org.apache.hadoop.service.Service.class + .isAssignableFrom(labelsProviderClass)) { + addService((Service) provider); + } + return provider; + } + + /** + * If script is not configured then get the labels from the configuration file + */ + private class ConfigurationNodeLabelsProvider implements NodeLabelsProvider { + private Set nodeLabels; + + @Override + public Set getNodeLabels() { + return nodeLabels; + } + + @Override + public void initialize(Configuration conf) { + String[] nodeLabelsArr = + StringUtils + .getStrings(conf.get(YarnConfiguration.NM_NODE_LABELS, "")); + nodeLabels = new HashSet(Arrays.asList(nodeLabelsArr)); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 43770c1..10f1a76 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -75,6 +75,7 @@ protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); private ApplicationACLsManager aclsManager; private NodeHealthCheckerService nodeHealthChecker; + private NodeLabelsFetcherService nodeLabelsFetcher; private LocalDirsHandlerService dirsHandler; private Context context; private AsyncDispatcher dispatcher; @@ -91,9 +92,10 @@ public NodeManager() { } protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, - metrics); + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService nodeLabelsFetcher) { + return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, + metrics,nodeLabelsFetcher); } protected NodeResourceMonitor createNodeResourceMonitor() { @@ -220,8 +222,11 @@ protected void serviceInit(Configuration conf) throws Exception { this.context = createNMContext(containerTokenSecretManager, nmTokenSecretManager, nmStore); + nodeLabelsFetcher = new NodeLabelsFetcherService(); + nodeStatusUpdater = - createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); + createNodeStatusUpdater(context, dispatcher, nodeHealthChecker, + nodeLabelsFetcher); NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor(); addService(nodeResourceMonitor); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index bed58f5..72265c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -112,10 +112,14 @@ private Thread statusUpdater; private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER; + private final NodeLabelsFetcherService nodeLabelsFetcher; + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService nodeLabelsFetcher) { super(NodeStatusUpdaterImpl.class.getName()); this.healthChecker = healthChecker; + this.nodeLabelsFetcher = nodeLabelsFetcher; this.context = context; this.dispatcher = dispatcher; this.metrics = metrics; @@ -245,7 +249,8 @@ protected void registerWithRM() List containerReports = getNMContainerStatuses(); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, - nodeManagerVersionId, containerReports, getRunningApplications()); + nodeManagerVersionId, containerReports, getRunningApplications(), + nodeLabelsFetcher.getNodeLabels()); if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); } @@ -543,7 +548,7 @@ public void run() { NodeStatusUpdaterImpl.this.context .getContainerTokenSecretManager().getCurrentKey(), NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager() - .getCurrentKey()); + .getCurrentKey(),nodeLabelsFetcher.getNodeLabels()); response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabel/NodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabel/NodeLabelsProvider.java new file mode 100644 index 0000000..145b3ca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabel/NodeLabelsProvider.java @@ -0,0 +1,42 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.nodelabel; + +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; + +/** + * Interface which will be responsible for fetching the labels + * + */ +public interface NodeLabelsProvider { + /** + * Provides the labels + * @return + */ + public Set getNodeLabels(); + + /** + * Initializes the Labels provider with the given configurations + * @param conf + */ + public void initialize(Configuration conf); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java index 3f4091c..874b2d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java @@ -52,8 +52,9 @@ private ResourceTracker resourceTracker; public MockNodeStatusUpdater(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService labelFetcher) { + super(context, dispatcher, healthChecker, metrics, labelFetcher); resourceTracker = createResourceTracker(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java index fabb03b..7da7553 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java @@ -100,10 +100,13 @@ public int getHttpPort() { Dispatcher dispatcher = new AsyncDispatcher(); NodeHealthCheckerService healthChecker = new NodeHealthCheckerService(); healthChecker.init(conf); + NodeLabelsFetcherService labelsFetcher = new NodeLabelsFetcherService(); + labelsFetcher.init(conf); LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler(); NodeManagerMetrics metrics = NodeManagerMetrics.create(); NodeStatusUpdater nodeStatusUpdater = - new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics) { + new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics, + labelsFetcher) { @Override protected ResourceTracker getRMClient() { return new LocalRMInterface(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java index e69170e..218f557 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java @@ -269,9 +269,11 @@ public MyNodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { MockNodeStatusUpdater myNodeStatusUpdater = - new MockNodeStatusUpdater(context, dispatcher, healthChecker, metrics); + new MockNodeStatusUpdater(context, dispatcher, healthChecker, + metrics, labelsFetcher); return myNodeStatusUpdater; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index 85bafb3..30904ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -226,9 +226,10 @@ public void testNMSentContainerStatusOnResync() throws Exception { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { return new TestNodeStatusUpdaterResync(context, dispatcher, - healthChecker, metrics) { + healthChecker, metrics,labelsFetcher) { @Override protected ResourceTracker createResourceTracker() { return new MockResourceTracker() { @@ -308,8 +309,9 @@ public NodeHeartbeatResponse nodeHeartbeat( // This can be used as a common base class for testing NM resync behavior. class TestNodeStatusUpdaterResync extends MockNodeStatusUpdater { public TestNodeStatusUpdaterResync(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics,labelsFetcher); } @Override protected void rebootNodeStatusUpdaterAndRegisterWithRM() { @@ -355,9 +357,10 @@ public void setExistingContainerId(ContainerId cId) { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { return new TestNodeStatusUpdaterImpl1(context, dispatcher, - healthChecker, metrics); + healthChecker, metrics,labelsFetcher); } public int getNMRegistrationCount() { @@ -367,8 +370,9 @@ public int getNMRegistrationCount() { class TestNodeStatusUpdaterImpl1 extends MockNodeStatusUpdater { public TestNodeStatusUpdaterImpl1(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics,labelsFetcher); } @Override @@ -423,9 +427,10 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() { Thread launchContainersThread = null; @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { return new TestNodeStatusUpdaterImpl2(context, dispatcher, - healthChecker, metrics); + healthChecker, metrics,labelsFetcher); } @Override @@ -466,8 +471,9 @@ public void setBlockNewContainerRequests( class TestNodeStatusUpdaterImpl2 extends MockNodeStatusUpdater { public TestNodeStatusUpdaterImpl2(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics, labelsFetcher); } @Override @@ -553,9 +559,10 @@ public void run() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { return new TestNodeStatusUpdaterImpl3(context, dispatcher, healthChecker, - metrics); + metrics,labelsFetcher); } public int getNMRegistrationCount() { @@ -573,8 +580,9 @@ protected void shutDown() { class TestNodeStatusUpdaterImpl3 extends MockNodeStatusUpdater { public TestNodeStatusUpdaterImpl3(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics,labelsFetcher); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java index c44f7b8..3253566 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java @@ -309,9 +309,11 @@ private static File createUnhaltingScriptFile(ContainerId cId, @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { MockNodeStatusUpdater myNodeStatusUpdater = - new MockNodeStatusUpdater(context, dispatcher, healthChecker, metrics); + new MockNodeStatusUpdater(context, dispatcher, healthChecker, + metrics, labelsFetcher); return myNodeStatusUpdater; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 7593ce6..e79079d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -289,8 +289,9 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) private Context context; public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics,labelsFetcher); this.context = context; resourceTracker = new MyResourceTracker(this.context); } @@ -312,8 +313,9 @@ protected void stopRMProxy() { public ResourceTracker resourceTracker; public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics,labelsFetcher); resourceTracker = new MyResourceTracker4(context); } @@ -333,8 +335,9 @@ protected void stopRMProxy() { private Context context; public MyNodeStatusUpdater3(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics,labelsFetcher); this.context = context; this.resourceTracker = new MyResourceTracker3(this.context); } @@ -362,8 +365,9 @@ protected boolean isTokenKeepAliveEnabled(Configuration conf) { public ResourceTracker resourceTracker; public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, - long rmStartIntervalMS, boolean rmNeverStart) { - super(context, dispatcher, healthChecker, metrics); + long rmStartIntervalMS, boolean rmNeverStart, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics,labelsFetcher); this.rmStartIntervalMS = rmStartIntervalMS; this.rmNeverStart = rmNeverStart; } @@ -401,8 +405,9 @@ protected void stopRMProxy() { private Configuration conf; public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, Configuration conf) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, Configuration conf, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics,labelsFetcher); resourceTracker = new MyResourceTracker5(); this.conf = conf; } @@ -425,9 +430,11 @@ protected void stopRMProxy() { private MyNodeStatusUpdater3 nodeStatusUpdater; @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { this.nodeStatusUpdater = - new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics); + new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics, + labelsFetcher); return this.nodeStatusUpdater; } @@ -448,10 +455,11 @@ public MyNodeManager2 (CyclicBarrier syncBarrier, Configuration conf) { } @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { nodeStatusUpdater = new MyNodeStatusUpdater5(context, dispatcher, healthChecker, - metrics, conf); + metrics, conf,labelsFetcher); return nodeStatusUpdater; } @@ -917,9 +925,10 @@ public void testNMRegistration() throws InterruptedException { nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { return new MyNodeStatusUpdater(context, dispatcher, healthChecker, - metrics); + metrics,labelsFetcher); } }; @@ -978,9 +987,10 @@ public void testStopReentrant() throws Exception { nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater( - context, dispatcher, healthChecker, metrics); + context, dispatcher, healthChecker, metrics,labelsFetcher); MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); myResourceTracker2.heartBeatNodeAction = NodeAction.SHUTDOWN; myNodeStatusUpdater.resourceTracker = myResourceTracker2; @@ -1063,8 +1073,9 @@ private NodeManagerWithCustomNodeStatusUpdater() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker) { - updater = createUpdater(context, dispatcher, healthChecker); + NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { + updater = createUpdater(context, dispatcher, healthChecker,labelsFetcher); return updater; } @@ -1074,7 +1085,8 @@ public NodeStatusUpdater getUpdater() { abstract NodeStatusUpdater createUpdater(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker); + NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher); } @Test @@ -1083,9 +1095,10 @@ public void testNMShutdownForRegistrationFailure() throws Exception { nm = new NodeManagerWithCustomNodeStatusUpdater() { @Override protected NodeStatusUpdater createUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { MyNodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater( - context, dispatcher, healthChecker, metrics); + context, dispatcher, healthChecker, metrics,labelsFetcher); MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); myResourceTracker2.registerNodeAction = NodeAction.SHUTDOWN; myResourceTracker2.shutDownMessage = "RM Shutting Down Node"; @@ -1116,10 +1129,11 @@ public void testNMConnectionToRM() throws Exception { nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() { @Override protected NodeStatusUpdater createUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4( context, dispatcher, healthChecker, metrics, - rmStartIntervalMS, true); + rmStartIntervalMS, true,labelsFetcher); return nodeStatusUpdater; } }; @@ -1148,10 +1162,11 @@ protected NodeStatusUpdater createUpdater(Context context, nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() { @Override protected NodeStatusUpdater createUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4( context, dispatcher, healthChecker, metrics, rmStartIntervalMS, - false); + false,labelsFetcher); return nodeStatusUpdater; } }; @@ -1195,9 +1210,10 @@ public void testNoRegistrationWhenNMServicesFail() throws Exception { nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { return new MyNodeStatusUpdater(context, dispatcher, healthChecker, - metrics); + metrics, labelsFetcher); } @Override @@ -1261,10 +1277,11 @@ public void testCompletedContainerStatusBackup() throws Exception { nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { MyNodeStatusUpdater2 myNodeStatusUpdater = new MyNodeStatusUpdater2(context, dispatcher, healthChecker, - metrics); + metrics,labelsFetcher); return myNodeStatusUpdater; } @@ -1338,9 +1355,10 @@ public void testRMVersionLessThanMinimum() throws InterruptedException { nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater( - context, dispatcher, healthChecker, metrics); + context, dispatcher, healthChecker, metrics,labelsFetcher); MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); myResourceTracker2.heartBeatNodeAction = NodeAction.NORMAL; myResourceTracker2.rmVersion = "3.0.0"; @@ -1498,9 +1516,11 @@ private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) { return new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater( - context, dispatcher, healthChecker, metrics); + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { + MyNodeStatusUpdater myNodeStatusUpdater = + new MyNodeStatusUpdater(context, dispatcher, healthChecker, + metrics, labelsFetcher); MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); myResourceTracker2.heartBeatNodeAction = nodeHeartBeatAction; myNodeStatusUpdater.resourceTracker = myResourceTracker2; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 1907e1a..a0eb1ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -118,7 +118,7 @@ public int getHttpPort() { protected final long DUMMY_RM_IDENTIFIER = 1234; protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl( - context, new AsyncDispatcher(), null, metrics) { + context, new AsyncDispatcher(), null, metrics,null) { @Override protected ResourceTracker getRMClient() { return new LocalRMInterface(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index f5583bc..bfff124 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -20,6 +20,9 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -145,7 +148,6 @@ protected void serviceInit(Configuration conf) throws Exception { minimumNodeManagerVersion = conf.get( YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION, YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION); - super.serviceInit(conf); } @@ -329,6 +331,13 @@ public RegisterNodeManagerResponse registerNodeManager( } } } + + //Update node's labels to RM's NodeLabelManager. + if(updateNodeLabels(rmNode.getNodeID(),request.getNodeLabels())){ + // TODO add code to update node register response with a flag accepting the + // labels so that it can further be used in NM side to send + // labels only when there is any change + } String message = "NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: " @@ -415,9 +424,27 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), remoteNodeStatus.getContainersStatuses(), remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse)); + + // 5. Update node's labels to RM's NodeLabelManager. + if(updateNodeLabels(rmNode.getNodeID(),request.getNodeLabels())){ + // TODO add code to update heart beat response with a flag accepting the + // change in labels so that it can further be used in NM side to send + // labels only when there is any change + } return nodeHeartBeatResponse; } + + private boolean updateNodeLabels(NodeId node, Set nodeLabels) + throws IOException { + if (null != nodeLabels) { + Map> labelUpdate = new HashMap>(); + labelUpdate.put(node, nodeLabels); + this.rmContext.getNodeLabelManager().replaceLabelsOnNode(labelUpdate); + return true; + } + return false; + } private void populateKeys(NodeHeartbeatRequest request, NodeHeartbeatResponse nodeHeartBeatResponse) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index e83d601..16630c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -57,9 +57,9 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore; -import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeLabelsFetcherService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; @@ -592,9 +592,9 @@ protected void doSecureLogin() throws IOException { private class ShortCircuitedNodeManager extends CustomNodeManager { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker,NodeLabelsFetcherService labelFetcher) { return new NodeStatusUpdaterImpl(context, dispatcher, - healthChecker, metrics) { + healthChecker, metrics, labelFetcher) { @Override protected ResourceTracker getRMClient() { final ResourceTrackerService rt =