diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 4b4f581..1091636 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1586,6 +1586,29 @@ private static void addDeprecatedKeys() { NODE_LABELS_PREFIX + "fs-store.retry-policy-spec"; public static final String DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC = "2000, 500"; + public static final String DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED = + NODE_LABELS_PREFIX + "decentralized-configuration.enabled"; + public static final boolean DEFAULT_DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED = + false; + + /** Configurations in NodeManager for NodeLabelsFeature*/ + private static final String NM_NODE_LABELS_PREFIX = NODE_LABELS_PREFIX + + "nm."; + + public static final String NM_NODE_LABELS_PROVIDER_CLASS = + NM_NODE_LABELS_PREFIX + "node-labels-provider.class"; + + private static final String NM_NODE_LABELS_CONFIG_BASED_PREFIX = + NM_NODE_LABELS_PREFIX + "config-based."; + + public static final String NM_NODE_LABELS_CONFIG_BASED_FETCH_INTERVAL_MS = + NM_NODE_LABELS_CONFIG_BASED_PREFIX + "interval-ms"; + + public static final String NM_NODE_LABELS_FROM_CONFIG = + NM_NODE_LABELS_CONFIG_BASED_PREFIX + "node-labels"; + + public static final long DEFAULT_NM_NODE_LABELS_FETCH_INTERVAL_MS = + 10 * 60 * 1000; public YarnConfiguration() { super(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java index 8885769..f6df3af 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -70,7 +70,7 @@ public void testResourceTrackerOnHA() throws Exception { NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null, null, null); NodeHeartbeatRequest request2 = - NodeHeartbeatRequest.newInstance(status, null, null); + NodeHeartbeatRequest.newInstance(status, null, null,null,false); resourceTracker.nodeHeartbeat(request2); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java index daefe8d..bbea243 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -649,7 +649,7 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode) } } - private void checkAndThrowLabelName(String label) throws IOException { + public static void checkAndThrowLabelName(String label) throws IOException { if (label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH) { throw new IOException("label added is empty or exceeds " + MAX_LABEL_LENGTH + " character(s)"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index addd3fe..cabc1d4 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; +import java.util.Set; + import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; @@ -26,7 +28,8 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, MasterKey lastKnownContainerTokenMasterKey, - MasterKey lastKnownNMTokenMasterKey) { + MasterKey lastKnownNMTokenMasterKey, Set nodeLabels, + boolean isNodeLabelsUpdated) { NodeHeartbeatRequest nodeHeartbeatRequest = Records.newRecord(NodeHeartbeatRequest.class); nodeHeartbeatRequest.setNodeStatus(nodeStatus); @@ -34,6 +37,8 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey); nodeHeartbeatRequest .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey); + nodeHeartbeatRequest.setNodeLabels(nodeLabels); + nodeHeartbeatRequest.setIsNodeLabelsUpdated(isNodeLabelsUpdated); return nodeHeartbeatRequest; } @@ -45,4 +50,10 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, public abstract MasterKey getLastKnownNMTokenMasterKey(); public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey); + + public abstract Set getNodeLabels(); + public abstract void setNodeLabels(Set nodeLabels); + + public abstract boolean isNodeLabelsUpdated(); + public abstract void setIsNodeLabelsUpdated(boolean isNodeLabelsUpdated); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 9fb44ca..6f4c830 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -67,4 +67,7 @@ void setSystemCredentialsForApps( Map systemCredentials); + + boolean getIsNodeLabelsAcceptedByRM(); + void setIsNodeLabelsAcceptedByRM(boolean rmAcceptNodeLabelsUpdate); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 0e3d7e4..00dd5a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import java.util.List; +import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -26,11 +27,19 @@ import org.apache.hadoop.yarn.util.Records; public abstract class 
RegisterNodeManagerRequest { - + public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, List containerStatuses, List runningApplications) { + return newInstance(nodeId, httpPort, resource, nodeManagerVersionId, + containerStatuses, runningApplications, null); + } + + public static RegisterNodeManagerRequest newInstance(NodeId nodeId, + int httpPort, Resource resource, String nodeManagerVersionId, + List containerStatuses, + List runningApplications,Set nodeLabels) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -39,6 +48,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, request.setNMVersion(nodeManagerVersionId); request.setContainerStatuses(containerStatuses); request.setRunningApplications(runningApplications); + request.setNodeLabels(nodeLabels); return request; } @@ -47,6 +57,8 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, public abstract Resource getResource(); public abstract String getNMVersion(); public abstract List getNMContainerStatuses(); + public abstract Set getNodeLabels(); + public abstract void setNodeLabels(Set nodeLabels); /** * We introduce this here because currently YARN RM doesn't persist nodes info diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java index b20803f..9010685 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java @@ -45,4 +45,7 @@ void setRMVersion(String version); String getRMVersion(); + + boolean getIsNodeLabelsAccpetedByRM(); + void setIsNodeLabelsAcceptedByRM(boolean rmAcceptNodeLabels); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 26d1f19..3c49e53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; +import java.util.HashSet; +import java.util.Set; + import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; @@ -36,6 +39,7 @@ private NodeStatus nodeStatus = null; private MasterKey lastKnownContainerTokenMasterKey = null; private MasterKey lastKnownNMTokenMasterKey = null; + private Set labels = null; public NodeHeartbeatRequestPBImpl() { builder = NodeHeartbeatRequestProto.newBuilder(); @@ -80,6 +84,10 @@ private void mergeLocalToBuilder() { builder.setLastKnownNmTokenMasterKey( convertToProtoFormat(this.lastKnownNMTokenMasterKey)); } + if (this.labels != null && 
!this.labels.isEmpty()) { + builder.clearNodeLabels(); + builder.addAllNodeLabels(this.labels); + } } private void mergeLocalToProto() { @@ -178,4 +186,40 @@ private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) { private MasterKeyProto convertToProtoFormat(MasterKey t) { return ((MasterKeyPBImpl)t).getProto(); } + + @Override + public Set getNodeLabels() { + initNodeLabels(); + return labels; + } + + private void initNodeLabels() { + if (this.labels != null) { + return; + } + NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; + labels = new HashSet(p.getNodeLabelsList()); + } + + @Override + public void setNodeLabels(Set nodeLabels) { + maybeInitBuilder(); + if (nodeLabels == null || nodeLabels.isEmpty()) { /* test the incoming set, not the cached field, so resetting to null/empty drops labels already merged into the builder */ + builder.clearNodeLabels(); + } + this.labels = nodeLabels; + } + + @Override + public boolean isNodeLabelsUpdated() { + NodeHeartbeatRequestProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getNodeLabelsUpdated(); + } + + @Override + public void setIsNodeLabelsUpdated(boolean isNodeLabelsUpdated) { + maybeInitBuilder(); + this.builder.setNodeLabelsUpdated(isNodeLabelsUpdated); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 1e91514..9084ef6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -482,5 +482,18 @@ private MasterKeyPBImpl 
convertFromProtoFormat(MasterKeyProto p) { private MasterKeyProto convertToProtoFormat(MasterKey t) { return ((MasterKeyPBImpl) t).getProto(); } + + @Override + public boolean getIsNodeLabelsAcceptedByRM() { + NodeHeartbeatResponseProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getRmAcceptNodeLabelsUpdate(); + } + + @Override + public void setIsNodeLabelsAcceptedByRM(boolean rmAcceptNodeLabelsUpdate) { + maybeInitBuilder(); + this.builder.setRmAcceptNodeLabelsUpdate(rmAcceptNodeLabelsUpdate); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index ce4faec..4585892 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -20,23 +20,18 @@ import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Set; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; @@ -56,6 +51,7 @@ private NodeId nodeId = null; private List containerStatuses = null; private List runningApplications = null; + private Set labels = null; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -86,7 +82,10 @@ private void mergeLocalToBuilder() { if (this.nodeId != null) { builder.setNodeId(convertToProtoFormat(this.nodeId)); } - + if (this.labels != null && !this.labels.isEmpty()) { + builder.clearNodeLabels(); + builder.addAllNodeLabels(this.labels); + } } private synchronized void addNMContainerStatusesToProto() { @@ -292,6 +291,29 @@ public void setNMVersion(String version) { builder.setNmVersion(version); } + @Override + public Set getNodeLabels() { + initNodeLabels(); + return labels; + } + + private void initNodeLabels() { + if (this.labels != null) { + return; + } + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? 
proto : builder; + labels = new HashSet(p.getNodeLabelsList()); + } + + @Override + public void setNodeLabels(Set nodeLabels) { + maybeInitBuilder(); + if (nodeLabels == null || nodeLabels.isEmpty()) { /* test the incoming set, not the cached field, so resetting to null/empty drops labels already merged into the builder */ + builder.clearNodeLabels(); + } + this.labels = nodeLabels; + } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { return new ApplicationIdPBImpl(p); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java index ac329ed..a68ee9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java @@ -216,4 +216,17 @@ private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) { private MasterKeyProto convertToProtoFormat(MasterKey t) { return ((MasterKeyPBImpl)t).getProto(); } + + @Override + public boolean getIsNodeLabelsAccpetedByRM() { + RegisterNodeManagerResponseProtoOrBuilder p = + this.viaProto ? 
this.proto : this.builder; + return p.getRmAcceptNodeLabels(); + } + + @Override + public void setIsNodeLabelsAcceptedByRM(boolean rmAcceptNodeLabels) { + maybeInitBuilder(); + this.builder.setRmAcceptNodeLabels(rmAcceptNodeLabels); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 91473c5..50c70b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -32,6 +32,7 @@ message RegisterNodeManagerRequestProto { optional string nm_version = 5; repeated NMContainerStatusProto container_statuses = 6; repeated ApplicationIdProto runningApplications = 7; + repeated string nodeLabels = 8; } message RegisterNodeManagerResponseProto { @@ -41,12 +42,15 @@ message RegisterNodeManagerResponseProto { optional int64 rm_identifier = 4; optional string diagnostics_message = 5; optional string rm_version = 6; + optional bool rmAcceptNodeLabels = 7 [default = false]; } message NodeHeartbeatRequestProto { optional NodeStatusProto node_status = 1; optional MasterKeyProto last_known_container_token_master_key = 2; optional MasterKeyProto last_known_nm_token_master_key = 3; + repeated string nodeLabels = 4; + optional bool nodeLabelsUpdated = 5 [default = false]; } message NodeHeartbeatResponseProto { @@ -60,6 +64,7 @@ message NodeHeartbeatResponseProto { optional string diagnostics_message = 8; repeated ContainerIdProto containers_to_be_removed_from_nm = 9; repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10; + optional bool rmAcceptNodeLabelsUpdate = 11 [default = false]; } message 
SystemCredentialsForAppsProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index 20983b6..8437d8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -36,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; @@ -46,6 +48,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl; +import org.junit.Assert; import org.junit.Test; /** @@ -89,11 +92,27 @@ public void testNodeHeartbeatRequestPBImpl() { original.setLastKnownContainerTokenMasterKey(getMasterKey()); original.setLastKnownNMTokenMasterKey(getMasterKey()); 
original.setNodeStatus(getNodeStatus()); + original.setNodeLabels(getValidNodeLabels()); NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl( original.getProto()); assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId()); assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost()); + // check labels are coming with valid values + Assert.assertEquals(true, + original.getNodeLabels().containsAll(copy.getNodeLabels())); + } + + /** + * Test NodeHeartbeatRequestPBImpl when node labels are set to null. + */ + @Test + public void testNodeHeartbeatRequestPBImplWithNullLabels() { + NodeHeartbeatRequestPBImpl original = new NodeHeartbeatRequestPBImpl(); + original.setNodeLabels(null); + NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl( + original.getProto()); + Assert.assertEquals(0,copy.getNodeLabels().size()); } /** @@ -119,6 +138,16 @@ public void testNodeHeartbeatResponsePBImpl() { assertEquals(1, copy.getContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getNMTokenMasterKey().getKeyId()); assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage()); + assertEquals(false, copy.getIsNodeLabelsAcceptedByRM()); + } + + @Test + public void testNodeHeartbeatResponsePBImplWithRMAcceptLbls() { + NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl(); + original.setIsNodeLabelsAcceptedByRM(true); + NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl( + original.getProto()); + assertEquals(true, copy.getIsNodeLabelsAcceptedByRM()); } /** @@ -208,6 +237,64 @@ public void testNodeStatusPBImpl() { } + @Test + public void testRegisterNodeManagerRequestWithLabels() { + RegisterNodeManagerRequest request = + RegisterNodeManagerRequest.newInstance( + NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0), + "version", null, null); + + // serialize to proto, and get request from proto + RegisterNodeManagerRequest request1 = + new 
RegisterNodeManagerRequestPBImpl( + ((RegisterNodeManagerRequestPBImpl) request).getProto()); + + // check labels are coming with no values + Assert.assertEquals(0, request1.getNodeLabels().size()); + } + + @Test + public void testRegisterNodeManagerRequestWithNullLabels() { + RegisterNodeManagerRequest request = + RegisterNodeManagerRequest.newInstance( + NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0), + "version", null, null); + + // serialize to proto, and get request from proto + RegisterNodeManagerRequest request1 = + new RegisterNodeManagerRequestPBImpl( + ((RegisterNodeManagerRequestPBImpl) request).getProto()); + + // check labels are coming with no values + Assert.assertEquals(0, request1.getNodeLabels().size()); + } + + @Test + public void testRegisterNodeManagerRequestWithValidLabels() { + HashSet nodeLabels = getValidNodeLabels(); + RegisterNodeManagerRequest request = + RegisterNodeManagerRequest.newInstance( + NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0), + "version", null, null, nodeLabels); + + // serialize to proto, and get request from proto + RegisterNodeManagerRequest request1 = + new RegisterNodeManagerRequestPBImpl( + ((RegisterNodeManagerRequestPBImpl) request).getProto()); + + // check labels are coming with valid values + Assert.assertEquals(true,nodeLabels.containsAll(request1.getNodeLabels())); + } + + private HashSet getValidNodeLabels() { + HashSet nodeLabels = new HashSet(); + nodeLabels.add("java"); + nodeLabels.add("windows"); + nodeLabels.add("gpu"); + nodeLabels.add("x86"); + return nodeLabels; + } + private ContainerStatus getContainerStatus(int applicationId, int containerID, int appAttemptId) { ContainerStatus status = recordFactory diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 53cbb11..512006d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProviderService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -78,6 +80,7 @@ protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); private ApplicationACLsManager aclsManager; private NodeHealthCheckerService nodeHealthChecker; + private NodeLabelsProviderService nodeLabelsProviderService; private LocalDirsHandlerService dirsHandler; private Context context; private AsyncDispatcher dispatcher; @@ -95,8 +98,34 @@ public NodeManager() { protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, + metrics,nodeLabelsProviderService); + } + + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + 
NodeLabelsProviderService nodeLabelsProviderService) { return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, - metrics); + metrics,nodeLabelsProviderService); + } + + /** + * Instantiates the configured node labels provider when decentralized node label configuration is enabled, otherwise returns null; protected and non-static so subclasses (e.g. tests) can override it. + */ + protected NodeLabelsProviderService createNodeLabelsProviderService( + Configuration conf) throws InstantiationException, IllegalAccessException { + NodeLabelsProviderService provider = null; + if (conf + .getBoolean( + YarnConfiguration.DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED, + YarnConfiguration.DEFAULT_DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED)) { + Class labelsProviderClass = + conf.getClass(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CLASS, + ConfigurationNodeLabelsProvider.class, + NodeLabelsProviderService.class); + provider = labelsProviderClass.newInstance(); + } + return provider; } protected NodeResourceMonitor createNodeResourceMonitor() { @@ -223,8 +252,17 @@ protected void serviceInit(Configuration conf) throws Exception { this.context = createNMContext(containerTokenSecretManager, nmTokenSecretManager, nmStore); - nodeStatusUpdater = - createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); + nodeLabelsProviderService = createNodeLabelsProviderService(conf); + + if (null == nodeLabelsProviderService) { + nodeStatusUpdater = + createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); + } else { + addService(nodeLabelsProviderService); + nodeStatusUpdater = + createNodeStatusUpdater(context, dispatcher, nodeHealthChecker, + nodeLabelsProviderService); + } NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor(); addService(nodeResourceMonitor); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index f561dbb..149d370 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -42,6 +42,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ServerRMProxy; @@ -70,6 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProviderService; import org.apache.hadoop.yarn.util.YarnVersionInfo; import com.google.common.annotations.VisibleForTesting; @@ -117,10 +120,23 @@ private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER; Set pendingContainersToRemove = new HashSet(); + private final 
NodeLabelsProviderService nodeLabelsProvider; + private final boolean isDecentralizedNodeLabelsConf; + + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + this(context, dispatcher, healthChecker, metrics, null); + } + + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsProviderService nodeLabelsProviderService) { super(NodeStatusUpdaterImpl.class.getName()); this.healthChecker = healthChecker; + this.nodeLabelsProvider = nodeLabelsProviderService; + this.isDecentralizedNodeLabelsConf = + (nodeLabelsProvider != null); this.context = context; this.dispatcher = dispatcher; this.metrics = metrics; @@ -248,9 +264,13 @@ protected ResourceTracker getRMClient() throws IOException { protected void registerWithRM() throws YarnException, IOException { List containerReports = getNMContainerStatuses(); + Set nodeLabels = + (!isDecentralizedNodeLabelsConf) ? 
CommonNodeLabelsManager.EMPTY_STRING_SET + : nodeLabelsProvider.getNodeLabels(); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, - nodeManagerVersionId, containerReports, getRunningApplications()); + nodeManagerVersionId, containerReports, getRunningApplications(), + nodeLabels); if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); } @@ -301,6 +321,13 @@ protected void registerWithRM() this.context.getNMTokenSecretManager().setMasterKey(masterKey); } + if (isDecentralizedNodeLabelsConf + && !regNMResponse.getIsNodeLabelsAccpetedByRM()) { + LOG.error("Failed to register Nodelabels {" + + StringUtils.join(", ", nodeLabels) + "} for NodeManager " + + this.nodeId); + } + LOG.info("Registered with ResourceManager as " + this.nodeId + " with total resource of " + this.totalResource); LOG.info("Notifying ContainerManager to unblock new container-requests"); @@ -491,7 +518,7 @@ private void trackAppForKeepAlive(ApplicationId appId) { @Override public void sendOutofBandHeartBeat() { synchronized (this.heartbeatMonitor) { - this.heartbeatMonitor.notify(); + this.heartbeatMonitor.notifyAll(); } } @@ -567,18 +594,38 @@ protected void startStatusUpdater() { @SuppressWarnings("unchecked") public void run() { int lastHeartBeatID = 0; + Set nodeLabelsLastUpdatedToRM = + (!isDecentralizedNodeLabelsConf) ? CommonNodeLabelsManager.EMPTY_STRING_SET + : nodeLabelsProvider.getNodeLabels(); + boolean nodeLabelsUpdated = false; while (!isStopped) { // Send heartbeat try { NodeHeartbeatResponse response = null; + Set nodeLabelsForHeartBeat = null; NodeStatus nodeStatus = getNodeStatus(lastHeartBeatID); + if (isDecentralizedNodeLabelsConf) { + nodeLabelsForHeartBeat = nodeLabelsProvider.getNodeLabels(); + nodeLabelsForHeartBeat = + (nodeLabelsForHeartBeat == null) ? 
CommonNodeLabelsManager.EMPTY_STRING_SET + : nodeLabelsForHeartBeat; + if (areNodeLabelsUpdated(nodeLabelsForHeartBeat, + nodeLabelsLastUpdatedToRM)) { + nodeLabelsUpdated = true; + nodeLabelsLastUpdatedToRM = nodeLabelsForHeartBeat; + } else { + nodeLabelsUpdated = false; + } + } + NodeHeartbeatRequest request = NodeHeartbeatRequest.newInstance(nodeStatus, - NodeStatusUpdaterImpl.this.context - .getContainerTokenSecretManager().getCurrentKey(), - NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager() - .getCurrentKey()); + NodeStatusUpdaterImpl.this.context + .getContainerTokenSecretManager().getCurrentKey(), + NodeStatusUpdaterImpl.this.context + .getNMTokenSecretManager().getCurrentKey(), + nodeLabelsForHeartBeat, nodeLabelsUpdated); response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); @@ -607,6 +654,19 @@ public void run() { new NodeManagerEvent(NodeManagerEventType.RESYNC)); break; } + + if (nodeLabelsUpdated) { + if (response.getIsNodeLabelsAcceptedByRM()) { + LOG.info("Node Labels {" + + StringUtils.join(",", nodeLabelsForHeartBeat) + + "} were Accepted by RM "); + } else { + LOG.error("Node Labels {" + + StringUtils.join(",", nodeLabelsForHeartBeat) + + "} were rejected from RM. Please check RM logs for " + + "more details "); + } + } // Explicitly put this method after checking the resync response. 
We // don't want to remove the completed containers before resync @@ -664,6 +724,15 @@ public void run() { } } } + + private boolean areNodeLabelsUpdated(Set nodeLabelsNew, + Set nodeLabelsOld) { + if (nodeLabelsNew.size() != nodeLabelsOld.size() + || !nodeLabelsOld.containsAll(nodeLabelsNew)) { + return true; + } + return false; + } private void updateMasterKeys(NodeHeartbeatResponse response) { // See if the master-key has rolled over diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java new file mode 100644 index 0000000..879c4ff --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.nodelabels; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; + +/** + * Provides Node's Labels by constantly monitoring the configuration. + */ +public class ConfigurationNodeLabelsProvider extends NodeLabelsProviderService { + + private static final Log LOG = LogFactory + .getLog(ConfigurationNodeLabelsProvider.class); + + /** Timer used to schedule node health monitoring script execution */ + private Timer configurationMonitorTimer; + + public ConfigurationNodeLabelsProvider() { + super("Configuration Based NodeLabels Provider Service"); + } + + private Set nodeLabels = new HashSet(); + private long intervalTime; + // for testing purpose + long startTime = 0; + + @Override + public Set getNodeLabels() { + readLock.lock(); + try { + return nodeLabels; + } finally { + readLock.unlock(); + } + } + + protected Lock readLock; + protected Lock writeLock; + + private TimerTask timerTask; + + public TimerTask getTimerTask() { + return timerTask; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.intervalTime = + conf.getLong( + YarnConfiguration.NM_NODE_LABELS_CONFIG_BASED_FETCH_INTERVAL_MS, + YarnConfiguration.DEFAULT_NM_NODE_LABELS_FETCH_INTERVAL_MS); + ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + readLock = readWriteLock.readLock(); + writeLock = readWriteLock.writeLock(); + 
updateNodeLabelsFromConfig(conf); + } + + @Override + protected void serviceStart() { + configurationMonitorTimer = + new Timer("Node Labels Configuration Monitor", true); + timerTask = new ConfigurationMonitorTimerTask(); + configurationMonitorTimer.scheduleAtFixedRate(timerTask, startTime, + intervalTime); + } + + private void updateNodeLabelsFromConfig(Configuration conf) + throws IOException { + String confLabelString = + conf.get(YarnConfiguration.NM_NODE_LABELS_FROM_CONFIG, ""); + String[] nodeLabelsFromConfiguration = + (confLabelString == null || confLabelString.isEmpty()) ? new String[] {} + : StringUtils.getStrings(confLabelString); + boolean validLabels = true; + StringBuffer errorMsg = new StringBuffer(""); + for (int i = 0; i < nodeLabelsFromConfiguration.length; i++) { + try { + CommonNodeLabelsManager + .checkAndThrowLabelName(nodeLabelsFromConfiguration[i]); + } catch (IOException e) { + validLabels = false; + errorMsg.append(e.getMessage()); + errorMsg.append(" , "); + } + } + if (validLabels) { + writeLock.lock(); + try { + nodeLabels = + new HashSet(Arrays.asList(nodeLabelsFromConfiguration)); + } finally { + writeLock.unlock(); + } + } else { + throw new IOException(errorMsg.toString()); + } + } + + private class ConfigurationMonitorTimerTask extends TimerTask { + @Override + public void run() { + try { + updateNodeLabelsFromConfig(new YarnConfiguration()); + } catch (Exception e) { + LOG.error("Failed to update node Labels from configuration.xml ", e); + } + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProviderService.java new file mode 100644 index 0000000..044f5ec --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProviderService.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.nodelabels; + +import java.util.Set; + +import org.apache.hadoop.service.AbstractService; + +/** + * Interface which will be responsible for fetching the labels + * + */ +public abstract class NodeLabelsProviderService extends AbstractService { + + public NodeLabelsProviderService(String name) { + super(name); + } + + /** + * Provides the labels + * + * @return Set of node label strings applicable for a node + */ + public abstract Set getNodeLabels(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java new file mode 100644 index 0000000..0c95cf7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Set; + +import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; +import org.apache.hadoop.yarn.server.api.ResourceTracker; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProviderService; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase { + private static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + private NodeManager nm; + + public static MasterKey createMasterKey() { + MasterKey masterKey = new MasterKeyPBImpl(); + masterKey.setKeyId(123); + masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123) + .byteValue() })); 
+ return masterKey; + } + + @After + public void tearDown() { + if (null != nm) { + ServiceOperations.stop(nm); + } + } + + private class ResourceTrackerForLabels implements ResourceTracker { + int heartBeatID = 0; + Set labels; + boolean nodeLabelsUpdated; + + private boolean receivedNMHeartbeat = false; + private boolean receivedNMRegister = false; + + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnException, IOException { + labels = request.getNodeLabels(); + RegisterNodeManagerResponse response = + recordFactory.newRecordInstance(RegisterNodeManagerResponse.class); + response.setNodeAction(NodeAction.NORMAL); + response.setContainerTokenMasterKey(createMasterKey()); + response.setNMTokenMasterKey(createMasterKey()); + response.setIsNodeLabelsAcceptedByRM(true); + synchronized (ResourceTrackerForLabels.class) { + receivedNMRegister = true; + ResourceTrackerForLabels.class.notifyAll(); + } + return response; + } + + public void waitTillHeartBeat() { + if (receivedNMHeartbeat) { + return; + } + while (!receivedNMHeartbeat) { + synchronized (ResourceTrackerForLabels.class) { + try { + ResourceTrackerForLabels.class.wait(); + } catch (InterruptedException e) { + Assert.fail("Exception caught while waiting for HeartBeat"); + e.printStackTrace(); + } + } + } + } + + public void waitTillRegister() { + if (receivedNMRegister) { + return; + } + while (!receivedNMRegister) { + synchronized (ResourceTrackerForLabels.class) { + try { + ResourceTrackerForLabels.class.wait(); + } catch (InterruptedException e) { + Assert.fail("Exception caught while waiting for register"); + e.printStackTrace(); + } + } + } + } + + /** + * Flag to indicate received any + */ + public void resetNMHeartbeatReceiveFlag() { + synchronized (ResourceTrackerForLabels.class) { + receivedNMHeartbeat = false; + } + } + + @Override + public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) + throws YarnException, 
IOException { + labels = request.getNodeLabels(); + nodeLabelsUpdated = request.isNodeLabelsUpdated(); + NodeStatus nodeStatus = request.getNodeStatus(); + nodeStatus.setResponseId(heartBeatID++); + + NodeHeartbeatResponse nhResponse = + YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, + NodeAction.NORMAL, null, null, null, null, 1000L); + + // to ensure that heartbeats are sent only when required. + nhResponse.setNextHeartBeatInterval(Long.MAX_VALUE); + + nhResponse.setIsNodeLabelsAcceptedByRM(nodeLabelsUpdated); + synchronized (ResourceTrackerForLabels.class) { + receivedNMHeartbeat = true; + ResourceTrackerForLabels.class.notifyAll(); + } + return nhResponse; + } + } + + public static class DummyNodeLabelsProvider extends NodeLabelsProviderService { + + @SuppressWarnings("unchecked") + private Set nodeLabels = Collections.EMPTY_SET; + + public DummyNodeLabelsProvider() { + super(DummyNodeLabelsProvider.class.getName()); + } + + @Override + public synchronized Set getNodeLabels() { + return nodeLabels; + } + + synchronized void setNodeLabels(Set nodeLabels) { + this.nodeLabels = nodeLabels; + } + } + + private YarnConfiguration createNMConfig() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean( + YarnConfiguration.DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED, true); + conf.setClass(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CLASS, + DummyNodeLabelsProvider.class, NodeLabelsProviderService.class); + return conf; + } + + private YarnConfiguration createNMConfigWithDistLabelsDisabled() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean( + YarnConfiguration.DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED, false); + return conf; + } + + private YarnConfiguration createNMConfigWithDistLabelsWithoutClass() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean( + YarnConfiguration.DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED, true); + return conf; + } + + protected DummyNodeLabelsProvider 
labelsProviderRef; + + @Test + public void testCreationOfNodeLabelsProviderService() + throws InterruptedException { + try { + NodeManager nodeManager = new NodeManager(); + NodeLabelsProviderService labelsProviderService = + nodeManager + .createNodeLabelsProviderService(createNMConfigWithDistLabelsDisabled()); + Assert + .assertNull( + "labelsProviderService should not be initialized in Centralized Node Labels configuration ", + labelsProviderService); + } catch (Exception e) { + Assert.fail("Exception caught"); + e.printStackTrace(); + } + } + + @Test + public void testCreationOfNodeLabelsProviderWithoutClassConfig() + throws InterruptedException { + try { + NodeManager nodeManager = new NodeManager(); + NodeLabelsProviderService labelsProviderService = + nodeManager + .createNodeLabelsProviderService(createNMConfigWithDistLabelsWithoutClass()); + Assert + .assertTrue( + "Distribution Node Labels should be disabled as per configuration", + (labelsProviderService != null && labelsProviderService instanceof ConfigurationNodeLabelsProvider)); + } catch (Exception e) { + Assert.fail("Exception caught"); + e.printStackTrace(); + } + } + + @Test + public void testNMRegistrationWithLabels() throws InterruptedException { + final ResourceTrackerForLabels resourceTracker = + new ResourceTrackerForLabels(); + nm = new NodeManager() { + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService labelsProvider) { + + Assert.assertTrue( + "NodeLabelsProviderService not set as per Configuration", + labelsProvider instanceof DummyNodeLabelsProvider); + + labelsProviderRef = (DummyNodeLabelsProvider) labelsProvider; + + return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, + metrics, labelsProvider) { + @Override + protected ResourceTracker getRMClient() { + return resourceTracker; + } + + @Override + protected void stopRMProxy() { + return; + } + }; 
+ } + }; + + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + resourceTracker.resetNMHeartbeatReceiveFlag(); + nm.start(); + resourceTracker.waitTillRegister(); + assertCollectionEquals(resourceTracker.labels, + labelsProviderRef.getNodeLabels()); + + resourceTracker.waitTillHeartBeat();// wait till the first heartbeat + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // heartbeat with updated labels + labelsProviderRef.setNodeLabels(toSet("P", "X", "z")); + nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartBeat(); + Assert.assertTrue("Labels needs to be updated along with the heartbeat", + resourceTracker.nodeLabelsUpdated); + assertCollectionEquals(resourceTracker.labels, + labelsProviderRef.getNodeLabels()); + resourceTracker.resetNMHeartbeatReceiveFlag(); + + // Sleep is required here as we have set nextHeartBeatInterval to max value + // and sendOutofBandHeartBeat notifies the Nodestatus updater thread so if + // the + // NodestatusUpdater thread has not gone into wait state from the prev call + // then + // sendOutofBandHeartBeat will not be of any use. 
+ Thread.sleep(5000l); + // heartbeat without updating labels + nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + resourceTracker.waitTillHeartBeat(); + Assert + .assertFalse( + "No change in Labels from the NM's heartbeat but heartbeat request had nodeLabelsUpdated set to true", + resourceTracker.nodeLabelsUpdated); + resourceTracker.resetNMHeartbeatReceiveFlag(); + + nm.stop(); + labelsProviderRef = null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java new file mode 100644 index 0000000..ab040e8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.nodemanager.nodelabels; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Date; +import java.util.TimerTask; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestConfigurationNodeLabelsProvider extends NodeLabelTestBase { + + protected static File testRootDir = new File("target", + TestConfigurationNodeLabelsProvider.class.getName() + "-localDir") + .getAbsoluteFile(); + + final static File nodeLabelsConfigFile = new File(testRootDir, + "yarn-site.xml"); + + private static XMLPathClassLoader loader; + + private ConfigurationNodeLabelsProvider nodeLabelsProvider; + + @Before + public void setup() { + loader = + new XMLPathClassLoader( + TestConfigurationNodeLabelsProvider.class.getClassLoader()); + testRootDir.mkdirs(); + + Configuration conf = getConfForNodeLabels(); + nodeLabelsProvider = new ConfigurationNodeLabelsProvider(); + nodeLabelsProvider.init(conf); + // To delay the timer to run and we call timerTask.run manually + nodeLabelsProvider.startTime = new Date().getTime() + 1 * 60 * 60 * 1000l; + } + + @After + public void tearDown() throws Exception { + if (nodeLabelsProvider != null) { + nodeLabelsProvider.close(); + } + if (testRootDir.exists()) { + FileContext.getLocalFSFileContext().delete( + new Path(testRootDir.getAbsolutePath()), true); + } + } + + private Configuration getConfForNodeLabels() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.NM_NODE_LABELS_FROM_CONFIG, "A,B,CX"); + return conf; + } + + @Test + public void 
testNodeLabelsFromConfig() throws IOException, + InterruptedException { + // test for ensuring labels are set during initialization of the class + nodeLabelsProvider.start(); + Thread.sleep(5000l); // sleep so that timer has run once during + // initialization + assertCollectionEquals(toSet("A", "B", "CX"), + nodeLabelsProvider.getNodeLabels()); + + // test for valid Modification + TimerTask timerTask = nodeLabelsProvider.getTimerTask(); + modifyConfAndCallTimer(timerTask, "X,y,Z"); + assertCollectionEquals(toSet("X", "y", "Z"), + nodeLabelsProvider.getNodeLabels()); + + // test for Invalid Modification. Provider is expected to return the last + // read labels + modifyConfAndCallTimer(timerTask, "A,#Xy,Z"); + assertCollectionEquals(toSet("X", "y", "Z"), + nodeLabelsProvider.getNodeLabels()); + nodeLabelsProvider.close(); + } + + private static void modifyConfAndCallTimer(TimerTask timerTask, + String nodeLabels) throws FileNotFoundException, IOException { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.NM_NODE_LABELS_FROM_CONFIG, nodeLabels); + conf.writeXml(new FileOutputStream(nodeLabelsConfigFile)); + ClassLoader actualLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(loader); + timerTask.run(); + } finally { + Thread.currentThread().setContextClassLoader(actualLoader); + } + } + + private static class XMLPathClassLoader extends ClassLoader { + public XMLPathClassLoader(ClassLoader wrapper) { + super(wrapper); + } + + public URL getResource(String name) { + if (name.equals(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE)) { + try { + return nodeLabelsConfigFile.toURI().toURL(); + } catch (MalformedURLException e) { + e.printStackTrace(); + Assert.fail(); + } + } + return super.getResource(name); + } + } +} diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 4beb895..0084328 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -21,6 +21,9 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; @@ -31,6 +34,7 @@ import org.apache.hadoop.net.Node; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -45,6 +49,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; @@ -100,6 +105,8 @@ private int minAllocMb; private int minAllocVcores; + private boolean isDecentralizedNodeLabelsConf; + static { 
resync.setNodeAction(NodeAction.RESYNC); @@ -148,7 +155,11 @@ protected void serviceInit(Configuration conf) throws Exception { minimumNodeManagerVersion = conf.get( YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION, YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION); - + + isDecentralizedNodeLabelsConf = + conf.getBoolean( + YarnConfiguration.DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED, + YarnConfiguration.DEFAULT_DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED); super.serviceInit(conf); } @@ -332,12 +343,36 @@ public RegisterNodeManagerResponse registerNodeManager( } } } + + // Update node's labels to RM's NodeLabelManager. + boolean nodeLabelsRegistered = false; + Set nodeLabels = CommonNodeLabelsManager.EMPTY_STRING_SET; + if (isDecentralizedNodeLabelsConf) { + try { + updateNodeLabelsInNodeLabelsManager(request.getNodeLabels(), rmNode); + response.setIsNodeLabelsAcceptedByRM(true); + nodeLabelsRegistered = true; + } catch (IOException ex) { + LOG.error("Node Labels {" + StringUtils.join(",", nodeLabels) + + "} of NodeManager from " + host + + " is not properly configured: " + ex.getMessage() + " "); + response.setIsNodeLabelsAcceptedByRM(false); + nodeLabels = CommonNodeLabelsManager.EMPTY_STRING_SET; + } + } - String message = - "NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: " - + httpPort + ") " + "registered with capability: " + capability - + ", assigned nodeId " + nodeId; - LOG.info(message); + StringBuilder message = new StringBuilder(); + message.append("NodeManager from node ").append(host).append("(cmPort: ") + .append(cmPort).append(" httpPort: "); + message.append(httpPort).append(") ") + .append("registered with capability: ").append(capability); + message.append(", assigned nodeId ").append(nodeId); + if (nodeLabelsRegistered) { + message.append(", node labels { ").append( + StringUtils.join(",", nodeLabels) + " } "); + } + + LOG.info(message.toString()); response.setNodeAction(NodeAction.NORMAL); 
response.setRMIdentifier(ResourceManager.getClusterTimeStamp());
     response.setRMVersion(YarnVersionInfo.getVersion());
@@ -424,10 +459,39 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
         new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
             remoteNodeStatus.getContainersStatuses(),
             remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
+
+    // 5. Update node's labels to RM's NodeLabelManager.
+    if (isDecentralizedNodeLabelsConf && request.isNodeLabelsUpdated()) {
+      try {
+        updateNodeLabelsInNodeLabelsManager(request.getNodeLabels(), rmNode);
+        nodeHeartBeatResponse.setIsNodeLabelsAcceptedByRM(true);
+      } catch (IOException ex) {
+        nodeHeartBeatResponse.setIsNodeLabelsAcceptedByRM(false);
+        LOG.error("Node Labels from Node " + rmNode.getNodeID()
+            + " failed to get validated by RM : " + ex.getMessage());
+      }
+    }

     return nodeHeartBeatResponse;
   }

+  /**
+   * Replaces the labels of one node in the RM's node label manager with the
+   * given set and logs the accepted labels at INFO level.
+   *
+   * @param nodeLabels labels to set on the node
+   * @param rmNode node whose id is used as the key of the update
+   * @throws IOException if the node label manager rejects the update
+   */
+  private void updateNodeLabelsInNodeLabelsManager(Set<String> nodeLabels,
+      RMNode rmNode) throws IOException {
+    Map<NodeId, Set<String>> labelsUpdate = new HashMap<NodeId, Set<String>>();
+    labelsUpdate.put(rmNode.getNodeID(), nodeLabels);
+    this.rmContext.getNodeLabelManager().replaceLabelsOnNode(labelsUpdate);
+    LOG.info("Node Labels {" + StringUtils.join(",", nodeLabels)
+        + "} from Node " + rmNode.getNodeID() + " were Accepted from RM");
+  }
+
   private void populateKeys(NodeHeartbeatRequest request,
       NodeHeartbeatResponse nodeHeartBeatResponse) {

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 7c12848..3d20051 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -27,6 +27,7 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;

@@ -49,11 +50,16 @@
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -65,7 +71,7 @@
 import org.junit.Assert;
 import org.junit.Test;

-public class TestResourceTrackerService {
+public class TestResourceTrackerService extends NodeLabelTestBase {

   private final static File TEMP_DIR = new File(System.getProperty(
       "test.build.data", "/tmp"), "decommision");
@@ -309,6 +315,395 @@ public void testNodeRegistrationSuccess() throws Exception {
   }

   @Test
+  public void testNodeRegistrationWithLabels() throws Exception {
+    writeToHostsFile("host2");
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.setBoolean(
+        YarnConfiguration.DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED, true);
+
+    final RMNodeLabelsManager lblsMgr = new MemoryRMNodeLabelsManager();
+
+    rm = new MockRM(conf) {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return lblsMgr;
+      }
+    };
+    rm.start();
+
+    try {
+      lblsMgr.addToCluserNodeLabels(toSet("A", "B", "C"));
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert.fail("Caught Exception while intializing");
+    }
+
+    ResourceTrackerService resourceTrackerService =
+        rm.getResourceTrackerService();
+    RegisterNodeManagerRequest registerReq =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    NodeId nodeId = NodeId.newInstance("host2", 1234);
+    Resource capability = BuilderUtils.newResource(1024, 1);
+    registerReq.setResource(capability);
+    registerReq.setNodeId(nodeId);
+    registerReq.setHttpPort(1234);
+    registerReq.setNMVersion(YarnVersionInfo.getVersion());
+    registerReq.setNodeLabels(toSet("A", "B", "C"));
+    RegisterNodeManagerResponse response =
+        resourceTrackerService.registerNodeManager(registerReq);
+
+    Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction());
+    assertCollectionEquals(lblsMgr.getNodeLabels().get(nodeId),
+        registerReq.getNodeLabels());
+    Assert.assertTrue("Valid Node Labels were not accepted by RM",
+        response.getIsNodeLabelsAccpetedByRM());
+    rm.stop();
+  }
+
+  @Test
+  public void testNodeRegistrationWithInvalidLabels() throws Exception {
+    writeToHostsFile("host2");
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.setBoolean(
+        YarnConfiguration.DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED, true);
+
+    final RMNodeLabelsManager lblsMgr = new MemoryRMNodeLabelsManager();
+
+    rm = new MockRM(conf) {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return lblsMgr;
+      }
+    };
+    rm.start();
+
+    try {
+      lblsMgr.addToCluserNodeLabels(toSet("X", "Y", "Z"));
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert.fail("Caught Exception while intializing");
+    }
+
+    ResourceTrackerService resourceTrackerService =
+        rm.getResourceTrackerService();
+    RegisterNodeManagerRequest registerReq =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    NodeId nodeId = NodeId.newInstance("host2", 1234);
+    Resource capability = BuilderUtils.newResource(1024, 1);
+    registerReq.setResource(capability);
+    registerReq.setNodeId(nodeId);
+    registerReq.setHttpPort(1234);
+    registerReq.setNMVersion(YarnVersionInfo.getVersion());
+    registerReq.setNodeLabels(toSet("A", "B", "C"));
+    RegisterNodeManagerResponse response =
+        resourceTrackerService.registerNodeManager(registerReq);
+
+    Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction());
+    Assert.assertNull(lblsMgr.getNodeLabels().get(nodeId));
+    Assert.assertFalse("Node Labels should not accepted by RM If Invalid",
+        response.getIsNodeLabelsAccpetedByRM());
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testNodeRegistrationWithInvalidLabelsSyntax() throws Exception {
+    writeToHostsFile("host2");
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.setBoolean(
+        YarnConfiguration.DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED, true);
+
+    final RMNodeLabelsManager lblsMgr = new MemoryRMNodeLabelsManager();
+
+    rm = new MockRM(conf) {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return lblsMgr;
+      }
+    };
+    rm.start();
+
+    try {
+      lblsMgr.addToCluserNodeLabels(toSet("X", "Y", "Z"));
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert.fail("Caught Exception while intializing");
+    }
+
+    ResourceTrackerService resourceTrackerService =
+        rm.getResourceTrackerService();
+    RegisterNodeManagerRequest req =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    NodeId nodeId = NodeId.newInstance("host2", 1234);
+    Resource capability = BuilderUtils.newResource(1024, 1);
+    req.setResource(capability);
+    req.setNodeId(nodeId);
+    req.setHttpPort(1234);
+    req.setNMVersion(YarnVersionInfo.getVersion());
+    req.setNodeLabels(toSet("Y", "#B", "Z"));
+    RegisterNodeManagerResponse response =
+        resourceTrackerService.registerNodeManager(req);
+
+    Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction());
+    Assert.assertNull(lblsMgr.getNodeLabels().get(nodeId));
+    Assert.assertFalse("Invalid Node Labels should not accepted by RM ",
+        response.getIsNodeLabelsAccpetedByRM());
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testNodeRegistrationWithCentralLabelConfig() throws Exception {
+    writeToHostsFile("host2");
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.setBoolean(
+        YarnConfiguration.DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED, false);
+
+    final RMNodeLabelsManager lblsMgr = new MemoryRMNodeLabelsManager();
+
+    rm = new MockRM(conf) {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return lblsMgr;
+      }
+    };
+    rm.start();
+    try {
+      lblsMgr.addToCluserNodeLabels(toSet("A", "B", "C"));
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert.fail("Caught Exception while intializing");
+    }
+    ResourceTrackerService resourceTrackerService =
+        rm.getResourceTrackerService();
+    RegisterNodeManagerRequest req =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    NodeId nodeId = NodeId.newInstance("host2", 1234);
+    Resource capability = BuilderUtils.newResource(1024, 1);
+    req.setResource(capability);
+    req.setNodeId(nodeId);
+    req.setHttpPort(1234);
+    req.setNMVersion(YarnVersionInfo.getVersion());
+    req.setNodeLabels(toSet("A", "B"));
+    RegisterNodeManagerResponse response =
+        resourceTrackerService.registerNodeManager(req);
+    // registered to RM with central label config
+    Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction());
+    Assert.assertNull(lblsMgr.getNodeLabels().get(nodeId));
+    Assert
+        .assertFalse(
+            "Node Labels should not accepted by RM If its configured with Central configuration",
+            response.getIsNodeLabelsAccpetedByRM());
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private NodeStatus getNodeStatusObject(NodeId nodeId) {
+    NodeStatus status = Records.newRecord(NodeStatus.class);
+    status.setNodeId(nodeId);
+    status.setResponseId(0);
+    status.setContainersStatuses(Collections.EMPTY_LIST);
+    status.setKeepAliveApplications(Collections.EMPTY_LIST);
+    return status;
+  }
+
+  @Test
+  public void testNodeHeartBeatWithLabels() throws Exception {
+    writeToHostsFile("host2");
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.setBoolean(
+        YarnConfiguration.DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED, true);
+
+    final RMNodeLabelsManager lblsMgr = new MemoryRMNodeLabelsManager();
+
+    rm = new MockRM(conf) {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return lblsMgr;
+      }
+    };
+    rm.start();
+    // adding valid labels
+    try {
+      lblsMgr.addToCluserNodeLabels(toSet("A", "B", "C"));
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert.fail("Caught Exception while intializing");
+    }
+
+    // Registering of labels and other required info to RM
+    ResourceTrackerService resourceTrackerService =
+        rm.getResourceTrackerService();
+    RegisterNodeManagerRequest registerReq =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    NodeId nodeId = NodeId.newInstance("host2", 1234);
+    Resource capability = BuilderUtils.newResource(1024, 1);
+    registerReq.setResource(capability);
+    registerReq.setNodeId(nodeId);
+    registerReq.setHttpPort(1234);
+    registerReq.setNMVersion(YarnVersionInfo.getVersion());
+    registerReq.setNodeLabels(toSet("A")); // Node register label
+    RegisterNodeManagerResponse registerResponse =
+        resourceTrackerService.registerNodeManager(registerReq);
+
+    // modification of labels during heartbeat
+    NodeHeartbeatRequest heartbeatReq =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    heartbeatReq.setIsNodeLabelsUpdated(true);
+    heartbeatReq.setNodeLabels(toSet("B", "C")); // Node heartbeat label update
+    heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId));
+    heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
+        .getNMTokenMasterKey());
+    heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
+        .getContainerTokenMasterKey());
+    NodeHeartbeatResponse nodeHeartbeatResponse =
+        resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    Assert.assertEquals(NodeAction.NORMAL,
+        nodeHeartbeatResponse.getNodeAction());
+    assertCollectionEquals(lblsMgr.getNodeLabels().get(nodeId),
+        heartbeatReq.getNodeLabels());
+    Assert.assertTrue("Valid Node Labels were not accepted by RM",
+        nodeHeartbeatResponse.getIsNodeLabelsAcceptedByRM());
+    rm.stop();
+  }
+
+  @Test
+  public void testNodeHeartBeatWithInvalidLabels() throws Exception {
+    writeToHostsFile("host2");
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.setBoolean(
+        YarnConfiguration.DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED, true);
+
+    final RMNodeLabelsManager lblsMgr = new MemoryRMNodeLabelsManager();
+
+    rm = new MockRM(conf) {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return lblsMgr;
+      }
+    };
+    rm.start();
+
+    try {
+      lblsMgr.addToCluserNodeLabels(toSet("A", "B", "C"));
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert.fail("Caught Exception while intializing");
+    }
+
+    ResourceTrackerService resourceTrackerService =
+        rm.getResourceTrackerService();
+    RegisterNodeManagerRequest registerReq =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    NodeId nodeId = NodeId.newInstance("host2", 1234);
+    Resource capability = BuilderUtils.newResource(1024, 1);
+    registerReq.setResource(capability);
+    registerReq.setNodeId(nodeId);
+    registerReq.setHttpPort(1234);
+    registerReq.setNMVersion(YarnVersionInfo.getVersion());
+    registerReq.setNodeLabels(toSet("A"));
+    RegisterNodeManagerResponse registerResponse =
+        resourceTrackerService.registerNodeManager(registerReq);
+
+    NodeHeartbeatRequest heartbeatReq =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    heartbeatReq.setIsNodeLabelsUpdated(true);
+    heartbeatReq.setNodeLabels(toSet("B", "#C")); // Invalid heart beat labels
+    heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId));
+    heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
+        .getNMTokenMasterKey());
+    heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
+        .getContainerTokenMasterKey());
+    NodeHeartbeatResponse nodeHeartbeatResponse =
+        resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    // response should be ok but the RMacceptNodeLabelsUpdate should be false
+    Assert.assertEquals(NodeAction.NORMAL,
+        nodeHeartbeatResponse.getNodeAction());
+    assertCollectionEquals(lblsMgr.getNodeLabels().get(nodeId),
+        registerReq.getNodeLabels()); // no change in the labels,
+    Assert.assertFalse("Invalid Node Labels should not accepted by RM",
+        nodeHeartbeatResponse.getIsNodeLabelsAcceptedByRM());// heartbeat labels
+                                                             // rejected
+    rm.stop();
+  }
+
+  @Test
+  public void testNodeHeartbeatWithCentralLabelConfig() throws Exception {
+    writeToHostsFile("host2");
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.setBoolean(
+        YarnConfiguration.DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED, false);
+
+    final RMNodeLabelsManager lblsMgr = new MemoryRMNodeLabelsManager();
+
+    rm = new MockRM(conf) {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return lblsMgr;
+      }
+    };
+    rm.start();
+
+    ResourceTrackerService resourceTrackerService =
+        rm.getResourceTrackerService();
+    RegisterNodeManagerRequest req =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    NodeId nodeId = NodeId.newInstance("host2", 1234);
+    Resource capability = BuilderUtils.newResource(1024, 1);
+    req.setResource(capability);
+    req.setNodeId(nodeId);
+    req.setHttpPort(1234);
+    req.setNMVersion(YarnVersionInfo.getVersion());
+    req.setNodeLabels(toSet("A", "B", "C"));
+    RegisterNodeManagerResponse registerResponse =
+        resourceTrackerService.registerNodeManager(req);
+
+    NodeHeartbeatRequest heartbeatReq =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    heartbeatReq.setIsNodeLabelsUpdated(true);
+    heartbeatReq.setNodeLabels(toSet("B", "#C")); // Invalid heart beat labels
+    heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId));
+    heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
+        .getNMTokenMasterKey());
+    heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
+        .getContainerTokenMasterKey());
+    NodeHeartbeatResponse nodeHeartbeatResponse =
+        resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    // response should be ok but the RMacceptNodeLabelsUpdate should be false
+    Assert.assertEquals(NodeAction.NORMAL,
+        nodeHeartbeatResponse.getNodeAction());
+    Assert.assertNull(lblsMgr.getNodeLabels().get(nodeId)); // no change in the
+                                                            // labels,
+    Assert.assertFalse("Invalid Node Labels should not accepted by RM",
+        nodeHeartbeatResponse.getIsNodeLabelsAcceptedByRM());// heartbeat labels
+                                                             // rejected
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
+  @Test
   public void testNodeRegistrationVersionLessThanRM() throws Exception {
     writeToHostsFile("host2");
     Configuration conf = new Configuration();
-- 
1.9.2.msysgit.0